From a5ae7e15a861170b53ac739aa838e7876f582842 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 10 Jan 2020 22:22:42 -0800 Subject: [PATCH 01/12] Begin working on the per-client inbox functionality There's some refactoring I need to do in the server/eventbus module before continuing, but this contains the initial cut of code necessary to create a new inbox when a client connects that will map to that client name --- eventbus/src/lib.rs | 9 ++++ eventbus/src/server/connection.rs | 83 +++++++++++++++++++++++-------- eventbus/src/server/eventbus.rs | 73 +++++++++++++++++++++++---- 3 files changed, 132 insertions(+), 33 deletions(-) diff --git a/eventbus/src/lib.rs b/eventbus/src/lib.rs index cbc8afb..e72981b 100644 --- a/eventbus/src/lib.rs +++ b/eventbus/src/lib.rs @@ -71,6 +71,15 @@ pub struct OutputMessage { #[serde(tag = "type", rename_all = "camelCase")] #[rtype(result = "()")] pub enum Input { + /** + * A Connect message must be sent for the client to start receiving messages + * + * This message instructs the eventbus to subscribe the client to the "all" + * channel and its own private inbox channel + */ + Connect { + name: String, + }, /** * A Subscribe message must be sent for each channel the client wishes to subscribe to. * diff --git a/eventbus/src/server/connection.rs b/eventbus/src/server/connection.rs index 9a107b5..1f1c05f 100644 --- a/eventbus/src/server/connection.rs +++ b/eventbus/src/server/connection.rs @@ -6,7 +6,7 @@ use actix::*; use actix_web_actors::ws; use chrono::prelude::Utc; -use log::{error, info}; +use log::{error, info, trace}; use serde_json; use std::sync::Arc; @@ -24,12 +24,55 @@ pub struct WSClient { events: Addr, } +/** + * The WSClient is an actor responsible for the stateful behavior of each websocket + * connected client + */ impl WSClient { + /** + * Construct the WSClient with the given EventBus actor to communicate with + */ pub fn new(eb: Addr) -> Self { Self { events: eb } } - fn handle_text(&self, text: String, ctx: &::Context) { + /** + * handle_connect() takes care of the Input::Connect message and will ensure + * that the client is connected to the "all" channel as well as its inbox" + */ + fn handle_connect(&self, client_name: String, ctx: &mut ::Context) { + self.events.do_send(eventbus::Subscribe { + to: "all".to_owned(), + addr: ctx.address(), + }); + + /* + * Both of these message sends are creating their own String to represent + * the inbox channel. + * + * This SHOULD be fixed, but will require some more thinking about how + * the lifetimes of Channel objects should work + */ + + self.events.do_send(eventbus::CreateChannel { + channel: eventbus::Channel { + name: format!("inbox.{}", client_name), + stateful: true, + }, + }); + + self.events.do_send(eventbus::Subscribe { + to: format!("inbox.{}", client_name), + addr: ctx.address(), + }); + } + + /** + * handle_text handles _all_ incoming messages from the websocket connection, + * it is responsible for translating those JSON messages into something the + * eventbus can pass around internally + */ + fn handle_text(&self, text: String, ctx: &mut ::Context) { let command = serde_json::from_str::(&text); match command { @@ -38,6 +81,11 @@ impl WSClient { match c { InputMessage { msg, meta } => { match msg { + Input::Connect { name } => { + info!("Received connect for client named: {}", name); + self.handle_connect(name, ctx) + }, + Input::Publish { payload } => { info!("received publish: {:?}", payload); self.events.do_send(eventbus::Event { @@ -95,27 +143,13 @@ impl Handler for WSClient { */ impl Actor for WSClient { type Context = ws::WebsocketContext; - - fn started(&mut self, ctx: &mut Self::Context) { - let sub = eventbus::Subscribe { - to: "all".to_owned(), - addr: ctx.address(), - }; - self.events - .send(sub) - .into_actor(self) - .then(|result, _actor, ctx| { - match result { - Ok(_) => (), - _ => ctx.stop(), - } - fut::ready(()) - }) - .wait(ctx); - } } -/// Handler for ws::Message message +/** + * Handler for the ws::Message message for the WSClient actor + * + * This handler will be called for every message from a websocket client + */ impl StreamHandler> for WSClient { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { @@ -125,7 +159,7 @@ impl StreamHandler> for WSClient { } Ok(msg) => msg, }; - info!("WebSocket received: {:?}", msg); + trace!("WebSocket message received: {:?}", msg); match msg { ws::Message::Ping(msg) => ctx.pong(&msg), ws::Message::Text(text) => self.handle_text(text, ctx), @@ -134,3 +168,8 @@ impl StreamHandler> for WSClient { } } } + +#[cfg(test)] +mod test { + use super::*; +} diff --git a/eventbus/src/server/eventbus.rs b/eventbus/src/server/eventbus.rs index a3aa24f..7129bd0 100644 --- a/eventbus/src/server/eventbus.rs +++ b/eventbus/src/server/eventbus.rs @@ -18,6 +18,33 @@ use crate::*; */ type ClientId = Addr; +/** + * The Channel struct is used as an internal representation of each channel that + * the eventbus knows about. + * + * Channels may be either stateless or stateful, with the ladder implying persistence + * guarantees, depending on the eventbus' backing implementation. + */ +#[derive(Clone, Debug, Eq)] +pub struct Channel { + pub name: String, + pub stateful: bool, +} + +/** + * The CreateChannel message is only meant to be used by internal components of + * the eventbus. + * + * It is used primarily for creating new channels on-demand, such as those needed + * for new client inboxes + */ +#[derive(Message)] +#[rtype(result = "()")] +pub struct CreateChannel { + pub channel: Channel, +} + + #[derive(Message)] #[rtype(result = "()")] pub struct Subscribe { @@ -39,23 +66,17 @@ pub struct Unsubscribe { } /** - * The Channel struct is used as an internal representation of each channel that - * the eventbus knows about. - * - * Channels may be either stateless or stateful, with the ladder implying persistence - * guarantees, depending on the eventbus' backing implementation. + * Implementation of the Hash trait for Channel ensures that it can be placed in a HashSet */ -#[derive(Debug, Eq)] -pub struct Channel { - name: String, - stateful: bool, -} - impl Hash for Channel { fn hash(&self, state: &mut H) { self.name.hash(state); } } + +/** + * Implementation of PartialEq trait for Channel ensures that it can be placed in a HashSet + */ impl PartialEq for Channel { fn eq(&self, other: &Self) -> bool { self.name == other.name @@ -70,6 +91,10 @@ pub struct EventBus { channels: HashMap>, } +/** + * + * The Actor trait for the Eventbus allows it to act as an actor in the actix system + */ impl Actor for EventBus { type Context = Context; @@ -107,16 +132,26 @@ impl EventBus { } } +impl Handler for EventBus { + type Result = (); + + fn handle(&mut self, create: CreateChannel, _: &mut Context) { + self.channels.insert(create.channel, HashSet::new()); + } +} + impl Handler for EventBus { type Result = (); fn handle(&mut self, msg: Subscribe, _: &mut Context) { + info!("Subscribing client to {}", msg.to); // The stateful field doesn't matter here because the hashing on the // HashMap only applies to the name of the channel let ch = Channel { name: msg.to, stateful: false, }; + if self.channels.contains_key(&ch) { match self.channels.get_mut(&ch) { Some(set) => { @@ -140,6 +175,7 @@ impl Handler for EventBus { name: ev.channel.to_string(), stateful: false, }; + if self.channels.contains_key(&ch) { if let Some(clients) = self.channels.get(&ch) { /* @@ -162,3 +198,18 @@ impl Handler for EventBus { } } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_with_channels_empty() { + let _bus = EventBus::with_channels(vec![], vec![]); + } + + #[test] + fn test_with_channels_stateless() { + let _bus = EventBus::with_channels(vec![String::from("test")], vec![]); + } +} -- 2.43.0 From 9b1df700ded0f2fcd10d8d10f8154ded572d49b9 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 10 Jan 2020 22:27:25 -0800 Subject: [PATCH 02/12] Clean up a little bit of error handling in the eventbus CLI --- eventbus/src/cli.rs | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/eventbus/src/cli.rs b/eventbus/src/cli.rs index 08cc21c..65dab6b 100644 --- a/eventbus/src/cli.rs +++ b/eventbus/src/cli.rs @@ -4,15 +4,19 @@ extern crate rustyline; -use std::io::{stdin, stdout, Write}; use rustyline::error::ReadlineError; use rustyline::Editor; +use tungstenite::client::AutoStream; +use tungstenite::handshake::client::Response; use tungstenite::*; use url::Url; +fn ws_connect() -> Result<(WebSocket, Response)> { + return connect(Url::parse("ws://localhost:8000/ws/").unwrap()); +} + fn main() { - let (mut socket, response) = - connect(Url::parse("ws://localhost:8000/ws/").unwrap()).expect("Failed to connect"); + let (mut socket, _response) = ws_connect().unwrap(); println!("Connected to the server"); // `()` can be used when no completer is required @@ -28,10 +32,25 @@ fn main() { match readline { Ok(line) => { if line.len() == 0 { - let msg = socket.read_message().expect("Failed to read message"); - println!("Received: {}", msg); + let response = socket.read_message(); + match response { + Ok(msg) => { + println!("{}", msg); + }, + Err(e) => { + println!("Failed to read: {}", e); + } + } } else { - socket.write_message(Message::Text(line)); + if line == "quit" { + return; + } + match socket.write_message(Message::Text(line)) { + Ok(_) => {}, + Err(e) => { + println!("Failed to write: {}", e); + } + } } }, Err(ReadlineError::Interrupted) => { -- 2.43.0 From eb75d61259f6fec20504544cf425da5d2c297d06 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 11 Jan 2020 11:27:43 -0800 Subject: [PATCH 03/12] cargo fmt --- eventbus/src/cli.rs | 19 +++++++++---------- eventbus/src/lib.rs | 4 +--- eventbus/src/server/connection.rs | 2 +- eventbus/src/server/eventbus.rs | 1 - eventbus/src/server/main.rs | 16 +++++++++------- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/eventbus/src/cli.rs b/eventbus/src/cli.rs index 65dab6b..5862fe6 100644 --- a/eventbus/src/cli.rs +++ b/eventbus/src/cli.rs @@ -1,7 +1,6 @@ /** * The CLI is meant to be used for manual testing and verification of the eventbus only. */ - extern crate rustyline; use rustyline::error::ReadlineError; @@ -33,10 +32,10 @@ fn main() { Ok(line) => { if line.len() == 0 { let response = socket.read_message(); - match response { + match response { Ok(msg) => { println!("{}", msg); - }, + } Err(e) => { println!("Failed to read: {}", e); } @@ -46,24 +45,24 @@ fn main() { return; } match socket.write_message(Message::Text(line)) { - Ok(_) => {}, + Ok(_) => {} Err(e) => { println!("Failed to write: {}", e); } } } - }, + } Err(ReadlineError::Interrupted) => { // ctrl-C - break - }, + break; + } Err(ReadlineError::Eof) => { // ctrl-D - break - }, + break; + } Err(err) => { println!("Error: {:?}", err); - break + break; } } } diff --git a/eventbus/src/lib.rs b/eventbus/src/lib.rs index e72981b..0c05289 100644 --- a/eventbus/src/lib.rs +++ b/eventbus/src/lib.rs @@ -77,9 +77,7 @@ pub enum Input { * This message instructs the eventbus to subscribe the client to the "all" * channel and its own private inbox channel */ - Connect { - name: String, - }, + Connect { name: String }, /** * A Subscribe message must be sent for each channel the client wishes to subscribe to. * diff --git a/eventbus/src/server/connection.rs b/eventbus/src/server/connection.rs index 1f1c05f..a9adb50 100644 --- a/eventbus/src/server/connection.rs +++ b/eventbus/src/server/connection.rs @@ -84,7 +84,7 @@ impl WSClient { Input::Connect { name } => { info!("Received connect for client named: {}", name); self.handle_connect(name, ctx) - }, + } Input::Publish { payload } => { info!("received publish: {:?}", payload); diff --git a/eventbus/src/server/eventbus.rs b/eventbus/src/server/eventbus.rs index 7129bd0..d3a1b3d 100644 --- a/eventbus/src/server/eventbus.rs +++ b/eventbus/src/server/eventbus.rs @@ -44,7 +44,6 @@ pub struct CreateChannel { pub channel: Channel, } - #[derive(Message)] #[rtype(result = "()")] pub struct Subscribe { diff --git a/eventbus/src/server/main.rs b/eventbus/src/server/main.rs index 38fa04a..ae69d7c 100644 --- a/eventbus/src/server/main.rs +++ b/eventbus/src/server/main.rs @@ -3,8 +3,8 @@ * server loop that the eventbus uses. */ extern crate actix; -extern crate actix_web; extern crate actix_http; +extern crate actix_web; extern crate config; extern crate log; extern crate pretty_env_logger; @@ -159,7 +159,6 @@ async fn main() -> std::io::Result<()> { .await } - #[cfg(test)] mod test { use super::*; @@ -183,10 +182,10 @@ mod test { }; let wd = web::Data::new(state); let srv = test::start(move || { - App::new() - .app_data(wd.clone()) - .route("/", web::get().to(index)) - }); + App::new() + .app_data(wd.clone()) + .route("/", web::get().to(index)) + }); let req = srv.get("/"); let mut response = req.send().await.unwrap(); @@ -199,6 +198,9 @@ mod test { let matches = re.captures(&buffer).unwrap(); let version = matches.get(1).unwrap(); - assert_eq!(version.as_str(), format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"))); + assert_eq!( + version.as_str(), + format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown")) + ); } } -- 2.43.0 From b1260df2cfb7df797f8d2ca921b553e8ae05943b Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 11 Jan 2020 12:25:37 -0800 Subject: [PATCH 04/12] Switch back to actix-web-actors for the eventbus client, again. >_< I originally tossed this approach since I believed that the awc + actix-web-actors approach was unnecessarily complex. After experimenting a bit with tungstenite, I believe that converting its high-level API into something that I could work with in an asynchronous manner would require too much poking around with tokio to be worth the effort. I suppose I'll have to make do with the somewhat mid-level abstraction that actix-web-actors provides me. --- Cargo.lock | 10 +- auctioneer/Cargo.toml | 14 ++- auctioneer/src/main.rs | 186 ++++++++++++++++++++++-------- eventbus/src/server/connection.rs | 2 - 4 files changed, 157 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ad1f7f..81e43a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1310,16 +1310,22 @@ name = "otto-auctioneer" version = "0.1.0" dependencies = [ "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", + "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "otto-eventbus 0.1.0", "pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)", - "tungstenite 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)", - "url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/auctioneer/Cargo.toml b/auctioneer/Cargo.toml index 1bf5c2e..8845b6b 100644 --- a/auctioneer/Cargo.toml +++ b/auctioneer/Cargo.toml @@ -7,11 +7,15 @@ edition = "2018" [dependencies] # Needed for the actor system actix = "~0.9.0" +actix-http = "~1.0.1" actix-rt = "~1.0.0" - -# Needed for websockets -tungstenite = "~0.9.2" -url = "~2.1.0" +actix-web = "~2.0.0" +# Used for the websocket client +actix-codec = "~0.2.0" +actix-web-actors = "~2.0.0" +awc = "~1.0.1" +futures = "~0.3.1" +bytes = "~0.5.3" # Handling command line options clap = { version = "~2.33.0", features = ["yaml"] } @@ -28,3 +32,5 @@ serde_json = "~1.0.0" otto-eventbus = { path = "../eventbus" } +# Used for formatting times in messages +chrono = { version = "~0.4.10", features = ["serde"] } diff --git a/auctioneer/src/main.rs b/auctioneer/src/main.rs index 9fbb6b0..fbfa8cf 100644 --- a/auctioneer/src/main.rs +++ b/auctioneer/src/main.rs @@ -2,69 +2,161 @@ * The auctioneer main model */ extern crate actix; +extern crate actix_http; +extern crate actix_web; +extern crate awc; +extern crate chrono; extern crate pretty_env_logger; -extern crate tungstenite; -use std::time::Duration; -use std::{io, thread}; use actix::*; -use log::*; -use tungstenite::client::AutoStream; -use tungstenite::handshake::client::Response; -use tungstenite::*; -use url::Url; +use actix_codec::Framed; +use awc::{ + error::WsProtocolError, + ws::{Codec, Frame, Message}, + BoxedSocket, Client, +}; +use actix::io::SinkWrite; +use actix_web::{middleware, web}; +use actix_web::{App, HttpResponse, HttpServer}; +use bytes::Bytes; +use chrono::Utc; +use futures::stream::{SplitSink, StreamExt}; +use log::{info, error}; + +use std::time::Duration; use otto_eventbus::*; -struct BusClient {} - -impl Actor for BusClient { - type Context = SyncContext; - - fn started(&mut self, ctx: &mut Self::Context) { - info!("Starting client"); - } -} - /** - * Simple websocket connection function, just to allow for easy reconnections + * The index handler for the root of the Auctioneer web interface */ -fn ws_connect() -> Result<(WebSocket, Response)> { - return connect(Url::parse("ws://localhost:8000/ws/").unwrap()); +async fn route_index() -> HttpResponse { + HttpResponse::Ok().body("Auctioneer") } #[actix_rt::main] -async fn main() { +async fn main() -> std::io::Result<()> { pretty_env_logger::init(); - let (mut socket, _response) = ws_connect().unwrap(); - - info!("Connected to the server"); - /* - * In this loop we should read the message, and then dispatch it to a handler actor immediately - * to do "the work" + * Creating a awc::Client to handle the first part of our WebSocket client bootstrap */ - loop { - let msg = socket.read_message(); - let msg = match msg { - Ok(m) => m, - Err(e) => { - error!("Failed to read a message off the WebSocket: {:?}", e); - // must reconnect - thread::sleep(Duration::from_secs(3)); + let (response, framed) = Client::new() + .ws("http://127.0.0.1:8000/ws/") + .connect() + .await + .map_err(|e| { + error!("Error: {}", e); + }) + .unwrap(); - /* - * Ignore connection errors, they're not important since we - * are in the retry loop anyways. - */ - if let Ok(recon) = ws_connect() { - info!("Server reconnected"); - socket = recon.0; - } - continue; - } + info!("{:?}", response); + let (sink, stream) = framed.split(); + let addr = EventBusClient::create(|ctx| { + EventBusClient::add_stream(stream, ctx); + EventBusClient(SinkWrite::new(sink, ctx)) + }); + + HttpServer::new(move || { + App::new() + .wrap(middleware::Compress::default()) + .wrap(middleware::Logger::default()) + .route("/", web::get().to(route_index)) + }) + .bind("127.0.0.1:8001")? + .run() + .await +} + +struct EventBusClient(SinkWrite, Message>>); + +#[derive(Message)] +#[rtype(result = "()")] +struct ClientCommand(String); + +impl Actor for EventBusClient { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + let input = InputMessage { + meta: Meta { + channel: "".to_string(), + ts: Utc::now(), + }, + msg: Input::Connect { + name: "auctioneer".to_string(), + }, }; - info!("Received: {}", msg); + self.0.write(Message::Text(serde_json::to_string(&input).unwrap())).unwrap(); + // start heartbeats otherwise server will disconnect after 10 seconds + self.hb(ctx) + } + + fn stopped(&mut self, _: &mut Context) { + info!("Disconnected"); + } +} + +impl EventBusClient { + fn hb(&self, ctx: &mut Context) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + act.0.write(Message::Ping(Bytes::from_static(b""))).unwrap(); + act.hb(ctx); + + // client should also check for a timeout here, similar to the + // server code + }); + } +} + +/// Handle stdin commands +impl Handler for EventBusClient { + type Result = (); + + fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context) { + self.0.write(Message::Text(msg.0)).unwrap(); + } +} + +/// Handle server websocket messages +impl StreamHandler> for EventBusClient { + fn handle(&mut self, msg: Result, _: &mut Context) { + if let Ok(Frame::Text(txt)) = msg { + info!("Server: {:?}", txt) + } + } + + fn started(&mut self, _ctx: &mut Context) { + info!("Connected"); + } + + fn finished(&mut self, ctx: &mut Context) { + info!("Server disconnected"); + ctx.stop() + } +} + +impl actix::io::WriteHandler for EventBusClient {} + + +#[cfg(test)] +mod test { + use super::*; + use actix_web::{test, web, App}; + + /** + * This test just ensures that the server can come online properly and render its index handler + * properly. + */ + #[actix_rt::test] + async fn test_basic_http() { + let srv = test::start(move || { + App::new() + .route("/", web::get().to(route_index)) + }); + + let req = srv.get("/"); + let response = req.send().await.unwrap(); + assert!(response.status().is_success()); } } diff --git a/eventbus/src/server/connection.rs b/eventbus/src/server/connection.rs index a9adb50..47e7863 100644 --- a/eventbus/src/server/connection.rs +++ b/eventbus/src/server/connection.rs @@ -12,7 +12,6 @@ use serde_json; use std::sync::Arc; use crate::*; -use otto_eventbus::*; /* * Define the Websocket Actor needed for Actix @@ -171,5 +170,4 @@ impl StreamHandler> for WSClient { #[cfg(test)] mod test { - use super::*; } -- 2.43.0 From 044abc397f6e642fe3483ee45030d6ce120798fc Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 11 Jan 2020 12:37:45 -0800 Subject: [PATCH 05/12] Deserialize messages from the eventbus --- auctioneer/src/main.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/auctioneer/src/main.rs b/auctioneer/src/main.rs index fbfa8cf..bdc052f 100644 --- a/auctioneer/src/main.rs +++ b/auctioneer/src/main.rs @@ -122,7 +122,19 @@ impl Handler for EventBusClient { impl StreamHandler> for EventBusClient { fn handle(&mut self, msg: Result, _: &mut Context) { if let Ok(Frame::Text(txt)) = msg { - info!("Server: {:?}", txt) + /* + * We have received _some_ message from the eventbus, let's try to + * decode it! + */ + let msg = serde_json::from_slice::(&txt); + match msg { + Ok(msg) => { + info!("Received valid message: {:?}", msg); + }, + Err(e) => { + error!("Received invalid message: {}", e); + }, + } } } -- 2.43.0 From e0a9b0c8706422cf3b85a202308ecf61096a6c3c Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 11 Jan 2020 13:03:37 -0800 Subject: [PATCH 06/12] Implement some basic command dispatching in the auctioneer This code will soon be refactored into the eventbus crate so that all other clients can implement their handlers the same way --- auctioneer/src/main.rs | 44 +++++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/auctioneer/src/main.rs b/auctioneer/src/main.rs index bdc052f..7435362 100644 --- a/auctioneer/src/main.rs +++ b/auctioneer/src/main.rs @@ -8,20 +8,20 @@ extern crate awc; extern crate chrono; extern crate pretty_env_logger; +use actix::io::SinkWrite; use actix::*; use actix_codec::Framed; +use actix_web::{middleware, web}; +use actix_web::{App, HttpResponse, HttpServer}; use awc::{ error::WsProtocolError, ws::{Codec, Frame, Message}, BoxedSocket, Client, }; -use actix::io::SinkWrite; -use actix_web::{middleware, web}; -use actix_web::{App, HttpResponse, HttpServer}; use bytes::Bytes; use chrono::Utc; use futures::stream::{SplitSink, StreamExt}; -use log::{info, error}; +use log::{error, info}; use std::time::Duration; @@ -52,7 +52,7 @@ async fn main() -> std::io::Result<()> { info!("{:?}", response); let (sink, stream) = framed.split(); - let addr = EventBusClient::create(|ctx| { + let _addr = EventBusClient::create(|ctx| { EventBusClient::add_stream(stream, ctx); EventBusClient(SinkWrite::new(sink, ctx)) }); @@ -87,7 +87,9 @@ impl Actor for EventBusClient { name: "auctioneer".to_string(), }, }; - self.0.write(Message::Text(serde_json::to_string(&input).unwrap())).unwrap(); + self.0 + .write(Message::Text(serde_json::to_string(&input).unwrap())) + .unwrap(); // start heartbeats otherwise server will disconnect after 10 seconds self.hb(ctx) } @@ -118,9 +120,19 @@ impl Handler for EventBusClient { } } +impl Handler for EventBusClient { + type Result = (); + + fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context) { + /* + * For heartbeats we really don't need to do anything + */ + } +} + /// Handle server websocket messages impl StreamHandler> for EventBusClient { - fn handle(&mut self, msg: Result, _: &mut Context) { + fn handle(&mut self, msg: Result, ctx: &mut Context) { if let Ok(Frame::Text(txt)) = msg { /* * We have received _some_ message from the eventbus, let's try to @@ -128,12 +140,16 @@ impl StreamHandler> for EventBusClient { */ let msg = serde_json::from_slice::(&txt); match msg { - Ok(msg) => { - info!("Received valid message: {:?}", msg); - }, + Ok(output) => { + /* + * Dispatch the message basically back to ourself for easy + * handling + */ + ctx.address().do_send(output); + } Err(e) => { error!("Received invalid message: {}", e); - }, + } } } } @@ -150,7 +166,6 @@ impl StreamHandler> for EventBusClient { impl actix::io::WriteHandler for EventBusClient {} - #[cfg(test)] mod test { use super::*; @@ -162,10 +177,7 @@ mod test { */ #[actix_rt::test] async fn test_basic_http() { - let srv = test::start(move || { - App::new() - .route("/", web::get().to(route_index)) - }); + let srv = test::start(move || App::new().route("/", web::get().to(route_index))); let req = srv.get("/"); let response = req.send().await.unwrap(); -- 2.43.0 From 0f3582db0191adb6b3fa52f29756448d2d0a3ee7 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 11 Jan 2020 14:12:12 -0800 Subject: [PATCH 07/12] Implement the Default trait for Meta to make it easier for the auctioneer This removes a dependency from auctioneer and is the first step in refactoring the EventBusClient into the eventbus crate --- Cargo.lock | 1 - auctioneer/Cargo.toml | 3 --- auctioneer/src/main.rs | 39 +++++++++++++++++++++---------- eventbus/src/lib.rs | 35 +++++++++++++++++++++++++++ eventbus/src/server/connection.rs | 3 +-- 5 files changed, 63 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81e43a7..0618ae0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1317,7 +1317,6 @@ dependencies = [ "actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", - "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/auctioneer/Cargo.toml b/auctioneer/Cargo.toml index 8845b6b..b0749b0 100644 --- a/auctioneer/Cargo.toml +++ b/auctioneer/Cargo.toml @@ -31,6 +31,3 @@ serde = { version = "~1.0.103", features = ["rc"] } serde_json = "~1.0.0" otto-eventbus = { path = "../eventbus" } - -# Used for formatting times in messages -chrono = { version = "~0.4.10", features = ["serde"] } diff --git a/auctioneer/src/main.rs b/auctioneer/src/main.rs index 7435362..06a8090 100644 --- a/auctioneer/src/main.rs +++ b/auctioneer/src/main.rs @@ -5,7 +5,6 @@ extern crate actix; extern crate actix_http; extern crate actix_web; extern crate awc; -extern crate chrono; extern crate pretty_env_logger; use actix::io::SinkWrite; @@ -19,7 +18,6 @@ use awc::{ BoxedSocket, Client, }; use bytes::Bytes; -use chrono::Utc; use futures::stream::{SplitSink, StreamExt}; use log::{error, info}; @@ -54,7 +52,10 @@ async fn main() -> std::io::Result<()> { let (sink, stream) = framed.split(); let _addr = EventBusClient::create(|ctx| { EventBusClient::add_stream(stream, ctx); - EventBusClient(SinkWrite::new(sink, ctx)) + EventBusClient { + sink: SinkWrite::new(sink, ctx), + id: "auctioneer", + } }); HttpServer::new(move || { @@ -68,7 +69,24 @@ async fn main() -> std::io::Result<()> { .await } -struct EventBusClient(SinkWrite, Message>>); + +/** + * An EventBusClient is capable of connecting to, reading messages from, and sending messages to + * the eventbus. + */ +struct EventBusClient { + /** + * The sink is a writable object which can send messages back to the EventBus + */ + sink: SinkWrite, Message>>, + /** + * String identifier for the client + * + * This should be persisted between invocations of the process if the client + * should have any semblence of persistence with its messages + */ + id: &'static str, +} #[derive(Message)] #[rtype(result = "()")] @@ -79,15 +97,12 @@ impl Actor for EventBusClient { fn started(&mut self, ctx: &mut Context) { let input = InputMessage { - meta: Meta { - channel: "".to_string(), - ts: Utc::now(), - }, + meta: Meta::default(), msg: Input::Connect { - name: "auctioneer".to_string(), + name: self.id.to_string(), }, }; - self.0 + self.sink .write(Message::Text(serde_json::to_string(&input).unwrap())) .unwrap(); // start heartbeats otherwise server will disconnect after 10 seconds @@ -102,7 +117,7 @@ impl Actor for EventBusClient { impl EventBusClient { fn hb(&self, ctx: &mut Context) { ctx.run_later(Duration::new(1, 0), |act, ctx| { - act.0.write(Message::Ping(Bytes::from_static(b""))).unwrap(); + act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap(); act.hb(ctx); // client should also check for a timeout here, similar to the @@ -116,7 +131,7 @@ impl Handler for EventBusClient { type Result = (); fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context) { - self.0.write(Message::Text(msg.0)).unwrap(); + self.sink.write(Message::Text(msg.0)).unwrap(); } } diff --git a/eventbus/src/lib.rs b/eventbus/src/lib.rs index 0c05289..ec941ec 100644 --- a/eventbus/src/lib.rs +++ b/eventbus/src/lib.rs @@ -20,6 +20,14 @@ fn epoch() -> DateTime { return Utc.ymd(1970, 1, 1).and_hms_milli(0, 0, 1, 444); } +/** + * Default function for the deserialization of an empty channel, will just create an empty string + */ + +fn default_channel() -> String { + return "".to_owned(); +} + /** * The `Meta` struct contains the necessary metadata about a message which is being sent over the * wire @@ -29,6 +37,7 @@ fn epoch() -> DateTime { #[derive(Serialize, Deserialize, Debug, Message)] #[rtype(result = "()")] pub struct Meta { + #[serde(default = "default_channel")] pub channel: String, #[serde(default = "epoch")] pub ts: DateTime, @@ -120,3 +129,29 @@ pub struct InputMessage { pub msg: Input, pub meta: Meta, } + +/** + * The Default trait for `Meta` will create a functionally empty Meta struct + */ +impl Default for Meta { + fn default() -> Meta { + Meta { + channel: default_channel(), + ts: epoch(), + } + } +} + + +#[cfg(test)] +mod test { + use super::*; + use chrono::Utc; + + #[test] + fn test_default_for_meta() { + let m = Meta::default(); + assert_eq!(m.channel, ""); + assert!(m.ts < Utc::now()); + } +} diff --git a/eventbus/src/server/connection.rs b/eventbus/src/server/connection.rs index 47e7863..4a5e619 100644 --- a/eventbus/src/server/connection.rs +++ b/eventbus/src/server/connection.rs @@ -169,5 +169,4 @@ impl StreamHandler> for WSClient { } #[cfg(test)] -mod test { -} +mod test {} -- 2.43.0 From 24db3be3eab1ce6764ffd4677e4c5427db0e8dc8 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 11 Jan 2020 16:16:54 -0800 Subject: [PATCH 08/12] Refactor the EventBusClient code into the eventus crate where it belongs Now that I have something simple and functioning, this no longer needs to be in the auctioneer The fun work begins now, supporting custom messages known only to the auctioneer for its needs, but passing those through EventBusClient --- Cargo.lock | 9 +- auctioneer/Cargo.toml | 6 - auctioneer/src/main.rs | 152 +------------------------ eventbus/Cargo.toml | 9 +- eventbus/src/{cli.rs => cli/main.rs} | 0 eventbus/src/client.rs | 161 +++++++++++++++++++++++++++ eventbus/src/lib.rs | 3 +- 7 files changed, 179 insertions(+), 161 deletions(-) rename eventbus/src/{cli.rs => cli/main.rs} (100%) create mode 100644 eventbus/src/client.rs diff --git a/Cargo.lock b/Cargo.lock index 0618ae0..f3c0446 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1310,16 +1310,11 @@ name = "otto-auctioneer" version = "0.1.0" dependencies = [ "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", - "actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "otto-eventbus 0.1.0", "pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1332,13 +1327,17 @@ name = "otto-eventbus" version = "0.1.0" dependencies = [ "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/auctioneer/Cargo.toml b/auctioneer/Cargo.toml index b0749b0..704b958 100644 --- a/auctioneer/Cargo.toml +++ b/auctioneer/Cargo.toml @@ -10,12 +10,6 @@ actix = "~0.9.0" actix-http = "~1.0.1" actix-rt = "~1.0.0" actix-web = "~2.0.0" -# Used for the websocket client -actix-codec = "~0.2.0" -actix-web-actors = "~2.0.0" -awc = "~1.0.1" -futures = "~0.3.1" -bytes = "~0.5.3" # Handling command line options clap = { version = "~2.33.0", features = ["yaml"] } diff --git a/auctioneer/src/main.rs b/auctioneer/src/main.rs index 06a8090..fc13fff 100644 --- a/auctioneer/src/main.rs +++ b/auctioneer/src/main.rs @@ -4,26 +4,13 @@ extern crate actix; extern crate actix_http; extern crate actix_web; -extern crate awc; extern crate pretty_env_logger; -use actix::io::SinkWrite; -use actix::*; -use actix_codec::Framed; use actix_web::{middleware, web}; use actix_web::{App, HttpResponse, HttpServer}; -use awc::{ - error::WsProtocolError, - ws::{Codec, Frame, Message}, - BoxedSocket, Client, -}; -use bytes::Bytes; -use futures::stream::{SplitSink, StreamExt}; -use log::{error, info}; +use log::debug; -use std::time::Duration; - -use otto_eventbus::*; +use otto_eventbus::client::*; /** * The index handler for the root of the Auctioneer web interface @@ -36,27 +23,8 @@ async fn route_index() -> HttpResponse { async fn main() -> std::io::Result<()> { pretty_env_logger::init(); - /* - * Creating a awc::Client to handle the first part of our WebSocket client bootstrap - */ - let (response, framed) = Client::new() - .ws("http://127.0.0.1:8000/ws/") - .connect() - .await - .map_err(|e| { - error!("Error: {}", e); - }) - .unwrap(); - - info!("{:?}", response); - let (sink, stream) = framed.split(); - let _addr = EventBusClient::create(|ctx| { - EventBusClient::add_stream(stream, ctx); - EventBusClient { - sink: SinkWrite::new(sink, ctx), - id: "auctioneer", - } - }); + let client = connect("http://127.0.0.1:8000/ws/").await; + debug!("Client created: {:?}", client); HttpServer::new(move || { App::new() @@ -69,118 +37,6 @@ async fn main() -> std::io::Result<()> { .await } - -/** - * An EventBusClient is capable of connecting to, reading messages from, and sending messages to - * the eventbus. - */ -struct EventBusClient { - /** - * The sink is a writable object which can send messages back to the EventBus - */ - sink: SinkWrite, Message>>, - /** - * String identifier for the client - * - * This should be persisted between invocations of the process if the client - * should have any semblence of persistence with its messages - */ - id: &'static str, -} - -#[derive(Message)] -#[rtype(result = "()")] -struct ClientCommand(String); - -impl Actor for EventBusClient { - type Context = Context; - - fn started(&mut self, ctx: &mut Context) { - let input = InputMessage { - meta: Meta::default(), - msg: Input::Connect { - name: self.id.to_string(), - }, - }; - self.sink - .write(Message::Text(serde_json::to_string(&input).unwrap())) - .unwrap(); - // start heartbeats otherwise server will disconnect after 10 seconds - self.hb(ctx) - } - - fn stopped(&mut self, _: &mut Context) { - info!("Disconnected"); - } -} - -impl EventBusClient { - fn hb(&self, ctx: &mut Context) { - ctx.run_later(Duration::new(1, 0), |act, ctx| { - act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap(); - act.hb(ctx); - - // client should also check for a timeout here, similar to the - // server code - }); - } -} - -/// Handle stdin commands -impl Handler for EventBusClient { - type Result = (); - - fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context) { - self.sink.write(Message::Text(msg.0)).unwrap(); - } -} - -impl Handler for EventBusClient { - type Result = (); - - fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context) { - /* - * For heartbeats we really don't need to do anything - */ - } -} - -/// Handle server websocket messages -impl StreamHandler> for EventBusClient { - fn handle(&mut self, msg: Result, ctx: &mut Context) { - if let Ok(Frame::Text(txt)) = msg { - /* - * We have received _some_ message from the eventbus, let's try to - * decode it! - */ - let msg = serde_json::from_slice::(&txt); - match msg { - Ok(output) => { - /* - * Dispatch the message basically back to ourself for easy - * handling - */ - ctx.address().do_send(output); - } - Err(e) => { - error!("Received invalid message: {}", e); - } - } - } - } - - fn started(&mut self, _ctx: &mut Context) { - info!("Connected"); - } - - fn finished(&mut self, ctx: &mut Context) { - info!("Server disconnected"); - ctx.stop() - } -} - -impl actix::io::WriteHandler for EventBusClient {} - #[cfg(test)] mod test { use super::*; diff --git a/eventbus/Cargo.toml b/eventbus/Cargo.toml index 19c0f32..9053a2b 100644 --- a/eventbus/Cargo.toml +++ b/eventbus/Cargo.toml @@ -10,7 +10,7 @@ path = "src/server/main.rs" [[bin]] name = "otto-ebc" -path = "src/cli.rs" +path = "src/cli/main.rs" [dependencies] # Actix provides the web and websocket basis we sit on top of @@ -47,5 +47,12 @@ url = "~2.1.0" rustyline = "~5.0.6" +# Used for the websocket client +actix-codec = "~0.2.0" +awc = "~1.0.1" +futures = "~0.3.1" +bytes = "~0.5.3" + + [dev-dependencies] regex = "1" diff --git a/eventbus/src/cli.rs b/eventbus/src/cli/main.rs similarity index 100% rename from eventbus/src/cli.rs rename to eventbus/src/cli/main.rs diff --git a/eventbus/src/client.rs b/eventbus/src/client.rs new file mode 100644 index 0000000..03b7bf4 --- /dev/null +++ b/eventbus/src/client.rs @@ -0,0 +1,161 @@ +/**! + * This module is the eventbus client + */ +use actix::io::SinkWrite; +use actix::*; +use actix_codec::Framed; +use awc::{ + error::WsProtocolError, + ws::{Codec, Frame, Message}, + BoxedSocket, Client, +}; +use bytes::Bytes; +use futures::stream::{SplitSink, StreamExt}; +use log::{error, info}; + +use std::time::Duration; + +use crate::*; + +/** + * An EventBusClient is capable of connecting to, reading messages from, and sending messages to + * the eventbus. + */ +pub struct EventBusClient { + /** + * The sink is a writable object which can send messages back to the EventBus + */ + sink: SinkWrite, Message>>, + /** + * String identifier for the client + * + * This should be persisted between invocations of the process if the client + * should have any semblence of persistence with its messages + */ + id: &'static str, +} + +/** + * Implementation of the Debug trait so that the EventBusClient can be included + * in logging statements and print each instance's `id` + */ +impl std::fmt::Debug for EventBusClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "EventBusClient", self.id) + } +} + +/** + * connect will create begin the connection process and start the EventBusClient + * actor. + * + * The caller will need to await in order to get the EventBusClient + */ +pub async fn connect(ws: &str) -> Addr { + /* + * Creating a awc::Client to handle the first part of our WebSocket client bootstrap + */ + let (response, framed) = Client::new() + .ws(ws) + .connect() + .await + .map_err(|e| { + error!("Error: {}", e); + }) + .unwrap(); + + info!("{:?}", response); + let (sink, stream) = framed.split(); + + EventBusClient::create(|ctx| { + EventBusClient::add_stream(stream, ctx); + EventBusClient { + sink: SinkWrite::new(sink, ctx), + id: "auctioneer", + } + }) +} + +impl EventBusClient { + fn hb(&self, ctx: &mut Context) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap(); + act.hb(ctx); + + // client should also check for a timeout here, similar to the + // server code + }); + } +} + +impl Actor for EventBusClient { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + let input = InputMessage { + meta: Meta::default(), + msg: Input::Connect { + name: self.id.to_string(), + }, + }; + self.sink + .write(Message::Text(serde_json::to_string(&input).unwrap())) + .unwrap(); + // start heartbeats otherwise server will disconnect after 10 seconds + self.hb(ctx) + } + + fn stopped(&mut self, _: &mut Context) { + info!("Disconnected"); + } +} + +impl Handler for EventBusClient { + type Result = (); + + fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context) { + /* + * For heartbeats we really don't need to do anything + */ + } +} + +/// Handle server websocket messages +impl StreamHandler> for EventBusClient { + fn handle(&mut self, msg: Result, ctx: &mut Context) { + if let Ok(Frame::Text(txt)) = msg { + /* + * We have received _some_ message from the eventbus, let's try to + * decode it! + */ + let msg = serde_json::from_slice::(&txt); + match msg { + Ok(output) => { + /* + * Dispatch the message basically back to ourself for easy + * handling + */ + ctx.address().do_send(output); + } + Err(e) => { + error!("Received invalid message: {}", e); + } + } + } + } + + fn started(&mut self, _ctx: &mut Context) { + info!("Connected"); + } + + fn finished(&mut self, ctx: &mut Context) { + info!("Server disconnected"); + ctx.stop() + } +} + +impl actix::io::WriteHandler for EventBusClient {} + +#[cfg(test)] +mod test { +} diff --git a/eventbus/src/lib.rs b/eventbus/src/lib.rs index ec941ec..e2ee201 100644 --- a/eventbus/src/lib.rs +++ b/eventbus/src/lib.rs @@ -12,6 +12,8 @@ use serde_json::Value; use std::sync::Arc; +pub mod client; + /** * Default function for deserialize/serialize of times, always defaults to 1970-01-01 */ @@ -142,7 +144,6 @@ impl Default for Meta { } } - #[cfg(test)] mod test { use super::*; -- 2.43.0 From b40abcad9d5d77f9cb99de6872eb59066a469e99 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 11 Jan 2020 20:01:23 -0800 Subject: [PATCH 09/12] Implement backoff on connection errors for the eventbus client This uses a fibonacci backoff (yay I pass the coding interview) for attempting to reconnect the connection if something cannot work properly --- auctioneer/src/main.rs | 3 ++- eventbus/src/client.rs | 54 ++++++++++++++++++++++++------------------ eventbus/src/lib.rs | 5 +++- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/auctioneer/src/main.rs b/auctioneer/src/main.rs index fc13fff..1df3c48 100644 --- a/auctioneer/src/main.rs +++ b/auctioneer/src/main.rs @@ -11,6 +11,7 @@ use actix_web::{App, HttpResponse, HttpServer}; use log::debug; use otto_eventbus::client::*; +use otto_eventbus::*; /** * The index handler for the root of the Auctioneer web interface @@ -23,7 +24,7 @@ async fn route_index() -> HttpResponse { async fn main() -> std::io::Result<()> { pretty_env_logger::init(); - let client = connect("http://127.0.0.1:8000/ws/").await; + let client = connect("http://127.0.0.1:8000/ws/", "auctioneer").await; debug!("Client created: {:?}", client); HttpServer::new(move || { diff --git a/eventbus/src/client.rs b/eventbus/src/client.rs index 03b7bf4..687f734 100644 --- a/eventbus/src/client.rs +++ b/eventbus/src/client.rs @@ -45,41 +45,49 @@ impl std::fmt::Debug for EventBusClient { } } +pub trait EventDispatch { + fn dispatch(&self, payload: T); +} + /** * connect will create begin the connection process and start the EventBusClient * actor. * * The caller will need to await in order to get the EventBusClient */ -pub async fn connect(ws: &str) -> Addr { - /* - * Creating a awc::Client to handle the first part of our WebSocket client bootstrap - */ - let (response, framed) = Client::new() - .ws(ws) - .connect() - .await - .map_err(|e| { - error!("Error: {}", e); - }) - .unwrap(); +pub async fn connect(ws: &'static str, id: &'static str) -> Addr { + let mut backoff = 1; - info!("{:?}", response); - let (sink, stream) = framed.split(); + loop { + let r = Client::new().ws(ws).connect().await; - EventBusClient::create(|ctx| { - EventBusClient::add_stream(stream, ctx); - EventBusClient { - sink: SinkWrite::new(sink, ctx), - id: "auctioneer", + match r { + Ok((response, framed)) => { + let (sink, stream) = framed.split(); + + return EventBusClient::create(|ctx| { + EventBusClient::add_stream(stream, ctx); + EventBusClient { + sink: SinkWrite::new(sink, ctx), + id: id, + } + }); + } + Err(e) => { + error!("Failed establish WebSocket: {}", e); + backoff = backoff + backoff; + std::thread::sleep(Duration::from_secs(backoff)); + } } - }) + } } impl EventBusClient { fn hb(&self, ctx: &mut Context) { ctx.run_later(Duration::new(1, 0), |act, ctx| { - act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap(); + act.sink + .write(Message::Ping(Bytes::from_static(b""))) + .unwrap(); act.hb(ctx); // client should also check for a timeout here, similar to the @@ -88,6 +96,7 @@ impl EventBusClient { } } +//impl Actor for EventBusClient { impl Actor for EventBusClient { type Context = Context; @@ -157,5 +166,4 @@ impl StreamHandler> for EventBusClient { impl actix::io::WriteHandler for EventBusClient {} #[cfg(test)] -mod test { -} +mod test {} diff --git a/eventbus/src/lib.rs b/eventbus/src/lib.rs index e2ee201..15a97e1 100644 --- a/eventbus/src/lib.rs +++ b/eventbus/src/lib.rs @@ -54,7 +54,10 @@ pub struct Meta { #[rtype(result = "()")] pub enum Output { Heartbeat, - Message { payload: Value }, + Message { + #[serde(default)] + payload: Value, + }, } /** -- 2.43.0 From 3ebcf376c63da473db9aeee5e2ea4fd490f12321 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 12 Jan 2020 14:23:37 -0800 Subject: [PATCH 10/12] Starting to scope out a simple Travis CI processor I need something to generate some simple tasks to pump into the eventbus for the auctioneer, this will have to do --- Cargo.lock | 4 +++ Cargo.toml | 1 + processors/README.adoc | 6 ++++ processors/travis-ci/Cargo.toml | 9 +++++ processors/travis-ci/example.internal.yml | 41 +++++++++++++++++++++++ processors/travis-ci/example.yml | 10 ++++++ processors/travis-ci/src/main.rs | 3 ++ 7 files changed, 74 insertions(+) create mode 100644 processors/README.adoc create mode 100644 processors/travis-ci/Cargo.toml create mode 100644 processors/travis-ci/example.internal.yml create mode 100644 processors/travis-ci/example.yml create mode 100644 processors/travis-ci/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index f3c0446..38918ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1999,6 +1999,10 @@ dependencies = [ "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "travis-ci" +version = "0.1.0" + [[package]] name = "trust-dns-proto" version = "0.18.0-alpha.2" diff --git a/Cargo.toml b/Cargo.toml index 5bfef08..ab217ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,4 +3,5 @@ members = [ "auctioneer", "eventbus", + "processors/travis-ci", ] diff --git a/processors/README.adoc b/processors/README.adoc new file mode 100644 index 0000000..476507b --- /dev/null +++ b/processors/README.adoc @@ -0,0 +1,6 @@ += Otto Processors + + +Processors in Otto take configuration files, e.g. `.travis-ci.yml`, +`Jenkinsfile`, and convert them into the internal Otto constructs necessary for +task execution. diff --git a/processors/travis-ci/Cargo.toml b/processors/travis-ci/Cargo.toml new file mode 100644 index 0000000..2b72cbc --- /dev/null +++ b/processors/travis-ci/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "travis-ci" +version = "0.1.0" +authors = ["R. Tyler Croy "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/processors/travis-ci/example.internal.yml b/processors/travis-ci/example.internal.yml new file mode 100644 index 0000000..51a3052 --- /dev/null +++ b/processors/travis-ci/example.internal.yml @@ -0,0 +1,41 @@ +# +# This is an example of what the example.yml should convert to in terms of +# Otto's internal task execution structure +# +--- +meta: +# Tasks are a discrete higher level concept, which can be mapped to resources +# for execution +tasks: + - id: 0x1 + capabilities: + # "sudo false" basically means that the Travis workload doens't need a full + # virtual machine in order to operate + docker_run: true + # Operations should all exist in a single task and therefore run on a + # single colocated resource that has been allocated underneath + ops: + - id: 1 + # Contexts are typically going to carry environment variables and other + # things, in the simple Travis CI example, there's really a single + # "stage" (to use Jenkins terms) in which all scripts will be executed + type: BEGINCTX + data: + name: 'Travis' + - id: 2 + type: RUNPROC + data: + script: 'echo "Hello World"' + env: + # The Travis processor should set a default timeout + timeout_s: 300 + - id: 3 + type: RUNPROC + data: + script: 'env' + env: + timeout_s: 300 + - id: 4 + type: ENDCTX + data: + name: 'Travis' diff --git a/processors/travis-ci/example.yml b/processors/travis-ci/example.yml new file mode 100644 index 0000000..9ea7a92 --- /dev/null +++ b/processors/travis-ci/example.yml @@ -0,0 +1,10 @@ +# +# This is an example of a _very_ simple .travis-ci.yml file. + +--- +sudo: false +script: + - echo "Hello World" + - env + + diff --git a/processors/travis-ci/src/main.rs b/processors/travis-ci/src/main.rs new file mode 100644 index 0000000..e7a11a9 --- /dev/null +++ b/processors/travis-ci/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} -- 2.43.0 From 51057413e3be544523a4db0450f5d9d2ebe7344d Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 13 Jan 2020 08:47:00 -0800 Subject: [PATCH 11/12] Implement a simple processor for travis-ci.yml files Currently this emits data only to standard out, but it will likely need to learn how to drop this payload directly onto the eventbus in the near future --- Cargo.lock | 37 ++++++- processors/travis-ci/.gitignore | 1 + processors/travis-ci/Cargo.toml | 11 ++- processors/travis-ci/src/main.rs | 163 ++++++++++++++++++++++++++++++- 4 files changed, 206 insertions(+), 6 deletions(-) create mode 100644 processors/travis-ci/.gitignore diff --git a/Cargo.lock b/Cargo.lock index 38918ee..7f76d58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1487,6 +1487,17 @@ dependencies = [ "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "processor-travis-ci" +version = "0.1.0" +dependencies = [ + "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)", + "uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "quick-error" version = "1.2.2" @@ -1818,6 +1829,17 @@ dependencies = [ "url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "serde_yaml" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", + "yaml-rust 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "sha-1" version = "0.8.1" @@ -1999,10 +2021,6 @@ dependencies = [ "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "travis-ci" -version = "0.1.0" - [[package]] name = "trust-dns-proto" version = "0.18.0-alpha.2" @@ -2120,6 +2138,15 @@ name = "utf8parse" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "uuid" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "vcpkg" version = "0.2.8" @@ -2438,6 +2465,7 @@ dependencies = [ "checksum serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)" = "48c575e0cc52bdd09b47f330f646cf59afc586e9c4e3ccd6fc1f625b8ea1dad7" "checksum serde_test 0.8.23 (registry+https://github.com/rust-lang/crates.io-index)" = "110b3dbdf8607ec493c22d5d947753282f3bae73c0f56d322af1e8c78e4c23d5" "checksum serde_urlencoded 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97" +"checksum serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)" = "691b17f19fc1ec9d94ec0b5864859290dff279dbd7b03f017afda54eb36c3c35" "checksum sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "23962131a91661d643c98940b20fcaffe62d776a823247be80a48fcb8b6fce68" "checksum sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" "checksum shellexpand 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3a23aed0018ea07316c7826ade41e551772031cf652805f93ebc922215a44d4a" @@ -2471,6 +2499,7 @@ dependencies = [ "checksum url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75b414f6c464c879d7f9babf951f23bc3743fb7313c081b2e6ca719067ea9d61" "checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" "checksum utf8parse 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8772a4ccbb4e89959023bc5b7cb8623a795caa7092d99f3aa9501b9484d4557d" +"checksum uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" "checksum vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3fc439f2794e98976c88a2a2dafce96b930fe8010b0a256b3c2199a773933168" "checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" "checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" diff --git a/processors/travis-ci/.gitignore b/processors/travis-ci/.gitignore new file mode 100644 index 0000000..2f7896d --- /dev/null +++ b/processors/travis-ci/.gitignore @@ -0,0 +1 @@ +target/ diff --git a/processors/travis-ci/Cargo.toml b/processors/travis-ci/Cargo.toml index 2b72cbc..19b9a37 100644 --- a/processors/travis-ci/Cargo.toml +++ b/processors/travis-ci/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "travis-ci" +name = "processor-travis-ci" version = "0.1.0" authors = ["R. Tyler Croy "] edition = "2018" @@ -7,3 +7,12 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +serde = "~1.0.103" +serde_json = "~1.0.0" +serde_yaml = "~0.8.11" + +# Handling command line options +clap = { version = "~2.33.0", features = ["yaml"] } + +# Needed for generating uuids +uuid = { version = "~0.8.1", features = ["serde", "v4"] } diff --git a/processors/travis-ci/src/main.rs b/processors/travis-ci/src/main.rs index e7a11a9..abfc907 100644 --- a/processors/travis-ci/src/main.rs +++ b/processors/travis-ci/src/main.rs @@ -1,3 +1,164 @@ +extern crate clap; +extern crate serde; +extern crate serde_yaml; +extern crate uuid; + +use clap::{Arg, App}; +use serde::{Deserialize, Serialize}; +use serde_yaml::Value; +use uuid::Uuid; + +use std::collections::HashMap; +use std::fs; + fn main() { - println!("Hello, world!"); + let matches = App::new("travis processor") + .arg(Arg::with_name("filename") + .short("f") + .long("filename") + .value_name("FILE") + .required(true) + .help("File") + .takes_value(true)) + .get_matches(); + let filename = matches.value_of("filename").unwrap_or(".travis-ci.yml"); + let contents = fs::read_to_string(filename).expect("Something went wrong reading the file"); + let pipeline = serde_yaml::from_str::(&contents) + .expect("Failed to deserialize the yaml file into a TravisConfig"); + + let mut output = PipelineManifest { + tasks: vec![], + }; + + let mut caps = HashMap::new(); + + if pipeline.sudo { + caps.insert("docker_run".to_string(), Value::Bool(false)); + } + else { + caps.insert("docker_run".to_string(), Value::Bool(true)); + } + + let mut task = Task { + id: Uuid::new_v4().to_string(), + capabilities: caps, + ops: vec![], + }; + + task.ops.push(Op { + id: Uuid::new_v4().to_string(), + op_type: OpType::BeginContext, + // Cheap quick hack to get a simple hashmap here + data: serde_yaml::from_str(r#"{ name: "Travis" }"#).unwrap(), + }); + + for script in pipeline.script.iter() { + let mut data = HashMap::new(); + data.insert("script".to_string(), Value::String(script.to_string())); + data.insert("timeout_s".to_string(), Value::Number(serde_yaml::Number::from(300))); + data.insert("env".to_string(), Value::Null); + task.ops.push(Op { + id: Uuid::new_v4().to_string(), + op_type: OpType::RunProcess, + data + }); + + } + + task.ops.push(Op { + id: Uuid::new_v4().to_string(), + op_type: OpType::EndContext, + // Cheap quick hack to get a simple hashmap here + data: serde_yaml::from_str(r#"{ name: "Travis" }"#).unwrap(), + }); + + output.tasks.push(task); + + println!("{}", serde_yaml::to_string(&output).expect("Failed to serialize manifest")); +} + +#[derive(Deserialize, Debug, Serialize)] +struct TravisConfig { + sudo: bool, + script: Vec, +} + +#[derive(Deserialize, Debug, PartialEq, Serialize)] +enum OpType { + #[serde(rename = "BEGINCTX")] + BeginContext, + #[serde(rename = "RUNPROC")] + RunProcess, + #[serde(rename = "ENDCTX")] + EndContext, +} + +#[derive(Deserialize, Debug, Serialize)] +struct Op { + id: String, + #[serde(rename = "type")] + op_type: OpType, + data: HashMap, +} + +#[derive(Deserialize, Debug, Serialize)] +struct Task { + id: String, + capabilities: HashMap, + ops: Vec, +} +#[derive(Deserialize, Debug, Serialize)] +struct PipelineManifest { + // meta: Meta, + tasks: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deser_simple() { + let yaml = r#" +--- +sudo: false +script: + - echo "Hello World" + - env +"#; + let c = serde_yaml::from_str::(yaml).unwrap(); + assert!(!c.sudo); + assert_eq!(c.script.len(), 2); + } + + #[test] + fn deser_yaml() { + let yaml = r#" +--- +meta: +tasks: + - id: 0x1 + capabilities: + docker_run: true + ops: + - id: 1 + type: BEGINCTX + data: + name: 'Travis' + - id: 2 + type: RUNPROC + data: + script: 'echo "Hello World"' + env: + timeout_s: 300 + - id: 3 + type: ENDCTX + data: + name: 'Travis' +"#; + let c = serde_yaml::from_str::(yaml).unwrap(); + assert_eq!(c.tasks.len(), 1); + assert_eq!(c.tasks[0].ops.len(), 3); + assert_eq!(c.tasks[0].ops[0].op_type, OpType::BeginContext); + } } -- 2.43.0 From 58cdae28cc94a8b5c8851cea85e8d1320d7e9dd5 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 13 Jan 2020 19:41:07 -0800 Subject: [PATCH 12/12] Prototype the travis-ci processor pushing directly into the tasks.for_auction channel This doesn't help too terribly much at this point, but at least it's a means of getting a .travis-ci.yml into the system --- Cargo.lock | 4 ++ eventbus/src/client.rs | 27 +++++++++--- eventbus/src/lib.rs | 18 ++++++++ processors/travis-ci/Cargo.toml | 6 +++ processors/travis-ci/src/main.rs | 74 ++++++++++++++++++++++++-------- 5 files changed, 105 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f76d58..e7fcada 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1491,7 +1491,11 @@ dependencies = [ name = "processor-travis-ci" version = "0.1.0" dependencies = [ + "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "otto-eventbus 0.1.0", + "pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)", "serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/eventbus/src/client.rs b/eventbus/src/client.rs index 687f734..f3e96d9 100644 --- a/eventbus/src/client.rs +++ b/eventbus/src/client.rs @@ -35,6 +35,10 @@ pub struct EventBusClient { id: &'static str, } +#[derive(Debug, Message)] +#[rtype(result = "()")] +pub struct Disconnect; + /** * Implementation of the Debug trait so that the EventBusClient can be included * in logging statements and print each instance's `id` @@ -45,10 +49,6 @@ impl std::fmt::Debug for EventBusClient { } } -pub trait EventDispatch { - fn dispatch(&self, payload: T); -} - /** * connect will create begin the connection process and start the EventBusClient * actor. @@ -96,7 +96,6 @@ impl EventBusClient { } } -//impl Actor for EventBusClient { impl Actor for EventBusClient { type Context = Context; @@ -119,6 +118,24 @@ impl Actor for EventBusClient { } } +impl Handler for EventBusClient { + type Result = (); + + fn handle(&mut self, _: Disconnect, ctx: &mut Context) { + ctx.stop() + } +} + +impl Handler for EventBusClient { + type Result = (); + + fn handle(&mut self, input: InputMessage, _ctx: &mut Context) { + self.sink + .write(Message::Text(serde_json::to_string(&input).unwrap())) + .unwrap(); + } +} + impl Handler for EventBusClient { type Result = (); diff --git a/eventbus/src/lib.rs b/eventbus/src/lib.rs index 15a97e1..4e494ba 100644 --- a/eventbus/src/lib.rs +++ b/eventbus/src/lib.rs @@ -45,6 +45,18 @@ pub struct Meta { pub ts: DateTime, } +impl Meta { + /** + * Construct a Meta struct with the current time + */ + pub fn new(channel: String) -> Self { + Meta { + channel: channel, + ts: Utc::now(), + } + } +} + /** * The Output enums are all meant to capture the types of messages which can be received from the * eventbus. @@ -158,4 +170,10 @@ mod test { assert_eq!(m.channel, ""); assert!(m.ts < Utc::now()); } + + #[test] + fn test_new_meta() { + let m = Meta::new("foo".to_string()); + assert_eq!(m.channel, "foo"); + } } diff --git a/processors/travis-ci/Cargo.toml b/processors/travis-ci/Cargo.toml index 19b9a37..2e26d71 100644 --- a/processors/travis-ci/Cargo.toml +++ b/processors/travis-ci/Cargo.toml @@ -11,8 +11,14 @@ serde = "~1.0.103" serde_json = "~1.0.0" serde_yaml = "~0.8.11" +actix = "~0.9.0" +log = "~0.4.8" +pretty_env_logger = "~0.3.1" + # Handling command line options clap = { version = "~2.33.0", features = ["yaml"] } # Needed for generating uuids uuid = { version = "~0.8.1", features = ["serde", "v4"] } + +otto-eventbus = { path = "../../eventbus" } diff --git a/processors/travis-ci/src/main.rs b/processors/travis-ci/src/main.rs index abfc907..9d2514f 100644 --- a/processors/travis-ci/src/main.rs +++ b/processors/travis-ci/src/main.rs @@ -1,9 +1,13 @@ +extern crate actix; extern crate clap; +extern crate pretty_env_logger; extern crate serde; extern crate serde_yaml; extern crate uuid; -use clap::{Arg, App}; +use actix::*; +use clap::{App, Arg}; +use log::*; use serde::{Deserialize, Serialize}; use serde_yaml::Value; use uuid::Uuid; @@ -11,31 +15,35 @@ use uuid::Uuid; use std::collections::HashMap; use std::fs; +use otto_eventbus::client::*; +use otto_eventbus::*; + fn main() { + pretty_env_logger::init(); + let matches = App::new("travis processor") - .arg(Arg::with_name("filename") - .short("f") - .long("filename") - .value_name("FILE") - .required(true) - .help("File") - .takes_value(true)) - .get_matches(); + .arg( + Arg::with_name("filename") + .short("f") + .long("filename") + .value_name("FILE") + .required(true) + .help("File") + .takes_value(true), + ) + .get_matches(); let filename = matches.value_of("filename").unwrap_or(".travis-ci.yml"); let contents = fs::read_to_string(filename).expect("Something went wrong reading the file"); let pipeline = serde_yaml::from_str::(&contents) .expect("Failed to deserialize the yaml file into a TravisConfig"); - let mut output = PipelineManifest { - tasks: vec![], - }; + let mut output = PipelineManifest { tasks: vec![] }; let mut caps = HashMap::new(); if pipeline.sudo { caps.insert("docker_run".to_string(), Value::Bool(false)); - } - else { + } else { caps.insert("docker_run".to_string(), Value::Bool(true)); } @@ -55,14 +63,16 @@ fn main() { for script in pipeline.script.iter() { let mut data = HashMap::new(); data.insert("script".to_string(), Value::String(script.to_string())); - data.insert("timeout_s".to_string(), Value::Number(serde_yaml::Number::from(300))); + data.insert( + "timeout_s".to_string(), + Value::Number(serde_yaml::Number::from(300)), + ); data.insert("env".to_string(), Value::Null); task.ops.push(Op { id: Uuid::new_v4().to_string(), op_type: OpType::RunProcess, - data - }); - + data, + }); } task.ops.push(Op { @@ -74,7 +84,33 @@ fn main() { output.tasks.push(task); - println!("{}", serde_yaml::to_string(&output).expect("Failed to serialize manifest")); + info!( + "{}", + serde_yaml::to_string(&output).expect("Failed to serialize manifest") + ); + + let sys = System::new("name"); + Arbiter::spawn(async { + let client = connect("http://127.0.0.1:8000/ws/", "processor-travis-ci").await; + info!("Client created: {:?}", client); + + let input = InputMessage { + meta: Meta::new("tasks.for_auction".to_string()), + msg: Input::Publish { + payload: serde_json::to_value(output).unwrap(), + }, + }; + client.do_send(input); + + /* + * Disconnecting as soon as we send Input doesn't work well, because the client seems to + * terminate before it can actually send messages over. + */ + //client.do_send(Disconnect {}); + //info!("Disconnected"); + //System::current().stop(); + }); + sys.run().unwrap(); } #[derive(Deserialize, Debug, Serialize)] -- 2.43.0