Lots of changes

This commit is contained in:
Stjepan Glavina 2020-04-21 17:51:07 +02:00
parent bc0acdd308
commit a4515749a8
41 changed files with 168 additions and 2898 deletions

1
.gitignore vendored
View File

@ -1,3 +1,2 @@
target/
Cargo.lock
socket

View File

@ -11,7 +11,7 @@ async-task = "3.0.0"
crossbeam = "0.7.3"
futures = { version = "0.3.4", default-features = false, features = ["std"] }
once_cell = "1.3.1"
piper = { path = "piper" }
piper = { git = "https://github.com/stjepang/piper.git" }
scoped-tls-hkt = "0.1.2"
slab = "0.4.2"
socket2 = { version = "0.3.12", features = ["pair", "unix"] }

View File

@ -1,30 +1,26 @@
# smol
A small and fast async runtime.
A small and fast async runtime in Rust.
https://discord.gg/5RxMVnr
## Goals
### Goals
* Small - Around 1500 lines of code.
* Fast - On par with async-std and Tokio.
* Safe - Written in 100% safe Rust.
* Complete - Fully featured and ready for production.
* Documented - Simple code, easy to understand and modify.
* Lightweight - Small dependencies, relies on epoll/kqueue/wepoll.
* Small - 1500 lines of code.
* Simple - Well-documented, easy to maintain.
* Fast - Based on [async-task], [crossbeam], and [piper].
* Safe - Just 1 line of unsafe code for Windows support.
* Portable - Linux, Android, macOS, iOS, Windows, FreeBSD, OpenBSD, NetBSD, DragonFly BSD.
## Features
### Features
* Executor - Configurable threads, work stealing, supports non-send futures.
* Blocking - Thread pool for isolating blocking code.
* Networking - TCP, UDP, Unix domain sockets, and custom files/sockets.
* Process - Spawns child processes and interacts with their I/O.
* Files - Filesystem manipulation operations.
* Stdio - Asynchronous stdin, stdout, and stderr.
* Async I/O - TCP, UDP, Unix domain sockets, and custom FDs/sockets.
* Thread-local executor - For non-send futures.
* Work-stealing executor - adapts to uneven workload.
* Blocking executor - For files, processes, and standard I/O.
* Timer - Efficient userspace timers.
# Documentation
### Documentation
```
cargo doc --document-private-items --no-deps --open

View File

@ -20,7 +20,7 @@ use anyhow::Result;
use async_native_tls::{Identity, TlsAcceptor};
use futures::prelude::*;
use http_types::{Request, Response, StatusCode};
use piper::{Lock, Shared};
use piper::{Arc, Mutex};
use smol::{Async, Task};
/// Serves a request and returns a response.
@ -50,13 +50,13 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
// Spawn a background task serving this connection.
let task = match &tls {
None => {
let stream = Shared::new(stream);
let stream = Arc::new(stream);
Task::spawn(async move { async_h1::accept(&host, stream, serve).await })
}
Some(tls) => {
// In case of HTTPS, establish a secure TLS connection first.
let stream = tls.accept(stream).await?;
let stream = Shared::new(Lock::new(stream));
let stream = Arc::new(Mutex::new(stream));
Task::spawn(async move { async_h1::accept(&host, stream, serve).await })
}
};

View File

@ -19,6 +19,42 @@ use futures::prelude::*;
use smol::Async;
fn main() -> io::Result<()> {
for _ in 0..1 {
std::thread::spawn(|| smol::run(future::pending::<()>()));
}
smol::Task::spawn(async {
// Connect to the server and create async stdin and stdout.
let stream = Async::<TcpStream>::connect("127.0.0.1:6000").await.unwrap();
let stdin = smol::reader(std::io::stdin());
let mut stdout = smol::writer(std::io::stdout());
// Intro messages.
println!("Connected to {}", stream.get_ref().peer_addr().unwrap());
println!("My nickname: {}", stream.get_ref().local_addr().unwrap());
println!("Type a message and hit enter!\n");
// References to `Async<T>` also implement `AsyncRead` and `AsyncWrite`.
let stream_r = &stream;
let mut stream_w = &stream;
smol::Task::local(async {
let mut cnt = 0;
loop {
cnt += 1;
eprintln!("BEFORE TIMER {}", cnt);
smol::Timer::after(std::time::Duration::from_millis(20)).await;
eprintln!("AFTER TIMER");
}
}).detach();
// Wait until the standard input is closed or the connection is closed.
futures::select! {
_ = io::copy(stdin, &mut stream_w).fuse() => println!("Quit!"),
_ = io::copy(stream_r, &mut stdout).fuse() => println!("Server disconnected!"),
}
}).detach();
smol::run(async {
// Connect to the server and create async stdin and stdout.
let stream = Async::<TcpStream>::connect("127.0.0.1:6000").await?;
@ -34,6 +70,16 @@ fn main() -> io::Result<()> {
let stream_r = &stream;
let mut stream_w = &stream;
smol::Task::local(async {
let mut cnt = 0;
loop {
cnt += 1;
eprintln!("BEFORE TIMER {}", cnt);
smol::Timer::after(std::time::Duration::from_millis(20)).await;
eprintln!("AFTER TIMER");
}
}).detach();
// Wait until the standard input is closed or the connection is closed.
futures::select! {
_ = io::copy(stdin, &mut stream_w).fuse() => println!("Quit!"),

View File

@ -17,10 +17,10 @@ use std::net::{SocketAddr, TcpListener, TcpStream};
use futures::io::{self, BufReader};
use futures::prelude::*;
use piper::{Receiver, Sender, Shared};
use piper::{Receiver, Sender, Arc};
use smol::{Async, Task};
type Client = Shared<Async<TcpStream>>;
type Client = Arc<Async<TcpStream>>;
enum Event {
Join(SocketAddr, Client),
@ -53,7 +53,8 @@ async fn dispatch(receiver: Receiver<Event>) -> io::Result<()> {
// Send the event to all active clients.
for stream in map.values_mut() {
stream.write_all(output.as_bytes()).await?;
// Ignore errors because the client might disconnect at any point.
let _ = stream.write_all(output.as_bytes()).await;
}
}
Ok(())
@ -87,7 +88,7 @@ fn main() -> io::Result<()> {
loop {
// Accept the next connection.
let (stream, addr) = listener.accept().await?;
let client = Shared::new(stream);
let client = Arc::new(stream);
let sender = sender.clone();
// Spawn a background task reading messages from the client.

View File

@ -1,12 +0,0 @@
use std::time::{Duration, Instant};
use async_std::task::sleep;
fn main() {
smol::run(async {
let start = Instant::now();
println!("Sleeping...");
sleep(Duration::from_secs(1)).await;
println!("Woke up after {:?}", start.elapsed());
})
}

View File

@ -1,11 +0,0 @@
use anyhow::Result;
fn main() -> Result<()> {
smol::run(async {
let resp = reqwest::get("https://www.rust-lang.org").await?;
let body = resp.text().await?;
println!("{:?}", body);
Ok(())
})
}

View File

@ -1,7 +0,0 @@
fn main() -> http_types::Result<()> {
smol::run(async {
let body = surf::get("https://www.rust-lang.org").recv_string().await?;
println!("{:?}", body);
Ok(())
})
}

View File

@ -1,12 +0,0 @@
use std::time::{Duration, Instant};
use tokio::time::delay_for;
fn main() {
smol::run(async {
let start = Instant::now();
println!("Sleeping...");
delay_for(Duration::from_secs(1)).await;
println!("Woke up after {:?}", start.elapsed());
})
}

View File

@ -1,3 +1,4 @@
// TODO: document
// Uses the `ctrlc` crate to set a handler that sends a message
// through an async channel.

View File

@ -1,3 +1,4 @@
// TODO: document
#[cfg(target_os = "linux")]
fn main() -> std::io::Result<()> {
use std::ffi::OsString;

View File

@ -1,3 +1,4 @@
// TODO: document
#[cfg(target_os = "linux")]
fn main() -> std::io::Result<()> {
use std::io;

View File

@ -0,0 +1,34 @@
// TODO: document
use std::time::{Duration, Instant};
use anyhow::{Error, Result};
fn main() -> Result<()> {
smol::run(async {
// Sleep using async-std.
let start = Instant::now();
println!("Sleeping using async-std...");
async_std::task::sleep(Duration::from_secs(1)).await;
println!("Woke up after {:?}", start.elapsed());
// Sleep using tokio.
let start = Instant::now();
println!("Sleeping using tokio...");
tokio::time::delay_for(Duration::from_secs(1)).await;
println!("Woke up after {:?}", start.elapsed());
// Make a GET request using surf.
let body = surf::get("https://www.rust-lang.org")
.recv_string()
.await
.map_err(Error::msg)?;
println!("Body from surf: {:?}", body);
// Make a GET request using reqwest.
let resp = reqwest::get("https://www.rust-lang.org").await?;
let body = resp.text().await?;
println!("Body from reqwest: {:?}", body);
Ok(())
})
}

View File

@ -1,3 +1,4 @@
// TODO: document
use std::env;
use std::process::{Command, Stdio};

View File

@ -1,3 +1,4 @@
// TODO: document
use std::env;
use std::process::Command;

View File

@ -1,3 +1,4 @@
// TODO: document
//! Lists files in a directory given as an argument.
use std::env;

View File

@ -1,3 +1,4 @@
// TODO: document
//! Prints a file given as an argument to stdout.
use std::env;

View File

@ -1,3 +1,4 @@
// TODO: document
use std::net::TcpStream;
use futures::io;

View File

@ -1,3 +1,4 @@
// TODO: document
//! TCP echo server.
//!
//! To send messages, do:

View File

@ -1,3 +1,4 @@
// TODO: document
use std::time::{Duration, Instant};
use smol::Timer;

View File

@ -1,3 +1,5 @@
// TODO: document
use std::io;
use std::time::Duration;
use anyhow::{bail, Result};
@ -6,23 +8,23 @@ use futures::io::BufReader;
use futures::prelude::*;
use smol::Timer;
async fn timeout<T>(dur: Duration, f: impl Future<Output = T>) -> Result<T> {
futures::pin_mut!(f);
match select(f, Timer::after(dur)).await {
Either::Left((out, _)) => Ok(out),
Either::Right(_) => bail!("timed out"),
async fn timeout<T>(dur: Duration, f: impl Future<Output = io::Result<T>>) -> io::Result<T> {
futures::select! {
out = f.fuse() => out,
_ = Timer::after(dur).fuse() => {
Err(io::Error::from(io::ErrorKind::TimedOut))
}
}
}
fn main() -> Result<()> {
smol::run(async {
let mut stdin = BufReader::new(smol::reader(std::io::stdin()));
let mut line = String::new();
let mut stdin = BufReader::new(smol::reader(std::io::stdin()));
let dur = Duration::from_secs(5);
timeout(dur, stdin.read_line(&mut line)).await??;
timeout(Duration::from_secs(5), stdin.read_line(&mut line)).await?;
println!("Line: {}", line);
Ok(())
})
}

View File

@ -1,10 +1,11 @@
// TODO: document
use std::net::TcpStream;
use anyhow::Result;
use async_native_tls::{Certificate, TlsConnector};
use futures::io;
use futures::prelude::*;
use piper::Lock;
use piper::Mutex;
use smol::Async;
fn main() -> Result<()> {
@ -21,7 +22,7 @@ fn main() -> Result<()> {
let stream = tls.connect("127.0.0.1", stream).await?;
println!("Connected to {}", stream.get_ref().get_ref().peer_addr()?);
let stream = Lock::new(stream);
let stream = Mutex::new(stream);
future::try_join(
io::copy(stdin, &mut &stream),

View File

@ -1,14 +1,15 @@
// TODO: document
use std::net::{TcpListener, TcpStream};
use anyhow::Result;
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use futures::io;
use piper::Lock;
use piper::Mutex;
use smol::{Async, Task};
async fn echo(stream: TlsStream<Async<TcpStream>>) -> Result<()> {
println!("Copying");
let stream = Lock::new(stream);
let stream = Mutex::new(stream);
io::copy(&stream, &mut &stream).await?;
Ok(())
}

View File

@ -1,3 +1,4 @@
// TODO: document
#[cfg(unix)]
fn main() -> std::io::Result<()> {
use std::os::unix::net::UnixStream;

View File

@ -1,3 +1,4 @@
// TODO: document
use std::collections::{HashSet, VecDeque};
use anyhow::Result;

View File

@ -1,3 +1,4 @@
// TODO: document
use std::net::TcpStream;
use std::pin::Pin;
use std::task::{Context, Poll};

View File

@ -1,3 +1,4 @@
// TODO: document
use std::net::{TcpListener, TcpStream};
use std::pin::Pin;
use std::task::{Context, Poll};

View File

@ -1,3 +1,4 @@
// TODO: document
#[cfg(windows)]
fn main() -> std::io::Result<()> {
use std::path::PathBuf;

View File

@ -1,14 +0,0 @@
[package]
name = "piper"
version = "0.1.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
description = "WIP"
license = "MIT OR Apache-2.0"
[dependencies]
crossbeam-utils = "0.7.0"
futures = { version = "0.3.4", default-features = false, features = ["std"] }
[dev-dependencies]
smol = { path = ".." }

View File

@ -1,911 +0,0 @@
use std::cell::UnsafeCell;
use std::fmt;
use std::future::Future;
use std::isize;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::process;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use crossbeam_utils::Backoff;
use futures::future;
use futures::stream::Stream;
use crate::signal::{Signal, SignalListener};
/// Creates a bounded multi-producer multi-consumer channel.
///
/// This channel has a buffer that can hold at most `cap` messages at a time.
///
/// Senders and receivers can be cloned. When all senders associated with a channel get dropped, it
/// becomes closed. Receive operations on a closed and empty channel return `None` instead of
/// trying to await a message.
///
/// # Panics
///
/// If `cap` is zero, this function will panic.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use std::time::Duration;
///
/// use async_std::sync::channel;
/// use async_std::task;
///
/// let (s, r) = channel(1);
///
/// // This call returns immediately because there is enough space in the channel.
/// s.send(1).await;
///
/// task::spawn(async move {
/// // This call will have to wait because the channel is full.
/// // It will be able to complete only after the first message is received.
/// s.send(2).await;
/// });
///
/// task::sleep(Duration::from_secs(1)).await;
/// assert_eq!(r.recv().await, Some(1));
/// assert_eq!(r.recv().await, Some(2));
/// #
/// # })
/// ```
pub fn chan<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
let channel = Arc::new(Channel::with_capacity(cap));
let s = Sender {
channel: channel.clone(),
};
let r = Receiver {
channel,
next_listener: None,
};
(s, r)
}
/// The sending side of a channel.
///
/// This struct is created by the [`chan`] function. See its documentation for more.
///
/// [`chan`]: fn.chan.html
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
/// use async_std::task;
///
/// let (s1, r) = channel(100);
/// let s2 = s1.clone();
///
/// task::spawn(async move { s1.send(1).await });
/// task::spawn(async move { s2.send(2).await });
///
/// let msg1 = r.recv().await.unwrap();
/// let msg2 = r.recv().await.unwrap();
///
/// assert_eq!(msg1 + msg2, 3);
/// #
/// # })
/// ```
pub struct Sender<T> {
/// The inner channel.
channel: Arc<Channel<T>>,
}
impl<T> Sender<T> {
/// Sends a message into the channel.
///
/// If the channel is full, this method will wait until there is space in the channel.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
/// use async_std::task;
///
/// let (s, r) = channel(1);
///
/// task::spawn(async move {
/// s.send(1).await;
/// s.send(2).await;
/// });
///
/// assert_eq!(r.recv().await, Some(1));
/// assert_eq!(r.recv().await, Some(2));
/// assert_eq!(r.recv().await, None);
/// #
/// # })
/// ```
pub async fn send(&self, msg: T) {
self.channel.send(msg).await
}
/// Returns the channel capacity.
///
/// # Examples
///
/// ```
/// use async_std::sync::channel;
///
/// let (s, _) = channel::<i32>(5);
/// assert_eq!(s.capacity(), 5);
/// ```
pub fn capacity(&self) -> usize {
if self.channel.handoff.is_some() {
0
} else {
self.channel.cap
}
}
/// Returns `true` if the channel is empty.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
///
/// let (s, r) = channel(1);
///
/// assert!(s.is_empty());
/// s.send(0).await;
/// assert!(!s.is_empty());
/// #
/// # })
/// ```
pub fn is_empty(&self) -> bool {
self.channel.is_empty()
}
/// Returns `true` if the channel is full.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
///
/// let (s, r) = channel(1);
///
/// assert!(!s.is_full());
/// s.send(0).await;
/// assert!(s.is_full());
/// #
/// # })
/// ```
pub fn is_full(&self) -> bool {
self.channel.is_full()
}
/// Returns the number of messages in the channel.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
///
/// let (s, r) = channel(2);
/// assert_eq!(s.len(), 0);
///
/// s.send(1).await;
/// s.send(2).await;
/// assert_eq!(s.len(), 2);
/// #
/// # })
/// ```
pub fn len(&self) -> usize {
self.channel.len()
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
// Decrement the sender count and disconnect the channel if it drops down to zero.
if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.disconnect();
}
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
// Make sure the count never overflows, even if lots of sender clones are leaked.
if count > isize::MAX as usize {
process::abort();
}
Sender {
channel: self.channel.clone(),
}
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Sender { .. }")
}
}
/// The receiving side of a channel.
///
/// This type receives messages by calling `recv`. But it also implements the [`Stream`] trait,
/// which means it can act as an asynchronous iterator. This struct is created by the [`chan`]
/// function. See its documentation for more.
///
/// [`chan`]: fn.chan.html
/// [`Stream`]: ../stream/trait.Stream.html
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use std::time::Duration;
///
/// use async_std::sync::channel;
/// use async_std::task;
///
/// let (s, r) = channel(100);
///
/// task::spawn(async move {
/// s.send(1).await;
/// task::sleep(Duration::from_secs(1)).await;
/// s.send(2).await;
/// });
///
/// assert_eq!(r.recv().await, Some(1)); // Received immediately.
/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second.
/// #
/// # })
/// ```
pub struct Receiver<T> {
/// The inner channel.
channel: Arc<Channel<T>>,
/// The key for this receiver in the `channel.next_ops` set. TODO
next_listener: Option<SignalListener>,
}
impl<T> Receiver<T> {
/// TODO
pub fn try_recv(&self) -> Option<T> {
self.channel.try_recv().ok()
}
/// Receives a message from the channel.
///
/// If the channel is empty and still has senders, this method will wait until a message is
/// sent into the channel or until all senders get dropped.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
/// use async_std::task;
///
/// let (s, r) = channel(1);
///
/// task::spawn(async move {
/// s.send(1).await;
/// s.send(2).await;
/// });
///
/// assert_eq!(r.recv().await, Some(1));
/// assert_eq!(r.recv().await, Some(2));
/// assert_eq!(r.recv().await, None);
/// #
/// # })
/// ```
pub async fn recv(&self) -> Option<T> {
self.channel.recv().await
}
/// Returns the channel capacity.
///
/// # Examples
///
/// ```
/// use async_std::sync::channel;
///
/// let (_, r) = channel::<i32>(5);
/// assert_eq!(r.capacity(), 5);
/// ```
pub fn capacity(&self) -> usize {
if self.channel.handoff.is_some() {
0
} else {
self.channel.cap
}
}
/// Returns `true` if the channel is empty.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
///
/// let (s, r) = channel(1);
///
/// assert!(r.is_empty());
/// s.send(0).await;
/// assert!(!r.is_empty());
/// #
/// # })
/// ```
pub fn is_empty(&self) -> bool {
self.channel.is_empty()
}
/// Returns `true` if the channel is full.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
///
/// let (s, r) = channel(1);
///
/// assert!(!r.is_full());
/// s.send(0).await;
/// assert!(r.is_full());
/// #
/// # })
/// ```
pub fn is_full(&self) -> bool {
self.channel.is_full()
}
/// Returns the number of messages in the channel.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::sync::channel;
///
/// let (s, r) = channel(2);
/// assert_eq!(r.len(), 0);
///
/// s.send(1).await;
/// s.send(2).await;
/// assert_eq!(r.len(), 2);
/// #
/// # })
/// ```
pub fn len(&self) -> usize {
self.channel.len()
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// Decrement the receiver count and disconnect the channel if it drops down to zero.
if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.disconnect();
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Receiver<T> {
let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
// Make sure the count never overflows, even if lots of receiver clones are leaked.
if count > isize::MAX as usize {
process::abort();
}
Receiver {
channel: self.channel.clone(),
next_listener: None,
}
}
}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let poll = loop {
match self.channel.try_recv() {
Ok(msg) => break Poll::Ready(Some(msg)),
Err(TryRecvError::Disconnected) => break Poll::Ready(None),
Err(TryRecvError::Empty) => {}
}
self.next_listener = Some(self.channel.next_ops.listen());
match self.channel.try_recv() {
Ok(msg) => break Poll::Ready(Some(msg)),
Err(TryRecvError::Disconnected) => break Poll::Ready(None),
Err(TryRecvError::Empty) => {}
}
match Pin::new(self.next_listener.as_mut().unwrap()).poll(cx) {
Poll::Ready(()) => {}
Poll::Pending => return Poll::Pending,
}
};
if self.next_listener.take().is_some() {
self.channel.next_ops.notify_all();
}
poll
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Receiver { .. }")
}
}
/// A slot in a channel.
struct Slot<T> {
/// The current stamp.
stamp: AtomicUsize,
/// The message in this slot.
msg: UnsafeCell<T>,
}
/// Bounded channel based on a preallocated array.
struct Channel<T> {
/// The head of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit in the head is always zero.
///
/// Messages are popped from the head of the channel.
head: AtomicUsize,
/// The tail of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit indicates that the channel is disconnected.
///
/// Messages are pushed into the tail of the channel.
tail: AtomicUsize,
/// The buffer holding slots.
buffer: *mut Slot<T>,
/// The channel capacity.
cap: usize,
/// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
one_lap: usize,
/// If this bit is set in the tail, that means either all senders were dropped or all receivers
/// were dropped.
mark_bit: usize,
/// Send operations waiting while the channel is full.
send_ops: Signal,
/// TODO
handoff: Option<Signal>,
/// Receive operations waiting while the channel is empty and not disconnected.
recv_ops: Signal,
/// Stream operations while the channel is empty and not disconnected.
next_ops: Signal,
/// The number of currently active `Sender`s.
sender_count: AtomicUsize,
/// The number of currently active `Receivers`s.
receiver_count: AtomicUsize,
/// Indicates that dropping a `Channel<T>` may drop values of type `T`.
_marker: PhantomData<T>,
}
unsafe impl<T: Send> Send for Channel<T> {}
unsafe impl<T: Send> Sync for Channel<T> {}
impl<T> Unpin for Channel<T> {}
impl<T> Channel<T> {
/// Creates a bounded channel of capacity `cap`.
fn with_capacity(cap: usize) -> Self {
let handoff = if cap == 0 { Some(Signal::new()) } else { None };
let cap = cap.max(1);
// Compute constants `mark_bit` and `one_lap`.
let mark_bit = (cap + 1).next_power_of_two();
let one_lap = mark_bit * 2;
// Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
let head = 0;
// Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
let tail = 0;
// Allocate a buffer of `cap` slots.
let buffer = {
let mut v = Vec::<Slot<T>>::with_capacity(cap);
let ptr = v.as_mut_ptr();
mem::forget(v);
ptr
};
// Initialize stamps in the slots.
for i in 0..cap {
unsafe {
// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
let slot = buffer.add(i);
ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
}
}
Channel {
buffer,
cap,
one_lap,
mark_bit,
head: AtomicUsize::new(head),
tail: AtomicUsize::new(tail),
send_ops: Signal::new(),
handoff,
recv_ops: Signal::new(),
next_ops: Signal::new(),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
_marker: PhantomData,
}
}
/// Attempts to send a message.
fn try_send(&self, msg: T) -> Result<usize, TrySendError<T>> {
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);
loop {
// Extract mark bit from the tail and unset it.
//
// If the mark bit was set (which means all receivers have been dropped), we will still
// send the message into the channel if there is enough capacity. The message will get
// dropped when the channel is dropped (which means when all senders are also dropped).
let mark_bit = tail & self.mark_bit;
tail ^= mark_bit;
// Deconstruct the tail.
let index = tail & (self.mark_bit - 1);
let lap = tail & !(self.one_lap - 1);
// Inspect the corresponding slot.
let slot = unsafe { &*self.buffer.add(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};
// Try moving the tail.
match self.tail.compare_exchange_weak(
tail | mark_bit,
new_tail | mark_bit,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Write the message into the slot and update the stamp.
unsafe { slot.msg.get().write(msg) };
let stamp = tail + 1;
slot.stamp.store(stamp, Ordering::Release);
// Wake a blocked receive operation.
self.recv_ops.notify_one();
// Wake all blocked streams.
self.next_ops.notify_all();
return Ok(stamp);
}
Err(t) => {
tail = t;
backoff.spin();
}
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
atomic::fence(Ordering::SeqCst);
let head = self.head.load(Ordering::Relaxed);
// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the channel is full.
// Check if the channel is disconnected.
if mark_bit != 0 {
return Err(TrySendError::Disconnected(msg));
} else {
return Err(TrySendError::Full(msg));
}
}
backoff.spin();
tail = self.tail.load(Ordering::Relaxed);
} else {
// Snooze because we need to wait for the stamp to get updated.
backoff.snooze();
tail = self.tail.load(Ordering::Relaxed);
}
}
}
async fn send(&self, mut msg: T) {
let mut listener = None;
let stamp = loop {
match self.try_send(msg) {
Ok(stamp) => break stamp,
Err(TrySendError::Disconnected(_)) => return future::pending().await,
Err(TrySendError::Full(m)) => msg = m,
}
match listener.take() {
None => listener = Some(self.send_ops.listen()),
Some(w) => {
w.await;
if self.cap > 1 {
self.send_ops.notify_one();
}
}
}
};
if let Some(h) = &self.handoff {
let mut listener = None;
while unsafe { &*self.buffer }.stamp.load(Ordering::SeqCst) != stamp {
match listener.take() {
None => listener = Some(h.listen()),
Some(w) => w.await,
}
}
}
}
/// Attempts to receive a message.
fn try_recv(&self) -> Result<T, TryRecvError> {
let backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);
loop {
// Deconstruct the head.
let index = head & (self.mark_bit - 1);
let lap = head & !(self.one_lap - 1);
// Inspect the corresponding slot.
let slot = unsafe { &*self.buffer.add(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the the stamp is ahead of the head by 1, we may attempt to pop.
if head + 1 == stamp {
let new = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
head + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};
// Try moving the head.
match self.head.compare_exchange_weak(
head,
new,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Read the message from the slot and update the stamp.
let msg = unsafe { slot.msg.get().read() };
let stamp = head.wrapping_add(self.one_lap);
slot.stamp.store(stamp, Ordering::Release);
// Wake a blocked send operation.
self.send_ops.notify_one();
// Notify a send operation waiting for handoff.
if let Some(h) = &self.handoff {
h.notify_all();
}
return Ok(msg);
}
Err(h) => {
head = h;
backoff.spin();
}
}
} else if stamp == head {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.load(Ordering::Relaxed);
// If the tail equals the head, that means the channel is empty.
if (tail & !self.mark_bit) == head {
// If the channel is disconnected...
if tail & self.mark_bit != 0 {
return Err(TryRecvError::Disconnected);
} else {
// Otherwise, the receive operation is not ready.
return Err(TryRecvError::Empty);
}
}
backoff.spin();
head = self.head.load(Ordering::Relaxed);
} else {
// Snooze because we need to wait for the stamp to get updated.
backoff.snooze();
head = self.head.load(Ordering::Relaxed);
}
}
}
async fn recv(&self) -> Option<T> {
let mut listener = None;
loop {
match self.try_recv() {
Ok(msg) => return Some(msg),
Err(TryRecvError::Disconnected) => return None,
Err(TryRecvError::Empty) => {}
}
match listener.take() {
None => listener = Some(self.recv_ops.listen()),
Some(w) => {
w.await;
if self.cap > 1 {
self.recv_ops.notify_one();
}
}
}
}
}
/// Returns the current number of messages inside the channel.
fn len(&self) -> usize {
loop {
// Load the tail, then load the head.
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
// If the tail didn't change, we've got consistent values to work with.
if self.tail.load(Ordering::SeqCst) == tail {
let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);
return if hix < tix {
tix - hix
} else if hix > tix {
self.cap - hix + tix
} else if (tail & !self.mark_bit) == head {
0
} else {
self.cap
};
}
}
}
/// Returns `true` if the channel is empty.
fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
// Is the tail equal to the head?
//
// Note: If the head changes just before we load the tail, that means there was a moment
// when the channel was not empty, so it is safe to just return `false`.
(tail & !self.mark_bit) == head
}
/// Returns `true` if the channel is full.
fn is_full(&self) -> bool {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
// Is the head lagging one lap behind tail?
//
// Note: If the tail changes just before we load the head, that means there was a moment
// when the channel was not full, so it is safe to just return `false`.
head.wrapping_add(self.one_lap) == tail & !self.mark_bit
}
/// Disconnects the channel and wakes up all blocked operations.
fn disconnect(&self) {
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
if tail & self.mark_bit == 0 {
// Notify everyone blocked on this channel.
self.send_ops.notify_all();
self.recv_ops.notify_all();
self.next_ops.notify_all();
}
}
}
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
// Get the index of the head.
let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
// Loop over all slots that hold a message and drop them.
for i in 0..self.len() {
// Compute the index of the next slot holding a message.
let index = if hix + i < self.cap {
hix + i
} else {
hix + i - self.cap
};
unsafe {
self.buffer.add(index).drop_in_place();
}
}
// Finally, deallocate the buffer, but don't run any destructors.
unsafe {
Vec::from_raw_parts(self.buffer, 0, self.cap);
}
}
}
/// An error returned from the `try_send()` method.
enum TrySendError<T> {
/// The channel is full but not disconnected.
Full(T),
/// The channel is full and disconnected.
Disconnected(T),
}
/// An error returned from the `try_recv()` method.
///
/// `Empty` is transient (a later attempt may succeed), while `Disconnected`
/// is terminal.
enum TryRecvError {
    /// The channel is empty but not disconnected.
    Empty,

