diff --git a/Cargo.lock b/Cargo.lock index 0618ae0..f3c0446 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1310,16 +1310,11 @@ name = "otto-auctioneer" version = "0.1.0" dependencies = [ "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", - "actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "otto-eventbus 0.1.0", "pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1332,13 +1327,17 @@ name = "otto-eventbus" version = "0.1.0" dependencies = [ "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/auctioneer/Cargo.toml b/auctioneer/Cargo.toml index b0749b0..704b958 100644 --- a/auctioneer/Cargo.toml +++ b/auctioneer/Cargo.toml @@ -10,12 +10,6 @@ actix = "~0.9.0" actix-http = "~1.0.1" actix-rt = "~1.0.0" actix-web = "~2.0.0" -# Used for the websocket client -actix-codec = "~0.2.0" -actix-web-actors = "~2.0.0" -awc = "~1.0.1" -futures = "~0.3.1" -bytes = "~0.5.3" # Handling command line options clap = { version = "~2.33.0", features = ["yaml"] } diff --git a/auctioneer/src/main.rs b/auctioneer/src/main.rs index 06a8090..fc13fff 100644 --- a/auctioneer/src/main.rs +++ b/auctioneer/src/main.rs @@ -4,26 +4,13 @@ extern crate actix; extern crate actix_http; extern crate actix_web; -extern crate awc; extern crate pretty_env_logger; -use actix::io::SinkWrite; -use actix::*; -use actix_codec::Framed; use actix_web::{middleware, web}; use actix_web::{App, HttpResponse, HttpServer}; -use awc::{ - error::WsProtocolError, - ws::{Codec, Frame, Message}, - BoxedSocket, Client, -}; -use bytes::Bytes; -use futures::stream::{SplitSink, StreamExt}; -use log::{error, info}; +use log::debug; -use std::time::Duration; - -use otto_eventbus::*; +use otto_eventbus::client::*; /** * The index handler for the root of the Auctioneer web interface @@ -36,27 +23,8 @@ async fn route_index() -> HttpResponse { async fn main() -> std::io::Result<()> { pretty_env_logger::init(); - /* - * Creating a awc::Client to handle the first part of our WebSocket client bootstrap - */ - let (response, framed) = Client::new() - .ws("http://127.0.0.1:8000/ws/") - .connect() - .await - .map_err(|e| { - error!("Error: {}", e); - }) - .unwrap(); - - info!("{:?}", response); - let (sink, stream) = framed.split(); - let _addr = EventBusClient::create(|ctx| { - EventBusClient::add_stream(stream, ctx); - EventBusClient { - sink: SinkWrite::new(sink, ctx), - id: "auctioneer", - } - }); + let client = connect("http://127.0.0.1:8000/ws/").await; + debug!("Client created: {:?}", client); HttpServer::new(move || { App::new() @@ -69,118 +37,6 @@ async fn main() -> std::io::Result<()> { .await } - -/** - * An EventBusClient is capable of connecting to, reading messages from, and sending messages to - * the eventbus. - */ -struct EventBusClient { - /** - * The sink is a writable object which can send messages back to the EventBus - */ - sink: SinkWrite, Message>>, - /** - * String identifier for the client - * - * This should be persisted between invocations of the process if the client - * should have any semblence of persistence with its messages - */ - id: &'static str, -} - -#[derive(Message)] -#[rtype(result = "()")] -struct ClientCommand(String); - -impl Actor for EventBusClient { - type Context = Context; - - fn started(&mut self, ctx: &mut Context) { - let input = InputMessage { - meta: Meta::default(), - msg: Input::Connect { - name: self.id.to_string(), - }, - }; - self.sink - .write(Message::Text(serde_json::to_string(&input).unwrap())) - .unwrap(); - // start heartbeats otherwise server will disconnect after 10 seconds - self.hb(ctx) - } - - fn stopped(&mut self, _: &mut Context) { - info!("Disconnected"); - } -} - -impl EventBusClient { - fn hb(&self, ctx: &mut Context) { - ctx.run_later(Duration::new(1, 0), |act, ctx| { - act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap(); - act.hb(ctx); - - // client should also check for a timeout here, similar to the - // server code - }); - } -} - -/// Handle stdin commands -impl Handler for EventBusClient { - type Result = (); - - fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context) { - self.sink.write(Message::Text(msg.0)).unwrap(); - } -} - -impl Handler for EventBusClient { - type Result = (); - - fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context) { - /* - * For heartbeats we really don't need to do anything - */ - } -} - -/// Handle server websocket messages -impl StreamHandler> for EventBusClient { - fn handle(&mut self, msg: Result, ctx: &mut Context) { - if let Ok(Frame::Text(txt)) = msg { - /* - * We have received _some_ message from the eventbus, let's try to - * decode it! - */ - let msg = serde_json::from_slice::(&txt); - match msg { - Ok(output) => { - /* - * Dispatch the message basically back to ourself for easy - * handling - */ - ctx.address().do_send(output); - } - Err(e) => { - error!("Received invalid message: {}", e); - } - } - } - } - - fn started(&mut self, _ctx: &mut Context) { - info!("Connected"); - } - - fn finished(&mut self, ctx: &mut Context) { - info!("Server disconnected"); - ctx.stop() - } -} - -impl actix::io::WriteHandler for EventBusClient {} - #[cfg(test)] mod test { use super::*; diff --git a/eventbus/Cargo.toml b/eventbus/Cargo.toml index 19c0f32..9053a2b 100644 --- a/eventbus/Cargo.toml +++ b/eventbus/Cargo.toml @@ -10,7 +10,7 @@ path = "src/server/main.rs" [[bin]] name = "otto-ebc" -path = "src/cli.rs" +path = "src/cli/main.rs" [dependencies] # Actix provides the web and websocket basis we sit on top of @@ -47,5 +47,12 @@ url = "~2.1.0" rustyline = "~5.0.6" +# Used for the websocket client +actix-codec = "~0.2.0" +awc = "~1.0.1" +futures = "~0.3.1" +bytes = "~0.5.3" + + [dev-dependencies] regex = "1" diff --git a/eventbus/src/cli.rs b/eventbus/src/cli/main.rs similarity index 100% rename from eventbus/src/cli.rs rename to eventbus/src/cli/main.rs diff --git a/eventbus/src/client.rs b/eventbus/src/client.rs new file mode 100644 index 0000000..03b7bf4 --- /dev/null +++ b/eventbus/src/client.rs @@ -0,0 +1,161 @@ +/**! + * This module is the eventbus client + */ +use actix::io::SinkWrite; +use actix::*; +use actix_codec::Framed; +use awc::{ + error::WsProtocolError, + ws::{Codec, Frame, Message}, + BoxedSocket, Client, +}; +use bytes::Bytes; +use futures::stream::{SplitSink, StreamExt}; +use log::{error, info}; + +use std::time::Duration; + +use crate::*; + +/** + * An EventBusClient is capable of connecting to, reading messages from, and sending messages to + * the eventbus. + */ +pub struct EventBusClient { + /** + * The sink is a writable object which can send messages back to the EventBus + */ + sink: SinkWrite, Message>>, + /** + * String identifier for the client + * + * This should be persisted between invocations of the process if the client + * should have any semblence of persistence with its messages + */ + id: &'static str, +} + +/** + * Implementation of the Debug trait so that the EventBusClient can be included + * in logging statements and print each instance's `id` + */ +impl std::fmt::Debug for EventBusClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "EventBusClient", self.id) + } +} + +/** + * connect will create begin the connection process and start the EventBusClient + * actor. + * + * The caller will need to await in order to get the EventBusClient + */ +pub async fn connect(ws: &str) -> Addr { + /* + * Creating a awc::Client to handle the first part of our WebSocket client bootstrap + */ + let (response, framed) = Client::new() + .ws(ws) + .connect() + .await + .map_err(|e| { + error!("Error: {}", e); + }) + .unwrap(); + + info!("{:?}", response); + let (sink, stream) = framed.split(); + + EventBusClient::create(|ctx| { + EventBusClient::add_stream(stream, ctx); + EventBusClient { + sink: SinkWrite::new(sink, ctx), + id: "auctioneer", + } + }) +} + +impl EventBusClient { + fn hb(&self, ctx: &mut Context) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap(); + act.hb(ctx); + + // client should also check for a timeout here, similar to the + // server code + }); + } +} + +impl Actor for EventBusClient { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + let input = InputMessage { + meta: Meta::default(), + msg: Input::Connect { + name: self.id.to_string(), + }, + }; + self.sink + .write(Message::Text(serde_json::to_string(&input).unwrap())) + .unwrap(); + // start heartbeats otherwise server will disconnect after 10 seconds + self.hb(ctx) + } + + fn stopped(&mut self, _: &mut Context) { + info!("Disconnected"); + } +} + +impl Handler for EventBusClient { + type Result = (); + + fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context) { + /* + * For heartbeats we really don't need to do anything + */ + } +} + +/// Handle server websocket messages +impl StreamHandler> for EventBusClient { + fn handle(&mut self, msg: Result, ctx: &mut Context) { + if let Ok(Frame::Text(txt)) = msg { + /* + * We have received _some_ message from the eventbus, let's try to + * decode it! + */ + let msg = serde_json::from_slice::(&txt); + match msg { + Ok(output) => { + /* + * Dispatch the message basically back to ourself for easy + * handling + */ + ctx.address().do_send(output); + } + Err(e) => { + error!("Received invalid message: {}", e); + } + } + } + } + + fn started(&mut self, _ctx: &mut Context) { + info!("Connected"); + } + + fn finished(&mut self, ctx: &mut Context) { + info!("Server disconnected"); + ctx.stop() + } +} + +impl actix::io::WriteHandler for EventBusClient {} + +#[cfg(test)] +mod test { +} diff --git a/eventbus/src/lib.rs b/eventbus/src/lib.rs index ec941ec..e2ee201 100644 --- a/eventbus/src/lib.rs +++ b/eventbus/src/lib.rs @@ -12,6 +12,8 @@ use serde_json::Value; use std::sync::Arc; +pub mod client; + /** * Default function for deserialize/serialize of times, always defaults to 1970-01-01 */ @@ -142,7 +144,6 @@ impl Default for Meta { } } - #[cfg(test)] mod test { use super::*;