Filesystem support and big refactor

This commit is contained in:
Stjepan Glavina 2020-02-07 17:10:20 +01:00
parent bcae69c8c6
commit 372d60afa5
2 changed files with 373 additions and 123 deletions

View File

@ -16,8 +16,12 @@ nix = "0.16.1"
num_cpus = "1.12.0"
once_cell = "1.3.1"
parking_lot = "0.10.0"
pin-utils = "0.1.0-alpha.4"
slab = "0.4.2"
socket2 = "0.3.11"
[dev-dependencies]
futures = { version = "0.3.3", default-features = false, features = ["std", "executor"] }
hyper = { path = "../hyper" }
pin-utils = "0.1.0-alpha.4"
tokio = "0.2.11"

View File

@ -3,14 +3,16 @@
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::fs;
use std::future::Future;
use std::io::{self, Read, Write};
use std::mem;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
use std::os;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream};
use std::panic::catch_unwind;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::process::{Child, Command, ExitStatus, Output};
use std::sync::atomic::{AtomicBool, Ordering};
@ -44,9 +46,11 @@ compile_error!("smol does not support this target OS");
// TODO: example with OS timers
// TODO: example with Async::reader(std::io::stdin())
// TODO: example with Async::writer(std::io::stdout())
// TODO: example with signal-hook
// - Async::iter(Signals::new(&[SIGINT])?)
// TODO: example with ctrl-c
// TODO: example with filesystem
// TODO: example that prints a file
// TODO: example with ctrl-c
// TODO: example with hyper
// TODO: generate OS-specific docs with --cfg docsrs
// #[cfg(any(windows, docsrs))]
@ -56,6 +60,7 @@ compile_error!("smol does not support this target OS");
// TODO: if epoll/kqueue/wepoll gets EINTR, then retry - or maybe just call notify()
// TODO: catch panics in wake() and Waker::drop()
// TODO: readme for inspiration: https://github.com/piscisaureus/wepoll
// TODO: constructors like Async::<TcpStream>::new(std_tcpstream)
// ----- Event loop -----
@ -178,7 +183,10 @@ impl Registry {
.unwrap();
// TODO: if epoll fails, remove the entry
Async(Flavor::Socket(Arc::new(Registration { fd, source, entry })))
Async {
source: Box::new(source),
flavor: Flavor::Socket(Registration { fd, entry }),
}
}
}
@ -421,8 +429,6 @@ impl<T> Future for Task<T> {
}
}
// ----- Timer -----
// ----- Async I/O -----
struct Entry {
@ -431,13 +437,12 @@ struct Entry {
writers: Mutex<Vec<Waker>>,
}
struct Registration<T> {
struct Registration {
fd: RawFd,
source: T,
entry: Arc<Entry>,
}
impl<T> Drop for Registration<T> {
impl Drop for Registration {
fn drop(&mut self) {
RT.registry.entries.lock().remove(self.entry.index);
epoll_ctl(RT.registry.epoll, EpollOp::EpollCtlDel, self.fd, None).unwrap();
@ -445,67 +450,65 @@ impl<T> Drop for Registration<T> {
}
/// Asynchronous I/O.
pub struct Async<T>(Flavor<T>);
pub struct Async<T: ?Sized> {
flavor: Flavor,
source: Box<T>,
}
enum Flavor<T> {
Socket(Arc<Registration<T>>),
Timer { when: Instant, inserted: bool },
enum Flavor {
Socket(Registration),
Timer(Instant, bool),
}
impl<T> Async<T> {
/// Turns a blocking iterator into an async stream.
pub fn iter(t: T) -> impl Stream<Item = T::Item> + Unpin
pub fn iter(t: T) -> Async<dyn Iterator<Item = T::Item>>
where
T: Iterator + 'static,
{
// NOTE: stop task if the returned handle is dropped
todo!();
stream::empty::<T::Item>()
todo!()
}
/// Turns a blocking reader into an async reader.
pub fn reader(t: T) -> impl AsyncRead + Unpin
pub fn reader(t: T) -> Async<dyn Read>
where
T: Read + 'static,
{
// NOTE: stop task if the returned handle is dropped
todo!();
futures_util::io::empty()
todo!()
}
/// Turns a blocking writer into an async writer.
pub fn writer(t: T) -> impl AsyncWrite + Unpin
pub fn writer(t: T) -> Async<dyn Write>
where
T: Write + 'static,
{
// NOTE: stop task if the returned handle is dropped
todo!();
futures_util::io::sink()
todo!()
}
}
impl Async<Instant> {
/// Completes after the specified duration of time.
pub fn timer(dur: Duration) -> Async<Instant> {
Async(Flavor::Timer {
when: Instant::now() + dur,
inserted: false,
})
}
}
impl<T: AsRawFd> Async<T> {
/// Turns a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(source: T) -> Async<T> {
pub fn nonblocking(source: T) -> Async<T>
where
T: AsRawFd,
{
RT.registry.register(source)
}
/// Gets a reference to the I/O source.
pub fn source(&self) -> &T {
match &self.0 {
Flavor::Socket(reg) => &reg.source,
Flavor::Timer { .. } => unreachable!(),
}
&self.source
}
/// Gets a mutable reference to the I/O source.
pub fn source_mut(&mut self) -> &mut T {
&mut self.source
}
pub fn into_source(self) -> T {
// TODO: take from the Option<Box> and then Drop ignores it
todo!()
}
/// Turns a non-blocking read into an async operation.
@ -514,7 +517,17 @@ impl<T: AsRawFd> Async<T> {
f: impl FnMut(&'a T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
future::poll_fn(|cx| self.poll_with(cx, &self.entry().readers, &mut f)).await
// future::poll_fn(|cx| self.poll_with(cx, &self.entry().readers, &mut f)).await
todo!()
}
pub async fn read_with_mut<'a, R>(
&'a mut self,
f: impl FnMut(&'a mut T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
// future::poll_fn(|cx| self.poll_with(cx, &self.entry().readers, &mut f)).await
todo!()
}
/// Turns a non-blocking write into an async operation.
@ -523,51 +536,94 @@ impl<T: AsRawFd> Async<T> {
f: impl FnMut(&'a T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
future::poll_fn(|cx| self.poll_with(cx, &self.entry().writers, &mut f)).await
// future::poll_fn(|cx| self.poll_with(cx, &self.entry().writers, &mut f)).await
todo!()
}
pub async fn write_with_mut<'a, R>(
&'a mut self,
f: impl FnMut(&'a mut T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
// future::poll_fn(|cx| self.poll_with(cx, &self.entry().writers, &mut f)).await
todo!()
}
fn poll_with<'a, R>(
&'a self,
cx: &mut Context<'_>,
wakers: &Mutex<Vec<Waker>>,
dir: Direction,
f: impl FnMut(&'a T) -> io::Result<R>,
) -> Poll<io::Result<R>> {
let mut f = f;
// let mut f = f;
//
// // Attempt the non-blocking operation.
// match f(&mut self.source) {
// Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
// res => return Poll::Ready(res),
// }
//
// // Acquire a lock on the waker list.
// let mut wakers = wakers.lock();
//
// // Attempt the non-blocking operation again.
// match f(&mut self.source) {
// Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
// res => return Poll::Ready(res),
// }
//
// // If it would still block, register the curent waker and return.
// if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
// wakers.push(cx.waker().clone());
// }
// Poll::Pending
todo!()
}
// Attempt the non-blocking operation.
match f(self.source()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
// Acquire a lock on the waker list.
let mut wakers = wakers.lock();
// Attempt the non-blocking operation again.
match f(self.source()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
// If it would still block, register the curent waker and return.
if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
wakers.push(cx.waker().clone());
}
Poll::Pending
fn poll_with_mut<'a, R>(
&'a mut self,
cx: &mut Context<'_>,
dir: Direction,
// wakers: &Mutex<Vec<Waker>>,
f: impl FnMut(&'a mut T) -> io::Result<R>,
) -> Poll<io::Result<R>> {
todo!()
}
fn entry(&self) -> &Entry {
match &self.0 {
match &self.flavor {
Flavor::Socket(reg) => &reg.entry,
Flavor::Timer { .. } => unreachable!(),
}
}
fn entry_mut(&mut self) -> &Entry {
match &mut self.flavor {
Flavor::Socket(reg) => &mut reg.entry,
Flavor::Timer { .. } => unreachable!(),
}
}
}
impl<T> Drop for Async<T> {
impl Async<Instant> {
/// Completes after the specified duration of time.
pub fn timer(dur: Duration) -> Async<Instant> {
Async {
source: Box::new(todo!()), // TODO: have Box<Option>, set to None?
flavor: Flavor::Timer(Instant::now() + dur, false),
}
}
}
enum Direction {
Read,
Write,
}
impl<T: ?Sized> Drop for Async<T> {
fn drop(&mut self) {
let id = self as *mut Async<T> as usize;
if let Flavor::Timer { when, inserted } = &self.0 {
if let Flavor::Timer(when, inserted) = &self.flavor {
if *inserted {
RT.timers.lock().remove(&(*when, id));
}
@ -582,9 +638,9 @@ impl Future for Async<Instant> {
let id = &mut *self as *mut Async<Instant> as usize;
let mut timers = RT.timers.lock();
match &mut self.0 {
match &mut self.flavor {
Flavor::Socket(..) => Poll::Pending,
Flavor::Timer { when, inserted } => {
Flavor::Timer(when, inserted) => {
if Instant::now() >= *when {
timers.remove(&(*when, id));
return Poll::Ready(*when);
@ -614,67 +670,140 @@ impl Future for Async<Instant> {
}
}
impl<T: AsRawFd> Clone for Async<T> {
fn clone(&self) -> Async<T> {
match &self.0 {
Flavor::Socket(reg) => Async(Flavor::Socket(reg.clone())),
Flavor::Timer { when, .. } => unreachable!(),
}
// ----- Async trait impls -----
impl<T> Stream for Async<dyn Iterator<Item = T>> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
}
}
// Generate `AsyncRead` and `AsyncWrite` impls for `Async<T>` and `&Async<T>`.
macro_rules! async_io_impls {
($type:ty) => {
impl<T: AsRawFd> AsyncRead for $type
where
for<'a> &'a T: Read,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.poll_with(cx, &self.entry().readers, |mut source| source.read(buf))
}
}
impl<T: AsRawFd> AsyncWrite for $type
where
for<'a> &'a T: Write,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.poll_with(cx, &self.entry().writers, |mut source| source.write(buf))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_with(cx, &self.entry().writers, |mut source| source.flush())
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
};
impl AsyncRead for Async<dyn Read> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
todo!()
}
}
impl AsyncWrite for Async<dyn Write> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
todo!()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
todo!()
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl<T: Read> AsyncRead for Async<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let fut = self.read_with_mut(|source| source.read(buf));
pin_utils::pin_mut!(fut);
fut.poll(cx)
}
}
impl<T> AsyncRead for &Async<T>
where
for<'a> &'a T: Read,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let fut = self.read_with(|source| (&*source).read(buf));
pin_utils::pin_mut!(fut);
fut.poll(cx)
}
}
impl<T: Write> AsyncWrite for Async<T> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let fut = self.write_with_mut(|source| source.write(buf));
pin_utils::pin_mut!(fut);
fut.poll(cx)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let fut = self.write_with_mut(|source| source.flush());
pin_utils::pin_mut!(fut);
fut.poll(cx)
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl<T> AsyncWrite for &Async<T>
where
for<'a> &'a T: Write,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let fut = self.write_with(|source| (&*source).write(buf));
pin_utils::pin_mut!(fut);
fut.poll(cx)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let fut = self.write_with(|source| (&*source).flush());
pin_utils::pin_mut!(fut);
fut.poll(cx)
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
async_io_impls!(Async<T>);
async_io_impls!(&Async<T>);
// ----- Stdio -----
// impl Async<()> {
// NOTE: we can return Async<Stdin> because in AsyncRead we can read from the byte channel
// - but wait! Async<Stdin> doesn't impl AsyncRead because &Stdin doesn't impl Read
// - so we have to use Async<dyn Read> or impl AsyncRead manually for all types
// - if we do it manually, what about uds_windows?
// NOTE: Async<TcpStream> can be converted to Async<dyn Read>
// impl Async<Stdin> {
// Returns an async writer into stdin.
// TODO: Async::stdin() -> impl AsyncWrite + Unpin
// TODO: Async::stdin() -> Async<Stdin>
// TODO: Async::read_line()
// }
// NOTE: what happens if we drop stdout? how do we know when data is flushed
// - solution: there is poll_flush()
// Returns an async reader from stdout.
// TODO: Async::stdout() -> impl AsyncRead + Unpin
// TODO: Async::stdout() -> Async<Stdout>
// Returns an async reader from stderr.
// TODO: Async::stderr() -> impl AsyncRead + Unpin
// TODO: Async::stderr() -> Async<Stderr>
// }
// ----- Process -----
@ -706,13 +835,6 @@ impl Async<Child> {
}
}
// ----- Filesystem -----
// TODO: filesystem operations (with OS-specific extensions), don't do Async<File>
// Async<DirBuilder>::create(builder, path)
// Async<File>::create(path) -> File // then you can put it into reader() or writer()
// Async<OpenOptions>::open(options, path)
// ----- Networking -----
impl Async<TcpListener> {
@ -732,6 +854,7 @@ impl Async<TcpListener> {
/// Returns a stream over incoming connections.
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Unpin + '_ {
// TODO: can we return Async<Incoming>?
Box::pin(stream::unfold(self, |listener| async move {
let res = listener.accept().await.map(|(stream, _)| stream);
Some((res, listener))
@ -863,6 +986,7 @@ impl Async<UnixListener> {
/// Returns a stream over incoming connections.
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Unpin + '_ {
// TODO: can we return Async<Incoming>?
Box::pin(stream::unfold(self, |listener| async move {
let res = listener.accept().await.map(|(stream, _)| stream);
Some((res, listener))
@ -930,3 +1054,125 @@ impl Async<UnixDatagram> {
self.read_with(|source| source.recv(buf)).await
}
}
// ----- Filesystem -----
impl Async<()> {
pub async fn canonicalize<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::canonicalize(path) }).await
}
pub async fn copy<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<u64> {
let src = src.as_ref().to_owned();
let dst = dst.as_ref().to_owned();
Task::blocking(async move { fs::copy(src, dst) }).await
}
pub async fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::create_dir(path) }).await
}
pub async fn create_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::create_dir_all(path) }).await
}
pub async fn hard_link<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
let src = src.as_ref().to_owned();
let dst = dst.as_ref().to_owned();
Task::blocking(async move { fs::hard_link(src, dst) }).await
}
pub async fn metadata<P: AsRef<Path>>(path: P) -> io::Result<fs::Metadata> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::metadata(path) }).await
}
pub async fn read<P: AsRef<Path>>(path: P) -> io::Result<Vec<u8>> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::read(path) }).await
}
pub async fn read_dir<P: AsRef<Path>>(path: P) -> io::Result<Async<fs::ReadDir>> {
let path = path.as_ref().to_owned();
Task::blocking(async move { todo!() }).await
}
pub async fn read_link<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::read_link(path) }).await
}
pub async fn read_to_string<P: AsRef<Path>>(path: P) -> io::Result<String> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::read_to_string(path) }).await
}
pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::remove_dir(path) }).await
}
pub async fn remove_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::remove_dir_all(path) }).await
}
pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::remove_file(path) }).await
}
pub async fn rename<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> {
let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned();
Task::blocking(async move { fs::rename(from, to) }).await
}
pub async fn set_permissions<P: AsRef<Path>>(path: P, perm: fs::Permissions) -> io::Result<()> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::set_permissions(path, perm) }).await
}
pub async fn symlink_metadata<P: AsRef<Path>>(path: P) -> io::Result<fs::Metadata> {
let path = path.as_ref().to_owned();
Task::blocking(async move { fs::symlink_metadata(path) }).await
}
pub async fn write<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
let path = path.as_ref().to_owned();
let contents = contents.as_ref().to_owned();
Task::blocking(async move { fs::write(path, contents) }).await
}
// TODO: unix-only
pub async fn symlink<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
let src = src.as_ref().to_owned();
let dst = dst.as_ref().to_owned();
Task::blocking(async move { os::unix::fs::symlink(src, dst) }).await
}
// TODO: windows-only
// pub async fn symlink_dir<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
// let src = src.as_ref().to_owned();
// let dst = dst.as_ref().to_owned();
// Task::blocking(async move { os::windows::fs::symlink_dir(src, dst) }).await
// }
// TODO: windows-only
// pub async fn symlink_file<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
// let src = src.as_ref().to_owned();
// let dst = dst.as_ref().to_owned();
// Task::blocking(async move { os::windows::fs::symlink_dir(src, dst) }).await
// }
}
impl Stream for Async<fs::ReadDir> {
type Item = io::Result<fs::DirEntry>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
}
}