    /// The channel is empty and disconnected.
    Disconnected,
}

View File

@ -1,16 +0,0 @@
//! Asynchronous pipes, channels, mutexes, and other primitives.
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]

// Internal modules; the public surface is re-exported below.
mod chan;
mod lock;
mod mutex;
mod pipe;
mod shared;
// `signal` stays crate-private: it backs the wakeup machinery used by the
// other modules and is not re-exported.
mod signal;

pub use chan::{chan, Receiver, Sender};
pub use lock::{Lock, LockGuard};
pub use mutex::{Mutex, MutexGuard};
pub use pipe::{pipe, Reader, Writer};
pub use shared::Shared;

View File

@ -1,293 +0,0 @@
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};
use crate::signal::Signal;
use crossbeam_utils::Backoff;
use futures::io::{self, AsyncRead, AsyncWrite};
/// A lock that implements async I/O traits.
///
/// This is a blocking mutex that adds the following impls:
///
/// - `impl<T> AsyncRead for Lock<T> where &T: AsyncRead + Unpin {}`
/// - `impl<T> AsyncRead for &Lock<T> where &T: AsyncRead + Unpin {}`
/// - `impl<T> AsyncWrite for Lock<T> where &T: AsyncWrite + Unpin {}`
/// - `impl<T> AsyncWrite for &Lock<T> where &T: AsyncWrite + Unpin {}`
///
/// This lock ensures fairness by handling lock operations in the first-in first-out order.
///
/// While primarily designed for wrapping async I/O objects, this lock can also be used as a
/// regular blocking mutex. It's not quite as efficient as [`parking_lot::Mutex`], but it's still
/// an improvement over [`std::sync::Mutex`].
///
/// [`parking_lot::Mutex`]: https://docs.rs/parking_lot/0.10.0/parking_lot/type.Mutex.html
/// [`std::sync::Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html
///
/// # Examples
///
/// ```
/// use futures::io;
/// use futures::prelude::*;
/// use piper::Lock;
///
/// // Reads data from a stream and echoes it back.
/// async fn echo(stream: impl AsyncRead + AsyncWrite + Unpin) -> io::Result<u64> {
///     let stream = Lock::new(stream);
///     io::copy(&stream, &mut &stream).await
/// }
/// ```
pub struct Lock<T> {
    /// Set to `true` when the lock is acquired by a `LockGuard`.
    locked: AtomicBool,

