commit 109830298c0aa15938ed6b5e436bc304ad256bbe Author: Stjepan Glavina Date: Mon Jun 22 15:02:46 2020 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..ea9632a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "futures-lite" +version = "0.1.1" +authors = ["Stjepan Glavina "] +edition = "2018" +description = "A lightweight subset of the futures crate" +license = "Apache-2.0 OR MIT" + +[features] +default = ["std"] +std = [] + +[dependencies] +futures-core = "0.3.5" +futures-io = { version = "0.3.5", default-features = false, features = ["std"] } diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..aa5a32d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,157 @@ +use std::fmt; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub use futures_core::{Future, Stream}; +pub use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; + +#[macro_export] +macro_rules! ready { + ($e:expr $(,)?) => { + match $e { + std::task::Poll::Ready(t) => t, + std::task::Poll::Pending => return std::task::Poll::Pending, + } + }; +} + +#[macro_export] +macro_rules! pin { + ($($x:ident),* $(,)?) => { + $( + let mut $x = $x; + + #[allow(unused_mut)] + let mut $x = unsafe { + std::pin::Pin::new_unchecked(&mut $x) + }; + )* + } +} + +pub mod future { + use super::*; + + /// Future for the [`poll_fn`] function. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollFn { + f: F, + } + + impl Unpin for PollFn {} + + /// Creates a new future wrapping around a function returning [`Poll`]. + /// + /// Polling the returned future delegates to the wrapped function. + pub fn poll_fn(f: F) -> PollFn + where + F: FnMut(&mut Context<'_>) -> Poll, + { + PollFn { f } + } + + impl fmt::Debug for PollFn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PollFn").finish() + } + } + + impl Future for PollFn + where + F: FnMut(&mut Context<'_>) -> Poll, + { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (&mut self.f)(cx) + } + } +} + +pub mod stream { + use super::*; + + /// Creates a `Stream` from a seed and a closure returning a `Future`. + /// + /// This function is the dual for the `Stream::fold()` adapter: while + /// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a + /// `Stream` from a seed value. + /// + /// `unfold()` will call the provided closure with the provided seed, then wait + /// for the returned `Future` to complete with `(a, b)`. It will then yield the + /// value `a`, and use `b` as the next internal state. + /// + /// If the closure returns `None` instead of `Some(Future)`, then the `unfold()` + /// will stop producing items and return `Poll::Ready(None)` in future + /// calls to `poll()`. + /// + /// This function can typically be used when wanting to go from the "world of + /// futures" to the "world of streams": the provided closure can build a + /// `Future` using other library functions working on futures, and `unfold()` + /// will turn it into a `Stream` by repeating the operation. + pub fn unfold(init: T, f: F) -> Unfold + where + F: FnMut(T) -> Fut, + Fut: Future>, + { + Unfold { + f, + state: Some(init), + fut: None, + } + } + + /// Stream for the [`unfold`] function. + #[must_use = "streams do nothing unless polled"] + pub struct Unfold { + f: F, + state: Option, + fut: Option, + } + + impl fmt::Debug for Unfold + where + T: fmt::Debug, + Fut: fmt::Debug, + { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Unfold") + .field("state", &self.state) + .field("fut", &self.fut) + .finish() + } + } + + impl Stream for Unfold + where + F: FnMut(T) -> Fut, + Fut: Future>, + { + type Item = Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = unsafe { self.get_unchecked_mut() }; + + if let Some(state) = this.state.take() { + this.fut = Some((this.f)(state)); + } + + let fut = unsafe { + Pin::new_unchecked( + this.fut + .as_mut() + .expect("Unfold must not be polled after it returned `Poll::Ready(None)`"), + ) + }; + let step = futures_core::ready!(fut.poll(cx)); + this.fut = None; + + if let Some((item, next_state)) = step { + this.state = Some(next_state); + Poll::Ready(Some(item)) + } else { + Poll::Ready(None) + } + } + } +}