diff --git a/Cargo.toml b/Cargo.toml index 4888b7a..cf92da9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,11 +37,15 @@ ctrlc = "3" doc-comment = "0.3" futures = "0.3" http = "0.2" +http-body-util = "0.1.0" http-types = "2" -hyper = { version = "0.14", default-features = false, features = ["client", "http1", "server", "stream"] } +hyper = { version = "1.0", default-features = false, features = ["client", "http1", "server"] } +macro_rules_attribute = "0.2.0" native-tls = "0.2" scraper = "0.18" signal-hook = "0.3" +smol-hyper = "0.1.0" +smol-macros = "0.1.0" surf = { version = "2", default-features = false, features = ["h1-client"] } tempfile = "3" tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } diff --git a/examples/hyper-client.rs b/examples/hyper-client.rs index 63676cf..bd00997 100644 --- a/examples/hyper-client.rs +++ b/examples/hyper-client.rs @@ -6,175 +6,140 @@ //! cargo run --example hyper-client //! ``` -use std::net::Shutdown; -use std::net::{TcpStream, ToSocketAddrs}; +use std::convert::TryInto; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; -use anyhow::{bail, Context as _, Error, Result}; +use anyhow::{bail, Context as _, Result}; use async_native_tls::TlsStream; -use http::Uri; -use hyper::{Body, Client, Request, Response}; -use smol::{io, prelude::*, Async}; +use http_body_util::{BodyStream, Empty}; +use hyper::body::Incoming; +use hyper::{Request, Response}; +use macro_rules_attribute::apply; +use smol::{io, net::TcpStream, prelude::*, Executor}; +use smol_hyper::rt::FuturesIo; +use smol_macros::main; /// Sends a request and fetches the response. -async fn fetch(req: Request) -> Result> { - Ok(Client::builder() - .executor(SmolExecutor) - .build::<_, Body>(SmolConnector) - .request(req) - .await?) -} +async fn fetch( + ex: &Arc>, + req: Request>, +) -> Result> { + // Connect to the HTTP server. + let io = { + let host = req.uri().host().context("cannot parse host")?; -fn main() -> Result<()> { - smol::block_on(async { - // Create a request. - let req = Request::get("https://www.rust-lang.org").body(Body::empty())?; - - // Fetch the response. - let resp = fetch(req).await?; - println!("{:#?}", resp); - - // Read the message body. - let body = resp - .into_body() - .try_fold(Vec::new(), |mut body, chunk| { - body.extend_from_slice(&chunk); - Ok(body) - }) - .await?; - println!("{}", String::from_utf8_lossy(&body)); - - Ok(()) - }) -} - -/// Spawns futures. -#[derive(Clone)] -struct SmolExecutor; - -impl hyper::rt::Executor for SmolExecutor { - fn execute(&self, fut: F) { - smol::spawn(async { drop(fut.await) }).detach(); - } -} - -/// Connects to URLs. -#[derive(Clone)] -struct SmolConnector; - -impl hyper::service::Service for SmolConnector { - type Response = SmolStream; - type Error = Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, uri: Uri) -> Self::Future { - Box::pin(async move { - let host = uri.host().context("cannot parse host")?; - - match uri.scheme_str() { - Some("http") => { - let socket_addr = { - let host = host.to_string(); - let port = uri.port_u16().unwrap_or(80); - smol::unblock(move || (host.as_str(), port).to_socket_addrs()) - .await? - .next() - .context("cannot resolve address")? - }; - let stream = Async::::connect(socket_addr).await?; - Ok(SmolStream::Plain(stream)) - } - Some("https") => { - // In case of HTTPS, establish a secure TLS connection first. - let socket_addr = { - let host = host.to_string(); - let port = uri.port_u16().unwrap_or(443); - smol::unblock(move || (host.as_str(), port).to_socket_addrs()) - .await? - .next() - .context("cannot resolve address")? - }; - let stream = Async::::connect(socket_addr).await?; - let stream = async_native_tls::connect(host, stream).await?; - Ok(SmolStream::Tls(stream)) - } - scheme => bail!("unsupported scheme: {:?}", scheme), + match req.uri().scheme_str() { + Some("http") => { + let stream = { + let port = req.uri().port_u16().unwrap_or(80); + TcpStream::connect((host, port)).await? + }; + SmolStream::Plain(stream) } + Some("https") => { + // In case of HTTPS, establish a secure TLS connection first. + let stream = { + let port = req.uri().port_u16().unwrap_or(443); + TcpStream::connect((host, port)).await? + }; + let stream = async_native_tls::connect(host, stream).await?; + SmolStream::Tls(stream) + } + scheme => bail!("unsupported scheme: {:?}", scheme), + } + }; + + // Spawn the HTTP/1 connection. + let (mut sender, conn) = hyper::client::conn::http1::handshake(FuturesIo::new(io)).await?; + ex.spawn(async move { + if let Err(e) = conn.await { + println!("Connection failed: {:?}", e); + } + }) + .detach(); + + // Get the result + let result = sender.send_request(req).await?; + Ok(result) +} + +#[apply(main!)] +async fn main(ex: Arc>) -> Result<()> { + // Create a request. + let url: hyper::Uri = "https://www.rust-lang.org".try_into()?; + let req = Request::builder() + .header( + hyper::header::HOST, + url.authority().unwrap().clone().as_str(), + ) + .uri(url) + .body(Empty::new())?; + + // Fetch the response. + let resp = fetch(&ex, req).await?; + println!("{:#?}", resp); + + // Read the message body. + let body: Vec = BodyStream::new(resp.into_body()) + .try_fold(Vec::new(), |mut body, chunk| { + if let Some(chunk) = chunk.data_ref() { + body.extend_from_slice(chunk); + } + Ok(body) }) - } + .await?; + println!("{}", String::from_utf8_lossy(&body)); + + Ok(()) } /// A TCP or TCP+TLS connection. enum SmolStream { /// A plain TCP connection. - Plain(Async), + Plain(TcpStream), /// A TCP connection secured by TLS. - Tls(TlsStream>), + Tls(TlsStream), } -impl hyper::client::connect::Connection for SmolStream { - fn connected(&self) -> hyper::client::connect::Connected { - hyper::client::connect::Connected::new() - } -} - -impl tokio::io::AsyncRead for SmolStream { +impl AsyncRead for SmolStream { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { + buf: &mut [u8], + ) -> Poll> { match &mut *self { - SmolStream::Plain(s) => { - Pin::new(s) - .poll_read(cx, buf.initialize_unfilled()) - .map_ok(|size| { - buf.advance(size); - }) - } - SmolStream::Tls(s) => { - Pin::new(s) - .poll_read(cx, buf.initialize_unfilled()) - .map_ok(|size| { - buf.advance(size); - }) - } + SmolStream::Plain(stream) => Pin::new(stream).poll_read(cx, buf), + SmolStream::Tls(stream) => Pin::new(stream).poll_read(cx, buf), } } } -impl tokio::io::AsyncWrite for SmolStream { +impl AsyncWrite for SmolStream { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { match &mut *self { - SmolStream::Plain(s) => Pin::new(s).poll_write(cx, buf), - SmolStream::Tls(s) => Pin::new(s).poll_write(cx, buf), + SmolStream::Plain(stream) => Pin::new(stream).poll_write(cx, buf), + SmolStream::Tls(stream) => Pin::new(stream).poll_write(cx, buf), + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut *self { + SmolStream::Plain(stream) => Pin::new(stream).poll_close(cx), + SmolStream::Tls(stream) => Pin::new(stream).poll_close(cx), } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { - SmolStream::Plain(s) => Pin::new(s).poll_flush(cx), - SmolStream::Tls(s) => Pin::new(s).poll_flush(cx), - } - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match &mut *self { - SmolStream::Plain(s) => { - s.get_ref().shutdown(Shutdown::Write)?; - Poll::Ready(Ok(())) - } - SmolStream::Tls(s) => Pin::new(s).poll_close(cx), + SmolStream::Plain(stream) => Pin::new(stream).poll_flush(cx), + SmolStream::Tls(stream) => Pin::new(stream).poll_flush(cx), } } }