Less boxing (#26)

* Get rid of boxing inside Async<>

We want Async<T> to always impl Unpin. Turns out, boxing the inner value
is unnecessary for this; and as we never pin the inner value, we can just
implement Unpin for ourselves explicitly.

See https://doc.rust-lang.org/std/pin/index.html#pinning-is-not-structural-for-field
on why this is safe and sound :)

* Do not box streams

Unfortunately, this means we also don't automatically pin them.
The caller can manually pin them (using the pin! macro, or Box::pin(),
or any other method) if needed. That is a slight API break.
This commit is contained in:
Sergey Bugaev 2020-09-09 21:25:25 +03:00 committed by GitHub
parent 6af144e5d1
commit ff2ca89cd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 16 deletions

View File

@ -334,9 +334,11 @@ pub struct Async<T> {
source: Arc<Source>,
/// The inner I/O handle.
io: Option<Box<T>>,
io: Option<T>,
}
impl<T> Unpin for Async<T> {}
#[cfg(unix)]
impl<T: AsRawFd> Async<T> {
/// Creates an async I/O handle.
@ -366,7 +368,7 @@ impl<T: AsRawFd> Async<T> {
pub fn new(io: T) -> io::Result<Async<T>> {
Ok(Async {
source: Reactor::get().insert_io(io.as_raw_fd())?,
io: Some(Box::new(io)),
io: Some(io),
})
}
}
@ -407,7 +409,7 @@ impl<T: AsRawSocket> Async<T> {
pub fn new(io: T) -> io::Result<Async<T>> {
Ok(Async {
source: Reactor::get().insert_io(io.as_raw_socket())?,
io: Some(Box::new(io)),
io: Some(io),
})
}
}
@ -473,7 +475,7 @@ impl<T> Async<T> {
/// # std::io::Result::Ok(()) });
/// ```
pub fn into_inner(mut self) -> io::Result<T> {
let io = *self.io.take().unwrap();
let io = self.io.take().unwrap();
Reactor::get().remove_io(&self.source)?;
Ok(io)
}
@ -851,12 +853,13 @@ impl Async<TcpListener> {
///
/// ```no_run
/// use async_io::Async;
/// use futures_lite::stream::StreamExt;
/// use futures_lite::{pin, stream::StreamExt};
/// use std::net::TcpListener;
///
/// # futures_lite::future::block_on(async {
/// let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?;
/// let mut incoming = listener.incoming();
/// let incoming = listener.incoming();
/// pin!(incoming);
///
/// while let Some(stream) = incoming.next().await {
/// let stream = stream?;
@ -864,11 +867,11 @@ impl Async<TcpListener> {
/// }
/// # std::io::Result::Ok(()) });
/// ```
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + Unpin + '_ {
Box::pin(stream::unfold(self, |listener| async move {
pub fn incoming(&self) -> impl Stream<Item = io::Result<Async<TcpStream>>> + Send + '_ {
stream::unfold(self, |listener| async move {
let res = listener.accept().await.map(|(stream, _)| stream);
Some((res, listener))
}))
})
}
}
@ -1178,12 +1181,13 @@ impl Async<UnixListener> {
///
/// ```no_run
/// use async_io::Async;
/// use futures_lite::stream::StreamExt;
/// use futures_lite::{pin, stream::StreamExt};
/// use std::os::unix::net::UnixListener;
///
/// # futures_lite::future::block_on(async {
/// let listener = Async::<UnixListener>::bind("/tmp/socket")?;
/// let mut incoming = listener.incoming();
/// let incoming = listener.incoming();
/// pin!(incoming);
///
/// while let Some(stream) = incoming.next().await {
/// let stream = stream?;
@ -1193,11 +1197,11 @@ impl Async<UnixListener> {
/// ```
pub fn incoming(
&self,
) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + Unpin + '_ {
Box::pin(stream::unfold(self, |listener| async move {
) -> impl Stream<Item = io::Result<Async<UnixStream>>> + Send + '_ {
stream::unfold(self, |listener| async move {
let res = listener.accept().await.map(|(stream, _)| stream);
Some((res, listener))
}))
})
}
}

View File

@ -70,7 +70,7 @@ fn tcp_peek_read() -> io::Result<()> {
stream.write_all(LOREM_IPSUM).await?;
let mut buf = [0; 1024];
let mut incoming = listener.incoming();
let mut incoming = Box::pin(listener.incoming());
let mut stream = incoming.next().await.unwrap()?;
let n = stream.peek(&mut buf).await?;
@ -168,7 +168,7 @@ fn udp_connect() -> io::Result<()> {
stream.write_all(LOREM_IPSUM).await?;
let mut buf = [0; 1024];
let mut incoming = listener.incoming();
let mut incoming = Box::pin(listener.incoming());
let mut stream = incoming.next().await.unwrap()?;
let n = stream.read(&mut buf).await?;