Lots of changes

This commit is contained in:
Stjepan Glavina 2020-02-09 15:32:44 +01:00
parent c67b8d3653
commit 3cf08f929d
12 changed files with 506 additions and 584 deletions

View File

@ -21,7 +21,7 @@ socket2 = "0.3.11"
[dev-dependencies]
futures = { version = "0.3.3", default-features = false, features = ["std"] }
hyper = "0.13.2"
hyper = { version = "0.13", default-features = false }
num_cpus = "1.12.0"
pin-utils = "0.1.0-alpha.4"
tokio = "0.2.11"
tokio = { version = "0.2", default-features = false }

20
examples/command.rs Normal file
View File

@ -0,0 +1,20 @@
use std::env;
use std::io;
use std::io::prelude::*;
use std::process::Command;
fn main() -> io::Result<()> {
smol::run(async {
let mut args = env::args().skip(1);
let mut cmd = Command::new(args.next().expect("missing program name"));
for arg in args {
cmd.arg(arg);
}
let out = smol::blocking!(cmd.output())?;
println!("{}", out.status);
io::stdout().write_all(&out.stdout)?;
io::stdout().write_all(&out.stderr)?;
Ok(())
})
}

View File

@ -10,10 +10,10 @@ use std::net::TcpStream;
use futures::io;
use futures::prelude::*;
use smol::{Async, Task};
use smol::Async;
fn main() -> io::Result<()> {
Task::run(async {
smol::run(async {
let mut stream = Async::<TcpStream>::connect("www.example.com:80").await?;
let request = b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n";

View File

@ -1,100 +1,95 @@
use std::convert::Infallible;
use std::io;
use std::net::{TcpListener, TcpStream};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::net::TcpListener;
use futures::prelude::*;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use smol::{Async, Task};
use smol::Async;
async fn hello(_: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new(Body::from("Hello World!")))
}
struct Incoming(Async<TcpListener>);
impl hyper::server::accept::Accept for Incoming {
type Conn = Connection;
type Error = io::Error;
fn poll_accept(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let fut = self.0.accept();
pin_utils::pin_mut!(fut);
let (stream, _) = futures::ready!(fut.poll(cx))?;
Poll::Ready(Some(Ok(Connection(stream))))
}
}
struct Connection(Async<TcpStream>);
impl tokio::io::AsyncRead for Connection {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
impl tokio::io::AsyncWrite for Connection {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_close(cx)
}
}
#[derive(Clone)]
struct Executor;
impl<F> hyper::rt::Executor<F> for Executor
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
Task::schedule(fut);
}
}
pub fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Task::run(Task::schedule(async {
// For every connection, we must make a `Service` to handle all
// incoming HTTP requests on said connection.
let make_svc = make_service_fn(|_conn| {
// This is the `Service` that will handle the connection.
// `service_fn` is a helper to convert a function that
// returns a Response into a `Service`.
async { Ok::<_, Infallible>(service_fn(hello)) }
});
smol::run(async {
let addr = "127.0.0.1:3000";
let listener = Async::<TcpListener>::bind(addr)?;
let server = Server::builder(Incoming(listener))
.executor(Executor)
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(hello)) });
let server = Server::builder(compat::HyperListener(listener))
.executor(compat::HyperExecutor)
.serve(make_svc);
println!("Listening on http://{}", addr);
server.await?;
Ok(())
}))
})
}
pub mod compat {
use std::io;
use std::net::{TcpListener, TcpStream};
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::prelude::*;
use smol::{Async, Task};
#[derive(Clone)]
pub struct HyperExecutor;
impl<F> hyper::rt::Executor<F> for HyperExecutor
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
let _ = Task::spawn(fut);
}
}
pub struct HyperListener(pub Async<TcpListener>);
impl hyper::server::accept::Accept for HyperListener {
type Conn = HyperStream;
type Error = io::Error;
fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let stream =
futures::ready!(Pin::new(&mut self.0.incoming()).poll_next(cx)).unwrap()?;
Poll::Ready(Some(Ok(HyperStream(stream))))
}
}
pub struct HyperStream(pub Async<TcpStream>);
impl tokio::io::AsyncRead for HyperStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
impl tokio::io::AsyncWrite for HyperStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_close(cx)
}
}
}

