Refactor etc

This commit is contained in:
Stjepan Glavina 2020-02-05 19:55:59 +01:00
parent b5bc9cf559
commit 8a238b892f
3 changed files with 182 additions and 48 deletions

View File

@ -8,13 +8,15 @@ license = "MIT OR Apache-2.0"
[dependencies]
async-task = "1.3.0"
crossbeam = "0.7.3"
crossbeam-channel = "0.4.0"
futures-core = "0.3.1"
futures-io = "0.3.1"
futures-util = { version = "0.3.1", default-features = false, features = ["std"] }
futures-core = "0.3.3"
futures-io = "0.3.3"
futures-util = { version = "0.3.3", default-features = false, features = ["std"] }
nix = "0.16.1"
num_cpus = "1.12.0"
once_cell = "1.3.1"
slab = "0.4.2"
socket2 = "0.3.11"
[dev-dependencies]
futures = { version = "0.3.3", default-features = false, features = ["std", "executor"] }

View File

@ -18,7 +18,7 @@ async fn process(mut stream: Async<TcpStream>) -> io::Result<()> {
}
fn main() -> io::Result<()> {
smol::run(async {
futures::executor::block_on(async {
let listener = Async::<TcpListener>::bind("127.0.0.1:8080")?;
println!("Local: {}", listener.source().local_addr()?);

View File

@ -1,13 +1,15 @@
#![forbid(unsafe_code)]
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::future::Future;
use std::io::{self, Read, Write};
use std::mem;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpListener, TcpStream, ToSocketAddrs};
use std::os::unix::io::{AsRawFd, RawFd};
use std::panic::catch_unwind;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
@ -26,10 +28,25 @@ use socket2::{Domain, Protocol, Socket, Type};
#[cfg(not(any(target_os = "linux", target_os = "android")))]
compile_error!("smol does not support this target OS");
// TODO: hello world example
// TODO: example with stdin
// TODO: example with process spawn and output
// TODO: example with timeout
// TODO: example with OS timers
// TODO: do we need a crate like async-pipe, which is like async byte channel?
// - impls AsyncRead and AsyncWrite
// - impls Stream
// - that allows us to "pipe" stdin into main task
// TODO: example with filesystem
// TODO: example that prints a file
// TODO: example with ctrl-c
// TODO: can we do wasm support?
// TODO: example with hyper
// TODO: generate OS-specific docs with --cfg docsrs
// #[cfg(any(windows, docsrs))]
// #[cfg_attr(docsrs, doc(cfg(windows)))]
// TODO: task cancellation?
// TODO: fix unwraps
// TODO: if epoll/kqueue/wepoll gets EINTR, then retry - maybe just call notify()
// ----- Globals -----
@ -38,43 +55,29 @@ struct Runtime {
entries: Mutex<Slab<Arc<Entry>>>,
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
queue: channel::Sender<Runnable>,
poller: Mutex<Poller>,
notified: AtomicBool,
// sock_notify: Socket,
// sock_wakeup: Async<Socket>,
}
static RT: Lazy<Runtime> = Lazy::new(|| {
thread::spawn(|| {
let mut buffer = vec![EpollEvent::empty(); 1000];
loop {
let ready = {
let mut timers = RT.timers.lock().unwrap();
let pending = timers.split_off(&(Instant::now(), 0));
mem::replace(&mut *timers, pending)
};
for (_, waker) in ready {
waker.wake();
}
enum Operation {
AddTimer(Instant, usize),
DelTimer(Instant, usize),
AddEntry(usize, Arc<Entry>),
DelEntry(usize),
}
// todo: use a timeout
let n = epoll_wait(RT.epoll, &mut buffer, -1).unwrap();
let entries = RT.entries.lock().unwrap();
static SOCKETS: Lazy<(Socket, Async<Socket>)> = Lazy::new(|| {
&RT;
wakeup_sockets().unwrap()
});
for ev in &buffer[..n] {
let events = ev.events();
let index = ev.data() as usize;
if let Some(entry) = entries.get(index) {
if events != EpollFlags::EPOLLOUT {
for waker in entry.readers.lock().unwrap().drain(..) {
waker.wake();
}
}
if events != EpollFlags::EPOLLIN {
for waker in entry.writers.lock().unwrap().drain(..) {
waker.wake();
}
}
}
}
}
fn initialize() -> io::Result<Runtime> {
thread::spawn(|| loop {
poll(true);
});
let (sender, receiver) = channel::unbounded::<Runnable>();
@ -87,13 +90,139 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
});
}
Runtime {
epoll: epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).unwrap(),
Ok(Runtime {
epoll: epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).expect("cannot create epoll"),
entries: Mutex::new(Slab::new()),
timers: Mutex::new(BTreeMap::new()),
queue: sender,
poller: Mutex::new(Poller {
epoll_events: vec![EpollEvent::empty(); 1000],
}),
notified: AtomicBool::new(false),
// sock_notify: sock1,
// sock_wakeup: sock2,
})
}
static RT: Lazy<Runtime> = Lazy::new(|| initialize().expect("cannot initialize smol runtime"));
// ----- Poller -----
struct Poller {
epoll_events: Vec<EpollEvent>,
}
fn poll(block: bool) -> bool {
let mut poller = match RT.poller.try_lock() {
Ok(poller) => poller,
Err(_) => return false,
};
if RT.notified.swap(false, Ordering::SeqCst) {
// TODO: cache this buffer somewhere in Runtime
let mut tmp = [0; 1024];
loop {
match (&*SOCKETS.1.source()).read(&mut tmp) {
Ok(n) if n > 0 => {}
_ => break,
}
}
}
});
// TODO: handle operations
let mut timeout = poll_timers(&mut poller);
if !block {
timeout = Some(Duration::from_secs(0));
}
poll_io(&mut poller, timeout);
true
}
fn poll_timers(poller: &mut Poller) -> Option<Duration> {
let now = Instant::now();
let mut timers = RT.timers.lock().unwrap();
let pending = timers.split_off(&(now, 0));
let ready = mem::replace(&mut *timers, pending);
// Wake up ready timers.
for (_, waker) in ready {
waker.wake();
}
timers.keys().next().map(|(when, _)| *when - now)
}
fn poll_io(poller: &mut Poller, timeout: Option<Duration>) {
let timeout_ms = match timeout {
None => -1,
Some(t) => t.as_millis().try_into().expect("timer duration overflow"),
};
let n = epoll_wait(RT.epoll, &mut poller.epoll_events, timeout_ms).unwrap();
let entries = RT.entries.lock().unwrap();
for ev in &poller.epoll_events[..n] {
let events = ev.events();
let index = ev.data() as usize;
if let Some(entry) = entries.get(index) {
if events != EpollFlags::EPOLLOUT {
for waker in entry.readers.lock().unwrap().drain(..) {
waker.wake();
}
}
if events != EpollFlags::EPOLLIN {
for waker in entry.writers.lock().unwrap().drain(..) {
waker.wake();
}
}
}
}
}
fn notify() {
if !RT.notified.load(Ordering::SeqCst) {
if !RT.notified.swap(true, Ordering::SeqCst) {
loop {
match (&SOCKETS.0).write(&[1]) {
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => break,
}
}
}
}
}
/// Returns a (writer, reader) pair of sockets for waking up the poller.
///
/// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
/// https://github.com/mhils/backports.socketpair/blob/master/backports/socketpair/__init__.py
/// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
/// https://gist.github.com/geertj/4325783
fn wakeup_sockets() -> io::Result<(Socket, Async<Socket>)> {
// Create a temporary listener.
let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?;
let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into();
listener.bind(&addr)?;
listener.listen(1)?;
let addr = listener.local_addr()?;
// First socket: connect to the listener.
let sock1 = Socket::new(Domain::ipv4(), Type::stream(), None)?;
sock1.set_nonblocking(true)?;
let _ = sock1.connect(&addr);
let _ = sock1.set_nodelay(true)?;
sock1.set_send_buffer_size(1)?;
// Second socket: accept a client from the listener.
let (sock2, _) = listener.accept()?;
sock2.set_nonblocking(true)?;
sock2.set_recv_buffer_size(1)?;
Ok((sock1, Async::register(sock2)))
}
// ----- Executor -----
@ -133,7 +262,7 @@ pub struct Task<T>(async_task::JoinHandle<T, ()>);
impl<T> Task<T> {
/// Schedules a future for execution.
fn schedule<F>(future: F) -> Task<T>
pub fn schedule<F>(future: F) -> Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
@ -166,6 +295,7 @@ impl<T> Future for Task<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// NOTE: always resume panics into Task with a "task cancelled" message on cancel
match Pin::new(&mut self.0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
@ -186,14 +316,14 @@ pub struct Timer {
}
impl Timer {
fn at(when: Instant) -> Timer {
pub fn at(when: Instant) -> Timer {
Timer {
when,
inserted: false,
}
}
fn after(dur: Duration) -> Timer {
pub fn after(dur: Duration) -> Timer {
Timer::at(Instant::now() + dur)
}
}
@ -208,7 +338,7 @@ impl Drop for Timer {
}
impl Future for Timer {
type Output = ();
type Output = Instant;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let id = &mut *self as *mut Timer as usize;
@ -216,7 +346,7 @@ impl Future for Timer {
if Instant::now() >= self.when {
timers.remove(&(self.when, id));
return Poll::Ready(());
return Poll::Ready(self.when);
}
if !self.inserted {
@ -229,6 +359,7 @@ impl Future for Timer {
let waker = cx.waker().clone();
timers.insert((self.when, id), waker);
self.inserted = true;
notify();
}
Poll::Pending
@ -272,6 +403,7 @@ impl<T: AsRawFd> Async<T> {
});
vacant.insert(entry.clone());
// TODO: handle epoll errors
epoll_ctl(
RT.epoll,
EpollOp::EpollCtlAdd,