    /// Lock operations waiting for the lock to get unlocked.
    lock_ops: Signal,

    /// The value inside the lock.
    data: UnsafeCell<T>,
}

// SAFETY: access to `data` is serialized by the `locked` flag, so the lock is
// `Send` and `Sync` whenever the protected value is `Send` — the same bounds
// as `std::sync::Mutex`.
unsafe impl<T: Send> Send for Lock<T> {}
unsafe impl<T: Send> Sync for Lock<T> {}
impl<T> Lock<T> {
    /// Creates a new lock.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Lock;
    ///
    /// let lock = Lock::new(10);
    /// ```
    pub fn new(data: T) -> Lock<T> {
        Lock {
            locked: AtomicBool::new(false),
            lock_ops: Signal::new(),
            data: UnsafeCell::new(data),
        }
    }

    /// Acquires the lock, blocking the current thread until it is able to do so.
    ///
    /// Returns a guard that releases the lock when dropped.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Lock;
    ///
    /// let lock = Lock::new(10);
    /// let guard = lock.lock();
    /// assert_eq!(*guard, 10);
    /// ```
    pub fn lock(&self) -> LockGuard<'_, T> {
        loop {
            // Try locking the lock, spinning with exponential backoff first.
            let backoff = Backoff::new();
            loop {
                if let Some(guard) = self.try_lock() {
                    return guard;
                }
                if backoff.is_completed() {
                    break;
                }
                backoff.snooze();
            }

            // Start watching for notifications and try locking again (the
            // re-check avoids missing an unlock that happened in between).
            let l = self.lock_ops.listen();
            if let Some(guard) = self.try_lock() {
                return guard;
            }
            l.wait();
        }
    }

    /// Attempts to acquire the lock.
    ///
    /// If the lock could not be acquired at this time, then [`None`] is returned. Otherwise, a
    /// guard is returned that releases the lock when dropped.
    ///
    /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Lock;
    ///
    /// let lock = Lock::new(10);
    /// if let Some(guard) = lock.try_lock() {
    ///     assert_eq!(*guard, 10);
    /// }
    /// # ;
    /// ```
    #[inline]
    pub fn try_lock(&self) -> Option<LockGuard<'_, T>> {
        // `Acquire` on success pairs with the `Release` store in
        // `LockGuard::drop`. (`compare_exchange` replaces the deprecated
        // `compare_and_swap`.)
        if self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
            .is_ok()
        {
            Some(LockGuard(self))
        } else {
            None
        }
    }

    /// Consumes the lock, returning the underlying data.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Lock;
    ///
    /// let lock = Lock::new(10);
    /// assert_eq!(lock.into_inner(), 10);
    /// ```
    pub fn into_inner(self) -> T {
        self.data.into_inner()
    }

    /// Returns a mutable reference to the underlying data.
    ///
    /// Since this call borrows the lock mutably, no actual locking takes place -- the mutable
    /// borrow statically guarantees the lock is not already acquired.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Lock;
    ///
    /// let mut lock = Lock::new(0);
    /// *lock.get_mut() = 10;
    /// assert_eq!(*lock.lock(), 10);
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        // SAFETY: the exclusive borrow of `self` guarantees no guard exists.
        unsafe { &mut *self.data.get() }
    }
}
impl<T: fmt::Debug> fmt::Debug for Lock<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Placeholder printed when the lock is currently held elsewhere.
        struct Locked;
        impl fmt::Debug for Locked {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("<locked>")
            }
        }

        let mut s = f.debug_struct("Lock");
        match self.try_lock() {
            Some(guard) => s.field("data", &&*guard),
            None => s.field("data", &Locked),
        };
        s.finish()
    }
}
impl<T> From<T> for Lock<T> {
    /// Wraps a value in a new, unlocked `Lock`.
    fn from(data: T) -> Lock<T> {
        Lock::new(data)
    }
}

