diff --git a/.gitignore b/.gitignore index 88ef933..2c96eb1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ target/ Cargo.lock -socket diff --git a/Cargo.toml b/Cargo.toml index e515964..d60a6e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ async-task = "3.0.0" crossbeam = "0.7.3" futures = { version = "0.3.4", default-features = false, features = ["std"] } once_cell = "1.3.1" -piper = { path = "piper" } +piper = { git = "https://github.com/stjepang/piper.git" } scoped-tls-hkt = "0.1.2" slab = "0.4.2" socket2 = { version = "0.3.12", features = ["pair", "unix"] } diff --git a/README.md b/README.md index 4d90fd3..2800b37 100644 --- a/README.md +++ b/README.md @@ -1,30 +1,26 @@ # smol -A small and fast async runtime. +A small and fast async runtime in Rust. https://discord.gg/5RxMVnr -## Goals +### Goals -* Small - Around 1500 lines of code. -* Fast - On par with async-std and Tokio. -* Safe - Written in 100% safe Rust. -* Complete - Fully featured and ready for production. -* Documented - Simple code, easy to understand and modify. -* Lightweight - Small dependencies, relies on epoll/kqueue/wepoll. +* Small - 1500 lines of code. +* Simple - Well-documented, easy to maintain. +* Fast - Based on [async-task], [crossbeam], and [piper]. +* Safe - Just 1 line of unsafe code for Windows support. * Portable - Linux, Android, macOS, iOS, Windows, FreeBSD, OpenBSD, NetBSD, DragonFly BSD. -## Features +### Features -* Executor - Configurable threads, work stealing, supports non-send futures. -* Blocking - Thread pool for isolating blocking code. -* Networking - TCP, UDP, Unix domain sockets, and custom files/sockets. -* Process - Spawns child processes and interacts with their I/O. -* Files - Filesystem manipulation operations. -* Stdio - Asynchronous stdin, stdout, and stderr. +* Async I/O - TCP, UDP, Unix domain sockets, and custom FDs/sockets. +* Thread-local executor - For non-send futures. +* Work-stealing executor - adapts to uneven workload. +* Blocking executor - For files, processes, and standard I/O. * Timer - Efficient userspace timers. -# Documentation +### Documentation ``` cargo doc --document-private-items --no-deps --open diff --git a/examples/async-h1-server.rs b/examples/async-h1-server.rs index fed35c3..77e3a1d 100644 --- a/examples/async-h1-server.rs +++ b/examples/async-h1-server.rs @@ -20,7 +20,7 @@ use anyhow::Result; use async_native_tls::{Identity, TlsAcceptor}; use futures::prelude::*; use http_types::{Request, Response, StatusCode}; -use piper::{Lock, Shared}; +use piper::{Arc, Mutex}; use smol::{Async, Task}; /// Serves a request and returns a response. @@ -50,13 +50,13 @@ async fn listen(listener: Async, tls: Option) -> Resul // Spawn a background task serving this connection. let task = match &tls { None => { - let stream = Shared::new(stream); + let stream = Arc::new(stream); Task::spawn(async move { async_h1::accept(&host, stream, serve).await }) } Some(tls) => { // In case of HTTPS, establish a secure TLS connection first. let stream = tls.accept(stream).await?; - let stream = Shared::new(Lock::new(stream)); + let stream = Arc::new(Mutex::new(stream)); Task::spawn(async move { async_h1::accept(&host, stream, serve).await }) } }; diff --git a/examples/chat-client.rs b/examples/chat-client.rs index 7e227b5..c5edfff 100644 --- a/examples/chat-client.rs +++ b/examples/chat-client.rs @@ -19,6 +19,42 @@ use futures::prelude::*; use smol::Async; fn main() -> io::Result<()> { + for _ in 0..1 { + std::thread::spawn(|| smol::run(future::pending::<()>())); + } + + smol::Task::spawn(async { + // Connect to the server and create async stdin and stdout. + let stream = Async::::connect("127.0.0.1:6000").await.unwrap(); + let stdin = smol::reader(std::io::stdin()); + let mut stdout = smol::writer(std::io::stdout()); + + // Intro messages. + println!("Connected to {}", stream.get_ref().peer_addr().unwrap()); + println!("My nickname: {}", stream.get_ref().local_addr().unwrap()); + println!("Type a message and hit enter!\n"); + + // References to `Async` also implement `AsyncRead` and `AsyncWrite`. + let stream_r = &stream; + let mut stream_w = &stream; + + smol::Task::local(async { + let mut cnt = 0; + loop { + cnt += 1; + eprintln!("BEFORE TIMER {}", cnt); + smol::Timer::after(std::time::Duration::from_millis(20)).await; + eprintln!("AFTER TIMER"); + } + + }).detach(); + // Wait until the standard input is closed or the connection is closed. + futures::select! { + _ = io::copy(stdin, &mut stream_w).fuse() => println!("Quit!"), + _ = io::copy(stream_r, &mut stdout).fuse() => println!("Server disconnected!"), + } + }).detach(); + smol::run(async { // Connect to the server and create async stdin and stdout. let stream = Async::::connect("127.0.0.1:6000").await?; @@ -34,6 +70,16 @@ fn main() -> io::Result<()> { let stream_r = &stream; let mut stream_w = &stream; + smol::Task::local(async { + let mut cnt = 0; + loop { + cnt += 1; + eprintln!("BEFORE TIMER {}", cnt); + smol::Timer::after(std::time::Duration::from_millis(20)).await; + eprintln!("AFTER TIMER"); + } + + }).detach(); // Wait until the standard input is closed or the connection is closed. futures::select! { _ = io::copy(stdin, &mut stream_w).fuse() => println!("Quit!"), diff --git a/examples/chat-server.rs b/examples/chat-server.rs index 7420d0b..6a78943 100644 --- a/examples/chat-server.rs +++ b/examples/chat-server.rs @@ -17,10 +17,10 @@ use std::net::{SocketAddr, TcpListener, TcpStream}; use futures::io::{self, BufReader}; use futures::prelude::*; -use piper::{Receiver, Sender, Shared}; +use piper::{Receiver, Sender, Arc}; use smol::{Async, Task}; -type Client = Shared>; +type Client = Arc>; enum Event { Join(SocketAddr, Client), @@ -53,7 +53,8 @@ async fn dispatch(receiver: Receiver) -> io::Result<()> { // Send the event to all active clients. for stream in map.values_mut() { - stream.write_all(output.as_bytes()).await?; + // Ignore errors because the client might disconnect at any point. + let _ = stream.write_all(output.as_bytes()).await; } } Ok(()) @@ -87,7 +88,7 @@ fn main() -> io::Result<()> { loop { // Accept the next connection. let (stream, addr) = listener.accept().await?; - let client = Shared::new(stream); + let client = Arc::new(stream); let sender = sender.clone(); // Spawn a background task reading messages from the client. diff --git a/examples/compat-async-std.rs b/examples/compat-async-std.rs deleted file mode 100644 index 16e4933..0000000 --- a/examples/compat-async-std.rs +++ /dev/null @@ -1,12 +0,0 @@ -use std::time::{Duration, Instant}; - -use async_std::task::sleep; - -fn main() { - smol::run(async { - let start = Instant::now(); - println!("Sleeping..."); - sleep(Duration::from_secs(1)).await; - println!("Woke up after {:?}", start.elapsed()); - }) -} diff --git a/examples/compat-reqwest.rs b/examples/compat-reqwest.rs deleted file mode 100644 index 5be57c8..0000000 --- a/examples/compat-reqwest.rs +++ /dev/null @@ -1,11 +0,0 @@ -use anyhow::Result; - -fn main() -> Result<()> { - smol::run(async { - let resp = reqwest::get("https://www.rust-lang.org").await?; - let body = resp.text().await?; - - println!("{:?}", body); - Ok(()) - }) -} diff --git a/examples/compat-surf.rs b/examples/compat-surf.rs deleted file mode 100644 index 3a77413..0000000 --- a/examples/compat-surf.rs +++ /dev/null @@ -1,7 +0,0 @@ -fn main() -> http_types::Result<()> { - smol::run(async { - let body = surf::get("https://www.rust-lang.org").recv_string().await?; - println!("{:?}", body); - Ok(()) - }) -} diff --git a/examples/compat-tokio.rs b/examples/compat-tokio.rs deleted file mode 100644 index 942c482..0000000 --- a/examples/compat-tokio.rs +++ /dev/null @@ -1,12 +0,0 @@ -use std::time::{Duration, Instant}; - -use tokio::time::delay_for; - -fn main() { - smol::run(async { - let start = Instant::now(); - println!("Sleeping..."); - delay_for(Duration::from_secs(1)).await; - println!("Woke up after {:?}", start.elapsed()); - }) -} diff --git a/examples/ctrl-c.rs b/examples/ctrl-c.rs index 95c12a2..4ec94d6 100644 --- a/examples/ctrl-c.rs +++ b/examples/ctrl-c.rs @@ -1,3 +1,4 @@ +// TODO: document // Uses the `ctrlc` crate to set a handler that sends a message // through an async channel. diff --git a/examples/linux-inotify.rs b/examples/linux-inotify.rs index f244ce8..cb4a90e 100644 --- a/examples/linux-inotify.rs +++ b/examples/linux-inotify.rs @@ -1,3 +1,4 @@ +// TODO: document #[cfg(target_os = "linux")] fn main() -> std::io::Result<()> { use std::ffi::OsString; diff --git a/examples/linux-timerfd.rs b/examples/linux-timerfd.rs index d6a9fab..387bb9e 100644 --- a/examples/linux-timerfd.rs +++ b/examples/linux-timerfd.rs @@ -1,3 +1,4 @@ +// TODO: document #[cfg(target_os = "linux")] fn main() -> std::io::Result<()> { use std::io; diff --git a/examples/other-runtimes.rs b/examples/other-runtimes.rs new file mode 100644 index 0000000..14ca8cc --- /dev/null +++ b/examples/other-runtimes.rs @@ -0,0 +1,34 @@ +// TODO: document +use std::time::{Duration, Instant}; + +use anyhow::{Error, Result}; + +fn main() -> Result<()> { + smol::run(async { + // Sleep using async-std. + let start = Instant::now(); + println!("Sleeping using async-std..."); + async_std::task::sleep(Duration::from_secs(1)).await; + println!("Woke up after {:?}", start.elapsed()); + + // Sleep using tokio. + let start = Instant::now(); + println!("Sleeping using tokio..."); + tokio::time::delay_for(Duration::from_secs(1)).await; + println!("Woke up after {:?}", start.elapsed()); + + // Make a GET request using surf. + let body = surf::get("https://www.rust-lang.org") + .recv_string() + .await + .map_err(Error::msg)?; + println!("Body from surf: {:?}", body); + + // Make a GET request using reqwest. + let resp = reqwest::get("https://www.rust-lang.org").await?; + let body = resp.text().await?; + println!("Body from reqwest: {:?}", body); + + Ok(()) + }) +} diff --git a/examples/process-output.rs b/examples/process-output.rs index acb37eb..895a33c 100644 --- a/examples/process-output.rs +++ b/examples/process-output.rs @@ -1,3 +1,4 @@ +// TODO: document use std::env; use std::process::{Command, Stdio}; diff --git a/examples/process-run.rs b/examples/process-run.rs index 5b5810b..bab8bb8 100644 --- a/examples/process-run.rs +++ b/examples/process-run.rs @@ -1,3 +1,4 @@ +// TODO: document use std::env; use std::process::Command; diff --git a/examples/read-dir.rs b/examples/read-dir.rs index b1eaed0..1fb52e9 100644 --- a/examples/read-dir.rs +++ b/examples/read-dir.rs @@ -1,3 +1,4 @@ +// TODO: document //! Lists files in a directory given as an argument. use std::env; diff --git a/examples/read-file.rs b/examples/read-file.rs index 739cc52..5f0a896 100644 --- a/examples/read-file.rs +++ b/examples/read-file.rs @@ -1,3 +1,4 @@ +// TODO: document //! Prints a file given as an argument to stdout. use std::env; diff --git a/examples/tcp-client.rs b/examples/tcp-client.rs index 02fcb0c..4ce0885 100644 --- a/examples/tcp-client.rs +++ b/examples/tcp-client.rs @@ -1,3 +1,4 @@ +// TODO: document use std::net::TcpStream; use futures::io; diff --git a/examples/tcp-server.rs b/examples/tcp-server.rs index 1d9f853..bab1c6a 100644 --- a/examples/tcp-server.rs +++ b/examples/tcp-server.rs @@ -1,3 +1,4 @@ +// TODO: document //! TCP echo server. //! //! To send messages, do: diff --git a/examples/timer-sleep.rs b/examples/timer-sleep.rs index c3ef6f5..bec0e72 100644 --- a/examples/timer-sleep.rs +++ b/examples/timer-sleep.rs @@ -1,3 +1,4 @@ +// TODO: document use std::time::{Duration, Instant}; use smol::Timer; diff --git a/examples/timer-timeout.rs b/examples/timer-timeout.rs index f390cb9..6f8baaf 100644 --- a/examples/timer-timeout.rs +++ b/examples/timer-timeout.rs @@ -1,3 +1,5 @@ +// TODO: document +use std::io; use std::time::Duration; use anyhow::{bail, Result}; @@ -6,23 +8,23 @@ use futures::io::BufReader; use futures::prelude::*; use smol::Timer; -async fn timeout(dur: Duration, f: impl Future) -> Result { - futures::pin_mut!(f); - match select(f, Timer::after(dur)).await { - Either::Left((out, _)) => Ok(out), - Either::Right(_) => bail!("timed out"), +async fn timeout(dur: Duration, f: impl Future>) -> io::Result { + futures::select! { + out = f.fuse() => out, + _ = Timer::after(dur).fuse() => { + Err(io::Error::from(io::ErrorKind::TimedOut)) + } } } fn main() -> Result<()> { smol::run(async { - let mut stdin = BufReader::new(smol::reader(std::io::stdin())); let mut line = String::new(); + let mut stdin = BufReader::new(smol::reader(std::io::stdin())); - let dur = Duration::from_secs(5); - timeout(dur, stdin.read_line(&mut line)).await??; - + timeout(Duration::from_secs(5), stdin.read_line(&mut line)).await?; println!("Line: {}", line); + Ok(()) }) } diff --git a/examples/tls-client.rs b/examples/tls-client.rs index bebda66..166ca24 100644 --- a/examples/tls-client.rs +++ b/examples/tls-client.rs @@ -1,10 +1,11 @@ +// TODO: document use std::net::TcpStream; use anyhow::Result; use async_native_tls::{Certificate, TlsConnector}; use futures::io; use futures::prelude::*; -use piper::Lock; +use piper::Mutex; use smol::Async; fn main() -> Result<()> { @@ -21,7 +22,7 @@ fn main() -> Result<()> { let stream = tls.connect("127.0.0.1", stream).await?; println!("Connected to {}", stream.get_ref().get_ref().peer_addr()?); - let stream = Lock::new(stream); + let stream = Mutex::new(stream); future::try_join( io::copy(stdin, &mut &stream), diff --git a/examples/tls-server.rs b/examples/tls-server.rs index c497818..a22dc24 100644 --- a/examples/tls-server.rs +++ b/examples/tls-server.rs @@ -1,14 +1,15 @@ +// TODO: document use std::net::{TcpListener, TcpStream}; use anyhow::Result; use async_native_tls::{Identity, TlsAcceptor, TlsStream}; use futures::io; -use piper::Lock; +use piper::Mutex; use smol::{Async, Task}; async fn echo(stream: TlsStream>) -> Result<()> { println!("Copying"); - let stream = Lock::new(stream); + let stream = Mutex::new(stream); io::copy(&stream, &mut &stream).await?; Ok(()) } diff --git a/examples/unix-signal.rs b/examples/unix-signal.rs index 78a40cb..986fded 100644 --- a/examples/unix-signal.rs +++ b/examples/unix-signal.rs @@ -1,3 +1,4 @@ +// TODO: document #[cfg(unix)] fn main() -> std::io::Result<()> { use std::os::unix::net::UnixStream; diff --git a/examples/web-crawler.rs b/examples/web-crawler.rs index 20a3bfd..3f85e7a 100644 --- a/examples/web-crawler.rs +++ b/examples/web-crawler.rs @@ -1,3 +1,4 @@ +// TODO: document use std::collections::{HashSet, VecDeque}; use anyhow::Result; diff --git a/examples/websocket-client.rs b/examples/websocket-client.rs index 426a36a..ee44806 100644 --- a/examples/websocket-client.rs +++ b/examples/websocket-client.rs @@ -1,3 +1,4 @@ +// TODO: document use std::net::TcpStream; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/examples/websocket-server.rs b/examples/websocket-server.rs index c99932b..6e43706 100644 --- a/examples/websocket-server.rs +++ b/examples/websocket-server.rs @@ -1,3 +1,4 @@ +// TODO: document use std::net::{TcpListener, TcpStream}; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/examples/windows-uds.rs b/examples/windows-uds.rs index afea46e..5200446 100644 --- a/examples/windows-uds.rs +++ b/examples/windows-uds.rs @@ -1,3 +1,4 @@ +// TODO: document #[cfg(windows)] fn main() -> std::io::Result<()> { use std::path::PathBuf; diff --git a/piper/Cargo.toml b/piper/Cargo.toml deleted file mode 100644 index 811e050..0000000 --- a/piper/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "piper" -version = "0.1.0" -authors = ["Stjepan Glavina "] -edition = "2018" -description = "WIP" -license = "MIT OR Apache-2.0" - -[dependencies] -crossbeam-utils = "0.7.0" -futures = { version = "0.3.4", default-features = false, features = ["std"] } - -[dev-dependencies] -smol = { path = ".." } diff --git a/piper/src/chan.rs b/piper/src/chan.rs deleted file mode 100644 index 503e94a..0000000 --- a/piper/src/chan.rs +++ /dev/null @@ -1,911 +0,0 @@ -use std::cell::UnsafeCell; -use std::fmt; -use std::future::Future; -use std::isize; -use std::marker::PhantomData; -use std::mem; -use std::pin::Pin; -use std::process; -use std::ptr; -use std::sync::atomic::{self, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use crossbeam_utils::Backoff; -use futures::future; -use futures::stream::Stream; - -use crate::signal::{Signal, SignalListener}; - -/// Creates a bounded multi-producer multi-consumer channel. -/// -/// This channel has a buffer that can hold at most `cap` messages at a time. -/// -/// Senders and receivers can be cloned. When all senders associated with a channel get dropped, it -/// becomes closed. Receive operations on a closed and empty channel return `None` instead of -/// trying to await a message. -/// -/// # Panics -/// -/// If `cap` is zero, this function will panic. -/// -/// # Examples -/// -/// ``` -/// # async_std::task::block_on(async { -/// # -/// use std::time::Duration; -/// -/// use async_std::sync::channel; -/// use async_std::task; -/// -/// let (s, r) = channel(1); -/// -/// // This call returns immediately because there is enough space in the channel. -/// s.send(1).await; -/// -/// task::spawn(async move { -/// // This call will have to wait because the channel is full. -/// // It will be able to complete only after the first message is received. -/// s.send(2).await; -/// }); -/// -/// task::sleep(Duration::from_secs(1)).await; -/// assert_eq!(r.recv().await, Some(1)); -/// assert_eq!(r.recv().await, Some(2)); -/// # -/// # }) -/// ``` -pub fn chan(cap: usize) -> (Sender, Receiver) { - let channel = Arc::new(Channel::with_capacity(cap)); - let s = Sender { - channel: channel.clone(), - }; - let r = Receiver { - channel, - next_listener: None, - }; - (s, r) -} - -/// The sending side of a channel. -/// -/// This struct is created by the [`chan`] function. See its documentation for more. -/// -/// [`chan`]: fn.chan.html -/// -/// # Examples -/// -/// ``` -/// # async_std::task::block_on(async { -/// # -/// use async_std::sync::channel; -/// use async_std::task; -/// -/// let (s1, r) = channel(100); -/// let s2 = s1.clone(); -/// -/// task::spawn(async move { s1.send(1).await }); -/// task::spawn(async move { s2.send(2).await }); -/// -/// let msg1 = r.recv().await.unwrap(); -/// let msg2 = r.recv().await.unwrap(); -/// -/// assert_eq!(msg1 + msg2, 3); -/// # -/// # }) -/// ``` -pub struct Sender { - /// The inner channel. - channel: Arc>, -} - -impl Sender { - /// Sends a message into the channel. - /// - /// If the channel is full, this method will wait until there is space in the channel. - /// - /// # Examples - /// - /// ``` - /// # async_std::task::block_on(async { - /// # - /// use async_std::sync::channel; - /// use async_std::task; - /// - /// let (s, r) = channel(1); - /// - /// task::spawn(async move { - /// s.send(1).await; - /// s.send(2).await; - /// }); - /// - /// assert_eq!(r.recv().await, Some(1)); - /// assert_eq!(r.recv().await, Some(2)); - /// assert_eq!(r.recv().await, None); - /// # - /// # }) - /// ``` - pub async fn send(&self, msg: T) { - self.channel.send(msg).await - } - - /// Returns the channel capacity. - /// - /// # Examples - /// - /// ``` - /// use async_std::sync::channel; - /// - /// let (s, _) = channel::(5); - /// assert_eq!(s.capacity(), 5); - /// ``` - pub fn capacity(&self) -> usize { - if self.channel.handoff.is_some() { - 0 - } else { - self.channel.cap - } - } - - /// Returns `true` if the channel is empty. - /// - /// # Examples - /// - /// ``` - /// # async_std::task::block_on(async { - /// # - /// use async_std::sync::channel; - /// - /// let (s, r) = channel(1); - /// - /// assert!(s.is_empty()); - /// s.send(0).await; - /// assert!(!s.is_empty()); - /// # - /// # }) - /// ``` - pub fn is_empty(&self) -> bool { - self.channel.is_empty() - } - - /// Returns `true` if the channel is full. - /// - /// # Examples - /// - /// ``` - /// # async_std::task::block_on(async { - /// # - /// use async_std::sync::channel; - /// - /// let (s, r) = channel(1); - /// - /// assert!(!s.is_full()); - /// s.send(0).await; - /// assert!(s.is_full()); - /// # - /// # }) - /// ``` - pub fn is_full(&self) -> bool { - self.channel.is_full() - } - - /// Returns the number of messages in the channel. - /// - /// # Examples - /// - /// ``` - /// # async_std::task::block_on(async { - /// # - /// use async_std::sync::channel; - /// - /// let (s, r) = channel(2); - /// assert_eq!(s.len(), 0); - /// - /// s.send(1).await; - /// s.send(2).await; - /// assert_eq!(s.len(), 2); - /// # - /// # }) - /// ``` - pub fn len(&self) -> usize { - self.channel.len() - } -} - -impl Drop for Sender { - fn drop(&mut self) { - // Decrement the sender count and disconnect the channel if it drops down to zero. - if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 { - self.channel.disconnect(); - } - } -} - -impl Clone for Sender { - fn clone(&self) -> Sender { - let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed); - - // Make sure the count never overflows, even if lots of sender clones are leaked. - if count > isize::MAX as usize { - process::abort(); - } - - Sender { - channel: self.channel.clone(), - } - } -} - -impl fmt::Debug for Sender { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("Sender { .. }") - } -} - -/// The receiving side of a channel. -/// -/// This type receives messages by calling `recv`. But it also implements the [`Stream`] trait, -/// which means it can act as an asynchronous iterator. This struct is created by the [`chan`] -/// function. See its documentation for more. -/// -/// [`chan`]: fn.chan.html -/// [`Stream`]: ../stream/trait.Stream.html -/// -/// # Examples -/// -/// ``` -/// # async_std::task::block_on(async { -/// # -/// use std::time::Duration; -/// -/// use async_std::sync::channel; -/// use async_std::task; -/// -/// let (s, r) = channel(100); -/// -/// task::spawn(async move { -/// s.send(1).await; -/// task::sleep(Duration::from_secs(1)).await; -/// s.send(2).await; -/// }); -/// -/// assert_eq!(r.recv().await, Some(1)); // Received immediately. -/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second. -/// # -/// # }) -/// ``` -pub struct Receiver { - /// The inner channel. - channel: Arc>, - - /// The key for this receiver in the `channel.next_ops` set. TODO - next_listener: Option, -} - -impl Receiver { - /// TODO - pub fn try_recv(&self) -> Option { - self.channel.try_recv().ok() - } - - /// Receives a message from the channel. - /// - /// If the channel is empty and still has senders, this method will wait until a message is - /// sent into the channel or until all senders get dropped. - /// - /// # Examples - /// - /// ``` - /// # async_std::task::block_on(async { - /// # - /// use async_std::sync::channel; - /// use async_std::task; - /// - /// let (s, r) = channel(1); - /// - /// task::spawn(async move { - /// s.send(1).await; - /// s.send(2).await; - /// }); - /// - /// assert_eq!(r.recv().await, Some(1)); - /// assert_eq!(r.recv().await, Some(2)); - /// assert_eq!(r.recv().await, None); - /// # - /// # }) - /// ``` - pub async fn recv(&self) -> Option { - self.channel.recv().await - } - - /// Returns the channel capacity. - /// - /// # Examples - /// - /// ``` - /// use async_std::sync::channel; - /// - /// let (_, r) = channel::(5); - /// assert_eq!(r.capacity(), 5); - /// ``` - pub fn capacity(&self) -> usize { - if self.channel.handoff.is_some() { - 0 - } else { - self.channel.cap - } - } - - /// Returns `true` if the channel is empty. - /// - /// # Examples - /// - /// ``` - /// # async_std::task::block_on(async { - /// # - /// use async_std::sync::channel; - /// - /// let (s, r) = channel(1); - /// - /// assert!(r.is_empty()); - /// s.send(0).await; - /// assert!(!r.is_empty()); - /// # - /// # }) - /// ``` - pub fn is_empty(&self) -> bool { - self.channel.is_empty() - } - - /// Returns `true` if the channel is full. - /// - /// # Examples - /// - /// ``` - /// # async_std::task::block_on(async { - /// # - /// use async_std::sync::channel; - /// - /// let (s, r) = channel(1); - /// - /// assert!(!r.is_full()); - /// s.send(0).await; - /// assert!(r.is_full()); - /// # - /// # }) - /// ``` - pub fn is_full(&self) -> bool { - self.channel.is_full() - } - - /// Returns the number of messages in the channel. - /// - /// # Examples - /// - /// ``` - /// # async_std::task::block_on(async { - /// # - /// use async_std::sync::channel; - /// - /// let (s, r) = channel(2); - /// assert_eq!(r.len(), 0); - /// - /// s.send(1).await; - /// s.send(2).await; - /// assert_eq!(r.len(), 2); - /// # - /// # }) - /// ``` - pub fn len(&self) -> usize { - self.channel.len() - } -} - -impl Drop for Receiver { - fn drop(&mut self) { - // Decrement the receiver count and disconnect the channel if it drops down to zero. - if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { - self.channel.disconnect(); - } - } -} - -impl Clone for Receiver { - fn clone(&self) -> Receiver { - let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed); - - // Make sure the count never overflows, even if lots of receiver clones are leaked. - if count > isize::MAX as usize { - process::abort(); - } - - Receiver { - channel: self.channel.clone(), - next_listener: None, - } - } -} - -impl Stream for Receiver { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let poll = loop { - match self.channel.try_recv() { - Ok(msg) => break Poll::Ready(Some(msg)), - Err(TryRecvError::Disconnected) => break Poll::Ready(None), - Err(TryRecvError::Empty) => {} - } - - self.next_listener = Some(self.channel.next_ops.listen()); - - match self.channel.try_recv() { - Ok(msg) => break Poll::Ready(Some(msg)), - Err(TryRecvError::Disconnected) => break Poll::Ready(None), - Err(TryRecvError::Empty) => {} - } - - match Pin::new(self.next_listener.as_mut().unwrap()).poll(cx) { - Poll::Ready(()) => {} - Poll::Pending => return Poll::Pending, - } - }; - - if self.next_listener.take().is_some() { - self.channel.next_ops.notify_all(); - } - - poll - } -} - -impl fmt::Debug for Receiver { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("Receiver { .. }") - } -} - -/// A slot in a channel. -struct Slot { - /// The current stamp. - stamp: AtomicUsize, - - /// The message in this slot. - msg: UnsafeCell, -} - -/// Bounded channel based on a preallocated array. -struct Channel { - /// The head of the channel. - /// - /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but - /// packed into a single `usize`. The lower bits represent the index, while the upper bits - /// represent the lap. The mark bit in the head is always zero. - /// - /// Messages are popped from the head of the channel. - head: AtomicUsize, - - /// The tail of the channel. - /// - /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but - /// packed into a single `usize`. The lower bits represent the index, while the upper bits - /// represent the lap. The mark bit indicates that the channel is disconnected. - /// - /// Messages are pushed into the tail of the channel. - tail: AtomicUsize, - - /// The buffer holding slots. - buffer: *mut Slot, - - /// The channel capacity. - cap: usize, - - /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`. - one_lap: usize, - - /// If this bit is set in the tail, that means either all senders were dropped or all receivers - /// were dropped. - mark_bit: usize, - - /// Send operations waiting while the channel is full. - send_ops: Signal, - - /// TODO - handoff: Option, - - /// Receive operations waiting while the channel is empty and not disconnected. - recv_ops: Signal, - - /// Stream operations while the channel is empty and not disconnected. - next_ops: Signal, - - /// The number of currently active `Sender`s. - sender_count: AtomicUsize, - - /// The number of currently active `Receivers`s. - receiver_count: AtomicUsize, - - /// Indicates that dropping a `Channel` may drop values of type `T`. - _marker: PhantomData, -} - -unsafe impl Send for Channel {} -unsafe impl Sync for Channel {} - -impl Unpin for Channel {} - -impl Channel { - /// Creates a bounded channel of capacity `cap`. - fn with_capacity(cap: usize) -> Self { - let handoff = if cap == 0 { Some(Signal::new()) } else { None }; - let cap = cap.max(1); - - // Compute constants `mark_bit` and `one_lap`. - let mark_bit = (cap + 1).next_power_of_two(); - let one_lap = mark_bit * 2; - - // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`. - let head = 0; - // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`. - let tail = 0; - - // Allocate a buffer of `cap` slots. - let buffer = { - let mut v = Vec::>::with_capacity(cap); - let ptr = v.as_mut_ptr(); - mem::forget(v); - ptr - }; - - // Initialize stamps in the slots. - for i in 0..cap { - unsafe { - // Set the stamp to `{ lap: 0, mark: 0, index: i }`. - let slot = buffer.add(i); - ptr::write(&mut (*slot).stamp, AtomicUsize::new(i)); - } - } - - Channel { - buffer, - cap, - one_lap, - mark_bit, - head: AtomicUsize::new(head), - tail: AtomicUsize::new(tail), - send_ops: Signal::new(), - handoff, - recv_ops: Signal::new(), - next_ops: Signal::new(), - sender_count: AtomicUsize::new(1), - receiver_count: AtomicUsize::new(1), - _marker: PhantomData, - } - } - - /// Attempts to send a message. - fn try_send(&self, msg: T) -> Result> { - let backoff = Backoff::new(); - let mut tail = self.tail.load(Ordering::Relaxed); - - loop { - // Extract mark bit from the tail and unset it. - // - // If the mark bit was set (which means all receivers have been dropped), we will still - // send the message into the channel if there is enough capacity. The message will get - // dropped when the channel is dropped (which means when all senders are also dropped). - let mark_bit = tail & self.mark_bit; - tail ^= mark_bit; - - // Deconstruct the tail. - let index = tail & (self.mark_bit - 1); - let lap = tail & !(self.one_lap - 1); - - // Inspect the corresponding slot. - let slot = unsafe { &*self.buffer.add(index) }; - let stamp = slot.stamp.load(Ordering::Acquire); - - // If the tail and the stamp match, we may attempt to push. - if tail == stamp { - let new_tail = if index + 1 < self.cap { - // Same lap, incremented index. - // Set to `{ lap: lap, mark: 0, index: index + 1 }`. - tail + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. - lap.wrapping_add(self.one_lap) - }; - - // Try moving the tail. - match self.tail.compare_exchange_weak( - tail | mark_bit, - new_tail | mark_bit, - Ordering::SeqCst, - Ordering::Relaxed, - ) { - Ok(_) => { - // Write the message into the slot and update the stamp. - unsafe { slot.msg.get().write(msg) }; - let stamp = tail + 1; - slot.stamp.store(stamp, Ordering::Release); - - // Wake a blocked receive operation. - self.recv_ops.notify_one(); - - // Wake all blocked streams. - self.next_ops.notify_all(); - - return Ok(stamp); - } - Err(t) => { - tail = t; - backoff.spin(); - } - } - } else if stamp.wrapping_add(self.one_lap) == tail + 1 { - atomic::fence(Ordering::SeqCst); - let head = self.head.load(Ordering::Relaxed); - - // If the head lags one lap behind the tail as well... - if head.wrapping_add(self.one_lap) == tail { - // ...then the channel is full. - - // Check if the channel is disconnected. - if mark_bit != 0 { - return Err(TrySendError::Disconnected(msg)); - } else { - return Err(TrySendError::Full(msg)); - } - } - - backoff.spin(); - tail = self.tail.load(Ordering::Relaxed); - } else { - // Snooze because we need to wait for the stamp to get updated. - backoff.snooze(); - tail = self.tail.load(Ordering::Relaxed); - } - } - } - - async fn send(&self, mut msg: T) { - let mut listener = None; - - let stamp = loop { - match self.try_send(msg) { - Ok(stamp) => break stamp, - Err(TrySendError::Disconnected(_)) => return future::pending().await, - Err(TrySendError::Full(m)) => msg = m, - } - - match listener.take() { - None => listener = Some(self.send_ops.listen()), - Some(w) => { - w.await; - if self.cap > 1 { - self.send_ops.notify_one(); - } - } - } - }; - - if let Some(h) = &self.handoff { - let mut listener = None; - - while unsafe { &*self.buffer }.stamp.load(Ordering::SeqCst) != stamp { - match listener.take() { - None => listener = Some(h.listen()), - Some(w) => w.await, - } - } - } - } - - /// Attempts to receive a message. - fn try_recv(&self) -> Result { - let backoff = Backoff::new(); - let mut head = self.head.load(Ordering::Relaxed); - - loop { - // Deconstruct the head. - let index = head & (self.mark_bit - 1); - let lap = head & !(self.one_lap - 1); - - // Inspect the corresponding slot. - let slot = unsafe { &*self.buffer.add(index) }; - let stamp = slot.stamp.load(Ordering::Acquire); - - // If the the stamp is ahead of the head by 1, we may attempt to pop. - if head + 1 == stamp { - let new = if index + 1 < self.cap { - // Same lap, incremented index. - // Set to `{ lap: lap, mark: 0, index: index + 1 }`. - head + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. - lap.wrapping_add(self.one_lap) - }; - - // Try moving the head. - match self.head.compare_exchange_weak( - head, - new, - Ordering::SeqCst, - Ordering::Relaxed, - ) { - Ok(_) => { - // Read the message from the slot and update the stamp. - let msg = unsafe { slot.msg.get().read() }; - let stamp = head.wrapping_add(self.one_lap); - slot.stamp.store(stamp, Ordering::Release); - - // Wake a blocked send operation. - self.send_ops.notify_one(); - - // Notify a send operation waiting for handoff. - if let Some(h) = &self.handoff { - h.notify_all(); - } - - return Ok(msg); - } - Err(h) => { - head = h; - backoff.spin(); - } - } - } else if stamp == head { - atomic::fence(Ordering::SeqCst); - let tail = self.tail.load(Ordering::Relaxed); - - // If the tail equals the head, that means the channel is empty. - if (tail & !self.mark_bit) == head { - // If the channel is disconnected... - if tail & self.mark_bit != 0 { - return Err(TryRecvError::Disconnected); - } else { - // Otherwise, the receive operation is not ready. - return Err(TryRecvError::Empty); - } - } - - backoff.spin(); - head = self.head.load(Ordering::Relaxed); - } else { - // Snooze because we need to wait for the stamp to get updated. - backoff.snooze(); - head = self.head.load(Ordering::Relaxed); - } - } - } - - async fn recv(&self) -> Option { - let mut listener = None; - - loop { - match self.try_recv() { - Ok(msg) => return Some(msg), - Err(TryRecvError::Disconnected) => return None, - Err(TryRecvError::Empty) => {} - } - - match listener.take() { - None => listener = Some(self.recv_ops.listen()), - Some(w) => { - w.await; - if self.cap > 1 { - self.recv_ops.notify_one(); - } - } - } - } - } - - /// Returns the current number of messages inside the channel. - fn len(&self) -> usize { - loop { - // Load the tail, then load the head. - let tail = self.tail.load(Ordering::SeqCst); - let head = self.head.load(Ordering::SeqCst); - - // If the tail didn't change, we've got consistent values to work with. - if self.tail.load(Ordering::SeqCst) == tail { - let hix = head & (self.mark_bit - 1); - let tix = tail & (self.mark_bit - 1); - - return if hix < tix { - tix - hix - } else if hix > tix { - self.cap - hix + tix - } else if (tail & !self.mark_bit) == head { - 0 - } else { - self.cap - }; - } - } - } - - /// Returns `true` if the channel is empty. - fn is_empty(&self) -> bool { - let head = self.head.load(Ordering::SeqCst); - let tail = self.tail.load(Ordering::SeqCst); - - // Is the tail equal to the head? - // - // Note: If the head changes just before we load the tail, that means there was a moment - // when the channel was not empty, so it is safe to just return `false`. - (tail & !self.mark_bit) == head - } - - /// Returns `true` if the channel is full. - fn is_full(&self) -> bool { - let tail = self.tail.load(Ordering::SeqCst); - let head = self.head.load(Ordering::SeqCst); - - // Is the head lagging one lap behind tail? - // - // Note: If the tail changes just before we load the head, that means there was a moment - // when the channel was not full, so it is safe to just return `false`. - head.wrapping_add(self.one_lap) == tail & !self.mark_bit - } - - /// Disconnects the channel and wakes up all blocked operations. - fn disconnect(&self) { - let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); - - if tail & self.mark_bit == 0 { - // Notify everyone blocked on this channel. - self.send_ops.notify_all(); - self.recv_ops.notify_all(); - self.next_ops.notify_all(); - } - } -} - -impl Drop for Channel { - fn drop(&mut self) { - // Get the index of the head. - let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); - - // Loop over all slots that hold a message and drop them. - for i in 0..self.len() { - // Compute the index of the next slot holding a message. - let index = if hix + i < self.cap { - hix + i - } else { - hix + i - self.cap - }; - - unsafe { - self.buffer.add(index).drop_in_place(); - } - } - - // Finally, deallocate the buffer, but don't run any destructors. - unsafe { - Vec::from_raw_parts(self.buffer, 0, self.cap); - } - } -} - -/// An error returned from the `try_send()` method. -enum TrySendError { - /// The channel is full but not disconnected. - Full(T), - - /// The channel is full and disconnected. - Disconnected(T), -} - -/// An error returned from the `try_recv()` method. -enum TryRecvError { - /// The channel is empty but not disconnected. - Empty, - - /// The channel is empty and disconnected. - Disconnected, -} diff --git a/piper/src/lib.rs b/piper/src/lib.rs deleted file mode 100644 index 295c840..0000000 --- a/piper/src/lib.rs +++ /dev/null @@ -1,16 +0,0 @@ -//! Asynchronous pipes, channels, mutexes, and other primitives. - -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] - -mod chan; -mod lock; -mod mutex; -mod pipe; -mod shared; -mod signal; - -pub use chan::{chan, Receiver, Sender}; -pub use lock::{Lock, LockGuard}; -pub use mutex::{Mutex, MutexGuard}; -pub use pipe::{pipe, Reader, Writer}; -pub use shared::Shared; diff --git a/piper/src/lock.rs b/piper/src/lock.rs deleted file mode 100644 index 30158c4..0000000 --- a/piper/src/lock.rs +++ /dev/null @@ -1,293 +0,0 @@ -use std::cell::UnsafeCell; -use std::fmt; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::task::{Context, Poll}; - -use crate::signal::Signal; - -use crossbeam_utils::Backoff; -use futures::io::{self, AsyncRead, AsyncWrite}; - -/// A lock that implements async I/O traits. -/// -/// This is a blocking mutex that adds the following impls: -/// -/// - `impl AsyncRead for Lock where &T: AsyncRead + Unpin {}` -/// - `impl AsyncRead for &Lock where &T: AsyncRead + Unpin {}` -/// - `impl AsyncWrite for Lock where &T: AsyncWrite + Unpin {}` -/// - `impl AsyncWrite for &Lock where &T: AsyncWrite + Unpin {}` -/// -/// This lock is ensures fairness by handling lock operations in the first-in first-out order. -/// -/// While primarily designed for wrapping async I/O objects, this lock can also be used as a -/// regular blocking mutex. It's not quite as efficient as [`parking_lot::Mutex`], but it's still -/// an improvement over [`std::sync::Mutex`]. -/// -/// [`parking_lot::Mutex`]: https://docs.rs/parking_lot/0.10.0/parking_lot/type.Mutex.html -/// [`std::sync::Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html -/// -/// # Examples -/// -/// ``` -/// use futures::io; -/// use futures::prelude::*; -/// use piper::Lock; -/// -/// // Reads data from a stream and echoes it back. -/// async fn echo(stream: impl AsyncRead + AsyncWrite + Unpin) -> io::Result { -/// let stream = Lock::new(stream); -/// io::copy(&stream, &mut &stream).await -/// } -/// ``` -pub struct Lock { - /// Set to `true` when the lock is acquired by a `LockGuard`. - locked: AtomicBool, - - /// Lock operations waiting for the lock to get unlocked. - lock_ops: Signal, - - /// The value inside the lock. - data: UnsafeCell, -} - -unsafe impl Send for Lock {} -unsafe impl Sync for Lock {} - -impl Lock { - /// Creates a new lock. - /// - /// # Examples - /// - /// ``` - /// use piper::Lock; - /// - /// let lock = Lock::new(10); - /// ``` - pub fn new(data: T) -> Lock { - Lock { - locked: AtomicBool::new(false), - lock_ops: Signal::new(), - data: UnsafeCell::new(data), - } - } - - /// Acquires the lock, blocking the current thread until it is able to do so. - /// - /// Returns a guard that releases the lock when dropped. - /// - /// # Examples - /// - /// ``` - /// use piper::Lock; - /// - /// let lock = Lock::new(10); - /// let guard = lock.lock(); - /// assert_eq!(*guard, 10); - /// ``` - pub fn lock(&self) -> LockGuard<'_, T> { - loop { - // Try locking the lock. - let backoff = Backoff::new(); - loop { - if let Some(guard) = self.try_lock() { - return guard; - } - if backoff.is_completed() { - break; - } - backoff.snooze(); - } - - // Start watching for notifications and try locking again. - let l = self.lock_ops.listen(); - if let Some(guard) = self.try_lock() { - return guard; - } - l.wait(); - } - } - - /// Attempts to acquire the lock. - /// - /// If the lock could not be acquired at this time, then [`None`] is returned. Otherwise, a - /// guard is returned that releases the lock when dropped. - /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// - /// # Examples - /// - /// ``` - /// use piper::Lock; - /// - /// let lock = Lock::new(10); - /// if let Ok(guard) = lock.try_lock() { - /// assert_eq!(*guard, 10); - /// } - /// # ; - /// ``` - #[inline] - pub fn try_lock(&self) -> Option> { - if !self.locked.compare_and_swap(false, true, Ordering::Acquire) { - Some(LockGuard(self)) - } else { - None - } - } - - /// Consumes the lock, returning the underlying data. - /// - /// # Examples - /// - /// ``` - /// use piper::Lock; - /// - /// let lock = Lock::new(10); - /// assert_eq!(lock.into_inner(), 10); - /// ``` - pub fn into_inner(self) -> T { - self.data.into_inner() - } - - /// Returns a mutable reference to the underlying data. - /// - /// Since this call borrows the lock mutably, no actual locking takes place -- the mutable - /// borrow statically guarantees the lock is not already acquired. - /// - /// # Examples - /// - /// ``` - /// use piper::Lock; - /// - /// let mut lock = Lock::new(0); - /// *lock.get_mut() = 10; - /// assert_eq!(*lock.lock(), 10); - /// ``` - pub fn get_mut(&mut self) -> &mut T { - unsafe { &mut *self.data.get() } - } -} - -impl fmt::Debug for Lock { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - struct Locked; - impl fmt::Debug for Locked { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("") - } - } - - match self.try_lock() { - None => f.debug_struct("Lock").field("data", &Locked).finish(), - Some(guard) => f.debug_struct("Lock").field("data", &&*guard).finish(), - } - } -} - -impl From for Lock { - fn from(val: T) -> Lock { - Lock::new(val) - } -} - -impl Default for Lock { - fn default() -> Lock { - Lock::new(Default::default()) - } -} - -impl AsyncRead for Lock { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut *self.lock()).poll_read(cx, buf) - } -} - -impl AsyncRead for &Lock { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut *self.lock()).poll_read(cx, buf) - } -} - -impl AsyncWrite for Lock { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut *self.lock()).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut *self.lock()).poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut *self.lock()).poll_close(cx) - } -} - -impl AsyncWrite for &Lock { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut *self.lock()).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut *self.lock()).poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut *self.lock()).poll_close(cx) - } -} - -/// A guard that releases the lock when dropped. -pub struct LockGuard<'a, T>(&'a Lock); - -unsafe impl Send for LockGuard<'_, T> {} -unsafe impl Sync for LockGuard<'_, T> {} - -impl Drop for LockGuard<'_, T> { - fn drop(&mut self) { - self.0.locked.store(false, Ordering::Release); - self.0.lock_ops.notify_one(); - } -} - -impl fmt::Debug for LockGuard<'_, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Debug::fmt(&**self, f) - } -} - -impl fmt::Display for LockGuard<'_, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - (**self).fmt(f) - } -} - -impl Deref for LockGuard<'_, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.0.data.get() } - } -} - -impl DerefMut for LockGuard<'_, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.0.data.get() } - } -} diff --git a/piper/src/mutex.rs b/piper/src/mutex.rs deleted file mode 100644 index 3d73956..0000000 --- a/piper/src/mutex.rs +++ /dev/null @@ -1,229 +0,0 @@ -use std::cell::UnsafeCell; -use std::fmt; -use std::ops::{Deref, DerefMut}; -use std::sync::atomic::{AtomicBool, Ordering}; - -use crate::signal::Signal; - -/// An asynchronous mutex. -/// -/// This type is similar to [`std::sync::Mutex`], except locking is an asynchronous operation. -/// -/// [`std::sync::Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html -/// -/// # Examples -/// -/// ``` -/// # smol::run(async { -/// # -/// use piper::Mutex; -/// use smol::Task; -/// use std::sync::Arc; -/// -/// let m = Arc::new(Mutex::new(0)); -/// let mut tasks = vec![]; -/// -/// for _ in 0..10 { -/// let m = m.clone(); -/// tasks.push(Task::spawn(async move { -/// *m.lock().await += 1; -/// })); -/// } -/// -/// for t in tasks { -/// t.await; -/// } -/// assert_eq!(*m.lock().await, 10); -/// # -/// # }) -/// ``` -pub struct Mutex { - locked: AtomicBool, - lock_ops: Signal, - data: UnsafeCell, -} - -unsafe impl Send for Mutex {} -unsafe impl Sync for Mutex {} - -impl Mutex { - /// Creates a new async mutex. - /// - /// # Examples - /// - /// ``` - /// use piper::Mutex; - /// - /// let mutex = Mutex::new(0); - /// ``` - pub fn new(data: T) -> Mutex { - Mutex { - locked: AtomicBool::new(false), - lock_ops: Signal::new(), - data: UnsafeCell::new(data), - } - } - - /// Acquires the mutex. - /// - /// Returns a guard that releases the mutex when dropped. - /// - /// # Examples - /// - /// ``` - /// # smol::block_on(async { - /// # - /// use piper::Mutex; - /// - /// let mutex = Mutex::new(10); - /// let guard = mutex.lock().await; - /// assert_eq!(*guard, 10); - /// # - /// # }) - /// ``` - pub async fn lock(&self) -> MutexGuard<'_, T> { - loop { - // Try locking the mutex. - if let Some(guard) = self.try_lock() { - return guard; - } - - // Start watching for notifications and try locking again. - let l = self.lock_ops.listen(); - if let Some(guard) = self.try_lock() { - return guard; - } - l.await; - } - } - - /// Attempts to acquire the mutex. - /// - /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a - /// guard is returned that releases the mutex when dropped. - /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// - /// # Examples - /// - /// ``` - /// use piper::Mutex; - /// - /// let mutex = Mutex::new(10); - /// if let Ok(guard) = mutex.try_lock() { - /// assert_eq!(*guard, 10); - /// } - /// # ; - /// ``` - #[inline] - pub fn try_lock(&self) -> Option> { - if !self.locked.compare_and_swap(false, true, Ordering::Acquire) { - Some(MutexGuard(self)) - } else { - None - } - } - - /// Consumes the mutex, returning the underlying data. - /// - /// # Examples - /// - /// ``` - /// use piper::Mutex; - /// - /// let mutex = Mutex::new(10); - /// assert_eq!(mutex.into_inner(), 10); - /// ``` - pub fn into_inner(self) -> T { - self.data.into_inner() - } - - /// Returns a mutable reference to the underlying data. - /// - /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable - /// borrow statically guarantees the mutex is not already acquired. - /// - /// # Examples - /// - /// ``` - /// # smol::block_on(async { - /// # - /// use piper::Mutex; - /// - /// let mut mutex = Mutex::new(0); - /// *mutex.get_mut() = 10; - /// assert_eq!(*mutex.lock().await, 10); - /// # - /// # }) - /// ``` - pub fn get_mut(&mut self) -> &mut T { - unsafe { &mut *self.data.get() } - } -} - -impl fmt::Debug for Mutex { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - struct Locked; - impl fmt::Debug for Locked { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("") - } - } - - match self.try_lock() { - None => f.debug_struct("Mutex").field("data", &Locked).finish(), - Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(), - } - } -} - -impl From for Mutex { - fn from(val: T) -> Mutex { - Mutex::new(val) - } -} - -impl Default for Mutex { - fn default() -> Mutex { - Mutex::new(Default::default()) - } -} - -/// A guard that releases the mutex when dropped. -pub struct MutexGuard<'a, T>(&'a Mutex); - -unsafe impl Send for MutexGuard<'_, T> {} -unsafe impl Sync for MutexGuard<'_, T> {} - -impl Drop for MutexGuard<'_, T> { - fn drop(&mut self) { - self.0.locked.store(false, Ordering::Release); - self.0.lock_ops.notify_one(); - } -} - -impl fmt::Debug for MutexGuard<'_, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Debug::fmt(&**self, f) - } -} - -impl fmt::Display for MutexGuard<'_, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - (**self).fmt(f) - } -} - -impl Deref for MutexGuard<'_, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.0.data.get() } - } -} - -impl DerefMut for MutexGuard<'_, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.0.data.get() } - } -} diff --git a/piper/src/pipe.rs b/piper/src/pipe.rs deleted file mode 100644 index 5304de2..0000000 --- a/piper/src/pipe.rs +++ /dev/null @@ -1,302 +0,0 @@ -use std::cell::Cell; -use std::io; -use std::mem; -use std::pin::Pin; -use std::slice; -use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use futures::io::{AsyncRead, AsyncWrite}; -use futures::task::AtomicWaker; - -/// Creates a bounded single-producer single-consumer pipe. -pub fn pipe(cap: usize) -> (Reader, Writer) { - assert!(cap > 0, "capacity must be positive"); - - let mut v = Vec::with_capacity(cap); - let buffer = v.as_mut_ptr(); - mem::forget(v); - - let inner = Arc::new(Inner { - head: AtomicUsize::new(0), - tail: AtomicUsize::new(0), - reader: AtomicWaker::new(), - writer: AtomicWaker::new(), - closed: AtomicBool::new(false), - buffer, - cap, - }); - - let r = Reader { - inner: inner.clone(), - head: Cell::new(0), - tail: Cell::new(0), - }; - - let w = Writer { - inner, - head: Cell::new(0), - tail: Cell::new(0), - }; - - (r, w) -} - -// NOTE: Reader and Writer are !Clone + !Sync - -/// The reading side of a pipe. -#[derive(Debug)] -pub struct Reader { - inner: Arc, - head: Cell, - tail: Cell, -} - -/// The writing side of a pipe. -#[derive(Debug)] -pub struct Writer { - inner: Arc, - head: Cell, - tail: Cell, -} - -unsafe impl Send for Reader {} -unsafe impl Send for Writer {} - -#[derive(Debug)] -struct Inner { - head: AtomicUsize, - tail: AtomicUsize, - reader: AtomicWaker, - writer: AtomicWaker, - closed: AtomicBool, - buffer: *mut u8, - cap: usize, -} - -impl Inner { - #[inline] - unsafe fn slot(&self, pos: usize) -> *mut u8 { - if pos < self.cap { - self.buffer.add(pos) - } else { - self.buffer.add(pos - self.cap) - } - } - - #[inline] - fn advance(&self, pos: usize, by: usize) -> usize { - if by < 2 * self.cap - pos { - pos + by - } else { - pos - self.cap + by - self.cap - } - } - - #[inline] - fn distance(&self, a: usize, b: usize) -> usize { - if a <= b { - b - a - } else { - 2 * self.cap - a + b - } - } -} - -impl Drop for Inner { - fn drop(&mut self) { - unsafe { - Vec::from_raw_parts(self.buffer, 0, self.cap); - } - } -} - -impl Drop for Reader { - fn drop(&mut self) { - self.inner.closed.store(true, Ordering::SeqCst); - self.inner.writer.wake(); - } -} - -impl Drop for Writer { - fn drop(&mut self) { - self.inner.closed.store(true, Ordering::SeqCst); - self.inner.reader.wake(); - } -} - -impl AsyncRead for Reader { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut &*self).poll_read(cx, buf) - } -} - -impl AsyncWrite for Writer { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut &*self).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &*self).poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &*self).poll_close(cx) - } -} - -impl AsyncRead for &Reader { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut head = self.head.get(); - let mut tail = self.tail.get(); - - if self.inner.distance(head, tail) == 0 { - tail = self.inner.tail.load(Ordering::Acquire); - self.tail.set(tail); - - if self.inner.distance(head, tail) == 0 { - self.inner.reader.register(cx.waker()); - atomic::fence(Ordering::SeqCst); - - tail = self.inner.tail.load(Ordering::Acquire); - self.tail.set(tail); - - if self.inner.distance(head, tail) == 0 { - if self.inner.closed.load(Ordering::Relaxed) { - return Poll::Ready(Ok(0)); - } else { - return Poll::Pending; - } - } - - self.inner.reader.take(); - } - } - - let mut count = 0; - - loop { - self.inner.writer.wake(); - - let streak = if head < self.inner.cap { - self.inner.cap - head - } else { - 2 * self.inner.cap - head - }; - let remaining = self.inner.distance(head, tail); - let space = buf.len() - count; - - let n = streak.min(remaining).min(space).min(16 * 1024); - if n == 0 { - break; - } - - let slice = unsafe { slice::from_raw_parts(self.inner.slot(head), n) }; - buf[count..count + n].copy_from_slice(slice); - count += n; - head = self.inner.advance(head, n); - - // Store the current head pointer. - self.inner.head.store(head, Ordering::Release); - self.head.set(head); - } - - Poll::Ready(Ok(count)) - } -} - -impl AsyncWrite for &Writer { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if buf.is_empty() || self.inner.closed.load(Ordering::Relaxed) { - return Poll::Ready(Ok(0)); - } - - let mut head = self.head.get(); - let mut tail = self.tail.get(); - - if self.inner.distance(head, tail) == self.inner.cap { - head = self.inner.head.load(Ordering::Acquire); - self.head.set(head); - - if self.inner.distance(head, tail) == self.inner.cap { - self.inner.writer.register(cx.waker()); - atomic::fence(Ordering::SeqCst); - - head = self.inner.head.load(Ordering::Acquire); - self.head.set(head); - - if self.inner.distance(head, tail) == self.inner.cap { - if self.inner.closed.load(Ordering::Relaxed) { - return Poll::Ready(Ok(0)); - } else { - return Poll::Pending; - } - } - - self.inner.writer.take(); - } - } - - let mut count = 0; - - loop { - self.inner.reader.wake(); - - let streak = if tail < self.inner.cap { - self.inner.cap - tail - } else { - 2 * self.inner.cap - tail - }; - let remaining = buf.len() - count; - let space = self.inner.cap - self.inner.distance(head, tail); - - let n = streak.min(remaining).min(space).min(16 * 1024); - if n == 0 { - break; - } - - unsafe { - self.inner - .slot(tail) - .copy_from_nonoverlapping(&buf[count], n); - } - count += n; - tail = self.inner.advance(tail, n); - - // Store the current tail pointer. - self.inner.tail.store(tail, Ordering::Release); - self.tail.set(tail); - } - - Poll::Ready(Ok(count)) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } -} diff --git a/piper/src/shared.rs b/piper/src/shared.rs deleted file mode 100644 index a8c18af..0000000 --- a/piper/src/shared.rs +++ /dev/null @@ -1,153 +0,0 @@ -use std::fmt; -use std::hash::{Hash, Hasher}; -use std::ops::Deref; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use futures::io::{self, AsyncRead, AsyncWrite}; - -/// A reference-counted pointer that implements async I/O traits. -/// -/// This is just a wrapper around `Arc` that adds the following impls: -/// -/// - `impl AsyncRead for Shared where &T: AsyncRead {}` -/// - `impl AsyncWrite for Shared where &T: AsyncWrite {}` -/// -/// # Examples -/// -/// ```no_run -/// use futures::io; -/// use piper::Shared; -/// use smol::Async; -/// use std::net::TcpStream; -/// -/// # fn main() -> std::io::Result<()> { smol::run(async { -/// // A client that echoes messages back to the server. -/// let stream = Async::::connect("127.0.0.1:8000").await?; -/// -/// // Create two handles to the stream. -/// let reader = Shared::new(stream); -/// let mut writer = reader.clone(); -/// -/// // Echo data received from the reader back into the writer. -/// io::copy(reader, &mut writer).await?; -/// # Ok(()) }) } -/// ``` -pub struct Shared(Arc); - -impl Unpin for Shared {} - -impl Shared { - /// Constructs a new `Shared`. - /// - /// # Examples - /// - /// ``` - /// use piper::Shared; - /// use std::sync::Arc; - /// - /// // These two lines are equivalent: - /// let a = Shared::new(7); - /// let a = Shared(Arc::new(7)); - /// ``` - pub fn new(data: T) -> Shared { - Shared(Arc::new(data)) - } -} - -impl Clone for Shared { - fn clone(&self) -> Shared { - Shared(self.0.clone()) - } -} - -impl Deref for Shared { - type Target = T; - - #[inline] - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl fmt::Debug for Shared { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Debug::fmt(&**self, f) - } -} - -impl fmt::Display for Shared { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(&**self, f) - } -} - -impl Hash for Shared { - fn hash(&self, state: &mut H) { - (**self).hash(state) - } -} - -impl fmt::Pointer for Shared { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Pointer::fmt(&(&**self as *const T), f) - } -} - -impl Default for Shared { - fn default() -> Shared { - Shared(Arc::new(Default::default())) - } -} - -impl From for Shared { - fn from(t: T) -> Shared { - Shared(Arc::new(t)) - } -} - -// NOTE(stjepang): It would also make sense to have the following impls: -// -// - `impl AsyncRead for &Shared where &T: AsyncRead {}` -// - `impl AsyncWrite for &Shared where &T: AsyncWrite {}` -// -// However, those impls sometimes make Rust's type inference try too hard when types cannot be -// inferred. In the end, instead of complaining with a nice error message, the Rust compiler ends -// up overflowing and dumping a very long error message spanning multiple screens. -// -// Since those impls are not essential, I decided to err on the safe side and not include them. - -impl AsyncRead for Shared -where - for<'a> &'a T: AsyncRead, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut &*self.0).poll_read(cx, buf) - } -} - -impl AsyncWrite for Shared -where - for<'a> &'a T: AsyncWrite, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut &*self.0).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &*self.0).poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &*self.0).poll_close(cx) - } -} diff --git a/piper/src/signal.rs b/piper/src/signal.rs deleted file mode 100644 index 4291918..0000000 --- a/piper/src/signal.rs +++ /dev/null @@ -1,523 +0,0 @@ -//! A synchronization primitive for notifying async tasks and threads. -//! -//! This is a variant of a conditional variable that is heavily inspired by eventcounts invented -//! by Dmitry Vyukov: http://www.1024cores.net/home/lock-free-algorithms/eventcounts - -use std::cell::Cell; -use std::future::Future; -use std::mem::{self, ManuallyDrop}; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::ptr::NonNull; -use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; -use std::task::{Context, Poll, Waker}; -use std::thread::{self, Thread}; - -/// A bit set inside `Signal` when there is at least one listener that has already been notified. -const NOTIFIED: usize = 1 << 0; - -/// A bit set inside `Signal` when there is at least one notifiable listener. -const NOTIFIABLE: usize = 1 << 1; - -/// Inner state of `Signal`. -struct Inner { - /// Holds bits `NOTIFIED` and `NOTIFIABLE`. - flags: AtomicUsize, - - /// A linked list holding registered listeners. - list: Mutex, -} - -impl Inner { - /// Locks the list. - fn lock(&self) -> ListGuard<'_> { - ListGuard { - inner: self, - guard: self.list.lock().unwrap(), - } - } -} - -/// A synchronization primitive for notifying async tasks and threads. -/// -/// Listeners can be registered using `listen()`. There are two ways of notifying listeners: -/// -/// 1. `notify_one()` notifies one listener. -/// 2. `notify_all()` notifies all listeners. -/// -/// If there are no active listeners at the time a notification is sent, it simply gets lost. -/// -/// Note that `notify_one()` does not notify one *additional* listener - it only makes sure -/// *at least* one listener among the active ones is notified. -/// -/// There are two ways for a listener to wait for a notification: -/// -/// 1. In an asynchronous manner using `.await`. -/// 2. In a blocking manner by calling `wait()` on it. -/// -/// If a notified listener is dropped without ever waiting for a notification, dropping will notify -/// another another active listener. -/// -/// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness. -pub struct Signal { - /// A pointer to heap-allocated inner state. - /// - /// This pointer is initially null and gets lazily initialized on first use. Semantically, it - /// is an `Arc` so it's important to keep in mind that it contributes to the `Arc`s - /// reference count. - inner: AtomicPtr, -} - -unsafe impl Send for Signal {} -unsafe impl Sync for Signal {} - -impl Signal { - /// Creates a new `Signal`. - #[inline] - pub fn new() -> Signal { - Signal { - inner: AtomicPtr::default(), - } - } - - /// Returns a guard listening for a notification. - #[cold] - pub fn listen(&self) -> SignalListener { - let inner = self.inner(); - let listener = SignalListener { - inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }, - entry: Some(inner.lock().insert()), - }; - - // Make sure the listener is registered before whatever happens next. - full_fence(); - listener - } - - /// Notifies a single active listener. - /// - /// Note that this does not notify one *additional* listener - it only makes sure *at least* - /// one listener among the active ones is notified. - #[inline] - pub fn notify_one(&self) { - let inner = self.inner(); - - // Make sure the notification comes after whatever triggered it. - full_fence(); - - // Notify if no active listeners have been notified and there is at least one listener. - let flags = inner.flags.load(Ordering::Relaxed); - if flags & NOTIFIED == 0 && flags & NOTIFIABLE != 0 { - inner.lock().notify(false); - } - } - - /// Notifies all active listeners. - #[inline] - pub fn notify_all(&self) { - let inner = self.inner(); - - // Make sure the notification comes after whatever triggered it. - full_fence(); - - // Notify if there is at least one listener. - if inner.flags.load(Ordering::Relaxed) & NOTIFIABLE != 0 { - inner.lock().notify(true); - } - } - - /// Returns a reference to the inner state. - fn inner(&self) -> &Inner { - let mut inner = self.inner.load(Ordering::Acquire); - - // Initialize the state if this is its first use. - if inner.is_null() { - // Allocate on the heap. - let new = Arc::new(Inner { - flags: AtomicUsize::new(0), - list: Mutex::new(List { - head: None, - tail: None, - len: 0, - notifiable: 0, - }), - }); - // Convert the heap-allocated state into a raw pointer. - let new = Arc::into_raw(new) as *mut Inner; - - // Attempt to replace the null-pointer with the new state pointer. - inner = self.inner.compare_and_swap(inner, new, Ordering::AcqRel); - - // Check if the old pointer value was indeed null. - if inner.is_null() { - // If yes, then use the new state pointer. - inner = new; - } else { - // If not, that means a concurrent operation has initialized the state. - // In that case, use the old pointer and deallocate the new one. - unsafe { - drop(Arc::from_raw(new)); - } - } - } - - unsafe { &*inner } - } -} - -impl Drop for Signal { - #[inline] - fn drop(&mut self) { - let inner: *mut Inner = *self.inner.get_mut(); - - // If the state pointer has been initialized, deallocate it. - if !inner.is_null() { - unsafe { - drop(Arc::from_raw(inner)); - } - } - } -} - -impl Default for Signal { - fn default() -> Signal { - Signal::new() - } -} - -/// A guard waiting for a notification from a `Signal`. -/// -/// There are two ways for a listener to wait for a notification: -/// -/// 1. In an asynchronous manner using `.await`. -/// 2. In a blocking manner by calling `wait()` on it. -/// -/// If a notified listener is dropped without ever waiting for a notification, dropping will notify -/// another another active listener. -pub struct SignalListener { - /// A reference to `Signal`s inner state. - inner: Arc, - - /// A pointer to this listener's entry in the linked list. - entry: Option>, -} - -unsafe impl Send for SignalListener {} -unsafe impl Sync for SignalListener {} - -impl SignalListener { - /// Blocks until a notification is received. - pub fn wait(mut self) { - // Take out the entry pointer and set it to `None`. - let entry = match self.entry.take() { - None => unreachable!("cannot wait twice on a `SignalListener`"), - Some(entry) => entry, - }; - - // Set this listener's state to `Waiting`. - { - let mut list = self.inner.lock(); - let e = unsafe { entry.as_ref() }; - - // Do a dummy replace operation in order to take out the state. - match e.state.replace(State::Notified) { - State::Notified => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry); - return; - } - // Otherwise, set the state to `Waiting`. - _ => e.state.set(State::Waiting(thread::current())), - } - } - - // Wait until a notification is received. - loop { - thread::park(); - - let mut list = self.inner.lock(); - let e = unsafe { entry.as_ref() }; - - // Do a dummy replace operation in order to take out the state. - match e.state.replace(State::Notified) { - State::Notified => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry); - return; - } - // Otherwise, set the state back to `Waiting`. - state => e.state.set(state), - } - } - } -} - -impl Future for SignalListener { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut list = self.inner.lock(); - - let entry = match self.entry { - None => unreachable!("cannot poll a completed `SignalListener` future"), - Some(entry) => entry, - }; - let state = unsafe { &entry.as_ref().state }; - - // Do a dummy replace operation in order to take out the state. - match state.replace(State::Notified) { - State::Notified => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry); - drop(list); - self.entry = None; - return Poll::Ready(()); - } - State::Created => { - // If the listener was just created, put it in the `Polling` state. - state.set(State::Polling(cx.waker().clone())); - } - State::Polling(w) => { - // If the listener was in the `Pooling` state, keep it. - state.set(State::Polling(w)); - } - State::Waiting(_) => { - unreachable!("cannot poll and wait on `SignalListener` at the same time") - } - } - - Poll::Pending - } -} - -impl Drop for SignalListener { - fn drop(&mut self) { - // If this listener has never picked up a notification... - if let Some(entry) = self.entry.take() { - let mut list = self.inner.lock(); - - // But if a notification was delivered to it... - if list.remove(entry).is_notified() { - // Then pass it on to another active listener. - list.notify(false); - } - } - } -} - -/// A guard holding the linked list locked. -struct ListGuard<'a> { - /// A reference to `Signal`s inner state. - inner: &'a Inner, - - /// The actual guard that acquired the linked list. - guard: MutexGuard<'a, List>, -} - -impl Drop for ListGuard<'_> { - #[inline] - fn drop(&mut self) { - let list = &mut **self; - let mut flags = 0; - - // Set the `NOTIFIED` flag if there is at least one notified listener. - if list.len - list.notifiable > 0 { - flags |= NOTIFIED; - } - - // Set the `NOTIFIABLE` flag if there is at least one notifiable listener. - if list.notifiable > 0 { - flags |= NOTIFIABLE; - } - - self.inner.flags.store(flags, Ordering::Release); - } -} - -impl Deref for ListGuard<'_> { - type Target = List; - - #[inline] - fn deref(&self) -> &List { - &*self.guard - } -} - -impl DerefMut for ListGuard<'_> { - #[inline] - fn deref_mut(&mut self) -> &mut List { - &mut *self.guard - } -} - -/// The state of a listener. -enum State { - /// It has just been created. - Created, - - /// It has received a notification. - Notified, - - /// An async task is polling it. - Polling(Waker), - - /// A thread is blocked on it. - Waiting(Thread), -} - -impl State { - /// Returns `true` if this is the `Notified` state. - #[inline] - fn is_notified(&self) -> bool { - match self { - State::Notified => true, - State::Created | State::Polling(_) | State::Waiting(_) => false, - } - } -} - -/// An entry representing a registered listener. -struct Entry { - /// THe state of this listener. - state: Cell, - - /// Previous entry in the linked list. - prev: Cell>>, - - /// Next entry in the linked list. - next: Cell>>, -} - -/// A linked list of entries. -struct List { - /// First entry in the list. - head: Option>, - - /// Last entry in the list. - tail: Option>, - - /// Total number of entries in the list. - len: usize, - - /// Number of notifiable entries in the list. - /// - /// Notifiable entries are those that haven't been notified yet. - notifiable: usize, -} - -impl List { - /// Inserts a new entry into the list. - fn insert(&mut self) -> NonNull { - unsafe { - // Allocate an entry that is going to become the new tail. - let entry = NonNull::new_unchecked(Box::into_raw(Box::new(Entry { - state: Cell::new(State::Created), - prev: Cell::new(self.tail), - next: Cell::new(None), - }))); - - // Replace the tail with the new entry. - match mem::replace(&mut self.tail, Some(entry)) { - None => self.head = Some(entry), - Some(t) => t.as_ref().next.set(Some(entry)), - } - - // Bump the total count and the count of notifiable entries. - self.len += 1; - self.notifiable += 1; - - entry - } - } - - /// Removes an entry from the list and returns its state. - fn remove(&mut self, entry: NonNull) -> State { - unsafe { - let prev = entry.as_ref().prev.get(); - let next = entry.as_ref().next.get(); - - // Unlink from the previous entry. - match prev { - None => self.head = next, - Some(p) => p.as_ref().next.set(next), - } - - // Unlink from the next entry. - match next { - None => self.tail = prev, - Some(n) => n.as_ref().prev.set(prev), - } - - // Deallocate and extract the state. - let entry = Box::from_raw(entry.as_ptr()); - let state = entry.state.into_inner(); - - // Update the counters. - if !state.is_notified() { - self.notifiable -= 1; - } - self.len -= 1; - - state - } - } - - /// Notifies an entry. - #[cold] - fn notify(&mut self, notify_all: bool) { - let mut entry = self.tail; - - // Iterate over the entries in the list. - while let Some(e) = entry { - let e = unsafe { e.as_ref() }; - - // Set the state of this entry to `Notified`. - let state = e.state.replace(State::Notified); - let is_notified = state.is_notified(); - - // Wake the task or unpark the thread. - match state { - State::Notified => {} - State::Created => {} - State::Polling(w) => w.wake(), - State::Waiting(t) => t.unpark(), - } - - // Update the count of notifiable entries. - if !is_notified { - self.notifiable -= 1; - } - - // If all entries need to be notified, go to the next one. - if notify_all { - entry = e.prev.get(); - } else { - break; - } - } - } -} - -/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster. -#[inline] -fn full_fence() { - if cfg!(any(target_arch = "x86", target_arch = "x86_64")) { - // HACK(stjepang): On x86 architectures there are two different ways of executing - // a `SeqCst` fence. - // - // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction. - // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg` instruction. - // - // Both instructions have the effect of a full barrier, but empirical benchmarks have shown - // that the second one makes notifiying listeners a bit faster. - // - // The ideal solution here would be to use inline assembly, but we're instead creating a - // temporary atomic variable and compare-and-exchanging its value. No sane compiler to - // x86 platforms is going to optimize this away. - let a = AtomicUsize::new(0); - a.compare_and_swap(0, 1, Ordering::SeqCst); - } else { - atomic::fence(Ordering::SeqCst); - } -} diff --git a/piper/tests/chan.rs b/piper/tests/chan.rs deleted file mode 100644 index acd4242..0000000 --- a/piper/tests/chan.rs +++ /dev/null @@ -1,357 +0,0 @@ -#![cfg(feature = "unstable")] - -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::Duration; - -use rand::{thread_rng, Rng}; - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn smoke() { - task::block_on(async { - let (s, r) = piper::chan(1); - - s.send(7).await; - assert_eq!(r.recv().await, Some(7)); - - s.send(8).await; - assert_eq!(r.recv().await, Some(8)); - - drop(s); - assert_eq!(r.recv().await, None); - }); - - task::block_on(async { - let (s, r) = piper::chan(10); - drop(r); - s.send(1).await; - }); -} - -#[test] -fn capacity() { - for i in 1..10 { - let (s, r) = channel::<()>(i); - assert_eq!(s.capacity(), i); - assert_eq!(r.capacity(), i); - } -} - -#[test] -fn len_empty_full() { - #![allow(clippy::cognitive_complexity)] - task::block_on(async { - let (s, r) = piper::chan(2); - - assert_eq!(s.len(), 0); - assert_eq!(s.is_empty(), true); - assert_eq!(s.is_full(), false); - assert_eq!(r.len(), 0); - assert_eq!(r.is_empty(), true); - assert_eq!(r.is_full(), false); - - s.send(()).await; - - assert_eq!(s.len(), 1); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), false); - assert_eq!(r.len(), 1); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), false); - - s.send(()).await; - - assert_eq!(s.len(), 2); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), true); - assert_eq!(r.len(), 2); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), true); - - r.recv().await; - - assert_eq!(s.len(), 1); - assert_eq!(s.is_empty(), false); - assert_eq!(s.is_full(), false); - assert_eq!(r.len(), 1); - assert_eq!(r.is_empty(), false); - assert_eq!(r.is_full(), false); - }) -} - -#[test] -fn recv() { - task::block_on(async { - let (s, r) = piper::chan(100); - - task::spawn(async move { - assert_eq!(r.recv().await, Some(7)); - task::sleep(ms(1000)).await; - assert_eq!(r.recv().await, Some(8)); - task::sleep(ms(1000)).await; - assert_eq!(r.recv().await, Some(9)); - assert_eq!(r.recv().await, None); - }); - - task::sleep(ms(1500)).await; - s.send(7).await; - s.send(8).await; - s.send(9).await; - }) -} - -#[test] -fn send() { - task::block_on(async { - let (s, r) = piper::chan(1); - - task::spawn(async move { - s.send(7).await; - task::sleep(ms(1000)).await; - s.send(8).await; - task::sleep(ms(1000)).await; - s.send(9).await; - task::sleep(ms(1000)).await; - s.send(10).await; - }); - - task::sleep(ms(1500)).await; - assert_eq!(r.recv().await, Some(7)); - assert_eq!(r.recv().await, Some(8)); - assert_eq!(r.recv().await, Some(9)); - }) -} - -#[test] -fn recv_after_disconnect() { - task::block_on(async { - let (s, r) = piper::chan(100); - - s.send(1).await; - s.send(2).await; - s.send(3).await; - - drop(s); - - assert_eq!(r.recv().await, Some(1)); - assert_eq!(r.recv().await, Some(2)); - assert_eq!(r.recv().await, Some(3)); - assert_eq!(r.recv().await, None); - }) -} - -#[test] -fn len() { - const COUNT: usize = 25_000; - const CAP: usize = 1000; - - task::block_on(async { - let (s, r) = piper::chan(CAP); - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); - - for _ in 0..CAP / 10 { - for i in 0..50 { - s.send(i).await; - assert_eq!(s.len(), i + 1); - } - - for i in 0..50 { - r.recv().await; - assert_eq!(r.len(), 50 - i - 1); - } - } - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); - - for i in 0..CAP { - s.send(i).await; - assert_eq!(s.len(), i + 1); - } - - for _ in 0..CAP { - r.recv().await.unwrap(); - } - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); - - let child = task::spawn({ - let r = r.clone(); - async move { - for i in 0..COUNT { - assert_eq!(r.recv().await, Some(i)); - let len = r.len(); - assert!(len <= CAP); - } - } - }); - - for i in 0..COUNT { - s.send(i).await; - let len = s.len(); - assert!(len <= CAP); - } - - child.await; - - assert_eq!(s.len(), 0); - assert_eq!(r.len(), 0); - }) -} - -#[test] -fn disconnect_wakes_receiver() { - task::block_on(async { - let (s, r) = channel::<()>(1); - - let child = task::spawn(async move { - assert_eq!(r.recv().await, None); - }); - - task::sleep(ms(1000)).await; - drop(s); - - child.await; - }) -} - -#[test] -fn spsc() { - const COUNT: usize = 100_000; - - task::block_on(async { - let (s, r) = piper::chan(3); - - let child = task::spawn(async move { - for i in 0..COUNT { - assert_eq!(r.recv().await, Some(i)); - } - assert_eq!(r.recv().await, None); - }); - - for i in 0..COUNT { - s.send(i).await; - } - drop(s); - - child.await; - }) -} - -#[test] -fn mpmc() { - const COUNT: usize = 25_000; - const TASKS: usize = 4; - - task::block_on(async { - let (s, r) = channel::(3); - let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::>(); - let v = Arc::new(v); - - let mut tasks = Vec::new(); - - for _ in 0..TASKS { - let r = r.clone(); - let v = v.clone(); - tasks.push(task::spawn(async move { - for _ in 0..COUNT { - let n = r.recv().await.unwrap(); - v[n].fetch_add(1, Ordering::SeqCst); - } - })); - } - - for _ in 0..TASKS { - let s = s.clone(); - tasks.push(task::spawn(async move { - for i in 0..COUNT { - s.send(i).await; - } - })); - } - - for t in tasks { - t.await; - } - - for c in v.iter() { - assert_eq!(c.load(Ordering::SeqCst), TASKS); - } - }); -} - -#[test] -fn oneshot() { - const COUNT: usize = 10_000; - - task::block_on(async { - for _ in 0..COUNT { - let (s, r) = piper::chan(1); - - let c1 = task::spawn(async move { r.recv().await.unwrap() }); - let c2 = task::spawn(async move { s.send(0).await }); - - c1.await; - c2.await; - } - }) -} - -#[test] -fn drops() { - const RUNS: usize = 100; - - static DROPS: AtomicUsize = AtomicUsize::new(0); - - #[derive(Debug, PartialEq)] - struct DropCounter; - - impl Drop for DropCounter { - fn drop(&mut self) { - DROPS.fetch_add(1, Ordering::SeqCst); - } - } - - let mut rng = thread_rng(); - - for _ in 0..RUNS { - task::block_on(async { - let steps = rng.gen_range(0, 10_000); - let additional = rng.gen_range(0, 50); - - DROPS.store(0, Ordering::SeqCst); - let (s, r) = channel::(50); - - let child = task::spawn({ - let r = r.clone(); - async move { - for _ in 0..steps { - r.recv().await.unwrap(); - } - } - }); - - for _ in 0..steps { - s.send(DropCounter).await; - } - - child.await; - - for _ in 0..additional { - s.send(DropCounter).await; - } - - assert_eq!(DROPS.load(Ordering::SeqCst), steps); - drop(s); - drop(r); - assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); - }) - } -} diff --git a/src/async_io.rs b/src/async_io.rs index 5d95340..945a25f 100644 --- a/src/async_io.rs +++ b/src/async_io.rs @@ -342,15 +342,15 @@ impl Async { let _ = socket.connect(&addr.into()); let stream = Async::new(socket.into_tcp_stream())?; - // Waits until the stream becomes writable. - let wait_writable = |mut stream: &TcpStream| match stream.write(&[]) { + // Returns the peer address, or `WouldBlock` if not connected. + let peer_addr = |stream: &TcpStream| match stream.peer_addr() { Err(err) if err.kind() == io::ErrorKind::NotConnected => { Err(io::Error::new(io::ErrorKind::WouldBlock, "")) } - res => res.map(|_| ()), + res => res, }; - // The stream becomes writable when connected. - stream.with(|io| wait_writable(io)).await?; + // Wait until there is a peer address. + stream.with(|io| peer_addr(io)).await?; Ok(stream) } @@ -651,15 +651,15 @@ impl Async { let _ = socket.connect(&socket2::SockAddr::unix(path)?); let stream = Async::new(socket.into_unix_stream())?; - // Waits until the stream becomes writable. - let wait_writable = |mut stream: &UnixStream| match stream.write(&[]) { + // Returns the peer address, or `WouldBlock` if not connected. + let peer_addr = |stream: &UnixStream| match stream.peer_addr() { Err(err) if err.kind() == io::ErrorKind::NotConnected => { Err(io::Error::new(io::ErrorKind::WouldBlock, "")) } - res => res.map(|_| ()), + res => res, }; - // The stream becomes writable when connected. - stream.with(|io| wait_writable(io)).await?; + // Wait until there is a peer address. + stream.with(|io| peer_addr(io)).await?; Ok(stream) } diff --git a/src/reactor.rs b/src/reactor.rs index be6b02b..fd5415e 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -45,17 +45,17 @@ pub(crate) struct Reactor { sys: sys::Reactor, /// Registered sources. - sources: piper::Lock>>, + sources: piper::Mutex>>, /// Temporary storage for I/O events when polling the reactor. - events: piper::Mutex, + events: piper::Lock, /// An ordered map of registered timers. /// /// Timers are in the order in which they fire. The `u64` in this type is a unique timer ID /// used to distinguish timers that fire at the same time. The `Waker` represents the task /// awaiting the timer. - timers: piper::Lock>, + timers: piper::Mutex>, /// An I/O event that is triggered when a new earliest timer is registered. /// @@ -72,9 +72,9 @@ impl Reactor { pub fn get() -> &'static Reactor { static REACTOR: Lazy = Lazy::new(|| Reactor { sys: sys::Reactor::new().expect("cannot initialize I/O event notification"), - sources: piper::Lock::new(Slab::new()), - events: piper::Mutex::new(sys::Events::new()), - timers: piper::Lock::new(BTreeMap::new()), + sources: piper::Mutex::new(Slab::new()), + events: piper::Lock::new(sys::Events::new()), + timers: piper::Mutex::new(BTreeMap::new()), event: Lazy::new(|| IoEvent::new().expect("cannot create an `IoEvent`")), }); &REACTOR @@ -110,7 +110,7 @@ impl Reactor { let source = Arc::new(Source { raw, key: vacant.key(), - wakers: piper::Lock::new(Vec::new()), + wakers: piper::Mutex::new(Vec::new()), tick: AtomicU64::new(0), }); self.sys.register(raw, source.key)?; @@ -175,7 +175,7 @@ impl Reactor { /// Polls the reactor for I/O events and wakes up tasks. pub(crate) struct ReactorLock<'a> { reactor: &'a Reactor, - events: piper::MutexGuard<'a, sys::Events>, + events: piper::LockGuard, } impl ReactorLock<'_> { @@ -255,7 +255,7 @@ pub(crate) struct Source { key: usize, /// A list of wakers representing tasks interested in events on this source. - wakers: piper::Lock>, + wakers: piper::Mutex>, /// Incremented on every I/O notification - this is only used for synchronization. tick: AtomicU64, diff --git a/src/timer.rs b/src/timer.rs index 70fb14e..d2fb5fd 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -8,7 +8,22 @@ use crate::reactor::Reactor; /// Fires at the chosen point in time. /// -/// TODO +/// # Examples +/// +/// Sleep for 1 second: +/// +/// ``` +/// use smol::Timer; +/// use std::time::Duration; +/// +/// async fn sleep(dur: Duration) { +/// Timer::after(dur).await; +/// } +/// +/// # smol::run(async { +/// sleep(Duration::from_secs(1)).await; +/// # }); +/// ``` #[derive(Debug)] pub struct Timer { /// A unique ID for this timer.