feat: switch to http-types and add http-service-h1
Removes http-service-lambda, as the lambda-runtime is still not updated to http@0.2
This commit is contained in:
parent
9e86b252ed
commit
f716daadf6
13
Cargo.toml
13
Cargo.toml
|
@ -16,14 +16,11 @@ name = "http-service"
|
|||
version = "0.4.0"
|
||||
|
||||
[workspace]
|
||||
members = ["http-service-hyper", "http-service-lambda", "http-service-mock"]
|
||||
members = ["http-service-hyper", "http-service-mock", "http-service-h1"]
|
||||
|
||||
[dependencies]
|
||||
bytes = "0.4.12"
|
||||
futures = "0.3.1"
|
||||
http = "0.1.17"
|
||||
async-std = { version = "1.0.1", default-features = false, features = ["std"] }
|
||||
pin-project-lite = "0.1.0"
|
||||
http-types = "1.0.1"
|
||||
async-std = { version = "1.5.0", default-features = false, features = ["std"] }
|
||||
|
||||
# [dev-dependencies]
|
||||
# http-service-hyper = { version = "0.4.0", path = "./http-service-hyper" }
|
||||
[dev-dependencies]
|
||||
http-service-hyper = { version = "0.4.0", path = "./http-service-hyper" }
|
||||
|
|
|
@ -28,7 +28,7 @@ impl HttpService for Server {
|
|||
|
||||
fn respond(&self, _conn: &mut (), _req: http_service::Request) -> Self::ResponseFuture {
|
||||
let message = self.message.clone();
|
||||
async move { Ok(Response::new(http_service::Body::from(message))) }.boxed()
|
||||
async move { Ok(Response::from(message)) }.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
[package]
|
||||
authors = [
|
||||
"dignifiedquire <me@dignifiedquire.com>"
|
||||
]
|
||||
description = "HttpService server that uses http-types and async-h1 as backend"
|
||||
edition = "2018"
|
||||
license = "MIT OR Apache-2.0"
|
||||
name = "http-service-h1"
|
||||
repository = "https://github.com/http-rs/http-service"
|
||||
documentation = "https://docs.rs/http-service-h1"
|
||||
version = "0.1.0"
|
||||
|
||||
[dependencies]
|
||||
http-service = { version = "0.4.0", path = ".." }
|
||||
http-types = "1.0.1"
|
||||
async-h1 = "1.0.0"
|
||||
async-std = { version = "1.5.0", default-features = false, features = ["std"] }
|
||||
|
||||
[features]
|
||||
default = ["runtime"]
|
||||
runtime = ["async-std/default"]
|
|
@ -0,0 +1,153 @@
|
|||
//! `HttpService` server that uses async-h1 as backend.
|
||||
|
||||
#![forbid(future_incompatible, rust_2018_idioms)]
|
||||
#![deny(missing_debug_implementations, nonstandard_style)]
|
||||
#![warn(missing_docs, missing_doc_code_examples)]
|
||||
#![cfg_attr(test, deny(warnings))]
|
||||
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use http_service::{Error, HttpService};
|
||||
|
||||
use async_std::io;
|
||||
use async_std::net::{SocketAddr, TcpStream};
|
||||
use async_std::prelude::*;
|
||||
use async_std::stream::Stream;
|
||||
use async_std::sync::Arc;
|
||||
|
||||
/// A listening HTTP server that accepts connections in HTTP1.
|
||||
#[derive(Debug)]
|
||||
pub struct Server<I, S: HttpService> {
|
||||
incoming: I,
|
||||
service: Arc<S>,
|
||||
addr: String,
|
||||
}
|
||||
|
||||
impl<I: Stream<Item = io::Result<TcpStream>>, S: HttpService> Server<I, S>
|
||||
where
|
||||
<<S as HttpService>::ResponseFuture as Future>::Output: Send,
|
||||
<S as HttpService>::Connection: Sync,
|
||||
I: Unpin + Send + Sync,
|
||||
{
|
||||
/// Consume this [`Builder`], creating a [`Server`].
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use http_service::{Response, Body};
|
||||
/// use http_service_h1::Server;
|
||||
/// use async_std::net::TcpListener;
|
||||
///
|
||||
/// // And an HttpService to handle each connection...
|
||||
/// let service = |req| async {
|
||||
/// Ok::<Response, async_std::io::Error>(Response::from("Hello World"))
|
||||
/// };
|
||||
///
|
||||
/// async_std::task::block_on(async move {
|
||||
/// // Then bind, configure the spawner to our pool, and serve...
|
||||
/// let mut listener = TcpListener::bind("127.0.0.1:3000").await?;
|
||||
/// let addr = format!("http://{}", listener.local_addr()?);
|
||||
/// let mut server = Server::new(addr, listener.incoming(), service);
|
||||
/// server.run().await?;
|
||||
/// Ok::<(), Box<dyn std::error::Error>>(())
|
||||
/// })?;
|
||||
/// # Ok::<(), Box<dyn std::error::Error>>(())
|
||||
pub fn new(addr: String, incoming: I, service: S) -> Self {
|
||||
Server {
|
||||
service: Arc::new(service),
|
||||
incoming,
|
||||
addr,
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the server forever-ish.
|
||||
pub async fn run(&mut self) -> io::Result<()> {
|
||||
while let Some(stream) = self.incoming.next().await {
|
||||
let stream = stream?;
|
||||
async_std::task::spawn(accept(self.addr.clone(), self.service.clone(), stream));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Accept a new connection.
|
||||
async fn accept<S>(addr: String, service: Arc<S>, stream: TcpStream) -> Result<(), Error>
|
||||
where
|
||||
S: HttpService,
|
||||
<<S as HttpService>::ResponseFuture as Future>::Output: Send,
|
||||
<S as HttpService>::Connection: Sync,
|
||||
{
|
||||
// TODO: Delete this line when we implement `Clone` for `TcpStream`.
|
||||
let stream = WrapStream(Arc::new(stream));
|
||||
|
||||
let conn = service
|
||||
.clone()
|
||||
.connect()
|
||||
.await
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::Other))?;
|
||||
|
||||
async_h1::accept(&addr, stream.clone(), |req| async {
|
||||
let conn = conn.clone();
|
||||
let service = service.clone();
|
||||
async move {
|
||||
let resp = service
|
||||
.respond(conn, req)
|
||||
.await
|
||||
.map_err(|_| io::Error::from(io::ErrorKind::Other))?;
|
||||
|
||||
Ok(resp)
|
||||
}
|
||||
.await
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Serve the given `HttpService` at the given address, using `async-h1` as backend, and return a
|
||||
/// `Future` that can be `await`ed on.
|
||||
pub async fn serve<S: HttpService>(service: S, addr: SocketAddr) -> io::Result<()>
|
||||
where
|
||||
<<S as HttpService>::ResponseFuture as Future>::Output: Send,
|
||||
<S as HttpService>::Connection: Sync,
|
||||
{
|
||||
let listener = async_std::net::TcpListener::bind(addr).await?;
|
||||
let addr = format!("http://{}", listener.local_addr()?); // TODO: https
|
||||
let mut server = Server::<_, S>::new(addr, listener.incoming(), service);
|
||||
|
||||
server.run().await
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct WrapStream(Arc<TcpStream>);
|
||||
|
||||
impl io::Read for WrapStream {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut &*self.0).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Write for WrapStream {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut &*self.0).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut &*self.0).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut &*self.0).poll_close(cx)
|
||||
}
|
||||
}
|
|
@ -8,14 +8,15 @@ description = "HttpService server that uses Hyper as backend"
|
|||
edition = "2018"
|
||||
license = "MIT OR Apache-2.0"
|
||||
name = "http-service-hyper"
|
||||
repository = "https://github.com/rustasync/http-service"
|
||||
repository = "https://github.com/http-rs/http-service"
|
||||
documentation = "https://docs.rs/http-service-hyper"
|
||||
version = "0.4.1"
|
||||
|
||||
[dependencies]
|
||||
http = "0.1"
|
||||
http = "0.2"
|
||||
http-service = { version = "0.4.0", path = ".." }
|
||||
hyper = { version = "0.12.27", default-features = false }
|
||||
hyper = { version = "0.13.1", default-features = false, features = ["stream"] }
|
||||
http-types = { version = "1.0.1", features = ["hyperium_http"] }
|
||||
|
||||
[dependencies.futures]
|
||||
features = ["compat", "io-compat"]
|
||||
|
@ -26,4 +27,4 @@ default = ["runtime"]
|
|||
runtime = ["hyper/runtime"]
|
||||
|
||||
[dev-dependencies]
|
||||
romio = "0.3.0-alpha.8"
|
||||
async-std = "1.5.0"
|
||||
|
|
|
@ -5,27 +5,18 @@
|
|||
#![warn(missing_docs, missing_doc_code_examples)]
|
||||
#![cfg_attr(test, deny(warnings))]
|
||||
|
||||
use futures::compat::Future01CompatExt;
|
||||
#[cfg(feature = "runtime")]
|
||||
use futures::compat::{Compat as Compat03As01, Compat01As03};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::prelude::*;
|
||||
use futures::stream;
|
||||
use futures::task::Spawn;
|
||||
use http_service::{Body, HttpService};
|
||||
use hyper::server::{Builder as HyperBuilder, Server as HyperServer};
|
||||
|
||||
use std::io;
|
||||
use std::convert::TryInto;
|
||||
#[cfg(feature = "runtime")]
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{self, Context, Poll};
|
||||
|
||||
// Wrapper type to allow us to provide a blanket `MakeService` impl
|
||||
struct WrapHttpService<H> {
|
||||
service: Arc<H>,
|
||||
}
|
||||
use std::task::{self, Poll};
|
||||
|
||||
// Wrapper type to allow us to provide a blanket `Service` impl
|
||||
struct WrapConnection<H: HttpService> {
|
||||
|
@ -33,64 +24,39 @@ struct WrapConnection<H: HttpService> {
|
|||
connection: H::Connection,
|
||||
}
|
||||
|
||||
impl<H, Ctx> hyper::service::MakeService<Ctx> for WrapHttpService<H>
|
||||
impl<H> hyper::service::Service<hyper::Body> for WrapConnection<H>
|
||||
where
|
||||
H: HttpService,
|
||||
{
|
||||
type ReqBody = hyper::Body;
|
||||
type ResBody = hyper::Body;
|
||||
type Response = http::Response<hyper::Body>;
|
||||
type Error = std::io::Error;
|
||||
type Service = WrapConnection<H>;
|
||||
type Future = Compat03As01<BoxFuture<'static, Result<Self::Service, Self::Error>>>;
|
||||
type MakeError = std::io::Error;
|
||||
|
||||
fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
|
||||
let service = self.service.clone();
|
||||
let error = std::io::Error::from(std::io::ErrorKind::Other);
|
||||
async move {
|
||||
let connection = service.connect().into_future().await.map_err(|_| error)?;
|
||||
Ok(WrapConnection {
|
||||
service,
|
||||
connection,
|
||||
})
|
||||
}
|
||||
.boxed()
|
||||
.compat()
|
||||
}
|
||||
}
|
||||
|
||||
impl<H> hyper::service::Service for WrapConnection<H>
|
||||
where
|
||||
H: HttpService,
|
||||
{
|
||||
type ReqBody = hyper::Body;
|
||||
type ResBody = hyper::Body;
|
||||
type Error = std::io::Error;
|
||||
type Future =
|
||||
Compat03As01<BoxFuture<'static, Result<http::Response<hyper::Body>, Self::Error>>>;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
|
||||
|
||||
fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
|
||||
// Convert Request
|
||||
let error = std::io::Error::from(std::io::ErrorKind::Other);
|
||||
let req = req.map(|body| {
|
||||
let body_stream = Compat01As03::new(body)
|
||||
.map(|chunk| chunk.map(|chunk| chunk.to_vec()))
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
|
||||
let req_hyper: http::Request<Body> = req.map(|body| {
|
||||
use futures::stream::TryStreamExt;
|
||||
let body_stream = body.map(|chunk| chunk.map(|c| c.to_vec()).map_err(|_| error));
|
||||
let body_reader = body_stream.into_async_read();
|
||||
Body::from_reader(body_reader)
|
||||
Body::from_reader(body_reader, None)
|
||||
});
|
||||
|
||||
let fut = self.service.respond(&mut self.connection, req);
|
||||
let req: http_types::Request = req_hyper.try_into().unwrap();
|
||||
let fut = self.service.respond(self.connection, req);
|
||||
|
||||
// Convert Request
|
||||
async move {
|
||||
let res: http::Response<_> = fut.into_future().await.map_err(|_| error)?;
|
||||
let (parts, body) = res.into_parts();
|
||||
let body = hyper::Body::wrap_stream(Compat03As01::new(ChunkStream { body }));
|
||||
let fut = async {
|
||||
let res: http_types::Response = fut.into_future().await.map_err(|_| error)?;
|
||||
let res_hyper = hyper::Response::<Body>::from(res);
|
||||
|
||||
let (parts, body) = res_hyper.into_parts();
|
||||
let body = hyper::Body::wrap_stream(body);
|
||||
|
||||
Ok(hyper::Response::from_parts(parts, body))
|
||||
}
|
||||
.boxed()
|
||||
.compat()
|
||||
};
|
||||
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,13 +67,7 @@ where
|
|||
/// run by an executor.
|
||||
#[allow(clippy::type_complexity)] // single-use type with many compat layers
|
||||
pub struct Server<I: TryStream, S, Sp> {
|
||||
inner: Compat01As03<
|
||||
HyperServer<
|
||||
Compat03As01<stream::MapOk<I, fn(I::Ok) -> Compat03As01<I::Ok>>>,
|
||||
WrapHttpService<S>,
|
||||
Compat03As01<Sp>,
|
||||
>,
|
||||
>,
|
||||
inner: HyperServer<I, S, Sp>,
|
||||
}
|
||||
|
||||
impl<I: TryStream, S, Sp> std::fmt::Debug for Server<I, S, Sp> {
|
||||
|
@ -119,10 +79,7 @@ impl<I: TryStream, S, Sp> std::fmt::Debug for Server<I, S, Sp> {
|
|||
/// A builder for a [`Server`].
|
||||
#[allow(clippy::type_complexity)] // single-use type with many compat layers
|
||||
pub struct Builder<I: TryStream, Sp> {
|
||||
inner: HyperBuilder<
|
||||
Compat03As01<stream::MapOk<I, fn(I::Ok) -> Compat03As01<I::Ok>>>,
|
||||
Compat03As01<Sp>,
|
||||
>,
|
||||
inner: HyperBuilder<I, Sp>,
|
||||
}
|
||||
|
||||
impl<I: TryStream, Sp> std::fmt::Debug for Builder<I, Sp> {
|
||||
|
@ -135,8 +92,7 @@ impl<I: TryStream> Server<I, (), ()> {
|
|||
/// Starts a [`Builder`] with the provided incoming stream.
|
||||
pub fn builder(incoming: I) -> Builder<I, ()> {
|
||||
Builder {
|
||||
inner: HyperServer::builder(Compat03As01::new(incoming.map_ok(Compat03As01::new as _)))
|
||||
.executor(Compat03As01::new(())),
|
||||
inner: HyperServer::builder(incoming),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -145,7 +101,7 @@ impl<I: TryStream, Sp> Builder<I, Sp> {
|
|||
/// Sets the [`Spawn`] to deal with starting connection tasks.
|
||||
pub fn with_spawner<Sp2>(self, new_spawner: Sp2) -> Builder<I, Sp2> {
|
||||
Builder {
|
||||
inner: self.inner.executor(Compat03As01::new(new_spawner)),
|
||||
inner: self.inner.executor(new_spawner),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -156,7 +112,7 @@ impl<I: TryStream, Sp> Builder<I, Sp> {
|
|||
/// ```no_run
|
||||
/// use http_service::{Response, Body};
|
||||
/// use http_service_hyper::Server;
|
||||
/// use romio::TcpListener;
|
||||
/// use async_std::net::TcpListener;
|
||||
///
|
||||
/// // Construct an executor to run our tasks on
|
||||
/// let mut pool = futures::executor::ThreadPool::new()?;
|
||||
|
@ -185,9 +141,7 @@ impl<I: TryStream, Sp> Builder<I, Sp> {
|
|||
for<'a> &'a Sp: Spawn,
|
||||
{
|
||||
Server {
|
||||
inner: Compat01As03::new(self.inner.serve(WrapHttpService {
|
||||
service: Arc::new(service),
|
||||
})),
|
||||
inner: self.inner.serve(service),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -215,10 +169,7 @@ pub fn serve<S: HttpService>(
|
|||
s: S,
|
||||
addr: SocketAddr,
|
||||
) -> impl Future<Output = Result<(), hyper::Error>> {
|
||||
let service = WrapHttpService {
|
||||
service: Arc::new(s),
|
||||
};
|
||||
hyper::Server::bind(&addr).serve(service).compat()
|
||||
hyper::Server::bind(&addr).serve(s).compat()
|
||||
}
|
||||
|
||||
/// Run the given `HttpService` at the given address on the default runtime, using `hyper` as
|
||||
|
@ -226,28 +177,5 @@ pub fn serve<S: HttpService>(
|
|||
#[cfg(feature = "runtime")]
|
||||
pub fn run<S: HttpService>(s: S, addr: SocketAddr) {
|
||||
let server = serve(s, addr).map(|_| Result::<_, ()>::Ok(())).compat();
|
||||
hyper::rt::run(server);
|
||||
}
|
||||
|
||||
/// A type that wraps an `AsyncRead` into a `Stream` of `hyper::Chunk`. Used for writing data to a
|
||||
/// Hyper response.
|
||||
struct ChunkStream<R: AsyncRead> {
|
||||
body: R,
|
||||
}
|
||||
|
||||
impl<R: AsyncRead + Unpin> futures::Stream for ChunkStream<R> {
|
||||
type Item = Result<hyper::Chunk, Box<dyn std::error::Error + Send + Sync + 'static>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
// This is not at all efficient, but that's okay for now.
|
||||
let mut buf = vec![0; 1024];
|
||||
let read = futures::ready!(Pin::new(&mut self.body).poll_read(cx, &mut buf))?;
|
||||
if read == 0 {
|
||||
return Poll::Ready(None);
|
||||
} else {
|
||||
buf.truncate(read);
|
||||
let chunk = hyper::Chunk::from(buf);
|
||||
Poll::Ready(Some(Ok(chunk)))
|
||||
}
|
||||
}
|
||||
hyper::rt::Executor::execute(server);
|
||||
}
|
||||
|
|
|
@ -1,25 +0,0 @@
|
|||
[package]
|
||||
authors = [
|
||||
"Aaron Turon <aturon@mozilla.com>",
|
||||
"Yoshua Wuyts <yoshuawuyts@gmail.com>",
|
||||
"Kan-Ru Chen <kanru@kanru.info>",
|
||||
]
|
||||
description = "HttpService server that uses AWS Lambda Rust Runtime as backend"
|
||||
documentation = "https://docs.rs/http-service-lambda"
|
||||
edition = "2018"
|
||||
license = "MIT OR Apache-2.0"
|
||||
name = "http-service-lambda"
|
||||
repository = "https://github.com/rustasync/http-service"
|
||||
version = "0.4.0"
|
||||
|
||||
[dependencies]
|
||||
http-service = { version = "0.4.0", path = ".." }
|
||||
lambda_http = "0.1.1"
|
||||
lambda_runtime = "0.2.1"
|
||||
futures = { version = "0.3.1", features = ["compat"] }
|
||||
tokio = "0.1.22"
|
||||
|
||||
[dev-dependencies]
|
||||
log = "0.4.6"
|
||||
simple_logger = { version = "1.3.0", default-features = false }
|
||||
tide = { version = "0.4.0", features = [], default-features = false }
|
|
@ -1,23 +0,0 @@
|
|||
## Quick Start
|
||||
|
||||
See the document for
|
||||
[aws-lambda-rust-runtime](https://github.com/awslabs/aws-lambda-rust-runtime)
|
||||
for how to build and deploy a lambda function.
|
||||
|
||||
Alternatively, build the example and deploy to lambda manually:
|
||||
|
||||
1. Build with musl target
|
||||
|
||||
```sh
|
||||
rustup target add x86_64-unknown-linux-musl
|
||||
cargo build --release --example hello_world --target x86_64-unknown-linux-musl
|
||||
```
|
||||
|
||||
2. Package
|
||||
|
||||
```sh
|
||||
cp ../../target/x86_64-unknown-linux-musl/release/examples/hello_world bootstrap
|
||||
zip lambda.zip bootstrap
|
||||
```
|
||||
|
||||
3. Use [AWS CLI](https://aws.amazon.com/cli/) or AWS console to create new lambda function.
|
|
@ -1,11 +0,0 @@
|
|||
use http_service_lambda;
|
||||
use simple_logger;
|
||||
|
||||
fn main() {
|
||||
simple_logger::init_with_level(log::Level::Info).unwrap();
|
||||
|
||||
let mut app = tide::new();
|
||||
app.at("/").get(|_| async move { "Hello, world!" });
|
||||
|
||||
http_service_lambda::run(app.into_http_service());
|
||||
}
|
|
@ -1,207 +0,0 @@
|
|||
//! `HttpService` server that uses AWS Lambda Rust Runtime as backend.
|
||||
//!
|
||||
//! This crate builds on the standard http interface provided by the
|
||||
//! [lambda_http](https://docs.rs/lambda_http) crate and provides a http server
|
||||
//! that runs on the lambda runtime.
|
||||
//!
|
||||
//! Compatible services like [tide](https://github.com/rustasync/tide) apps can
|
||||
//! run on lambda and processing events from API Gateway or ALB without much
|
||||
//! change.
|
||||
//!
|
||||
//! # Examples
|
||||
//!
|
||||
//! **Hello World**
|
||||
//!
|
||||
//! ```rust,ignore
|
||||
//! fn main() {
|
||||
//! let mut app = tide::new();
|
||||
//! app.at("/").get(async move |_| "Hello, world!");
|
||||
//! http_service_lambda::run(app.into_http_service());
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
#![forbid(future_incompatible, rust_2018_idioms)]
|
||||
#![deny(missing_debug_implementations, nonstandard_style)]
|
||||
#![warn(missing_docs, missing_doc_code_examples)]
|
||||
#![cfg_attr(test, deny(warnings))]
|
||||
|
||||
use futures::{
|
||||
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
|
||||
AsyncReadExt, Future, FutureExt, StreamExt, TryFutureExt,
|
||||
};
|
||||
use http_service::{Body as HttpBody, HttpService, Request as HttpRequest};
|
||||
use lambda_http::{lambda, Body as LambdaBody, Handler, Request as LambdaHttpRequest};
|
||||
use lambda_runtime::{error::HandlerError, Context};
|
||||
use std::{
|
||||
sync::mpsc::{channel as sync_channel, Sender as SyncSender},
|
||||
thread,
|
||||
};
|
||||
use tokio::runtime::Runtime as TokioRuntime;
|
||||
|
||||
type LambdaResponse = lambda_http::Response<LambdaBody>;
|
||||
|
||||
trait ResultExt<Ok, Error> {
|
||||
fn handler_error(self, description: &str) -> Result<Ok, HandlerError>;
|
||||
}
|
||||
|
||||
impl<Ok, Error> ResultExt<Ok, Error> for Result<Ok, Error> {
|
||||
fn handler_error(self, description: &str) -> Result<Ok, HandlerError> {
|
||||
self.map_err(|_| HandlerError::from(description))
|
||||
}
|
||||
}
|
||||
|
||||
type RequestSender = UnboundedSender<(LambdaHttpRequest, ResponseSender)>;
|
||||
type RequestReceiver = UnboundedReceiver<(LambdaHttpRequest, ResponseSender)>;
|
||||
type ResponseSender = SyncSender<Result<LambdaResponse, HandlerError>>;
|
||||
|
||||
struct Server<S> {
|
||||
service: S,
|
||||
requests: RequestReceiver,
|
||||
}
|
||||
|
||||
impl<S: HttpService> Server<S> {
|
||||
fn new(service: S, requests: RequestReceiver) -> Server<S> {
|
||||
Server { service, requests }
|
||||
}
|
||||
|
||||
async fn run(mut self) -> Result<(), ()> {
|
||||
while let Some((req, reply)) = self.requests.next().await {
|
||||
let response = self.serve(req).await;
|
||||
reply.send(response).unwrap();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn serve(&self, req: LambdaHttpRequest) -> Result<LambdaResponse, HandlerError> {
|
||||
// Create new connection
|
||||
let mut connection = self
|
||||
.service
|
||||
.connect()
|
||||
.into_future()
|
||||
.await
|
||||
.handler_error("connect")?;
|
||||
|
||||
// Convert Lambda request to HTTP request
|
||||
let req: HttpRequest = req.map(|b| match b {
|
||||
LambdaBody::Binary(v) => HttpBody::from(v),
|
||||
LambdaBody::Text(s) => HttpBody::from(s.into_bytes()),
|
||||
LambdaBody::Empty => HttpBody::empty(),
|
||||
});
|
||||
|
||||
// Handle request
|
||||
let (parts, mut body) = self
|
||||
.service
|
||||
.respond(&mut connection, req)
|
||||
.into_future()
|
||||
.await
|
||||
.handler_error("respond")?
|
||||
.into_parts();
|
||||
|
||||
// Convert response back to Lambda response
|
||||
let mut buf = Vec::new();
|
||||
body.read_to_end(&mut buf).await.handler_error("body")?;
|
||||
let lambda_body = if buf.is_empty() {
|
||||
LambdaBody::Empty
|
||||
} else {
|
||||
match String::from_utf8(buf) {
|
||||
Ok(s) => LambdaBody::Text(s),
|
||||
Err(b) => LambdaBody::Binary(b.into_bytes()),
|
||||
}
|
||||
};
|
||||
Ok(LambdaResponse::from_parts(parts, lambda_body))
|
||||
}
|
||||
}
|
||||
|
||||
struct ProxyHandler(RequestSender);
|
||||
|
||||
impl Handler<LambdaResponse> for ProxyHandler {
|
||||
fn run(
|
||||
&mut self,
|
||||
event: LambdaHttpRequest,
|
||||
_ctx: Context,
|
||||
) -> Result<LambdaResponse, HandlerError> {
|
||||
let (reply, response_chan) = sync_channel();
|
||||
self.0
|
||||
.unbounded_send((event, reply))
|
||||
.handler_error("forward event")?;
|
||||
response_chan.recv().handler_error("receive response")?
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare_proxy<S: HttpService>(
|
||||
service: S,
|
||||
) -> (ProxyHandler, impl Future<Output = Result<(), ()>>) {
|
||||
let (request_sender, requests) = unbounded();
|
||||
let server = Server::new(service, requests);
|
||||
(ProxyHandler(request_sender), server.run())
|
||||
}
|
||||
|
||||
/// Serve the given `HttpService` using `lambda_http` as backend and
|
||||
/// return a `Future` that can be `await`ed on.
|
||||
pub fn serve<S: HttpService>(s: S) -> impl Future<Output = Result<(), ()>> {
|
||||
let (handler, server_task) = prepare_proxy(s);
|
||||
thread::spawn(|| lambda!(handler));
|
||||
server_task
|
||||
}
|
||||
|
||||
/// Run the given `HttpService` on the default runtime, using
|
||||
/// `lambda_http` as backend.
|
||||
pub fn run<S: HttpService>(s: S) {
|
||||
let (handler, server) = prepare_proxy(s);
|
||||
let mut runtime = TokioRuntime::new().expect("Can not start tokio runtime");
|
||||
runtime.spawn(server.boxed().compat());
|
||||
lambda!(handler, runtime);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::future;
|
||||
use lambda_http::Handler;
|
||||
|
||||
struct DummyService;
|
||||
|
||||
impl HttpService for DummyService {
|
||||
type Connection = ();
|
||||
type ConnectionFuture = future::Ready<Result<(), ()>>;
|
||||
type ResponseFuture = future::BoxFuture<'static, Result<http_service::Response, ()>>;
|
||||
fn connect(&self) -> Self::ConnectionFuture {
|
||||
future::ok(())
|
||||
}
|
||||
fn respond(&self, _conn: &mut (), _req: http_service::Request) -> Self::ResponseFuture {
|
||||
Box::pin(async move { Ok(http_service::Response::new(http_service::Body::empty())) })
|
||||
}
|
||||
}
|
||||
|
||||
fn run_once(request: LambdaHttpRequest) -> Result<LambdaResponse, HandlerError> {
|
||||
let (mut handler, server) = prepare_proxy(DummyService);
|
||||
std::thread::spawn(|| futures::executor::block_on(server));
|
||||
handler.run(request, Context::default())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_apigw_request() {
|
||||
// from the docs
|
||||
// https://docs.aws.amazon.com/lambda/latest/dg/eventsources.html#eventsources-api-gateway-request
|
||||
let input = include_str!("../tests/data/apigw_proxy_request.json");
|
||||
let request = lambda_http::request::from_str(input).unwrap();
|
||||
let result = run_once(request);
|
||||
assert!(
|
||||
result.is_ok(),
|
||||
format!("event was not handled as expected {:?}", result)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_alb_request() {
|
||||
// from the docs
|
||||
// https://docs.aws.amazon.com/elasticloadbalancing/latest/application/lambda-functions.html#multi-value-headers
|
||||
let input = include_str!("../tests/data/alb_request.json");
|
||||
let request = lambda_http::request::from_str(input).unwrap();
|
||||
let result = run_once(request);
|
||||
assert!(
|
||||
result.is_ok(),
|
||||
format!("event was not handled as expected {:?}", result)
|
||||
);
|
||||
}
|
||||
}
|
|
@ -1,24 +0,0 @@
|
|||
{
|
||||
"requestContext": {
|
||||
"elb": {
|
||||
"targetGroupArn": "arn:aws:elasticloadbalancing:region:123456789012:targetgroup/my-target-group/6d0ecf831eec9f09"
|
||||
}
|
||||
},
|
||||
"httpMethod": "GET",
|
||||
"path": "/",
|
||||
"queryStringParameters": { "myKey": "val2"},
|
||||
"headers": {
|
||||
"accept": "text/html,application/xhtml+xml",
|
||||
"accept-language": "en-US,en;q=0.8",
|
||||
"content-type": "text/plain",
|
||||
"cookie": "cookies",
|
||||
"host": "lambda-846800462-us-east-2.elb.amazonaws.com",
|
||||
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6)",
|
||||
"x-amzn-trace-id": "Root=1-5bdb40ca-556d8b0c50dc66f0511bf520",
|
||||
"x-forwarded-for": "72.21.198.66",
|
||||
"x-forwarded-port": "443",
|
||||
"x-forwarded-proto": "https"
|
||||
},
|
||||
"isBase64Encoded": false,
|
||||
"body": "request_body"
|
||||
}
|
|
@ -1,55 +0,0 @@
|
|||
{
|
||||
"path": "/test/hello",
|
||||
"headers": {
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
|
||||
"Accept-Encoding": "gzip, deflate, lzma, sdch, br",
|
||||
"Accept-Language": "en-US,en;q=0.8",
|
||||
"CloudFront-Forwarded-Proto": "https",
|
||||
"CloudFront-Is-Desktop-Viewer": "true",
|
||||
"CloudFront-Is-Mobile-Viewer": "false",
|
||||
"CloudFront-Is-SmartTV-Viewer": "false",
|
||||
"CloudFront-Is-Tablet-Viewer": "false",
|
||||
"CloudFront-Viewer-Country": "US",
|
||||
"Host": "wt6mne2s9k.execute-api.us-west-2.amazonaws.com",
|
||||
"Upgrade-Insecure-Requests": "1",
|
||||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.82 Safari/537.36 OPR/39.0.2256.48",
|
||||
"Via": "1.1 fb7cca60f0ecd82ce07790c9c5eef16c.cloudfront.net (CloudFront)",
|
||||
"X-Amz-Cf-Id": "nBsWBOrSHMgnaROZJK1wGCZ9PcRcSpq_oSXZNQwQ10OTZL4cimZo3g==",
|
||||
"X-Forwarded-For": "192.168.100.1, 192.168.1.1",
|
||||
"X-Forwarded-Port": "443",
|
||||
"X-Forwarded-Proto": "https"
|
||||
},
|
||||
"pathParameters": {
|
||||
"proxy": "hello"
|
||||
},
|
||||
"requestContext": {
|
||||
"accountId": "123456789012",
|
||||
"resourceId": "us4z18",
|
||||
"stage": "test",
|
||||
"requestId": "41b45ea3-70b5-11e6-b7bd-69b5aaebc7d9",
|
||||
"identity": {
|
||||
"cognitoIdentityPoolId": "",
|
||||
"accountId": "",
|
||||
"cognitoIdentityId": "",
|
||||
"caller": "",
|
||||
"apiKey": "",
|
||||
"sourceIp": "192.168.100.1",
|
||||
"cognitoAuthenticationType": "",
|
||||
"cognitoAuthenticationProvider": "",
|
||||
"userArn": "",
|
||||
"userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.82 Safari/537.36 OPR/39.0.2256.48",
|
||||
"user": ""
|
||||
},
|
||||
"resourcePath": "/{proxy+}",
|
||||
"httpMethod": "GET",
|
||||
"apiId": "wt6mne2s9k"
|
||||
},
|
||||
"resource": "/{proxy+}",
|
||||
"httpMethod": "GET",
|
||||
"queryStringParameters": {
|
||||
"name": "me"
|
||||
},
|
||||
"stageVariables": {
|
||||
"stageVarName": "stageVarValue"
|
||||
}
|
||||
}
|
|
@ -7,7 +7,7 @@ description = "Creates a HttpService server mock to test requests/responses agai
|
|||
edition = "2018"
|
||||
license = "MIT OR Apache-2.0"
|
||||
name = "http-service-mock"
|
||||
repository = "https://github.com/rustasync/http-service"
|
||||
repository = "https://github.com/http-rs/http-service"
|
||||
documentation = "https://docs.rs/http-service-mock"
|
||||
version = "0.4.0"
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ impl<T: HttpService> TestBackend<T> {
|
|||
) -> Result<Response, <T::ResponseFuture as TryFuture>::Error> {
|
||||
block_on(
|
||||
self.service
|
||||
.respond(&mut self.connection, req)
|
||||
.respond(self.connection.clone(), req)
|
||||
.into_future(),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
edition = "2018"
|
144
src/lib.rs
144
src/lib.rs
|
@ -6,85 +6,21 @@
|
|||
#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
|
||||
#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
|
||||
|
||||
use async_std::io::{self, prelude::*};
|
||||
use async_std::task::{Context, Poll};
|
||||
|
||||
use futures::future::TryFuture;
|
||||
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
/// The raw body of an http request or response.
|
||||
pub struct Body {
|
||||
#[pin]
|
||||
reader: Pin<Box<dyn BufRead + Send + 'static>>,
|
||||
}
|
||||
}
|
||||
|
||||
impl Body {
|
||||
/// Create a new empty body.
|
||||
pub fn empty() -> Self {
|
||||
Self {
|
||||
reader: Box::pin(io::empty()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new instance from a reader.
|
||||
pub fn from_reader(reader: impl BufRead + Unpin + Send + 'static) -> Self {
|
||||
Self {
|
||||
reader: Box::pin(reader),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for Body {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut self.reader).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl BufRead for Body {
|
||||
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&'_ [u8]>> {
|
||||
let this = self.project();
|
||||
this.reader.poll_fill_buf(cx)
|
||||
}
|
||||
|
||||
fn consume(mut self: Pin<&mut Self>, amt: usize) {
|
||||
Pin::new(&mut self.reader).consume(amt)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Body {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Body").field("reader", &"<hidden>").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<u8>> for Body {
|
||||
fn from(vec: Vec<u8>) -> Body {
|
||||
Self {
|
||||
reader: Box::pin(io::Cursor::new(vec)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: BufRead + Unpin + Send + 'static> From<Pin<Box<R>>> for Body {
|
||||
/// Converts an `AsyncRead` into a Body.
|
||||
fn from(reader: Pin<Box<R>>) -> Self {
|
||||
Self { reader }
|
||||
}
|
||||
}
|
||||
/// The raw body of an http request or response.
|
||||
pub type Body = http_types::Body;
|
||||
|
||||
/// An HTTP request with a streaming body.
|
||||
pub type Request = http::Request<Body>;
|
||||
pub type Request = http_types::Request;
|
||||
|
||||
/// An HTTP response with a streaming body.
|
||||
pub type Response = http::Response<Body>;
|
||||
pub type Response = http_types::Response;
|
||||
|
||||
/// An HTTP compatible error type.
|
||||
pub type Error = http_types::Error;
|
||||
|
||||
/// An async HTTP service
|
||||
///
|
||||
|
@ -95,7 +31,10 @@ pub trait HttpService: Send + Sync + 'static {
|
|||
///
|
||||
/// This associated type is used to establish and hold any per-connection state
|
||||
/// needed by the service.
|
||||
type Connection: Send + 'static;
|
||||
type Connection: Send + 'static + Clone;
|
||||
|
||||
/// Error when trying to connect.
|
||||
type ConnectionError: Into<Error> + Send;
|
||||
|
||||
/// A future for setting up an individual connection.
|
||||
///
|
||||
|
@ -104,7 +43,9 @@ pub trait HttpService: Send + Sync + 'static {
|
|||
///
|
||||
/// Returning an error will result in the server immediately dropping
|
||||
/// the connection.
|
||||
type ConnectionFuture: Send + 'static + TryFuture<Ok = Self::Connection>;
|
||||
type ConnectionFuture: Send
|
||||
+ 'static
|
||||
+ Future<Output = Result<Self::Connection, Self::ConnectionError>>;
|
||||
|
||||
/// Initiate a new connection.
|
||||
///
|
||||
|
@ -112,27 +53,56 @@ pub trait HttpService: Send + Sync + 'static {
|
|||
/// handles to connection pools, thread pools, or other global data.
|
||||
fn connect(&self) -> Self::ConnectionFuture;
|
||||
|
||||
/// Response error.
|
||||
type ResponseError: Into<Error> + Send;
|
||||
|
||||
/// The async computation for producing the response.
|
||||
///
|
||||
/// Returning an error will result in the server immediately dropping
|
||||
/// the connection. It is usually preferable to instead return an HTTP response
|
||||
/// with an error status code.
|
||||
type ResponseFuture: Send + 'static + TryFuture<Ok = Response>;
|
||||
type ResponseFuture: Send + 'static + Future<Output = Result<Response, Self::ResponseError>>;
|
||||
|
||||
/// Begin handling a single request.
|
||||
///
|
||||
/// The handler is given shared access to the service itself, and mutable access
|
||||
/// to the state for the connection where the request is taking place.
|
||||
fn respond(&self, conn: &mut Self::Connection, req: Request) -> Self::ResponseFuture;
|
||||
fn respond(&self, conn: Self::Connection, req: Request) -> Self::ResponseFuture;
|
||||
}
|
||||
|
||||
// impl<F, R, E> HttpService<E> for F
|
||||
// where
|
||||
// F: Send + Sync + 'static + Fn(Request) -> R,
|
||||
// R: Send + 'static + Future<Output = Result<Response, E>>,
|
||||
// {
|
||||
// type ResponseFuture = R;
|
||||
// fn respond(&self, req: Request) -> Self::ResponseFuture {
|
||||
// (self)(req)
|
||||
// }
|
||||
// }
|
||||
impl<F, R, E> HttpService for F
|
||||
where
|
||||
F: Send + Sync + 'static + Fn(Request) -> R,
|
||||
R: Send + 'static + Future<Output = Result<Response, E>>,
|
||||
E: Send + Into<Error>,
|
||||
{
|
||||
type Connection = ();
|
||||
type ConnectionError = Error;
|
||||
type ConnectionFuture = OkFuture;
|
||||
type ResponseFuture = R;
|
||||
type ResponseError = E;
|
||||
|
||||
fn connect(&self) -> Self::ConnectionFuture {
|
||||
OkFuture(true)
|
||||
}
|
||||
|
||||
fn respond(&self, _conn: Self::Connection, req: Request) -> Self::ResponseFuture {
|
||||
(self)(req)
|
||||
}
|
||||
}
|
||||
|
||||
/// A future which resolves to `Ok(())`.
|
||||
#[derive(Debug)]
|
||||
pub struct OkFuture(bool);
|
||||
|
||||
impl Unpin for OkFuture {}
|
||||
|
||||
impl Future for OkFuture {
|
||||
type Output = Result<(), Error>;
|
||||
|
||||
#[inline]
|
||||
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.0 = false;
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue