feat: unstable HttpClient Config

This adds an `unstable-config` feature, with a new `Config` struct, which can be used to configure any `HttpClient` which implements support for it.

Currently it supports two features - the most important and most generally supported:
- `timeout` (`Duration`)
- `no_delay` (`bool`)

Implementations are provided for async-h1, isahc, and hyper (partial, no `no_delay` support due to the tls connector).

No serious attempt has been made to add this to the wasm client at this point, since I don't understand well how to even build the wasm client or if it even works anymore with the state of rust wasm web build tools.
This commit is contained in:
Jeremiah Senkpiel 2021-05-04 16:01:35 -07:00
parent 6ba80b3ce3
commit 6508f13644
8 changed files with 324 additions and 29 deletions

View File

@ -27,11 +27,13 @@ h1_client = ["async-h1", "async-std", "deadpool", "futures"]
native_client = ["curl_client", "wasm_client"]
curl_client = ["isahc", "async-std"]
wasm_client = ["js-sys", "web-sys", "wasm-bindgen", "wasm-bindgen-futures", "futures"]
hyper_client = ["hyper", "hyper-tls", "http-types/hyperium_http", "futures-util"]
hyper_client = ["hyper", "hyper-tls", "http-types/hyperium_http", "futures-util", "tokio"]
native-tls = ["async-native-tls"]
rustls = ["async-tls"]
unstable-config = []
[dependencies]
async-trait = "0.1.37"
dashmap = "4.0.2"
@ -53,6 +55,7 @@ async-tls = { version = "0.10.0", optional = true }
hyper = { version = "0.13.6", features = ["tcp"], optional = true }
hyper-tls = { version = "0.4.3", optional = true }
futures-util = { version = "0.3.5", features = ["io"], optional = true }
tokio = { version = "0.2", features = ["time"], optional = true }
# curl_client
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]

47
src/config.rs Normal file
View File

@ -0,0 +1,47 @@
//! Configuration for `HttpClient`s.
use std::time::Duration;
/// Configuration for `HttpClient`s.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct Config {
/// TCP `NO_DELAY`.
///
/// Default: `false`.
pub no_delay: bool,
/// Connection timeout duration.
///
/// Default: `Some(Duration::from_secs(60))`.
pub timeout: Option<Duration>,
}
impl Config {
/// Construct new empty config.
pub fn new() -> Self {
Self {
no_delay: false,
timeout: Some(Duration::from_secs(60)),
}
}
}
impl Default for Config {
fn default() -> Self {
Self::new()
}
}
impl Config {
/// Set TCP `NO_DELAY`.
pub fn set_no_delay(mut self, no_delay: bool) -> Self {
self.no_delay = no_delay;
self
}
/// Set connection timeout duration.
pub fn set_timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}
}

View File