impl<T: Default> Default for Lock<T> {
    /// Creates a lock protecting `T`'s default value.
    fn default() -> Lock<T> {
        Lock::new(T::default())
    }
}
impl<T: AsyncRead + Unpin> AsyncRead for Lock<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Acquire the (blocking) lock for the duration of this poll and
        // delegate to the inner reader.
        Pin::new(&mut *self.lock()).poll_read(cx, buf)
    }
}

impl<T: AsyncRead + Unpin> AsyncRead for &Lock<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Same as above, but usable through a shared reference.
        Pin::new(&mut *self.lock()).poll_read(cx, buf)
    }
}
impl<T: AsyncWrite + Unpin> AsyncWrite for Lock<T> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Acquire the (blocking) lock for the duration of this poll and
        // delegate to the inner writer.
        Pin::new(&mut *self.lock()).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut *self.lock()).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut *self.lock()).poll_close(cx)
    }
}

impl<T: AsyncWrite + Unpin> AsyncWrite for &Lock<T> {
    // Same as above, but usable through a shared reference.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut *self.lock()).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut *self.lock()).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut *self.lock()).poll_close(cx)
    }
}
/// A guard that releases the lock when dropped.
pub struct LockGuard<'a, T>(&'a Lock<T>);

// SAFETY: the guard grants exclusive access to the protected `T`, so it may
// move across threads when `T: Send` and be shared when `T: Sync`.
unsafe impl<T: Send> Send for LockGuard<'_, T> {}
unsafe impl<T: Sync> Sync for LockGuard<'_, T> {}

impl<T> Drop for LockGuard<'_, T> {
    fn drop(&mut self) {
        // Release the lock (`Release` pairs with the `Acquire` in
        // `try_lock`) and wake one queued lock operation.
        self.0.locked.store(false, Ordering::Release);
        self.0.lock_ops.notify_one();
    }
}
impl<T: fmt::Debug> fmt::Debug for LockGuard<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward formatting (including flags) to the protected value.
        (**self).fmt(f)
    }
}

impl<T: fmt::Display> fmt::Display for LockGuard<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward formatting (including flags) to the protected value.
        fmt::Display::fmt(&**self, f)
    }
}
impl<T> Deref for LockGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        // SAFETY: the guard's existence proves the lock is held, so access
        // to the data is exclusive to this guard.
        unsafe { &*self.0.data.get() }
    }
}

impl<T> DerefMut for LockGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: as above, plus the guard itself is borrowed mutably.
        unsafe { &mut *self.0.data.get() }
    }
}

View File

@ -1,229 +0,0 @@
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use crate::signal::Signal;
/// An asynchronous mutex.
///
/// This type is similar to [`std::sync::Mutex`], except locking is an asynchronous operation.
///
/// [`std::sync::Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html
///
/// # Examples
///
/// ```
/// # smol::run(async {
/// #
/// use piper::Mutex;
/// use smol::Task;
/// use std::sync::Arc;
///
/// let m = Arc::new(Mutex::new(0));
/// let mut tasks = vec![];
///
/// for _ in 0..10 {
///     let m = m.clone();
///     tasks.push(Task::spawn(async move {
///         *m.lock().await += 1;
///     }));
/// }
///
/// for t in tasks {
///     t.await;
/// }
/// assert_eq!(*m.lock().await, 10);
/// #
/// # })
/// ```
pub struct Mutex<T> {
    /// Set to `true` when the mutex is acquired by a `MutexGuard`.
    locked: AtomicBool,

    /// Lock operations waiting for the mutex to get unlocked.
    lock_ops: Signal,

    /// The value inside the mutex.
    data: UnsafeCell<T>,
}

// SAFETY: access to `data` is serialized by the `locked` flag, so the mutex
// is `Send`/`Sync` whenever `T: Send` — the same bounds as `std::sync::Mutex`.
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
impl<T> Mutex<T> {
    /// Creates a new async mutex.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Mutex;
    ///
    /// let mutex = Mutex::new(0);
    /// ```
    pub fn new(data: T) -> Mutex<T> {
        Mutex {
            locked: AtomicBool::new(false),
            lock_ops: Signal::new(),
            data: UnsafeCell::new(data),
        }
    }

    /// Acquires the mutex.
    ///
    /// Returns a guard that releases the mutex when dropped.
    ///
    /// # Examples
    ///
    /// ```
    /// # smol::block_on(async {
    /// #
    /// use piper::Mutex;
    ///
    /// let mutex = Mutex::new(10);
    /// let guard = mutex.lock().await;
    /// assert_eq!(*guard, 10);
    /// #
    /// # })
    /// ```
    pub async fn lock(&self) -> MutexGuard<'_, T> {
        loop {
            // Try locking the mutex.
            if let Some(guard) = self.try_lock() {
                return guard;
            }

            // Start watching for notifications and try locking again (the
            // re-check avoids missing an unlock that happened in between).
            let l = self.lock_ops.listen();
            if let Some(guard) = self.try_lock() {
                return guard;
            }
            l.await;
        }
    }

    /// Attempts to acquire the mutex.
    ///
    /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
    /// guard is returned that releases the mutex when dropped.
    ///
    /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Mutex;
    ///
    /// let mutex = Mutex::new(10);
    /// if let Some(guard) = mutex.try_lock() {
    ///     assert_eq!(*guard, 10);
    /// }
    /// # ;
    /// ```
    #[inline]
    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
        // `Acquire` on success pairs with the `Release` store in
        // `MutexGuard::drop`. (`compare_exchange` replaces the deprecated
        // `compare_and_swap`.)
        if self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
            .is_ok()
        {
            Some(MutexGuard(self))
        } else {
            None
        }
    }

    /// Consumes the mutex, returning the underlying data.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Mutex;
    ///
    /// let mutex = Mutex::new(10);
    /// assert_eq!(mutex.into_inner(), 10);
    /// ```
    pub fn into_inner(self) -> T {
        self.data.into_inner()
    }

    /// Returns a mutable reference to the underlying data.
    ///
    /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
    /// borrow statically guarantees the mutex is not already acquired.
    ///
    /// # Examples
    ///
    /// ```
    /// # smol::block_on(async {
    /// #
    /// use piper::Mutex;
    ///
    /// let mut mutex = Mutex::new(0);
    /// *mutex.get_mut() = 10;
    /// assert_eq!(*mutex.lock().await, 10);
    /// #
    /// # })
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        // SAFETY: the exclusive borrow of `self` guarantees no guard exists.
        unsafe { &mut *self.data.get() }
    }
}
impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Placeholder printed when the mutex is currently held elsewhere.
        struct Locked;
        impl fmt::Debug for Locked {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("<locked>")
            }
        }

        let mut s = f.debug_struct("Mutex");
        match self.try_lock() {
            Some(guard) => s.field("data", &&*guard),
            None => s.field("data", &Locked),
        };
        s.finish()
    }
}
impl<T> From<T> for Mutex<T> {
    /// Wraps a value in a new, unlocked `Mutex`.
    fn from(data: T) -> Mutex<T> {
        Mutex::new(data)
    }
}

impl<T: Default> Default for Mutex<T> {
    /// Creates a mutex protecting `T`'s default value.
    fn default() -> Mutex<T> {
        Mutex::new(T::default())
    }
}
/// A guard that releases the mutex when dropped.
pub struct MutexGuard<'a, T>(&'a Mutex<T>);

// SAFETY: the guard grants exclusive access to the protected `T`, so it may
// move across threads when `T: Send` and be shared when `T: Sync`.
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}

impl<T> Drop for MutexGuard<'_, T> {
    fn drop(&mut self) {
        // Release the mutex (`Release` pairs with the `Acquire` in
        // `try_lock`) and wake one queued lock operation.
        self.0.locked.store(false, Ordering::Release);
        self.0.lock_ops.notify_one();
    }
}
impl<T: fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward formatting (including flags) to the protected value.
        (**self).fmt(f)
    }
}

impl<T: fmt::Display> fmt::Display for MutexGuard<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward formatting (including flags) to the protected value.
        fmt::Display::fmt(&**self, f)
    }
}
impl<T> Deref for MutexGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        // SAFETY: the guard's existence proves the mutex is held, so access
        // to the data is exclusive to this guard.
        unsafe { &*self.0.data.get() }
    }
}

impl<T> DerefMut for MutexGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: as above, plus the guard itself is borrowed mutably.
        unsafe { &mut *self.0.data.get() }
    }
}

View File

