Ignore errors on reregister

This commit is contained in:
Stjepan Glavina 2020-02-15 11:06:22 +01:00
parent 324bd6308f
commit 8a9f5efe8d
1 changed files with 36 additions and 35 deletions

View File

@ -68,7 +68,7 @@ static POLLER: Lazy<Poller> = Lazy::new(|| Poller::create().expect("cannot creat
impl Reactor {
fn create() -> io::Result<Reactor> {
let io_flag = IoFlag::create()?;
POLLER.register(sys::RawSource::new(&io_flag.socket_wakeup))?;
POLLER.register(sys::Raw::new(&io_flag.socket_wakeup))?;
Ok(Reactor {
io_flag,
@ -124,7 +124,7 @@ impl Reactor {
// ----- Poller -----
struct Source {
raw: sys::RawSource,
raw: sys::Raw,
index: usize,
readers: Mutex<Vec<Waker>>,
writers: Mutex<Vec<Waker>>,
@ -151,7 +151,7 @@ impl Poller {
})
}
fn register(&self, raw: sys::RawSource) -> io::Result<Arc<Source>> {
fn register(&self, raw: sys::Raw) -> io::Result<Arc<Source>> {
let mut sources = self.sources.lock();
let vacant = sources.vacant_entry();
let index = vacant.key();
@ -364,6 +364,7 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
});
pin_utils::pin_mut!(flag_ready);
// TODO: use piper::select! here
match block_on(future::select(flag_ready, EXECUTOR.mutex.lock())) {
future::Either::Left(_) => break,
future::Either::Right(_) if io_flag.get_ref().get() => break,
@ -611,7 +612,7 @@ impl<T: std::os::unix::io::AsRawFd> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(inner: T) -> io::Result<Async<T>> {
Ok(Async {
source: POLLER.register(sys::RawSource::new(&inner))?,
source: POLLER.register(sys::Raw::new(&inner))?,
inner: Box::new(inner),
})
}
@ -622,7 +623,7 @@ impl<T: std::os::windows::io::AsRawSocket> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(inner: T) -> io::Result<Async<T>> {
Ok(Async {
source: POLLER.register(sys::RawSource::new(&inner))?,
source: POLLER.register(sys::Raw::new(&inner))?,
inner: Box::new(inner),
})
}
@ -1024,10 +1025,10 @@ mod sys {
};
#[derive(Clone, Copy)]
pub struct RawSource(RawFd);
impl RawSource {
pub fn new(s: &impl AsRawFd) -> RawSource {
RawSource(s.as_raw_fd())
pub struct Raw(RawFd);
impl Raw {
pub fn new(s: &impl AsRawFd) -> Raw {
Raw(s.as_raw_fd())
}
}
@ -1038,15 +1039,15 @@ mod sys {
epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).map_err(io_err)?,
))
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
pub fn register(&self, raw: Raw, index: usize) -> io::Result<()> {
let ev = &mut EpollEvent::new(flags(), index as u64);
epoll_ctl(self.0, EpollOp::EpollCtlAdd, source.0, Some(ev)).map_err(io_err)
epoll_ctl(self.0, EpollOp::EpollCtlAdd, raw.0, Some(ev)).map_err(io_err)
}
pub fn reregister(&self, _source: RawSource, _index: usize) -> io::Result<()> {
pub fn reregister(&self, _raw: Raw, _index: usize) -> io::Result<()> {
Ok(())
}
pub fn deregister(&self, source: RawSource) -> io::Result<()> {
epoll_ctl(self.0, EpollOp::EpollCtlDel, source.0, None).map_err(io_err)
pub fn deregister(&self, raw: Raw) -> io::Result<()> {
epoll_ctl(self.0, EpollOp::EpollCtlDel, raw.0, None).map_err(io_err)
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = timeout
@ -1113,10 +1114,10 @@ mod sys {
};
#[derive(Clone, Copy)]
pub struct RawSource(RawFd);
impl RawSource {
pub fn new(s: &impl AsRawFd) -> RawSource {
RawSource(s.as_raw_fd())
pub struct Raw(RawFd);
impl Raw {
pub fn new(s: &impl AsRawFd) -> Raw {
Raw(s.as_raw_fd())
}
}
@ -1127,8 +1128,8 @@ mod sys {
fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)).map_err(io_err)?;
Ok(Poller(fd))
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
let ident = source.0 as _;
pub fn register(&self, raw: Raw, index: usize) -> io::Result<()> {
let ident = raw.0 as _;
let flags = EventFlag::EV_CLEAR | EventFlag::EV_RECEIPT | EventFlag::EV_ADD;
let fflags = FilterFlag::empty();
let udata = index as _;
@ -1153,16 +1154,16 @@ mod sys {
}
Ok(())
}
pub fn reregister(&self, _source: RawSource, _index: usize) -> io::Result<()> {
pub fn reregister(&self, _raw: Raw, _index: usize) -> io::Result<()> {
Ok(())
}
pub fn deregister(&self, source: RawSource) -> io::Result<()> {
let ident = source.0 as _;
pub fn deregister(&self, raw: Raw) -> io::Result<()> {
let ident = raw.0 as _;
let flags = EventFlag::EV_RECEIPT | EventFlag::EV_DELETE;
let fflags = FilterFlag::empty();
let changelist = [
KEvent::new(ident, EventFilter::EVFILT_WRITE, flags, fflags, 0, 0),
KEvent::new(source.0 as _, EventFilter::EVFILT_READ, flags, fflags, 0, 0),
KEvent::new(raw.0 as _, EventFilter::EVFILT_READ, flags, fflags, 0, 0),
];
let mut eventlist = changelist.clone();
match kevent_ts(self.0, &changelist, &mut eventlist, None) {
@ -1236,13 +1237,13 @@ mod sys {
use wepoll_binding as wepoll;
#[derive(Clone, Copy)]
pub struct RawSource(RawSocket);
impl RawSource {
pub fn new(s: &impl AsRawSocket) -> RawSource {
RawSource(s.as_raw_socket())
pub struct Raw(RawSocket);
impl Raw {
pub fn new(s: &impl AsRawSocket) -> Raw {
Raw(s.as_raw_socket())
}
}
impl AsRawSocket for RawSource {
impl AsRawSocket for Raw {
fn as_raw_socket(&self) -> RawSocket {
self.0
}
@ -1253,15 +1254,15 @@ mod sys {
pub fn create() -> io::Result<Poller> {
Ok(Poller(Epoll::new()?))
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
self.0.register(&source, flags(), index as u64)
pub fn register(&self, raw: Raw, index: usize) -> io::Result<()> {
self.0.register(&raw, flags(), index as u64)
}
pub fn reregister(&self, source: RawSource, index: usize) -> io::Result<()> {
self.0.reregister(&source, flags(), index as u64)
pub fn reregister(&self, raw: Raw, index: usize) -> io::Result<()> {
self.0.reregister(&raw, flags(), index as u64)
}
pub fn deregister(&self, source: RawSource) -> io::Result<()> {
pub fn deregister(&self, raw: Raw) -> io::Result<()> {
// Ignore errors since an event can deregister the handle at any point (oneshot mode).
let _ = self.0.deregister(&source);
let _ = self.0.deregister(&raw);
Ok(())
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {