Merge pull request #18 from stjepang/fix-sigsegv

Try fixing sigsegv
This commit is contained in:
Stjepan Glavina 2020-10-02 19:30:17 +02:00 committed by GitHub
commit 7d1a4e3fe3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 34 deletions

View File

@ -24,7 +24,7 @@ log = "0.4.11"
libc = "0.2.77"
[target.'cfg(windows)'.dependencies]
wepoll-sys = "2.0.0"
wepoll-sys = "3.0.0"
winapi = { version = "0.3.9", features = ["ioapiset", "winsock2"] }
[dev-dependencies]

View File

@ -4,7 +4,8 @@ use std::convert::TryInto;
use std::io;
use std::os::windows::io::RawSocket;
use std::ptr;
use std::time::Duration;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use wepoll_sys as we;
use winapi::ctypes;
@ -27,6 +28,7 @@ macro_rules! wepoll {
#[derive(Debug)]
pub struct Poller {
handle: we::HANDLE,
notified: AtomicBool,
}
unsafe impl Send for Poller {}
@ -39,8 +41,9 @@ impl Poller {
if handle.is_null() {
return Err(io::Error::last_os_error());
}
let notified = AtomicBool::new(false);
log::trace!("new: handle={:?}", handle);
Ok(Poller { handle })
Ok(Poller { handle, notified })
}
/// Adds a socket.
@ -74,46 +77,61 @@ impl Poller {
/// included in the `events` list nor contribute to the returned count.
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
log::trace!("wait: handle={:?}, timeout={:?}", self.handle, timeout);
let deadline = timeout.map(|t| Instant::now() + t);
// Convert the timeout to milliseconds.
let timeout_ms = match timeout {
None => -1,
Some(t) => {
// Round up to a whole millisecond.
let mut ms = t.as_millis().try_into().unwrap_or(std::u64::MAX);
if Duration::from_millis(ms as u64) < t {
ms = ms.saturating_add(1);
loop {
// Convert the timeout to milliseconds.
let timeout_ms = match deadline.map(|d| d.saturating_duration_since(Instant::now())) {
None => -1,
Some(t) => {
// Round up to a whole millisecond.
let mut ms = t.as_millis().try_into().unwrap_or(std::u64::MAX);
if Duration::from_millis(ms) < t {
ms += 1;
}
ms.try_into().unwrap_or(std::i32::MAX)
}
ms.try_into().unwrap_or(std::i32::MAX)
}
};
};
// Wait for I/O events.
events.len = wepoll!(epoll_wait(
self.handle,
events.list.as_mut_ptr(),
events.list.len() as ctypes::c_int,
timeout_ms,
))? as usize;
log::trace!("new events: handle={:?}, len={}", self.handle, events.len);
// Break if there was a notification or at least one event, or if deadline is reached.
if self.notified.swap(false, Ordering::SeqCst) || events.len > 0 || timeout_ms == 0 {
break;
}
}
// Wait for I/O events.
events.len = wepoll!(epoll_wait(
self.handle,
events.list.as_mut_ptr(),
events.list.len() as ctypes::c_int,
timeout_ms,
))? as usize;
log::trace!("new events: handle={:?}, len={}", self.handle, events.len);
Ok(())
}
/// Sends a notification to wake up the current or next `wait()` call.
pub fn notify(&self) -> io::Result<()> {
log::trace!("notify: handle={:?}", self.handle);
unsafe {
// This call errors if a notification has already been posted, but that's okay - we
// can just ignore the error.
//
// The original wepoll does not support notifications triggered this way, which is
// why wepoll-sys includes a small patch to support them.
winapi::um::ioapiset::PostQueuedCompletionStatus(
self.handle as winapi::um::winnt::HANDLE,
0,
0,
ptr::null_mut(),
);
if !self
.notified
.compare_and_swap(false, true, Ordering::SeqCst)
{
unsafe {
// This call errors if a notification has already been posted, but that's okay - we
// can just ignore the error.
//
// The original wepoll does not support notifications triggered this way, which is
// why wepoll-sys includes a small patch to support them.
winapi::um::ioapiset::PostQueuedCompletionStatus(
self.handle as winapi::um::winnt::HANDLE,
0,
0,
ptr::null_mut(),
);
}
}
Ok(())
}