This commit is contained in:
Stjepan Glavina 2020-02-04 14:47:30 +01:00
parent dfb80d05fb
commit c63ee2bb96
1 changed files with 22 additions and 12 deletions

View File

@ -94,7 +94,7 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
// Runs the future to completion on a new worker (have Mutex<Vec<Stealer>>)
// there will be no hidden threadpool!!
pub fn run<F, R>(future: F) -> JoinHandle<R>
pub fn run<F, R>(future: F) -> Spawn<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
@ -123,7 +123,7 @@ where
type Task = async_task::Task<()>;
/// Spawns a future on the executor.
pub fn spawn<F, R>(future: F) -> JoinHandle<R>
pub fn spawn<F, R>(future: F) -> Spawn<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
@ -133,13 +133,13 @@ where
task.schedule();
// Return a join handle that retrieves the output of the future.
JoinHandle(handle)
Spawn(handle)
}
/// Awaits the output of a spawned future.
pub struct JoinHandle<R>(async_task::JoinHandle<R, ()>);
pub struct Spawn<R>(async_task::JoinHandle<R, ()>);
impl<R> Future for JoinHandle<R> {
impl<R> Future for Spawn<R> {
type Output = R;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -150,6 +150,16 @@ impl<R> Future for JoinHandle<R> {
}
}
// ----- Blocking -----
pub fn blocking<F, R>(future: F) -> Spawn<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
todo!()
}
// ----- Timer -----
pub fn timer(dur: Duration) -> Timer {
@ -217,12 +227,9 @@ pub struct Registration<T> {
pub struct Async<T>(Arc<Registration<T>>);
impl<T> Async<T> {
impl<T: AsRawFd> Async<T> {
// note: make sure source is in non-blocking mode
pub fn register(source: T) -> Async<T>
where
T: AsRawFd,
{
pub fn register(source: T) -> Async<T> {
let mut entries = RT.entries.lock().unwrap();
let vacant = entries.vacant_entry();
let index = vacant.key();
@ -250,7 +257,9 @@ impl<T> Async<T> {
let fd = source.as_raw_fd();
Async(Arc::new(Registration { fd, source, entry }))
}
}
impl<T> Async<T> {
/// Gets a reference to the source I/O handle.
pub fn source(&self) -> &T {
&self.0.source
@ -275,7 +284,7 @@ impl<T> Async<T> {
fn poll_with<'a, R>(
&'a self,
cx: &mut Context<'_>,
m: &Mutex<Vec<Waker>>,
wakers: &Mutex<Vec<Waker>>,
mut f: impl FnMut(&'a T) -> io::Result<R>,
) -> Poll<io::Result<R>> {
// Attempt the non-blocking operation.
@ -285,7 +294,7 @@ impl<T> Async<T> {
}
// Acquire a lock on the waker list.
let mut wakers = m.lock().unwrap();
let mut wakers = wakers.lock().unwrap();
// Attempt the non-blocking operation again.
match f(self.source()) {
@ -355,6 +364,7 @@ impl Async<TcpStream> {
let mut last_err = None;
// Try connecting to each address one by one.
// TODO: use blocking pool to resolve
for addr in addr.to_socket_addrs()? {
match Self::connect_to(addr).await {
Ok(stream) => return Ok(stream),