Make timers and I/O work again

This commit is contained in:
Stjepan Glavina 2020-02-07 18:33:29 +01:00
parent 7bbe564d00
commit 38104eab1a
1 changed files with 119 additions and 121 deletions

View File

@ -159,6 +159,7 @@ impl Registry {
let mut entries = self.entries.lock();
let vacant = entries.vacant_entry();
let entry = Arc::new(Entry {
fd,
index: vacant.key(),
readers: Mutex::new(Vec::new()),
writers: Mutex::new(Vec::new()),
@ -184,10 +185,16 @@ impl Registry {
// TODO: if epoll fails, remove the entry
Async {
source: Box::new(source),
flavor: Flavor::Socket(Registration { fd, entry }),
source: Some(Box::new(source)),
flavor: Flavor::Socket(entry),
}
}
// TODO: we probably don't need to pass fd because it's in the entry
fn unregister(&self, fd: RawFd, index: usize) {
self.entries.lock().remove(index);
epoll_ctl(self.epoll, EpollOp::EpollCtlDel, fd, None).unwrap();
}
}
static RT: Lazy<Runtime> = Lazy::new(|| initialize().expect("cannot initialize smol runtime"));
@ -432,31 +439,20 @@ impl<T> Future for Task<T> {
// ----- Async I/O -----
struct Entry {
fd: RawFd,
index: usize,
readers: Mutex<Vec<Waker>>,
writers: Mutex<Vec<Waker>>,
}
struct Registration {
fd: RawFd,
entry: Arc<Entry>,
}
impl Drop for Registration {
fn drop(&mut self) {
RT.registry.entries.lock().remove(self.entry.index);
epoll_ctl(RT.registry.epoll, EpollOp::EpollCtlDel, self.fd, None).unwrap();
}
}
/// Asynchronous I/O.
pub struct Async<T: ?Sized> {
flavor: Flavor,
source: Box<T>,
source: Option<Box<T>>,
}
enum Flavor {
Socket(Registration),
Socket(Arc<Entry>),
Timer(Instant, bool),
}
@ -487,121 +483,108 @@ impl<T> Async<T> {
// NOTE: stop task if the returned handle is dropped
todo!()
}
}
impl<T: AsRawFd> Async<T> {
/// Turns a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(source: T) -> Async<T>
where
T: AsRawFd,
{
pub fn nonblocking(source: T) -> Async<T> {
RT.registry.register(source)
}
/// Gets a reference to the I/O source.
pub fn source(&self) -> &T {
&self.source
self.source.as_ref().unwrap()
}
/// Gets a mutable reference to the I/O source.
pub fn source_mut(&mut self) -> &mut T {
&mut self.source
self.source.as_mut().unwrap()
}
pub fn into_source(self) -> T {
// TODO: take from the Option<Box> and then Drop ignores it
todo!()
pub fn into_source(mut self) -> T {
let source = self.source.take().unwrap();
if let Flavor::Socket(entry) = &self.flavor {
RT.registry.unregister(entry.fd, entry.index);
}
*source
}
/// Turns a non-blocking read into an async operation.
pub async fn read_with<'a, R>(
&'a self,
f: impl FnMut(&'a T) -> io::Result<R>,
) -> io::Result<R> {
pub async fn read_with<R>(&self, f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut f = f;
// future::poll_fn(|cx| self.poll_with(cx, &self.entry().readers, &mut f)).await
todo!()
let mut source = self.source.as_ref().unwrap();
let wakers = match &self.flavor {
Flavor::Socket(entry) => &entry.readers,
Flavor::Timer(..) => unreachable!(),
};
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut source, wakers)).await
}
pub async fn read_with_mut<'a, R>(
&'a mut self,
f: impl FnMut(&'a mut T) -> io::Result<R>,
/// Turns a non-blocking read into an async operation.
pub async fn read_with_mut<R>(
&mut self,
f: impl FnMut(&mut T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
// future::poll_fn(|cx| self.poll_with(cx, &self.entry().readers, &mut f)).await
todo!()
let mut source = self.source.as_mut().unwrap();
let wakers = match &self.flavor {
Flavor::Socket(entry) => &entry.readers,
Flavor::Timer(..) => unreachable!(),
};
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut source, wakers)).await
}
/// Turns a non-blocking write into an async operation.
pub async fn write_with<'a, R>(
&'a self,
f: impl FnMut(&'a T) -> io::Result<R>,
pub async fn write_with<R>(&self, f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut f = f;
let mut source = self.source.as_ref().unwrap();
let wakers = match &self.flavor {
Flavor::Socket(entry) => &entry.writers,
Flavor::Timer(..) => unreachable!(),
};
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut source, wakers)).await
}
/// Turns a non-blocking write into an async operation.
pub async fn write_with_mut<R>(
&mut self,
f: impl FnMut(&mut T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
// future::poll_fn(|cx| self.poll_with(cx, &self.entry().writers, &mut f)).await
todo!()
let mut source = self.source.as_mut().unwrap();
let wakers = match &self.flavor {
Flavor::Socket(entry) => &entry.writers,
Flavor::Timer(..) => unreachable!(),
};
future::poll_fn(|cx| Self::poll_io(cx, |s| f(s), &mut source, wakers)).await
}
pub async fn write_with_mut<'a, R>(
&'a mut self,
f: impl FnMut(&'a mut T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
// future::poll_fn(|cx| self.poll_with(cx, &self.entry().writers, &mut f)).await
todo!()
}
fn poll_with<'a, R>(
&'a self,
fn poll_io<S, R>(
cx: &mut Context<'_>,
dir: Direction,
f: impl FnMut(&'a T) -> io::Result<R>,
mut f: impl FnMut(&mut S) -> io::Result<R>,
source: &mut S,
wakers: &Mutex<Vec<Waker>>,
) -> Poll<io::Result<R>> {
// let mut f = f;
//
// // Attempt the non-blocking operation.
// match f(&mut self.source) {
// Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
// res => return Poll::Ready(res),
// }
//
// // Acquire a lock on the waker list.
// let mut wakers = wakers.lock();
//
// // Attempt the non-blocking operation again.
// match f(&mut self.source) {
// Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
// res => return Poll::Ready(res),
// }
//
// // If it would still block, register the curent waker and return.
// if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
// wakers.push(cx.waker().clone());
// }
// Poll::Pending
todo!()
}
fn poll_with_mut<'a, R>(
&'a mut self,
cx: &mut Context<'_>,
dir: Direction,
// wakers: &Mutex<Vec<Waker>>,
f: impl FnMut(&'a mut T) -> io::Result<R>,
) -> Poll<io::Result<R>> {
todo!()
}
fn entry(&self) -> &Entry {
match &self.flavor {
Flavor::Socket(reg) => &reg.entry,
Flavor::Timer { .. } => unreachable!(),
// Attempt the non-blocking operation.
match f(source) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
}
fn entry_mut(&mut self) -> &Entry {
match &mut self.flavor {
Flavor::Socket(reg) => &mut reg.entry,
Flavor::Timer { .. } => unreachable!(),
// Acquire a lock on the waker list.
let mut wakers = wakers.lock();
// Attempt the non-blocking operation again.
match f(source) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
// If it would still block, register the curent waker and return.
if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
wakers.push(cx.waker().clone());
}
Poll::Pending
}
}
@ -609,23 +592,26 @@ impl Async<Instant> {
/// Completes after the specified duration of time.
pub fn timer(dur: Duration) -> Async<Instant> {
Async {
source: Box::new(todo!()), // TODO: have Box<Option>, set to None?
source: None,
flavor: Flavor::Timer(Instant::now() + dur, false),
}
}
}
enum Direction {
Read,
Write,
}
impl<T: ?Sized> Drop for Async<T> {
fn drop(&mut self) {
let id = self as *mut Async<T> as usize;
if let Flavor::Timer(when, inserted) = &self.flavor {
if *inserted {
RT.timers.lock().remove(&(*when, id));
match &mut self.flavor {
Flavor::Socket(entry) => {
if self.source.is_some() {
RT.registry.unregister(entry.fd, entry.index);
}
}
Flavor::Timer(when, inserted) => {
if *inserted {
RT.timers.lock().remove(&(*when, id));
}
}
}
}
@ -708,7 +694,7 @@ impl AsyncWrite for Async<dyn Write> {
}
}
impl<T: Read> AsyncRead for Async<T> {
impl<T: AsRawFd + Read> AsyncRead for Async<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -720,7 +706,7 @@ impl<T: Read> AsyncRead for Async<T> {
}
}
impl<T> AsyncRead for &Async<T>
impl<T: AsRawFd> AsyncRead for &Async<T>
where
for<'a> &'a T: Read,
{
@ -735,7 +721,7 @@ where
}
}
impl<T: Write> AsyncWrite for Async<T> {
impl<T: AsRawFd + Write> AsyncWrite for Async<T> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -757,7 +743,7 @@ impl<T: Write> AsyncWrite for Async<T> {
}
}
impl<T> AsyncWrite for &Async<T>
impl<T: AsRawFd> AsyncWrite for &Async<T>
where
for<'a> &'a T: Write,
{
@ -790,21 +776,33 @@ where
// - if we do it manually, what about uds_windows?
// NOTE: Async<TcpStream> can be converted to Async<dyn Read>
// impl Async<Stdin> {
// Returns an async writer into stdin.
// TODO: Async::stdin() -> Async<Stdin>
// TODO: Async::read_line()
// }
impl Async<io::Stdin> {
/// Returns an async writer into stdin.
pub fn stdin() -> Async<io::Stdin> {
todo!()
}
pub async fn read_line(&self, buf: &mut String) -> io::Result<usize> {
todo!()
}
}
// NOTE: what happens if we drop stdout? how do we know when data is flushed
// - solution: there is poll_flush()
// Returns an async reader from stdout.
// TODO: Async::stdout() -> Async<Stdout>
impl Async<io::Stdout> {
// Returns an async reader from stdout.
pub fn stdout() -> Async<io::Stdout> {
todo!()
}
}
// Returns an async reader from stderr.
// TODO: Async::stderr() -> Async<Stderr>
// }
impl Async<io::Stderr> {
/// Returns an async reader from stderr.
pub fn stderr() -> Async<io::Stderr> {
todo!()
}
}
// ----- Process -----