some changes

This commit is contained in:
Stjepan Glavina 2020-02-05 11:39:12 +01:00
parent e394e76f93
commit 9d50d5d562
4 changed files with 88 additions and 61 deletions

View File

@ -9,7 +9,10 @@ license = "MIT OR Apache-2.0"
[dependencies]
async-task = "1.3.0"
crossbeam = "0.7.3"
futures = "0.3.1"
crossbeam-channel = "0.4.0"
futures-core = "0.3.1"
futures-io = "0.3.1"
futures-util = { version = "0.3.1", default-features = false, features = ["std"] }
nix = "0.16.1"
num_cpus = "1.12.0"
once_cell = "1.3.1"

View File

@ -16,13 +16,13 @@ use smol::Async;
fn main() -> io::Result<()> {
block_on(async {
let mut stream = Async::<TcpStream>::connect("www.example.com:80").await?;
stream
.write_all(b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n")
.await?;
let mut html = String::new();
stream.read_to_string(&mut html).await?;
println!("{}", html);
let request = b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n";
stream.write_all(request).await?;
let mut response = String::new();
stream.read_to_string(&mut response).await?;
println!("{}", response);
Ok(())
})

View File

@ -8,9 +8,8 @@
use std::net::{TcpListener, TcpStream};
use futures::executor::block_on;
use futures::io;
use smol::Async;
use smol::{Async, Task};
async fn process(mut stream: Async<TcpStream>) -> io::Result<()> {
println!("Peer: {}", stream.source().peer_addr()?);
@ -19,13 +18,13 @@ async fn process(mut stream: Async<TcpStream>) -> io::Result<()> {
}
fn main() -> io::Result<()> {
block_on(async {
smol::run(async {
let listener = Async::<TcpListener>::bind("127.0.0.1:8080")?;
println!("Local: {}", listener.source().local_addr()?);
loop {
let (stream, _) = listener.accept().await?;
smol::spawn(async { process(stream).await.unwrap() });
Task::schedule(process(stream));
}
})
}

View File

@ -13,9 +13,9 @@ use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
use crossbeam::channel;
use futures::future;
use futures::io::{AsyncRead, AsyncWrite};
use crossbeam_channel as channel;
use futures_io::{AsyncRead, AsyncWrite};
use futures_util::future;
use nix::sys::epoll::{
epoll_create1, epoll_ctl, epoll_wait, EpollCreateFlags, EpollEvent, EpollFlags, EpollOp,
};
@ -26,13 +26,18 @@ use socket2::{Domain, Protocol, Socket, Type};
#[cfg(not(any(target_os = "linux", target_os = "android")))]
compile_error!("smol does not support this target OS");
// TODO: example with process spawn and output
// TODO: example with filesystem
// TODO: example with ctrl-c
// TODO: can we do wasm support?
// ----- Globals -----
struct Runtime {
epoll: RawFd,
entries: Mutex<Slab<Arc<Entry>>>,
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
queue: channel::Sender<Task>,
queue: channel::Sender<Runnable>,
}
static RT: Lazy<Runtime> = Lazy::new(|| {
@ -72,12 +77,12 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
}
});
let (sender, receiver) = channel::unbounded::<Task>();
let (sender, receiver) = channel::unbounded::<Runnable>();
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::spawn(move || {
receiver.iter().for_each(|task| {
let _ = catch_unwind(|| task.run());
receiver.iter().for_each(|runnable| {
let _ = catch_unwind(|| runnable.run());
})
});
}
@ -94,14 +99,15 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
// Runs the future to completion on a new worker (have Mutex<Vec<Stealer>>)
// there will be no hidden threadpool!!
pub fn run<F, R>(future: F) -> Spawn<R>
/// Starts an executor and runs a future on it.
pub fn run<F, T>(future: F) -> T
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
// TODO: run() should propagate panics into caller
let handle = spawn(future);
// let handle = spawn(future);
todo!("run tasks from the queue until handle completes")
// Start a threadpool.
@ -120,27 +126,44 @@ where
}
/// A spawned future and its current state.
type Task = async_task::Task<()>;
type Runnable = async_task::Task<()>;
/// Spawns a future on the executor.
pub fn spawn<F, R>(future: F) -> Spawn<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
// Create a task and schedule it for execution.
let (task, handle) = async_task::spawn(future, |t| RT.queue.send(t).unwrap(), ());
task.schedule();
/// Awaits the output of a scheduled future.
pub struct Task<T>(async_task::JoinHandle<T, ()>);
// Return a join handle that retrieves the output of the future.
Spawn(handle)
impl<T> Task<T> {
/// Schedules a future for execution.
fn schedule<F>(future: F) -> Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
// Create a runnable and schedule it for execution.
let (runnable, handle) = async_task::spawn(future, |t| RT.queue.send(t).unwrap(), ());
runnable.schedule();
// Return a join handle that retrieves the output of the future.
Task(handle)
}
pub fn local<F>(future: F) -> Task<T>
where
F: Future<Output = T>,
{
todo!()
}
pub fn blocking<F>(future: F) -> Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
todo!()
}
}
/// Awaits the output of a spawned future.
pub struct Spawn<R>(async_task::JoinHandle<R, ()>);
impl<R> Future for Spawn<R> {
type Output = R;
impl<T> Future for Task<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
@ -152,28 +175,29 @@ impl<R> Future for Spawn<R> {
// ----- Blocking -----
pub fn blocking<F, R>(future: F) -> Spawn<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
todo!()
}
// TODO
// ----- Timer -----
pub fn timer(dur: Duration) -> Timer {
Timer {
when: Instant::now() + dur,
inserted: false,
}
}
/// Completes at a certain point in time.
pub struct Timer {
when: Instant,
inserted: bool,
}
impl Timer {
fn at(when: Instant) -> Timer {
Timer {
when,
inserted: false,
}
}
fn after(dur: Duration) -> Timer {
Timer::at(Instant::now() + dur)
}
}
impl Drop for Timer {
fn drop(&mut self) {
if self.inserted {
@ -219,12 +243,20 @@ struct Entry {
writers: Mutex<Vec<Waker>>,
}
pub struct Registration<T> {
struct Registration<T> {
fd: RawFd,
source: T,
entry: Arc<Entry>,
}
impl<T> Drop for Registration<T> {
fn drop(&mut self) {
epoll_ctl(RT.epoll, EpollOp::EpollCtlDel, self.fd, None).unwrap();
RT.entries.lock().unwrap().remove(self.entry.index);
}
}
/// An async I/O handle.
pub struct Async<T>(Arc<Registration<T>>);
impl<T: AsRawFd> Async<T> {
@ -310,13 +342,6 @@ impl<T> Async<T> {
}
}
impl<T> Drop for Registration<T> {
fn drop(&mut self) {
epoll_ctl(RT.epoll, EpollOp::EpollCtlDel, self.fd, None).unwrap();
RT.entries.lock().unwrap().remove(self.entry.index);
}
}
impl<T> Clone for Async<T> {
fn clone(&self) -> Async<T> {
Async(self.0.clone())