@ -1,302 +0,0 @@
use std::cell::Cell;
use std::io;
use std::mem;
use std::pin::Pin;
use std::slice;
use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::io::{AsyncRead, AsyncWrite};
use futures::task::AtomicWaker;
/// Creates a bounded single-producer single-consumer pipe.
///
/// The pipe is a ring buffer of `cap` bytes; `Reader` and `Writer` are its
/// two endpoints.
///
/// # Panics
///
/// Panics if `cap` is 0.
pub fn pipe(cap: usize) -> (Reader, Writer) {
    assert!(cap > 0, "capacity must be positive");

    // Allocate the ring buffer through a `Vec` and leak it; the memory is
    // reclaimed in `Inner::drop` via `Vec::from_raw_parts`.
    let mut v = Vec::with_capacity(cap);
    let buffer = v.as_mut_ptr();
    mem::forget(v);

    let inner = Arc::new(Inner {
        head: AtomicUsize::new(0),
        tail: AtomicUsize::new(0),
        reader: AtomicWaker::new(),
        writer: AtomicWaker::new(),
        closed: AtomicBool::new(false),
        buffer,
        cap,
    });

    // Each endpoint starts with locally cached positions of zero.
    let r = Reader {
        inner: inner.clone(),
        head: Cell::new(0),
        tail: Cell::new(0),
    };
    let w = Writer {
        inner,
        head: Cell::new(0),
        tail: Cell::new(0),
    };
    (r, w)
}
// NOTE: Reader and Writer are !Clone + !Sync

/// The reading side of a pipe.
#[derive(Debug)]
pub struct Reader {
    /// State shared with the writer.
    inner: Arc<Inner>,

    /// This side's view of the head (read) position, published to
    /// `inner.head` after every copy.
    head: Cell<usize>,

    /// Cached snapshot of `inner.tail`, refreshed when the pipe looks empty.
    tail: Cell<usize>,
}

/// The writing side of a pipe.
#[derive(Debug)]
pub struct Writer {
    /// State shared with the reader.
    inner: Arc<Inner>,

    /// Cached snapshot of `inner.head`, refreshed when the pipe looks full.
    head: Cell<usize>,

    /// This side's view of the tail (write) position, published to
    /// `inner.tail` after every copy.
    tail: Cell<usize>,
}

// SAFETY: the raw `buffer` pointer suppresses auto-`Send`; each endpoint only
// touches its own end of the buffer, so moving an endpoint to another thread
// is sound (both types remain !Sync, see the note above).
unsafe impl Send for Reader {}
unsafe impl Send for Writer {}

/// Ring-buffer state shared by both endpoints.
///
/// Positions range over `0..2 * cap` so that a full buffer and an empty one
/// are distinguishable (see `Inner::slot`/`advance`/`distance`).
#[derive(Debug)]
struct Inner {
    /// The read position, published by the reader.
    head: AtomicUsize,

    /// The write position, published by the writer.
    tail: AtomicUsize,

    /// Waker of a reader parked waiting for data.
    reader: AtomicWaker,

    /// Waker of a writer parked waiting for space.
    writer: AtomicWaker,

    /// Set to `true` when either endpoint is dropped.
    closed: AtomicBool,

    /// Pointer to the ring buffer of `cap` bytes.
    buffer: *mut u8,

    /// Capacity of the ring buffer in bytes.
    cap: usize,
}
impl Inner {
    /// Maps a logical position in `0..2 * cap` to a pointer into the buffer.
    #[inline]
    unsafe fn slot(&self, pos: usize) -> *mut u8 {
        if pos < self.cap {
            self.buffer.add(pos)
        } else {
            self.buffer.add(pos - self.cap)
        }
    }

    /// Advances a logical position by `by`, wrapping around `2 * cap`.
    #[inline]
    fn advance(&self, pos: usize, by: usize) -> usize {
        if by < 2 * self.cap - pos {
            pos + by
        } else {
            // Equivalent to `pos + by - 2 * cap`, ordered to avoid overflow.
            pos - self.cap + by - self.cap
        }
    }

    /// Returns how far position `b` is ahead of position `a`, modulo
    /// `2 * cap` — i.e. the number of bytes between them.
    #[inline]
    fn distance(&self, a: usize, b: usize) -> usize {
        if a <= b {
            b - a
        } else {
            2 * self.cap - a + b
        }
    }
}
impl Drop for Inner {
    fn drop(&mut self) {
        // SAFETY: `buffer` was allocated by a `Vec<u8>` with capacity `cap`
        // in `pipe()`; rebuilding it with length 0 deallocates the memory
        // without running destructors (the contents are plain bytes).
        unsafe {
            Vec::from_raw_parts(self.buffer, 0, self.cap);
        }
    }
}

impl Drop for Reader {
    fn drop(&mut self) {
        // Mark the pipe as closed and wake the writer so it observes it.
        self.inner.closed.store(true, Ordering::SeqCst);
        self.inner.writer.wake();
    }
}

impl Drop for Writer {
    fn drop(&mut self) {
        // Mark the pipe as closed and wake the reader so it observes EOF.
        self.inner.closed.store(true, Ordering::SeqCst);
        self.inner.reader.wake();
    }
}
impl AsyncRead for Reader {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Delegate to the `&Reader` implementation below.
        Pin::new(&mut &*self).poll_read(cx, buf)
    }
}

impl AsyncWrite for Writer {
    // Delegate all methods to the `&Writer` implementation below.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut &*self).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut &*self).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut &*self).poll_close(cx)
    }
}
impl AsyncRead for &Reader {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // An empty destination trivially reads zero bytes.
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        // Start from the locally cached positions.
        let mut head = self.head.get();
        let mut tail = self.tail.get();

        // The pipe looks empty; refresh the cached tail from the shared one.
        if self.inner.distance(head, tail) == 0 {
            tail = self.inner.tail.load(Ordering::Acquire);
            self.tail.set(tail);

            if self.inner.distance(head, tail) == 0 {
                // Still empty: register the waker, then re-check after a
                // `SeqCst` fence so a write racing with the registration
                // cannot be missed.
                self.inner.reader.register(cx.waker());
                atomic::fence(Ordering::SeqCst);

                tail = self.inner.tail.load(Ordering::Acquire);
                self.tail.set(tail);

                if self.inner.distance(head, tail) == 0 {
                    // Definitely empty: report EOF if the writer is gone,
                    // otherwise wait for a wakeup.
                    if self.inner.closed.load(Ordering::Relaxed) {
                        return Poll::Ready(Ok(0));
                    } else {
                        return Poll::Pending;
                    }
                }

                // Data arrived after all; drop the registration.
                self.inner.reader.take();
            }
        }

        // Copy out of the ring buffer in contiguous chunks.
        let mut count = 0;

        loop {
            // Progress is being made, so wake a parked writer.
            self.inner.writer.wake();

            // Longest contiguous run before the buffer wraps around
            // (positions live in `0..2 * cap`; see `Inner::slot`).
            let streak = if head < self.inner.cap {
                self.inner.cap - head
            } else {
                2 * self.inner.cap - head
            };
            let remaining = self.inner.distance(head, tail);
            let space = buf.len() - count;

            // Copy at most 16 KiB per iteration, presumably so the head is
            // published (and the writer woken) reasonably often.
            let n = streak.min(remaining).min(space).min(16 * 1024);
            if n == 0 {
                break;
            }

            // SAFETY: `slot(head)` points at `n` initialized bytes that the
            // reader owns until `head` is published past them.
            let slice = unsafe { slice::from_raw_parts(self.inner.slot(head), n) };
            buf[count..count + n].copy_from_slice(slice);
            count += n;
            head = self.inner.advance(head, n);

            // Store the current head pointer.
            self.inner.head.store(head, Ordering::Release);
            self.head.set(head);
        }

        Poll::Ready(Ok(count))
    }
}
impl AsyncWrite for &Writer {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Nothing to do for an empty buffer; a closed pipe accepts 0 bytes.
        if buf.is_empty() || self.inner.closed.load(Ordering::Relaxed) {
            return Poll::Ready(Ok(0));
        }

        // Start from the locally cached positions.
        let mut head = self.head.get();
        let mut tail = self.tail.get();

        // The pipe looks full; refresh the cached head from the shared one.
        if self.inner.distance(head, tail) == self.inner.cap {
            head = self.inner.head.load(Ordering::Acquire);
            self.head.set(head);

            if self.inner.distance(head, tail) == self.inner.cap {
                // Still full: register the waker, then re-check after a
                // `SeqCst` fence so a read racing with the registration
                // cannot be missed.
                self.inner.writer.register(cx.waker());
                atomic::fence(Ordering::SeqCst);

                head = self.inner.head.load(Ordering::Acquire);
                self.head.set(head);

                if self.inner.distance(head, tail) == self.inner.cap {
                    // Definitely full: give up if the reader is gone,
                    // otherwise wait for a wakeup.
                    if self.inner.closed.load(Ordering::Relaxed) {
                        return Poll::Ready(Ok(0));
                    } else {
                        return Poll::Pending;
                    }
                }

                // Space appeared after all; drop the registration.
                self.inner.writer.take();
            }
        }

        // Copy into the ring buffer in contiguous chunks.
        let mut count = 0;

        loop {
            // Progress is being made, so wake a parked reader.
            self.inner.reader.wake();

            // Longest contiguous run before the buffer wraps around
            // (positions live in `0..2 * cap`; see `Inner::slot`).
            let streak = if tail < self.inner.cap {
                self.inner.cap - tail
            } else {
                2 * self.inner.cap - tail
            };
            let remaining = buf.len() - count;
            let space = self.inner.cap - self.inner.distance(head, tail);

            // Copy at most 16 KiB per iteration, presumably so the tail is
            // published (and the reader woken) reasonably often.
            let n = streak.min(remaining).min(space).min(16 * 1024);
            if n == 0 {
                break;
            }

            // SAFETY: `slot(tail)` points at `n` free bytes that the writer
            // owns until `tail` is published past them.
            unsafe {
                self.inner
                    .slot(tail)
                    .copy_from_nonoverlapping(&buf[count], n);
            }
            count += n;
            tail = self.inner.advance(tail, n);

            // Store the current tail pointer.
            self.inner.tail.store(tail, Ordering::Release);
            self.tail.set(tail);
        }

        Poll::Ready(Ok(count))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Data is visible to the reader as soon as `tail` is published, so
        // flushing is a no-op.
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // The pipe is actually closed in `Writer::drop`.
        Poll::Ready(Ok(()))
    }
}

View File

@ -1,153 +0,0 @@
use std::fmt;
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::io::{self, AsyncRead, AsyncWrite};
/// A reference-counted pointer that implements async I/O traits.
///
/// This is just a wrapper around `Arc<T>` that adds the following impls:
///
/// - `impl<T> AsyncRead for Shared<T> where &T: AsyncRead {}`
/// - `impl<T> AsyncWrite for Shared<T> where &T: AsyncWrite {}`
///
/// # Examples
///
/// ```no_run
/// use futures::io;
/// use piper::Shared;
/// use smol::Async;
/// use std::net::TcpStream;
///
/// # fn main() -> std::io::Result<()> { smol::run(async {
/// // A client that echoes messages back to the server.
/// let stream = Async::<TcpStream>::connect("127.0.0.1:8000").await?;
///
/// // Create two handles to the stream.
/// let reader = Shared::new(stream);
/// let mut writer = reader.clone();
///
/// // Echo data received from the reader back into the writer.
/// io::copy(reader, &mut writer).await?;
/// # Ok(()) }) }
/// ```
pub struct Shared<T>(Arc<T>);

