feat(server): configurable timeout + drop max_reqs

`MAX_REQUESTS` is introducing errors when they are not actually needed in the current configuration. If a user needs to limit incoming connections, this should happen more intelligently at a higher level in the stack.
This commit is contained in:
dignifiedquire 2020-04-10 16:00:17 +02:00
parent 5b9e6ca914
commit fb185f9d3e
2 changed files with 46 additions and 17 deletions

View File

@ -115,4 +115,4 @@ mod server;
pub mod client;
pub use client::connect;
pub use server::accept;
pub use server::{accept, accept_with_opts, ServerOptions};

View File

@ -13,33 +13,62 @@ mod encode;
use decode::decode;
use encode::Encoder;
/// Configure the server.
#[derive(Debug, Clone)]
pub struct ServerOptions {
/// Timeout to handle headers. Defaults to 60s.
headers_timeout: Option<Duration>,
}
impl Default for ServerOptions {
fn default() -> Self {
Self {
headers_timeout: Some(Duration::from_secs(60)),
}
}
}
/// Accept a new incoming HTTP/1.1 connection.
///
/// Supports `KeepAlive` requests by default.
pub async fn accept<RW, F, Fut>(addr: &str, mut io: RW, endpoint: F) -> http_types::Result<()>
pub async fn accept<RW, F, Fut>(addr: &str, io: RW, endpoint: F) -> http_types::Result<()>
where
RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
F: Fn(Request) -> Fut,
Fut: Future<Output = http_types::Result<Response>>,
{
// TODO: make these values configurable
let timeout_duration = Duration::from_secs(10);
const MAX_REQUESTS: usize = 200;
let mut num_requests = 0;
accept_with_opts(addr, io, endpoint, Default::default()).await
}
/// Accept a new incoming HTTP/1.1 connection.
///
/// Supports `KeepAlive` requests by default.
pub async fn accept_with_opts<RW, F, Fut>(
addr: &str,
mut io: RW,
endpoint: F,
opts: ServerOptions,
) -> http_types::Result<()>
where
RW: Read + Write + Clone + Send + Sync + Unpin + 'static,
F: Fn(Request) -> Fut,
Fut: Future<Output = http_types::Result<Response>>,
{
loop {
// Stop parsing requests if we exceed the threshold.
match num_requests {
MAX_REQUESTS => return Ok(()),
_ => num_requests += 1,
};
// Decode a new request, timing out if this takes longer than the timeout duration.
let fut = decode(addr, io.clone());
// Decode a new request, timing out if this takes longer than the
// timeout duration.
let req = match timeout(timeout_duration, decode(addr, io.clone())).await {
Ok(Ok(Some(r))) => r,
Ok(Ok(None)) | Err(TimeoutError { .. }) => break, /* EOF or timeout */
Ok(Err(e)) => return Err(e),
let req = if let Some(timeout_duration) = opts.headers_timeout {
match timeout(timeout_duration, fut).await {
Ok(Ok(Some(r))) => r,
Ok(Ok(None)) | Err(TimeoutError { .. }) => break, /* EOF or timeout */
Ok(Err(e)) => return Err(e),
}
} else {
match fut.await? {
Some(r) => r,
None => break, /* EOF */
}
};
// Pass the request to the endpoint and encode the response.