futures-lite/src/io.rs

3101 lines
86 KiB
Rust

//! Tools and combinators for I/O.
//!
//! # Examples
//!
//! ```
//! use futures_lite::io::{self, AsyncReadExt};
//!
//! # spin_on::spin_on(async {
//! let input: &[u8] = b"hello";
//! let mut reader = io::BufReader::new(input);
//!
//! let mut contents = String::new();
//! reader.read_to_string(&mut contents).await?;
//! # std::io::Result::Ok(()) });
//! ```
#[doc(no_inline)]
pub use std::io::{Error, ErrorKind, Result, SeekFrom};
#[doc(no_inline)]
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
use std::borrow::{Borrow, BorrowMut};
use std::cmp;
use std::fmt;
use std::future::Future;
use std::io::{IoSlice, IoSliceMut};
use std::mem;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use crate::future;
use crate::ready;
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
/// Copies the entire contents of a reader into a writer.
///
/// This function will read data from `reader` and write it into `writer` in a streaming fashion
/// until `reader` returns EOF.
///
/// On success, returns the total number of bytes copied.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, BufReader, BufWriter};
///
/// # spin_on::spin_on(async {
/// let input: &[u8] = b"hello";
/// let reader = BufReader::new(input);
///
/// let mut output = Vec::new();
/// let writer = BufWriter::new(&mut output);
///
/// io::copy(reader, writer).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
where
R: AsyncRead,
W: AsyncWrite,
{
pin_project! {
struct CopyFuture<R, W> {
#[pin]
reader: R,
#[pin]
writer: W,
amt: u64,
}
}
impl<R, W> Future for CopyFuture<R, W>
where
R: AsyncBufRead,
W: AsyncWrite,
{
type Output = Result<u64>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if buffer.is_empty() {
ready!(this.writer.as_mut().poll_flush(cx))?;
return Poll::Ready(Ok(*this.amt));
}
let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
if i == 0 {
return Poll::Ready(Err(ErrorKind::WriteZero.into()));
}
*this.amt += i as u64;
this.reader.as_mut().consume(i);
}
}
}
let future = CopyFuture {
reader: BufReader::new(reader),
writer,
amt: 0,
};
future.await
}
/// Asserts that a type implementing [`std::io`] traits can be used as an async type.
///
/// The underlying I/O handle should never block nor return the [`ErrorKind::WouldBlock`] error.
/// This is usually the case for in-memory buffered I/O.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AssertAsync, AsyncReadExt};
///
/// let reader: &[u8] = b"hello";
///
/// # spin_on::spin_on(async {
/// let mut async_reader = AssertAsync::new(reader);
/// let mut contents = String::new();
///
/// // This line works in async manner - note that there is await:
/// async_reader.read_to_string(&mut contents).await?;
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AssertAsync<T>(T);
impl<T> Unpin for AssertAsync<T> {}
impl<T> AssertAsync<T> {
/// Wraps an I/O handle implementing [`std::io`] traits.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AssertAsync;
///
/// let reader: &[u8] = b"hello";
///
/// let async_reader = AssertAsync::new(reader);
/// ```
#[inline(always)]
pub fn new(io: T) -> Self {
AssertAsync(io)
}
/// Gets a reference to the inner I/O handle.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AssertAsync;
///
/// let reader: &[u8] = b"hello";
///
/// let async_reader = AssertAsync::new(reader);
/// let r = async_reader.get_ref();
/// ```
#[inline(always)]
pub fn get_ref(&self) -> &T {
&self.0
}
/// Gets a mutable reference to the inner I/O handle.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AssertAsync;
///
/// let reader: &[u8] = b"hello";
///
/// let mut async_reader = AssertAsync::new(reader);
/// let r = async_reader.get_mut();
/// ```
#[inline(always)]
pub fn get_mut(&mut self) -> &mut T {
&mut self.0
}
/// Extracts the inner I/O handle.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AssertAsync;
///
/// let reader: &[u8] = b"hello";
///
/// let async_reader = AssertAsync::new(reader);
/// let inner = async_reader.into_inner();
/// ```
#[inline(always)]
pub fn into_inner(self) -> T {
self.0
}
}
fn assert_async_wrapio<F, T>(mut f: F) -> Poll<std::io::Result<T>>
where
F: FnMut() -> std::io::Result<T>,
{
loop {
match f() {
Err(err) if err.kind() == ErrorKind::Interrupted => {}
res => return Poll::Ready(res),
}
}
}
impl<T: std::io::Read> AsyncRead for AssertAsync<T> {
#[inline]
fn poll_read(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
assert_async_wrapio(move || self.0.read(buf))
}
#[inline]
fn poll_read_vectored(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
assert_async_wrapio(move || self.0.read_vectored(bufs))
}
}
impl<T: std::io::Write> AsyncWrite for AssertAsync<T> {
#[inline]
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
assert_async_wrapio(move || self.0.write(buf))
}
#[inline]
fn poll_write_vectored(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
assert_async_wrapio(move || self.0.write_vectored(bufs))
}
#[inline]
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
assert_async_wrapio(move || self.0.flush())
}
#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.poll_flush(cx)
}
}
impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
#[inline]
fn poll_seek(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<Result<u64>> {
assert_async_wrapio(move || self.0.seek(pos))
}
}
/// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending`
/// polls to `WouldBlock` errors.
///
/// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types
/// that take `Read` as a parameter.
///
/// # Examples
///
/// ```
/// use std::io::Read;
/// use std::task::{Poll, Context};
///
/// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> {
/// // Assume we have a library that's built around `Read` and `Write` traits.
/// use cooltls::Session;
///
/// // We want to use it with our writer that implements `AsyncWrite`.
/// let writer = Stream::new();
///
/// // First, we wrap our `Writer` with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`.
/// use futures_lite::io::AsyncAsSync;
/// let writer = AsyncAsSync::new(cx, writer);
///
/// // Now, we can use it with `cooltls`.
/// let mut session = Session::new(writer);
///
/// // Match on the result of `read()` and translate it to poll.
/// match session.read(&mut [0; 1024]) {
/// Ok(n) => Poll::Ready(n),
/// Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
/// Err(err) => panic!("unexpected error: {}", err),
/// }
/// }
///
/// // Usually, poll-based functions are best wrapped using `poll_fn`.
/// use futures_lite::future::poll_fn;
/// # futures_lite::future::block_on(async {
/// poll_fn(|cx| poll_for_io(cx)).await;
/// # });
/// # struct Stream;
/// # impl Stream {
/// # fn new() -> Stream {
/// # Stream
/// # }
/// # }
/// # impl futures_lite::io::AsyncRead for Stream {
/// # fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> {
/// # Poll::Ready(Ok(0))
/// # }
/// # }
/// # mod cooltls {
/// # pub struct Session<W> {
/// # reader: W,
/// # }
/// # impl<W> Session<W> {
/// # pub fn new(reader: W) -> Session<W> {
/// # Session { reader }
/// # }
/// # }
/// # impl<W: std::io::Read> std::io::Read for Session<W> {
/// # fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
/// # self.reader.read(buf)
/// # }
/// # }
/// # }
/// ```
#[derive(Debug)]
pub struct AsyncAsSync<'r, 'ctx, T> {
/// The context we are using to poll the future.
pub context: &'r mut Context<'ctx>,
/// The actual reader/writer we are wrapping.
pub inner: T,
}
impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> {
/// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncAsSync;
/// use std::task::Context;
/// use waker_fn::waker_fn;
///
/// let reader: &[u8] = b"hello";
/// let waker = waker_fn(|| {});
/// let mut context = Context::from_waker(&waker);
///
/// let async_reader = AsyncAsSync::new(&mut context, reader);
/// ```
#[inline]
pub fn new(context: &'r mut Context<'ctx>, inner: T) -> Self {
AsyncAsSync { context, inner }
}
/// Attempt to shutdown the I/O handle.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncAsSync;
/// use std::task::Context;
/// use waker_fn::waker_fn;
///
/// let reader: Vec<u8> = b"hello".to_vec();
/// let waker = waker_fn(|| {});
/// let mut context = Context::from_waker(&waker);
///
/// let mut async_reader = AsyncAsSync::new(&mut context, reader);
/// async_reader.close().unwrap();
/// ```
#[inline]
pub fn close(&mut self) -> Result<()>
where
T: AsyncWrite + Unpin,
{
self.poll_with(|io, cx| io.poll_close(cx))
}
/// Poll this `AsyncAsSync` for some function.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncAsSync, AsyncRead};
/// use std::task::Context;
/// use waker_fn::waker_fn;
///
/// let reader: &[u8] = b"hello";
/// let waker = waker_fn(|| {});
/// let mut context = Context::from_waker(&waker);
///
/// let mut async_reader = AsyncAsSync::new(&mut context, reader);
/// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024]));
/// assert_eq!(r.unwrap(), 5);
/// ```
#[inline]
pub fn poll_with<R>(
&mut self,
f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>,
) -> Result<R>
where
T: Unpin,
{
match f(Pin::new(&mut self.inner), self.context) {
Poll::Ready(res) => res,
Poll::Pending => Err(ErrorKind::WouldBlock.into()),
}
}
}
impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_read(cx, buf))
}
#[inline]
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_read_vectored(cx, bufs))
}
}
impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> {
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_write(cx, buf))
}
#[inline]
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs))
}
#[inline]
fn flush(&mut self) -> Result<()> {
self.poll_with(|io, cx| io.poll_flush(cx))
}
}
impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> {
#[inline]
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.poll_with(|io, cx| io.poll_seek(cx, pos))
}
}
impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn as_ref(&self) -> &T {
&self.inner
}
}
impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn as_mut(&mut self) -> &mut T {
&mut self.inner
}
}
impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn borrow(&self) -> &T {
&self.inner
}
}
impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> {
#[inline]
fn borrow_mut(&mut self) -> &mut T {
&mut self.inner
}
}
/// Blocks on all async I/O operations and implements [`std::io`] traits.
///
/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
/// manually all the time becomes too tedious, use this type for more convenient blocking on async
/// I/O operations.
///
/// This type implements traits [`Read`][`std::io::Read`], [`Write`][`std::io::Write`], or
/// [`Seek`][`std::io::Seek`] if the inner type implements [`AsyncRead`], [`AsyncWrite`], or
/// [`AsyncSeek`], respectively.
///
/// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before
/// dropping the [`BlockOn`] handle or some buffered data might get lost.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BlockOn;
/// use futures_lite::pin;
/// use std::io::Read;
///
/// let reader: &[u8] = b"hello";
/// pin!(reader);
///
/// let mut blocking_reader = BlockOn::new(reader);
/// let mut contents = String::new();
///
/// // This line blocks - note that there is no await:
/// blocking_reader.read_to_string(&mut contents)?;
/// # std::io::Result::Ok(())
/// ```
#[derive(Debug)]
pub struct BlockOn<T>(T);
impl<T> BlockOn<T> {
/// Wraps an async I/O handle into a blocking interface.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BlockOn;
/// use futures_lite::pin;
///
/// let reader: &[u8] = b"hello";
/// pin!(reader);
///
/// let blocking_reader = BlockOn::new(reader);
/// ```
pub fn new(io: T) -> BlockOn<T> {
BlockOn(io)
}
/// Gets a reference to the async I/O handle.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BlockOn;
/// use futures_lite::pin;
///
/// let reader: &[u8] = b"hello";
/// pin!(reader);
///
/// let blocking_reader = BlockOn::new(reader);
/// let r = blocking_reader.get_ref();
/// ```
pub fn get_ref(&self) -> &T {
&self.0
}
/// Gets a mutable reference to the async I/O handle.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BlockOn;
/// use futures_lite::pin;
///
/// let reader: &[u8] = b"hello";
/// pin!(reader);
///
/// let mut blocking_reader = BlockOn::new(reader);
/// let r = blocking_reader.get_mut();
/// ```
pub fn get_mut(&mut self) -> &mut T {
&mut self.0
}
/// Extracts the inner async I/O handle.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BlockOn;
/// use futures_lite::pin;
///
/// let reader: &[u8] = b"hello";
/// pin!(reader);
///
/// let blocking_reader = BlockOn::new(reader);
/// let inner = blocking_reader.into_inner();
/// ```
pub fn into_inner(self) -> T {
self.0
}
}
impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
future::block_on(self.0.read(buf))
}
}
impl<T: AsyncBufRead + Unpin> std::io::BufRead for BlockOn<T> {
fn fill_buf(&mut self) -> Result<&[u8]> {
future::block_on(self.0.fill_buf())
}
fn consume(&mut self, amt: usize) {
Pin::new(&mut self.0).consume(amt)
}
}
impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
future::block_on(self.0.write(buf))
}
fn flush(&mut self) -> Result<()> {
future::block_on(self.0.flush())
}
}
impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> {
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
future::block_on(self.0.seek(pos))
}
}
pin_project! {
/// Adds buffering to a reader.
///
/// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A
/// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and
/// maintains an in-memory buffer of the incoming byte stream.
///
/// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to
/// the same file or networking socket. It does not help when reading very large amounts at
/// once, or reading just once or a few times. It also provides no advantage when reading from
/// a source that is already in memory, like a `Vec<u8>`.
///
/// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating
/// multiple instances of [`BufReader`] on the same reader can cause data loss.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncBufReadExt, BufReader};
///
/// # spin_on::spin_on(async {
/// let input: &[u8] = b"hello";
/// let mut reader = BufReader::new(input);
///
/// let mut line = String::new();
/// reader.read_line(&mut line).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub struct BufReader<R> {
#[pin]
inner: R,
buf: Box<[u8]>,
pos: usize,
cap: usize,
}
}
impl<R: AsyncRead> BufReader<R> {
/// Creates a buffered reader with the default buffer capacity.
///
/// The default capacity is currently 8 KB, but that may change in the future.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufReader;
///
/// let input: &[u8] = b"hello";
/// let reader = BufReader::new(input);
/// ```
pub fn new(inner: R) -> BufReader<R> {
BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
}
/// Creates a buffered reader with the specified capacity.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufReader;
///
/// let input: &[u8] = b"hello";
/// let reader = BufReader::with_capacity(1024, input);
/// ```
pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
BufReader {
inner,
buf: vec![0; capacity].into_boxed_slice(),
pos: 0,
cap: 0,
}
}
}
impl<R> BufReader<R> {
/// Gets a reference to the underlying reader.
///
/// It is not advisable to directly read from the underlying reader.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufReader;
///
/// let input: &[u8] = b"hello";
/// let reader = BufReader::new(input);
///
/// let r = reader.get_ref();
/// ```
pub fn get_ref(&self) -> &R {
&self.inner
}
/// Gets a mutable reference to the underlying reader.
///
/// It is not advisable to directly read from the underlying reader.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufReader;
///
/// let input: &[u8] = b"hello";
/// let mut reader = BufReader::new(input);
///
/// let r = reader.get_mut();
/// ```
pub fn get_mut(&mut self) -> &mut R {
&mut self.inner
}
/// Gets a pinned mutable reference to the underlying reader.
///
/// It is not advisable to directly read from the underlying reader.
fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
self.project().inner
}
/// Returns a reference to the internal buffer.
///
/// This method will not attempt to fill the buffer if it is empty.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufReader;
///
/// let input: &[u8] = b"hello";
/// let reader = BufReader::new(input);
///
/// // The internal buffer is empty until the first read request.
/// assert_eq!(reader.buffer(), &[]);
/// ```
pub fn buffer(&self) -> &[u8] {
&self.buf[self.pos..self.cap]
}
/// Unwraps the buffered reader, returning the underlying reader.
///
/// Note that any leftover data in the internal buffer will be lost.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufReader;
///
/// let input: &[u8] = b"hello";
/// let reader = BufReader::new(input);
///
/// assert_eq!(reader.into_inner(), input);
/// ```
pub fn into_inner(self) -> R {
self.inner
}
/// Invalidates all data in the internal buffer.
#[inline]
fn discard_buffer(self: Pin<&mut Self>) {
let this = self.project();
*this.pos = 0;
*this.cap = 0;
}
}
impl<R: AsyncRead> AsyncRead for BufReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.pos == self.cap && buf.len() >= self.buf.len() {
let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
self.discard_buffer();
return Poll::Ready(res);
}
let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
let nread = std::io::Read::read(&mut rem, buf)?;
self.consume(nread);
Poll::Ready(Ok(nread))
}
fn poll_read_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
if self.pos == self.cap && total_len >= self.buf.len() {
let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
self.discard_buffer();
return Poll::Ready(res);
}
let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
self.consume(nread);
Poll::Ready(Ok(nread))
}
}
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
let mut this = self.project();
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
// Branch using `>=` instead of the more correct `==`
// to tell the compiler that the pos..cap slice is always valid.
if *this.pos >= *this.cap {
debug_assert!(*this.pos == *this.cap);
*this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
*this.pos = 0;
}
Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
}
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.project();
*this.pos = cmp::min(*this.pos + amt, *this.cap);
}
}
impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufReader")
.field("reader", &self.inner)
.field(
"buffer",
&format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
)
.finish()
}
}
impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
/// Seeks to an offset, in bytes, in the underlying reader.
///
/// The position used for seeking with [`SeekFrom::Current`] is the position the underlying
/// reader would be at if the [`BufReader`] had no internal buffer.
///
/// Seeking always discards the internal buffer, even if the seek position would otherwise fall
/// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`]
/// immediately after a seek yields the underlying reader at the same position.
///
/// See [`AsyncSeek`] for more details.
///
/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
/// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
/// the second seek returns `Err`, the underlying reader will be left at the same position it
/// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`.
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<Result<u64>> {
let result: u64;
if let SeekFrom::Current(n) = pos {
let remainder = (self.cap - self.pos) as i64;
// it should be safe to assume that remainder fits within an i64 as the alternative
// means we managed to allocate 8 exbibytes and that's absurd.
// But it's not out of the realm of possibility for some weird underlying reader to
// support seeking by i64::min_value() so we need to handle underflow when subtracting
// remainder.
if let Some(offset) = n.checked_sub(remainder) {
result = ready!(self
.as_mut()
.get_pin_mut()
.poll_seek(cx, SeekFrom::Current(offset)))?;
} else {
// seek backwards by our remainder, and then by the offset
ready!(self
.as_mut()
.get_pin_mut()
.poll_seek(cx, SeekFrom::Current(-remainder)))?;
self.as_mut().discard_buffer();
result = ready!(self
.as_mut()
.get_pin_mut()
.poll_seek(cx, SeekFrom::Current(n)))?;
}
} else {
// Seeking with Start/End doesn't care about our buffer length.
result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
}
self.discard_buffer();
Poll::Ready(Ok(result))
}
}
impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
self.as_mut().get_pin_mut().poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.as_mut().get_pin_mut().poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.as_mut().get_pin_mut().poll_close(cx)
}
}
pin_project! {
/// Adds buffering to a writer.
///
/// It can be excessively inefficient to work directly with something that implements
/// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP
/// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and
/// writes it to the underlying writer in large, infrequent batches.
///
/// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to
/// the same file or networking socket. It does not help when writing very large amounts at
/// once, or writing just once or a few times. It also provides no advantage when writing to a
/// destination that is in memory, like a `Vec<u8>`.
///
/// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when
/// it is dropped. Therefore, it is important that users explicitly flush the buffer before
/// dropping the [`BufWriter`].
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncWriteExt, BufWriter};
///
/// # spin_on::spin_on(async {
/// let mut output = Vec::new();
/// let mut writer = BufWriter::new(&mut output);
///
/// writer.write_all(b"hello").await?;
/// writer.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub struct BufWriter<W> {
#[pin]
inner: W,
buf: Vec<u8>,
written: usize,
}
}
impl<W: AsyncWrite> BufWriter<W> {
/// Creates a buffered writer with the default buffer capacity.
///
/// The default capacity is currently 8 KB, but that may change in the future.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufWriter;
///
/// let mut output = Vec::new();
/// let writer = BufWriter::new(&mut output);
/// ```
pub fn new(inner: W) -> BufWriter<W> {
BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
}
/// Creates a buffered writer with the specified buffer capacity.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufWriter;
///
/// let mut output = Vec::new();
/// let writer = BufWriter::with_capacity(100, &mut output);
/// ```
pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
BufWriter {
inner,
buf: Vec::with_capacity(capacity),
written: 0,
}
}
/// Gets a reference to the underlying writer.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufWriter;
///
/// let mut output = Vec::new();
/// let writer = BufWriter::new(&mut output);
///
/// let r = writer.get_ref();
/// ```
pub fn get_ref(&self) -> &W {
&self.inner
}
/// Gets a mutable reference to the underlying writer.
///
/// It is not advisable to directly write to the underlying writer.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufWriter;
///
/// let mut output = Vec::new();
/// let mut writer = BufWriter::new(&mut output);
///
/// let r = writer.get_mut();
/// ```
pub fn get_mut(&mut self) -> &mut W {
&mut self.inner
}
/// Gets a pinned mutable reference to the underlying writer.
///
/// It is not not advisable to directly write to the underlying writer.
fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
self.project().inner
}
/// Unwraps the buffered writer, returning the underlying writer.
///
/// Note that any leftover data in the internal buffer will be lost. If you don't want to lose
/// that data, flush the buffered writer before unwrapping it.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncWriteExt, BufWriter};
///
/// # spin_on::spin_on(async {
/// let mut output = vec![1, 2, 3];
/// let mut writer = BufWriter::new(&mut output);
///
/// writer.write_all(&[4]).await?;
/// writer.flush().await?;
/// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]);
/// # std::io::Result::Ok(()) });
/// ```
pub fn into_inner(self) -> W {
self.inner
}
/// Returns a reference to the internal buffer.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BufWriter;
///
/// let mut output = Vec::new();
/// let writer = BufWriter::new(&mut output);
///
/// // The internal buffer is empty until the first write request.
/// assert_eq!(writer.buffer(), &[]);
/// ```
pub fn buffer(&self) -> &[u8] {
&self.buf
}
/// Flush the buffer.
fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let mut this = self.project();
let len = this.buf.len();
let mut ret = Ok(());
while *this.written < len {
match this
.inner
.as_mut()
.poll_write(cx, &this.buf[*this.written..])
{
Poll::Ready(Ok(0)) => {
ret = Err(Error::new(
ErrorKind::WriteZero,
"Failed to write buffered data",
));
break;
}
Poll::Ready(Ok(n)) => *this.written += n,
Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {}
Poll::Ready(Err(e)) => {
ret = Err(e);
break;
}
Poll::Pending => return Poll::Pending,
}
}
if *this.written > 0 {
this.buf.drain(..*this.written);
}
*this.written = 0;
Poll::Ready(ret)
}
}
impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufWriter")
.field("writer", &self.inner)
.field("buf", &self.buf)
.finish()
}
}
impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
if self.buf.len() + buf.len() > self.buf.capacity() {
ready!(self.as_mut().poll_flush_buf(cx))?;
}
if buf.len() >= self.buf.capacity() {
self.get_pin_mut().poll_write(cx, buf)
} else {
Pin::new(&mut *self.project().buf).poll_write(cx, buf)
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
ready!(self.as_mut().poll_flush_buf(cx))?;
self.get_pin_mut().poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
ready!(self.as_mut().poll_flush_buf(cx))?;
self.get_pin_mut().poll_close(cx)
}
}
impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
/// Seek to the offset, in bytes, in the underlying writer.
///
/// Seeking always writes out the internal buffer before seeking.
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<Result<u64>> {
ready!(self.as_mut().poll_flush_buf(cx))?;
self.get_pin_mut().poll_seek(cx, pos)
}
}
/// Gives an in-memory buffer a cursor for reading and writing.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom};
///
/// # spin_on::spin_on(async {
/// let mut bytes = b"hello".to_vec();
/// let mut cursor = Cursor::new(&mut bytes);
///
/// // Overwrite 'h' with 'H'.
/// cursor.write_all(b"H").await?;
///
/// // Move the cursor one byte forward.
/// cursor.seek(SeekFrom::Current(1)).await?;
///
/// // Read a byte.
/// let mut byte = [0];
/// cursor.read_exact(&mut byte).await?;
/// assert_eq!(&byte, b"l");
///
/// // Check the final buffer.
/// assert_eq!(bytes, b"Hello");
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Clone, Debug, Default)]
pub struct Cursor<T> {
inner: std::io::Cursor<T>,
}
impl<T> Cursor<T> {
/// Creates a cursor for an in-memory buffer.
///
/// Cursor's initial position is 0 even if the underlying buffer is not empty. Writing using
/// [`Cursor`] will overwrite the existing contents unless the cursor is moved to the end of
/// the buffer using [`set_position()`][Cursor::set_position()`] or
/// [`seek()`][`AsyncSeekExt::seek()`].
///
/// # Examples
///
/// ```
/// use futures_lite::io::Cursor;
///
/// let cursor = Cursor::new(Vec::<u8>::new());
/// ```
pub fn new(inner: T) -> Cursor<T> {
Cursor {
inner: std::io::Cursor::new(inner),
}
}
/// Gets a reference to the underlying buffer.
///
/// # Examples
///
/// ```
/// use futures_lite::io::Cursor;
///
/// let cursor = Cursor::new(Vec::<u8>::new());
/// let r = cursor.get_ref();
/// ```
pub fn get_ref(&self) -> &T {
self.inner.get_ref()
}
/// Gets a mutable reference to the underlying buffer.
///
/// # Examples
///
/// ```
/// use futures_lite::io::Cursor;
///
/// let mut cursor = Cursor::new(Vec::<u8>::new());
/// let r = cursor.get_mut();
/// ```
pub fn get_mut(&mut self) -> &mut T {
self.inner.get_mut()
}
/// Unwraps the cursor, returning the underlying buffer.
///
/// # Examples
///
/// ```
/// use futures_lite::io::Cursor;
///
/// let cursor = Cursor::new(vec![1, 2, 3]);
/// assert_eq!(cursor.into_inner(), [1, 2, 3]);
/// ```
pub fn into_inner(self) -> T {
self.inner.into_inner()
}
/// Returns the current position of this cursor.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
///
/// # spin_on::spin_on(async {
/// let mut cursor = Cursor::new(b"hello");
/// assert_eq!(cursor.position(), 0);
///
/// cursor.seek(SeekFrom::Start(2)).await?;
/// assert_eq!(cursor.position(), 2);
/// # std::io::Result::Ok(()) });
/// ```
pub fn position(&self) -> u64 {
self.inner.position()
}
/// Sets the position of this cursor.
///
/// # Examples
///
/// ```
/// use futures_lite::io::Cursor;
///
/// let mut cursor = Cursor::new(b"hello");
/// assert_eq!(cursor.position(), 0);
///
/// cursor.set_position(2);
/// assert_eq!(cursor.position(), 2);
/// ```
pub fn set_position(&mut self, pos: u64) {
self.inner.set_position(pos)
}
}
impl<T> AsyncSeek for Cursor<T>
where
T: AsRef<[u8]> + Unpin,
{
fn poll_seek(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<Result<u64>> {
Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
}
}
impl<T> AsyncRead for Cursor<T>
where
T: AsRef<[u8]> + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
Poll::Ready(std::io::Read::read(&mut self.inner, buf))
}
fn poll_read_vectored(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
}
}
impl<T> AsyncBufRead for Cursor<T>
where
T: AsRef<[u8]> + Unpin,
{
fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
std::io::BufRead::consume(&mut self.inner, amt)
}
}
impl AsyncWrite for Cursor<&mut [u8]> {
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
Poll::Ready(std::io::Write::write(&mut self.inner, buf))
}
fn poll_write_vectored(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
}
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(std::io::Write::flush(&mut self.inner))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.poll_flush(cx)
}
}
impl AsyncWrite for Cursor<&mut Vec<u8>> {
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
Poll::Ready(std::io::Write::write(&mut self.inner, buf))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.poll_flush(cx)
}
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(std::io::Write::flush(&mut self.inner))
}
}
impl AsyncWrite for Cursor<Vec<u8>> {
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
Poll::Ready(std::io::Write::write(&mut self.inner, buf))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.poll_flush(cx)
}
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(std::io::Write::flush(&mut self.inner))
}
}
/// Creates an empty reader.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncReadExt};
///
/// # spin_on::spin_on(async {
/// let mut reader = io::empty();
///
/// let mut contents = Vec::new();
/// reader.read_to_end(&mut contents).await?;
/// assert!(contents.is_empty());
/// # std::io::Result::Ok(()) });
/// ```
pub fn empty() -> Empty {
Empty { _private: () }
}
/// Reader for the [`empty()`] function.
pub struct Empty {
_private: (),
}
impl fmt::Debug for Empty {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Empty { .. }")
}
}
impl AsyncRead for Empty {
#[inline]
fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> {
Poll::Ready(Ok(0))
}
}
impl AsyncBufRead for Empty {
#[inline]
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
Poll::Ready(Ok(&[]))
}
#[inline]
fn consume(self: Pin<&mut Self>, _: usize) {}
}
/// Creates an infinite reader that reads the same byte repeatedly.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncReadExt};
///
/// # spin_on::spin_on(async {
/// let mut reader = io::repeat(b'a');
///
/// let mut contents = vec![0; 5];
/// reader.read_exact(&mut contents).await?;
/// assert_eq!(contents, b"aaaaa");
/// # std::io::Result::Ok(()) });
/// ```
pub fn repeat(byte: u8) -> Repeat {
Repeat { byte }
}
/// Reader for the [`repeat()`] function.
#[derive(Debug)]
pub struct Repeat {
byte: u8,
}
impl AsyncRead for Repeat {
#[inline]
fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
for b in &mut *buf {
*b = self.byte;
}
Poll::Ready(Ok(buf.len()))
}
}
/// Creates a writer that consumes and drops all data.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncWriteExt};
///
/// # spin_on::spin_on(async {
/// let mut writer = io::sink();
/// writer.write_all(b"hello").await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn sink() -> Sink {
Sink { _private: () }
}
/// Writer for the [`sink()`] function.
#[derive(Debug)]
pub struct Sink {
_private: (),
}
impl AsyncWrite for Sink {
#[inline]
fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
Poll::Ready(Ok(buf.len()))
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
#[inline]
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
}
/// Extension trait for [`AsyncBufRead`].
pub trait AsyncBufReadExt: AsyncBufRead {
/// Returns the contents of the internal buffer, filling it with more data if empty.
///
/// If the stream has reached EOF, an empty buffer will be returned.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncBufReadExt, BufReader};
/// use std::pin::Pin;
///
/// # spin_on::spin_on(async {
/// let input: &[u8] = b"hello world";
/// let mut reader = BufReader::with_capacity(5, input);
///
/// assert_eq!(reader.fill_buf().await?, b"hello");
/// reader.consume(2);
/// assert_eq!(reader.fill_buf().await?, b"llo");
/// reader.consume(3);
/// assert_eq!(reader.fill_buf().await?, b" worl");
/// # std::io::Result::Ok(()) });
/// ```
fn fill_buf(&mut self) -> FillBuf<'_, Self>
where
Self: Unpin,
{
FillBuf { reader: Some(self) }
}
/// Consumes `amt` buffered bytes.
///
/// This method does not perform any I/O, it simply consumes some amount of bytes from the
/// internal buffer.
///
/// The `amt` must be <= the number of bytes in the buffer returned by
/// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`].
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncBufReadExt, BufReader};
/// use std::pin::Pin;
///
/// # spin_on::spin_on(async {
/// let input: &[u8] = b"hello";
/// let mut reader = BufReader::with_capacity(4, input);
///
/// assert_eq!(reader.fill_buf().await?, b"hell");
/// reader.consume(2);
/// assert_eq!(reader.fill_buf().await?, b"ll");
/// # std::io::Result::Ok(()) });
/// ```
fn consume(&mut self, amt: usize)
where
Self: Unpin,
{
AsyncBufRead::consume(Pin::new(self), amt);
}
/// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found.
///
/// This method will read bytes from the underlying stream until the delimiter or EOF is
/// found. All bytes up to and including the delimiter (if found) will be appended to `buf`.
///
/// If successful, returns the total number of bytes read.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncBufReadExt, BufReader};
///
/// # spin_on::spin_on(async {
/// let input: &[u8] = b"hello";
/// let mut reader = BufReader::new(input);
///
/// let mut buf = Vec::new();
/// let n = reader.read_until(b'\n', &mut buf).await?;
/// # std::io::Result::Ok(()) });
/// ```
fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'_, Self>
where
Self: Unpin,
{
ReadUntilFuture {
reader: self,
byte,
buf,
read: 0,
}
}
/// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found.
///
/// This method will read bytes from the underlying stream until the newline delimiter (the
/// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found)
/// will be appended to `buf`.
///
/// If successful, returns the total number of bytes read.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncBufReadExt, BufReader};
///
/// # spin_on::spin_on(async {
/// let input: &[u8] = b"hello";
/// let mut reader = BufReader::new(input);
///
/// let mut line = String::new();
/// let n = reader.read_line(&mut line).await?;
/// # std::io::Result::Ok(()) });
/// ```
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'_, Self>
where
Self: Unpin,
{
ReadLineFuture {
reader: self,
buf,
bytes: Vec::new(),
read: 0,
}
}
/// Returns a stream over the lines of this byte stream.
///
/// The stream returned from this method yields items of type
/// [`io::Result`][`super::io::Result`]`<`[`String`]`>`.
/// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes)
/// at the end.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncBufReadExt, BufReader};
/// use futures_lite::stream::StreamExt;
///
/// # spin_on::spin_on(async {
/// let input: &[u8] = b"hello\nworld\n";
/// let mut reader = BufReader::new(input);
/// let mut lines = reader.lines();
///
/// while let Some(line) = lines.next().await {
/// println!("{}", line?);
/// }
/// # std::io::Result::Ok(()) });
/// ```
fn lines(self) -> Lines<Self>
where
Self: Unpin + Sized,
{
Lines {
reader: self,
buf: String::new(),
bytes: Vec::new(),
read: 0,
}
}
/// Returns a stream over the contents of this reader split on the specified `byte`.
///
/// The stream returned from this method yields items of type
/// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`.
/// Each vector returned will *not* have the delimiter byte at the end.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncBufReadExt, Cursor};
/// use futures_lite::stream::StreamExt;
///
/// # spin_on::spin_on(async {
/// let cursor = Cursor::new(b"lorem-ipsum-dolor");
/// let items: Vec<Vec<u8>> = cursor.split(b'-').try_collect().await?;
///
/// assert_eq!(items[0], b"lorem");
/// assert_eq!(items[1], b"ipsum");
/// assert_eq!(items[2], b"dolor");
/// # std::io::Result::Ok(()) });
/// ```
fn split(self, byte: u8) -> Split<Self>
where
Self: Sized,
{
Split {
reader: self,
buf: Vec::new(),
delim: byte,
read: 0,
}
}
}
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
/// Future for the [`AsyncBufReadExt::fill_buf()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FillBuf<'a, R: ?Sized> {
reader: Option<&'a mut R>,
}
impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
impl<'a, R> Future for FillBuf<'a, R>
where
R: AsyncBufRead + Unpin + ?Sized,
{
type Output = Result<&'a [u8]>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let reader = this
.reader
.take()
.expect("polled `FillBuf` after completion");
match Pin::new(&mut *reader).poll_fill_buf(cx) {
Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
poll => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}", poll),
},
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => {
this.reader = Some(reader);
Poll::Pending
}
}
}
}
/// Future for the [`AsyncBufReadExt::read_until()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> {
reader: &'a mut R,
byte: u8,
buf: &'a mut Vec<u8>,
read: usize,
}
impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {}
impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
byte,
buf,
read,
} = &mut *self;
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
}
}
fn read_until_internal<R: AsyncBufReadExt + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
byte: u8,
buf: &mut Vec<u8>,
read: &mut usize,
) -> Poll<Result<usize>> {
loop {
let (done, used) = {
let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
if let Some(i) = memchr(byte, available) {
buf.extend_from_slice(&available[..=i]);
(true, i + 1)
} else {
buf.extend_from_slice(available);
(false, available.len())
}
};
reader.as_mut().consume(used);
*read += used;
if done || used == 0 {
return Poll::Ready(Ok(mem::replace(read, 0)));
}
}
}
/// Future for the [`AsyncBufReadExt::read_line()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadLineFuture<'a, R: Unpin + ?Sized> {
reader: &'a mut R,
buf: &'a mut String,
bytes: Vec<u8>,
read: usize,
}
impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {}
impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
bytes,
read,
} = &mut *self;
read_line_internal(Pin::new(reader), cx, buf, bytes, read)
}
}
pin_project! {
/// Stream for the [`AsyncBufReadExt::lines()`] method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Lines<R> {
#[pin]
reader: R,
buf: String,
bytes: Vec<u8>,
read: usize,
}
}
impl<R: AsyncBufRead> Stream for Lines<R> {
type Item = Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let n = ready!(read_line_internal(
this.reader,
cx,
this.buf,
this.bytes,
this.read
))?;
if n == 0 && this.buf.is_empty() {
return Poll::Ready(None);
}
if this.buf.ends_with('\n') {
this.buf.pop();
if this.buf.ends_with('\r') {
this.buf.pop();
}
}
Poll::Ready(Some(Ok(mem::take(this.buf))))
}
}
fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut String,
bytes: &mut Vec<u8>,
read: &mut usize,
) -> Poll<Result<usize>> {
let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
match String::from_utf8(mem::take(bytes)) {
Ok(s) => {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
*buf = s;
Poll::Ready(ret)
}
Err(_) => Poll::Ready(ret.and_then(|_| {
Err(Error::new(
ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
})),
}
}
pin_project! {
/// Stream for the [`AsyncBufReadExt::split()`] method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Split<R> {
#[pin]
reader: R,
buf: Vec<u8>,
read: usize,
delim: u8,
}
}
impl<R: AsyncBufRead> Stream for Split<R> {
type Item = Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let n = ready!(read_until_internal(
this.reader,
cx,
*this.delim,
this.buf,
this.read
))?;
if n == 0 && this.buf.is_empty() {
return Poll::Ready(None);
}
if this.buf[this.buf.len() - 1] == *this.delim {
this.buf.pop();
}
Poll::Ready(Some(Ok(mem::take(this.buf))))
}
}
/// Extension trait for [`AsyncRead`].
pub trait AsyncReadExt: AsyncRead {
/// Reads some bytes from the byte stream.
///
/// On success, returns the total number of bytes read.
///
/// If the return value is `Ok(n)`, then it must be guaranteed that
/// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
/// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two
/// scenarios:
///
/// 1. This reader has reached its "end of file" and will likely no longer be able to
/// produce bytes. Note that this does not mean that the reader will always no
/// longer be able to produce bytes.
/// 2. The buffer specified was 0 bytes in length.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, BufReader};
///
/// # spin_on::spin_on(async {
/// let input: &[u8] = b"hello";
/// let mut reader = BufReader::new(input);
///
/// let mut buf = vec![0; 1024];
/// let n = reader.read(&mut buf).await?;
/// # std::io::Result::Ok(()) });
/// ```
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
where
Self: Unpin,
{
ReadFuture { reader: self, buf }
}
/// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers.
///
/// Data is copied to fill each buffer in order, with the final buffer possibly being
/// only partially filled. This method must behave same as a single call to
/// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would.
fn read_vectored<'a>(
&'a mut self,
bufs: &'a mut [IoSliceMut<'a>],
) -> ReadVectoredFuture<'a, Self>
where
Self: Unpin,
{
ReadVectoredFuture { reader: self, bufs }
}
/// Reads the entire contents and appends them to a [`Vec`].
///
/// On success, returns the total number of bytes read.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// # spin_on::spin_on(async {
/// let mut reader = Cursor::new(vec![1, 2, 3]);
/// let mut contents = Vec::new();
///
/// let n = reader.read_to_end(&mut contents).await?;
/// assert_eq!(n, 3);
/// assert_eq!(contents, [1, 2, 3]);
/// # std::io::Result::Ok(()) });
/// ```
fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
where
Self: Unpin,
{
let start_len = buf.len();
ReadToEndFuture {
reader: self,
buf,
start_len,
}
}
/// Reads the entire contents and appends them to a [`String`].
///
/// On success, returns the total number of bytes read.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// # spin_on::spin_on(async {
/// let mut reader = Cursor::new(&b"hello");
/// let mut contents = String::new();
///
/// let n = reader.read_to_string(&mut contents).await?;
/// assert_eq!(n, 5);
/// assert_eq!(contents, "hello");
/// # std::io::Result::Ok(()) });
/// ```
fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
where
Self: Unpin,
{
ReadToStringFuture {
reader: self,
buf,
bytes: Vec::new(),
start_len: 0,
}
}
/// Reads the exact number of bytes required to fill `buf`.
///
/// On success, returns the total number of bytes read.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// # spin_on::spin_on(async {
/// let mut reader = Cursor::new(&b"hello");
/// let mut contents = vec![0; 3];
///
/// reader.read_exact(&mut contents).await?;
/// assert_eq!(contents, b"hel");
/// # std::io::Result::Ok(()) });
/// ```
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
where
Self: Unpin,
{
ReadExactFuture { reader: self, buf }
}
/// Creates an adapter which will read at most `limit` bytes from it.
///
/// This method returns a new instance of [`AsyncRead`] which will read at most
/// `limit` bytes, after which it will always return `Ok(0)` indicating EOF.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// # spin_on::spin_on(async {
/// let mut reader = Cursor::new(&b"hello");
/// let mut contents = String::new();
///
/// let n = reader.take(3).read_to_string(&mut contents).await?;
/// assert_eq!(n, 3);
/// assert_eq!(contents, "hel");
/// # std::io::Result::Ok(()) });
/// ```
fn take(self, limit: u64) -> Take<Self>
where
Self: Sized,
{
Take { inner: self, limit }
}
/// Converts this [`AsyncRead`] into a [`Stream`] of bytes.
///
/// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`.
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
/// use futures_lite::stream::StreamExt;
///
/// # spin_on::spin_on(async {
/// let reader = Cursor::new(&b"hello");
/// let mut bytes = reader.bytes();
///
/// while let Some(byte) = bytes.next().await {
/// println!("byte: {}", byte?);
/// }
/// # std::io::Result::Ok(()) });
/// ```
fn bytes(self) -> Bytes<Self>
where
Self: Sized,
{
Bytes { inner: self }
}
/// Creates an adapter which will chain this stream with another.
///
/// The returned [`AsyncRead`] instance will first read all bytes from this reader
/// until EOF is found, and then continue with `next`.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// # spin_on::spin_on(async {
/// let r1 = Cursor::new(&b"hello");
/// let r2 = Cursor::new(&b"world");
/// let mut reader = r1.chain(r2);
///
/// let mut contents = String::new();
/// reader.read_to_string(&mut contents).await?;
/// assert_eq!(contents, "helloworld");
/// # std::io::Result::Ok(()) });
/// ```
fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R>
where
Self: Sized,
{
Chain {
first: self,
second: next,
done_first: false,
}
}
/// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncReadExt;
///
/// let reader = [1, 2, 3].boxed_reader();
/// ```
#[cfg(feature = "alloc")]
fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>>
where
Self: Sized + Send + 'a,
{
Box::pin(self)
}
}
impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
/// Future for the [`AsyncReadExt::read()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadFuture<'a, R: Unpin + ?Sized> {
reader: &'a mut R,
buf: &'a mut [u8],
}
impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {}
impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf } = &mut *self;
Pin::new(reader).poll_read(cx, buf)
}
}
/// Future for the [`AsyncReadExt::read_vectored()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> {
reader: &'a mut R,
bufs: &'a mut [IoSliceMut<'a>],
}
impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {}
impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, bufs } = &mut *self;
Pin::new(reader).poll_read_vectored(cx, bufs)
}
}
/// Future for the [`AsyncReadExt::read_to_end()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> {
reader: &'a mut R,
buf: &'a mut Vec<u8>,
start_len: usize,
}
impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {}
impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
start_len,
} = &mut *self;
read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
}
}
/// Future for the [`AsyncReadExt::read_to_string()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> {
reader: &'a mut R,
buf: &'a mut String,
bytes: Vec<u8>,
start_len: usize,
}
impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {}
impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
bytes,
start_len,
} = &mut *self;
let reader = Pin::new(reader);
let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len));
match String::from_utf8(mem::take(bytes)) {
Ok(s) => {
debug_assert!(buf.is_empty());
**buf = s;
Poll::Ready(ret)
}
Err(_) => Poll::Ready(ret.and_then(|_| {
Err(Error::new(
ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
})),
}
}
}
// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
fn read_to_end_internal<R: AsyncRead + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,
start_len: usize,
) -> Poll<Result<usize>> {
struct Guard<'a> {
buf: &'a mut Vec<u8>,
len: usize,
}
impl Drop for Guard<'_> {
fn drop(&mut self) {
self.buf.resize(self.len, 0);
}
}
let mut g = Guard {
len: buf.len(),
buf,
};
let ret;
loop {
if g.len == g.buf.len() {
g.buf.reserve(32);
let capacity = g.buf.capacity();
g.buf.resize(capacity, 0);
}
match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
Ok(0) => {
ret = Poll::Ready(Ok(g.len - start_len));
break;
}
Ok(n) => g.len += n,
Err(e) => {
ret = Poll::Ready(Err(e));
break;
}
}
}
ret
}
/// Future for the [`AsyncReadExt::read_exact()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadExactFuture<'a, R: Unpin + ?Sized> {
reader: &'a mut R,
buf: &'a mut [u8],
}
impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {}
impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf } = &mut *self;
while !buf.is_empty() {
let n = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
let (_, rest) = mem::take(buf).split_at_mut(n);
*buf = rest;
if n == 0 {
return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
}
}
Poll::Ready(Ok(()))
}
}
pin_project! {
/// Reader for the [`AsyncReadExt::take()`] method.
#[derive(Debug)]
pub struct Take<R> {
#[pin]
inner: R,
limit: u64,
}
}
impl<R> Take<R> {
/// Returns the number of bytes before this adapter will return EOF.
///
/// Note that EOF may be reached sooner if the underlying reader is shorter than the limit.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// let reader = Cursor::new("hello");
///
/// let reader = reader.take(3);
/// assert_eq!(reader.limit(), 3);
/// ```
pub fn limit(&self) -> u64 {
self.limit
}
/// Puts a limit on the number of bytes.
///
/// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`].
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// let reader = Cursor::new("hello");
///
/// let mut reader = reader.take(10);
/// assert_eq!(reader.limit(), 10);
///
/// reader.set_limit(3);
/// assert_eq!(reader.limit(), 3);
/// ```
pub fn set_limit(&mut self, limit: u64) {
self.limit = limit;
}
/// Gets a reference to the underlying reader.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// let reader = Cursor::new("hello");
///
/// let reader = reader.take(3);
/// let r = reader.get_ref();
/// ```
pub fn get_ref(&self) -> &R {
&self.inner
}
/// Gets a mutable reference to the underlying reader.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// let reader = Cursor::new("hello");
///
/// let mut reader = reader.take(3);
/// let r = reader.get_mut();
/// ```
pub fn get_mut(&mut self) -> &mut R {
&mut self.inner
}
/// Unwraps the adapter, returning the underlying reader.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// let reader = Cursor::new("hello");
///
/// let reader = reader.take(3);
/// let reader = reader.into_inner();
/// ```
pub fn into_inner(self) -> R {
self.inner
}
}
impl<R: AsyncRead> AsyncRead for Take<R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
let this = self.project();
take_read_internal(this.inner, cx, buf, this.limit)
}
}
fn take_read_internal<R: AsyncRead + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut [u8],
limit: &mut u64,
) -> Poll<Result<usize>> {
// Don't call into inner reader at all at EOF because it may still block
if *limit == 0 {
return Poll::Ready(Ok(0));
}
let max = cmp::min(buf.len() as u64, *limit) as usize;
match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
Ok(n) => {
*limit -= n as u64;
Poll::Ready(Ok(n))
}
Err(e) => Poll::Ready(Err(e)),
}
}
impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
let this = self.project();
if *this.limit == 0 {
return Poll::Ready(Ok(&[]));
}
match ready!(this.inner.poll_fill_buf(cx)) {
Ok(buf) => {
let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
Poll::Ready(Ok(&buf[..cap]))
}
Err(e) => Poll::Ready(Err(e)),
}
}
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.project();
// Don't let callers reset the limit by passing an overlarge value
let amt = cmp::min(amt as u64, *this.limit) as usize;
*this.limit -= amt as u64;
this.inner.consume(amt);
}
}
pin_project! {
/// Reader for the [`AsyncReadExt::bytes()`] method.
#[derive(Debug)]
pub struct Bytes<R> {
#[pin]
inner: R,
}
}
impl<R: AsyncRead + Unpin> Stream for Bytes<R> {
type Item = Result<u8>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut byte = 0;
let rd = Pin::new(&mut self.inner);
match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
Ok(0) => Poll::Ready(None),
Ok(..) => Poll::Ready(Some(Ok(byte))),
Err(ref e) if e.kind() == ErrorKind::Interrupted => Poll::Pending,
Err(e) => Poll::Ready(Some(Err(e))),
}
}
}
impl<R: AsyncRead> AsyncRead for Bytes<R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
self.project().inner.poll_read(cx, buf)
}
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
self.project().inner.poll_read_vectored(cx, bufs)
}
}
pin_project! {
/// Reader for the [`AsyncReadExt::chain()`] method.
pub struct Chain<R1, R2> {
#[pin]
first: R1,
#[pin]
second: R2,
done_first: bool,
}
}
impl<R1, R2> Chain<R1, R2> {
/// Gets references to the underlying readers.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// let r1 = Cursor::new(b"hello");
/// let r2 = Cursor::new(b"world");
///
/// let reader = r1.chain(r2);
/// let (r1, r2) = reader.get_ref();
/// ```
pub fn get_ref(&self) -> (&R1, &R2) {
(&self.first, &self.second)
}
/// Gets mutable references to the underlying readers.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// let r1 = Cursor::new(b"hello");
/// let r2 = Cursor::new(b"world");
///
/// let mut reader = r1.chain(r2);
/// let (r1, r2) = reader.get_mut();
/// ```
pub fn get_mut(&mut self) -> (&mut R1, &mut R2) {
(&mut self.first, &mut self.second)
}
/// Unwraps the adapter, returning the underlying readers.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, Cursor};
///
/// let r1 = Cursor::new(b"hello");
/// let r2 = Cursor::new(b"world");
///
/// let reader = r1.chain(r2);
/// let (r1, r2) = reader.into_inner();
/// ```
pub fn into_inner(self) -> (R1, R2) {
(self.first, self.second)
}
}
impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Chain")
.field("r1", &self.first)
.field("r2", &self.second)
.finish()
}
}
impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
let this = self.project();
if !*this.done_first {
match ready!(this.first.poll_read(cx, buf)) {
Ok(0) if !buf.is_empty() => *this.done_first = true,
Ok(n) => return Poll::Ready(Ok(n)),
Err(err) => return Poll::Ready(Err(err)),
}
}
this.second.poll_read(cx, buf)
}
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
let this = self.project();
if !*this.done_first {
match ready!(this.first.poll_read_vectored(cx, bufs)) {
Ok(0) if !bufs.is_empty() => *this.done_first = true,
Ok(n) => return Poll::Ready(Ok(n)),
Err(err) => return Poll::Ready(Err(err)),
}
}
this.second.poll_read_vectored(cx, bufs)
}
}
impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
let this = self.project();
if !*this.done_first {
match ready!(this.first.poll_fill_buf(cx)) {
Ok([]) => *this.done_first = true,
Ok(buf) => return Poll::Ready(Ok(buf)),
Err(err) => return Poll::Ready(Err(err)),
}
}
this.second.poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.project();
if !*this.done_first {
this.first.consume(amt)
} else {
this.second.consume(amt)
}
}
}
/// Extension trait for [`AsyncSeek`].
pub trait AsyncSeekExt: AsyncSeek {
/// Seeks to a new position in a byte stream.
///
/// Returns the new position in the byte stream.
///
/// A seek beyond the end of stream is allowed, but behavior is defined by the implementation.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
///
/// # spin_on::spin_on(async {
/// let mut cursor = Cursor::new("hello");
///
/// // Move the cursor to the end.
/// cursor.seek(SeekFrom::End(0)).await?;
///
/// // Check the current position.
/// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5);
/// # std::io::Result::Ok(()) });
/// ```
fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
where
Self: Unpin,
{
SeekFuture { seeker: self, pos }
}
}
impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
/// Future for the [`AsyncSeekExt::seek()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SeekFuture<'a, S: Unpin + ?Sized> {
seeker: &'a mut S,
pos: SeekFrom,
}
impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {}
impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> {
type Output = Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pos = self.pos;
Pin::new(&mut *self.seeker).poll_seek(cx, pos)
}
}
/// Extension trait for [`AsyncWrite`].
pub trait AsyncWriteExt: AsyncWrite {
/// Writes some bytes into the byte stream.
///
/// Returns the number of bytes written from the start of the buffer.
///
/// If the return value is `Ok(n)` then it must be guaranteed that
/// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
/// object is no longer able to accept bytes and will likely not be able to in the
/// future as well, or that the provided buffer is empty.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncWriteExt, BufWriter};
///
/// # spin_on::spin_on(async {
/// let mut output = Vec::new();
/// let mut writer = BufWriter::new(&mut output);
///
/// let n = writer.write(b"hello").await?;
/// # std::io::Result::Ok(()) });
/// ```
fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
where
Self: Unpin,
{
WriteFuture { writer: self, buf }
}
/// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers.
///
/// Data is copied from each buffer in order, with the final buffer possibly being only
/// partially consumed. This method must behave same as a call to
/// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would.
fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
where
Self: Unpin,
{
WriteVectoredFuture { writer: self, bufs }
}
/// Writes an entire buffer into the byte stream.
///
/// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more
/// data to be written or an error occurs. It will not return before the entire buffer is
/// successfully written or an error occurs.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncWriteExt, BufWriter};
///
/// # spin_on::spin_on(async {
/// let mut output = Vec::new();
/// let mut writer = BufWriter::new(&mut output);
///
/// let n = writer.write_all(b"hello").await?;
/// # std::io::Result::Ok(()) });
/// ```
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
where
Self: Unpin,
{
WriteAllFuture { writer: self, buf }
}
/// Flushes the stream to ensure that all buffered contents reach their destination.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncWriteExt, BufWriter};
///
/// # spin_on::spin_on(async {
/// let mut output = Vec::new();
/// let mut writer = BufWriter::new(&mut output);
///
/// writer.write_all(b"hello").await?;
/// writer.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
fn flush(&mut self) -> FlushFuture<'_, Self>
where
Self: Unpin,
{
FlushFuture { writer: self }
}
/// Closes the writer.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncWriteExt, BufWriter};
///
/// # spin_on::spin_on(async {
/// let mut output = Vec::new();
/// let mut writer = BufWriter::new(&mut output);
///
/// writer.close().await?;
/// # std::io::Result::Ok(()) });
/// ```
fn close(&mut self) -> CloseFuture<'_, Self>
where
Self: Unpin,
{
CloseFuture { writer: self }
}
/// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncWriteExt;
///
/// let writer = Vec::<u8>::new().boxed_writer();
/// ```
#[cfg(feature = "alloc")]
fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>>
where
Self: Sized + Send + 'a,
{
Box::pin(self)
}
}
impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
/// Future for the [`AsyncWriteExt::write()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteFuture<'a, W: Unpin + ?Sized> {
writer: &'a mut W,
buf: &'a [u8],
}
impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {}
impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let buf = self.buf;
Pin::new(&mut *self.writer).poll_write(cx, buf)
}
}
/// Future for the [`AsyncWriteExt::write_vectored()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> {
writer: &'a mut W,
bufs: &'a [IoSlice<'a>],
}
impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {}
impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let bufs = self.bufs;
Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
}
}
/// Future for the [`AsyncWriteExt::write_all()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteAllFuture<'a, W: Unpin + ?Sized> {
writer: &'a mut W,
buf: &'a [u8],
}
impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {}
impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { writer, buf } = &mut *self;
while !buf.is_empty() {
let n = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
let (_, rest) = mem::take(buf).split_at(n);
*buf = rest;
if n == 0 {
return Poll::Ready(Err(ErrorKind::WriteZero.into()));
}
}
Poll::Ready(Ok(()))
}
}
/// Future for the [`AsyncWriteExt::flush()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FlushFuture<'a, W: Unpin + ?Sized> {
writer: &'a mut W,
}
impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {}
impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.writer).poll_flush(cx)
}
}
/// Future for the [`AsyncWriteExt::close()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CloseFuture<'a, W: Unpin + ?Sized> {
writer: &'a mut W,
}
impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {}
impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.writer).poll_close(cx)
}
}
/// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncReadExt;
///
/// let reader = [1, 2, 3].boxed_reader();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>;
/// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncWriteExt;
///
/// let writer = Vec::<u8>::new().boxed_writer();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
/// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, Cursor};
///
/// # spin_on::spin_on(async {
/// let stream = Cursor::new(vec![]);
/// let (mut reader, mut writer) = io::split(stream);
/// # std::io::Result::Ok(()) });
/// ```
pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
where
T: AsyncRead + AsyncWrite + Unpin,
{
let inner = Arc::new(Mutex::new(stream));
(ReadHalf(inner.clone()), WriteHalf(inner))
}
/// The read half returned by [`split()`].
#[derive(Debug)]
pub struct ReadHalf<T>(Arc<Mutex<T>>);
/// The write half returned by [`split()`].
#[derive(Debug)]
pub struct WriteHalf<T>(Arc<Mutex<T>>);
impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
let mut inner = self.0.lock().unwrap();
Pin::new(&mut *inner).poll_read(cx, buf)
}
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
let mut inner = self.0.lock().unwrap();
Pin::new(&mut *inner).poll_read_vectored(cx, bufs)
}
}
impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
let mut inner = self.0.lock().unwrap();
Pin::new(&mut *inner).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let mut inner = self.0.lock().unwrap();
Pin::new(&mut *inner).poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let mut inner = self.0.lock().unwrap();
Pin::new(&mut *inner).poll_close(cx)
}
}
#[cfg(feature = "memchr")]
use memchr::memchr;
/// Unoptimized memchr fallback.
#[cfg(not(feature = "memchr"))]
fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
haystack.iter().position(|&b| b == needle)
}