Merge pull request #11 from erickt/master

Optimze allocations in Listeners
This commit is contained in:
Taiki Endo 2021-04-06 10:54:55 +09:00 committed by GitHub
commit 4f8b7c92b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 39 deletions

View File

@ -169,8 +169,7 @@ impl TcpListener {
/// ```
pub fn incoming(&self) -> Incoming<'_> {
Incoming {
listener: self,
accept: None,
incoming: Box::pin(self.inner.incoming()),
}
}
@ -253,35 +252,22 @@ impl AsRawSocket for TcpListener {
/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is
/// created by the [`TcpListener::incoming()`] method.
pub struct Incoming<'a> {
listener: &'a TcpListener,
accept: Option<
Pin<Box<dyn Future<Output = io::Result<(TcpStream, SocketAddr)>> + Send + Sync + 'a>>,
>,
incoming:
Pin<Box<dyn Stream<Item = io::Result<Async<std::net::TcpStream>>> + Send + Sync + 'a>>,
}
impl Stream for Incoming<'_> {
type Item = io::Result<TcpStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if self.accept.is_none() {
self.accept = Some(Box::pin(self.listener.accept()));
}
if let Some(f) = &mut self.accept {
let res = ready!(f.as_mut().poll(cx));
self.accept = None;
return Poll::Ready(Some(res.map(|(stream, _)| stream)));
}
}
let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
Poll::Ready(res.map(|res| res.map(|stream| TcpStream::new(Arc::new(stream)))))
}
}
impl fmt::Debug for Incoming<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Incoming")
.field("listener", self.listener)
.finish()
write!(f, "Incoming {{ ... }}")
}
}

View File

@ -124,8 +124,7 @@ impl UnixListener {
/// ```
pub fn incoming(&self) -> Incoming<'_> {
Incoming {
listener: self,
accept: None,
incoming: Box::pin(self.inner.incoming()),
}
}
@ -185,9 +184,10 @@ impl AsRawSocket for UnixListener {
/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is
/// created by the [`UnixListener::incoming()`] method.
pub struct Incoming<'a> {
listener: &'a UnixListener,
accept: Option<
Pin<Box<dyn Future<Output = io::Result<(UnixStream, SocketAddr)>> + Send + Sync + 'a>>,
incoming: Pin<
Box<
dyn Stream<Item = io::Result<Async<std::os::unix::net::UnixStream>>> + Send + Sync + 'a,
>,
>,
}
@ -195,25 +195,14 @@ impl Stream for Incoming<'_> {
type Item = io::Result<UnixStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if self.accept.is_none() {
self.accept = Some(Box::pin(self.listener.accept()));
}
if let Some(f) = &mut self.accept {
let res = ready!(f.as_mut().poll(cx));
self.accept = None;
return Poll::Ready(Some(res.map(|(stream, _)| stream)));
}
}
let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
Poll::Ready(res.map(|res| res.map(|stream| UnixStream::new(Arc::new(stream)))))
}
}
impl fmt::Debug for Incoming<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Incoming")
.field("listener", self.listener)
.finish()
write!(f, "Incoming {{ ... }}")
}
}