ex: Use smol-hyper in hyper-client example

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2024-01-15 10:27:39 -08:00 committed by John Nunley
parent 5b505fc26d
commit f3054c3537
2 changed files with 102 additions and 133 deletions

View File

@ -37,11 +37,15 @@ ctrlc = "3"
doc-comment = "0.3" doc-comment = "0.3"
futures = "0.3" futures = "0.3"
http = "0.2" http = "0.2"
http-body-util = "0.1.0"
http-types = "2" http-types = "2"
hyper = { version = "0.14", default-features = false, features = ["client", "http1", "server", "stream"] } hyper = { version = "1.0", default-features = false, features = ["client", "http1", "server"] }
macro_rules_attribute = "0.2.0"
native-tls = "0.2" native-tls = "0.2"
scraper = "0.18" scraper = "0.18"
signal-hook = "0.3" signal-hook = "0.3"
smol-hyper = "0.1.0"
smol-macros = "0.1.0"
surf = { version = "2", default-features = false, features = ["h1-client"] } surf = { version = "2", default-features = false, features = ["h1-client"] }
tempfile = "3" tempfile = "3"
tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] }

View File

@ -6,175 +6,140 @@
//! cargo run --example hyper-client //! cargo run --example hyper-client
//! ``` //! ```
use std::net::Shutdown; use std::convert::TryInto;
use std::net::{TcpStream, ToSocketAddrs};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Error, Result}; use anyhow::{bail, Context as _, Result};
use async_native_tls::TlsStream; use async_native_tls::TlsStream;
use http::Uri; use http_body_util::{BodyStream, Empty};
use hyper::{Body, Client, Request, Response}; use hyper::body::Incoming;
use smol::{io, prelude::*, Async}; use hyper::{Request, Response};
use macro_rules_attribute::apply;
use smol::{io, net::TcpStream, prelude::*, Executor};
use smol_hyper::rt::FuturesIo;
use smol_macros::main;
/// Sends a request and fetches the response. /// Sends a request and fetches the response.
async fn fetch(req: Request<Body>) -> Result<Response<Body>> { async fn fetch(
Ok(Client::builder() ex: &Arc<Executor<'static>>,
.executor(SmolExecutor) req: Request<Empty<&'static [u8]>>,
.build::<_, Body>(SmolConnector) ) -> Result<Response<Incoming>> {
.request(req) // Connect to the HTTP server.
.await?) let io = {
} let host = req.uri().host().context("cannot parse host")?;
fn main() -> Result<()> { match req.uri().scheme_str() {
smol::block_on(async { Some("http") => {
// Create a request. let stream = {
let req = Request::get("https://www.rust-lang.org").body(Body::empty())?; let port = req.uri().port_u16().unwrap_or(80);
TcpStream::connect((host, port)).await?
// Fetch the response. };
let resp = fetch(req).await?; SmolStream::Plain(stream)
println!("{:#?}", resp);
// Read the message body.
let body = resp
.into_body()
.try_fold(Vec::new(), |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok(body)
})
.await?;
println!("{}", String::from_utf8_lossy(&body));
Ok(())
})
}
/// Spawns futures.
#[derive(Clone)]
struct SmolExecutor;
impl<F: Future + Send + 'static> hyper::rt::Executor<F> for SmolExecutor {
fn execute(&self, fut: F) {
smol::spawn(async { drop(fut.await) }).detach();
}
}
/// Connects to URLs.
#[derive(Clone)]
struct SmolConnector;
impl hyper::service::Service<Uri> for SmolConnector {
type Response = SmolStream;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, uri: Uri) -> Self::Future {
Box::pin(async move {
let host = uri.host().context("cannot parse host")?;
match uri.scheme_str() {
Some("http") => {
let socket_addr = {
let host = host.to_string();
let port = uri.port_u16().unwrap_or(80);
smol::unblock(move || (host.as_str(), port).to_socket_addrs())
.await?
.next()
.context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
Ok(SmolStream::Plain(stream))
}
Some("https") => {
// In case of HTTPS, establish a secure TLS connection first.
let socket_addr = {
let host = host.to_string();
let port = uri.port_u16().unwrap_or(443);
smol::unblock(move || (host.as_str(), port).to_socket_addrs())
.await?
.next()
.context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
let stream = async_native_tls::connect(host, stream).await?;
Ok(SmolStream::Tls(stream))
}
scheme => bail!("unsupported scheme: {:?}", scheme),
} }
Some("https") => {
// In case of HTTPS, establish a secure TLS connection first.
let stream = {
let port = req.uri().port_u16().unwrap_or(443);
TcpStream::connect((host, port)).await?
};
let stream = async_native_tls::connect(host, stream).await?;
SmolStream::Tls(stream)
}
scheme => bail!("unsupported scheme: {:?}", scheme),
}
};
// Spawn the HTTP/1 connection.
let (mut sender, conn) = hyper::client::conn::http1::handshake(FuturesIo::new(io)).await?;
ex.spawn(async move {
if let Err(e) = conn.await {
println!("Connection failed: {:?}", e);
}
})
.detach();
// Get the result
let result = sender.send_request(req).await?;
Ok(result)
}
#[apply(main!)]
async fn main(ex: Arc<Executor<'static>>) -> Result<()> {
// Create a request.
let url: hyper::Uri = "https://www.rust-lang.org".try_into()?;
let req = Request::builder()
.header(
hyper::header::HOST,
url.authority().unwrap().clone().as_str(),
)
.uri(url)
.body(Empty::new())?;
// Fetch the response.
let resp = fetch(&ex, req).await?;
println!("{:#?}", resp);
// Read the message body.
let body: Vec<u8> = BodyStream::new(resp.into_body())
.try_fold(Vec::new(), |mut body, chunk| {
if let Some(chunk) = chunk.data_ref() {
body.extend_from_slice(chunk);
}
Ok(body)
}) })
} .await?;
println!("{}", String::from_utf8_lossy(&body));
Ok(())
} }
/// A TCP or TCP+TLS connection. /// A TCP or TCP+TLS connection.
enum SmolStream { enum SmolStream {
/// A plain TCP connection. /// A plain TCP connection.
Plain(Async<TcpStream>), Plain(TcpStream),
/// A TCP connection secured by TLS. /// A TCP connection secured by TLS.
Tls(TlsStream<Async<TcpStream>>), Tls(TlsStream<TcpStream>),
} }
impl hyper::client::connect::Connection for SmolStream { impl AsyncRead for SmolStream {
fn connected(&self) -> hyper::client::connect::Connected {
hyper::client::connect::Connected::new()
}
}
impl tokio::io::AsyncRead for SmolStream {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>, buf: &mut [u8],
) -> Poll<io::Result<()>> { ) -> Poll<io::Result<usize>> {
match &mut *self { match &mut *self {
SmolStream::Plain(s) => { SmolStream::Plain(stream) => Pin::new(stream).poll_read(cx, buf),
Pin::new(s) SmolStream::Tls(stream) => Pin::new(stream).poll_read(cx, buf),
.poll_read(cx, buf.initialize_unfilled())
.map_ok(|size| {
buf.advance(size);
})
}
SmolStream::Tls(s) => {
Pin::new(s)
.poll_read(cx, buf.initialize_unfilled())
.map_ok(|size| {
buf.advance(size);
})
}
} }
} }
} }
impl tokio::io::AsyncWrite for SmolStream { impl AsyncWrite for SmolStream {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
match &mut *self { match &mut *self {
SmolStream::Plain(s) => Pin::new(s).poll_write(cx, buf), SmolStream::Plain(stream) => Pin::new(stream).poll_write(cx, buf),
SmolStream::Tls(s) => Pin::new(s).poll_write(cx, buf), SmolStream::Tls(stream) => Pin::new(stream).poll_write(cx, buf),
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut *self {
SmolStream::Plain(stream) => Pin::new(stream).poll_close(cx),
SmolStream::Tls(stream) => Pin::new(stream).poll_close(cx),
} }
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut *self { match &mut *self {
SmolStream::Plain(s) => Pin::new(s).poll_flush(cx), SmolStream::Plain(stream) => Pin::new(stream).poll_flush(cx),
SmolStream::Tls(s) => Pin::new(s).poll_flush(cx), SmolStream::Tls(stream) => Pin::new(stream).poll_flush(cx),
}
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut *self {
SmolStream::Plain(s) => {
s.get_ref().shutdown(Shutdown::Write)?;
Poll::Ready(Ok(()))
}
SmolStream::Tls(s) => Pin::new(s).poll_close(cx),
} }
} }
} }