// Unconditionally `Unpin`: the wrapper only holds an `Arc`, which is freely
// movable regardless of `T`.
impl<T> Unpin for Shared<T> {}
impl<T> Shared<T> {
    /// Constructs a new `Shared<T>`.
    ///
    /// This is equivalent to wrapping the value in an `Arc`.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Shared;
    ///
    /// let a = Shared::new(7);
    /// ```
    pub fn new(data: T) -> Shared<T> {
        Shared(Arc::new(data))
    }
}
impl<T> Clone for Shared<T> {
fn clone(&self) -> Shared<T> {
Shared(self.0.clone())
}
}
impl<T> Deref for Shared<T> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: fmt::Debug> fmt::Debug for Shared<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward formatting (including flags) to the inner value.
        (**self).fmt(f)
    }
}

impl<T: fmt::Display> fmt::Display for Shared<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward formatting (including flags) to the inner value.
        (**self).fmt(f)
    }
}

impl<T: Hash> Hash for Shared<T> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash the pointed-to value, not the pointer.
        Hash::hash(&**self, state)
    }
}
impl<T> fmt::Pointer for Shared<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Format the address of the shared allocation's data.
        fmt::Pointer::fmt(&(&**self as *const T), f)
    }
}
impl<T: Default> Default for Shared<T> {
fn default() -> Shared<T> {
Shared(Arc::new(Default::default()))
}
}
impl<T> From<T> for Shared<T> {
fn from(t: T) -> Shared<T> {
Shared(Arc::new(t))
}
}
// NOTE(stjepang): It would also make sense to have the following impls:
//
// - `impl<T> AsyncRead for &Shared<T> where &T: AsyncRead {}`
// - `impl<T> AsyncWrite for &Shared<T> where &T: AsyncWrite {}`
//
// However, those impls sometimes make Rust's type inference try too hard when types cannot be
// inferred. In the end, instead of complaining with a nice error message, the Rust compiler ends
// up overflowing and dumping a very long error message spanning multiple screens.
//
// Since those impls are not essential, I decided to err on the safe side and not include them.
impl<T> AsyncRead for Shared<T>
where
    for<'a> &'a T: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Delegate to the `&T` implementation through the `Arc`.
        Pin::new(&mut &*self.0).poll_read(cx, buf)
    }
}

impl<T> AsyncWrite for Shared<T>
where
    for<'a> &'a T: AsyncWrite,
{
    // Delegate all methods to the `&T` implementation through the `Arc`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut &*self.0).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut &*self.0).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut &*self.0).poll_close(cx)
    }
}

View File

@ -1,523 +0,0 @@
//! A synchronization primitive for notifying async tasks and threads.
//!
//! This is a variant of a conditional variable that is heavily inspired by eventcounts invented
//! by Dmitry Vyukov: http://www.1024cores.net/home/lock-free-algorithms/eventcounts
use std::cell::Cell;
use std::future::Future;
use std::mem::{self, ManuallyDrop};
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::thread::{self, Thread};
/// A bit set inside `Signal` when there is at least one listener that has already been notified.
const NOTIFIED: usize = 1 << 0;
/// A bit set inside `Signal` when there is at least one notifiable listener.
const NOTIFIABLE: usize = 1 << 1;
/// Inner state of `Signal`.
struct Inner {
/// Holds bits `NOTIFIED` and `NOTIFIABLE`.
flags: AtomicUsize,
/// A linked list holding registered listeners.
list: Mutex<List>,
}
impl Inner {
    /// Locks the list.
    ///
    /// The returned guard republishes the `flags` bits from the list's
    /// counters when dropped.
    fn lock(&self) -> ListGuard<'_> {
        ListGuard {
            inner: self,
            // Panics only if a thread panicked while holding the list lock.
            guard: self.list.lock().unwrap(),
        }
    }
}
/// A synchronization primitive for notifying async tasks and threads.
///
/// Listeners can be registered using `listen()`. There are two ways of notifying listeners:
///
/// 1. `notify_one()` notifies one listener.
/// 2. `notify_all()` notifies all listeners.
///
/// If there are no active listeners at the time a notification is sent, it simply gets lost.
///
/// Note that `notify_one()` does not notify one *additional* listener - it only makes sure
/// *at least* one listener among the active ones is notified.
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling `wait()` on it.
///
/// If a notified listener is dropped without ever waiting for a notification, dropping will
/// notify another active listener.
///
/// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness.
pub struct Signal {
    /// A pointer to heap-allocated inner state.
    ///
    /// This pointer is initially null and gets lazily initialized on first use. Semantically, it
    /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the `Arc`s
    /// reference count.
    inner: AtomicPtr<Inner>,
}

// SAFETY: the only field is an `AtomicPtr` that semantically owns an
// `Arc<Inner>`, and `Inner` itself is thread-safe.
unsafe impl Send for Signal {}
unsafe impl Sync for Signal {}
impl Signal {
    /// Creates a new `Signal`.
    ///
    /// No allocation happens until the first `listen()` or notification.
    #[inline]
    pub fn new() -> Signal {
        Signal {
            inner: AtomicPtr::default(),
        }
    }

    /// Returns a guard listening for a notification.
    #[cold]
    pub fn listen(&self) -> SignalListener {
        let inner = self.inner();
        let listener = SignalListener {
            // SAFETY: `inner` came from `Arc::into_raw`; cloning through
            // `ManuallyDrop` bumps the reference count without consuming
            // the reference owned by `self`.
            inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
            entry: Some(inner.lock().insert()),
        };

        // Make sure the listener is registered before whatever happens next.
        full_fence();
        listener
    }

    /// Notifies a single active listener.
    ///
    /// Note that this does not notify one *additional* listener - it only makes sure *at least*
    /// one listener among the active ones is notified.
    #[inline]
    pub fn notify_one(&self) {
        let inner = self.inner();

        // Make sure the notification comes after whatever triggered it.
        full_fence();

        // Notify if no active listeners have been notified and there is at least one listener.
        let flags = inner.flags.load(Ordering::Relaxed);
        if flags & NOTIFIED == 0 && flags & NOTIFIABLE != 0 {
            inner.lock().notify(false);
        }
    }

    /// Notifies all active listeners.
    #[inline]
    pub fn notify_all(&self) {
        let inner = self.inner();

        // Make sure the notification comes after whatever triggered it.
        full_fence();

        // Notify if there is at least one listener.
        if inner.flags.load(Ordering::Relaxed) & NOTIFIABLE != 0 {
            inner.lock().notify(true);
        }
    }

    /// Returns a reference to the inner state, initializing it on first use.
    fn inner(&self) -> &Inner {
        let mut inner = self.inner.load(Ordering::Acquire);

        // Initialize the state if this is its first use.
        if inner.is_null() {
            // Allocate on the heap.
            let new = Arc::new(Inner {
                flags: AtomicUsize::new(0),
                list: Mutex::new(List {
                    head: None,
                    tail: None,
                    len: 0,
                    notifiable: 0,
                }),
            });
            // Convert the heap-allocated state into a raw pointer.
            let new = Arc::into_raw(new) as *mut Inner;

            // Attempt to replace the null pointer with the new state pointer.
            // (`compare_exchange` replaces the deprecated `compare_and_swap`.)
            match self
                .inner
                .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => {
                    // Successfully installed; use the new state pointer.
                    inner = new;
                }
                Err(current) => {
                    // A concurrent operation initialized the state first. Use
                    // its pointer and deallocate the redundant allocation.
                    inner = current;
                    // SAFETY: `new` was just produced by `Arc::into_raw` and
                    // has never been shared.
                    unsafe {
                        drop(Arc::from_raw(new));
                    }
                }
            }
        }

        // SAFETY: a non-null `inner` always points at a live `Arc<Inner>`
        // that stays alive at least as long as `self` (see `Drop`).
        unsafe { &*inner }
    }
}
impl Drop for Signal {
    #[inline]
    fn drop(&mut self) {
        let inner: *mut Inner = *self.inner.get_mut();

        // If the state pointer has been initialized, deallocate it.
        if !inner.is_null() {
            // SAFETY: the pointer was produced by `Arc::into_raw` in
            // `inner()`; this drops the reference owned by the `Signal`
            // itself (listeners hold their own clones).
            unsafe {
                drop(Arc::from_raw(inner));
            }
        }
    }
}

impl Default for Signal {
    fn default() -> Signal {
        Signal::new()
    }
}
/// A guard waiting for a notification from a `Signal`.
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling `wait()` on it.
///
/// If a notified listener is dropped without ever waiting for a notification, dropping will
/// notify another active listener.
pub struct SignalListener {
    /// A reference to `Signal`s inner state.
    inner: Arc<Inner>,

    /// A pointer to this listener's entry in the linked list.
    ///
    /// `None` once the listener has completed (waited, resolved, or dropped).
    entry: Option<NonNull<Entry>>,
}

// SAFETY: the raw `entry` pointer is only ever dereferenced while the list
// mutex is held, so the listener can safely move between and be shared
// across threads.
unsafe impl Send for SignalListener {}
unsafe impl Sync for SignalListener {}
impl SignalListener {
    /// Blocks until a notification is received.
    ///
    /// Consumes the listener; its entry is removed from the list before
    /// returning.
    pub fn wait(mut self) {
        // Take out the entry pointer and set it to `None`.
        let entry = match self.entry.take() {
            None => unreachable!("cannot wait twice on a `SignalListener`"),
            Some(entry) => entry,
        };

        // Set this listener's state to `Waiting`.
        {
            let mut list = self.inner.lock();
            // SAFETY: the entry stays allocated until `list.remove(entry)`
            // and is only accessed while the list is locked.
            let e = unsafe { entry.as_ref() };

            // Do a dummy replace operation in order to take out the state.
            match e.state.replace(State::Notified) {
                State::Notified => {
                    // If this listener has been notified, remove it from the list and return.
                    list.remove(entry);
                    return;
                }
                // Otherwise, set the state to `Waiting`.
                _ => e.state.set(State::Waiting(thread::current())),
            }
        }

        // Wait until a notification is received.
        loop {
            // `park` may also return spuriously, hence the re-check below.
            thread::park();

            let mut list = self.inner.lock();
            let e = unsafe { entry.as_ref() };

            // Do a dummy replace operation in order to take out the state.
            match e.state.replace(State::Notified) {
                State::Notified => {
                    // If this listener has been notified, remove it from the list and return.
                    list.remove(entry);
                    return;
                }
                // Otherwise, set the state back to `Waiting`.
                state => e.state.set(state),
            }
        }
    }
}
impl Future for SignalListener {
    type Output = ();

    /// Completes once a notification has been delivered to this listener.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut list = self.inner.lock();

        let entry = match self.entry {
            None => unreachable!("cannot poll a completed `SignalListener` future"),
            Some(entry) => entry,
        };
        // SAFETY: the entry stays allocated until `list.remove(entry)` and
        // is only accessed while the list is locked.
        let state = unsafe { &entry.as_ref().state };

