Implement backoff on connection errors for the eventbus client

This uses a fibonacci backoff (yay I pass the coding interview) for attempting
to reconnect the connection if something cannot work properly
This commit is contained in:
R Tyler Croy 2020-01-11 20:01:23 -08:00
parent 7066789a3b
commit 22fef36da9
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
3 changed files with 37 additions and 25 deletions

View File

@ -11,6 +11,7 @@ use actix_web::{App, HttpResponse, HttpServer};
use log::debug; use log::debug;
use otto_eventbus::client::*; use otto_eventbus::client::*;
use otto_eventbus::*;
/** /**
* The index handler for the root of the Auctioneer web interface * The index handler for the root of the Auctioneer web interface
@ -23,7 +24,7 @@ async fn route_index() -> HttpResponse {
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
pretty_env_logger::init(); pretty_env_logger::init();
let client = connect("http://127.0.0.1:8000/ws/").await; let client = connect("http://127.0.0.1:8000/ws/", "auctioneer").await;
debug!("Client created: {:?}", client); debug!("Client created: {:?}", client);
HttpServer::new(move || { HttpServer::new(move || {

View File

@ -45,41 +45,49 @@ impl std::fmt::Debug for EventBusClient {
} }
} }
pub trait EventDispatch<T> {
fn dispatch(&self, payload: T);
}
/** /**
* connect will create begin the connection process and start the EventBusClient * connect will create begin the connection process and start the EventBusClient
* actor. * actor.
* *
* The caller will need to await in order to get the EventBusClient * The caller will need to await in order to get the EventBusClient
*/ */
pub async fn connect(ws: &str) -> Addr<EventBusClient> { pub async fn connect(ws: &'static str, id: &'static str) -> Addr<EventBusClient> {
/* let mut backoff = 1;
* Creating a awc::Client to handle the first part of our WebSocket client bootstrap
*/
let (response, framed) = Client::new()
.ws(ws)
.connect()
.await
.map_err(|e| {
error!("Error: {}", e);
})
.unwrap();
info!("{:?}", response); loop {
let (sink, stream) = framed.split(); let r = Client::new().ws(ws).connect().await;
EventBusClient::create(|ctx| { match r {
EventBusClient::add_stream(stream, ctx); Ok((response, framed)) => {
EventBusClient { let (sink, stream) = framed.split();
sink: SinkWrite::new(sink, ctx),
id: "auctioneer", return EventBusClient::create(|ctx| {
EventBusClient::add_stream(stream, ctx);
EventBusClient {
sink: SinkWrite::new(sink, ctx),
id: id,
}
});
}
Err(e) => {
error!("Failed establish WebSocket: {}", e);
backoff = backoff + backoff;
std::thread::sleep(Duration::from_secs(backoff));
}
} }
}) }
} }
impl EventBusClient { impl EventBusClient {
fn hb(&self, ctx: &mut Context<Self>) { fn hb(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| { ctx.run_later(Duration::new(1, 0), |act, ctx| {
act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap(); act.sink
.write(Message::Ping(Bytes::from_static(b"")))
.unwrap();
act.hb(ctx); act.hb(ctx);
// client should also check for a timeout here, similar to the // client should also check for a timeout here, similar to the
@ -88,6 +96,7 @@ impl EventBusClient {
} }
} }
//impl Actor<dyn EventDispatch> for EventBusClient {
impl Actor for EventBusClient { impl Actor for EventBusClient {
type Context = Context<Self>; type Context = Context<Self>;
@ -157,5 +166,4 @@ impl StreamHandler<Result<Frame, WsProtocolError>> for EventBusClient {
impl actix::io::WriteHandler<WsProtocolError> for EventBusClient {} impl actix::io::WriteHandler<WsProtocolError> for EventBusClient {}
#[cfg(test)] #[cfg(test)]
mod test { mod test {}
}

View File

@ -54,7 +54,10 @@ pub struct Meta {
#[rtype(result = "()")] #[rtype(result = "()")]
pub enum Output { pub enum Output {
Heartbeat, Heartbeat,
Message { payload: Value }, Message {
#[serde(default)]
payload: Value,
},
} }
/** /**