Replace some crates

This commit is contained in:
Stjepan Glavina 2020-06-02 15:47:38 +02:00
parent 23d9f94264
commit 2253923ec8
6 changed files with 100 additions and 108 deletions

View File

@ -26,11 +26,11 @@ tokio02 = ["tokio"]
[dependencies]
async-task = "3.0.0"
crossbeam-deque = "0.7.3"
crossbeam-queue = "0.2.1"
crossbeam-utils = "0.7.2"
futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] }
blocking = "0.4.4"
concurrent-queue = "1.0.0"
fastrand = "1.1.0"
futures-io = { version = "0.3.5", default-features = false, features = ["std"] }
futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] }
once_cell = "1.3.1"
piper = "0.1.2"
scoped-tls-hkt = "0.1.2"

View File

@ -7,11 +7,7 @@
//! [`futures::executor::block_on()`]: https://docs.rs/futures/0.3/futures/executor/fn.block_on.html
//! [blog-post]: https://stjepang.github.io/2020/01/25/build-your-own-block-on.html
use std::cell::RefCell;
use std::future::Future;
use std::task::{Context, Poll, Waker};
use crossbeam_utils::sync::Parker;
use crate::context;
@ -43,33 +39,7 @@ use crate::context;
/// })
/// ```
///
/// [`run()`]: crate::run()
/// [`run()`]: `crate::run()`
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
thread_local! {
// Parker and waker associated with the current thread.
static CACHE: RefCell<(Parker, Waker)> = {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = async_task::waker_fn(move || unparker.unpark());
RefCell::new((parker, waker))
};
}
CACHE.with(|cache| {
// Panic if `block_on()` is called recursively.
let (parker, waker) = &*cache.borrow();
// If enabled, set up tokio before execution begins.
context::enter(|| {
futures_util::pin_mut!(future);
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => parker.park(),
}
}
})
})
context::enter(|| blocking::block_on(future))
}

View File