        // Do a dummy replace operation in order to take out the state.
        match state.replace(State::Notified) {
            State::Notified => {
                // If this listener has been notified, remove it from the list and return.
                list.remove(entry);
                drop(list);
                self.entry = None;
                return Poll::Ready(());
            }
            State::Created => {
                // If the listener was just created, put it in the `Polling` state.
                state.set(State::Polling(cx.waker().clone()));
            }
            State::Polling(w) => {
                // If the listener was in the `Polling` state, keep it.
                state.set(State::Polling(w));
            }
            State::Waiting(_) => {
                unreachable!("cannot poll and wait on `SignalListener` at the same time")
            }
        }

        Poll::Pending
    }
}
impl Drop for SignalListener {
    fn drop(&mut self) {
        // If this listener has never picked up a notification...
        if let Some(entry) = self.entry.take() {
            let mut list = self.inner.lock();

            // But if a notification was delivered to it...
            if list.remove(entry).is_notified() {
                // Then pass it on to another active listener so the
                // notification is not lost.
                list.notify(false);
            }
        }
    }
}
/// A guard holding the linked list locked.
///
/// On drop, it republishes the `NOTIFIED`/`NOTIFIABLE` bits computed from
/// the list's counters.
struct ListGuard<'a> {
    /// A reference to `Signal`s inner state.
    inner: &'a Inner,

    /// The actual guard that acquired the linked list.
    guard: MutexGuard<'a, List>,
}

impl Drop for ListGuard<'_> {
    #[inline]
    fn drop(&mut self) {
        let list = &mut **self;
        let mut flags = 0;

        // Set the `NOTIFIED` flag if there is at least one notified listener
        // (`len - notifiable` counts already-notified entries).
        if list.len - list.notifiable > 0 {
            flags |= NOTIFIED;
        }

        // Set the `NOTIFIABLE` flag if there is at least one notifiable listener.
        if list.notifiable > 0 {
            flags |= NOTIFIABLE;
        }

        self.inner.flags.store(flags, Ordering::Release);
    }
}
impl Deref for ListGuard<'_> {
    type Target = List;

    #[inline]
    fn deref(&self) -> &List {
        // Forward to the underlying mutex guard.
        &*self.guard
    }
}

impl DerefMut for ListGuard<'_> {
    #[inline]
    fn deref_mut(&mut self) -> &mut List {
        // Forward to the underlying mutex guard.
        &mut *self.guard
    }
}
/// The state of a listener.
///
/// Stored in a `Cell`, so transitions are done with `replace`/`set`.
enum State {
    /// It has just been created.
    Created,

    /// It has received a notification.
    Notified,

    /// An async task is polling it.
    Polling(Waker),

    /// A thread is blocked on it.
    Waiting(Thread),
}
impl State {
    /// Returns `true` if this is the `Notified` state.
    #[inline]
    fn is_notified(&self) -> bool {
        matches!(self, State::Notified)
    }
}
/// An entry representing a registered listener.
struct Entry {
    /// The state of this listener.
    state: Cell<State>,

    /// Previous entry in the linked list.
    prev: Cell<Option<NonNull<Entry>>>,

    /// Next entry in the linked list.
    next: Cell<Option<NonNull<Entry>>>,
}

/// A doubly linked list of entries, in registration order.
struct List {
    /// First entry in the list (the oldest listener).
    head: Option<NonNull<Entry>>,

    /// Last entry in the list (the newest listener).
    tail: Option<NonNull<Entry>>,

    /// Total number of entries in the list.
    len: usize,

    /// Number of notifiable entries in the list.
    ///
    /// Notifiable entries are those that haven't been notified yet.
    notifiable: usize,
}
impl List {
    /// Inserts a new entry at the back of the list and returns a pointer
    /// to it.
    fn insert(&mut self) -> NonNull<Entry> {
        unsafe {
            // Allocate an entry that is going to become the new tail.
            // SAFETY: `Box::into_raw` never returns null.
            let entry = NonNull::new_unchecked(Box::into_raw(Box::new(Entry {
                state: Cell::new(State::Created),
                prev: Cell::new(self.tail),
                next: Cell::new(None),
            })));

            // Replace the tail with the new entry.
            match mem::replace(&mut self.tail, Some(entry)) {
                None => self.head = Some(entry),
                Some(t) => t.as_ref().next.set(Some(entry)),
            }

            // Bump the total count and the count of notifiable entries.
            self.len += 1;
            self.notifiable += 1;

            entry
        }
    }

    /// Removes an entry from the list and returns its state.
    ///
    /// The entry must have been produced by `insert` on this list and not
    /// yet removed.
    fn remove(&mut self, entry: NonNull<Entry>) -> State {
        unsafe {
            let prev = entry.as_ref().prev.get();
            let next = entry.as_ref().next.get();

            // Unlink from the previous entry.
            match prev {
                None => self.head = next,
                Some(p) => p.as_ref().next.set(next),
            }

            // Unlink from the next entry.
            match next {
                None => self.tail = prev,
                Some(n) => n.as_ref().prev.set(prev),
            }

            // Deallocate and extract the state.
            let entry = Box::from_raw(entry.as_ptr());
            let state = entry.state.into_inner();

            // Update the counters.
            if !state.is_notified() {
                self.notifiable -= 1;
            }
            self.len -= 1;

            state
        }
    }

    /// Notifies one entry, or all entries if `notify_all` is `true`.
    #[cold]
    fn notify(&mut self, notify_all: bool) {
        // Start at the head, i.e. the oldest listener, so notifications are
        // delivered in first-in first-out order as documented on `Signal`.
        // (Starting at `tail` would wake the newest listener first — LIFO.)
        let mut entry = self.head;

        // Iterate over the entries in the list.
        while let Some(e) = entry {
            // SAFETY: entries stay allocated while they are in the list, and
            // the list is locked by our caller.
            let e = unsafe { e.as_ref() };

            // Set the state of this entry to `Notified`.
            let state = e.state.replace(State::Notified);
            let is_notified = state.is_notified();

            // Wake the task or unpark the thread.
            match state {
                State::Notified => {}
                State::Created => {}
                State::Polling(w) => w.wake(),
                State::Waiting(t) => t.unpark(),
            }

            // Update the count of notifiable entries.
            if !is_notified {
                self.notifiable -= 1;
            }

            // If all entries need to be notified, go to the next one.
            if notify_all {
                entry = e.next.get();
            } else {
                break;
            }
        }
    }
}
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
    if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
        // HACK(stjepang): On x86 architectures there are two different ways of executing
        // a `SeqCst` fence.
        //
        // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
        // 2. A `SeqCst` compare-and-exchange, which compiles into a `lock cmpxchg` instruction.
        //
        // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
        // that the second one makes notifying listeners a bit faster.
        //
        // The ideal solution here would be to use inline assembly, but we're instead creating a
        // temporary atomic variable and compare-and-exchanging its value. No sane compiler to
        // x86 platforms is going to optimize this away.
        let a = AtomicUsize::new(0);
        // `compare_and_swap` is deprecated in favor of `compare_exchange`, which takes
        // separate success/failure orderings; `SeqCst`/`SeqCst` reproduces the old
        // behavior. The result is irrelevant — only the full-barrier side effect matters.
        let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
    } else {
        atomic::fence(Ordering::SeqCst);
    }
}

View File

