Lots of changes

This commit is contained in:
Stjepan Glavina 2020-02-06 20:56:20 +01:00
parent efcf66a535
commit 0f4ea1fcf3
3 changed files with 51 additions and 20 deletions

View File

@ -16,7 +16,6 @@ nix = "0.16.1"
num_cpus = "1.12.0"
once_cell = "1.3.1"
parking_lot = "0.10.0"
sharded-slab = "0.0.8"
slab = "0.4.2"
socket2 = "0.3.11"

View File

@ -2,17 +2,19 @@
* Small - Fits into a single source file.
* Fast - On par with async-std and Tokio.
* Safe - Written fully in safe Rust.
* Documented - Easy to understand and tweak.
* Safe - Written in 100% safe Rust.
* Complete - Fully featured and ready for production.
* Documented - Code is easy to understand and modify.
* Portable - Works on Linux, Android, macOS, iOS, and Windows.
* Lightweight - Built from scratch, hooks directly into epoll/kqueue/wepoll.
* Lightweight - Simple dependencies, hooks into epoll/kqueue/wepoll.
## Features
* Executor - Configurable threads, work stealing, supports non-send futures.
* Blocking - Thread pool for isolating blocking code.
* Networking - TCP, UDP, Unix domain sockets, and custom files/sockets.
* Process - Can spawn child processes and interact with their I/O.
* Process - Spawns child processes and interacts with their I/O.
* Files - Filesystem manipulation operations.
* Stdio - Asynchronous stdin, stdout, and stderr.
* Timer - Efficient userspace timers.

View File

@ -1,4 +1,5 @@
#![forbid(unsafe_code)]
// TODO: #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::collections::BTreeMap;
use std::convert::TryInto;
@ -54,6 +55,8 @@ compile_error!("smol does not support this target OS");
// TODO: if epoll/kqueue/wepoll gets EINTR, then retry - maybe just call notify()
// TODO: catch panics in wake() and Waker::drop()
// TODO: readme for inspiration: https://github.com/piscisaureus/wepoll
// TODO: Async::stream(impl Iterator) -> impl Stream + Unpin
// TODO: filesystem operations (with OS-specific extensions), don't do Async<File>
// ----- Event loop -----
@ -508,22 +511,32 @@ pub struct Async<T>(Arc<Registration<T>>);
impl<T> Async<T> {
/// Turns a non-blocking I/O handle into an async I/O handle.
pub fn register(source: T) -> Async<T>
pub fn nonblocking(source: T) -> Async<T>
where
T: AsRawFd,
{
RT.registry.register(source)
}
// // NOTE: stop task if the returned handle is dropped
// NOTE: stop task if the returned handle is dropped
// Turns a blocking iterator into an async stream.
// fn streamer(t: T) -> impl Stream<Item = T::Item> + Unpin
// where
// T: Iterator + 'static,
// {
// todo!()
// }
// Turns a blocking reader into an async reader.
// fn reader(t: T) -> impl AsyncRead + Unpin
// where
// T: Read + 'static,
// {
// todo!()
// }
//
// // NOTE: stop task if the returned handle is dropped
// Turns a blocking writer into an async writer.
// fn writer(t: T) -> impl AsyncWrite + Unpin
// where
// T: Write + 'static,
@ -636,6 +649,23 @@ macro_rules! async_io_impls {
async_io_impls!(Async<T>);
async_io_impls!(&Async<T>);
// ----- Stdio -----
// impl Async<Stdin> {
// Returns an async writer into stdin.
// TODO: Async::stdin() -> impl AsyncWrite + Unpin
// }
// impl Async<Stdout> {
// Returns an async reader from stdout.
// TODO: Async::stdout() -> impl AsyncRead + Unpin
// }
// impl Async<Stderr> {
// Returns an async reader from stderr.
// TODO: Async::stderr() -> impl AsyncRead + Unpin
// }
// ------ Process -----
impl Async<Command> {
@ -672,14 +702,14 @@ impl Async<TcpListener> {
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Async<TcpListener>> {
let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(true)?;
Ok(Async::register(listener))
Ok(Async::nonblocking(listener))
}
/// Accepts a new incoming connection.
pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
let (stream, addr) = self.read_with(|source| source.accept()).await?;
stream.set_nonblocking(true)?;
Ok((Async::register(stream), addr))
Ok((Async::nonblocking(stream), addr))
}
/// Returns a stream over incoming connections.
@ -730,7 +760,7 @@ impl Async<TcpStream> {
// Begin async connect and ignore the inevitable "not yet connected" error.
socket.set_nonblocking(true)?;
let _ = socket.connect(&addr.into());
let stream = Async::register(socket.into_tcp_stream());
let stream = Async::nonblocking(socket.into_tcp_stream());
// Wait for connect to complete.
let wait_connect = |stream: &TcpStream| match stream.peer_addr() {
@ -760,7 +790,7 @@ impl Async<UdpSocket> {
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Async<UdpSocket>> {
let socket = UdpSocket::bind(addr)?;
socket.set_nonblocking(true)?;
Ok(Async::register(socket))
Ok(Async::nonblocking(socket))
}
/// Sends data to the specified address.
@ -799,14 +829,14 @@ impl Async<UnixListener> {
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
let listener = UnixListener::bind(path)?;
listener.set_nonblocking(true)?;
Ok(Async::register(listener))
Ok(Async::nonblocking(listener))
}
/// Accepts a new incoming connection.
pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
let (stream, addr) = self.read_with(|source| source.accept()).await?;
stream.set_nonblocking(true)?;
Ok((Async::register(stream), addr))
Ok((Async::nonblocking(stream), addr))
}
/// Returns a stream over incoming connections.
@ -823,7 +853,7 @@ impl Async<UnixStream> {
pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
let stream = UnixStream::connect(path)?;
stream.set_nonblocking(true)?;
Ok(Async::register(stream))
Ok(Async::nonblocking(stream))
}
/// Creates an unnamed pair of connected streams.
@ -831,7 +861,7 @@ impl Async<UnixStream> {
let (stream1, stream2) = UnixStream::pair()?;
stream1.set_nonblocking(true)?;
stream2.set_nonblocking(true)?;
Ok((Async::register(stream1), Async::register(stream2)))
Ok((Async::nonblocking(stream1), Async::nonblocking(stream2)))
}
}
@ -840,14 +870,14 @@ impl Async<UnixDatagram> {
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
let socket = UnixDatagram::bind(path)?;
socket.set_nonblocking(true)?;
Ok(Async::register(socket))
Ok(Async::nonblocking(socket))
}
/// Creates a socket not bound to any address.
pub fn unbound() -> io::Result<Async<UnixDatagram>> {
let socket = UnixDatagram::unbound()?;
socket.set_nonblocking(true)?;
Ok(Async::register(socket))
Ok(Async::nonblocking(socket))
}
/// Creates an unnamed pair of connected sockets.
@ -855,7 +885,7 @@ impl Async<UnixDatagram> {
let (socket1, socket2) = UnixDatagram::pair()?;
socket1.set_nonblocking(true)?;
socket2.set_nonblocking(true)?;
Ok((Async::register(socket1), Async::register(socket2)))
Ok((Async::nonblocking(socket1), Async::nonblocking(socket2)))
}
/// Sends data to the specified address.