@ -30,7 +30,7 @@ use std::sync::Arc;
use std::task::{Poll, Waker};
use std::time::{Duration, Instant};
use crossbeam_queue::ArrayQueue;
use concurrent_queue::ConcurrentQueue;
use futures_util::future;
use once_cell::sync::Lazy;
use slab::Slab;
@ -69,7 +69,7 @@ pub(crate) struct Reactor {
///
/// When inserting or removing a timer, we don't process it immediately - we just push it into
/// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
timer_ops: ArrayQueue<TimerOp>,
timer_ops: ConcurrentQueue<TimerOp>,
/// An I/O event that is triggered when a new timer is registered.
///
@ -86,7 +86,7 @@ impl Reactor {
sources: piper::Mutex::new(Slab::new()),
events: piper::Lock::new(sys::Events::new()),
timers: piper::Mutex::new(BTreeMap::new()),
timer_ops: ArrayQueue::new(1000),
timer_ops: ConcurrentQueue::bounded(1000),
timer_event: Lazy::new(|| IoEvent::new().expect("cannot create an `IoEvent`")),
});
&REACTOR
@ -195,7 +195,7 @@ impl Reactor {
// Process timer operations, but no more than the queue capacity because otherwise we could
// keep popping operations forever.
for _ in 0..self.timer_ops.capacity() {
for _ in 0..self.timer_ops.capacity().unwrap() {
match self.timer_ops.pop() {
Ok(TimerOp::Insert(when, id, waker)) => {
timers.insert((when, id), waker);

View File

@ -54,7 +54,7 @@ pub(crate) type Runnable = async_task::Task<()>;
/// # });
/// ```
///
/// [`run()`]: crate::run()
/// [`run()`]: `crate::run()`
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(pub(crate) Option<async_task::JoinHandle<T, ()>>);
@ -75,7 +75,7 @@ impl<T: 'static> Task<T> {
/// # })
/// ```
///
/// [`run()`]: crate::run()
/// [`run()`]: `crate::run()`
pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
ThreadLocalExecutor::spawn(future)
}
@ -97,7 +97,7 @@ impl<T: Send + 'static> Task<T> {
/// # });
/// ```
///
/// [`run()`]: crate::run()
/// [`run()`]: `crate::run()`
pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
WorkStealingExecutor::get().spawn(future)
}

View File

@ -10,7 +10,7 @@ use std::future::Future;
use std::sync::Arc;
use std::thread::{self, ThreadId};
use crossbeam_queue::SegQueue;
use concurrent_queue::ConcurrentQueue;
use scoped_tls_hkt::scoped_thread_local;
use crate::io_event::IoEvent;
@ -33,7 +33,7 @@ pub(crate) struct ThreadLocalExecutor {
queue: RefCell<VecDeque<Runnable>>,
/// When another thread wakes a task belonging to this executor, it goes into this queue.
injector: Arc<SegQueue<Runnable>>,
injector: Arc<ConcurrentQueue<Runnable>>,
/// An I/O event that is triggered when another thread wakes a task belonging to this executor.
event: IoEvent,
@ -44,7 +44,7 @@ impl ThreadLocalExecutor {
pub fn new() -> ThreadLocalExecutor {
ThreadLocalExecutor {
queue: RefCell::new(VecDeque::new()),
injector: Arc::new(SegQueue::new()),
injector: Arc::new(ConcurrentQueue::unbounded()),
event: IoEvent::new().expect("cannot create an `IoEvent`"),
}
}
@ -84,7 +84,7 @@ impl ThreadLocalExecutor {
EXECUTOR.with(|ex| ex.queue.borrow_mut().push_back(runnable));
} else if let Some(injector) = injector.upgrade() {
// If scheduling from a different thread, push into the injector queue.
injector.push(runnable);
injector.push(runnable).unwrap();
}
// Trigger an I/O event to let the original thread know that a task has been

View File

@ -20,11 +20,10 @@
use std::cell::Cell;
use std::future::Future;
use std::num::Wrapping;
use std::panic;
use std::sync::{Arc, RwLock};
use crossbeam_deque as deque;
use crossbeam_utils::sync::ShardedLock;
use concurrent_queue::ConcurrentQueue;
use once_cell::sync::Lazy;
use scoped_tls_hkt::scoped_thread_local;
use slab::Slab;
@ -47,10 +46,10 @@ scoped_thread_local! {
pub(crate) struct WorkStealingExecutor {
/// When a thread that is not inside [`run()`][`crate::run()`] spawns or wakes a task, it goes
/// into this queue.
injector: deque::Injector<Runnable>,
injector: ConcurrentQueue<Runnable>,
/// Registered handles for stealing tasks from workers.
stealers: ShardedLock<Slab<deque::Stealer<Runnable>>>,
stealers: RwLock<Slab<Arc<ConcurrentQueue<Runnable>>>>,
/// An I/O event that is triggered whenever there might be available tasks to run.
event: IoEvent,
@ -60,8 +59,8 @@ impl WorkStealingExecutor {
/// Returns a reference to the global work-stealing executor.
pub fn get() -> &'static WorkStealingExecutor {
static EXECUTOR: Lazy<WorkStealingExecutor> = Lazy::new(|| WorkStealingExecutor {
injector: deque::Injector::new(),
stealers: ShardedLock::new(Slab::new()),
injector: ConcurrentQueue::unbounded(),
stealers: RwLock::new(Slab::new()),
event: IoEvent::new().expect("cannot create an `IoEvent`"),
});
&EXECUTOR
@ -86,7 +85,7 @@ impl WorkStealingExecutor {
WORKER.with(|w| w.push(runnable));
} else {
// If scheduling from a non-worker thread, push into the injector queue.
self.injector.push(runnable);
self.injector.push(runnable).unwrap();
// Notify workers that there is a task in the injector queue.
self.event.notify();
@ -110,10 +109,10 @@ impl WorkStealingExecutor {
let worker = Worker {
key: vacant.key(),
slot: Cell::new(None),
queue: deque::Worker::new_fifo(),
queue: Arc::new(ConcurrentQueue::bounded(512)),
executor: self,
};
vacant.insert(worker.queue.stealer());
vacant.insert(worker.queue.clone());
worker
}
@ -134,7 +133,7 @@ pub(crate) struct Worker<'a> {
/// A queue of tasks.
///
/// Other workers are able to steal tasks from this queue.
queue: deque::Worker<Runnable>,
queue: Arc<ConcurrentQueue<Runnable>>,
/// The parent work-stealing executor.
executor: &'a WorkStealingExecutor,
@ -217,21 +216,40 @@ impl Worker<'_> {
// Put the task into the slot.
if let Some(r) = self.slot.replace(Some(runnable)) {
// If the slot had a task, push it into the queue.
self.queue.push(r);
if let Err(err) = self.queue.push(r) {
use concurrent_queue::*;
match err {
PushError::Full(v) | PushError::Closed(v) => {
self.executor.injector.push(v).unwrap();
}
}
}
}
}
/// Moves a task from the slot into the local queue.
fn flush_slot(&self) {
if let Some(r) = self.slot.take() {
self.queue.push(r);
if let Err(err) = self.queue.push(r) {
use concurrent_queue::*;
match err {
PushError::Full(v) | PushError::Closed(v) => {
self.executor.injector.push(v).unwrap();
}
}
}
}
}
/// Finds the next task to run.
fn search(&self) -> Option<Runnable> {
// Check if there is a task in the slot or in the queue.
if let Some(r) = self.slot.take().or_else(|| self.queue.pop()) {
// Check if there is a task in the slot.
if let Some(r) = self.slot.take() {
return Some(r);
}
// Check if there is a task in the queue.
if let Ok(r) = self.queue.pop() {
return Some(r);
}
@ -242,26 +260,61 @@ impl Worker<'_> {
// Try stealing from other workers.
let stealers = self.executor.stealers.read().unwrap();
retry_steal(|| {
// Pick a random starting point in the iterator list and rotate the list.
let n = stealers.len();
let start = fast_random(n);
let iter = stealers.iter().chain(stealers.iter()).skip(start).take(n);
// Remove this worker's stealer handle.
let iter = iter.filter(|(k, _)| *k != self.key);
// Pick a random starting point in the iterator list and rotate the list.
let n = stealers.len();
let start = fastrand::usize(..n);
let iter = stealers.iter().chain(stealers.iter()).skip(start).take(n);
// Try stealing from each worker in the list. Collecting stops as soon as we get a
// `Steal::Success`. Otherwise, if any steal attempt resulted in a `Steal::Retry`,
// that's the collected result and we'll retry from the beginning.
iter.map(|(_, s)| s.steal_batch_and_pop(&self.queue))
.collect()
})
// Remove this worker's stealer handle.
let iter = iter.filter(|(k, _)| *k != self.key);
let iter = iter.map(|(_, q)| q);
// Try stealing from each worker in the list.
for q in iter {
let count = self
.queue
.capacity()
.unwrap_or(usize::MAX)
.min((q.len() + 1) / 2);
// Steal half of the tasks from this worker.
for _ in 0..count {
if let Ok(r) = q.pop() {
self.push(r);
} else {
break;
}
}
// Check if there is a task in the slot.
if let Some(r) = self.slot.take() {
return Some(r);
}
}
None
}
/// Steals tasks from the injector queue.
fn steal_global(&self) -> Option<Runnable> {
retry_steal(|| self.executor.injector.steal_batch_and_pop(&self.queue))
let count = self
.queue
.capacity()
.unwrap_or(usize::MAX)
.min((self.executor.injector.len() + 1) / 2);
// Steal half of the tasks from the injector queue.
for _ in 0..count {
if let Ok(r) = self.executor.injector.pop() {
self.push(r);
} else {
break;
}
}
// If anything was stolen, a task must be in the slot.
self.slot.take()
}
}
@ -276,7 +329,7 @@ impl Drop for Worker<'_> {
}
// Move all tasks in this worker's queue into the injector queue.
while let Some(r) = self.queue.pop() {
while let Ok(r) = self.queue.pop() {
r.schedule();
}
@ -285,34 +338,3 @@ impl Drop for Worker<'_> {
self.executor.event.notify();
}
}
/// Returns a random number in the interval `0..n`.
fn fast_random(n: usize) -> usize {
thread_local! {
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1));
}
RNG.with(|rng| {
// This is the 32-bit variant of Xorshift: https://en.wikipedia.org/wiki/Xorshift
let mut x = rng.get();
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
rng.set(x);
// This is a fast alternative to `x % n`:
// https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
((x.0 as u64).wrapping_mul(n as u64) >> 32) as usize
})
}
/// Retries a steal operation for as long as it returns `Steal::Retry`.
fn retry_steal<T>(mut steal_op: impl FnMut() -> deque::Steal<T>) -> Option<T> {
loop {
match steal_op() {
deque::Steal::Success(t) => return Some(t),
deque::Steal::Empty => return None,
deque::Steal::Retry => {}
}
}
}