This commit is contained in:
Stjepan Glavina 2020-02-09 19:27:14 +01:00
parent eacbe58028
commit 5612e10cf4
2 changed files with 422 additions and 370 deletions

View File

@ -9,6 +9,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
async-task = "1.3.0"
crossbeam-channel = "0.4.0"
crossbeam-utils = "0.7.0"
futures-core = "0.3.3"
futures-io = "0.3.3"
futures-util = { version = "0.3.3", default-features = false, features = [] }

View File

@ -1,7 +1,8 @@
#![forbid(unsafe_code)]
// TODO: #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::collections::BTreeMap;
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::convert::TryInto;
use std::error::Error;
use std::fmt::Debug;
@ -21,6 +22,7 @@ use std::thread;
use std::time::{Duration, Instant};
use crossbeam_channel as channel;
use crossbeam_utils::sync::Parker;
use futures_core::stream::Stream;
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use futures_util::future;
@ -43,106 +45,126 @@ compile_error!("smol does not support this target OS");
// TODO: readme for inspiration: https://github.com/piscisaureus/wepoll
// TODO: implement FusedFuture for Task and Timer?
// ----- Event loop -----
// ----- Poller -----
struct Runtime {
// TODO: rename Runtime to Reactor?
// TODO: move executor into separate Executor struct (and maybe also have Blocking)
// TODO: also make Reactor::poll() and Reactor::interrupt()
// Executor
receiver: channel::Receiver<Runnable>,
queue: channel::Sender<Runnable>,
cvar: Condvar,
mutex: Mutex<bool>,
// Timers
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
// Polling I/O events
poller: Mutex<Poller>,
struct Poller {
registry: Registry,
// Interrupting epoll/kqueue/wepoll
notified: AtomicBool,
flag: AtomicBool,
socket_notify: Socket,
socket_wakeup: Async<Socket>,
}
static POLLER: Lazy<Poller> = Lazy::new(|| Poller::create().expect("cannot create poller"));
impl Poller {
fn create() -> io::Result<Poller> {
// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
// https://github.com/mhils/backports.socketpair/blob/master/backports/socketpair/__init__.py
// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
// https://gist.github.com/geertj/4325783
// Create a temporary listener.
let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?;
listener.bind(&SocketAddr::from(([127, 0, 0, 1], 0)).into())?;
listener.listen(1)?;
let addr = listener.local_addr()?;
// First socket: connect to the listener.
let sock1 = Socket::new(Domain::ipv4(), Type::stream(), None)?;
sock1.set_nonblocking(true)?;
let _ = sock1.connect(&addr);
let _ = sock1.set_nodelay(true)?;
sock1.set_send_buffer_size(1)?;
// Second socket: accept a client from the listener.
let (sock2, _) = listener.accept()?;
sock2.set_nonblocking(true)?;
sock2.set_recv_buffer_size(1)?;
let registry = Registry::create()?;
let sock2 = registry.register(sock2)?;
Ok(Poller {
registry,
flag: AtomicBool::new(false),
socket_notify: sock1,
socket_wakeup: sock2,
})
}
fn poll(&self) {
let interrupted = self.reset();
let next_timer = self.registry.poll_timers();
let timeout = if interrupted {
Some(Duration::from_secs(0))
} else {
next_timer.map(|when| Instant::now().saturating_duration_since(when))
};
self.registry.wait_io(timeout);
}
fn poll_quick(&self) {
self.registry.poll_timers();
self.registry.wait_io(Some(Duration::from_secs(0)));
}
/// Sets the interrupt flag and writes to the wakeup socket.
fn interrupt(&self) {
if !self.flag.load(Ordering::SeqCst) {
if !self.flag.swap(true, Ordering::SeqCst) {
loop {
match (&self.socket_notify).write(&[1]) {
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => {
let _ = (&self.socket_notify).flush();
break;
}
}
}
}
}
}
/// Clears the interrupt flag and drains the wakeup socket.
fn reset(&self) -> bool {
let value = self.flag.swap(false, Ordering::SeqCst);
if value {
loop {
match self.socket_wakeup.source().read(&mut [0; 64]) {
Ok(n) if n > 0 => {}
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => break,
}
}
}
value
}
}
struct Registry {
epoll: RawFd,
entries: Mutex<Slab<Arc<Entry>>>,
}
struct Poller {
epoll_events: Box<[EpollEvent]>,
bytes: Box<[u8]>,
wakers: std::collections::VecDeque<Waker>,
}
/// Converts any error into an I/O error.
fn io_err(err: impl Error + Send + Sync + 'static) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(err))
}
fn initialize() -> io::Result<Runtime> {
// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
// https://github.com/mhils/backports.socketpair/blob/master/backports/socketpair/__init__.py
// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
// https://gist.github.com/geertj/4325783
// Create a temporary listener.
let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?;
listener.bind(&SocketAddr::from(([127, 0, 0, 1], 0)).into())?;
listener.listen(1)?;
let addr = listener.local_addr()?;
// First socket: connect to the listener.
let sock1 = Socket::new(Domain::ipv4(), Type::stream(), None)?;
sock1.set_nonblocking(true)?;
let _ = sock1.connect(&addr);
let _ = sock1.set_nodelay(true)?;
sock1.set_send_buffer_size(1)?;
// Second socket: accept a client from the listener.
let (sock2, _) = listener.accept()?;
sock2.set_nonblocking(true)?;
sock2.set_recv_buffer_size(1)?;
let registry = Registry {
epoll: epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).map_err(io_err)?,
entries: Mutex::new(Slab::new()),
};
let sock2 = registry.register(sock2)?;
let (sender, receiver) = channel::unbounded::<Runnable>();
Ok(Runtime {
receiver,
queue: sender,
cvar: Condvar::new(),
mutex: Mutex::new(false),
timers: Mutex::new(BTreeMap::new()),
// TODO: convert Result to io::Result
poller: Mutex::new(Poller {
epoll_events: vec![EpollEvent::empty(); 1000].into_boxed_slice(),
bytes: vec![0; 1000].into_boxed_slice(),
wakers: Default::default(),
}),
registry,
notified: AtomicBool::new(false),
socket_notify: sock1,
socket_wakeup: sock2,
})
events: Mutex<Box<[EpollEvent]>>,
io_handles: Mutex<Slab<Arc<Entry>>>,
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
}
impl Registry {
fn create() -> io::Result<Registry> {
Ok(Registry {
epoll: epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).map_err(io_err)?,
events: Mutex::new(vec![EpollEvent::empty(); 1000].into_boxed_slice()),
io_handles: Mutex::new(Slab::new()),
timers: Mutex::new(BTreeMap::new()),
})
}
// TODO: insert/delete terminology?
fn register<T: AsRawFd>(&self, source: T) -> io::Result<Async<T>> {
let fd = source.as_raw_fd();
let entry = {
let mut entries = self.entries.lock();
let vacant = entries.vacant_entry();
let mut io_handles = self.io_handles.lock();
let vacant = io_handles.vacant_entry();
let entry = Arc::new(Entry {
fd,
index: vacant.key(),
@ -153,7 +175,6 @@ impl Registry {
entry
};
// TODO: handle epoll errors
epoll_ctl(
self.epoll,
EpollOp::EpollCtlAdd,
@ -177,182 +198,192 @@ impl Registry {
// TODO: we probably don't need to pass fd because it's in the entry
fn unregister(&self, fd: RawFd, index: usize) {
self.entries.lock().remove(index);
self.io_handles.lock().remove(index);
// Ignore errors because an event in oneshot mode may unregister the fd before we do.
let _ = epoll_ctl(self.epoll, EpollOp::EpollCtlDel, fd, None);
}
}
static RT: Lazy<Runtime> = Lazy::new(|| initialize().expect("cannot initialize smol runtime"));
fn poll_timers(&self) -> Option<Instant> {
let now = Instant::now();
let (ready, next_timer) = {
let mut timers = self.timers.lock();
let pending = timers.split_off(&(now, 0));
let ready = mem::replace(&mut *timers, pending);
let next_timer = timers.keys().next().map(|(when, _)| *when);
(ready, next_timer)
};
fn poll(block: bool) -> bool {
let mut poller = match RT.poller.try_lock() {
None => return false,
Some(poller) => poller,
};
// Reset the interrupt flag and the wakeup socket.
let notified = RT.notified.swap(false, Ordering::SeqCst);
if notified {
loop {
match RT.socket_wakeup.source().read(&mut poller.bytes) {
Ok(n) if n > 0 => {}
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => break,
}
// Wake up ready timers.
for (_, waker) in ready {
waker.wake();
}
next_timer
}
let mut timeout = poll_timers(&mut poller);
if notified || !block {
timeout = Some(Duration::from_secs(0));
}
poll_io(&mut poller, timeout);
fn wait_io(&self, timeout: Option<Duration>) {
let mut events = if timeout == Some(Duration::from_secs(0)) {
match self.events.try_lock() {
None => return,
Some(e) => e,
}
} else {
self.events.lock()
};
true
}
let timeout_ms = timeout
.and_then(|t| t.as_millis().try_into().ok())
.unwrap_or(-1);
fn poll_timers(poller: &mut Poller) -> Option<Duration> {
let now = Instant::now();
// TODO: handle unwrap
let n = epoll_wait(self.epoll, &mut events, timeout_ms).unwrap();
let (ready, next_event) = {
let mut timers = RT.timers.lock();
let pending = timers.split_off(&(now, 0));
let ready = mem::replace(&mut *timers, pending);
let next_event = timers.keys().next().map(|(when, _)| *when - now);
(ready, next_event)
};
if n == 0 {
return;
}
// Wake up ready timers.
for (_, waker) in ready {
waker.wake();
}
let mut wakers = VecDeque::new();
let io_handles = self.io_handles.lock();
next_event
}
fn poll_io(poller: &mut Poller, timeout: Option<Duration>) {
poller.wakers.clear();
let timeout_ms = match timeout {
None => -1,
Some(t) => t.as_millis().try_into().expect("timer duration overflow"),
};
let n = epoll_wait(RT.registry.epoll, &mut poller.epoll_events, timeout_ms).unwrap();
if n > 0 {
let entries = RT.registry.entries.lock();
for ev in &poller.epoll_events[..n] {
for ev in &events[..n] {
let is_read = ev.events() != EpollFlags::EPOLLOUT;
let is_write = ev.events() != EpollFlags::EPOLLIN;
let index = ev.data() as usize;
if let Some(entry) = entries.get(index) {
// TODO: https://twitter.com/kingprotty/status/1222152589405384705?s=19
if let Some(entry) = io_handles.get(index) {
if is_read {
for w in entry.readers.lock().drain(..) {
poller.wakers.push_back(w);
wakers.push_back(w);
}
}
if is_write {
for w in entry.writers.lock().drain(..) {
poller.wakers.push_front(w);
wakers.push_front(w);
}
}
}
}
}
for waker in poller.wakers.drain(..) {
waker.wake();
}
}
drop(io_handles);
fn interrupt() {
if !RT.notified.load(Ordering::SeqCst) {
if !RT.notified.swap(true, Ordering::SeqCst) {
loop {
match (&RT.socket_notify).write(&[1]) {
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => break,
}
}
// Wake up ready I/O.
for waker in wakers {
waker.wake();
}
}
}
// ----- Timer -----
/// Fires at an instant in time.
pub struct Timer {
when: Instant,
inserted: bool,
}
impl Timer {
/// Fires after the specified duration of time.
pub fn after(dur: Duration) -> Timer {
Timer {
when: Instant::now() + dur,
inserted: false,
}
}
/// Fires at the specified instant in time.
pub fn at(dur: Duration) -> Timer {
Timer {
when: Instant::now() + dur,
inserted: false,
}
}
}
impl Drop for Timer {
fn drop(&mut self) {
let id = self as *mut Timer as usize;
RT.timers.lock().remove(&(self.when, id));
self.inserted = false;
}
}
impl Future for Timer {
type Output = Instant;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let id = &mut *self as *mut Timer as usize;
let mut timers = RT.timers.lock();
if Instant::now() >= self.when {
timers.remove(&(self.when, id));
return Poll::Ready(self.when);
}
if !self.inserted {
let mut is_earliest = false;
if let Some((first, _)) = timers.keys().next() {
if self.when < *first {
is_earliest = true;
}
}
let waker = cx.waker().clone();
timers.insert((self.when, id), waker);
self.inserted = true;
if is_earliest {
drop(timers);
interrupt();
}
}
Poll::Pending
}
/// Converts any error into an I/O error.
fn io_err(err: impl Error + Send + Sync + 'static) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(err))
}
// ----- Executor -----
struct Executor {
receiver: channel::Receiver<Runnable>,
queue: channel::Sender<Runnable>,
cvar: Condvar,
mutex: Mutex<bool>,
}
static EXECUTOR: Lazy<Executor> = Lazy::new(|| {
let (sender, receiver) = channel::unbounded::<Runnable>();
Executor {
receiver,
queue: sender,
cvar: Condvar::new(),
mutex: Mutex::new(false),
}
});
/// A runnable future, ready for execution.
type Runnable = async_task::Task<()>;
/// Executes all futures until the main one completes.
pub fn run<T>(future: impl Future<Output = T>) -> T {
pin_utils::pin_mut!(future);
// TODO: panic on nested run()
// TODO Optimization: use thread-local cache for ready and queue
let ready = Arc::new(AtomicBool::new(true));
let waker = async_task::waker_fn({
let ready = ready.clone();
move || {
if !ready.swap(true, Ordering::SeqCst) {
POLLER.interrupt();
let _m = EXECUTOR.mutex.lock();
EXECUTOR.cvar.notify_all();
}
}
});
// The number of times the thread found work in a row.
let mut runs = 0;
// The number of times the thread didn't find work in a row.
let mut fails = 0;
loop {
while !ready.load(Ordering::SeqCst) {
if runs >= 64 {
runs = 0;
POLLER.poll_quick();
}
match EXECUTOR.receiver.try_recv() {
Ok(runnable) => {
runs += 1;
fails = 0;
let _ = catch_unwind(|| runnable.run());
}
Err(_) => {
runs = 0;
fails += 1;
POLLER.poll_quick();
if fails <= 1 {
continue;
}
if fails <= 3 {
std::thread::yield_now();
continue;
}
let mut m = EXECUTOR.mutex.lock();
if *m {
if !ready.load(Ordering::SeqCst) {
EXECUTOR.cvar.wait(&mut m);
}
continue;
}
*m = true;
drop(m);
// TODO: if this panics, set m to false and notify
POLLER.poll();
m = EXECUTOR.mutex.lock();
*m = false;
EXECUTOR.cvar.notify_one();
}
}
}
runs += 1;
fails = 0;
ready.store(false, Ordering::SeqCst);
match future.as_mut().poll(&mut Context::from_waker(&waker)) {
Poll::Pending => {}
Poll::Ready(val) => return val,
}
}
}
/// A spawned future.
pub struct Task<T>(async_task::JoinHandle<T, ()>);
@ -364,8 +395,8 @@ impl<T: Send + 'static> Task<T> {
pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
// Create a runnable and schedule it for execution.
let schedule = |runnable| {
RT.queue.send(runnable).unwrap();
interrupt();
EXECUTOR.queue.send(runnable).unwrap();
POLLER.interrupt();
};
let (runnable, handle) = async_task::spawn(future, schedule, ());
runnable.schedule();
@ -377,8 +408,10 @@ impl<T: Send + 'static> Task<T> {
/// Spawns a future onto the blocking thread pool.
#[must_use]
pub fn blocking(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
// TODO: do THREAD_POOL.spawn(future)
crate::blocking(future)
let (runnable, handle) =
async_task::spawn(future, |r| THREAD_POOL.sender.send(r).unwrap(), ());
runnable.schedule();
Task(handle)
}
}
@ -391,10 +424,7 @@ impl<T: 'static> Task<T> {
where
T: 'static,
{
// let (runnable, handle) = async_task::spawn_local(future, |t| todo!(), ());
// runnable.schedule();
// TODO: panic if not called inside a worker started with worker()
// TODO
// TODO: panic if not called inside a worker started with run()
todo!()
}
}
@ -417,85 +447,120 @@ impl<T> Future for Task<T> {
}
}
/// Executes all futures until the main one completes.
pub fn run<T>(future: impl Future<Output = T>) -> T {
pin_utils::pin_mut!(future);
// ----- Blocking -----
// TODO: what is the behavior of nested run()s
// - maybe just panic
// TODO Optimization: use thread-local cache for ready and queue
let ready = Arc::new(AtomicBool::new(true));
let waker = async_task::waker_fn({
let ready = ready.clone();
move || {
if !ready.swap(true, Ordering::SeqCst) {
interrupt();
let _m = RT.mutex.lock();
RT.cvar.notify_all();
}
}
});
loop {
while !ready.load(Ordering::SeqCst) {
match RT.receiver.try_recv() {
Ok(runnable) => {
let _ = catch_unwind(|| runnable.run());
}
Err(_) => {
let mut m = RT.mutex.lock();
if *m {
if !ready.load(Ordering::SeqCst) {
RT.cvar.wait(&mut m);
}
continue;
}
*m = true;
drop(m);
// TODO: if this panics, set m to false and notify
if !ready.load(Ordering::SeqCst) {
poll(true);
}
m = RT.mutex.lock();
*m = false;
RT.cvar.notify_one();
}
}
}
ready.store(false, Ordering::SeqCst);
match future.as_mut().poll(&mut Context::from_waker(&waker)) {
Poll::Pending => {}
Poll::Ready(val) => return val,
}
}
// TODO: struct ThreadPool and method fn spawn()
// - Task::blocking(fut) then calls THREAD_POOL.spawn(fut)
struct ThreadPool {
sender: channel::Sender<Runnable>,
receiver: channel::Receiver<Runnable>,
}
// ----- Blocking -----
static THREAD_POOL: Lazy<ThreadPool> = Lazy::new(|| {
// Start a single worker thread waiting for the first task.
start_thread();
let (sender, receiver) = channel::unbounded();
ThreadPool { sender, receiver }
});
fn start_thread() {
use std::sync::atomic::*;
static SLEEPING: AtomicUsize = AtomicUsize::new(0);
SLEEPING.fetch_add(1, Ordering::SeqCst);
let timeout = Duration::from_secs(1);
thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(move || {
loop {
let mut runnable = match THREAD_POOL.receiver.recv_timeout(timeout) {
Ok(runnable) => runnable,
Err(_) => {
// Check whether this is the last sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
// If so, then restart the thread to make sure there is always at least
// one sleeping thread.
if SLEEPING.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
continue;
}
}
// Stop the thread.
return;
}
};
// If there are no sleeping threads, then start one to make sure there is always at
// least one sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
start_thread();
}
loop {
let _ = catch_unwind(|| runnable.run());
// Try taking another runnable if there are any available.
runnable = match THREAD_POOL.receiver.try_recv() {
Ok(runnable) => runnable,
Err(_) => break,
};
}
// If there is at least one sleeping thread, stop this thread instead of putting it
// to sleep.
if SLEEPING.load(Ordering::SeqCst) > 0 {
return;
}
SLEEPING.fetch_add(1, Ordering::SeqCst);
}
})
.expect("cannot start a blocking thread");
}
/// Blocks on a single future.
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
thread_local! {
// Parker and waker associated with the current thread.
static CACHE: RefCell<(Parker, Waker)> = {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = async_task::waker_fn(move || unparker.unpark());
RefCell::new((parker, waker))
};
}
CACHE.with(|cache| {
// Panic if `block_on()` is called recursively.
let (parker, waker) = &mut *cache.try_borrow_mut().ok().expect("recursive `block_on`");
pin_utils::pin_mut!(future);
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => parker.park(),
}
}
})
}
/// Moves blocking code onto a thread.
#[macro_export]
macro_rules! blocking {
($($expr:tt)*) => {
smol::Task::blocking(async move { $($expr)* }).await
$crate::Task::blocking(async move { $($expr)* }).await
};
}
/// Blocks on a single future.
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
todo!()
}
/// Converts a blocking iterator into a stream.
pub fn iter<T>(t: T) -> impl Stream<Item = T::Item> + Unpin + 'static
pub fn iter<T>(t: T) -> impl Stream<Item = T::Item> + Send + Unpin + 'static
where
T: Iterator + Send + 'static,
T::Item: Send,
{
// NOTE: stop task if the returned handle is dropped
todo!();
@ -503,15 +568,14 @@ where
}
/// Converts a blocking reader into an async reader.
pub fn reader(t: impl Read + Send + 'static) -> impl AsyncBufRead + Unpin + 'static {
// TODO: should we simply return Reader here?
pub fn reader(t: impl Read + Send + 'static) -> impl AsyncBufRead + Send + Unpin + 'static {
// NOTE: stop task if the returned handle is dropped
todo!();
futures_util::io::empty()
}
/// Converts a blocking writer into an async writer.
pub fn writer(t: impl Write + Send + 'static) -> impl AsyncWrite + Unpin + 'static {
pub fn writer(t: impl Write + Send + 'static) -> impl AsyncWrite + Send + Unpin + 'static {
// TODO: should we simply return Writer here?
// NOTE: stop task if the returned handle is dropped
todo!();
@ -545,85 +609,72 @@ impl<T: AsyncWrite + Unpin> Write for BlockOn<T> {
}
}
// TODO: struct ThreadPool and method fn spawn()
// - Task::blocking(fut) then calls THREAD_POOL.spawn(fut)
// ----- Timer -----
/// Spawns a future onto the blocking thread pool.
fn blocking<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
// TODO: ignore panics
/// Fires at an instant in time.
pub struct Timer {
when: Instant,
inserted: bool,
}
use std::sync::atomic::*;
static SLEEPING: AtomicUsize = AtomicUsize::new(0);
struct Pool {
sender: channel::Sender<Runnable>,
receiver: channel::Receiver<Runnable>,
impl Timer {
/// Fires after the specified duration of time.
pub fn after(dur: Duration) -> Timer {
Timer {
when: Instant::now() + dur,
inserted: false,
}
}
static POOL: Lazy<Pool> = Lazy::new(|| {
// Start a single worker thread waiting for the first task.
start_thread();
/// Fires at the specified instant in time.
pub fn at(dur: Duration) -> Timer {
Timer {
when: Instant::now() + dur,
inserted: false,
}
}
}
let (sender, receiver) = channel::unbounded();
Pool { sender, receiver }
});
impl Drop for Timer {
fn drop(&mut self) {
let id = self as *mut Timer as usize;
POLLER.registry.timers.lock().remove(&(self.when, id));
self.inserted = false;
}
}
fn start_thread() {
SLEEPING.fetch_add(1, Ordering::SeqCst);
let timeout = Duration::from_secs(1);
impl Future for Timer {
type Output = Instant;
thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(move || {
loop {
let mut runnable = match POOL.receiver.recv_timeout(timeout) {
Ok(runnable) => runnable,
Err(_) => {
// Check whether this is the last sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
// If so, then restart the thread to make sure there is always at least
// one sleeping thread.
if SLEEPING.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
continue;
}
}
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let id = &mut *self as *mut Timer as usize;
let mut timers = POLLER.registry.timers.lock();
// Stop the thread.
return;
}
};
if Instant::now() >= self.when {
timers.remove(&(self.when, id));
return Poll::Ready(self.when);
}
// If there are no sleeping threads, then start one to make sure there is always at
// least one sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
start_thread();
}
loop {
let _ = catch_unwind(|| runnable.run());
// Try taking another runnable if there are any available.
runnable = match POOL.receiver.try_recv() {
Ok(runnable) => runnable,
Err(_) => break,
};
}
// If there is at least one sleeping thread, stop this thread instead of putting it
// to sleep.
if SLEEPING.load(Ordering::SeqCst) > 0 {
return;
}
SLEEPING.fetch_add(1, Ordering::SeqCst);
if !self.inserted {
let mut is_earliest = false;
if let Some((first, _)) = timers.keys().next() {
if self.when < *first {
is_earliest = true;
}
})
.expect("cannot start a blocking thread");
}
}
let (runnable, handle) = async_task::spawn(future, |r| POOL.sender.send(r).unwrap(), ());
runnable.schedule();
Task(handle)
let waker = cx.waker().clone();
timers.insert((self.when, id), waker);
self.inserted = true;
if is_earliest {
drop(timers);
POLLER.interrupt();
}
}
Poll::Pending
}
}
// ----- Async I/O -----
@ -638,7 +689,7 @@ impl<T> Drop for Async<T> {
fn drop(&mut self) {
// TODO: call entry.unregister();
// TODO: what about this unwrap?
RT.registry.unregister(self.entry.fd, self.entry.index);
POLLER.registry.unregister(self.entry.fd, self.entry.index);
}
}
@ -652,7 +703,7 @@ struct Entry {
impl<T: AsRawFd> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(source: T) -> io::Result<Async<T>> {
RT.registry.register(source)
POLLER.registry.register(source)
}
}