From 7066789a3bc20c43bfe36f62c9a2716549d8dc97 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 11 Jan 2020 16:16:54 -0800 Subject: [PATCH] Refactor the EventBusClient code into the eventus crate where it belongs Now that I have something simple and functioning, this no longer needs to be in the auctioneer The fun work begins now, supporting custom messages known only to the auctioneer for its needs, but passing those through EventBusClient --- Cargo.lock | 9 +- auctioneer/Cargo.toml | 6 - auctioneer/src/main.rs | 152 +------------------------ eventbus/Cargo.toml | 9 +- eventbus/src/{cli.rs => cli/main.rs} | 0 eventbus/src/client.rs | 161 +++++++++++++++++++++++++++ eventbus/src/lib.rs | 3 +- 7 files changed, 179 insertions(+), 161 deletions(-) rename eventbus/src/{cli.rs => cli/main.rs} (100%) create mode 100644 eventbus/src/client.rs diff --git a/Cargo.lock b/Cargo.lock index 0618ae0..f3c0446 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1310,16 +1310,11 @@ name = "otto-auctioneer" version = "0.1.0" dependencies = [ "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", - "actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "otto-eventbus 0.1.0", "pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1332,13 +1327,17 @@ name = "otto-eventbus" version = "0.1.0" dependencies = [ "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/auctioneer/Cargo.toml b/auctioneer/Cargo.toml index b0749b0..704b958 100644 --- a/auctioneer/Cargo.toml +++ b/auctioneer/Cargo.toml @@ -10,12 +10,6 @@ actix = "~0.9.0" actix-http = "~1.0.1" actix-rt = "~1.0.0" actix-web = "~2.0.0" -# Used for the websocket client -actix-codec = "~0.2.0" -actix-web-actors = "~2.0.0" -awc = "~1.0.1" -futures = "~0.3.1" -bytes = "~0.5.3" # Handling command line options clap = { version = "~2.33.0", features = ["yaml"] } diff --git a/auctioneer/src/main.rs b/auctioneer/src/main.rs index 06a8090..fc13fff 100644 --- a/auctioneer/src/main.rs +++ b/auctioneer/src/main.rs @@ -4,26 +4,13 @@ extern crate actix; extern crate actix_http; extern crate actix_web; -extern crate awc; extern crate pretty_env_logger; -use actix::io::SinkWrite; -use actix::*; -use actix_codec::Framed; use actix_web::{middleware, web}; use actix_web::{App, HttpResponse, HttpServer}; -use awc::{ - error::WsProtocolError, - ws::{Codec, Frame, Message}, - BoxedSocket, Client, -}; -use bytes::Bytes; -use futures::stream::{SplitSink, StreamExt}; -use log::{error, info}; +use log::debug; -use std::time::Duration; - -use otto_eventbus::*; +use otto_eventbus::client::*; /** * The index handler for the root of the Auctioneer web interface @@ -36,27 +23,8 @@ async fn route_index() -> HttpResponse { async fn main() -> std::io::Result<()> { pretty_env_logger::init(); - /* - * Creating a awc::Client to handle the first part of our WebSocket client bootstrap - */ - let (response, framed) = Client::new() - .ws("http://127.0.0.1:8000/ws/") - .connect() - .await - .map_err(|e| { - error!("Error: {}", e); - }) - .unwrap(); - - info!("{:?}", response); - let (sink, stream) = framed.split(); - let _addr = EventBusClient::create(|ctx| { - EventBusClient::add_stream(stream, ctx); - EventBusClient { - sink: SinkWrite::new(sink, ctx), - id: "auctioneer", - } - }); + let client = connect("http://127.0.0.1:8000/ws/").await; + debug!("Client created: {:?}", client); HttpServer::new(move || { App::new() @@ -69,118 +37,6 @@ async fn main() -> std::io::Result<()> { .await } - -/** - * An EventBusClient is capable of connecting to, reading messages from, and sending messages to - * the eventbus. - */ -struct EventBusClient { - /** - * The sink is a writable object which can send messages back to the EventBus - */ - sink: SinkWrite, Message>>, - /** - * String identifier for the client - * - * This should be persisted between invocations of the process if the client - * should have any semblence of persistence with its messages - */ - id: &'static str, -} - -#[derive(Message)] -#[rtype(result = "()")] -struct ClientCommand(String); - -impl Actor for EventBusClient { - type Context = Context; - - fn started(&mut self, ctx: &mut Context) { - let input = InputMessage { - meta: Meta::default(), - msg: Input::Connect { - name: self.id.to_string(), - }, - }; - self.sink - .write(Message::Text(serde_json::to_string(&input).unwrap())) - .unwrap(); - // start heartbeats otherwise server will disconnect after 10 seconds - self.hb(ctx) - } - - fn stopped(&mut self, _: &mut Context) { - info!("Disconnected"); - } -} - -impl EventBusClient { - fn hb(&self, ctx: &mut Context) { - ctx.run_later(Duration::new(1, 0), |act, ctx| { - act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap(); - act.hb(ctx); - - // client should also check for a timeout here, similar to the - // server code - }); - } -} - -/// Handle stdin commands -impl Handler for EventBusClient { - type Result = (); - - fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context) { - self.sink.write(Message::Text(msg.0)).unwrap(); - } -} - -impl Handler for EventBusClient { - type Result = (); - - fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context) { - /* - * For heartbeats we really don't need to do anything - */ - } -} - -/// Handle server websocket messages -impl StreamHandler> for EventBusClient { - fn handle(&mut self, msg: Result, ctx: &mut Context) { - if let Ok(Frame::Text(txt)) = msg { - /* - * We have received _some_ message from the eventbus, let's try to - * decode it! - */ - let msg = serde_json::from_slice::(&txt); - match msg { - Ok(output) => { - /* - * Dispatch the message basically back to ourself for easy - * handling - */ - ctx.address().do_send(output); - } - Err(e) => { - error!("Received invalid message: {}", e); - } - } - } - } - - fn started(&mut self, _ctx: &mut Context) { - info!("Connected"); - } - - fn finished(&mut self, ctx: &mut Context) { - info!("Server disconnected"); - ctx.stop() - } -} - -impl actix::io::WriteHandler for EventBusClient {} - #[cfg(test)] mod test { use super::*; diff --git a/eventbus/Cargo.toml b/eventbus/Cargo.toml index 19c0f32..9053a2b 100644 --- a/eventbus/Cargo.toml +++ b/eventbus/Cargo.toml @@ -10,7 +10,7 @@ path = "src/server/main.rs" [[bin]] name = "otto-ebc" -path = "src/cli.rs" +path = "src/cli/main.rs" [dependencies] # Actix provides the web and websocket basis we sit on top of @@ -47,5 +47,12 @@ url = "~2.1.0" rustyline = "~5.0.6" +# Used for the websocket client +actix-codec = "~0.2.0" +awc = "~1.0.1" +futures = "~0.3.1" +bytes = "~0.5.3" + + [dev-dependencies] regex = "1" diff --git a/eventbus/src/cli.rs b/eventbus/src/cli/main.rs similarity index 100% rename from eventbus/src/cli.rs rename to eventbus/src/cli/main.rs diff --git a/eventbus/src/client.rs b/eventbus/src/client.rs new file mode 100644 index 0000000..03b7bf4 --- /dev/null +++ b/eventbus/src/client.rs @@ -0,0 +1,161 @@ +/**! + * This module is the eventbus client + */ +use actix::io::SinkWrite; +use actix::*; +use actix_codec::Framed; +use awc::{ + error::WsProtocolError, + ws::{Codec, Frame, Message}, + BoxedSocket, Client, +}; +use bytes::Bytes; +use futures::stream::{SplitSink, StreamExt}; +use log::{error, info}; + +use std::time::Duration; + +use crate::*; + +/** + * An EventBusClient is capable of connecting to, reading messages from, and sending messages to + * the eventbus. + */ +pub struct EventBusClient { + /** + * The sink is a writable object which can send messages back to the EventBus + */ + sink: SinkWrite, Message>>, + /** + * String identifier for the client + * + * This should be persisted between invocations of the process if the client + * should have any semblence of persistence with its messages + */ + id: &'static str, +} + +/** + * Implementation of the Debug trait so that the EventBusClient can be included + * in logging statements and print each instance's `id` + */ +impl std::fmt::Debug for EventBusClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "EventBusClient", self.id) + } +} + +/** + * connect will create begin the connection process and start the EventBusClient + * actor. + * + * The caller will need to await in order to get the EventBusClient + */ +pub async fn connect(ws: &str) -> Addr { + /* + * Creating a awc::Client to handle the first part of our WebSocket client bootstrap + */ + let (response, framed) = Client::new() + .ws(ws) + .connect() + .await + .map_err(|e| { + error!("Error: {}", e); + }) + .unwrap(); + + info!("{:?}", response); + let (sink, stream) = framed.split(); + + EventBusClient::create(|ctx| { + EventBusClient::add_stream(stream, ctx); + EventBusClient { + sink: SinkWrite::new(sink, ctx), + id: "auctioneer", + } + }) +} + +impl EventBusClient { + fn hb(&self, ctx: &mut Context) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap(); + act.hb(ctx); + + // client should also check for a timeout here, similar to the + // server code + }); + } +} + +impl Actor for EventBusClient { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + let input = InputMessage { + meta: Meta::default(), + msg: Input::Connect { + name: self.id.to_string(), + }, + }; + self.sink + .write(Message::Text(serde_json::to_string(&input).unwrap())) + .unwrap(); + // start heartbeats otherwise server will disconnect after 10 seconds + self.hb(ctx) + } + + fn stopped(&mut self, _: &mut Context) { + info!("Disconnected"); + } +} + +impl Handler for EventBusClient { + type Result = (); + + fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context) { + /* + * For heartbeats we really don't need to do anything + */ + } +} + +/// Handle server websocket messages +impl StreamHandler> for EventBusClient { + fn handle(&mut self, msg: Result, ctx: &mut Context) { + if let Ok(Frame::Text(txt)) = msg { + /* + * We have received _some_ message from the eventbus, let's try to + * decode it! + */ + let msg = serde_json::from_slice::(&txt); + match msg { + Ok(output) => { + /* + * Dispatch the message basically back to ourself for easy + * handling + */ + ctx.address().do_send(output); + } + Err(e) => { + error!("Received invalid message: {}", e); + } + } + } + } + + fn started(&mut self, _ctx: &mut Context) { + info!("Connected"); + } + + fn finished(&mut self, ctx: &mut Context) { + info!("Server disconnected"); + ctx.stop() + } +} + +impl actix::io::WriteHandler for EventBusClient {} + +#[cfg(test)] +mod test { +} diff --git a/eventbus/src/lib.rs b/eventbus/src/lib.rs index ec941ec..e2ee201 100644 --- a/eventbus/src/lib.rs +++ b/eventbus/src/lib.rs @@ -12,6 +12,8 @@ use serde_json::Value; use std::sync::Arc; +pub mod client; + /** * Default function for deserialize/serialize of times, always defaults to 1970-01-01 */ @@ -142,7 +144,6 @@ impl Default for Meta { } } - #[cfg(test)] mod test { use super::*;