Compare commits

...

12 Commits

Author SHA1 Message Date
R Tyler Croy 58cdae28cc
Prototype the travis-ci processor pushing directly into the tasks.for_auction channel
This doesn't help too terribly much at this point, but at least it's a means of
getting a .travis-ci.yml into the system
2020-01-13 19:41:07 -08:00
R Tyler Croy 51057413e3
Implement a simple processor for travis-ci.yml files
Currently this emits data only to standard out, but it will likely need to learn
how to drop this payload directly onto the eventbus in the near future
2020-01-13 18:19:54 -08:00
R Tyler Croy 3ebcf376c6
Starting to scope out a simple Travis CI processor
I need something to generate some simple tasks to pump into the eventbus for the
auctioneer, this will have to do
2020-01-12 14:23:37 -08:00
R Tyler Croy b40abcad9d
Implement backoff on connection errors for the eventbus client
This uses a fibonacci backoff (yay I pass the coding interview) for attempting
to reconnect the connection if something cannot work properly
2020-01-11 20:44:05 -08:00
R Tyler Croy 24db3be3ea
Refactor the EventBusClient code into the eventus crate where it belongs
Now that I have something simple and functioning, this no longer needs to be
in the auctioneer

The fun work begins now, supporting custom messages known only to the auctioneer
for its needs, but passing those through EventBusClient
2020-01-11 16:18:14 -08:00
R Tyler Croy 0f3582db01
Implement the Default trait for Meta to make it easier for the auctioneer
This removes a dependency from auctioneer and is the first step in refactoring
the EventBusClient into the eventbus crate
2020-01-11 16:18:14 -08:00
R Tyler Croy e0a9b0c870
Implement some basic command dispatching in the auctioneer
This code will soon be refactored into the eventbus crate so that all other
clients can implement their handlers the same way
2020-01-11 13:03:37 -08:00
R Tyler Croy 044abc397f
Deserialize messages from the eventbus 2020-01-11 12:38:35 -08:00
R Tyler Croy b1260df2cf
Switch back to actix-web-actors for the eventbus client, again. >_<
I originally tossed this approach since I believed that the awc +
actix-web-actors approach was unnecessarily complex. After experimenting a bit
with tungstenite, I believe that converting its high-level API into something
that I could work with in an asynchronous manner would require too much poking
around with tokio to be worth the effort.

I suppose I'll have to make do with the somewhat mid-level abstraction that
actix-web-actors provides me.
2020-01-11 12:38:34 -08:00
R Tyler Croy eb75d61259
cargo fmt 2020-01-11 12:38:33 -08:00
R Tyler Croy 9b1df700de
Clean up a little bit of error handling in the eventbus CLI 2020-01-11 11:17:58 -08:00
R Tyler Croy a5ae7e15a8
Begin working on the per-client inbox functionality
There's some refactoring I need to do in the server/eventbus module before
continuing, but this contains the initial cut of code necessary to create a new
inbox when a client connects that will map to that client name
2020-01-11 11:17:56 -08:00
18 changed files with 822 additions and 150 deletions

45
Cargo.lock generated
View File

