Replace the linked list with a safer and less allocation-heavy alternative (#38)

* Use slab to avoid unsafe code

* Move send+sync impls down to Mutex

* Code review

* Unwrap the key earlier

* Reduce the scope of one of the unsafe blocks.
This commit is contained in:
John Nunley 2022-11-21 08:42:27 -08:00 committed by GitHub
parent 64965711e1
commit 0235e55a0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 226 additions and 314 deletions

View File

@ -24,6 +24,7 @@ __test = []
[dependencies]
crossbeam-utils = { version = "0.8.12", default-features = false }
parking = { version = "2.0.0", optional = true }
slab = { version = "0.4.7", default-features = false }
[dev-dependencies]
waker-fn = "1"

View File

@ -1,6 +1,6 @@
//! The inner mechanism powering the `Event` type.
use crate::list::{Entry, List};
use crate::list::List;
use crate::node::Node;
use crate::queue::Queue;
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@ -11,7 +11,6 @@ use alloc::vec;
use alloc::vec::Vec;
use core::ops;
use core::ptr::NonNull;
/// Inner state of [`Event`].
pub(crate) struct Inner {
@ -25,14 +24,6 @@ pub(crate) struct Inner {
/// Queue of nodes waiting to be processed.
queue: Queue,
/// A single cached list entry to avoid allocations on the fast path of the insertion.
///
/// This field can only be written to when the `cache_used` field in the `list` structure
/// is false, or the user has a pointer to the `Entry` identical to this one and that user
/// has exclusive access to that `Entry`. An immutable pointer to this field is kept in
/// the `list` structure when it is in use.
cache: UnsafeCell<Entry>,
}
impl Inner {
@ -42,7 +33,6 @@ impl Inner {
notified: AtomicUsize::new(core::usize::MAX),
list: Mutex::new(List::new()),
queue: Queue::new(),
cache: UnsafeCell::new(Entry::new()),
}
}
@ -62,12 +52,6 @@ impl Inner {
// Acquire and drop the lock to make sure that the queue is flushed.
let _guard = self.lock();
}
/// Returns the pointer to the single cached list entry.
#[inline(always)]
pub(crate) fn cache_ptr(&self) -> NonNull<Entry> {
unsafe { NonNull::new_unchecked(self.cache.get()) }
}
}
/// The guard returned by [`Inner::lock`].
@ -88,11 +72,11 @@ impl ListGuard<'_> {
guard: &mut MutexGuard<'_, List>,
) {
// Process the start node.
tasks.extend(start_node.apply(guard, self.inner));
tasks.extend(start_node.apply(guard));
// Process all remaining nodes.
while let Some(node) = self.inner.queue.pop() {
tasks.extend(node.apply(guard, self.inner));
tasks.extend(node.apply(guard));
}
}
}
@ -125,7 +109,7 @@ impl Drop for ListGuard<'_> {
}
// Update the atomic `notified` counter.
let notified = if list.notified < list.len {
let notified = if list.notified < list.len() {
list.notified
} else {
core::usize::MAX
@ -224,3 +208,6 @@ impl<'a, T> ops::DerefMut for MutexGuard<'a, T> {
unsafe { &mut *self.mutex.value.get() }
}
}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}

View File