20
examples/netcat.rs Normal file
View File

@ -0,0 +1,20 @@
use std::env;
use std::net::TcpStream;
use futures::io;
use futures::io::BufReader;
use smol::Async;
fn main() -> io::Result<()> {
let mut args = env::args().skip(1);
let addr = args.next().expect("missing address argument");
let port = args.next().expect("missing port argument");
smol::run(async {
let mut input = BufReader::new(smol::reader(std::io::stdin()));
let mut stream = Async::<TcpStream>::connect(format!("{}:{}", addr, port)).await?;
io::copy(&mut input, &mut stream).await?;
Ok(())
})
}

21
examples/read-dir.rs Normal file
View File

@ -0,0 +1,21 @@
//! Lists files in a directory given as an argument.
use std::env;
use std::fs;
use futures::io;
use futures::prelude::*;
fn main() -> io::Result<()> {
let path = env::args().nth(1).expect("missing path argument");
smol::run(async move {
let mut dir = smol::iter(smol::blocking!(fs::read_dir(path))?);
while let Some(res) = dir.next().await {
println!("{}", res?.file_name().to_string_lossy());
}
Ok(())
})
}

13
examples/read-file.rs Normal file
View File

@ -0,0 +1,13 @@
//! Prints a file given as an argument to stdout.
use std::{env, fs, io};
fn main() -> io::Result<()> {
let path = env::args().nth(1).expect("missing path argument");
smol::run(async {
let contents = smol::blocking!(fs::read_to_string(path))?;
println!("{}", contents);
Ok(())
})
}

35
examples/stdin.rs Normal file
View File

@ -0,0 +1,35 @@
// use std::io::prelude::*;
// use std::process::{Command, Stdio};
use futures::io;
// use futures::prelude::*;
// fn stdin() -> impl AsyncBufRead + Unpin + 'static {
// let mut child = Command::new("cat")
// .stdin(Stdio::inherit())
// .stdout(Stdio::piped())
// .spawn()
// .expect("Failed to execute command");
// let mut out = child.stdout.take().unwrap();
//
// let (reader, mut writer) = todo!("create a Reader/Writer");
// Task::blocking(async move {
// todo!("copy from out to the Writer");
// // std::io::copy(&mut out, &mut writer);
// })
// .detach();
//
// todo!("return a wrapped Reader that calls child.kill() on drop");
// futures::empty()
// }
fn main() -> io::Result<()> {
smol::run(async {
// let stdin = stdin();
//
// let mut line = String::new();
// stdin.read_line(&mut line).await?;
Ok(())
})
}

View File

