Update blocking

This commit is contained in:
Stjepan Glavina 2020-05-17 23:55:23 +02:00
parent 00b495f0ce
commit ce315f3d9a
5 changed files with 59 additions and 23 deletions

View File

@ -26,7 +26,7 @@ tokio02 = ["tokio"]
[dependencies]
async-task = "3.0.0"
blocking = "0.3.2"
blocking = "0.4.2"
crossbeam = "0.7.3"
futures-util = { version = "0.3.5", default-features = false, features = ["std"] }
futures-io = { version = "0.3.5", default-features = false, features = ["std"] }

View File

@ -87,7 +87,7 @@ macro_rules! blocking {
pub fn iter<T: Send + 'static>(
iter: impl Iterator<Item = T> + Send + 'static,
) -> impl Stream<Item = T> + Send + Unpin + 'static {
::blocking::Blocking::new(iter)
::blocking::Unblock::new(iter)
}
/// Creates an async reader that runs on a thread.
@ -133,7 +133,7 @@ pub fn iter<T: Send + 'static>(
/// # std::io::Result::Ok(()) });
/// ```
pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unpin + 'static {
::blocking::Blocking::new(reader)
::blocking::Unblock::new(reader)
}
/// Creates an async writer that runs on a thread.
@ -179,5 +179,5 @@ pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unp
/// # std::io::Result::Ok(()) });
/// ```
pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + Unpin + 'static {
::blocking::Blocking::new(writer)
::blocking::Unblock::new(writer)
}

View File

@ -2,7 +2,7 @@
//!
//! A [`Task`] handle represents a spawned future that is run by the executor.
use std::fmt::Debug;
use std::fmt::{self, Debug};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
@ -55,8 +55,21 @@ pub(crate) type Runnable = async_task::Task<()>;
///
/// [`run()`]: crate::run()
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(pub(crate) Option<async_task::JoinHandle<T, ()>>);
pub struct Task<T>(Inner<T>);
enum Inner<T> {
/// A regular task.
Handle(Option<async_task::JoinHandle<T, ()>>),
/// A blocking task.
Blocking(Pin<Box<dyn Future<Output = T> + Send>>),
}
impl<T> Task<T> {
pub(crate) fn from_handle(handle: async_task::JoinHandle<T, ()>) -> Task<T> {
Task(Inner::Handle(Some(handle)))
}
}
impl<T: 'static> Task<T> {
/// Spawns a future onto the thread-local executor.
@ -132,8 +145,8 @@ impl<T: Send + 'static> Task<T> {
/// [`reader()`]: `crate::reader()`
/// [`writer()`]: `crate::writer()`
pub fn blocking(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
let closure = || crate::block_on(future);
Task::spawn(::blocking::Blocking::new(closure))
let future = ::blocking::unblock(|| crate::block_on(future));
Task(Inner::Blocking(Box::pin(future)))
}
}
@ -208,7 +221,12 @@ impl Task<()> {
/// # })
/// ```
pub fn detach(mut self) {
self.0.take().unwrap();
match &mut self.0 {
Inner::Handle(handle) => {
handle.take().unwrap();
}
Inner::Blocking(..) => {}
}
}
}
@ -241,36 +259,54 @@ impl<T> Task<T> {
/// ```
pub async fn cancel(self) -> Option<T> {
// There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
// do `{ self }` here to avoid marking `self` as mutable.
let handle = { self }.0.take().unwrap();
handle.cancel();
handle.await
// rebind the `self` argument.
let mut this = self;
match &mut this.0 {
Inner::Handle(handle) => {
let handle = handle.take().unwrap();
handle.cancel();
handle.await
}
Inner::Blocking(fut) => Some(fut.await),
}
}
}
impl<T> Drop for Task<T> {
fn drop(&mut self) {
if let Some(handle) = &self.0 {
if let Inner::Handle(Some(handle)) = &self.0 {
handle.cancel();
}
}
}
impl<T> Debug for Task<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Task")
}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => Poll::Ready(output.expect("task has failed")),
match &mut self.0 {
Inner::Handle(handle) => match Pin::new(&mut handle.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => Poll::Ready(output.expect("task has failed")),
},
Inner::Blocking(fut) => fut.as_mut().poll(cx),
}
}
}
impl<T> Into<async_task::JoinHandle<T, ()>> for Task<T> {
fn into(mut self) -> async_task::JoinHandle<T, ()> {
self.0
.take()
.expect("task was already canceled or has failed")
match &mut self.0 {
Inner::Handle(handle) => handle
.take()
.expect("task was already canceled or has failed"),
Inner::Blocking(..) => panic!("cannot convert a blocking task into `JoinHandle`"),
}
}
}

View File

@ -96,7 +96,7 @@ impl ThreadLocalExecutor {
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn_local(future, schedule, ());
runnable.schedule();
Task(Some(handle))
Task::from_handle(handle)
})
}

View File

@ -96,7 +96,7 @@ impl WorkStealingExecutor {
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn(future, schedule, ());
runnable.schedule();
Task(Some(handle))
Task::from_handle(handle)
}
/// Registers a new worker.