@ -78,9 +78,10 @@ use alloc::sync::Arc;
use core::fmt;
use core::future::Future;
use core::mem::ManuallyDrop;
use core::mem::{self, ManuallyDrop};
use core::num::NonZeroUsize;
use core::pin::Pin;
use core::ptr::{self, NonNull};
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use core::task::{Context, Poll, Waker};
use core::usize;
@ -92,7 +93,7 @@ use std::time::{Duration, Instant};
use inner::Inner;
use list::{Entry, State};
use node::Node;
use node::{Node, TaskWaiting};
#[cfg(feature = "std")]
use parking::Unparker;
@ -168,9 +169,6 @@ pub struct Event {
inner: AtomicPtr<Inner>,
}
unsafe impl Send for Event {}
unsafe impl Sync for Event {}
#[cfg(feature = "std")]
impl UnwindSafe for Event {}
#[cfg(feature = "std")]
@ -210,31 +208,31 @@ impl Event {
let inner = self.inner();
// Try to acquire a lock in the inner list.
let entry = unsafe {
if let Some(mut lock) = (*inner).lock() {
let entry = lock.alloc((*inner).cache_ptr());
lock.insert(entry);
let state = {
let inner = unsafe { &*inner };
if let Some(mut lock) = inner.lock() {
let entry = lock.insert(Entry::new());
entry
ListenerState::HasNode(entry)
} else {
// Push entries into the queue indicating that we want to push a listener.
let (node, entry) = Node::listener();
(*inner).push(node);
inner.push(node);
// Indicate that there are nodes waiting to be notified.
(*inner)
inner
.notified
.compare_exchange(usize::MAX, 0, Ordering::AcqRel, Ordering::Relaxed)
.ok();
entry
ListenerState::Queued(entry)
}
};
// Register the listener.
let listener = EventListener {
inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
entry: Some(entry),
state,
};
// Make sure the listener is registered before whatever happens next.
@ -529,12 +527,20 @@ pub struct EventListener {
/// A reference to [`Event`]'s inner state.
inner: Arc<Inner>,
/// A pointer to this listener's entry in the linked list.
entry: Option<NonNull<Entry>>,
/// The current state of the listener.
state: ListenerState,
}
unsafe impl Send for EventListener {}
unsafe impl Sync for EventListener {}
enum ListenerState {
/// The listener has a node inside of the linked list.
HasNode(NonZeroUsize),
/// The listener has already been notified and has discarded its entry.
Discarded,
/// The listener has an entry in the queue that may or may not have a task waiting.
Queued(Arc<TaskWaiting>),
}
#[cfg(feature = "std")]
impl UnwindSafe for EventListener {}
@ -605,11 +611,26 @@ impl EventListener {
fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
// Take out the entry pointer and set it to `None`.
let entry = match self.entry.take() {
None => unreachable!("cannot wait twice on an `EventListener`"),
Some(entry) => entry,
};
let (parker, unparker) = parking::pair();
let entry = match self.state.take() {
ListenerState::HasNode(entry) => entry,
ListenerState::Queued(task_waiting) => {
// This listener is stuck in the backup queue.
// Wait for the task to be notified.
loop {
match task_waiting.status() {
Some(entry_id) => break entry_id,
None => {
// Register a task and park until it is notified.
task_waiting.register(Task::Thread(unparker.clone()));
parker.park();
}
}
}
}
ListenerState::Discarded => panic!("Cannot wait on a discarded listener"),
};
// Wait for the lock to be available.
let lock = || {
@ -628,22 +649,15 @@ impl EventListener {
// Set this listener's state to `Waiting`.
{
let e = unsafe { entry.as_ref() };
let mut list = lock();
if e.is_queued() {
// Write a task to be woken once the lock is acquired.
e.write_task(Task::Thread(unparker));
} else {
let mut list = lock();
// If the listener was notified, we're done.
match e.state().replace(State::Notified(false)) {
State::Notified(_) => {
list.remove(entry, self.inner.cache_ptr());
return true;
}
_ => e.state().set(State::Task(Task::Thread(unparker))),
// If the listener was notified, we're done.
match list.state(entry).replace(State::Notified(false)) {
State::Notified(_) => {
list.remove(entry);
return true;
}
_ => list.state(entry).set(State::Task(Task::Thread(unparker))),
}
}
@ -658,7 +672,7 @@ impl EventListener {
if now >= deadline {
// Remove the entry and check if notified.
let mut list = lock();
let state = list.remove(entry, self.inner.cache_ptr());
let state = list.remove(entry);
return state.is_notified();
}
@ -668,17 +682,16 @@ impl EventListener {
}
let mut list = lock();
let e = unsafe { entry.as_ref() };
// Do a dummy replace operation in order to take out the state.
match e.state().replace(State::Notified(false)) {
match list.state(entry).replace(State::Notified(false)) {
State::Notified(_) => {
// If this listener has been notified, remove it from the list and return.
list.remove(entry, self.inner.cache_ptr());
list.remove(entry);
return true;
}
// Otherwise, set the state back to `Waiting`.
state => e.state().set(state),
state => list.state(entry).set(state),
}
}
}
@ -706,10 +719,10 @@ impl EventListener {
/// ```
pub fn discard(mut self) -> bool {
// If this listener has never picked up a notification...
if let Some(entry) = self.entry.take() {
if let ListenerState::HasNode(entry) = self.state.take() {
// Remove the listener from the list and return `true` if it was notified.
if let Some(mut lock) = self.inner.lock() {
let state = lock.remove(entry, self.inner.cache_ptr());
let state = lock.remove(entry);
if let State::Notified(_) = state {
return true;
@ -772,6 +785,30 @@ impl Future for EventListener {
#[allow(unreachable_patterns)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let entry = match self.state {
ListenerState::Discarded => {
unreachable!("cannot poll a completed `EventListener` future")
}
ListenerState::HasNode(ref entry) => *entry,
ListenerState::Queued(ref task_waiting) => {
loop {
// See if the task waiting has been completed.
match task_waiting.status() {
Some(entry_id) => {
self.state = ListenerState::HasNode(entry_id);
break entry_id;
}
None => {
// If not, wait for it to complete.
task_waiting.register(Task::Waker(cx.waker().clone()));
if task_waiting.status().is_none() {
return Poll::Pending;
}
}
}
}
}
};
let mut list = match self.inner.lock() {
Some(list) => list,
None => {
@ -787,20 +824,15 @@ impl Future for EventListener {
}
}
};
let entry = match self.entry {
None => unreachable!("cannot poll a completed `EventListener` future"),
Some(entry) => entry,
};
let state = unsafe { entry.as_ref().state() };
let state = list.state(entry);
// Do a dummy replace operation in order to take out the state.
match state.replace(State::Notified(false)) {
State::Notified(_) => {
// If this listener has been notified, remove it from the list and return.
list.remove(entry, self.inner.cache_ptr());
list.remove(entry);
drop(list);
self.entry = None;
self.state = ListenerState::Discarded;
return Poll::Ready(());
}
State::Created => {
@ -827,12 +859,11 @@ impl Future for EventListener {
impl Drop for EventListener {
fn drop(&mut self) {
// If this listener has never picked up a notification...
if let Some(entry) = self.entry.take() {
if let ListenerState::HasNode(entry) = self.state.take() {
match self.inner.lock() {
Some(mut list) => {
// But if a notification was delivered to it...
if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr())
{
if let State::Notified(additional) = list.remove(entry) {
// Then pass it on to another active listener.
list.notify(1, additional);
}
@ -849,6 +880,12 @@ impl Drop for EventListener {
}
}
impl ListenerState {
fn take(&mut self) -> Self {
mem::replace(self, ListenerState::Discarded)
}
}
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
@ -877,17 +914,6 @@ fn full_fence() {
}
}
/// Indicate that we're using spin-based contention and that we should yield the CPU.
#[inline]
fn yield_now() {
#[cfg(feature = "std")]
std::thread::yield_now();
#[cfg(not(feature = "std"))]
#[allow(deprecated)]
sync::atomic::spin_loop_hint();
}
#[cfg(any(feature = "__test", test))]
impl Event {
/// Locks the event.

View File

@ -1,13 +1,12 @@
//! The inner list of listeners.
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::Cell;
use crate::Task;
use alloc::boxed::Box;
use core::mem;
use core::ptr::{self, NonNull};
use core::num::NonZeroUsize;
use slab::Slab;
/// The state of a listener.
pub(crate) enum State {
@ -36,85 +35,71 @@ impl State {
/// An entry representing a registered listener.
pub(crate) struct Entry {
/// Shared state used to coordinate the listener under contention.
///
/// This is the only field that can be accessed without the list being locked.
shared_state: SharedState,
/// The state of this listener.
state: Cell<State>,
/// Previous entry in the linked list.
prev: Cell<Option<NonNull<Entry>>>,
prev: Cell<Option<NonZeroUsize>>,
/// Next entry in the linked list.
next: Cell<Option<NonNull<Entry>>>,
}
struct SharedState {
/// Information about this shared state.
state: AtomicUsize,
/// A task to wake up once we are inserted into the list.
insert_task: Cell<Option<Task>>,
next: Cell<Option<NonZeroUsize>>,
}
/// A linked list of entries.
pub(crate) struct List {
/// The raw list of entries.
entries: Slab<Entry>,
/// First entry in the list.
head: Option<NonNull<Entry>>,
head: Option<NonZeroUsize>,
/// Last entry in the list.
tail: Option<NonNull<Entry>>,
tail: Option<NonZeroUsize>,
/// The first unnotified entry in the list.
start: Option<NonNull<Entry>>,
/// Total number of entries in the list.
pub(crate) len: usize,
start: Option<NonZeroUsize>,
/// The number of notified entries in the list.
pub(crate) notified: usize,
/// Whether the cached entry is used.
cache_used: bool,
}
impl List {
/// Create a new, empty list.
pub(crate) fn new() -> Self {
// Create a Slab with a permanent entry occupying index 0, so that
// it is never used (and we can therefore use 0 as a sentinel value).
let mut entries = Slab::new();
entries.insert(Entry::new());
Self {
entries,
head: None,
tail: None,
start: None,
len: 0,
notified: 0,
cache_used: false,
}
}
/// Allocate a new entry.
pub(crate) unsafe fn alloc(&mut self, cache: NonNull<Entry>) -> NonNull<Entry> {
if self.cache_used {
// Allocate an entry that is going to become the new tail.
NonNull::new_unchecked(Box::into_raw(Box::new(Entry::new())))
} else {
// No need to allocate - we can use the cached entry.
self.cache_used = true;
cache.as_ptr().write(Entry::new());
cache
}
/// Get the number of entries in the list.
pub(crate) fn len(&self) -> usize {
self.entries.len() - 1
}
/// Get the state of the entry at the given index.
pub(crate) fn state(&self, index: NonZeroUsize) -> &Cell<State> {
&self.entries[index.get()].state
}
/// Inserts a new entry into the list.
pub(crate) fn insert(&mut self, entry: NonNull<Entry>) {
pub(crate) fn insert(&mut self, entry: Entry) -> NonZeroUsize {
// Replace the tail with the new entry.
match mem::replace(&mut self.tail, Some(entry)) {
None => self.head = Some(entry),
Some(t) => unsafe {
t.as_ref().next.set(Some(entry));
entry.as_ref().prev.set(Some(t));
},
let key = NonZeroUsize::new(self.entries.vacant_key()).unwrap();
match mem::replace(&mut self.tail, Some(key)) {
None => self.head = Some(key),
Some(t) => {
self.entries[t.get()].next.set(Some(key));
entry.prev.set(Some(t));
}
}
// If there were no unnotified entries, this one is the first now.
@ -122,59 +107,45 @@ impl List {
self.start = self.tail;
}
// Bump the entry count.
self.len += 1;
}
// Insert the entry into the slab.
self.entries.insert(entry);
/// De-allocate an entry.
unsafe fn dealloc(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State {
if ptr::eq(entry.as_ptr(), cache.as_ptr()) {
// Free the cached entry.
self.cache_used = false;
entry.as_ref().state.replace(State::Created)
} else {
// Deallocate the entry.
Box::from_raw(entry.as_ptr()).state.into_inner()
}
// Return the key.
key
}
/// Removes an entry from the list and returns its state.
pub(crate) fn remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State {
unsafe {
let prev = entry.as_ref().prev.get();
let next = entry.as_ref().next.get();
pub(crate) fn remove(&mut self, key: NonZeroUsize) -> State {
let entry = self.entries.remove(key.get());
let prev = entry.prev.get();
let next = entry.next.get();
// Unlink from the previous entry.
match prev {
None => self.head = next,
Some(p) => p.as_ref().next.set(next),
}
// Unlink from the next entry.
match next {
None => self.tail = prev,
Some(n) => n.as_ref().prev.set(prev),
}
// If this was the first unnotified entry, move the pointer to the next one.
if self.start == Some(entry) {
self.start = next;
}
// Extract the state.
let state = entry.as_ref().state.replace(State::Created);
// Delete the entry.
self.dealloc(entry, cache);
// Update the counters.
if state.is_notified() {
self.notified = self.notified.saturating_sub(1);
}
self.len = self.len.saturating_sub(1);
state
// Unlink from the previous entry.
match prev {
None => self.head = next,
Some(p) => self.entries[p.get()].next.set(next),
}
// Unlink from the next entry.
match next {
None => self.tail = prev,
Some(n) => self.entries[n.get()].prev.set(prev),
}
// If this was the first unnotified entry, move the pointer to the next one.
if self.start == Some(key) {
self.start = next;
}
// Extract the state.
let state = entry.state.replace(State::Created);
// Update the counters.
if state.is_notified() {
self.notified = self.notified.saturating_sub(1);
}
state
}
/// Notifies a number of entries, either normally or as an additional notification.
@ -203,7 +174,7 @@ impl List {
None => break,
Some(e) => {
// Get the entry and move the pointer forward.
let e = unsafe { e.as_ref() };
let e = &self.entries[e.get()];
self.start = e.next.get();
// Set the state of this entry to `Notified` and notify.
@ -227,7 +198,7 @@ impl List {
None => break,
Some(e) => {
// Get the entry and move the pointer forward.
let e = unsafe { e.as_ref() };
let e = &self.entries[e.get()];
self.start = e.next.get();
// Set the state of this entry to `Notified` and notify.
@ -245,91 +216,12 @@ impl Entry {
/// Create a new, empty entry.
pub(crate) fn new() -> Self {
Self {
shared_state: SharedState {
state: AtomicUsize::new(0),
insert_task: Cell::new(None),
},
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(None),
}
}
/// Get the state of this entry.
pub(crate) fn state(&self) -> &Cell<State> {
&self.state
}
/// Tell whether this entry is currently queued.
///
/// This is only ever used as an optimization for `wait_internal`, hence that fact that
/// it is `std`-exclusive
#[cfg(feature = "std")]
pub(crate) fn is_queued(&self) -> bool {
self.shared_state.state.load(Ordering::Acquire) & QUEUED != 0
}
/// Write to the temporary task.
#[cold]
#[cfg(feature = "std")]
pub(crate) fn write_task(&self, task: Task) {
// Acquire the WRITING_STATE lock.
let mut state = self
.shared_state
.state
.fetch_or(WRITING_STATE, Ordering::AcqRel);
// Wait until the WRITING_STATE lock is released.
while state & WRITING_STATE != 0 {
state = self
.shared_state
.state
.fetch_or(WRITING_STATE, Ordering::AcqRel);
crate::yield_now();
}
// Write the task.
self.shared_state.insert_task.set(Some(task));
// Release the WRITING_STATE lock.
self.shared_state
.state
.fetch_and(!WRITING_STATE, Ordering::Release);
}
/// Dequeue the entry.
pub(crate) fn dequeue(&self) -> Option<Task> {
// Acquire the WRITING_STATE lock.
let mut state = self
.shared_state
.state
.fetch_or(WRITING_STATE, Ordering::AcqRel);
// Wait until the WRITING_STATE lock is released.
while state & WRITING_STATE != 0 {
state = self
.shared_state
.state
.fetch_or(WRITING_STATE, Ordering::AcqRel);
crate::yield_now();
}
// Read the task.
let task = self.shared_state.insert_task.take();
// Release the WRITING_STATE lock and also remove the QUEUED bit.
self.shared_state
.state
.fetch_and(!WRITING_STATE & !QUEUED, Ordering::Release);
task
}
/// Indicate that this entry has been queued.
pub(crate) fn enqueue(&self) {
self.shared_state.state.fetch_or(QUEUED, Ordering::SeqCst);
}
/// Indicate that this entry has been notified.
#[cold]
pub(crate) fn notify(&self, additional: bool) -> bool {
@ -343,9 +235,3 @@ impl Entry {
true
}
}
/// Set if we are currently queued.
const QUEUED: usize = 1 << 0;
/// Whether or not we are currently writing to the `insert_task` variable, synchronously.
const WRITING_STATE: usize = 1 << 1;

View File

@ -1,11 +1,12 @@
//! The node that makes up queues.
use crate::inner::Inner;
use crate::list::{Entry, List, State};
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::Arc;
use crate::{Notify, NotifyKind, Task};
use alloc::boxed::Box;
use core::ptr::NonNull;
use core::num::NonZeroUsize;
use crossbeam_utils::atomic::AtomicCell;
/// A node in the backup queue.
pub(crate) enum Node {
@ -13,8 +14,8 @@ pub(crate) enum Node {
// For some reason, the MSRV build says this variant is never constructed.
#[allow(dead_code)]
AddListener {
/// The pointer to the listener to add.
listener: Option<DistOwnedListener>,
/// The state of the listener that wants to be added.
task_waiting: Arc<TaskWaiting>,
},
/// This node is notifying a listener.
@ -22,8 +23,8 @@ pub(crate) enum Node {
/// This node is removing a listener.
RemoveListener {
/// The pointer to the listener to remove.
listener: NonNull<Entry>,
/// The ID of the listener to remove.
listener: NonZeroUsize,
/// Whether to propagate notifications to the next listener.
propagate: bool,
@ -33,54 +34,43 @@ pub(crate) enum Node {
Waiting(Task),
}
pub(crate) struct DistOwnedListener(NonNull<Entry>);
pub(crate) struct TaskWaiting {
/// The task that is being waited on.
task: AtomicCell<Option<Task>>,
impl DistOwnedListener {
/// extracts the contained entry pointer from the DOL,
/// without calling the DOL Drop handler (such that the returned pointer stays valid)
fn take(self) -> NonNull<Entry> {
core::mem::ManuallyDrop::new(self).0
}
}
impl Drop for DistOwnedListener {
fn drop(&mut self) {
drop(unsafe { Box::from_raw(self.0.as_ptr()) });
}
/// The ID of the new entry.
///
/// This is set to zero when the task is still queued.
entry_id: AtomicUsize,
}
impl Node {
pub(crate) fn listener() -> (Self, NonNull<Entry>) {
let entry = Box::into_raw(Box::new(Entry::new()));
let entry = unsafe { NonNull::new_unchecked(entry) };
pub(crate) fn listener() -> (Self, Arc<TaskWaiting>) {
// Create a new `TaskWaiting` structure.
let task_waiting = Arc::new(TaskWaiting {
task: AtomicCell::new(None),
entry_id: AtomicUsize::new(0),
});
(
Self::AddListener {
listener: Some(DistOwnedListener(entry)),
task_waiting: task_waiting.clone(),
},
entry,
task_waiting,
)
}
/// Indicate that this node has been enqueued.
pub(crate) fn enqueue(&self) {
if let Node::AddListener {
listener: Some(entry),
} = self
{
unsafe { entry.0.as_ref() }.enqueue();
}
}
/// Apply the node to the list.
pub(crate) fn apply(self, list: &mut List, inner: &Inner) -> Option<Task> {
pub(crate) fn apply(self, list: &mut List) -> Option<Task> {
match self {
Node::AddListener { mut listener } => {
// Add the listener to the list.
let entry = listener.take().unwrap().take();
list.insert(entry);
Node::AddListener { task_waiting } => {
// Add a new entry to the list.
let entry = Entry::new();
let key = list.insert(entry);
// Dequeue the listener.
return unsafe { entry.as_ref().dequeue() };
// Send the new key to the listener and wake it if necessary.
task_waiting.entry_id.store(key.get(), Ordering::Release);
return task_waiting.task.take();
}
Node::Notify(Notify { count, kind }) => {
// Notify the listener.
@ -94,7 +84,7 @@ impl Node {
propagate,
} => {
// Remove the listener from the list.
let state = list.remove(listener, inner.cache_ptr());
let state = list.remove(listener);
if let (true, State::Notified(additional)) = (propagate, state) {
// Propagate the notification to the next listener.
@ -109,3 +99,26 @@ impl Node {
None
}
}
impl TaskWaiting {
/// Determine if we are still queued.
///
/// Returns `Some` with the entry ID if we are no longer queued.
pub(crate) fn status(&self) -> Option<NonZeroUsize> {
NonZeroUsize::new(self.entry_id.load(Ordering::Acquire))
}
/// Register a listener.
pub(crate) fn register(&self, task: Task) {
// Set the task.
if let Some(task) = self.task.swap(Some(task)) {
task.wake();
}
// If the entry ID is non-zero, then we are no longer queued.
if self.status().is_some() {
// Wake the task.
self.task.take().unwrap().wake();
}
}
}

View File

@ -37,7 +37,6 @@ impl Queue {
/// Push a node to the tail end of the queue.
pub(crate) fn push(&self, node: Node) {
node.enqueue();
let node = Box::into_raw(Box::new(QueueNode {
next: AtomicPtr::new(ptr::null_mut()),
node,

View File

@ -134,13 +134,13 @@ fn drop_non_notified() {
let event = Event::new();
let mut l1 = event.listen();
//let mut l2 = event.listen();
let mut l2 = event.listen();
let l3 = event.listen();
event.notify(1);
drop(l3);
assert!(is_notified(&mut l1));
//assert!(!is_notified(&mut l2));
assert!(!is_notified(&mut l2));
}
#[test]