@ -1,357 +0,0 @@
#![cfg(feature = "unstable")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use rand::{thread_rng, Rng};
/// Shorthand for building a `Duration` from a millisecond count.
fn ms(millis: u64) -> Duration {
    Duration::from_millis(millis)
}
#[test]
fn smoke() {
    // Basic send/recv round-trips on a bounded channel of capacity 1.
    task::block_on(async {
        let (s, r) = piper::chan(1);
        s.send(7).await;
        assert_eq!(r.recv().await, Some(7));
        s.send(8).await;
        assert_eq!(r.recv().await, Some(8));
        drop(s);
        // Once all senders are gone, `recv()` reports disconnection with `None`.
        assert_eq!(r.recv().await, None);
    });
    task::block_on(async {
        let (s, r) = piper::chan(10);
        drop(r);
        // Sending after the receiver is dropped — presumably completes without
        // blocking rather than hanging; TODO confirm against piper's docs.
        s.send(1).await;
    });
}
#[test]
fn capacity() {
    // `capacity()` must report the bound the channel was created with, from
    // both the sender and the receiver side.
    for i in 1..10 {
        // Was `channel::<()>(i)`, a name defined nowhere in this file; every
        // other test constructs channels via `piper::chan`.
        let (s, r) = piper::chan::<()>(i);
        assert_eq!(s.capacity(), i);
        assert_eq!(r.capacity(), i);
    }
}
#[test]
fn len_empty_full() {
    #![allow(clippy::cognitive_complexity)]
    // Walks a capacity-2 channel through empty -> one item -> full -> one item,
    // checking that `len`/`is_empty`/`is_full` agree on both endpoints at each step.
    task::block_on(async {
        let (s, r) = piper::chan(2);
        // Freshly created: empty, not full.
        assert_eq!(s.len(), 0);
        assert_eq!(s.is_empty(), true);
        assert_eq!(s.is_full(), false);
        assert_eq!(r.len(), 0);
        assert_eq!(r.is_empty(), true);
        assert_eq!(r.is_full(), false);
        s.send(()).await;
        // One item in flight: neither empty nor full.
        assert_eq!(s.len(), 1);
        assert_eq!(s.is_empty(), false);
        assert_eq!(s.is_full(), false);
        assert_eq!(r.len(), 1);
        assert_eq!(r.is_empty(), false);
        assert_eq!(r.is_full(), false);
        s.send(()).await;
        // At capacity: full on both sides.
        assert_eq!(s.len(), 2);
        assert_eq!(s.is_empty(), false);
        assert_eq!(s.is_full(), true);
        assert_eq!(r.len(), 2);
        assert_eq!(r.is_empty(), false);
        assert_eq!(r.is_full(), true);
        r.recv().await;
        // Receiving one item frees a slot again.
        assert_eq!(s.len(), 1);
        assert_eq!(s.is_empty(), false);
        assert_eq!(s.is_full(), false);
        assert_eq!(r.len(), 1);
        assert_eq!(r.is_empty(), false);
        assert_eq!(r.is_full(), false);
    })
}
#[test]
fn recv() {
    // The receiver task starts first and must block in `recv()` until the main
    // task sends, 1.5 seconds later; the sleeps stagger the two sides.
    task::block_on(async {
        let (s, r) = piper::chan(100);
        task::spawn(async move {
            assert_eq!(r.recv().await, Some(7));
            task::sleep(ms(1000)).await;
            assert_eq!(r.recv().await, Some(8));
            task::sleep(ms(1000)).await;
            assert_eq!(r.recv().await, Some(9));
            // The sender is dropped after its three sends, so the next recv
            // observes disconnection.
            assert_eq!(r.recv().await, None);
        });
        task::sleep(ms(1500)).await;
        s.send(7).await;
        s.send(8).await;
        s.send(9).await;
    })
}
#[test]
fn send() {
    // With capacity 1, the spawned sender must block in `send()` whenever the
    // single slot is occupied, until the main task receives 1.5 seconds later.
    task::block_on(async {
        let (s, r) = piper::chan(1);
        task::spawn(async move {
            s.send(7).await;
            task::sleep(ms(1000)).await;
            s.send(8).await;
            task::sleep(ms(1000)).await;
            s.send(9).await;
            task::sleep(ms(1000)).await;
            // NOTE(review): only three values are ever received below; this
            // final send's fate depends on what happens to a detached task when
            // `block_on` finishes — confirm it cannot leak or deadlock.
            s.send(10).await;
        });
        task::sleep(ms(1500)).await;
        assert_eq!(r.recv().await, Some(7));
        assert_eq!(r.recv().await, Some(8));
        assert_eq!(r.recv().await, Some(9));
    })
}
#[test]
fn recv_after_disconnect() {
    // Messages already buffered in the channel must remain receivable after the
    // sender is dropped; only afterwards does `recv()` return `None`.
    task::block_on(async {
        let (s, r) = piper::chan(100);
        s.send(1).await;
        s.send(2).await;
        s.send(3).await;
        drop(s);
        assert_eq!(r.recv().await, Some(1));
        assert_eq!(r.recv().await, Some(2));
        assert_eq!(r.recv().await, Some(3));
        assert_eq!(r.recv().await, None);
    })
}
#[test]
fn len() {
    // Verifies `len()` bookkeeping under three patterns: interleaved bursts,
    // fill-to-capacity then drain, and fully concurrent send/recv.
    const COUNT: usize = 25_000;
    const CAP: usize = 1000;
    task::block_on(async {
        let (s, r) = piper::chan(CAP);
        assert_eq!(s.len(), 0);
        assert_eq!(r.len(), 0);
        // Alternating bursts of 50 sends and 50 receives: `len` tracks exactly.
        for _ in 0..CAP / 10 {
            for i in 0..50 {
                s.send(i).await;
                assert_eq!(s.len(), i + 1);
            }
            for i in 0..50 {
                r.recv().await;
                assert_eq!(r.len(), 50 - i - 1);
            }
        }
        assert_eq!(s.len(), 0);
        assert_eq!(r.len(), 0);
        // Fill the channel to capacity, then drain it completely.
        for i in 0..CAP {
            s.send(i).await;
            assert_eq!(s.len(), i + 1);
        }
        for _ in 0..CAP {
            r.recv().await.unwrap();
        }
        assert_eq!(s.len(), 0);
        assert_eq!(r.len(), 0);
        // Under concurrent send/recv the exact value is racy, but `len` must
        // never exceed the channel's capacity.
        let child = task::spawn({
            let r = r.clone();
            async move {
                for i in 0..COUNT {
                    assert_eq!(r.recv().await, Some(i));
                    let len = r.len();
                    assert!(len <= CAP);
                }
            }
        });
        for i in 0..COUNT {
            s.send(i).await;
            let len = s.len();
            assert!(len <= CAP);
        }
        child.await;
        assert_eq!(s.len(), 0);
        assert_eq!(r.len(), 0);
    })
}
#[test]
fn disconnect_wakes_receiver() {
    // A receiver blocked in `recv()` must be woken with `None` when the last
    // sender is dropped one second later.
    task::block_on(async {
        // Was `channel::<()>(1)`, a name defined nowhere in this file; every
        // other test constructs channels via `piper::chan`.
        let (s, r) = piper::chan::<()>(1);
        let child = task::spawn(async move {
            assert_eq!(r.recv().await, None);
        });
        task::sleep(ms(1000)).await;
        drop(s);
        child.await;
    })
}
#[test]
fn spsc() {
    // Single-producer single-consumer stress test: 100k values through a
    // capacity-3 channel must arrive in order, followed by disconnection.
    const COUNT: usize = 100_000;
    task::block_on(async {
        let (s, r) = piper::chan(3);
        let child = task::spawn(async move {
            for i in 0..COUNT {
                assert_eq!(r.recv().await, Some(i));
            }
            assert_eq!(r.recv().await, None);
        });
        for i in 0..COUNT {
            s.send(i).await;
        }
        drop(s);
        child.await;
    })
}
#[test]
fn mpmc() {
    // Multi-producer multi-consumer stress test: TASKS senders each send
    // 0..COUNT and TASKS receivers collectively drain them; afterwards every
    // value must have been observed exactly TASKS times.
    const COUNT: usize = 25_000;
    const TASKS: usize = 4;
    task::block_on(async {
        // Was `channel::<usize>(3)`, a name defined nowhere in this file;
        // every other test constructs channels via `piper::chan`.
        let (s, r) = piper::chan::<usize>(3);
        // One atomic counter per possible value, shared by all receivers.
        let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
        let v = Arc::new(v);
        let mut tasks = Vec::new();
        for _ in 0..TASKS {
            let r = r.clone();
            let v = v.clone();
            tasks.push(task::spawn(async move {
                for _ in 0..COUNT {
                    let n = r.recv().await.unwrap();
                    v[n].fetch_add(1, Ordering::SeqCst);
                }
            }));
        }
        for _ in 0..TASKS {
            let s = s.clone();
            tasks.push(task::spawn(async move {
                for i in 0..COUNT {
                    s.send(i).await;
                }
            }));
        }
        for t in tasks {
            t.await;
        }
        for c in v.iter() {
            assert_eq!(c.load(Ordering::SeqCst), TASKS);
        }
    });
}
#[test]
fn oneshot() {
    // Repeated one-shot rendezvous: a fresh channel per iteration, with one
    // task sending a single value and another receiving it.
    const COUNT: usize = 10_000;
    task::block_on(async {
        for _ in 0..COUNT {
            let (s, r) = piper::chan(1);
            let c1 = task::spawn(async move { r.recv().await.unwrap() });
            let c2 = task::spawn(async move { s.send(0).await });
            c1.await;
            c2.await;
        }
    })
}
#[test]
fn drops() {
    // Verifies every message is dropped exactly once: `steps` messages are
    // received (and dropped by the receiver side), and `additional` messages
    // left in the channel are dropped only when both endpoints go away.
    const RUNS: usize = 100;
    static DROPS: AtomicUsize = AtomicUsize::new(0);
    // A unit type that counts its own drops in the `DROPS` global.
    #[derive(Debug, PartialEq)]
    struct DropCounter;
    impl Drop for DropCounter {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }
    let mut rng = thread_rng();
    for _ in 0..RUNS {
        task::block_on(async {
            let steps = rng.gen_range(0, 10_000);
            let additional = rng.gen_range(0, 50);
            DROPS.store(0, Ordering::SeqCst);
            // Was `channel::<DropCounter>(50)`, a name defined nowhere in this
            // file; every other test constructs channels via `piper::chan`.
            let (s, r) = piper::chan::<DropCounter>(50);
            let child = task::spawn({
                let r = r.clone();
                async move {
                    for _ in 0..steps {
                        r.recv().await.unwrap();
                    }
                }
            });
            for _ in 0..steps {
                s.send(DropCounter).await;
            }
            child.await;
            // These stay buffered in the channel until both ends are dropped.
            for _ in 0..additional {
                s.send(DropCounter).await;
            }
            assert_eq!(DROPS.load(Ordering::SeqCst), steps);
            drop(s);
            drop(r);
            assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
        })
    }
}

View File

@ -342,15 +342,15 @@ impl Async<TcpStream> {
let _ = socket.connect(&addr.into());
let stream = Async::new(socket.into_tcp_stream())?;
// Waits until the stream becomes writable.
let wait_writable = |mut stream: &TcpStream| match stream.write(&[]) {
// Returns the peer address, or `WouldBlock` if not connected.
let peer_addr = |stream: &TcpStream| match stream.peer_addr() {
Err(err) if err.kind() == io::ErrorKind::NotConnected => {
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
}
res => res.map(|_| ()),
res => res,
};
// The stream becomes writable when connected.
stream.with(|io| wait_writable(io)).await?;
// Wait until there is a peer address.
stream.with(|io| peer_addr(io)).await?;
Ok(stream)
}
@ -651,15 +651,15 @@ impl Async<UnixStream> {
let _ = socket.connect(&socket2::SockAddr::unix(path)?);
let stream = Async::new(socket.into_unix_stream())?;
// Waits until the stream becomes writable.
let wait_writable = |mut stream: &UnixStream| match stream.write(&[]) {
// Returns the peer address, or `WouldBlock` if not connected.
let peer_addr = |stream: &UnixStream| match stream.peer_addr() {
Err(err) if err.kind() == io::ErrorKind::NotConnected => {
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
}
res => res.map(|_| ()),
res => res,
};
// The stream becomes writable when connected.
stream.with(|io| wait_writable(io)).await?;
// Wait until there is a peer address.
stream.with(|io| peer_addr(io)).await?;
Ok(stream)
}

View File

@ -45,17 +45,17 @@ pub(crate) struct Reactor {
sys: sys::Reactor,
/// Registered sources.
sources: piper::Lock<Slab<Arc<Source>>>,
sources: piper::Mutex<Slab<Arc<Source>>>,
/// Temporary storage for I/O events when polling the reactor.
events: piper::Mutex<sys::Events>,
events: piper::Lock<sys::Events>,
/// An ordered map of registered timers.
///
/// Timers are in the order in which they fire. The `u64` in this type is a unique timer ID
/// used to distinguish timers that fire at the same time. The `Waker` represents the task
/// awaiting the timer.
timers: piper::Lock<BTreeMap<(Instant, u64), Waker>>,
timers: piper::Mutex<BTreeMap<(Instant, u64), Waker>>,
/// An I/O event that is triggered when a new earliest timer is registered.
///
@ -72,9 +72,9 @@ impl Reactor {
pub fn get() -> &'static Reactor {
static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor {
sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
sources: piper::Lock::new(Slab::new()),
events: piper::Mutex::new(sys::Events::new()),
timers: piper::Lock::new(BTreeMap::new()),
sources: piper::Mutex::new(Slab::new()),
events: piper::Lock::new(sys::Events::new()),
timers: piper::Mutex::new(BTreeMap::new()),
event: Lazy::new(|| IoEvent::new().expect("cannot create an `IoEvent`")),
});
&REACTOR
@ -110,7 +110,7 @@ impl Reactor {
let source = Arc::new(Source {
raw,
key: vacant.key(),
wakers: piper::Lock::new(Vec::new()),
wakers: piper::Mutex::new(Vec::new()),
tick: AtomicU64::new(0),
});
self.sys.register(raw, source.key)?;
@ -175,7 +175,7 @@ impl Reactor {
/// Polls the reactor for I/O events and wakes up tasks.
pub(crate) struct ReactorLock<'a> {
reactor: &'a Reactor,
events: piper::MutexGuard<'a, sys::Events>,
events: piper::LockGuard<sys::Events>,
}
impl ReactorLock<'_> {
@ -255,7 +255,7 @@ pub(crate) struct Source {
key: usize,
/// A list of wakers representing tasks interested in events on this source.
wakers: piper::Lock<Vec<Waker>>,
wakers: piper::Mutex<Vec<Waker>>,
/// Incremented on every I/O notification - this is only used for synchronization.
tick: AtomicU64,

View File

@ -8,7 +8,22 @@ use crate::reactor::Reactor;
/// Fires at the chosen point in time.
///
/// TODO
/// # Examples
///
/// Sleep for 1 second:
///
/// ```
/// use smol::Timer;
/// use std::time::Duration;
///
/// async fn sleep(dur: Duration) {
/// Timer::after(dur).await;
/// }
///
/// # smol::run(async {
/// sleep(Duration::from_secs(1)).await;
/// # });
/// ```
#[derive(Debug)]
pub struct Timer {
/// A unique ID for this timer.