Add readable/writable

This commit is contained in:
Stjepan Glavina 2020-06-20 17:33:45 +02:00
parent 69c04a9e42
commit 539228dace
1 changed files with 75 additions and 47 deletions

View File

@ -315,12 +315,65 @@ impl<T> Async<T> {
}
}
/// Waits until the I/O handle is readable.
///
/// This function completes when a readability event for this I/O handle is emitted by the
/// operating system.
///
/// Keep in mind that there is a small delay between the moment the event is emitted and the
/// moment it is delivered to functions waiting for it. If a previous read operation has
/// already requested this event, it might get delivered to some function calls even after it
/// has been emitted.
///
/// # Examples
///
/// ```no_run
/// use smol::Async;
/// use std::net::TcpListener;
///
/// # smol::run(async {
/// let mut listener = Async::<TcpListener>::bind("127.0.0.1:80")?;
///
/// // Wait until a client can be accepted.
/// listener.readable().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn readable(&self) -> io::Result<()> {
self.source.readable().await
}
/// Waits until the I/O handle is writable.
///
/// This function completes when a writability event for this I/O handle is emitted by the
/// operating system.
///
/// Keep in mind that there is a small delay between the moment the event is emitted and the
/// moment it is delivered to functions waiting for it. If a previous write operation has
/// already requested this event, it might get delivered to some function calls even after it
/// has been emitted.
///
/// # Examples
///
/// ```no_run
/// use smol::Async;
/// use std::net::TcpStream;
///
/// # smol::run(async {
/// let stream = Async::<TcpStream>::connect("example.com:80").await?;
///
/// // Wait until the stream is writable.
/// stream.writable().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn writable(&self) -> io::Result<()> {
self.source.writable().await
}
/// Performs a read operation asynchronously.
///
/// The I/O handle is registered in the reactor and put in non-blocking mode. This function
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
/// sends a notification that the I/O handle is readable.
/// invokes the `op` closure followed by [`Async::readable()`] in a loop until the closure
/// succeeds or returns an error other than [`io::ErrorKind::WouldBlock`].
///
/// The closure receives a shared reference to the I/O handle.
///
@ -344,16 +397,15 @@ impl<T> Async<T> {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
self.source.readable().await?;
self.readable().await?;
}
}
/// Performs a read operation asynchronously.
///
/// The I/O handle is registered in the reactor and put in non-blocking mode. This function
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
/// sends a notification that the I/O handle is readable.
/// invokes the `op` closure followed by [`Async::readable()`] in a loop until the closure
/// succeeds or returns an error other than [`io::ErrorKind::WouldBlock`].
///
/// The closure receives a mutable reference to the I/O handle.
///
@ -380,16 +432,15 @@ impl<T> Async<T> {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
self.source.readable().await?;
self.readable().await?;
}
}
/// Performs a write operation asynchronously.
///
/// The I/O handle is registered in the reactor and put in non-blocking mode. This function
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
/// sends a notification that the I/O handle is writable.
/// invokes the `op` closure followed by [`Async::writable()`] in a loop until the closure
/// succeeds or returns an error other than [`io::ErrorKind::WouldBlock`].
///
/// The closure receives a shared reference to the I/O handle.
///
@ -414,16 +465,15 @@ impl<T> Async<T> {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
self.source.writable().await?;
self.writable().await?;
}
}
/// Performs a write operation asynchronously.
///
/// The I/O handle is registered in the reactor and put in non-blocking mode. This function
/// invokes the `op` closure in a loop until it succeeds or returns an error other than
/// [`io::ErrorKind::WouldBlock`]. In between iterations of the loop, it waits until the OS
/// sends a notification that the I/O handle is writable.
/// invokes the `op` closure followed by [`Async::writable()`] in a loop until the closure
/// succeeds or returns an error other than [`io::ErrorKind::WouldBlock`].
///
/// The closure receives a mutable reference to the I/O handle.
///
@ -451,7 +501,7 @@ impl<T> Async<T> {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
self.source.writable().await?;
self.writable().await?;
}
}
}
@ -691,22 +741,13 @@ impl Async<TcpStream> {
})?;
let stream = Async::new(socket.into_tcp_stream())?;
// Waits for connect to complete.
let wait_connect = |mut stream: &TcpStream| match stream.write(&[]) {
Err(err) if err.kind() == io::ErrorKind::NotConnected => match stream.take_error()? {
Some(err) => Err(err),
None => Err(io::ErrorKind::WouldBlock.into()),
},
res => res.map(|_| ()),
};
// The stream becomes writable when connected.
match stream.write_with(|io| wait_connect(io)).await {
Ok(()) => Ok(stream),
Err(err) => match stream.get_ref().take_error()? {
Some(err) => Err(err),
None => Err(err),
},
stream.writable().await?;
// Check if there was an error while connecting.
match stream.get_ref().take_error()? {
None => Ok(stream),
Some(err) => Err(err),
}
}
@ -1014,23 +1055,10 @@ impl Async<UnixStream> {
})?;
let stream = Async::new(socket.into_unix_stream())?;
// Waits for connect to complete.
let wait_connect = |mut stream: &UnixStream| match stream.write(&[]) {
Err(err) if err.kind() == io::ErrorKind::NotConnected => match stream.take_error()? {
Some(err) => Err(err),
None => Err(io::ErrorKind::WouldBlock.into()),
},
res => res.map(|_| ()),
};
// The stream becomes writable when connected.
match stream.write_with(|io| wait_connect(io)).await {
Ok(()) => Ok(stream),
Err(err) => match stream.get_ref().take_error()? {
Some(err) => Err(err),
None => Err(err),
},
}
stream.writable().await?;
Ok(stream)
}
/// Creates an unnamed pair of connected UDS stream sockets.