Update async-io

This commit is contained in:
Stjepan Glavina 2020-10-03 16:37:09 +02:00
parent 9b6ba0ee6a
commit 906de8e99a
3 changed files with 62 additions and 21 deletions

View File

@ -16,17 +16,21 @@ readme = "README.md"
concurrent-queue = "1.2.2"
fastrand = "1.3.5"
futures-lite = "1.4.0"
libc = "0.2.78"
log = "0.4.11"
nb-connect = "1.0.0"
once_cell = "1.4.1"
parking = "2.0.0"
polling = "1.0.1"
polling = "2.0.0"
vec-arena = "1.0.0"
waker-fn = "1.1.0"
[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3.9", features = ["winsock2"] }
[dev-dependencies]
async-channel = "1.4.2"
async-net = "1.3.0"
# async-net = "1.3.0"
blocking = "1.0.0"
signal-hook = "0.1.16"
tempfile = "3.1.0"

View File

@ -53,7 +53,6 @@
//! # std::io::Result::Ok(()) });
//! ```
#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::convert::TryFrom;
@ -73,7 +72,7 @@ use std::{
};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, RawSocket};
use std::os::windows::io::{AsRawSocket, RawSocket};
use futures_lite::io::{AsyncRead, AsyncWrite};
use futures_lite::stream::{self, Stream};
@ -393,8 +392,21 @@ impl<T: AsRawFd> Async<T> {
/// # std::io::Result::Ok(()) });
/// ```
pub fn new(io: T) -> io::Result<Async<T>> {
let fd = io.as_raw_fd();
// Put the file descriptor in non-blocking mode.
unsafe {
let mut res = libc::fcntl(fd, libc::F_GETFL);
if res != -1 {
res = libc::fcntl(fd, libc::F_SETFL, res | libc::O_NONBLOCK);
}
if res == -1 {
return Err(io::Error::last_os_error());
}
}
Ok(Async {
source: Reactor::get().insert_io(io.as_raw_fd())?,
source: Reactor::get().insert_io(fd)?,
io: Some(io),
})
}
@ -434,8 +446,26 @@ impl<T: AsRawSocket> Async<T> {
/// # std::io::Result::Ok(()) });
/// ```
pub fn new(io: T) -> io::Result<Async<T>> {
let sock = io.as_raw_socket();
// Put the socket in non-blocking mode.
unsafe {
use winapi::ctypes;
use winapi::um::winsock2;
let mut nonblocking = true as ctypes::c_ulong;
let res = winsock2::ioctlsocket(
sock as winsock2::SOCKET,
winsock2::FIONBIO,
&mut nonblocking,
);
if res != 0 {
return Err(io::Error::last_os_error());
}
}
Ok(Async {
source: Reactor::get().insert_io(io.as_raw_socket())?,
source: Reactor::get().insert_io(sock)?,
io: Some(io),
})
}

View File

@ -87,18 +87,25 @@ impl Reactor {
#[cfg(unix)] raw: RawFd,
#[cfg(windows)] raw: RawSocket,
) -> io::Result<Arc<Source>> {
// Register the file descriptor.
self.poller.insert(raw)?;
// Create an I/O source for this file descriptor.
let mut sources = self.sources.lock().unwrap();
let key = sources.next_vacant();
let source = Arc::new(Source {
raw,
key,
state: Default::default(),
});
sources.insert(source.clone());
let source = {
let mut sources = self.sources.lock().unwrap();
let key = sources.next_vacant();
let source = Arc::new(Source {
raw,
key,
state: Default::default(),
});
sources.insert(source.clone());
source
};
// Register the file descriptor.
if let Err(err) = self.poller.add(raw, Event::none(source.key)) {
let mut sources = self.sources.lock().unwrap();
sources.remove(source.key);
return Err(err);
}
Ok(source)
}
@ -107,7 +114,7 @@ impl Reactor {
pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
let mut sources = self.sources.lock().unwrap();
sources.remove(source.key);
self.poller.remove(source.raw)
self.poller.delete(source.raw)
}
/// Registers a timer in the reactor.
@ -283,7 +290,7 @@ impl ReactorLock<'_> {
// e.g. we were previously interested in both readability and writability,
// but only one of them was emitted.
if !state[READ].is_empty() || !state[WRITE].is_empty() {
self.reactor.poller.interest(
self.reactor.poller.modify(
source.raw,
Event {
key: source.key,
@ -415,7 +422,7 @@ impl Source {
// Update interest in this I/O handle.
if was_empty {
Reactor::get().poller.interest(
Reactor::get().poller.modify(
self.raw,
Event {
key: self.key,
@ -466,7 +473,7 @@ impl Source {
// Update interest in this I/O handle.
if was_empty {
Reactor::get().poller.interest(
Reactor::get().poller.modify(
self.raw,
Event {
key: self.key,