@ -1310,7 +1310,9 @@ name = "otto-auctioneer"
version = "0.1.0"
dependencies = [
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1318,8 +1320,6 @@ dependencies = [
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
"tungstenite 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1327,13 +1327,17 @@ name = "otto-eventbus"
version = "0.1.0"
dependencies = [
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1483,6 +1487,21 @@ dependencies = [
"unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "processor-travis-ci"
version = "0.1.0"
dependencies = [
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"otto-eventbus 0.1.0",
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "quick-error"
version = "1.2.2"
@ -1814,6 +1833,17 @@ dependencies = [
"url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "serde_yaml"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"yaml-rust 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "sha-1"
version = "0.8.1"
@ -2112,6 +2142,15 @@ name = "utf8parse"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "uuid"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "vcpkg"
version = "0.2.8"
@ -2430,6 +2469,7 @@ dependencies = [
"checksum serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)" = "48c575e0cc52bdd09b47f330f646cf59afc586e9c4e3ccd6fc1f625b8ea1dad7"
"checksum serde_test 0.8.23 (registry+https://github.com/rust-lang/crates.io-index)" = "110b3dbdf8607ec493c22d5d947753282f3bae73c0f56d322af1e8c78e4c23d5"
"checksum serde_urlencoded 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97"
"checksum serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)" = "691b17f19fc1ec9d94ec0b5864859290dff279dbd7b03f017afda54eb36c3c35"
"checksum sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "23962131a91661d643c98940b20fcaffe62d776a823247be80a48fcb8b6fce68"
"checksum sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
"checksum shellexpand 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3a23aed0018ea07316c7826ade41e551772031cf652805f93ebc922215a44d4a"
@ -2463,6 +2503,7 @@ dependencies = [
"checksum url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75b414f6c464c879d7f9babf951f23bc3743fb7313c081b2e6ca719067ea9d61"
"checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
"checksum utf8parse 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8772a4ccbb4e89959023bc5b7cb8623a795caa7092d99f3aa9501b9484d4557d"
"checksum uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
"checksum vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3fc439f2794e98976c88a2a2dafce96b930fe8010b0a256b3c2199a773933168"
"checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a"
"checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd"

View File

@ -3,4 +3,5 @@
members = [
"auctioneer",
"eventbus",
"processors/travis-ci",
]

View File

@ -7,11 +7,9 @@ edition = "2018"
[dependencies]
# Needed for the actor system
actix = "~0.9.0"
actix-http = "~1.0.1"
actix-rt = "~1.0.0"
# Needed for websockets
tungstenite = "~0.9.2"
url = "~2.1.0"
actix-web = "~2.0.0"
# Handling command line options
clap = { version = "~2.33.0", features = ["yaml"] }
@ -27,4 +25,3 @@ serde = { version = "~1.0.103", features = ["rc"] }
serde_json = "~1.0.0"
otto-eventbus = { path = "../eventbus" }

View File

@ -2,69 +2,57 @@
* The auctioneer main model
*/
extern crate actix;
extern crate actix_http;
extern crate actix_web;
extern crate pretty_env_logger;
extern crate tungstenite;
use std::time::Duration;
use std::{io, thread};
use actix::*;
use log::*;
use tungstenite::client::AutoStream;
use tungstenite::handshake::client::Response;
use tungstenite::*;
use url::Url;
use actix_web::{middleware, web};
use actix_web::{App, HttpResponse, HttpServer};
use log::debug;
use otto_eventbus::client::*;
use otto_eventbus::*;
struct BusClient {}
impl Actor for BusClient {
type Context = SyncContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
info!("Starting client");
}
}
/**
* Simple websocket connection function, just to allow for easy reconnections
* The index handler for the root of the Auctioneer web interface
*/
fn ws_connect() -> Result<(WebSocket<AutoStream>, Response)> {
return connect(Url::parse("ws://localhost:8000/ws/").unwrap());
async fn route_index() -> HttpResponse {
HttpResponse::Ok().body("Auctioneer")
}
#[actix_rt::main]
async fn main() {
async fn main() -> std::io::Result<()> {
pretty_env_logger::init();
let (mut socket, _response) = ws_connect().unwrap();
let client = connect("http://127.0.0.1:8000/ws/", "auctioneer").await;
debug!("Client created: {:?}", client);
info!("Connected to the server");
HttpServer::new(move || {
App::new()
.wrap(middleware::Compress::default())
.wrap(middleware::Logger::default())
.route("/", web::get().to(route_index))
})
.bind("127.0.0.1:8001")?
.run()
.await
}
/*
* In this loop we should read the message, and then dispatch it to a handler actor immediately
* to do "the work"
#[cfg(test)]
mod test {
use super::*;
use actix_web::{test, web, App};
/**
* This test just ensures that the server can come online properly and render its index handler
* properly.
*/
loop {
let msg = socket.read_message();
let msg = match msg {
Ok(m) => m,
Err(e) => {
error!("Failed to read a message off the WebSocket: {:?}", e);
// must reconnect
thread::sleep(Duration::from_secs(3));
#[actix_rt::test]
async fn test_basic_http() {
let srv = test::start(move || App::new().route("/", web::get().to(route_index)));
/*
* Ignore connection errors, they're not important since we
* are in the retry loop anyways.
*/
if let Ok(recon) = ws_connect() {
info!("Server reconnected");
socket = recon.0;
}
continue;
}
};
info!("Received: {}", msg);
let req = srv.get("/");
let response = req.send().await.unwrap();
assert!(response.status().is_success());
}
}

View File

@ -10,7 +10,7 @@ path = "src/server/main.rs"
[[bin]]
name = "otto-ebc"
path = "src/cli.rs"
path = "src/cli/main.rs"
[dependencies]
# Actix provides the web and websocket basis we sit on top of
@ -47,5 +47,12 @@ url = "~2.1.0"
rustyline = "~5.0.6"
# Used for the websocket client
actix-codec = "~0.2.0"
awc = "~1.0.1"
futures = "~0.3.1"
bytes = "~0.5.3"
[dev-dependencies]
regex = "1"

View File

@ -1,52 +0,0 @@
/**
* The CLI is meant to be used for manual testing and verification of the eventbus only.
*/
extern crate rustyline;
use std::io::{stdin, stdout, Write};
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tungstenite::*;
use url::Url;
fn main() {
let (mut socket, response) =
connect(Url::parse("ws://localhost:8000/ws/").unwrap()).expect("Failed to connect");
println!("Connected to the server");
// `()` can be used when no completer is required
let mut rl = Editor::<()>::new();
let history = ".otto-ebc-history";
if rl.load_history(history).is_err() {
println!("No previous history");
}
loop {
let readline = rl.readline(">> ");
match readline {
Ok(line) => {
if line.len() == 0 {
let msg = socket.read_message().expect("Failed to read message");
println!("Received: {}", msg);
} else {
socket.write_message(Message::Text(line));
}
},
Err(ReadlineError::Interrupted) => {
// ctrl-C
break
},
Err(ReadlineError::Eof) => {
// ctrl-D
break
},
Err(err) => {
println!("Error: {:?}", err);
break
}
}
}
rl.save_history(history).unwrap();
}

70
eventbus/src/cli/main.rs Normal file
View File

@ -0,0 +1,70 @@
/**
* The CLI is meant to be used for manual testing and verification of the eventbus only.
*/
extern crate rustyline;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tungstenite::client::AutoStream;
use tungstenite::handshake::client::Response;
use tungstenite::*;
use url::Url;
fn ws_connect() -> Result<(WebSocket<AutoStream>, Response)> {
return connect(Url::parse("ws://localhost:8000/ws/").unwrap());
}
fn main() {
let (mut socket, _response) = ws_connect().unwrap();
println!("Connected to the server");
// `()` can be used when no completer is required
let mut rl = Editor::<()>::new();
let history = ".otto-ebc-history";
if rl.load_history(history).is_err() {
println!("No previous history");
}
loop {
let readline = rl.readline(">> ");
match readline {
Ok(line) => {
if line.len() == 0 {
let response = socket.read_message();
match response {
Ok(msg) => {
println!("{}", msg);
}
Err(e) => {
println!("Failed to read: {}", e);
}
}
} else {
if line == "quit" {
return;
}
match socket.write_message(Message::Text(line)) {
Ok(_) => {}
Err(e) => {
println!("Failed to write: {}", e);
}
}
}
}
Err(ReadlineError::Interrupted) => {
// ctrl-C
break;
}
Err(ReadlineError::Eof) => {
// ctrl-D
break;
}
Err(err) => {
println!("Error: {:?}", err);
break;
}
}
}
rl.save_history(history).unwrap();
}

186
eventbus/src/client.rs Normal file
View File

@ -0,0 +1,186 @@
/**!
* This module is the eventbus client
*/
use actix::io::SinkWrite;
use actix::*;
use actix_codec::Framed;
use awc::{
error::WsProtocolError,
ws::{Codec, Frame, Message},
BoxedSocket, Client,
};
use bytes::Bytes;
use futures::stream::{SplitSink, StreamExt};
use log::{error, info};
use std::time::Duration;
use crate::*;
/**
* An EventBusClient is capable of connecting to, reading messages from, and sending messages to
* the eventbus.
*/
pub struct EventBusClient {
/**
* The sink is a writable object which can send messages back to the EventBus
*/
sink: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
/**
* String identifier for the client
*
* This should be persisted between invocations of the process if the client
* should have any semblence of persistence with its messages
*/
id: &'static str,
}
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct Disconnect;
/**
* Implementation of the Debug trait so that the EventBusClient can be included
* in logging statements and print each instance's `id`
*/
impl std::fmt::Debug for EventBusClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "EventBusClient<id:{}>", self.id)
}
}
/**
* connect will create begin the connection process and start the EventBusClient
* actor.
*
* The caller will need to await in order to get the EventBusClient
*/
pub async fn connect(ws: &'static str, id: &'static str) -> Addr<EventBusClient> {
let mut backoff = 1;
loop {
let r = Client::new().ws(ws).connect().await;
match r {
Ok((response, framed)) => {
let (sink, stream) = framed.split();
return EventBusClient::create(|ctx| {
EventBusClient::add_stream(stream, ctx);
EventBusClient {
sink: SinkWrite::new(sink, ctx),
id: id,
}
});
}
Err(e) => {
error!("Failed establish WebSocket: {}", e);
backoff = backoff + backoff;
std::thread::sleep(Duration::from_secs(backoff));
}
}
}
}
impl EventBusClient {
fn hb(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| {
act.sink
.write(Message::Ping(Bytes::from_static(b"")))
.unwrap();
act.hb(ctx);
// client should also check for a timeout here, similar to the
// server code
});
}
}
impl Actor for EventBusClient {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
let input = InputMessage {
meta: Meta::default(),
msg: Input::Connect {
name: self.id.to_string(),
},
};
self.sink
.write(Message::Text(serde_json::to_string(&input).unwrap()))
.unwrap();
// start heartbeats otherwise server will disconnect after 10 seconds
self.hb(ctx)
}
fn stopped(&mut self, _: &mut Context<Self>) {
info!("Disconnected");
}
}
impl Handler<Disconnect> for EventBusClient {
type Result = ();
fn handle(&mut self, _: Disconnect, ctx: &mut Context<Self>) {
ctx.stop()
}
}
impl Handler<InputMessage> for EventBusClient {
type Result = ();
fn handle(&mut self, input: InputMessage, _ctx: &mut Context<Self>) {
self.sink
.write(Message::Text(serde_json::to_string(&input).unwrap()))
.unwrap();
}
}
impl Handler<OutputMessage> for EventBusClient {
type Result = ();
fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context<Self>) {
/*
* For heartbeats we really don't need to do anything
*/
}
}
/// Handle server websocket messages
impl StreamHandler<Result<Frame, WsProtocolError>> for EventBusClient {
fn handle(&mut self, msg: Result<Frame, WsProtocolError>, ctx: &mut Context<Self>) {
if let Ok(Frame::Text(txt)) = msg {
/*
* We have received _some_ message from the eventbus, let's try to
* decode it!
*/
let msg = serde_json::from_slice::<OutputMessage>(&txt);
match msg {
Ok(output) => {
/*
* Dispatch the message basically back to ourself for easy
* handling
*/
ctx.address().do_send(output);
}
Err(e) => {
error!("Received invalid message: {}", e);
}
}
}
}
fn started(&mut self, _ctx: &mut Context<Self>) {
info!("Connected");
}
fn finished(&mut self, ctx: &mut Context<Self>) {
info!("Server disconnected");
ctx.stop()
}
}
impl actix::io::WriteHandler<WsProtocolError> for EventBusClient {}
#[cfg(test)]
mod test {}

View File

@ -12,6 +12,8 @@ use serde_json::Value;
use std::sync::Arc;
pub mod client;
/**
* Default function for deserialize/serialize of times, always defaults to 1970-01-01
*/
@ -20,6 +22,14 @@ fn epoch() -> DateTime<Utc> {
return Utc.ymd(1970, 1, 1).and_hms_milli(0, 0, 1, 444);
}
/**
* Default function for the deserialization of an empty channel, will just create an empty string
*/
fn default_channel() -> String {
return "".to_owned();
}
/**
* The `Meta` struct contains the necessary metadata about a message which is being sent over the
* wire
@ -29,11 +39,24 @@ fn epoch() -> DateTime<Utc> {
#[derive(Serialize, Deserialize, Debug, Message)]
#[rtype(result = "()")]
pub struct Meta {
#[serde(default = "default_channel")]
pub channel: String,
#[serde(default = "epoch")]
pub ts: DateTime<Utc>,
}
impl Meta {
/**
* Construct a Meta struct with the current time
*/
pub fn new(channel: String) -> Self {
Meta {
channel: channel,
ts: Utc::now(),
}
}
}
/**
* The Output enums are all meant to capture the types of messages which can be received from the
* eventbus.
@ -43,7 +66,10 @@ pub struct Meta {
#[rtype(result = "()")]
pub enum Output {
Heartbeat,
Message { payload: Value },
Message {
#[serde(default)]
payload: Value,
},
}
/**
@ -71,6 +97,13 @@ pub struct OutputMessage {
#[serde(tag = "type", rename_all = "camelCase")]
#[rtype(result = "()")]
pub enum Input {
/**
* A Connect message must be sent for the client to start receiving messages
*
* This message instructs the eventbus to subscribe the client to the "all"
* channel and its own private inbox channel
*/
Connect { name: String },
/**
* A Subscribe message must be sent for each channel the client wishes to subscribe to.
*
@ -113,3 +146,34 @@ pub struct InputMessage {
pub msg: Input,
pub meta: Meta,
}
/**
* The Default trait for `Meta` will create a functionally empty Meta struct
*/
impl Default for Meta {
fn default() -> Meta {
Meta {
channel: default_channel(),
ts: epoch(),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use chrono::Utc;
#[test]
fn test_default_for_meta() {
let m = Meta::default();
assert_eq!(m.channel, "");
assert!(m.ts < Utc::now());
}
#[test]
fn test_new_meta() {
let m = Meta::new("foo".to_string());
assert_eq!(m.channel, "foo");
}
}

View File

@ -6,13 +6,12 @@
use actix::*;
use actix_web_actors::ws;
use chrono::prelude::Utc;
use log::{error, info};
use log::{error, info, trace};
use serde_json;
use std::sync::Arc;
use crate::*;
use otto_eventbus::*;
/*
* Define the Websocket Actor needed for Actix
@ -24,12 +23,55 @@ pub struct WSClient {
events: Addr<eventbus::EventBus>,
}
/**
* The WSClient is an actor responsible for the stateful behavior of each websocket
* connected client
*/
impl WSClient {
/**
* Construct the WSClient with the given EventBus actor to communicate with
*/
pub fn new(eb: Addr<eventbus::EventBus>) -> Self {
Self { events: eb }
}
fn handle_text(&self, text: String, ctx: &<WSClient as Actor>::Context) {
/**
* handle_connect() takes care of the Input::Connect message and will ensure
* that the client is connected to the "all" channel as well as its inbox"
*/
fn handle_connect(&self, client_name: String, ctx: &mut <WSClient as Actor>::Context) {
self.events.do_send(eventbus::Subscribe {
to: "all".to_owned(),
addr: ctx.address(),
});
/*
* Both of these message sends are creating their own String to represent
* the inbox channel.
*
* This SHOULD be fixed, but will require some more thinking about how
* the lifetimes of Channel objects should work
*/
self.events.do_send(eventbus::CreateChannel {
channel: eventbus::Channel {
name: format!("inbox.{}", client_name),
stateful: true,
},
});
self.events.do_send(eventbus::Subscribe {
to: format!("inbox.{}", client_name),
addr: ctx.address(),
});
}
/**
* handle_text handles _all_ incoming messages from the websocket connection,
* it is responsible for translating those JSON messages into something the
* eventbus can pass around internally
*/
fn handle_text(&self, text: String, ctx: &mut <WSClient as Actor>::Context) {
let command = serde_json::from_str::<InputMessage>(&text);
match command {
@ -38,6 +80,11 @@ impl WSClient {
match c {
InputMessage { msg, meta } => {
match msg {
Input::Connect { name } => {
info!("Received connect for client named: {}", name);
self.handle_connect(name, ctx)
}
Input::Publish { payload } => {
info!("received publish: {:?}", payload);
self.events.do_send(eventbus::Event {
@ -95,27 +142,13 @@ impl Handler<eventbus::Event> for WSClient {
*/
impl Actor for WSClient {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let sub = eventbus::Subscribe {
to: "all".to_owned(),
addr: ctx.address(),
};
self.events
.send(sub)
.into_actor(self)
.then(|result, _actor, ctx| {
match result {
Ok(_) => (),
_ => ctx.stop(),
}
fut::ready(())
})
.wait(ctx);
}
}
/// Handler for ws::Message message
/**
* Handler for the ws::Message message for the WSClient actor
*
* This handler will be called for every message from a websocket client
*/
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg {
@ -125,7 +158,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
}
Ok(msg) => msg,
};
info!("WebSocket received: {:?}", msg);
trace!("WebSocket message received: {:?}", msg);
match msg {
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => self.handle_text(text, ctx),
@ -134,3 +167,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
}
}
}
#[cfg(test)]
mod test {}

View File

@ -18,6 +18,32 @@ use crate::*;
*/
type ClientId = Addr<connection::WSClient>;
/**
* The Channel struct is used as an internal representation of each channel that
* the eventbus knows about.
*
* Channels may be either stateless or stateful, with the ladder implying persistence
* guarantees, depending on the eventbus' backing implementation.
*/
#[derive(Clone, Debug, Eq)]
pub struct Channel {
pub name: String,
pub stateful: bool,
}
/**
* The CreateChannel message is only meant to be used by internal components of
* the eventbus.
*
* It is used primarily for creating new channels on-demand, such as those needed
* for new client inboxes
*/
#[derive(Message)]
#[rtype(result = "()")]
pub struct CreateChannel {
pub channel: Channel,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct Subscribe {
@ -39,23 +65,17 @@ pub struct Unsubscribe {
}
/**
* The Channel struct is used as an internal representation of each channel that
* the eventbus knows about.
*
* Channels may be either stateless or stateful, with the ladder implying persistence
* guarantees, depending on the eventbus' backing implementation.
* Implementation of the Hash trait for Channel ensures that it can be placed in a HashSet
*/
#[derive(Debug, Eq)]
pub struct Channel {
name: String,
stateful: bool,
}
impl Hash for Channel {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
}
}
/**
* Implementation of PartialEq trait for Channel ensures that it can be placed in a HashSet
*/
impl PartialEq for Channel {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
@ -70,6 +90,10 @@ pub struct EventBus {
channels: HashMap<Channel, HashSet<ClientId>>,
}
/**
*
* The Actor trait for the Eventbus allows it to act as an actor in the actix system
*/
impl Actor for EventBus {
type Context = Context<Self>;
@ -107,16 +131,26 @@ impl EventBus {
}
}
impl Handler<CreateChannel> for EventBus {
type Result = ();
fn handle(&mut self, create: CreateChannel, _: &mut Context<Self>) {
self.channels.insert(create.channel, HashSet::new());
}
}
impl Handler<Subscribe> for EventBus {
type Result = ();
fn handle(&mut self, msg: Subscribe, _: &mut Context<Self>) {
info!("Subscribing client to {}", msg.to);
// The stateful field doesn't matter here because the hashing on the
// HashMap only applies to the name of the channel
let ch = Channel {
name: msg.to,
stateful: false,
};
if self.channels.contains_key(&ch) {
match self.channels.get_mut(&ch) {
Some(set) => {
@ -140,6 +174,7 @@ impl Handler<Event> for EventBus {
name: ev.channel.to_string(),
stateful: false,
};
if self.channels.contains_key(&ch) {
if let Some(clients) = self.channels.get(&ch) {
/*
@ -162,3 +197,18 @@ impl Handler<Event> for EventBus {
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_with_channels_empty() {
let _bus = EventBus::with_channels(vec![], vec![]);
}
#[test]
fn test_with_channels_stateless() {
let _bus = EventBus::with_channels(vec![String::from("test")], vec![]);
}
}

View File

@ -3,8 +3,8 @@
* server loop that the eventbus uses.
*/
extern crate actix;
extern crate actix_web;
extern crate actix_http;
extern crate actix_web;
extern crate config;
extern crate log;
extern crate pretty_env_logger;
@ -159,7 +159,6 @@ async fn main() -> std::io::Result<()> {
.await
}
#[cfg(test)]
mod test {
use super::*;
@ -183,10 +182,10 @@ mod test {
};
let wd = web::Data::new(state);
let srv = test::start(move || {
App::new()
.app_data(wd.clone())
.route("/", web::get().to(index))
});
App::new()
.app_data(wd.clone())
.route("/", web::get().to(index))
});
let req = srv.get("/");
let mut response = req.send().await.unwrap();
@ -199,6 +198,9 @@ mod test {
let matches = re.captures(&buffer).unwrap();
let version = matches.get(1).unwrap();
assert_eq!(version.as_str(), format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown")));
assert_eq!(
version.as_str(),
format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"))
);
}
}

6
processors/README.adoc Normal file
View File

@ -0,0 +1,6 @@
= Otto Processors
Processors in Otto take configuration files, e.g. `.travis-ci.yml`,
`Jenkinsfile`, and convert them into the internal Otto constructs necessary for
task execution.

1
processors/travis-ci/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
target/

View File

@ -0,0 +1,24 @@
[package]
name = "processor-travis-ci"
version = "0.1.0"
authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = "~1.0.103"
serde_json = "~1.0.0"
serde_yaml = "~0.8.11"
actix = "~0.9.0"
log = "~0.4.8"
pretty_env_logger = "~0.3.1"
# Handling command line options
clap = { version = "~2.33.0", features = ["yaml"] }
# Needed for generating uuids
uuid = { version = "~0.8.1", features = ["serde", "v4"] }
otto-eventbus = { path = "../../eventbus" }

View File

@ -0,0 +1,41 @@
#
# This is an example of what the example.yml should convert to in terms of
# Otto's internal task execution structure
#
---
meta:
# Tasks are a discrete higher level concept, which can be mapped to resources
# for execution
tasks:
- id: 0x1
capabilities:
# "sudo false" basically means that the Travis workload doens't need a full
# virtual machine in order to operate
docker_run: true
# Operations should all exist in a single task and therefore run on a
# single colocated resource that has been allocated underneath
ops:
- id: 1
# Contexts are typically going to carry environment variables and other
# things, in the simple Travis CI example, there's really a single
# "stage" (to use Jenkins terms) in which all scripts will be executed
type: BEGINCTX
data:
name: 'Travis'
- id: 2
type: RUNPROC
data:
script: 'echo "Hello World"'
env:
# The Travis processor should set a default timeout
timeout_s: 300
- id: 3
type: RUNPROC
data:
script: 'env'
env:
timeout_s: 300
- id: 4
type: ENDCTX
data:
name: 'Travis'

View File

@ -0,0 +1,10 @@
#
# This is an example of a _very_ simple .travis-ci.yml file.
---
sudo: false
script:
- echo "Hello World"
- env

View File

@ -0,0 +1,200 @@
extern crate actix;
extern crate clap;
extern crate pretty_env_logger;
extern crate serde;
extern crate serde_yaml;
extern crate uuid;
use actix::*;
use clap::{App, Arg};
use log::*;
use serde::{Deserialize, Serialize};
use serde_yaml::Value;
use uuid::Uuid;
use std::collections::HashMap;
use std::fs;
use otto_eventbus::client::*;
use otto_eventbus::*;
fn main() {
pretty_env_logger::init();
let matches = App::new("travis processor")
.arg(
Arg::with_name("filename")
.short("f")
.long("filename")
.value_name("FILE")
.required(true)
.help("File")
.takes_value(true),
)
.get_matches();
let filename = matches.value_of("filename").unwrap_or(".travis-ci.yml");
let contents = fs::read_to_string(filename).expect("Something went wrong reading the file");
let pipeline = serde_yaml::from_str::<TravisConfig>(&contents)
.expect("Failed to deserialize the yaml file into a TravisConfig");
let mut output = PipelineManifest { tasks: vec![] };
let mut caps = HashMap::new();
if pipeline.sudo {
caps.insert("docker_run".to_string(), Value::Bool(false));
} else {
caps.insert("docker_run".to_string(), Value::Bool(true));
}
let mut task = Task {
id: Uuid::new_v4().to_string(),
capabilities: caps,
ops: vec![],
};
task.ops.push(Op {
id: Uuid::new_v4().to_string(),
op_type: OpType::BeginContext,
// Cheap quick hack to get a simple hashmap here
data: serde_yaml::from_str(r#"{ name: "Travis" }"#).unwrap(),
});
for script in pipeline.script.iter() {
let mut data = HashMap::new();
data.insert("script".to_string(), Value::String(script.to_string()));
data.insert(
"timeout_s".to_string(),
Value::Number(serde_yaml::Number::from(300)),
);
data.insert("env".to_string(), Value::Null);
task.ops.push(Op {
id: Uuid::new_v4().to_string(),
op_type: OpType::RunProcess,
data,
});
}
task.ops.push(Op {
id: Uuid::new_v4().to_string(),
op_type: OpType::EndContext,
// Cheap quick hack to get a simple hashmap here
data: serde_yaml::from_str(r#"{ name: "Travis" }"#).unwrap(),
});
output.tasks.push(task);
info!(
"{}",
serde_yaml::to_string(&output).expect("Failed to serialize manifest")
);
let sys = System::new("name");
Arbiter::spawn(async {
let client = connect("http://127.0.0.1:8000/ws/", "processor-travis-ci").await;
info!("Client created: {:?}", client);
let input = InputMessage {
meta: Meta::new("tasks.for_auction".to_string()),
msg: Input::Publish {
payload: serde_json::to_value(output).unwrap(),
},
};
client.do_send(input);
/*
* Disconnecting as soon as we send Input doesn't work well, because the client seems to
* terminate before it can actually send messages over.
*/
//client.do_send(Disconnect {});
//info!("Disconnected");
//System::current().stop();
});
sys.run().unwrap();
}
#[derive(Deserialize, Debug, Serialize)]
struct TravisConfig {
sudo: bool,
script: Vec<String>,
}
#[derive(Deserialize, Debug, PartialEq, Serialize)]
enum OpType {
#[serde(rename = "BEGINCTX")]
BeginContext,
#[serde(rename = "RUNPROC")]
RunProcess,
#[serde(rename = "ENDCTX")]
EndContext,
}
#[derive(Deserialize, Debug, Serialize)]
struct Op {
id: String,
#[serde(rename = "type")]
op_type: OpType,
data: HashMap<String, Value>,
}
#[derive(Deserialize, Debug, Serialize)]
struct Task {
id: String,
capabilities: HashMap<String, Value>,
ops: Vec<Op>,
}
#[derive(Deserialize, Debug, Serialize)]
struct PipelineManifest {
// meta: Meta,
tasks: Vec<Task>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn deser_simple() {
let yaml = r#"
---
sudo: false
script:
- echo "Hello World"
- env
"#;
let c = serde_yaml::from_str::<TravisConfig>(yaml).unwrap();
assert!(!c.sudo);
assert_eq!(c.script.len(), 2);
}
#[test]
fn deser_yaml() {
let yaml = r#"
---
meta:
tasks:
- id: 0x1
capabilities:
docker_run: true
ops:
- id: 1
type: BEGINCTX
data:
name: 'Travis'
- id: 2
type: RUNPROC
data:
script: 'echo "Hello World"'
env:
timeout_s: 300
- id: 3
type: ENDCTX
data:
name: 'Travis'
"#;
let c = serde_yaml::from_str::<PipelineManifest>(yaml).unwrap();
assert_eq!(c.tasks.len(), 1);
assert_eq!(c.tasks[0].ops.len(), 3);
assert_eq!(c.tasks[0].ops[0].op_type, OpType::BeginContext);
}
}