From 22fef36da9960eb8800087694838f1398aea7220 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 11 Jan 2020 20:01:23 -0800 Subject: [PATCH] Implement backoff on connection errors for the eventbus client This uses a fibonacci backoff (yay I pass the coding interview) for attempting to reconnect the connection if something cannot work properly --- auctioneer/src/main.rs | 3 ++- eventbus/src/client.rs | 54 ++++++++++++++++++++++++------------------ eventbus/src/lib.rs | 5 +++- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/auctioneer/src/main.rs b/auctioneer/src/main.rs index fc13fff..1df3c48 100644 --- a/auctioneer/src/main.rs +++ b/auctioneer/src/main.rs @@ -11,6 +11,7 @@ use actix_web::{App, HttpResponse, HttpServer}; use log::debug; use otto_eventbus::client::*; +use otto_eventbus::*; /** * The index handler for the root of the Auctioneer web interface @@ -23,7 +24,7 @@ async fn route_index() -> HttpResponse { async fn main() -> std::io::Result<()> { pretty_env_logger::init(); - let client = connect("http://127.0.0.1:8000/ws/").await; + let client = connect("http://127.0.0.1:8000/ws/", "auctioneer").await; debug!("Client created: {:?}", client); HttpServer::new(move || { diff --git a/eventbus/src/client.rs b/eventbus/src/client.rs index 03b7bf4..687f734 100644 --- a/eventbus/src/client.rs +++ b/eventbus/src/client.rs @@ -45,41 +45,49 @@ impl std::fmt::Debug for EventBusClient { } } +pub trait EventDispatch { + fn dispatch(&self, payload: T); +} + /** * connect will create begin the connection process and start the EventBusClient * actor. * * The caller will need to await in order to get the EventBusClient */ -pub async fn connect(ws: &str) -> Addr { - /* - * Creating a awc::Client to handle the first part of our WebSocket client bootstrap - */ - let (response, framed) = Client::new() - .ws(ws) - .connect() - .await - .map_err(|e| { - error!("Error: {}", e); - }) - .unwrap(); +pub async fn connect(ws: &'static str, id: &'static str) -> Addr { + let mut backoff = 1; - info!("{:?}", response); - let (sink, stream) = framed.split(); + loop { + let r = Client::new().ws(ws).connect().await; - EventBusClient::create(|ctx| { - EventBusClient::add_stream(stream, ctx); - EventBusClient { - sink: SinkWrite::new(sink, ctx), - id: "auctioneer", + match r { + Ok((response, framed)) => { + let (sink, stream) = framed.split(); + + return EventBusClient::create(|ctx| { + EventBusClient::add_stream(stream, ctx); + EventBusClient { + sink: SinkWrite::new(sink, ctx), + id: id, + } + }); + } + Err(e) => { + error!("Failed establish WebSocket: {}", e); + backoff = backoff + backoff; + std::thread::sleep(Duration::from_secs(backoff)); + } } - }) + } } impl EventBusClient { fn hb(&self, ctx: &mut Context) { ctx.run_later(Duration::new(1, 0), |act, ctx| { - act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap(); + act.sink + .write(Message::Ping(Bytes::from_static(b""))) + .unwrap(); act.hb(ctx); // client should also check for a timeout here, similar to the @@ -88,6 +96,7 @@ impl EventBusClient { } } +//impl Actor for EventBusClient { impl Actor for EventBusClient { type Context = Context; @@ -157,5 +166,4 @@ impl StreamHandler> for EventBusClient { impl actix::io::WriteHandler for EventBusClient {} #[cfg(test)] -mod test { -} +mod test {} diff --git a/eventbus/src/lib.rs b/eventbus/src/lib.rs index e2ee201..15a97e1 100644 --- a/eventbus/src/lib.rs +++ b/eventbus/src/lib.rs @@ -54,7 +54,10 @@ pub struct Meta { #[rtype(result = "()")] pub enum Output { Heartbeat, - Message { payload: Value }, + Message { + #[serde(default)] + payload: Value, + }, } /**