smol/examples/hyper-client.rs

181 lines
5.5 KiB
Rust
Raw Normal View History

2020-04-20 15:45:14 +00:00
//! An HTTP+TLS client based on `hyper` and `async-native-tls`.
//!
//! Run with:
//!
//! ```
//! cargo run --example hyper-client
//! ```
2020-03-26 13:57:06 +00:00
2020-07-14 19:19:01 +00:00
use std::net::Shutdown;
2020-07-15 09:37:50 +00:00
use std::net::{TcpStream, ToSocketAddrs};
2020-03-26 13:57:06 +00:00
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Error, Result};
2020-03-29 15:59:28 +00:00
use async_native_tls::TlsStream;
2020-03-26 13:57:06 +00:00
use http::Uri;
2020-03-26 22:36:03 +00:00
use hyper::{Body, Client, Request, Response};
2020-08-26 21:59:49 +00:00
use smol::{io, prelude::*, Async};
2020-03-26 13:57:06 +00:00
2020-04-20 15:45:14 +00:00
/// Sends a request and fetches the response.
2020-03-29 15:59:28 +00:00
async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
2020-03-26 22:36:03 +00:00
Ok(Client::builder()
2020-03-26 13:57:06 +00:00
.executor(SmolExecutor)
2020-03-26 22:36:03 +00:00
.build::<_, Body>(SmolConnector)
.request(req)
.await?)
2020-03-26 13:57:06 +00:00
}
fn main() -> Result<()> {
2020-08-26 21:59:49 +00:00
smol::block_on(async {
2020-04-20 15:45:14 +00:00
// Create a request.
2020-03-26 22:36:03 +00:00
let req = Request::get("https://www.rust-lang.org").body(Body::empty())?;
2020-04-20 15:45:14 +00:00
// Fetch the response.
2020-03-29 15:59:28 +00:00
let resp = fetch(req).await?;
2020-03-26 13:57:06 +00:00
println!("{:#?}", resp);
2020-04-20 15:45:14 +00:00
// Read the message body.
2020-03-26 13:57:06 +00:00
let body = resp
.into_body()
2020-07-19 22:55:35 +00:00
.try_fold(Vec::new(), |mut body, chunk| {
2020-03-26 22:36:03 +00:00
body.extend_from_slice(&chunk);
Ok(body)
2020-03-26 13:57:06 +00:00
})
.await?;
2020-04-01 19:40:27 +00:00
println!("{}", String::from_utf8_lossy(&body));
2020-04-20 15:45:14 +00:00
2020-03-26 13:57:06 +00:00
Ok(())
})
}
2020-04-26 17:37:29 +00:00
/// Spawns futures.
2020-03-26 13:57:06 +00:00
#[derive(Clone)]
struct SmolExecutor;
impl<F: Future + Send + 'static> hyper::rt::Executor<F> for SmolExecutor {
fn execute(&self, fut: F) {
2020-08-26 21:59:49 +00:00
smol::spawn(async { drop(fut.await) }).detach();
2020-03-26 13:57:06 +00:00
}
}
2020-04-26 17:37:29 +00:00
/// Connects to URLs.
2020-03-26 13:57:06 +00:00
#[derive(Clone)]
struct SmolConnector;
impl hyper::service::Service<Uri> for SmolConnector {
type Response = SmolStream;
type Error = Error;
2020-07-19 22:55:35 +00:00
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
2020-03-26 13:57:06 +00:00
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, uri: Uri) -> Self::Future {
Box::pin(async move {
let host = uri.host().context("cannot parse host")?;
match uri.scheme_str() {
Some("http") => {
2020-07-15 09:37:50 +00:00
let socket_addr = {
let host = host.to_string();
let port = uri.port_u16().unwrap_or(80);
2020-08-26 21:59:49 +00:00
smol::unblock(move || (host.as_str(), port).to_socket_addrs())
.await?
2020-07-15 09:37:50 +00:00
.next()
.context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
2020-03-29 15:59:28 +00:00
Ok(SmolStream::Plain(stream))
2020-03-26 13:57:06 +00:00
}
Some("https") => {
2020-04-20 15:45:14 +00:00
// In case of HTTPS, establish a secure TLS connection first.
2020-07-15 09:37:50 +00:00
let socket_addr = {
let host = host.to_string();
let port = uri.port_u16().unwrap_or(443);
2020-08-26 21:59:49 +00:00
smol::unblock(move || (host.as_str(), port).to_socket_addrs())
.await?
2020-07-15 09:37:50 +00:00
.next()
.context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
2020-03-29 15:59:28 +00:00
let stream = async_native_tls::connect(host, stream).await?;
Ok(SmolStream::Tls(stream))
2020-03-26 13:57:06 +00:00
}
scheme => bail!("unsupported scheme: {:?}", scheme),
}
})
}
}
2020-04-26 17:37:29 +00:00
/// A TCP or TCP+TLS connection.
2020-03-26 13:57:06 +00:00
enum SmolStream {
2020-04-26 17:37:29 +00:00
/// A plain TCP connection.
2020-07-15 09:37:50 +00:00
Plain(Async<TcpStream>),
2020-04-26 17:37:29 +00:00
/// A TCP connection secured by TLS.
2020-07-15 09:37:50 +00:00
Tls(TlsStream<Async<TcpStream>>),
2020-03-26 13:57:06 +00:00
}
impl hyper::client::connect::Connection for SmolStream {
fn connected(&self) -> hyper::client::connect::Connected {
hyper::client::connect::Connected::new()
}
}
impl tokio::io::AsyncRead for SmolStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
2020-03-26 13:57:06 +00:00
match &mut *self {
SmolStream::Plain(s) => {
Pin::new(s)
.poll_read(cx, buf.initialize_unfilled())
.map_ok(|size| {
buf.advance(size);
})
}
SmolStream::Tls(s) => {
Pin::new(s)
.poll_read(cx, buf.initialize_unfilled())
.map_ok(|size| {
buf.advance(size);
})
}
2020-03-26 13:57:06 +00:00
}
}
}
impl tokio::io::AsyncWrite for SmolStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match &mut *self {
2020-03-29 15:59:28 +00:00
SmolStream::Plain(s) => Pin::new(s).poll_write(cx, buf),
SmolStream::Tls(s) => Pin::new(s).poll_write(cx, buf),
2020-03-26 13:57:06 +00:00
}
}
2020-03-26 22:36:03 +00:00
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
2020-03-26 13:57:06 +00:00
match &mut *self {
2020-03-29 15:59:28 +00:00
SmolStream::Plain(s) => Pin::new(s).poll_flush(cx),
SmolStream::Tls(s) => Pin::new(s).poll_flush(cx),
2020-03-26 13:57:06 +00:00
}
}
2020-03-31 16:04:16 +00:00
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
2020-03-26 13:57:06 +00:00
match &mut *self {
2020-03-31 16:04:16 +00:00
SmolStream::Plain(s) => {
2020-07-15 09:37:50 +00:00
s.get_ref().shutdown(Shutdown::Write)?;
2020-03-31 16:04:16 +00:00
Poll::Ready(Ok(()))
}
SmolStream::Tls(s) => Pin::new(s).poll_close(cx),
2020-03-26 13:57:06 +00:00
}
}
}