ex: Use smol-hyper in hyper-server example

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2024-01-15 10:47:14 -08:00 committed by John Nunley
parent f3054c3537
commit 490c4c6e64
2 changed files with 89 additions and 143 deletions

View File

@ -8,7 +8,6 @@
use std::convert::TryInto; use std::convert::TryInto;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Result}; use anyhow::{bail, Context as _, Result};
@ -23,7 +22,7 @@ use smol_macros::main;
/// Sends a request and fetches the response. /// Sends a request and fetches the response.
async fn fetch( async fn fetch(
ex: &Arc<Executor<'static>>, ex: &Executor<'static>,
req: Request<Empty<&'static [u8]>>, req: Request<Empty<&'static [u8]>>,
) -> Result<Response<Incoming>> { ) -> Result<Response<Incoming>> {
// Connect to the HTTP server. // Connect to the HTTP server.
@ -66,7 +65,7 @@ async fn fetch(
} }
#[apply(main!)] #[apply(main!)]
async fn main(ex: Arc<Executor<'static>>) -> Result<()> { async fn main(ex: &Executor<'static>) -> Result<()> {
// Create a request. // Create a request.
let url: hyper::Uri = "https://www.rust-lang.org".try_into()?; let url: hyper::Uri = "https://www.rust-lang.org".try_into()?;
let req = Request::builder() let req = Request::builder()
@ -78,7 +77,7 @@ async fn main(ex: Arc<Executor<'static>>) -> Result<()> {
.body(Empty::new())?; .body(Empty::new())?;
// Fetch the response. // Fetch the response.
let resp = fetch(&ex, req).await?; let resp = fetch(ex, req).await?;
println!("{:#?}", resp); println!("{:#?}", resp);
// Read the message body. // Read the message body.

View File

@ -13,24 +13,54 @@
//! //!
//! Refer to `README.md` to see how to the TLS certificate was generated. //! Refer to `README.md` to see how to the TLS certificate was generated.
use std::net::{Shutdown, TcpListener, TcpStream}; use std::net::{TcpListener, TcpStream};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::{Error, Result}; use anyhow::Result;
use async_native_tls::{Identity, TlsAcceptor, TlsStream}; use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use hyper::service::{make_service_fn, service_fn}; use http_body_util::Full;
use hyper::{Body, Request, Response, Server}; use hyper::body::Incoming;
use smol::{future, io, prelude::*, Async}; use hyper::service::service_fn;
use hyper::{Request, Response};
use macro_rules_attribute::apply;
use smol::{future, io, prelude::*, Async, Executor};
use smol_hyper::rt::{FuturesIo, SmolTimer};
use smol_macros::main;
/// Serves a request and returns a response. /// Serves a request and returns a response.
async fn serve(req: Request<Body>, host: String) -> Result<Response<Body>> { async fn serve(req: Request<Incoming>) -> Result<Response<Full<&'static [u8]>>> {
println!("Serving {}{}", host, req.uri()); println!("Serving {}", req.uri());
Ok(Response::new(Body::from("Hello from hyper!"))) Ok(Response::new(Full::new("Hello from hyper!".as_bytes())))
}
/// Handle a new client.
async fn handle_client(client: Async<TcpStream>, tls: Option<TlsAcceptor>) -> Result<()> {
// Wrap it in TLS if necessary.
let client = match &tls {
None => SmolStream::Plain(client),
Some(tls) => {
// In case of HTTPS, establish a secure TLS connection.
SmolStream::Tls(tls.accept(client).await?)
}
};
// Build the server.
hyper::server::conn::http1::Builder::new()
.timer(SmolTimer::new())
.serve_connection(FuturesIo::new(client), service_fn(serve))
.await?;
Ok(())
} }
/// Listens for incoming connections and serves them. /// Listens for incoming connections and serves them.
async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Result<()> { async fn listen(
ex: &Arc<Executor<'static>>,
listener: Async<TcpListener>,
tls: Option<TlsAcceptor>,
) -> Result<()> {
// Format the full host address. // Format the full host address.
let host = &match tls { let host = &match tls {
None => format!("http://{}", listener.get_ref().local_addr()?), None => format!("http://{}", listener.get_ref().local_addr()?),
@ -38,86 +68,42 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
}; };
println!("Listening on {}", host); println!("Listening on {}", host);
// Start a hyper server. loop {
Server::builder(SmolListener::new(&listener, tls)) // Wait for a new client.
.executor(SmolExecutor) let (client, _) = listener.accept().await?;
.serve(make_service_fn(move |_| {
let host = host.clone();
async { Ok::<_, Error>(service_fn(move |req| serve(req, host.clone()))) }
}))
.await?;
Ok(()) // Spawn a task to handle this connection.
ex.spawn({
let tls = tls.clone();
async move {
if let Err(e) = handle_client(client, tls).await {
println!("Error while handling client: {}", e);
}
}
})
.detach();
}
} }
fn main() -> Result<()> { #[apply(main!)]
async fn main(ex: &Arc<Executor<'static>>) -> Result<()> {
// Initialize TLS with the local certificate, private key, and password. // Initialize TLS with the local certificate, private key, and password.
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?; let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Start HTTP and HTTPS servers. // Start HTTP and HTTPS servers.
smol::block_on(async { let http = listen(
let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None); ex,
let https = listen( Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?,
Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?, None,
Some(tls), );
); let https = listen(
future::try_zip(http, https).await?; ex,
Ok(()) Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?,
}) Some(tls),
} );
future::try_zip(http, https).await?;
/// Spawns futures. Ok(())
#[derive(Clone)]
struct SmolExecutor;
impl<F: Future + Send + 'static> hyper::rt::Executor<F> for SmolExecutor {
fn execute(&self, fut: F) {
smol::spawn(async { drop(fut.await) }).detach();
}
}
/// Listens for incoming connections.
struct SmolListener<'a> {
tls: Option<TlsAcceptor>,
incoming: Pin<Box<dyn Stream<Item = io::Result<Async<TcpStream>>> + Send + 'a>>,
}
impl<'a> SmolListener<'a> {
fn new(listener: &'a Async<TcpListener>, tls: Option<TlsAcceptor>) -> Self {
Self {
incoming: Box::pin(listener.incoming()),
tls,
}
}
}
impl hyper::server::accept::Accept for SmolListener<'_> {
type Conn = SmolStream;
type Error = Error;
fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let stream = smol::ready!(self.incoming.as_mut().poll_next(cx)).unwrap()?;
let stream = match &self.tls {
None => SmolStream::Plain(stream),
Some(tls) => {
// In case of HTTPS, start establishing a secure TLS connection.
let tls = tls.clone();
SmolStream::Handshake(Box::pin(async move {
tls.accept(stream).await.map_err(|err| {
println!("Failed to establish secure TLS connection: {:#?}", err);
io::Error::new(io::ErrorKind::Other, Box::new(err))
})
}))
}
};
Poll::Ready(Some(Ok(stream)))
}
} }
/// A TCP or TCP+TLS connection. /// A TCP or TCP+TLS connection.
@ -127,83 +113,44 @@ enum SmolStream {
/// A TCP connection secured by TLS. /// A TCP connection secured by TLS.
Tls(TlsStream<Async<TcpStream>>), Tls(TlsStream<Async<TcpStream>>),
/// A TCP connection that is in process of getting secured by TLS.
#[allow(clippy::type_complexity)]
Handshake(Pin<Box<dyn Future<Output = io::Result<TlsStream<Async<TcpStream>>>> + Send>>),
} }
impl hyper::client::connect::Connection for SmolStream { impl AsyncRead for SmolStream {
fn connected(&self) -> hyper::client::connect::Connected {
hyper::client::connect::Connected::new()
}
}
impl tokio::io::AsyncRead for SmolStream {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>, buf: &mut [u8],
) -> Poll<io::Result<()>> { ) -> Poll<io::Result<usize>> {
loop { match &mut *self {
match &mut *self { Self::Plain(s) => Pin::new(s).poll_read(cx, buf),
SmolStream::Plain(s) => { Self::Tls(s) => Pin::new(s).poll_read(cx, buf),
return Pin::new(s)
.poll_read(cx, buf.initialize_unfilled())
.map_ok(|size| {
buf.advance(size);
});
}
SmolStream::Tls(s) => {
return Pin::new(s)
.poll_read(cx, buf.initialize_unfilled())
.map_ok(|size| {
buf.advance(size);
});
}
SmolStream::Handshake(f) => {
let s = smol::ready!(f.as_mut().poll(cx))?;
*self = SmolStream::Tls(s);
}
}
} }
} }
} }
impl tokio::io::AsyncWrite for SmolStream { impl AsyncWrite for SmolStream {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
loop { match &mut *self {
match &mut *self { Self::Plain(s) => Pin::new(s).poll_write(cx, buf),
SmolStream::Plain(s) => return Pin::new(s).poll_write(cx, buf), Self::Tls(s) => Pin::new(s).poll_write(cx, buf),
SmolStream::Tls(s) => return Pin::new(s).poll_write(cx, buf), }
SmolStream::Handshake(f) => { }
let s = smol::ready!(f.as_mut().poll(cx))?;
*self = SmolStream::Tls(s); fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
} match &mut *self {
} Self::Plain(s) => Pin::new(s).poll_close(cx),
Self::Tls(s) => Pin::new(s).poll_close(cx),
} }
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut *self { match &mut *self {
SmolStream::Plain(s) => Pin::new(s).poll_flush(cx), Self::Plain(s) => Pin::new(s).poll_close(cx),
SmolStream::Tls(s) => Pin::new(s).poll_flush(cx), Self::Tls(s) => Pin::new(s).poll_close(cx),
SmolStream::Handshake(_) => Poll::Ready(Ok(())),
}
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut *self {
SmolStream::Plain(s) => {
s.get_ref().shutdown(Shutdown::Write)?;
Poll::Ready(Ok(()))
}
SmolStream::Tls(s) => Pin::new(s).poll_close(cx),
SmolStream::Handshake(_) => Poll::Ready(Ok(())),
} }
} }
} }