Provide the basics for an EventBusClient #2

Manually merged
rtyler merged 12 commits from resource-allocation into master 2020-01-14 05:44:41 +00:00
18 changed files with 822 additions and 150 deletions

45
Cargo.lock generated
View File

@ -1310,7 +1310,9 @@ name = "otto-auctioneer"
version = "0.1.0"
dependencies = [
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1318,8 +1320,6 @@ dependencies = [
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
"tungstenite 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1327,13 +1327,17 @@ name = "otto-eventbus"
version = "0.1.0"
dependencies = [
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1483,6 +1487,21 @@ dependencies = [
"unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "processor-travis-ci"
version = "0.1.0"
dependencies = [
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"otto-eventbus 0.1.0",
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "quick-error"
version = "1.2.2"
@ -1814,6 +1833,17 @@ dependencies = [
"url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "serde_yaml"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"yaml-rust 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "sha-1"
version = "0.8.1"
@ -2112,6 +2142,15 @@ name = "utf8parse"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "uuid"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "vcpkg"
version = "0.2.8"
@ -2430,6 +2469,7 @@ dependencies = [
"checksum serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)" = "48c575e0cc52bdd09b47f330f646cf59afc586e9c4e3ccd6fc1f625b8ea1dad7"
"checksum serde_test 0.8.23 (registry+https://github.com/rust-lang/crates.io-index)" = "110b3dbdf8607ec493c22d5d947753282f3bae73c0f56d322af1e8c78e4c23d5"
"checksum serde_urlencoded 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97"
"checksum serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)" = "691b17f19fc1ec9d94ec0b5864859290dff279dbd7b03f017afda54eb36c3c35"
"checksum sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "23962131a91661d643c98940b20fcaffe62d776a823247be80a48fcb8b6fce68"
"checksum sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
"checksum shellexpand 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3a23aed0018ea07316c7826ade41e551772031cf652805f93ebc922215a44d4a"
@ -2463,6 +2503,7 @@ dependencies = [
"checksum url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75b414f6c464c879d7f9babf951f23bc3743fb7313c081b2e6ca719067ea9d61"
"checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
"checksum utf8parse 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8772a4ccbb4e89959023bc5b7cb8623a795caa7092d99f3aa9501b9484d4557d"
"checksum uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
"checksum vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3fc439f2794e98976c88a2a2dafce96b930fe8010b0a256b3c2199a773933168"
"checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a"
"checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd"

View File

@ -3,4 +3,5 @@
members = [
"auctioneer",
"eventbus",
"processors/travis-ci",
]

View File

@ -7,11 +7,9 @@ edition = "2018"
[dependencies]
# Needed for the actor system
actix = "~0.9.0"
actix-http = "~1.0.1"
actix-rt = "~1.0.0"
# Needed for websockets
tungstenite = "~0.9.2"
url = "~2.1.0"
actix-web = "~2.0.0"
# Handling command line options
clap = { version = "~2.33.0", features = ["yaml"] }
@ -27,4 +25,3 @@ serde = { version = "~1.0.103", features = ["rc"] }
serde_json = "~1.0.0"
otto-eventbus = { path = "../eventbus" }

View File

@ -2,69 +2,57 @@
* The auctioneer main model
*/
extern crate actix;
extern crate actix_http;
extern crate actix_web;
extern crate pretty_env_logger;
extern crate tungstenite;
use std::time::Duration;
use std::{io, thread};
use actix::*;
use log::*;
use tungstenite::client::AutoStream;
use tungstenite::handshake::client::Response;
use tungstenite::*;
use url::Url;
use actix_web::{middleware, web};
use actix_web::{App, HttpResponse, HttpServer};
use log::debug;
use otto_eventbus::client::*;
use otto_eventbus::*;
struct BusClient {}
impl Actor for BusClient {
type Context = SyncContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
info!("Starting client");
}
}
/**
* Simple websocket connection function, just to allow for easy reconnections
* The index handler for the root of the Auctioneer web interface
*/
fn ws_connect() -> Result<(WebSocket<AutoStream>, Response)> {
return connect(Url::parse("ws://localhost:8000/ws/").unwrap());
async fn route_index() -> HttpResponse {
HttpResponse::Ok().body("Auctioneer")
}
#[actix_rt::main]
async fn main() {
async fn main() -> std::io::Result<()> {
pretty_env_logger::init();
let (mut socket, _response) = ws_connect().unwrap();
let client = connect("http://127.0.0.1:8000/ws/", "auctioneer").await;
debug!("Client created: {:?}", client);
info!("Connected to the server");
HttpServer::new(move || {
App::new()
.wrap(middleware::Compress::default())
.wrap(middleware::Logger::default())
.route("/", web::get().to(route_index))
})
.bind("127.0.0.1:8001")?
.run()
.await
}
/*
* In this loop we should read the message, and then dispatch it to a handler actor immediately
* to do "the work"
#[cfg(test)]
mod test {
use super::*;
use actix_web::{test, web, App};
/**
* This test just ensures that the server can come online properly and render its index handler
* properly.
*/
loop {
let msg = socket.read_message();
let msg = match msg {
Ok(m) => m,
Err(e) => {
error!("Failed to read a message off the WebSocket: {:?}", e);
// must reconnect
thread::sleep(Duration::from_secs(3));
#[actix_rt::test]
async fn test_basic_http() {
let srv = test::start(move || App::new().route("/", web::get().to(route_index)));
/*
* Ignore connection errors, they're not important since we
* are in the retry loop anyways.
*/
if let Ok(recon) = ws_connect() {
info!("Server reconnected");
socket = recon.0;
}
continue;
}
};
info!("Received: {}", msg);
let req = srv.get("/");
let response = req.send().await.unwrap();
assert!(response.status().is_success());
}
}

View File

@ -10,7 +10,7 @@ path = "src/server/main.rs"
[[bin]]
name = "otto-ebc"
path = "src/cli.rs"
path = "src/cli/main.rs"
[dependencies]
# Actix provides the web and websocket basis we sit on top of
@ -47,5 +47,12 @@ url = "~2.1.0"
rustyline = "~5.0.6"
# Used for the websocket client
actix-codec = "~0.2.0"
awc = "~1.0.1"
futures = "~0.3.1"
bytes = "~0.5.3"
[dev-dependencies]
regex = "1"

View File

@ -1,52 +0,0 @@
/**
* The CLI is meant to be used for manual testing and verification of the eventbus only.
*/
extern crate rustyline;
use std::io::{stdin, stdout, Write};
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tungstenite::*;
use url::Url;
fn main() {
let (mut socket, response) =
connect(Url::parse("ws://localhost:8000/ws/").unwrap()).expect("Failed to connect");
println!("Connected to the server");
// `()` can be used when no completer is required
let mut rl = Editor::<()>::new();
let history = ".otto-ebc-history";
if rl.load_history(history).is_err() {
println!("No previous history");
}
loop {
let readline = rl.readline(">> ");
match readline {
Ok(line) => {
if line.len() == 0 {
let msg = socket.read_message().expect("Failed to read message");
println!("Received: {}", msg);
} else {
socket.write_message(Message::Text(line));
}
},
Err(ReadlineError::Interrupted) => {
// ctrl-C
break
},
Err(ReadlineError::Eof) => {
// ctrl-D
break
},
Err(err) => {
println!("Error: {:?}", err);
break
}
}
}
rl.save_history(history).unwrap();
}

70
eventbus/src/cli/main.rs Normal file
View File

@ -0,0 +1,70 @@
/**
* The CLI is meant to be used for manual testing and verification of the eventbus only.
*/
extern crate rustyline;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tungstenite::client::AutoStream;
use tungstenite::handshake::client::Response;
use tungstenite::*;
use url::Url;
fn ws_connect() -> Result<(WebSocket<AutoStream>, Response)> {
return connect(Url::parse("ws://localhost:8000/ws/").unwrap());
}
fn main() {
let (mut socket, _response) = ws_connect().unwrap();
println!("Connected to the server");
// `()` can be used when no completer is required
let mut rl = Editor::<()>::new();
let history = ".otto-ebc-history";
if rl.load_history(history).is_err() {
println!("No previous history");
}
loop {
let readline = rl.readline(">> ");
match readline {
Ok(line) => {
if line.len() == 0 {
let response = socket.read_message();
match response {
Ok(msg) => {
println!("{}", msg);
}
Err(e) => {
println!("Failed to read: {}", e);
}
}
} else {
if line == "quit" {
return;
}
match socket.write_message(Message::Text(line)) {
Ok(_) => {}
Err(e) => {
println!("Failed to write: {}", e);
}
}
}
}
Err(ReadlineError::Interrupted) => {
// ctrl-C
break;
}
Err(ReadlineError::Eof) => {
// ctrl-D
break;
}
Err(err) => {
println!("Error: {:?}", err);
break;
}
}
}
rl.save_history(history).unwrap();
}

186
eventbus/src/client.rs Normal file
View File

@ -0,0 +1,186 @@
/**!
* This module is the eventbus client
*/
use actix::io::SinkWrite;
use actix::*;
use actix_codec::Framed;
use awc::{
error::WsProtocolError,
ws::{Codec, Frame, Message},
BoxedSocket, Client,
};
use bytes::Bytes;
use futures::stream::{SplitSink, StreamExt};
use log::{error, info};
use std::time::Duration;
use crate::*;
/**
* An EventBusClient is capable of connecting to, reading messages from, and sending messages to
* the eventbus.
*/
pub struct EventBusClient {
/**
* The sink is a writable object which can send messages back to the EventBus
*/
sink: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
/**
* String identifier for the client
*
* This should be persisted between invocations of the process if the client
* should have any semblence of persistence with its messages
*/
id: &'static str,
}
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct Disconnect;
/**
* Implementation of the Debug trait so that the EventBusClient can be included
* in logging statements and print each instance's `id`
*/
impl std::fmt::Debug for EventBusClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "EventBusClient<id:{}>", self.id)
}
}
/**
* connect will create begin the connection process and start the EventBusClient
* actor.
*
* The caller will need to await in order to get the EventBusClient
*/
pub async fn connect(ws: &'static str, id: &'static str) -> Addr<EventBusClient> {
let mut backoff = 1;
loop {
let r = Client::new().ws(ws).connect().await;
match r {
Ok((response, framed)) => {
let (sink, stream) = framed.split();
return EventBusClient::create(|ctx| {
EventBusClient::add_stream(stream, ctx);
EventBusClient {
sink: SinkWrite::new(sink, ctx),
id: id,
}
});
}
Err(e) => {
error!("Failed establish WebSocket: {}", e);
backoff = backoff + backoff;
std::thread::sleep(Duration::from_secs(backoff));
}
}
}
}
impl EventBusClient {
fn hb(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| {
act.sink
.write(Message::Ping(Bytes::from_static(b"")))
.unwrap();
act.hb(ctx);
// client should also check for a timeout here, similar to the
// server code
});
}
}
impl Actor for EventBusClient {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
let input = InputMessage {
meta: Meta::default(),
msg: Input::Connect {
name: self.id.to_string(),
},
};
self.sink
.write(Message::Text(serde_json::to_string(&input).unwrap()))
.unwrap();
// start heartbeats otherwise server will disconnect after 10 seconds
self.hb(ctx)
}
fn stopped(&mut self, _: &mut Context<Self>) {
info!("Disconnected");
}
}
impl Handler<Disconnect> for EventBusClient {
type Result = ();
fn handle(&mut self, _: Disconnect, ctx: &mut Context<Self>) {
ctx.stop()
}
}
impl Handler<InputMessage> for EventBusClient {
type Result = ();
fn handle(&mut self, input: InputMessage, _ctx: &mut Context<Self>) {
self.sink
.write(Message::Text(serde_json::to_string(&input).unwrap()))
.unwrap();
}
}
impl Handler<OutputMessage> for EventBusClient {
type Result = ();
fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context<Self>) {
/*
* For heartbeats we really don't need to do anything
*/
}
}
/// Handle server websocket messages
impl StreamHandler<Result<Frame, WsProtocolError>> for EventBusClient {
fn handle(&mut self, msg: Result<Frame, WsProtocolError>, ctx: &mut Context<Self>) {
if let Ok(Frame::Text(txt)) = msg {
/*
* We have received _some_ message from the eventbus, let's try to
* decode it!
*/
let msg = serde_json::from_slice::<OutputMessage>(&txt);
match msg {
Ok(output) => {
/*
* Dispatch the message basically back to ourself for easy
* handling
*/
ctx.address().do_send(output);
}
Err(e) => {
error!("Received invalid message: {}", e);
}
}
}
}
fn started(&mut self, _ctx: &mut Context<Self>) {
info!("Connected");
}
fn finished(&mut self, ctx: &mut Context<Self>) {
info!("Server disconnected");
ctx.stop()
}
}
impl actix::io::WriteHandler<WsProtocolError> for EventBusClient {}
#[cfg(test)]
mod test {}

View File

@ -12,6 +12,8 @@ use serde_json::Value;
use std::sync::Arc;
pub mod client;
/**
* Default function for deserialize/serialize of times, always defaults to 1970-01-01
*/
@ -20,6 +22,14 @@ fn epoch() -> DateTime<Utc> {
return Utc.ymd(1970, 1, 1).and_hms_milli(0, 0, 1, 444);
}
/**
* Default function for the deserialization of an empty channel, will just create an empty string
*/
fn default_channel() -> String {
return "".to_owned();
}
/**
* The `Meta` struct contains the necessary metadata about a message which is being sent over the
* wire
@ -29,11 +39,24 @@ fn epoch() -> DateTime<Utc> {
#[derive(Serialize, Deserialize, Debug, Message)]
#[rtype(result = "()")]
pub struct Meta {
#[serde(default = "default_channel")]
pub channel: String,
#[serde(default = "epoch")]
pub ts: DateTime<Utc>,
}
impl Meta {
/**
* Construct a Meta struct with the current time
*/
pub fn new(channel: String) -> Self {
Meta {
channel: channel,
ts: Utc::now(),
}
}
}
/**
* The Output enums are all meant to capture the types of messages which can be received from the
* eventbus.
@ -43,7 +66,10 @@ pub struct Meta {
#[rtype(result = "()")]
pub enum Output {
Heartbeat,
Message { payload: Value },
Message {
#[serde(default)]
payload: Value,
},
}
/**
@ -71,6 +97,13 @@ pub struct OutputMessage {
#[serde(tag = "type", rename_all = "camelCase")]
#[rtype(result = "()")]
pub enum Input {
/**
* A Connect message must be sent for the client to start receiving messages
*
* This message instructs the eventbus to subscribe the client to the "all"
* channel and its own private inbox channel
*/
Connect { name: String },
/**
* A Subscribe message must be sent for each channel the client wishes to subscribe to.
*
@ -113,3 +146,34 @@ pub struct InputMessage {
pub msg: Input,
pub meta: Meta,
}
/**
* The Default trait for `Meta` will create a functionally empty Meta struct
*/
impl Default for Meta {
fn default() -> Meta {
Meta {
channel: default_channel(),
ts: epoch(),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use chrono::Utc;
#[test]
fn test_default_for_meta() {
let m = Meta::default();
assert_eq!(m.channel, "");
assert!(m.ts < Utc::now());
}
#[test]
fn test_new_meta() {
let m = Meta::new("foo".to_string());
assert_eq!(m.channel, "foo");
}
}

View File

@ -6,13 +6,12 @@
use actix::*;
use actix_web_actors::ws;
use chrono::prelude::Utc;
use log::{error, info};
use log::{error, info, trace};
use serde_json;
use std::sync::Arc;
use crate::*;
use otto_eventbus::*;
/*
* Define the Websocket Actor needed for Actix
@ -24,12 +23,55 @@ pub struct WSClient {
events: Addr<eventbus::EventBus>,
}
/**
* The WSClient is an actor responsible for the stateful behavior of each websocket
* connected client
*/
impl WSClient {
/**
* Construct the WSClient with the given EventBus actor to communicate with
*/
pub fn new(eb: Addr<eventbus::EventBus>) -> Self {
Self { events: eb }
}
fn handle_text(&self, text: String, ctx: &<WSClient as Actor>::Context) {
/**
* handle_connect() takes care of the Input::Connect message and will ensure
* that the client is connected to the "all" channel as well as its inbox"
*/
fn handle_connect(&self, client_name: String, ctx: &mut <WSClient as Actor>::Context) {
self.events.do_send(eventbus::Subscribe {
to: "all".to_owned(),
addr: ctx.address(),
});
/*
* Both of these message sends are creating their own String to represent
* the inbox channel.
*
* This SHOULD be fixed, but will require some more thinking about how
* the lifetimes of Channel objects should work
*/
self.events.do_send(eventbus::CreateChannel {
channel: eventbus::Channel {
name: format!("inbox.{}", client_name),
stateful: true,
},
});
self.events.do_send(eventbus::Subscribe {
to: format!("inbox.{}", client_name),
addr: ctx.address(),
});
}
/**
* handle_text handles _all_ incoming messages from the websocket connection,
* it is responsible for translating those JSON messages into something the
* eventbus can pass around internally
*/
fn handle_text(&self, text: String, ctx: &mut <WSClient as Actor>::Context) {
let command = serde_json::from_str::<InputMessage>(&text);
match command {
@ -38,6 +80,11 @@ impl WSClient {
match c {
InputMessage { msg, meta } => {
match msg {
Input::Connect { name } => {
info!("Received connect for client named: {}", name);
self.handle_connect(name, ctx)
}
Input::Publish { payload } => {
info!("received publish: {:?}", payload);
self.events.do_send(eventbus::Event {
@ -95,27 +142,13 @@ impl Handler<eventbus::Event> for WSClient {
*/
impl Actor for WSClient {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let sub = eventbus::Subscribe {
to: "all".to_owned(),
addr: ctx.address(),
};
self.events
.send(sub)
.into_actor(self)
.then(|result, _actor, ctx| {
match result {
Ok(_) => (),
_ => ctx.stop(),
}
fut::ready(())
})
.wait(ctx);
}
}
/// Handler for ws::Message message
/**
* Handler for the ws::Message message for the WSClient actor
*
* This handler will be called for every message from a websocket client
*/
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg {
@ -125,7 +158,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
}
Ok(msg) => msg,
};
info!("WebSocket received: {:?}", msg);
trace!("WebSocket message received: {:?}", msg);
match msg {
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => self.handle_text(text, ctx),
@ -134,3 +167,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
}
}
}
#[cfg(test)]
mod test {}

View File

@ -18,6 +18,32 @@ use crate::*;
*/
type ClientId = Addr<connection::WSClient>;
/**
* The Channel struct is used as an internal representation of each channel that
* the eventbus knows about.
*
* Channels may be either stateless or stateful, with the ladder implying persistence
* guarantees, depending on the eventbus' backing implementation.
*/
#[derive(Clone, Debug, Eq)]
pub struct Channel {
pub name: String,
pub stateful: bool,
}
/**
* The CreateChannel message is only meant to be used by internal components of
* the eventbus.
*
* It is used primarily for creating new channels on-demand, such as those needed
* for new client inboxes
*/
#[derive(Message)]
#[rtype(result = "()")]
pub struct CreateChannel {
pub channel: Channel,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct Subscribe {
@ -39,23 +65,17 @@ pub struct Unsubscribe {
}
/**
* The Channel struct is used as an internal representation of each channel that
* the eventbus knows about.
*
* Channels may be either stateless or stateful, with the ladder implying persistence
* guarantees, depending on the eventbus' backing implementation.
* Implementation of the Hash trait for Channel ensures that it can be placed in a HashSet
*/
#[derive(Debug, Eq)]
pub struct Channel {
name: String,
stateful: bool,
}
impl Hash for Channel {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
}
}
/**
* Implementation of PartialEq trait for Channel ensures that it can be placed in a HashSet
*/
impl PartialEq for Channel {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
@ -70,6 +90,10 @@ pub struct EventBus {
channels: HashMap<Channel, HashSet<ClientId>>,
}
/**
*
* The Actor trait for the Eventbus allows it to act as an actor in the actix system
*/
impl Actor for EventBus {
type Context = Context<Self>;
@ -107,16 +131,26 @@ impl EventBus {
}
}
impl Handler<CreateChannel> for EventBus {
type Result = ();
fn handle(&mut self, create: CreateChannel, _: &mut Context<Self>) {
self.channels.insert(create.channel, HashSet::new());
}
}
impl Handler<Subscribe> for EventBus {
type Result = ();
fn handle(&mut self, msg: Subscribe, _: &mut Context<Self>) {
info!("Subscribing client to {}", msg.to);
// The stateful field doesn't matter here because the hashing on the
// HashMap only applies to the name of the channel
let ch = Channel {
name: msg.to,
stateful: false,
};
if self.channels.contains_key(&ch) {
match self.channels.get_mut(&ch) {
Some(set) => {
@ -140,6 +174,7 @@ impl Handler<Event> for EventBus {
name: ev.channel.to_string(),
stateful: false,
};
if self.channels.contains_key(&ch) {
if let Some(clients) = self.channels.get(&ch) {
/*
@ -162,3 +197,18 @@ impl Handler<Event> for EventBus {
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_with_channels_empty() {
let _bus = EventBus::with_channels(vec![], vec![]);
}
#[test]
fn test_with_channels_stateless() {
let _bus = EventBus::with_channels(vec![String::from("test")], vec![]);
}
}

View File

@ -3,8 +3,8 @@
* server loop that the eventbus uses.
*/
extern crate actix;
extern crate actix_web;
extern crate actix_http;
extern crate actix_web;
extern crate config;
extern crate log;
extern crate pretty_env_logger;
@ -159,7 +159,6 @@ async fn main() -> std::io::Result<()> {
.await
}
#[cfg(test)]
mod test {
use super::*;
@ -183,10 +182,10 @@ mod test {
};
let wd = web::Data::new(state);
let srv = test::start(move || {
App::new()
.app_data(wd.clone())
.route("/", web::get().to(index))
});
App::new()
.app_data(wd.clone())
.route("/", web::get().to(index))
});
let req = srv.get("/");
let mut response = req.send().await.unwrap();
@ -199,6 +198,9 @@ mod test {
let matches = re.captures(&buffer).unwrap();
let version = matches.get(1).unwrap();
assert_eq!(version.as_str(), format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown")));
assert_eq!(
version.as_str(),
format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"))
);
}
}

6
processors/README.adoc Normal file
View File

@ -0,0 +1,6 @@
= Otto Processors
Processors in Otto take configuration files, e.g. `.travis-ci.yml`,
`Jenkinsfile`, and convert them into the internal Otto constructs necessary for
task execution.

1
processors/travis-ci/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
target/

View File

@ -0,0 +1,24 @@
[package]
name = "processor-travis-ci"
version = "0.1.0"
authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = "~1.0.103"
serde_json = "~1.0.0"
serde_yaml = "~0.8.11"
actix = "~0.9.0"
log = "~0.4.8"
pretty_env_logger = "~0.3.1"
# Handling command line options
clap = { version = "~2.33.0", features = ["yaml"] }
# Needed for generating uuids
uuid = { version = "~0.8.1", features = ["serde", "v4"] }
otto-eventbus = { path = "../../eventbus" }

View File

@ -0,0 +1,41 @@
#
# This is an example of what the example.yml should convert to in terms of
# Otto's internal task execution structure
#
---
meta:
# Tasks are a discrete higher level concept, which can be mapped to resources
# for execution
tasks:
- id: 0x1
capabilities:
# "sudo false" basically means that the Travis workload doens't need a full
# virtual machine in order to operate
docker_run: true
# Operations should all exist in a single task and therefore run on a
# single colocated resource that has been allocated underneath
ops:
- id: 1
# Contexts are typically going to carry environment variables and other
# things, in the simple Travis CI example, there's really a single
# "stage" (to use Jenkins terms) in which all scripts will be executed
type: BEGINCTX
data:
name: 'Travis'
- id: 2
type: RUNPROC
data:
script: 'echo "Hello World"'
env:
# The Travis processor should set a default timeout
timeout_s: 300
- id: 3
type: RUNPROC
data:
script: 'env'
env:
timeout_s: 300
- id: 4
type: ENDCTX
data:
name: 'Travis'

View File

@ -0,0 +1,10 @@
#
# This is an example of a _very_ simple .travis-ci.yml file.
---
sudo: false
script:
- echo "Hello World"
- env

View File

@ -0,0 +1,200 @@
extern crate actix;
extern crate clap;
extern crate pretty_env_logger;
extern crate serde;
extern crate serde_yaml;
extern crate uuid;
use actix::*;
use clap::{App, Arg};
use log::*;
use serde::{Deserialize, Serialize};
use serde_yaml::Value;
use uuid::Uuid;
use std::collections::HashMap;
use std::fs;
use otto_eventbus::client::*;
use otto_eventbus::*;
fn main() {
pretty_env_logger::init();
let matches = App::new("travis processor")
.arg(
Arg::with_name("filename")
.short("f")
.long("filename")
.value_name("FILE")
.required(true)
.help("File")
.takes_value(true),
)
.get_matches();
let filename = matches.value_of("filename").unwrap_or(".travis-ci.yml");
let contents = fs::read_to_string(filename).expect("Something went wrong reading the file");
let pipeline = serde_yaml::from_str::<TravisConfig>(&contents)
.expect("Failed to deserialize the yaml file into a TravisConfig");
let mut output = PipelineManifest { tasks: vec![] };
let mut caps = HashMap::new();
if pipeline.sudo {
caps.insert("docker_run".to_string(), Value::Bool(false));
} else {
caps.insert("docker_run".to_string(), Value::Bool(true));
}
let mut task = Task {
id: Uuid::new_v4().to_string(),
capabilities: caps,
ops: vec![],
};
task.ops.push(Op {
id: Uuid::new_v4().to_string(),
op_type: OpType::BeginContext,
// Cheap quick hack to get a simple hashmap here
data: serde_yaml::from_str(r#"{ name: "Travis" }"#).unwrap(),
});
for script in pipeline.script.iter() {
let mut data = HashMap::new();
data.insert("script".to_string(), Value::String(script.to_string()));
data.insert(
"timeout_s".to_string(),
Value::Number(serde_yaml::Number::from(300)),
);
data.insert("env".to_string(), Value::Null);
task.ops.push(Op {
id: Uuid::new_v4().to_string(),
op_type: OpType::RunProcess,
data,
});
}
task.ops.push(Op {
id: Uuid::new_v4().to_string(),
op_type: OpType::EndContext,
// Cheap quick hack to get a simple hashmap here
data: serde_yaml::from_str(r#"{ name: "Travis" }"#).unwrap(),
});
output.tasks.push(task);
info!(
"{}",
serde_yaml::to_string(&output).expect("Failed to serialize manifest")
);
let sys = System::new("name");
Arbiter::spawn(async {
let client = connect("http://127.0.0.1:8000/ws/", "processor-travis-ci").await;
info!("Client created: {:?}", client);
let input = InputMessage {
meta: Meta::new("tasks.for_auction".to_string()),
msg: Input::Publish {
payload: serde_json::to_value(output).unwrap(),
},
};
client.do_send(input);
/*
* Disconnecting as soon as we send Input doesn't work well, because the client seems to
* terminate before it can actually send messages over.
*/
//client.do_send(Disconnect {});
//info!("Disconnected");
//System::current().stop();
});
sys.run().unwrap();
}
#[derive(Deserialize, Debug, Serialize)]
struct TravisConfig {
sudo: bool,
script: Vec<String>,
}
#[derive(Deserialize, Debug, PartialEq, Serialize)]
enum OpType {
#[serde(rename = "BEGINCTX")]
BeginContext,
#[serde(rename = "RUNPROC")]
RunProcess,
#[serde(rename = "ENDCTX")]
EndContext,
}
#[derive(Deserialize, Debug, Serialize)]
struct Op {
id: String,
#[serde(rename = "type")]
op_type: OpType,
data: HashMap<String, Value>,
}
#[derive(Deserialize, Debug, Serialize)]
struct Task {
id: String,
capabilities: HashMap<String, Value>,
ops: Vec<Op>,
}
#[derive(Deserialize, Debug, Serialize)]
struct PipelineManifest {
// meta: Meta,
tasks: Vec<Task>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn deser_simple() {
let yaml = r#"
---
sudo: false
script:
- echo "Hello World"
- env
"#;
let c = serde_yaml::from_str::<TravisConfig>(yaml).unwrap();
assert!(!c.sudo);
assert_eq!(c.script.len(), 2);
}
#[test]
fn deser_yaml() {
let yaml = r#"
---
meta:
tasks:
- id: 0x1
capabilities:
docker_run: true
ops:
- id: 1
type: BEGINCTX
data:
name: 'Travis'
- id: 2
type: RUNPROC
data:
script: 'echo "Hello World"'
env:
timeout_s: 300
- id: 3
type: ENDCTX
data:
name: 'Travis'
"#;
let c = serde_yaml::from_str::<PipelineManifest>(yaml).unwrap();
assert_eq!(c.tasks.len(), 1);
assert_eq!(c.tasks[0].ops.len(), 3);
assert_eq!(c.tasks[0].ops[0].op_type, OpType::BeginContext);
}
}