@ -1,5 +1,8 @@
//! http-client implementation for async-h1, with connecton pooling ("Keep-Alive").
#[cfg(feature = "unstable-config")]
use std::convert::{Infallible, TryFrom};
use std::fmt::Debug;
use std::net::SocketAddr;
@ -17,6 +20,8 @@ cfg_if::cfg_if! {
}
}
use crate::Config;
use super::{async_trait, Error, HttpClient, Request, Response};
mod tcp;
@ -40,6 +45,7 @@ pub struct H1Client {
#[cfg(any(feature = "native-tls", feature = "rustls"))]
https_pools: HttpsPool,
max_concurrent_connections: usize,
config: Config,
}
impl Debug for H1Client {
@ -75,6 +81,7 @@ impl Debug for H1Client {
.collect::<Vec<String>>(),
)
.field("https_pools", &https_pools)
.field("config", &self.config)
.field(
"max_concurrent_connections",
&self.max_concurrent_connections,
@ -97,6 +104,7 @@ impl H1Client {
#[cfg(any(feature = "native-tls", feature = "rustls"))]
https_pools: DashMap::new(),
max_concurrent_connections: DEFAULT_MAX_CONCURRENT_CONNECTIONS,
config: Config::default(),
}
}
@ -107,6 +115,7 @@ impl H1Client {
#[cfg(any(feature = "native-tls", feature = "rustls"))]
https_pools: DashMap::new(),
max_concurrent_connections: max,
config: Config::default(),
}
}
}
@ -152,7 +161,7 @@ impl HttpClient for H1Client {
let pool_ref = if let Some(pool_ref) = self.http_pools.get(&addr) {
pool_ref
} else {
let manager = TcpConnection::new(addr);
let manager = TcpConnection::new(addr, self.config.clone());
let pool = Pool::<TcpStream, std::io::Error>::new(
manager,
self.max_concurrent_connections,
@ -168,19 +177,28 @@ impl HttpClient for H1Client {
let stream = match pool.get().await {
Ok(s) => s,
Err(_) if has_another_addr => continue,
Err(e) => return Err(Error::from_str(400, e.to_string()))?,
Err(e) => return Err(Error::from_str(400, e.to_string())),
};
req.set_peer_addr(stream.peer_addr().ok());
req.set_local_addr(stream.local_addr().ok());
return client::connect(TcpConnWrapper::new(stream), req).await;
let tcp_conn = client::connect(TcpConnWrapper::new(stream), req);
#[cfg(feature = "unstable-config")]
return if let Some(timeout) = self.config.timeout {
async_std::future::timeout(timeout, tcp_conn).await?
} else {
tcp_conn.await
};
#[cfg(not(feature = "unstable-config"))]
return tcp_conn.await;
}
#[cfg(any(feature = "native-tls", feature = "rustls"))]
"https" => {
let pool_ref = if let Some(pool_ref) = self.https_pools.get(&addr) {
pool_ref
} else {
let manager = TlsConnection::new(host.clone(), addr);
let manager = TlsConnection::new(host.clone(), addr, self.config.clone());
let pool = Pool::<TlsStream<TcpStream>, Error>::new(
manager,
self.max_concurrent_connections,
@ -196,13 +214,21 @@ impl HttpClient for H1Client {
let stream = match pool.get().await {
Ok(s) => s,
Err(_) if has_another_addr => continue,
Err(e) => return Err(Error::from_str(400, e.to_string()))?,
Err(e) => return Err(Error::from_str(400, e.to_string())),
};
req.set_peer_addr(stream.get_ref().peer_addr().ok());
req.set_local_addr(stream.get_ref().local_addr().ok());
return client::connect(TlsConnWrapper::new(stream), req).await;
let tls_conn = client::connect(TlsConnWrapper::new(stream), req);
#[cfg(feature = "unstable-config")]
return if let Some(timeout) = self.config.timeout {
async_std::future::timeout(timeout, tls_conn).await?
} else {
tls_conn.await
};
#[cfg(not(feature = "unstable-config"))]
return tls_conn.await;
}
_ => unreachable!(),
}
@ -213,6 +239,37 @@ impl HttpClient for H1Client {
"missing valid address",
))
}
#[cfg(feature = "unstable-config")]
/// Override the existing configuration with new configuration.
///
/// Config options may not impact existing connections.
fn set_config(&mut self, config: Config) -> http_types::Result<()> {
self.config = config;
Ok(())
}
#[cfg(feature = "unstable-config")]
/// Get the current configuration.
fn config(&self) -> &Config {
&self.config
}
}
#[cfg(feature = "unstable-config")]
impl TryFrom<Config> for H1Client {
type Error = Infallible;
fn try_from(config: Config) -> Result<Self, Self::Error> {
Ok(Self {
http_pools: DashMap::new(),
#[cfg(any(feature = "native-tls", feature = "rustls"))]
https_pools: DashMap::new(),
max_concurrent_connections: DEFAULT_MAX_CONCURRENT_CONNECTIONS,
config,
})
}
}
#[cfg(test)]

View File

@ -8,13 +8,17 @@ use deadpool::managed::{Manager, Object, RecycleResult};
use futures::io::{AsyncRead, AsyncWrite};
use futures::task::{Context, Poll};
use crate::Config;
#[derive(Clone, Debug)]
pub(crate) struct TcpConnection {
addr: SocketAddr,
config: Config,
}
impl TcpConnection {
pub(crate) fn new(addr: SocketAddr) -> Self {
Self { addr }
pub(crate) fn new(addr: SocketAddr, config: Config) -> Self {
Self { addr, config }
}
}
@ -58,12 +62,21 @@ impl AsyncWrite for TcpConnWrapper {
#[async_trait]
impl Manager<TcpStream, std::io::Error> for TcpConnection {
async fn create(&self) -> Result<TcpStream, std::io::Error> {
TcpStream::connect(self.addr).await
let tcp_stream = TcpStream::connect(self.addr).await?;
#[cfg(feature = "unstable-config")]
tcp_stream.set_nodelay(self.config.no_delay)?;
Ok(tcp_stream)
}
async fn recycle(&self, conn: &mut TcpStream) -> RecycleResult<std::io::Error> {
let mut buf = [0; 4];
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
#[cfg(feature = "unstable-config")]
conn.set_nodelay(self.config.no_delay)?;
match Pin::new(conn).poll_read(&mut cx, &mut buf) {
Poll::Ready(Err(error)) => Err(error),
Poll::Ready(Ok(bytes)) if bytes == 0 => Err(std::io::Error::new(

View File

@ -16,16 +16,18 @@ cfg_if::cfg_if! {
}
}
use crate::Error;
use crate::{Config, Error};
#[derive(Clone, Debug)]
pub(crate) struct TlsConnection {
host: String,
addr: SocketAddr,
config: Config,
}
impl TlsConnection {
pub(crate) fn new(host: String, addr: SocketAddr) -> Self {
Self { host, addr }
pub(crate) fn new(host: String, addr: SocketAddr, config: Config) -> Self {
Self { host, addr, config }
}
}
@ -70,6 +72,10 @@ impl AsyncWrite for TlsConnWrapper {
impl Manager<TlsStream<TcpStream>, Error> for TlsConnection {
async fn create(&self) -> Result<TlsStream<TcpStream>, Error> {
let raw_stream = async_std::net::TcpStream::connect(self.addr).await?;
#[cfg(feature = "unstable-config")]
raw_stream.set_nodelay(self.config.no_delay)?;
let tls_stream = add_tls(&self.host, raw_stream).await?;
Ok(tls_stream)
}
@ -77,6 +83,12 @@ impl Manager<TlsStream<TcpStream>, Error> for TlsConnection {
async fn recycle(&self, conn: &mut TlsStream<TcpStream>) -> RecycleResult<Error> {
let mut buf = [0; 4];
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
#[cfg(feature = "unstable-config")]
conn.get_ref()
.set_nodelay(self.config.no_delay)
.map_err(Error::from)?;
match Pin::new(conn).poll_read(&mut cx, &mut buf) {
Poll::Ready(Err(error)) => Err(error),
Poll::Ready(Ok(bytes)) if bytes == 0 => Err(std::io::Error::new(
@ -86,6 +98,7 @@ impl Manager<TlsStream<TcpStream>, Error> for TlsConnection {
_ => Ok(()),
}
.map_err(Error::from)?;
Ok(())
}
}

View File

@ -1,16 +1,22 @@
//! http-client implementation for reqwest
use super::{async_trait, Error, HttpClient, Request, Response};
#[cfg(feature = "unstable-config")]
use std::convert::Infallible;
use std::convert::TryFrom;
use std::fmt::Debug;
use std::io;
use std::str::FromStr;
use futures_util::stream::TryStreamExt;
use http_types::headers::{HeaderName, HeaderValue};
use http_types::StatusCode;
use hyper::body::HttpBody;
use hyper::client::connect::Connect;
use hyper_tls::HttpsConnector;
use std::convert::TryFrom;
use std::fmt::Debug;
use std::io;
use std::str::FromStr;
use crate::Config;
use super::{async_trait, Error, HttpClient, Request, Response};
type HyperRequest = hyper::Request<hyper::Body>;
@ -27,14 +33,21 @@ impl<C: Clone + Connect + Debug + Send + Sync + 'static> HyperClientObject for h
/// Hyper-based HTTP Client.
#[derive(Debug)]
pub struct HyperClient(Box<dyn HyperClientObject>);
pub struct HyperClient {
client: Box<dyn HyperClientObject>,
config: Config,
}
impl HyperClient {
/// Create a new client instance.
pub fn new() -> Self {
let https = HttpsConnector::new();
let client = hyper::Client::builder().build(https);
Self(Box::new(client))
Self {
client: Box::new(client),
config: Config::default(),
}
}
/// Create from externally initialized and configured client.
@ -42,7 +55,10 @@ impl HyperClient {
where
C: Clone + Connect + Debug + Send + Sync + 'static,
{
Self(Box::new(client))
Self {
client: Box::new(client),
config: Config::default(),
}
}
}
@ -57,11 +73,55 @@ impl HttpClient for HyperClient {
async fn send(&self, req: Request) -> Result<Response, Error> {
let req = HyperHttpRequest::try_from(req).await?.into_inner();
let response = self.0.dyn_request(req).await?;
let conn_fut = self.client.dyn_request(req);
#[cfg(feature = "unstable-config")]
let response = if let Some(timeout) = self.config.timeout {
match tokio::time::timeout(timeout, conn_fut).await {
Err(_elapsed) => Err(Error::from_str(400, "Client timed out")),
Ok(Ok(try_res)) => Ok(try_res),
Ok(Err(e)) => Err(e.into()),
}?
} else {
conn_fut.await?
};
#[cfg(not(feature = "unstable-config"))]
let response = conn_fut.await?;
let res = HttpTypesResponse::try_from(response).await?.into_inner();
Ok(res)
}
#[cfg(feature = "unstable-config")]
/// Override the existing configuration with new configuration.
///
/// Config options may not impact existing connections.
fn set_config(&mut self, config: Config) -> http_types::Result<()> {
self.config = config;
Ok(())
}
#[cfg(feature = "unstable-config")]
/// Get the current configuration.
fn config(&self) -> &Config {
&self.config
}
}
#[cfg(feature = "unstable-config")]
impl TryFrom<Config> for HyperClient {
type Error = Infallible;
fn try_from(config: Config) -> Result<Self, Self::Error> {
let connector = HttpsConnector::new();
let builder = hyper::Client::builder();
Ok(Self {
client: Box::new(builder.build(connector)),
config,
})
}
}
struct HyperHttpRequest(HyperRequest);

View File

@ -1,13 +1,23 @@
//! http-client implementation for isahc
#[cfg(feature = "unstable-config")]
use std::convert::TryFrom;
use async_std::io::BufReader;
#[cfg(feature = "unstable-config")]
use isahc::config::Configurable;
use isahc::{http, ResponseExt};
use crate::Config;
use super::{async_trait, Body, Error, HttpClient, Request, Response};
use async_std::io::BufReader;
use isahc::{http, ResponseExt};
/// Curl-based HTTP Client.
#[derive(Debug)]
pub struct IsahcClient(isahc::HttpClient);
pub struct IsahcClient {
client: isahc::HttpClient,
config: Config,
}
impl Default for IsahcClient {
fn default() -> Self {
@ -23,7 +33,10 @@ impl IsahcClient {
/// Create from externally initialized and configured client.
pub fn from_client(client: isahc::HttpClient) -> Self {
Self(client)
Self {
client,
config: Config::default(),
}
}
}
@ -45,7 +58,7 @@ impl HttpClient for IsahcClient {
};
let request = builder.body(body).unwrap();
let res = self.0.send_async(request).await.map_err(Error::from)?;
let res = self.client.send_async(request).await.map_err(Error::from)?;
let maybe_metrics = res.metrics().cloned();
let (parts, body) = res.into_parts();
let body = Body::from_reader(BufReader::new(body), None);
@ -61,6 +74,53 @@ impl HttpClient for IsahcClient {
response.set_body(body);
Ok(response)
}
#[cfg(feature = "unstable-config")]
/// Override the existing configuration with new configuration.
///
/// Config options may not impact existing connections.
fn set_config(&mut self, config: Config) -> http_types::Result<()> {
let mut builder = isahc::HttpClient::builder();
if config.no_delay {
builder = builder.tcp_nodelay();
}
if let Some(timeout) = config.timeout {
builder = builder.timeout(timeout);
}
self.client = builder.build()?;
self.config = config;
Ok(())
}
#[cfg(feature = "unstable-config")]
/// Get the current configuration.
fn config(&self) -> &Config {
&self.config
}
}
#[cfg(feature = "unstable-config")]
impl TryFrom<Config> for IsahcClient {
type Error = isahc::Error;
fn try_from(config: Config) -> Result<Self, Self::Error> {
let mut builder = isahc::HttpClient::builder();
if config.no_delay {
builder = builder.tcp_nodelay();
}
if let Some(timeout) = config.timeout {
builder = builder.timeout(timeout);
}
Ok(Self {
client: builder.build()?,
config,
})
}
}
#[cfg(test)]

View File

@ -14,6 +14,13 @@
forbid(unsafe_code)
)]
#[cfg(feature = "unstable-config")]
mod config;
#[cfg(feature = "unstable-config")]
pub use config::Config;
#[cfg(not(feature = "unstable-config"))]
type Config = ();
#[cfg_attr(feature = "docs", doc(cfg(curl_client)))]
#[cfg(all(feature = "curl_client", not(target_arch = "wasm32")))]
pub mod isahc;
@ -60,6 +67,31 @@ pub use http_types;
pub trait HttpClient: std::fmt::Debug + Unpin + Send + Sync + 'static {
/// Perform a request.
async fn send(&self, req: Request) -> Result<Response, Error>;
#[cfg(feature = "unstable-config")]
/// Override the existing configuration with new configuration.
///
/// Config options may not impact existing connections.
fn set_config(&mut self, _config: Config) -> http_types::Result<()> {
unimplemented!(
"{} has not implemented `HttpClient::set_config()`",
type_name_of(self)
)
}
#[cfg(feature = "unstable-config")]
/// Get the current configuration.
fn config(&self) -> &Config {
unimplemented!(
"{} has not implemented `HttpClient::config()`",
type_name_of(self)
)
}
}
#[cfg(feature = "unstable-config")]
fn type_name_of<T: ?Sized>(_val: &T) -> &'static str {
std::any::type_name::<T>()
}
/// The raw body of an http request or response.
@ -70,7 +102,17 @@ pub type Error = http_types::Error;
#[async_trait]
impl HttpClient for Box<dyn HttpClient> {
async fn send(&self, req: Request) -> Result<Response, Error> {
async fn send(&self, req: Request) -> http_types::Result<Response> {
self.as_ref().send(req).await
}
#[cfg(feature = "unstable-config")]
fn set_config(&mut self, config: Config) -> http_types::Result<()> {
self.as_mut().set_config(config)
}
#[cfg(feature = "unstable-config")]
fn config(&self) -> &Config {
self.as_ref().config()
}
}