mirror of https://github.com/smol-rs/async-net
Optimze allocations in Listeners
This shrinks the allocations for listener streams from 1 per connection, to a single allocation.
This commit is contained in:
parent
837f3334ee
commit
6af24d7d20
26
src/tcp.rs
26
src/tcp.rs
|
@ -169,8 +169,7 @@ impl TcpListener {
|
|||
/// ```
|
||||
pub fn incoming(&self) -> Incoming<'_> {
|
||||
Incoming {
|
||||
listener: self,
|
||||
accept: None,
|
||||
incoming: Box::pin(self.inner.incoming()),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -253,35 +252,22 @@ impl AsRawSocket for TcpListener {
|
|||
/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is
|
||||
/// created by the [`TcpListener::incoming()`] method.
|
||||
pub struct Incoming<'a> {
|
||||
listener: &'a TcpListener,
|
||||
accept: Option<
|
||||
Pin<Box<dyn Future<Output = io::Result<(TcpStream, SocketAddr)>> + Send + Sync + 'a>>,
|
||||
>,
|
||||
incoming:
|
||||
Pin<Box<dyn Stream<Item = io::Result<Async<std::net::TcpStream>>> + Send + Sync + 'a>>,
|
||||
}
|
||||
|
||||
impl Stream for Incoming<'_> {
|
||||
type Item = io::Result<TcpStream>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
if self.accept.is_none() {
|
||||
self.accept = Some(Box::pin(self.listener.accept()));
|
||||
}
|
||||
|
||||
if let Some(f) = &mut self.accept {
|
||||
let res = ready!(f.as_mut().poll(cx));
|
||||
self.accept = None;
|
||||
return Poll::Ready(Some(res.map(|(stream, _)| stream)));
|
||||
}
|
||||
}
|
||||
let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
|
||||
Poll::Ready(res.map(|res| res.map(|stream| TcpStream::new(Arc::new(stream)))))
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Incoming<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Incoming")
|
||||
.field("listener", self.listener)
|
||||
.finish()
|
||||
write!(f, "Incoming {{ ... }}")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
27
src/unix.rs
27
src/unix.rs
|
@ -124,8 +124,7 @@ impl UnixListener {
|
|||
/// ```
|
||||
pub fn incoming(&self) -> Incoming<'_> {
|
||||
Incoming {
|
||||
listener: self,
|
||||
accept: None,
|
||||
incoming: Box::pin(self.inner.incoming()),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -185,9 +184,10 @@ impl AsRawSocket for UnixListener {
|
|||
/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is
|
||||
/// created by the [`UnixListener::incoming()`] method.
|
||||
pub struct Incoming<'a> {
|
||||
listener: &'a UnixListener,
|
||||
accept: Option<
|
||||
Pin<Box<dyn Future<Output = io::Result<(UnixStream, SocketAddr)>> + Send + Sync + 'a>>,
|
||||
incoming: Pin<
|
||||
Box<
|
||||
dyn Stream<Item = io::Result<Async<std::os::unix::net::UnixStream>>> + Send + Sync + 'a,
|
||||
>,
|
||||
>,
|
||||
}
|
||||
|
||||
|
@ -195,25 +195,14 @@ impl Stream for Incoming<'_> {
|
|||
type Item = io::Result<UnixStream>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
if self.accept.is_none() {
|
||||
self.accept = Some(Box::pin(self.listener.accept()));
|
||||
}
|
||||
|
||||
if let Some(f) = &mut self.accept {
|
||||
let res = ready!(f.as_mut().poll(cx));
|
||||
self.accept = None;
|
||||
return Poll::Ready(Some(res.map(|(stream, _)| stream)));
|
||||
}
|
||||
}
|
||||
let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
|
||||
Poll::Ready(res.map(|res| res.map(|stream| UnixStream::new(Arc::new(stream)))))
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Incoming<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Incoming")
|
||||
.field("listener", self.listener)
|
||||
.finish()
|
||||
write!(f, "Incoming {{ ... }}")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue