Remove piper

This commit is contained in:
Stjepan Glavina 2020-06-21 18:46:49 +02:00
parent 0e93b92744
commit 9c14bfb127
2 changed files with 20 additions and 20 deletions

View File

@ -32,7 +32,6 @@ fastrand = "1.1.0"
futures-io = { version = "0.3.5", default-features = false, features = ["std"] } futures-io = { version = "0.3.5", default-features = false, features = ["std"] }
futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] } futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] }
once_cell = "1.3.1" once_cell = "1.3.1"
piper = "0.1.2"
scoped-tls = "1.0.0" scoped-tls = "1.0.0"
slab = "0.4.2" slab = "0.4.2"
socket2 = { version = "0.3.12", features = ["pair", "unix"] } socket2 = { version = "0.3.12", features = ["pair", "unix"] }
@ -50,10 +49,11 @@ libc = "0.2.70"
wepoll-sys-stjepang = "1.0.0" wepoll-sys-stjepang = "1.0.0"
[dev-dependencies] [dev-dependencies]
criterion = "0.3"
futures = { version = "0.3.5", default-features = false, features = ["std"] } futures = { version = "0.3.5", default-features = false, features = ["std"] }
num_cpus = "1.13.0" num_cpus = "1.13.0"
piper = "0.1.2"
tempfile = "3.1.0" tempfile = "3.1.0"
criterion = "0.3"
[workspace] [workspace]
members = [ members = [

View File

@ -26,7 +26,7 @@ use std::os::unix::io::RawFd;
#[cfg(windows)] #[cfg(windows)]
use std::os::windows::io::{FromRawSocket, RawSocket}; use std::os::windows::io::{FromRawSocket, RawSocket};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Poll, Waker}; use std::task::{Poll, Waker};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -54,17 +54,17 @@ pub(crate) struct Reactor {
ticker: AtomicUsize, ticker: AtomicUsize,
/// Registered sources. /// Registered sources.
sources: piper::Mutex<Slab<Arc<Source>>>, sources: Mutex<Slab<Arc<Source>>>,
/// Temporary storage for I/O events when polling the reactor. /// Temporary storage for I/O events when polling the reactor.
events: piper::Mutex<sys::Events>, events: Mutex<sys::Events>,
/// An ordered map of registered timers. /// An ordered map of registered timers.
/// ///
/// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
/// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
/// timer. /// timer.
timers: piper::Mutex<BTreeMap<(Instant, usize), Waker>>, timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
/// A queue of timer operations (insert and remove). /// A queue of timer operations (insert and remove).
/// ///
@ -79,9 +79,9 @@ impl Reactor {
static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor { static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor {
sys: sys::Reactor::new().expect("cannot initialize I/O event notification"), sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
ticker: AtomicUsize::new(0), ticker: AtomicUsize::new(0),
sources: piper::Mutex::new(Slab::new()), sources: Mutex::new(Slab::new()),
events: piper::Mutex::new(sys::Events::new()), events: Mutex::new(sys::Events::new()),
timers: piper::Mutex::new(BTreeMap::new()), timers: Mutex::new(BTreeMap::new()),
timer_ops: ConcurrentQueue::bounded(1000), timer_ops: ConcurrentQueue::bounded(1000),
}); });
&REACTOR &REACTOR
@ -98,7 +98,7 @@ impl Reactor {
#[cfg(unix)] raw: RawFd, #[cfg(unix)] raw: RawFd,
#[cfg(windows)] raw: RawSocket, #[cfg(windows)] raw: RawSocket,
) -> io::Result<Arc<Source>> { ) -> io::Result<Arc<Source>> {
let mut sources = self.sources.lock(); let mut sources = self.sources.lock().unwrap();
let vacant = sources.vacant_entry(); let vacant = sources.vacant_entry();
// Put the I/O handle in non-blocking mode. // Put the I/O handle in non-blocking mode.
@ -121,7 +121,7 @@ impl Reactor {
let source = Arc::new(Source { let source = Arc::new(Source {
raw, raw,
key, key,
wakers: piper::Mutex::new(Wakers { wakers: Mutex::new(Wakers {
tick_readable: 0, tick_readable: 0,
tick_writable: 0, tick_writable: 0,
readers: Vec::new(), readers: Vec::new(),
@ -133,7 +133,7 @@ impl Reactor {
/// Deregisters an I/O source from the reactor. /// Deregisters an I/O source from the reactor.
pub fn remove_io(&self, source: &Source) -> io::Result<()> { pub fn remove_io(&self, source: &Source) -> io::Result<()> {
let mut sources = self.sources.lock(); let mut sources = self.sources.lock().unwrap();
sources.remove(source.key); sources.remove(source.key);
self.sys.deregister(source.raw) self.sys.deregister(source.raw)
} }
@ -173,7 +173,7 @@ impl Reactor {
/// Attempts to lock the reactor. /// Attempts to lock the reactor.
pub fn try_lock(&self) -> Option<ReactorLock<'_>> { pub fn try_lock(&self) -> Option<ReactorLock<'_>> {
self.events.try_lock().map(|events| { self.events.try_lock().ok().map(|events| {
let reactor = self; let reactor = self;
ReactorLock { reactor, events } ReactorLock { reactor, events }
}) })
@ -183,7 +183,7 @@ impl Reactor {
/// ///
/// Returns the duration until the next timer before this method was called. /// Returns the duration until the next timer before this method was called.
fn fire_timers(&self) -> Option<Duration> { fn fire_timers(&self) -> Option<Duration> {
let mut timers = self.timers.lock(); let mut timers = self.timers.lock().unwrap();
// Process timer operations, but no more than the queue capacity because otherwise we could // Process timer operations, but no more than the queue capacity because otherwise we could
// keep popping operations forever. // keep popping operations forever.
@ -232,7 +232,7 @@ impl Reactor {
/// A lock on the reactor. /// A lock on the reactor.
pub(crate) struct ReactorLock<'a> { pub(crate) struct ReactorLock<'a> {
reactor: &'a Reactor, reactor: &'a Reactor,
events: piper::MutexGuard<'a, sys::Events>, events: MutexGuard<'a, sys::Events>,
} }
impl ReactorLock<'_> { impl ReactorLock<'_> {
@ -269,13 +269,13 @@ impl ReactorLock<'_> {
// At least one I/O event occurred. // At least one I/O event occurred.
Ok(_) => { Ok(_) => {
// Iterate over sources in the event list. // Iterate over sources in the event list.
let sources = self.reactor.sources.lock(); let sources = self.reactor.sources.lock().unwrap();
let mut ready = Vec::new(); let mut ready = Vec::new();
for ev in self.events.iter() { for ev in self.events.iter() {
// Check if there is a source in the table with this key. // Check if there is a source in the table with this key.
if let Some(source) = sources.get(ev.key) { if let Some(source) = sources.get(ev.key) {
let mut wakers = source.wakers.lock(); let mut wakers = source.wakers.lock().unwrap();
// Wake readers if a readability event was emitted. // Wake readers if a readability event was emitted.
if ev.readable { if ev.readable {
@ -345,7 +345,7 @@ pub(crate) struct Source {
key: usize, key: usize,
/// Tasks interested in events on this source. /// Tasks interested in events on this source.
wakers: piper::Mutex<Wakers>, wakers: Mutex<Wakers>,
} }
/// Tasks interested in events on a source. /// Tasks interested in events on a source.
@ -370,7 +370,7 @@ impl Source {
let mut ticks = None; let mut ticks = None;
future::poll_fn(|cx| { future::poll_fn(|cx| {
let mut wakers = self.wakers.lock(); let mut wakers = self.wakers.lock().unwrap();
// Check if the reactor has delivered a readability event. // Check if the reactor has delivered a readability event.
if let Some((a, b)) = ticks { if let Some((a, b)) = ticks {
@ -414,7 +414,7 @@ impl Source {
let mut ticks = None; let mut ticks = None;
future::poll_fn(|cx| { future::poll_fn(|cx| {
let mut wakers = self.wakers.lock(); let mut wakers = self.wakers.lock().unwrap();
// Check if the reactor has delivered a writability event. // Check if the reactor has delivered a writability event.
if let Some((a, b)) = ticks { if let Some((a, b)) = ticks {