Replace crossbeam with flume

This commit is contained in:
Stjepan Glavina 2020-09-18 14:31:53 +02:00
parent c533cc6a78
commit deb709f419
11 changed files with 18 additions and 58 deletions

View File

@ -19,7 +19,7 @@ std = []
[dev-dependencies]
atomic-waker = "1.0.0"
concurrent-queue = "1.2.2"
crossbeam = "0.7.3"
easy-parallel = "3.1.0"
flume = { version = "0.9.0", default-features = false }
futures-lite = "1.7.0"
once_cell = "1.4.1"

View File

@ -5,11 +5,10 @@ use std::future::Future;
use std::rc::Rc;
use async_task::{JoinHandle, Task};
use crossbeam::channel::{unbounded, Receiver, Sender};
thread_local! {
// A channel that holds scheduled tasks.
static QUEUE: (Sender<Task>, Receiver<Task>) = unbounded();
static QUEUE: (flume::Sender<Task>, flume::Receiver<Task>) = flume::unbounded();
}
/// Spawns a future on the executor.
@ -35,8 +34,8 @@ where
R: 'static,
{
// Spawn a task that sends its result through a channel.
let (s, r) = unbounded();
spawn(async move { s.send(future.await).unwrap() }).detach();
let (s, r) = flume::unbounded();
spawn(async move { drop(s.send(future.await)) }).detach();
loop {
// If the original task has completed, return its result.

View File

@ -5,7 +5,6 @@ use std::sync::Arc;
use std::thread;
use async_task::JoinHandle;
use crossbeam::channel;
use futures_lite::future;
/// Spawns a future on a new dedicated thread.
@ -17,7 +16,7 @@ where
R: Send + 'static,
{
// Create a channel that holds the task when it is scheduled for running.
let (sender, receiver) = channel::unbounded();
let (sender, receiver) = flume::unbounded();
let sender = Arc::new(sender);
let s = Arc::downgrade(&sender);

View File

@ -5,7 +5,6 @@ use std::panic::catch_unwind;
use std::thread;
use async_task::{JoinHandle, Task};
use crossbeam::channel::{unbounded, Sender};
use futures_lite::future;
use once_cell::sync::Lazy;
@ -16,8 +15,8 @@ where
R: Send + 'static,
{
// A channel that holds scheduled tasks.
static QUEUE: Lazy<Sender<Task>> = Lazy::new(|| {
let (sender, receiver) = unbounded::<Task>();
static QUEUE: Lazy<flume::Sender<Task>> = Lazy::new(|| {
let (sender, receiver) = flume::unbounded::<Task>();
// Start the executor thread.
thread::spawn(|| {

View File

@ -9,7 +9,7 @@
//! All executors have some kind of queue that holds runnable tasks:
//!
//! ```
//! let (sender, receiver) = crossbeam::channel::unbounded();
//! let (sender, receiver) = flume::unbounded();
//! #
//! # // A future that will get spawned.
//! # let future = async { 1 + 2 };
@ -24,7 +24,7 @@
//! A task is constructed using either [`spawn`] or [`spawn_local`]:
//!
//! ```
//! # let (sender, receiver) = crossbeam::channel::unbounded();
//! # let (sender, receiver) = flume::unbounded();
//! #
//! // A future that will be spawned.
//! let future = async { 1 + 2 };
@ -47,7 +47,7 @@
//! runnable tasks out of the queue and running each one in order:
//!
//! ```no_run
//! # let (sender, receiver) = crossbeam::channel::unbounded();
//! # let (sender, receiver) = flume::unbounded();
//! #
//! # // A future that will get spawned.
//! # let future = async { 1 + 2 };

View File

@ -33,15 +33,13 @@ use crate::JoinHandle;
/// # Examples
///
/// ```
/// use crossbeam::channel;
///
/// // The future inside the task.
/// let future = async {
/// println!("Hello, world!");
/// };
///
/// // If the task gets woken up, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let (s, r) = flume::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
@ -94,15 +92,13 @@ where
/// # Examples
///
/// ```
/// use crossbeam::channel;
///
/// // The future inside the task.
/// let future = async {
/// println!("Hello, world!");
/// };
///
/// // If the task gets woken up, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let (s, r) = flume::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
@ -266,24 +262,6 @@ impl Task {
Waker::from_raw(raw_waker)
}
}
/// Converts this task into a raw pointer.
pub fn into_raw(self) -> *mut () {
let ptr = self.raw_task.as_ptr();
mem::forget(self);
ptr
}
/// Converts a raw pointer into a task.
///
/// This method should only be used with raw pointers returned from [`into_raw`].
///
/// [`into_raw`]: #method.into_raw
pub unsafe fn from_raw(raw: *mut ()) -> Task {
Task {
raw_task: NonNull::new_unchecked(raw as *mut ()),
}
}
}
impl Drop for Task {

View File

@ -4,7 +4,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use async_task::Task;
use crossbeam::channel;
use futures_lite::future;
// Creates a future with event counters.
@ -228,7 +227,7 @@ fn cancel_join() {
#[test]
fn schedule() {
let (s, r) = channel::unbounded();
let (s, r) = flume::unbounded();
let schedule = move |t| s.send(t).unwrap();
let (task, _handle) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule);
@ -250,7 +249,7 @@ fn schedule() {
fn schedule_counter() {
static COUNT: AtomicUsize = AtomicUsize::new(0);
let (s, r) = channel::unbounded();
let (s, r) = flume::unbounded();
let schedule = move |t: Task| {
COUNT.fetch_add(1, Ordering::SeqCst);
s.send(t).unwrap();
@ -284,7 +283,7 @@ fn drop_inside_schedule() {
#[test]
fn waker() {
let (s, r) = channel::unbounded();
let (s, r) = flume::unbounded();
let schedule = move |t| s.send(t).unwrap();
let (task, _handle) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule);
@ -298,12 +297,3 @@ fn waker() {
w.wake();
r.recv().unwrap();
}
#[test]
fn raw() {
let (task, _handle) = async_task::spawn(async {}, |_| panic!());
let a = task.into_raw();
let task = unsafe { Task::from_raw(a) };
task.run();
}

View File

@ -1,6 +1,4 @@
use std::cell::Cell;
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};

View File

@ -9,7 +9,6 @@ use std::time::Duration;
use async_task::Task;
use atomic_waker::AtomicWaker;
use crossbeam::channel;
use easy_parallel::Parallel;
use futures_lite::future;
@ -75,7 +74,7 @@ macro_rules! schedule {
static $sched: AtomicUsize = AtomicUsize::new(0);
let ($name, $chan) = {
let (s, r) = channel::unbounded();
let (s, r) = flume::unbounded();
struct Guard(Box<i32>);

View File

@ -7,7 +7,6 @@ use std::time::Duration;
use async_task::Task;
use atomic_waker::AtomicWaker;
use crossbeam::channel;
use easy_parallel::Parallel;
// Creates a future with event counters.
@ -66,7 +65,7 @@ macro_rules! schedule {
static $sched: AtomicUsize = AtomicUsize::new(0);
let ($name, $chan) = {
let (s, r) = channel::unbounded();
let (s, r) = flume::unbounded();
struct Guard(Box<i32>);

View File

@ -8,7 +8,6 @@ use std::time::Duration;
use async_task::Task;
use atomic_waker::AtomicWaker;
use crossbeam::channel;
// Creates a future with event counters.
//
@ -72,7 +71,7 @@ macro_rules! schedule {
static $sched: AtomicUsize = AtomicUsize::new(0);
let ($name, $chan) = {
let (s, r) = channel::unbounded();
let (s, r) = flume::unbounded();
struct Guard(Box<i32>);