@ -6,18 +6,22 @@
//! $ nc localhost 8080
//! ```
use std::net::{TcpListener, TcpStream};
use futures::io;
use smol::{Async, Task};
use std::net::TcpListener;
async fn echo(stream: Async<TcpStream>) -> io::Result<()> {
io::copy(&stream, &mut &stream).await?;
Ok(())
}
fn main() -> io::Result<()> {
Task::run(async {
smol::run(async {
let listener = Async::<TcpListener>::bind("127.0.0.1:8080")?;
loop {
let (stream, _) = listener.accept().await?;
Task::schedule(async move {
io::copy(&stream, &mut &stream).await.expect("failed");
});
Task::spawn(echo(stream)).detach();
}
})
}

View File

@ -6,10 +6,10 @@ use smol::Task;
fn main() {
// Start a thread pool.
for _ in 0..num_cpus::get().max(1) {
thread::spawn(|| Task::run(future::pending::<()>()));
thread::spawn(|| smol::run(future::pending::<()>()));
}
let val = Task::run(Task::schedule(async { 1 + 2 }));
let val = smol::block_on(Task::spawn(async { 1 + 2 }));
assert_eq!(val, 3);
// Start a stoppable threadpool.
@ -17,7 +17,7 @@ fn main() {
// for _ in 0..num_cpus::get().max(1) {
// let (s, r) = oneshot::channel<()>();
// pool.push(s);
// thread::spawn(|| Task::run(async move { drop(r.await) }));
// thread::spawn(|| smol::run(async move { drop(r.await) }));
// }
// drop(pool); // stops the threadpool!
}

View File

@ -1,13 +1,13 @@
use std::time::{Duration, Instant};
use smol::{Async, Task};
use smol::Timer;
fn main() {
Task::run(async {
smol::run(async {
let start = Instant::now();
let dur = Duration::from_secs(1);
Async::timer(dur).await;
Timer::after(dur).await;
dbg!(start.elapsed());
})

View File

@ -24,7 +24,7 @@ use std::time::{Duration, Instant};
use crossbeam_channel as channel;
use futures_core::stream::Stream;
use futures_io::{AsyncRead, AsyncWrite};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use futures_util::{future, stream};
use nix::sys::epoll::{
epoll_create1, epoll_ctl, epoll_wait, EpollCreateFlags, EpollEvent, EpollFlags, EpollOp,
@ -37,34 +37,11 @@ use socket2::{Domain, Protocol, Socket, Type};
#[cfg(not(any(target_os = "linux", target_os = "android")))]
compile_error!("smol does not support this target OS");
// TODO: hello world example
// TODO: example with uds_windows crate
// TODO: example with stdin
// TODO: example with pinned threads
// TODO: example with process spawn and output
// TODO: example with process child I/O
// TODO: example with timeout
// TODO: example with OS timers
// TODO: example with Async::reader(std::io::stdin())
// TODO: example with Async::writer(std::io::stdout())
// TODO: example with signal-hook
// - Async::iter(Signals::new(&[SIGINT])?)
// TODO: example with ctrl-c
// TODO: example with filesystem
// TODO: example that prints a file
// TODO: example with hyper
// TODO: generate OS-specific docs with --cfg docsrs
// #[cfg(any(windows, docsrs))]
// #[cfg_attr(docsrs, doc(cfg(windows)))]
// TODO: task cancellation?
// TODO: fix unwraps
// TODO: if epoll/kqueue/wepoll gets EINTR, then retry - or maybe just call notify()
// TODO: catch panics in wake() and Waker::drop()
// TODO: readme for inspiration: https://github.com/piscisaureus/wepoll
// TODO: constructors like Async::<TcpStream>::new(std_tcpstream)
// - we need to implement all of them
// - also implement From<>
// TODO: implement FusedFuture for Task?
// TODO: implement FusedFuture for Task and Timer?
// ----- Event loop -----
@ -99,7 +76,7 @@ struct Registry {
struct Poller {
epoll_events: Box<[EpollEvent]>,
bytes: Box<[u8]>,
wakers: Vec<Waker>,
wakers: std::collections::VecDeque<Waker>,
}
/// Converts any error into an I/O error.
@ -150,7 +127,7 @@ fn initialize() -> io::Result<Runtime> {
poller: Mutex::new(Poller {
epoll_events: vec![EpollEvent::empty(); 1000].into_boxed_slice(),
bytes: vec![0; 1000].into_boxed_slice(),
wakers: Vec::new(),
wakers: Default::default(),
}),
registry,
@ -193,16 +170,16 @@ impl Registry {
// TODO: if epoll fails, remove the entry
Ok(Async {
source: Some(Box::new(source)),
flavor: Flavor::Socket(entry),
source: Box::new(source),
entry,
})
}
// TODO: we probably don't need to pass fd because it's in the entry
fn unregister(&self, fd: RawFd, index: usize) -> io::Result<()> {
fn unregister(&self, fd: RawFd, index: usize) {
self.entries.lock().remove(index);
epoll_ctl(self.epoll, EpollOp::EpollCtlDel, fd, None).map_err(io_err)?;
Ok(())
// Ignore errors because an event in oneshot mode may unregister the fd before we do.
let _ = epoll_ctl(self.epoll, EpollOp::EpollCtlDel, fd, None);
}
}
@ -272,10 +249,14 @@ fn poll_io(poller: &mut Poller, timeout: Option<Duration>) {
if let Some(entry) = entries.get(index) {
if is_read {
poller.wakers.append(&mut entry.readers.lock());
for w in entry.readers.lock().drain(..) {
poller.wakers.push_back(w);
}
}
if is_write {
poller.wakers.append(&mut entry.writers.lock());
for w in entry.writers.lock().drain(..) {
poller.wakers.push_front(w);
}
}
}
}
@ -299,80 +280,88 @@ fn interrupt() {
}
}
// ----- Timer -----
/// Fires at an instant in time.
pub struct Timer {
when: Instant,
inserted: bool,
}
impl Timer {
/// Fires after the specified duration of time.
pub fn after(dur: Duration) -> Timer {
Timer {
when: Instant::now() + dur,
inserted: false,
}
}
/// Fires at the specified instant in time.
pub fn at(dur: Duration) -> Timer {
Timer {
when: Instant::now() + dur,
inserted: false,
}
}
}
impl Drop for Timer {
fn drop(&mut self) {
let id = self as *mut Timer as usize;
RT.timers.lock().remove(&(self.when, id));
self.inserted = false;
}
}
impl Future for Timer {
type Output = Instant;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let id = &mut *self as *mut Timer as usize;
let mut timers = RT.timers.lock();
if Instant::now() >= self.when {
timers.remove(&(self.when, id));
return Poll::Ready(self.when);
}
if !self.inserted {
let mut is_earliest = false;
if let Some((first, _)) = timers.keys().next() {
if self.when < *first {
is_earliest = true;
}
}
let waker = cx.waker().clone();
timers.insert((self.when, id), waker);
self.inserted = true;
if is_earliest {
drop(timers);
interrupt();
}
}
Poll::Pending
}
}
// ----- Executor -----
/// A runnable future, ready for execution.
type Runnable = async_task::Task<()>;
/// A scheduled future.
/// A spawned future.
#[must_use]
pub struct Task<T>(async_task::JoinHandle<T, ()>);
impl<T> Task<T> {
/// Starts an executor and runs a future on it.
pub fn run(future: impl Future<Output = T>) -> T {
pin_utils::pin_mut!(future);
// TODO: what is the behavior of nested run()s
// TODO Optimization: use thread-local cache
let ready = Arc::new(AtomicBool::new(true));
let waker = async_task::waker_fn({
let ready = ready.clone();
move || {
if !ready.swap(true, Ordering::SeqCst) {
interrupt();
let _m = RT.mutex.lock();
RT.cvar.notify_all();
}
}
});
loop {
while !ready.load(Ordering::SeqCst) {
match RT.receiver.try_recv() {
Ok(runnable) => {
let _ = catch_unwind(|| runnable.run());
}
Err(_) => {
let mut m = RT.mutex.lock();
if *m {
if !ready.load(Ordering::SeqCst) {
RT.cvar.wait(&mut m);
}
continue;
}
*m = true;
drop(m);
// TODO: if this panics, set m to false and notify
if !ready.load(Ordering::SeqCst) {
poll(true);
}
m = RT.mutex.lock();
*m = false;
RT.cvar.notify_one();
}
}
}
ready.store(false, Ordering::SeqCst);
match future.as_mut().poll(&mut Context::from_waker(&waker)) {
Poll::Pending => {}
Poll::Ready(val) => return val,
}
}
}
}
impl<T: Send + 'static> Task<T> {
/// Schedules a future for execution.
/// Spawns a global future.
///
/// This future is allowed to be stolen by another executor.
pub fn schedule(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
// Create a runnable and schedule it for execution.
let schedule = |runnable| {
RT.queue.send(runnable).unwrap();
@ -385,87 +374,14 @@ impl<T: Send + 'static> Task<T> {
Task(handle)
}
/// Schedules a future on the blocking thread pool.
/// Spawns a future onto the blocking thread pool.
pub fn blocking(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
// TODO: ignore panics
use std::sync::atomic::*;
static SLEEPING: AtomicUsize = AtomicUsize::new(0);
struct Pool {
sender: channel::Sender<Runnable>,
receiver: channel::Receiver<Runnable>,
}
static POOL: Lazy<Pool> = Lazy::new(|| {
// Start a single worker thread waiting for the first task.
start_thread();
let (sender, receiver) = channel::unbounded();
Pool { sender, receiver }
});
fn start_thread() {
SLEEPING.fetch_add(1, Ordering::SeqCst);
let timeout = Duration::from_secs(1);
thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(move || {
loop {
let mut runnable = match POOL.receiver.recv_timeout(timeout) {
Ok(runnable) => runnable,
Err(_) => {
// Check whether this is the last sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
// If so, then restart the thread to make sure there is always at least
// one sleeping thread.
if SLEEPING.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
continue;
}
}
// Stop the thread.
return;
}
};
// If there are no sleeping threads, then start one to make sure there is always at
// least one sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
start_thread();
}
loop {
let _ = catch_unwind(|| runnable.run());
// Try taking another runnable if there are any available.
runnable = match POOL.receiver.try_recv() {
Ok(runnable) => runnable,
Err(_) => break,
};
}
// If there is at least one sleeping thread, stop this thread instead of putting it
// to sleep.
if SLEEPING.load(Ordering::SeqCst) > 0 {
return;
}
SLEEPING.fetch_add(1, Ordering::SeqCst);
}
})
.expect("cannot start a blocking thread");
}
let (runnable, handle) = async_task::spawn(future, |r| POOL.sender.send(r).unwrap(), ());
runnable.schedule();
Task(handle)
crate::blocking(future)
}
}
impl<T: 'static> Task<T> {
/// Schedules a future on the current executor.
/// Spawns a future onto the current executor.
///
/// Panics if not called within an executor.
pub fn local(future: impl Future<Output = T> + 'static) -> Task<T>
@ -474,11 +390,22 @@ impl<T: 'static> Task<T> {
{
// let (runnable, handle) = async_task::spawn_local(future, |t| todo!(), ());
// runnable.schedule();
// TODO: panic if not called inside a worker started with run()
// TODO: panic if not called inside a worker started with worker()
// TODO
todo!()
}
}
impl Task<()> {
pub fn detach(self) {}
}
impl<E: std::fmt::Debug + Send + 'static> Task<Result<(), E>> {
pub fn detach(self) {
Task::spawn(async { self.await.unwrap() }).detach();
}
}
impl<T> Future for Task<T> {
type Output = T;
@ -490,44 +417,206 @@ impl<T> Future for Task<T> {
}
}
// ----- Async I/O -----
/// Executes all futures until the main one completes.
pub fn run<T>(future: impl Future<Output = T>) -> T {
pin_utils::pin_mut!(future);
/// Asynchronous I/O.
pub struct Async<T> {
flavor: Flavor,
source: Option<Box<T>>,
}
// TODO: what is the behavior of nested run()s
// - maybe just panic
enum Flavor {
Socket(Arc<Entry>),
Timer(Instant, bool),
// TODO: Reader()
// TODO: Writer()
// TODO: Iter()
}
// TODO Optimization: use thread-local cache for ready and queue
let ready = Arc::new(AtomicBool::new(true));
impl<T> Drop for Async<T> {
fn drop(&mut self) {
let id = self as *mut Async<T> as usize;
match &mut self.flavor {
Flavor::Socket(entry) => {
// TODO: call entry.unregister();
// TODO: what about this unwrap?
RT.registry.unregister(entry.fd, entry.index).unwrap();
let waker = async_task::waker_fn({
let ready = ready.clone();
move || {
if !ready.swap(true, Ordering::SeqCst) {
interrupt();
let _m = RT.mutex.lock();
RT.cvar.notify_all();
}
Flavor::Timer(when, inserted) => {
// TODO: call timer.unregister();
if *inserted {
RT.timers.lock().remove(&(*when, id));
*inserted = false;
}
});
loop {
while !ready.load(Ordering::SeqCst) {
match RT.receiver.try_recv() {
Ok(runnable) => {
let _ = catch_unwind(|| runnable.run());
}
Err(_) => {
let mut m = RT.mutex.lock();
if *m {
if !ready.load(Ordering::SeqCst) {
RT.cvar.wait(&mut m);
}
continue;
}
*m = true;
drop(m);
// TODO: if this panics, set m to false and notify
if !ready.load(Ordering::SeqCst) {
poll(true);
}
m = RT.mutex.lock();
*m = false;
RT.cvar.notify_one();
}
}
}
ready.store(false, Ordering::SeqCst);
match future.as_mut().poll(&mut Context::from_waker(&waker)) {
Poll::Pending => {}
Poll::Ready(val) => return val,
}
}
}
// ----- Async sockets -----
// ----- Blocking -----
/// Moves blocking code onto a thread.
#[macro_export]
macro_rules! blocking {
($($expr:tt)*) => {
smol::Task::blocking(async move { $($expr)* }).await
};
}
/// Blocks on a single future.
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
todo!()
}
/// Converts a blocking iterator into a stream.
pub fn iter<T>(t: T) -> impl Stream<Item = T::Item> + Unpin + 'static
where
T: Iterator + Send + 'static,
{
// NOTE: stop task if the returned handle is dropped
todo!();
stream::empty()
}
/// Converts a blocking reader into an async reader.
pub fn reader(t: impl Read + Send + 'static) -> impl AsyncBufRead + Unpin + 'static {
// TODO: should we simply return Reader here?
// NOTE: stop task if the returned handle is dropped
todo!();
futures_util::io::empty()
}
/// Converts a blocking writer into an async writer.
pub fn writer(t: impl Write + Send + 'static) -> impl AsyncWrite + Unpin + 'static {
// TODO: should we simply return Writer here?
// NOTE: stop task if the returned handle is dropped
todo!();
futures_util::io::sink()
}
/// Blocks on async I/O.
pub struct BlockOn; // TODO
// TODO: struct ThreadPool and method fn spawn()
// - Task::blocking(fut) then calls THREAD_POOL.spawn(fut)
/// Spawns a future onto the blocking thread pool.
fn blocking<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
// TODO: ignore panics
use std::sync::atomic::*;
static SLEEPING: AtomicUsize = AtomicUsize::new(0);
struct Pool {
sender: channel::Sender<Runnable>,
receiver: channel::Receiver<Runnable>,
}
static POOL: Lazy<Pool> = Lazy::new(|| {
// Start a single worker thread waiting for the first task.
start_thread();
let (sender, receiver) = channel::unbounded();
Pool { sender, receiver }
});
fn start_thread() {
SLEEPING.fetch_add(1, Ordering::SeqCst);
let timeout = Duration::from_secs(1);
thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(move || {
loop {
let mut runnable = match POOL.receiver.recv_timeout(timeout) {
Ok(runnable) => runnable,
Err(_) => {
// Check whether this is the last sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
// If so, then restart the thread to make sure there is always at least
// one sleeping thread.
if SLEEPING.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
continue;
}
}
// Stop the thread.
return;
}
};
// If there are no sleeping threads, then start one to make sure there is always at
// least one sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
start_thread();
}
loop {
let _ = catch_unwind(|| runnable.run());
// Try taking another runnable if there are any available.
runnable = match POOL.receiver.try_recv() {
Ok(runnable) => runnable,
Err(_) => break,
};
}
// If there is at least one sleeping thread, stop this thread instead of putting it
// to sleep.
if SLEEPING.load(Ordering::SeqCst) > 0 {
return;
}
SLEEPING.fetch_add(1, Ordering::SeqCst);
}
})
.expect("cannot start a blocking thread");
}
let (runnable, handle) = async_task::spawn(future, |r| POOL.sender.send(r).unwrap(), ());
runnable.schedule();
Task(handle)
}
// ----- Async I/O -----
/// Asynchronous I/O.
pub struct Async<T> {
source: Box<T>,
entry: Arc<Entry>,
}
impl<T> Drop for Async<T> {
fn drop(&mut self) {
// TODO: call entry.unregister();
// TODO: what about this unwrap?
RT.registry.unregister(self.entry.fd, self.entry.index);
}
}
struct Entry {
fd: RawFd,
@ -544,22 +633,19 @@ impl<T: AsRawFd> Async<T> {
/// Gets a reference to the I/O source.
pub fn source(&self) -> &T {
self.source.as_ref().unwrap()
&self.source
}
/// Gets a mutable reference to the I/O source.
pub fn source_mut(&mut self) -> &mut T {
self.source.as_mut().unwrap()
&mut self.source
}
/// Converts a non-blocking read into an async operation.
pub async fn read_with<R>(&self, f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut f = f;
let mut source = self.source.as_ref().unwrap();
let wakers = match &self.flavor {
Flavor::Socket(entry) => &entry.readers,
Flavor::Timer(..) => unreachable!(),
};
let mut source = &self.source;
let wakers = &self.entry.readers;
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut source, wakers)).await
}
@ -569,22 +655,16 @@ impl<T: AsRawFd> Async<T> {
f: impl FnMut(&mut T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
let mut source = self.source.as_mut().unwrap();
let wakers = match &self.flavor {
Flavor::Socket(entry) => &entry.readers,
Flavor::Timer(..) => unreachable!(),
};
let mut source = &mut self.source;
let wakers = &self.entry.readers;
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut source, wakers)).await
}
/// Converts a non-blocking write into an async operation.
pub async fn write_with<R>(&self, f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut f = f;
let mut source = self.source.as_ref().unwrap();
let wakers = match &self.flavor {
Flavor::Socket(entry) => &entry.writers,
Flavor::Timer(..) => unreachable!(),
};
let mut source = &self.source;
let wakers = &self.entry.writers;
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut source, wakers)).await
}
@ -594,11 +674,8 @@ impl<T: AsRawFd> Async<T> {
f: impl FnMut(&mut T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
let mut source = self.source.as_mut().unwrap();
let wakers = match &self.flavor {
Flavor::Socket(entry) => &entry.writers,
Flavor::Timer(..) => unreachable!(),
};
let mut source = &mut self.source;
let wakers = &self.entry.writers;
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut source, wakers)).await
}
@ -705,122 +782,6 @@ where
}
}
// ----- Timer -----
impl Async<Instant> {
/// Completes after the specified duration of time.
pub fn timer(dur: Duration) -> Async<Instant> {
Async {
source: None,
flavor: Flavor::Timer(Instant::now() + dur, false),
}
}
}
impl Future for Async<Instant> {
type Output = Instant;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let id = &mut *self as *mut Async<Instant> as usize;
let mut timers = RT.timers.lock();
match &mut self.flavor {
Flavor::Socket(..) => Poll::Pending,
Flavor::Timer(when, inserted) => {
if Instant::now() >= *when {
timers.remove(&(*when, id));
return Poll::Ready(*when);
}
if !*inserted {
let mut is_earliest = false;
if let Some((first, _)) = timers.keys().next() {
if *when < *first {
is_earliest = true;
}
}
let waker = cx.waker().clone();
timers.insert((*when, id), waker);
*inserted = true;
if is_earliest {
drop(timers);
interrupt();
}
}
Poll::Pending
}
}
}
}
// ----- Stdio -----
// NOTE: we can return Async<Stdin> because in AsyncRead we can read from the byte channel
// - but wait! Async<Stdin> doesn't impl AsyncRead because &Stdin doesn't impl Read
// - so we have to use Async<dyn Read> or impl AsyncRead manually for all types
// - if we do it manually, what about uds_windows?
// NOTE: Async<TcpStream> can be converted to Async<dyn Read>
impl Async<io::Stdin> {
/// Returns an async writer into stdin.
pub fn stdin() -> Async<io::Stdin> {
todo!()
}
pub async fn read_line(&self, buf: &mut String) -> io::Result<usize> {
todo!()
}
}
// NOTE: what happens if we drop stdout? how do we know when data is flushed
// - solution: there is poll_flush()
impl Async<io::Stdout> {
// Returns an async reader from stdout.
pub fn stdout() -> Async<io::Stdout> {
todo!()
}
}
impl Async<io::Stderr> {
/// Returns an async reader from stderr.
pub fn stderr() -> Async<io::Stderr> {
todo!()
}
}
// ----- Process -----
impl Async<Command> {
/// Executes a command and returns its output.
pub async fn output(cmd: Command) -> io::Result<Output> {
let mut cmd = cmd;
Task::blocking(async move { cmd.output() }).await
}
/// Executes a command and returns its exit status.
pub async fn status(cmd: Command) -> io::Result<ExitStatus> {
let mut cmd = cmd;
Task::blocking(async move { cmd.status() }).await
}
}
impl Async<Child> {
/// Waits for a child process to exit and returns its exit status.
pub async fn wait(child: Child) -> io::Result<ExitStatus> {
let mut child = child;
Task::blocking(async move { child.wait() }).await
}
/// Waits for a child process to exit and returns its output.
pub async fn wait_with_output(child: Child) -> io::Result<Output> {
Task::blocking(async move { child.wait_with_output() }).await
}
}
// ----- Networking -----
impl Async<TcpListener> {
@ -840,7 +801,6 @@ impl Async<TcpListener> {
/// Returns a stream over incoming connections.
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Unpin + '_ {
// TODO: can we return Async<Incoming>?
Box::pin(stream::unfold(self, |listener| async move {
let res = listener.accept().await.map(|(stream, _)| stream);
Some((res, listener))
@ -972,7 +932,6 @@ impl Async<UnixListener> {
/// Returns a stream over incoming connections.
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Unpin + '_ {
// TODO: can we return Async<Incoming>?
Box::pin(stream::unfold(self, |listener| async move {
let res = listener.accept().await.map(|(stream, _)| stream);
Some((res, listener))
@ -1040,148 +999,3 @@ impl Async<UnixDatagram> {
self.read_with(|source| source.recv(buf)).await
}
}
// ----- Blocking -----
impl Async<()> {
// /// Converts a blocking reader into an async reader.
// pub fn reader(t: impl Read + Send + 'static) -> impl AsyncRead + Unpin + 'static {
// // NOTE: stop task if the returned handle is dropped
// todo!();
// futures_util::io::empty()
// }
//
// /// Converts a blocking writer into an async writer.
// pub fn writer(t: impl Write + Send + 'static) -> impl AsyncWrite + Unpin + 'static {
// // NOTE: stop task if the returned handle is dropped
// todo!();
// futures_util::io::sink()
// }
/// Converts an iterator into a stream.
pub fn iter<T>(t: T) -> impl Stream<Item = T::Item> + Unpin + 'static
where
T: Iterator + Send + 'static,
{
// NOTE: stop task if the returned handle is dropped
todo!();
stream::empty()
}
}
// ----- Filesystem -----
impl Async<()> {
pub async fn canonicalize<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::canonicalize(path) }).await
}
pub async fn copy<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<u64> {
let src = src.as_ref().to_owned();
let dst = dst.as_ref().to_owned();
Task::blocking(async move { fs::copy(src, dst) }).await
}
pub async fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::create_dir(path) }).await
}
pub async fn create_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::create_dir_all(path) }).await
}
pub async fn hard_link<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
let src = src.as_ref().to_owned();
let dst = dst.as_ref().to_owned();
Task::blocking(async move { fs::hard_link(src, dst) }).await
}
pub async fn metadata<P: AsRef<Path>>(path: P) -> io::Result<fs::Metadata> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::metadata(path) }).await
}
pub async fn read<P: AsRef<Path>>(path: P) -> io::Result<Vec<u8>> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::read(path) }).await
}
pub async fn read_dir<P: AsRef<Path>>(
path: P,
) -> io::Result<impl Stream<Item = io::Result<fs::DirEntry>> + Unpin + 'static> {
let path = path.as_ref().to_owned();
let iter = Task::blocking(async move { fs::read_dir(path) }).await?;
Ok(Async::iter(iter))
}
pub async fn read_link<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::read_link(path) }).await
}
pub async fn read_to_string<P: AsRef<Path>>(path: P) -> io::Result<String> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::read_to_string(path) }).await
}
pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::remove_dir(path) }).await
}
pub async fn remove_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::remove_dir_all(path) }).await
}
pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::remove_file(path) }).await
}
pub async fn rename<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> {
let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned();
Task::blocking(async move { fs::rename(from, to) }).await
}
pub async fn set_permissions<P: AsRef<Path>>(path: P, perm: fs::Permissions) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::set_permissions(path, perm) }).await
}
pub async fn symlink_metadata<P: AsRef<Path>>(path: P) -> io::Result<fs::Metadata> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::symlink_metadata(path) }).await
}
pub async fn write<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
let path = path.as_ref().to_owned();
let contents = contents.as_ref().to_owned();
Task::blocking(async move { fs::write(path, contents) }).await
}
// TODO: unix-only
pub async fn symlink<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
let src = src.as_ref().to_owned();
let dst = dst.as_ref().to_owned();
Task::blocking(async move { os::unix::fs::symlink(src, dst) }).await
}
// TODO: windows-only
// pub async fn symlink_dir<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
// let src = src.as_ref().to_owned();
// let dst = dst.as_ref().to_owned();
// Task::blocking(async move { os::windows::fs::symlink_dir(src, dst) }).await
// }
// TODO: windows-only
// pub async fn symlink_file<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
// let src = src.as_ref().to_owned();
// let dst = dst.as_ref().to_owned();
// Task::blocking(async move { os::windows::fs::symlink_dir(src, dst) }).await
// }
}