Remove io-flag

This commit is contained in:
Stjepan Glavina 2020-03-18 16:51:44 +01:00
parent e1dbe00c45
commit 96690a0967
4 changed files with 96 additions and 115 deletions

View File

@ -13,8 +13,7 @@ crossbeam-queue = "0.1.2"
crossbeam-utils = "0.7.0"
futures-core = "0.3.3"
futures-io = "0.3.3"
futures-util = { version = "0.3.3", default-features = false, features = ["std", "io"] }
io-flag = { path = "io-flag" }
futures-util = { version = "0.3.3", default-features = false, features = ["io"] }
libc = "0.2.66"
once_cell = "1.3.1"
parking_lot = "0.10.0"

View File

@ -1,10 +0,0 @@
[package]
name = "io-flag"
version = "0.1.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
socket2 = "0.3.11"

View File

@ -1,101 +0,0 @@
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::sync::atomic::{self, AtomicBool, Ordering};
use socket2::{Domain, Socket, Type};
#[derive(Debug)]
pub struct IoFlag {
flag: AtomicBool,
socket_notify: Socket,
socket_wakeup: Socket,
}
impl IoFlag {
pub fn create() -> io::Result<IoFlag> {
// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
// https://github.com/mhils/backports.socketpair/blob/master/backports/socketpair/__init__.py
// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
// https://gist.github.com/geertj/4325783
// Create a temporary listener.
let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?;
listener.bind(&SocketAddr::from(([127, 0, 0, 1], 0)).into())?;
listener.listen(1)?;
let addr = listener.local_addr()?;
// First socket: connect to the listener.
let sock1 = Socket::new(Domain::ipv4(), Type::stream(), None)?;
sock1.set_nonblocking(true)?;
let _ = sock1.connect(&addr);
let _ = sock1.set_nodelay(true);
sock1.set_send_buffer_size(1)?;
// Second socket: accept a client from the listener.
let (sock2, _) = listener.accept()?;
sock2.set_nonblocking(true)?;
sock2.set_recv_buffer_size(1)?;
Ok(IoFlag {
flag: AtomicBool::new(false),
socket_notify: sock1,
socket_wakeup: sock2,
})
}
pub fn set(&self) {
atomic::fence(Ordering::SeqCst);
if !self.flag.load(Ordering::SeqCst) {
if !self.flag.swap(true, Ordering::SeqCst) {
loop {
match (&self.socket_notify).write(&[1]) {
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => break,
}
}
loop {
match (&self.socket_notify).flush() {
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => break,
}
}
}
}
}
pub fn get(&self) -> bool {
self.flag.load(Ordering::SeqCst)
}
pub fn clear(&self) -> bool {
let value = self.flag.swap(false, Ordering::SeqCst);
if value {
loop {
match (&self.socket_wakeup).read(&mut [0; 64]) {
Ok(n) if n > 0 => {}
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => break,
}
}
}
atomic::fence(Ordering::SeqCst);
value
}
}
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for IoFlag {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
self.socket_wakeup.as_raw_fd()
}
}
#[cfg(windows)]
impl std::os::windows::io::AsRawSocket for IoFlag {
fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
self.socket_wakeup.as_raw_socket()
}
}

View File

@ -24,6 +24,7 @@ use std::mem;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
use std::panic::catch_unwind;
use std::pin::Pin;
use std::sync::atomic::{self, AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::thread::{self, ThreadId};
@ -43,7 +44,6 @@ use futures_io::{AsyncRead, AsyncWrite};
use futures_util::future;
use futures_util::io::AllowStdIo;
use futures_util::stream;
use io_flag::IoFlag;
use once_cell::sync::Lazy;
use parking_lot::{Condvar, Mutex, MutexGuard};
use slab::Slab;
@ -321,7 +321,7 @@ impl Executor {
// TODO: use piper::select! here
// block_on(piper::select! {
// waker.ready() => {}
// waker.ready() => break,
// poller = REACTOR.lock() => if !waker.get_ref().get() {
// poller.poll().expect("failure while polling I/O"),
// }
@ -1180,6 +1180,99 @@ impl Async<UnixDatagram> {
}
}
pub struct IoFlag {
flag: AtomicBool,
socket_notify: Socket,
socket_wakeup: Socket,
}
impl IoFlag {
pub fn create() -> io::Result<IoFlag> {
// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
// https://github.com/mhils/backports.socketpair/blob/master/backports/socketpair/__init__.py
// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
// https://gist.github.com/geertj/4325783
// Create a temporary listener.
let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?;
listener.bind(&SocketAddr::from(([127, 0, 0, 1], 0)).into())?;
listener.listen(1)?;
let addr = listener.local_addr()?;
// First socket: connect to the listener.
let sock1 = Socket::new(Domain::ipv4(), Type::stream(), None)?;
sock1.set_nonblocking(true)?;
let _ = sock1.connect(&addr);
let _ = sock1.set_nodelay(true);
sock1.set_send_buffer_size(1)?;
// Second socket: accept a client from the listener.
let (sock2, _) = listener.accept()?;
sock2.set_nonblocking(true)?;
sock2.set_recv_buffer_size(1)?;
Ok(IoFlag {
flag: AtomicBool::new(false),
socket_notify: sock1,
socket_wakeup: sock2,
})
}
pub fn set(&self) {
atomic::fence(Ordering::SeqCst);
if !self.flag.load(Ordering::SeqCst) {
if !self.flag.swap(true, Ordering::SeqCst) {
loop {
match (&self.socket_notify).write(&[1]) {
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => break,
}
}
loop {
match (&self.socket_notify).flush() {
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => break,
}
}
}
}
}
pub fn get(&self) -> bool {
self.flag.load(Ordering::SeqCst)
}
pub fn clear(&self) -> bool {
let value = self.flag.swap(false, Ordering::SeqCst);
if value {
loop {
match (&self.socket_wakeup).read(&mut [0; 64]) {
Ok(n) if n > 0 => {}
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
_ => break,
}
}
}
atomic::fence(Ordering::SeqCst);
value
}
}
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for IoFlag {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
self.socket_wakeup.as_raw_fd()
}
}
#[cfg(windows)]
impl std::os::windows::io::AsRawSocket for IoFlag {
fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
self.socket_wakeup.as_raw_socket()
}
}
impl Async<IoFlag> {
async fn ready(&self) {
self.read_with(|inner| match inner.get() {