This commit is contained in:
Stjepan Glavina 2020-02-06 18:45:46 +01:00
parent 149e26f5e8
commit efcf66a535
2 changed files with 96 additions and 62 deletions

View File

@ -16,6 +16,7 @@ nix = "0.16.1"
num_cpus = "1.12.0"
once_cell = "1.3.1"
parking_lot = "0.10.0"
sharded-slab = "0.0.8"
slab = "0.4.2"
socket2 = "0.3.11"

View File

@ -65,14 +65,18 @@ struct Runtime {
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
// Polling I/O events
epoll: RawFd,
entries: Mutex<Slab<Arc<Entry>>>,
poller: Mutex<Poller>,
registry: Registry,
// Interrupting epoll/kqueue/wepoll
notified: AtomicBool,
socket_notify: Socket,
socket_wakeup: Lazy<Async<Socket>, Box<dyn FnOnce() -> Async<Socket> + Send>>,
socket_wakeup: Async<Socket>,
}
struct Registry {
epoll: RawFd,
entries: Mutex<Slab<Arc<Entry>>>,
}
struct Poller {
@ -119,10 +123,15 @@ fn initialize() -> io::Result<Runtime> {
sock2.set_nonblocking(true)?;
sock2.set_recv_buffer_size(1)?;
Ok(Runtime {
// TODO: convert Result to io::Result
let registry = Registry {
epoll: epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).expect("cannot create epoll"),
entries: Mutex::new(Slab::new()),
};
let sock2 = registry.register(sock2);
Ok(Runtime {
registry,
// TODO: convert Result to io::Result
timers: Mutex::new(BTreeMap::new()),
queue: sender,
poller: Mutex::new(Poller {
@ -132,10 +141,45 @@ fn initialize() -> io::Result<Runtime> {
}),
notified: AtomicBool::new(false),
socket_notify: sock1,
socket_wakeup: Lazy::new(Box::new(move || Async::register(sock2))),
socket_wakeup: sock2,
})
}
impl Registry {
fn register<T: AsRawFd>(&self, source: T) -> Async<T> {
let fd = source.as_raw_fd();
let entry = {
let mut entries = self.entries.lock();
let vacant = entries.vacant_entry();
let entry = Arc::new(Entry {
index: vacant.key(),
readers: Mutex::new(Vec::new()),
writers: Mutex::new(Vec::new()),
});
vacant.insert(entry.clone());
entry
};
// TODO: handle epoll errors
epoll_ctl(
self.epoll,
EpollOp::EpollCtlAdd,
fd,
Some(&mut EpollEvent::new(
EpollFlags::EPOLLET
| EpollFlags::EPOLLIN
| EpollFlags::EPOLLOUT
| EpollFlags::EPOLLRDHUP,
entry.index as u64,
)),
)
.unwrap();
// TODO: if epoll fails, remove the entry
Async(Arc::new(Registration { fd, source, entry }))
}
}
static RT: Lazy<Runtime> = Lazy::new(|| initialize().expect("cannot initialize smol runtime"));
fn poll(block: bool) -> bool {
@ -190,10 +234,10 @@ fn poll_io(poller: &mut Poller, timeout: Option<Duration>) {
None => -1,
Some(t) => t.as_millis().try_into().expect("timer duration overflow"),
};
let n = epoll_wait(RT.epoll, &mut poller.epoll_events, timeout_ms).unwrap();
let n = epoll_wait(RT.registry.epoll, &mut poller.epoll_events, timeout_ms).unwrap();
if n > 0 {
let entries = RT.entries.lock();
let entries = RT.registry.entries.lock();
for ev in &poller.epoll_events[..n] {
let is_read = ev.events() != EpollFlags::EPOLLOUT;
let is_write = ev.events() != EpollFlags::EPOLLIN;
@ -230,8 +274,6 @@ fn interrupt() {
// ----- Executor -----
// Runs the future to completion on a new worker (have Mutex<Vec<Stealer>>)
// there will be no hidden threadpool!!
/// Starts an executor and runs a future on it.
pub fn run<F, T>(future: F) -> T
where
@ -247,10 +289,10 @@ where
todo!()
}
/// A spawned future and its current state.
/// A runnable future, ready for execution.
type Runnable = async_task::Task<()>;
/// Awaits the output of a scheduled future.
/// A scheduled future.
pub struct Task<T>(async_task::JoinHandle<T, ()>);
impl<T> Task<T> {
@ -268,6 +310,9 @@ impl<T> Task<T> {
Task(handle)
}
/// Schedules a future on the current executor.
///
/// Panics if not called within an executor.
pub fn local<F>(future: F) -> Task<T>
where
F: Future<Output = T> + 'static,
@ -279,6 +324,7 @@ impl<T> Task<T> {
todo!()
}
/// Schedules a future on the blocking thread pool.
pub fn blocking<F>(future: F) -> Task<T>
where
F: Future<Output = T> + Send + 'static,
@ -452,48 +498,38 @@ struct Registration<T> {
impl<T> Drop for Registration<T> {
fn drop(&mut self) {
epoll_ctl(RT.epoll, EpollOp::EpollCtlDel, self.fd, None).unwrap();
RT.entries.lock().remove(self.entry.index);
RT.registry.entries.lock().remove(self.entry.index);
epoll_ctl(RT.registry.epoll, EpollOp::EpollCtlDel, self.fd, None).unwrap();
}
}
/// An async I/O handle.
pub struct Async<T>(Arc<Registration<T>>);
impl<T: AsRawFd> Async<T> {
impl<T> Async<T> {
/// Turns a non-blocking I/O handle into an async I/O handle.
pub fn register(source: T) -> Async<T> {
let (index, entry) = {
let mut entries = RT.entries.lock();
let vacant = entries.vacant_entry();
let index = vacant.key();
let entry = Arc::new(Entry {
index,
readers: Mutex::new(Vec::new()),
writers: Mutex::new(Vec::new()),
});
vacant.insert(entry.clone());
(index, entry)
};
// TODO: handle epoll errors
epoll_ctl(
RT.epoll,
EpollOp::EpollCtlAdd,
source.as_raw_fd(),
Some(&mut EpollEvent::new(
EpollFlags::EPOLLET
| EpollFlags::EPOLLIN
| EpollFlags::EPOLLOUT
| EpollFlags::EPOLLRDHUP,
index as u64,
)),
)
.unwrap();
let fd = source.as_raw_fd();
Async(Arc::new(Registration { fd, source, entry }))
pub fn register(source: T) -> Async<T>
where
T: AsRawFd,
{
RT.registry.register(source)
}
// // NOTE: stop task if the returned handle is dropped
// fn reader(t: T) -> impl AsyncRead + Unpin
// where
// T: Read + 'static,
// {
// todo!()
// }
//
// // NOTE: stop task if the returned handle is dropped
// fn writer(t: T) -> impl AsyncWrite + Unpin
// where
// T: Write + 'static,
// {
// todo!()
// }
}
impl<T> Async<T> {
@ -503,30 +539,22 @@ impl<T> Async<T> {
}
}
impl<T: Read + 'static> Async<T> {
// NOTE: stop task if the returned handle is dropped
// TODO: fn reader(t: T) -> impl AsyncRead + Unpin {}
}
impl<T: Write + 'static> Async<T> {
// NOTE: stop task if the returned handle is dropped
// TODO: fn reader(t: T) -> impl AsyncWrite + Unpin {}
}
impl<T> Async<T> {
/// Turns a non-blocking read into an async operation.
pub async fn read_with<'a, R>(
&'a self,
mut f: impl FnMut(&'a T) -> io::Result<R>,
f: impl FnMut(&'a T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
future::poll_fn(|cx| self.poll_with(cx, &self.0.entry.readers, &mut f)).await
}
/// Turns a non-blocking write into an async operation.
pub async fn write_with<'a, R>(
&'a self,
mut f: impl FnMut(&'a T) -> io::Result<R>,
f: impl FnMut(&'a T) -> io::Result<R>,
) -> io::Result<R> {
let mut f = f;
future::poll_fn(|cx| self.poll_with(cx, &self.0.entry.writers, &mut f)).await
}
@ -534,8 +562,10 @@ impl<T> Async<T> {
&'a self,
cx: &mut Context<'_>,
wakers: &Mutex<Vec<Waker>>,
mut f: impl FnMut(&'a T) -> io::Result<R>,
f: impl FnMut(&'a T) -> io::Result<R>,
) -> Poll<io::Result<R>> {
let mut f = f;
// Attempt the non-blocking operation.
match f(self.source()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
@ -610,19 +640,22 @@ async_io_impls!(&Async<T>);
impl Async<Command> {
/// Executes a command and returns its output.
pub async fn output(mut cmd: Command) -> io::Result<Output> {
pub async fn output(cmd: Command) -> io::Result<Output> {
let mut cmd = cmd;
Task::blocking(async move { cmd.output() }).await
}
/// Executes a command and returns its exit status.
pub async fn status(mut cmd: Command) -> io::Result<ExitStatus> {
pub async fn status(cmd: Command) -> io::Result<ExitStatus> {
let mut cmd = cmd;
Task::blocking(async move { cmd.status() }).await
}
}
impl Async<Child> {
/// Waits for a child process to exit and returns its exit status.
pub async fn wait(mut child: Child) -> io::Result<ExitStatus> {
pub async fn wait(child: Child) -> io::Result<ExitStatus> {
let mut child = child;
Task::blocking(async move { child.wait() }).await
}