Provide the basics for an EventBusClient #2
|
@ -1310,7 +1310,9 @@ name = "otto-auctioneer"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -1318,8 +1320,6 @@ dependencies = [
|
|||
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tungstenite 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1327,13 +1327,17 @@ name = "otto-eventbus"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -1483,6 +1487,21 @@ dependencies = [
|
|||
"unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "processor-travis-ci"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"otto-eventbus 0.1.0",
|
||||
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-error"
|
||||
version = "1.2.2"
|
||||
|
@ -1814,6 +1833,17 @@ dependencies = [
|
|||
"url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_yaml"
|
||||
version = "0.8.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"yaml-rust 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha-1"
|
||||
version = "0.8.1"
|
||||
|
@ -2112,6 +2142,15 @@ name = "utf8parse"
|
|||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "vcpkg"
|
||||
version = "0.2.8"
|
||||
|
@ -2430,6 +2469,7 @@ dependencies = [
|
|||
"checksum serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)" = "48c575e0cc52bdd09b47f330f646cf59afc586e9c4e3ccd6fc1f625b8ea1dad7"
|
||||
"checksum serde_test 0.8.23 (registry+https://github.com/rust-lang/crates.io-index)" = "110b3dbdf8607ec493c22d5d947753282f3bae73c0f56d322af1e8c78e4c23d5"
|
||||
"checksum serde_urlencoded 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97"
|
||||
"checksum serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)" = "691b17f19fc1ec9d94ec0b5864859290dff279dbd7b03f017afda54eb36c3c35"
|
||||
"checksum sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "23962131a91661d643c98940b20fcaffe62d776a823247be80a48fcb8b6fce68"
|
||||
"checksum sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
|
||||
"checksum shellexpand 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3a23aed0018ea07316c7826ade41e551772031cf652805f93ebc922215a44d4a"
|
||||
|
@ -2463,6 +2503,7 @@ dependencies = [
|
|||
"checksum url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "75b414f6c464c879d7f9babf951f23bc3743fb7313c081b2e6ca719067ea9d61"
|
||||
"checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
|
||||
"checksum utf8parse 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8772a4ccbb4e89959023bc5b7cb8623a795caa7092d99f3aa9501b9484d4557d"
|
||||
"checksum uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
|
||||
"checksum vcpkg 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3fc439f2794e98976c88a2a2dafce96b930fe8010b0a256b3c2199a773933168"
|
||||
"checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a"
|
||||
"checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd"
|
||||
|
|
|
@ -3,4 +3,5 @@
|
|||
members = [
|
||||
"auctioneer",
|
||||
"eventbus",
|
||||
"processors/travis-ci",
|
||||
]
|
||||
|
|
|
@ -7,11 +7,9 @@ edition = "2018"
|
|||
[dependencies]
|
||||
# Needed for the actor system
|
||||
actix = "~0.9.0"
|
||||
actix-http = "~1.0.1"
|
||||
actix-rt = "~1.0.0"
|
||||
|
||||
# Needed for websockets
|
||||
tungstenite = "~0.9.2"
|
||||
url = "~2.1.0"
|
||||
actix-web = "~2.0.0"
|
||||
|
||||
# Handling command line options
|
||||
clap = { version = "~2.33.0", features = ["yaml"] }
|
||||
|
@ -27,4 +25,3 @@ serde = { version = "~1.0.103", features = ["rc"] }
|
|||
serde_json = "~1.0.0"
|
||||
|
||||
otto-eventbus = { path = "../eventbus" }
|
||||
|
||||
|
|
|
@ -2,69 +2,57 @@
|
|||
* The auctioneer main model
|
||||
*/
|
||||
extern crate actix;
|
||||
extern crate actix_http;
|
||||
extern crate actix_web;
|
||||
extern crate pretty_env_logger;
|
||||
extern crate tungstenite;
|
||||
use std::time::Duration;
|
||||
use std::{io, thread};
|
||||
|
||||
use actix::*;
|
||||
use log::*;
|
||||
use tungstenite::client::AutoStream;
|
||||
use tungstenite::handshake::client::Response;
|
||||
use tungstenite::*;
|
||||
use url::Url;
|
||||
use actix_web::{middleware, web};
|
||||
use actix_web::{App, HttpResponse, HttpServer};
|
||||
use log::debug;
|
||||
|
||||
use otto_eventbus::client::*;
|
||||
use otto_eventbus::*;
|
||||
|
||||
struct BusClient {}
|
||||
|
||||
impl Actor for BusClient {
|
||||
type Context = SyncContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
info!("Starting client");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple websocket connection function, just to allow for easy reconnections
|
||||
* The index handler for the root of the Auctioneer web interface
|
||||
*/
|
||||
fn ws_connect() -> Result<(WebSocket<AutoStream>, Response)> {
|
||||
return connect(Url::parse("ws://localhost:8000/ws/").unwrap());
|
||||
async fn route_index() -> HttpResponse {
|
||||
HttpResponse::Ok().body("Auctioneer")
|
||||
}
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() {
|
||||
async fn main() -> std::io::Result<()> {
|
||||
pretty_env_logger::init();
|
||||
|
||||
let (mut socket, _response) = ws_connect().unwrap();
|
||||
let client = connect("http://127.0.0.1:8000/ws/", "auctioneer").await;
|
||||
debug!("Client created: {:?}", client);
|
||||
|
||||
info!("Connected to the server");
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(middleware::Compress::default())
|
||||
.wrap(middleware::Logger::default())
|
||||
.route("/", web::get().to(route_index))
|
||||
})
|
||||
.bind("127.0.0.1:8001")?
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
/*
|
||||
* In this loop we should read the message, and then dispatch it to a handler actor immediately
|
||||
* to do "the work"
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use actix_web::{test, web, App};
|
||||
|
||||
/**
|
||||
* This test just ensures that the server can come online properly and render its index handler
|
||||
* properly.
|
||||
*/
|
||||
loop {
|
||||
let msg = socket.read_message();
|
||||
let msg = match msg {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
error!("Failed to read a message off the WebSocket: {:?}", e);
|
||||
// must reconnect
|
||||
thread::sleep(Duration::from_secs(3));
|
||||
#[actix_rt::test]
|
||||
async fn test_basic_http() {
|
||||
let srv = test::start(move || App::new().route("/", web::get().to(route_index)));
|
||||
|
||||
/*
|
||||
* Ignore connection errors, they're not important since we
|
||||
* are in the retry loop anyways.
|
||||
*/
|
||||
if let Ok(recon) = ws_connect() {
|
||||
info!("Server reconnected");
|
||||
socket = recon.0;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
info!("Received: {}", msg);
|
||||
let req = srv.get("/");
|
||||
let response = req.send().await.unwrap();
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ path = "src/server/main.rs"
|
|||
|
||||
[[bin]]
|
||||
name = "otto-ebc"
|
||||
path = "src/cli.rs"
|
||||
path = "src/cli/main.rs"
|
||||
|
||||
[dependencies]
|
||||
# Actix provides the web and websocket basis we sit on top of
|
||||
|
@ -47,5 +47,12 @@ url = "~2.1.0"
|
|||
rustyline = "~5.0.6"
|
||||
|
||||
|
||||
# Used for the websocket client
|
||||
actix-codec = "~0.2.0"
|
||||
awc = "~1.0.1"
|
||||
futures = "~0.3.1"
|
||||
bytes = "~0.5.3"
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
regex = "1"
|
||||
|
|
|
@ -1,52 +0,0 @@
|
|||
/**
|
||||
* The CLI is meant to be used for manual testing and verification of the eventbus only.
|
||||
*/
|
||||
|
||||
extern crate rustyline;
|
||||
|
||||
use std::io::{stdin, stdout, Write};
|
||||
use rustyline::error::ReadlineError;
|
||||
use rustyline::Editor;
|
||||
use tungstenite::*;
|
||||
use url::Url;
|
||||
|
||||
fn main() {
|
||||
let (mut socket, response) =
|
||||
connect(Url::parse("ws://localhost:8000/ws/").unwrap()).expect("Failed to connect");
|
||||
println!("Connected to the server");
|
||||
|
||||
// `()` can be used when no completer is required
|
||||
let mut rl = Editor::<()>::new();
|
||||
let history = ".otto-ebc-history";
|
||||
|
||||
if rl.load_history(history).is_err() {
|
||||
println!("No previous history");
|
||||
}
|
||||
|
||||
loop {
|
||||
let readline = rl.readline(">> ");
|
||||
match readline {
|
||||
Ok(line) => {
|
||||
if line.len() == 0 {
|
||||
let msg = socket.read_message().expect("Failed to read message");
|
||||
println!("Received: {}", msg);
|
||||
} else {
|
||||
socket.write_message(Message::Text(line));
|
||||
}
|
||||
},
|
||||
Err(ReadlineError::Interrupted) => {
|
||||
// ctrl-C
|
||||
break
|
||||
},
|
||||
Err(ReadlineError::Eof) => {
|
||||
// ctrl-D
|
||||
break
|
||||
},
|
||||
Err(err) => {
|
||||
println!("Error: {:?}", err);
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
rl.save_history(history).unwrap();
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/**
|
||||
* The CLI is meant to be used for manual testing and verification of the eventbus only.
|
||||
*/
|
||||
extern crate rustyline;
|
||||
|
||||
use rustyline::error::ReadlineError;
|
||||
use rustyline::Editor;
|
||||
use tungstenite::client::AutoStream;
|
||||
use tungstenite::handshake::client::Response;
|
||||
use tungstenite::*;
|
||||
use url::Url;
|
||||
|
||||
fn ws_connect() -> Result<(WebSocket<AutoStream>, Response)> {
|
||||
return connect(Url::parse("ws://localhost:8000/ws/").unwrap());
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let (mut socket, _response) = ws_connect().unwrap();
|
||||
println!("Connected to the server");
|
||||
|
||||
// `()` can be used when no completer is required
|
||||
let mut rl = Editor::<()>::new();
|
||||
let history = ".otto-ebc-history";
|
||||
|
||||
if rl.load_history(history).is_err() {
|
||||
println!("No previous history");
|
||||
}
|
||||
|
||||
loop {
|
||||
let readline = rl.readline(">> ");
|
||||
match readline {
|
||||
Ok(line) => {
|
||||
if line.len() == 0 {
|
||||
let response = socket.read_message();
|
||||
match response {
|
||||
Ok(msg) => {
|
||||
println!("{}", msg);
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Failed to read: {}", e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if line == "quit" {
|
||||
return;
|
||||
}
|
||||
match socket.write_message(Message::Text(line)) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("Failed to write: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(ReadlineError::Interrupted) => {
|
||||
// ctrl-C
|
||||
break;
|
||||
}
|
||||
Err(ReadlineError::Eof) => {
|
||||
// ctrl-D
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
println!("Error: {:?}", err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
rl.save_history(history).unwrap();
|
||||
}
|
|
@ -0,0 +1,186 @@
|
|||
/**!
|
||||
* This module is the eventbus client
|
||||
*/
|
||||
use actix::io::SinkWrite;
|
||||
use actix::*;
|
||||
use actix_codec::Framed;
|
||||
use awc::{
|
||||
error::WsProtocolError,
|
||||
ws::{Codec, Frame, Message},
|
||||
BoxedSocket, Client,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use futures::stream::{SplitSink, StreamExt};
|
||||
use log::{error, info};
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::*;
|
||||
|
||||
/**
|
||||
* An EventBusClient is capable of connecting to, reading messages from, and sending messages to
|
||||
* the eventbus.
|
||||
*/
|
||||
pub struct EventBusClient {
|
||||
/**
|
||||
* The sink is a writable object which can send messages back to the EventBus
|
||||
*/
|
||||
sink: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
|
||||
/**
|
||||
* String identifier for the client
|
||||
*
|
||||
* This should be persisted between invocations of the process if the client
|
||||
* should have any semblence of persistence with its messages
|
||||
*/
|
||||
id: &'static str,
|
||||
}
|
||||
|
||||
#[derive(Debug, Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Disconnect;
|
||||
|
||||
/**
|
||||
* Implementation of the Debug trait so that the EventBusClient can be included
|
||||
* in logging statements and print each instance's `id`
|
||||
*/
|
||||
impl std::fmt::Debug for EventBusClient {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "EventBusClient<id:{}>", self.id)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* connect will create begin the connection process and start the EventBusClient
|
||||
* actor.
|
||||
*
|
||||
* The caller will need to await in order to get the EventBusClient
|
||||
*/
|
||||
pub async fn connect(ws: &'static str, id: &'static str) -> Addr<EventBusClient> {
|
||||
let mut backoff = 1;
|
||||
|
||||
loop {
|
||||
let r = Client::new().ws(ws).connect().await;
|
||||
|
||||
match r {
|
||||
Ok((response, framed)) => {
|
||||
let (sink, stream) = framed.split();
|
||||
|
||||
return EventBusClient::create(|ctx| {
|
||||
EventBusClient::add_stream(stream, ctx);
|
||||
EventBusClient {
|
||||
sink: SinkWrite::new(sink, ctx),
|
||||
id: id,
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed establish WebSocket: {}", e);
|
||||
backoff = backoff + backoff;
|
||||
std::thread::sleep(Duration::from_secs(backoff));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventBusClient {
|
||||
fn hb(&self, ctx: &mut Context<Self>) {
|
||||
ctx.run_later(Duration::new(1, 0), |act, ctx| {
|
||||
act.sink
|
||||
.write(Message::Ping(Bytes::from_static(b"")))
|
||||
.unwrap();
|
||||
act.hb(ctx);
|
||||
|
||||
// client should also check for a timeout here, similar to the
|
||||
// server code
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for EventBusClient {
|
||||
type Context = Context<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Context<Self>) {
|
||||
let input = InputMessage {
|
||||
meta: Meta::default(),
|
||||
msg: Input::Connect {
|
||||
name: self.id.to_string(),
|
||||
},
|
||||
};
|
||||
self.sink
|
||||
.write(Message::Text(serde_json::to_string(&input).unwrap()))
|
||||
.unwrap();
|
||||
// start heartbeats otherwise server will disconnect after 10 seconds
|
||||
self.hb(ctx)
|
||||
}
|
||||
|
||||
fn stopped(&mut self, _: &mut Context<Self>) {
|
||||
info!("Disconnected");
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Disconnect> for EventBusClient {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, _: Disconnect, ctx: &mut Context<Self>) {
|
||||
ctx.stop()
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<InputMessage> for EventBusClient {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, input: InputMessage, _ctx: &mut Context<Self>) {
|
||||
self.sink
|
||||
.write(Message::Text(serde_json::to_string(&input).unwrap()))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<OutputMessage> for EventBusClient {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context<Self>) {
|
||||
/*
|
||||
* For heartbeats we really don't need to do anything
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle server websocket messages
|
||||
impl StreamHandler<Result<Frame, WsProtocolError>> for EventBusClient {
|
||||
fn handle(&mut self, msg: Result<Frame, WsProtocolError>, ctx: &mut Context<Self>) {
|
||||
if let Ok(Frame::Text(txt)) = msg {
|
||||
/*
|
||||
* We have received _some_ message from the eventbus, let's try to
|
||||
* decode it!
|
||||
*/
|
||||
let msg = serde_json::from_slice::<OutputMessage>(&txt);
|
||||
match msg {
|
||||
Ok(output) => {
|
||||
/*
|
||||
* Dispatch the message basically back to ourself for easy
|
||||
* handling
|
||||
*/
|
||||
ctx.address().do_send(output);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Received invalid message: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn started(&mut self, _ctx: &mut Context<Self>) {
|
||||
info!("Connected");
|
||||
}
|
||||
|
||||
fn finished(&mut self, ctx: &mut Context<Self>) {
|
||||
info!("Server disconnected");
|
||||
ctx.stop()
|
||||
}
|
||||
}
|
||||
|
||||
impl actix::io::WriteHandler<WsProtocolError> for EventBusClient {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {}
|
|
@ -12,6 +12,8 @@ use serde_json::Value;
|
|||
|
||||
use std::sync::Arc;
|
||||
|
||||
pub mod client;
|
||||
|
||||
/**
|
||||
* Default function for deserialize/serialize of times, always defaults to 1970-01-01
|
||||
*/
|
||||
|
@ -20,6 +22,14 @@ fn epoch() -> DateTime<Utc> {
|
|||
return Utc.ymd(1970, 1, 1).and_hms_milli(0, 0, 1, 444);
|
||||
}
|
||||
|
||||
/**
|
||||
* Default function for the deserialization of an empty channel, will just create an empty string
|
||||
*/
|
||||
|
||||
fn default_channel() -> String {
|
||||
return "".to_owned();
|
||||
}
|
||||
|
||||
/**
|
||||
* The `Meta` struct contains the necessary metadata about a message which is being sent over the
|
||||
* wire
|
||||
|
@ -29,11 +39,24 @@ fn epoch() -> DateTime<Utc> {
|
|||
#[derive(Serialize, Deserialize, Debug, Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Meta {
|
||||
#[serde(default = "default_channel")]
|
||||
pub channel: String,
|
||||
#[serde(default = "epoch")]
|
||||
pub ts: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl Meta {
|
||||
/**
|
||||
* Construct a Meta struct with the current time
|
||||
*/
|
||||
pub fn new(channel: String) -> Self {
|
||||
Meta {
|
||||
channel: channel,
|
||||
ts: Utc::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The Output enums are all meant to capture the types of messages which can be received from the
|
||||
* eventbus.
|
||||
|
@ -43,7 +66,10 @@ pub struct Meta {
|
|||
#[rtype(result = "()")]
|
||||
pub enum Output {
|
||||
Heartbeat,
|
||||
Message { payload: Value },
|
||||
Message {
|
||||
#[serde(default)]
|
||||
payload: Value,
|
||||
},
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -71,6 +97,13 @@ pub struct OutputMessage {
|
|||
#[serde(tag = "type", rename_all = "camelCase")]
|
||||
#[rtype(result = "()")]
|
||||
pub enum Input {
|
||||
/**
|
||||
* A Connect message must be sent for the client to start receiving messages
|
||||
*
|
||||
* This message instructs the eventbus to subscribe the client to the "all"
|
||||
* channel and its own private inbox channel
|
||||
*/
|
||||
Connect { name: String },
|
||||
/**
|
||||
* A Subscribe message must be sent for each channel the client wishes to subscribe to.
|
||||
*
|
||||
|
@ -113,3 +146,34 @@ pub struct InputMessage {
|
|||
pub msg: Input,
|
||||
pub meta: Meta,
|
||||
}
|
||||
|
||||
/**
|
||||
* The Default trait for `Meta` will create a functionally empty Meta struct
|
||||
*/
|
||||
impl Default for Meta {
|
||||
fn default() -> Meta {
|
||||
Meta {
|
||||
channel: default_channel(),
|
||||
ts: epoch(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use chrono::Utc;
|
||||
|
||||
#[test]
|
||||
fn test_default_for_meta() {
|
||||
let m = Meta::default();
|
||||
assert_eq!(m.channel, "");
|
||||
assert!(m.ts < Utc::now());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_meta() {
|
||||
let m = Meta::new("foo".to_string());
|
||||
assert_eq!(m.channel, "foo");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,13 +6,12 @@
|
|||
use actix::*;
|
||||
use actix_web_actors::ws;
|
||||
use chrono::prelude::Utc;
|
||||
use log::{error, info};
|
||||
use log::{error, info, trace};
|
||||
use serde_json;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::*;
|
||||
use otto_eventbus::*;
|
||||
|
||||
/*
|
||||
* Define the Websocket Actor needed for Actix
|
||||
|
@ -24,12 +23,55 @@ pub struct WSClient {
|
|||
events: Addr<eventbus::EventBus>,
|
||||
}
|
||||
|
||||
/**
|
||||
* The WSClient is an actor responsible for the stateful behavior of each websocket
|
||||
* connected client
|
||||
*/
|
||||
impl WSClient {
|
||||
/**
|
||||
* Construct the WSClient with the given EventBus actor to communicate with
|
||||
*/
|
||||
pub fn new(eb: Addr<eventbus::EventBus>) -> Self {
|
||||
Self { events: eb }
|
||||
}
|
||||
|
||||
fn handle_text(&self, text: String, ctx: &<WSClient as Actor>::Context) {
|
||||
/**
|
||||
* handle_connect() takes care of the Input::Connect message and will ensure
|
||||
* that the client is connected to the "all" channel as well as its inbox"
|
||||
*/
|
||||
fn handle_connect(&self, client_name: String, ctx: &mut <WSClient as Actor>::Context) {
|
||||
self.events.do_send(eventbus::Subscribe {
|
||||
to: "all".to_owned(),
|
||||
addr: ctx.address(),
|
||||
});
|
||||
|
||||
/*
|
||||
* Both of these message sends are creating their own String to represent
|
||||
* the inbox channel.
|
||||
*
|
||||
* This SHOULD be fixed, but will require some more thinking about how
|
||||
* the lifetimes of Channel objects should work
|
||||
*/
|
||||
|
||||
self.events.do_send(eventbus::CreateChannel {
|
||||
channel: eventbus::Channel {
|
||||
name: format!("inbox.{}", client_name),
|
||||
stateful: true,
|
||||
},
|
||||
});
|
||||
|
||||
self.events.do_send(eventbus::Subscribe {
|
||||
to: format!("inbox.{}", client_name),
|
||||
addr: ctx.address(),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* handle_text handles _all_ incoming messages from the websocket connection,
|
||||
* it is responsible for translating those JSON messages into something the
|
||||
* eventbus can pass around internally
|
||||
*/
|
||||
fn handle_text(&self, text: String, ctx: &mut <WSClient as Actor>::Context) {
|
||||
let command = serde_json::from_str::<InputMessage>(&text);
|
||||
|
||||
match command {
|
||||
|
@ -38,6 +80,11 @@ impl WSClient {
|
|||
match c {
|
||||
InputMessage { msg, meta } => {
|
||||
match msg {
|
||||
Input::Connect { name } => {
|
||||
info!("Received connect for client named: {}", name);
|
||||
self.handle_connect(name, ctx)
|
||||
}
|
||||
|
||||
Input::Publish { payload } => {
|
||||
info!("received publish: {:?}", payload);
|
||||
self.events.do_send(eventbus::Event {
|
||||
|
@ -95,27 +142,13 @@ impl Handler<eventbus::Event> for WSClient {
|
|||
*/
|
||||
impl Actor for WSClient {
|
||||
type Context = ws::WebsocketContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
let sub = eventbus::Subscribe {
|
||||
to: "all".to_owned(),
|
||||
addr: ctx.address(),
|
||||
};
|
||||
self.events
|
||||
.send(sub)
|
||||
.into_actor(self)
|
||||
.then(|result, _actor, ctx| {
|
||||
match result {
|
||||
Ok(_) => (),
|
||||
_ => ctx.stop(),
|
||||
}
|
||||
fut::ready(())
|
||||
})
|
||||
.wait(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
/// Handler for ws::Message message
|
||||
/**
|
||||
* Handler for the ws::Message message for the WSClient actor
|
||||
*
|
||||
* This handler will be called for every message from a websocket client
|
||||
*/
|
||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
|
||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||
let msg = match msg {
|
||||
|
@ -125,7 +158,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
|
|||
}
|
||||
Ok(msg) => msg,
|
||||
};
|
||||
info!("WebSocket received: {:?}", msg);
|
||||
trace!("WebSocket message received: {:?}", msg);
|
||||
match msg {
|
||||
ws::Message::Ping(msg) => ctx.pong(&msg),
|
||||
ws::Message::Text(text) => self.handle_text(text, ctx),
|
||||
|
@ -134,3 +167,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {}
|
||||
|
|
|
@ -18,6 +18,32 @@ use crate::*;
|
|||
*/
|
||||
type ClientId = Addr<connection::WSClient>;
|
||||
|
||||
/**
|
||||
* The Channel struct is used as an internal representation of each channel that
|
||||
* the eventbus knows about.
|
||||
*
|
||||
* Channels may be either stateless or stateful, with the ladder implying persistence
|
||||
* guarantees, depending on the eventbus' backing implementation.
|
||||
*/
|
||||
#[derive(Clone, Debug, Eq)]
|
||||
pub struct Channel {
|
||||
pub name: String,
|
||||
pub stateful: bool,
|
||||
}
|
||||
|
||||
/**
|
||||
* The CreateChannel message is only meant to be used by internal components of
|
||||
* the eventbus.
|
||||
*
|
||||
* It is used primarily for creating new channels on-demand, such as those needed
|
||||
* for new client inboxes
|
||||
*/
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct CreateChannel {
|
||||
pub channel: Channel,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Subscribe {
|
||||
|
@ -39,23 +65,17 @@ pub struct Unsubscribe {
|
|||
}
|
||||
|
||||
/**
|
||||
* The Channel struct is used as an internal representation of each channel that
|
||||
* the eventbus knows about.
|
||||
*
|
||||
* Channels may be either stateless or stateful, with the ladder implying persistence
|
||||
* guarantees, depending on the eventbus' backing implementation.
|
||||
* Implementation of the Hash trait for Channel ensures that it can be placed in a HashSet
|
||||
*/
|
||||
#[derive(Debug, Eq)]
|
||||
pub struct Channel {
|
||||
name: String,
|
||||
stateful: bool,
|
||||
}
|
||||
|
||||
impl Hash for Channel {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.name.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of PartialEq trait for Channel ensures that it can be placed in a HashSet
|
||||
*/
|
||||
impl PartialEq for Channel {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.name == other.name
|
||||
|
@ -70,6 +90,10 @@ pub struct EventBus {
|
|||
channels: HashMap<Channel, HashSet<ClientId>>,
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* The Actor trait for the Eventbus allows it to act as an actor in the actix system
|
||||
*/
|
||||
impl Actor for EventBus {
|
||||
type Context = Context<Self>;
|
||||
|
||||
|
@ -107,16 +131,26 @@ impl EventBus {
|
|||
}
|
||||
}
|
||||
|
||||
impl Handler<CreateChannel> for EventBus {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, create: CreateChannel, _: &mut Context<Self>) {
|
||||
self.channels.insert(create.channel, HashSet::new());
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<Subscribe> for EventBus {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Subscribe, _: &mut Context<Self>) {
|
||||
info!("Subscribing client to {}", msg.to);
|
||||
// The stateful field doesn't matter here because the hashing on the
|
||||
// HashMap only applies to the name of the channel
|
||||
let ch = Channel {
|
||||
name: msg.to,
|
||||
stateful: false,
|
||||
};
|
||||
|
||||
if self.channels.contains_key(&ch) {
|
||||
match self.channels.get_mut(&ch) {
|
||||
Some(set) => {
|
||||
|
@ -140,6 +174,7 @@ impl Handler<Event> for EventBus {
|
|||
name: ev.channel.to_string(),
|
||||
stateful: false,
|
||||
};
|
||||
|
||||
if self.channels.contains_key(&ch) {
|
||||
if let Some(clients) = self.channels.get(&ch) {
|
||||
/*
|
||||
|
@ -162,3 +197,18 @@ impl Handler<Event> for EventBus {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_with_channels_empty() {
|
||||
let _bus = EventBus::with_channels(vec![], vec![]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_with_channels_stateless() {
|
||||
let _bus = EventBus::with_channels(vec![String::from("test")], vec![]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,8 +3,8 @@
|
|||
* server loop that the eventbus uses.
|
||||
*/
|
||||
extern crate actix;
|
||||
extern crate actix_web;
|
||||
extern crate actix_http;
|
||||
extern crate actix_web;
|
||||
extern crate config;
|
||||
extern crate log;
|
||||
extern crate pretty_env_logger;
|
||||
|
@ -159,7 +159,6 @@ async fn main() -> std::io::Result<()> {
|
|||
.await
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
@ -183,10 +182,10 @@ mod test {
|
|||
};
|
||||
let wd = web::Data::new(state);
|
||||
let srv = test::start(move || {
|
||||
App::new()
|
||||
.app_data(wd.clone())
|
||||
.route("/", web::get().to(index))
|
||||
});
|
||||
App::new()
|
||||
.app_data(wd.clone())
|
||||
.route("/", web::get().to(index))
|
||||
});
|
||||
|
||||
let req = srv.get("/");
|
||||
let mut response = req.send().await.unwrap();
|
||||
|
@ -199,6 +198,9 @@ mod test {
|
|||
let matches = re.captures(&buffer).unwrap();
|
||||
|
||||
let version = matches.get(1).unwrap();
|
||||
assert_eq!(version.as_str(), format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown")));
|
||||
assert_eq!(
|
||||
version.as_str(),
|
||||
format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
= Otto Processors
|
||||
|
||||
|
||||
Processors in Otto take configuration files, e.g. `.travis-ci.yml`,
|
||||
`Jenkinsfile`, and convert them into the internal Otto constructs necessary for
|
||||
task execution.
|
|
@ -0,0 +1 @@
|
|||
target/
|
|
@ -0,0 +1,24 @@
|
|||
[package]
|
||||
name = "processor-travis-ci"
|
||||
version = "0.1.0"
|
||||
authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
serde = "~1.0.103"
|
||||
serde_json = "~1.0.0"
|
||||
serde_yaml = "~0.8.11"
|
||||
|
||||
actix = "~0.9.0"
|
||||
log = "~0.4.8"
|
||||
pretty_env_logger = "~0.3.1"
|
||||
|
||||
# Handling command line options
|
||||
clap = { version = "~2.33.0", features = ["yaml"] }
|
||||
|
||||
# Needed for generating uuids
|
||||
uuid = { version = "~0.8.1", features = ["serde", "v4"] }
|
||||
|
||||
otto-eventbus = { path = "../../eventbus" }
|
|
@ -0,0 +1,41 @@
|
|||
#
|
||||
# This is an example of what the example.yml should convert to in terms of
|
||||
# Otto's internal task execution structure
|
||||
#
|
||||
---
|
||||
meta:
|
||||
# Tasks are a discrete higher level concept, which can be mapped to resources
|
||||
# for execution
|
||||
tasks:
|
||||
- id: 0x1
|
||||
capabilities:
|
||||
# "sudo false" basically means that the Travis workload doens't need a full
|
||||
# virtual machine in order to operate
|
||||
docker_run: true
|
||||
# Operations should all exist in a single task and therefore run on a
|
||||
# single colocated resource that has been allocated underneath
|
||||
ops:
|
||||
- id: 1
|
||||
# Contexts are typically going to carry environment variables and other
|
||||
# things, in the simple Travis CI example, there's really a single
|
||||
# "stage" (to use Jenkins terms) in which all scripts will be executed
|
||||
type: BEGINCTX
|
||||
data:
|
||||
name: 'Travis'
|
||||
- id: 2
|
||||
type: RUNPROC
|
||||
data:
|
||||
script: 'echo "Hello World"'
|
||||
env:
|
||||
# The Travis processor should set a default timeout
|
||||
timeout_s: 300
|
||||
- id: 3
|
||||
type: RUNPROC
|
||||
data:
|
||||
script: 'env'
|
||||
env:
|
||||
timeout_s: 300
|
||||
- id: 4
|
||||
type: ENDCTX
|
||||
data:
|
||||
name: 'Travis'
|
|
@ -0,0 +1,10 @@
|
|||
#
|
||||
# This is an example of a _very_ simple .travis-ci.yml file.
|
||||
|
||||
---
|
||||
sudo: false
|
||||
script:
|
||||
- echo "Hello World"
|
||||
- env
|
||||
|
||||
|
|
@ -0,0 +1,200 @@
|
|||
extern crate actix;
|
||||
extern crate clap;
|
||||
extern crate pretty_env_logger;
|
||||
extern crate serde;
|
||||
extern crate serde_yaml;
|
||||
extern crate uuid;
|
||||
|
||||
use actix::*;
|
||||
use clap::{App, Arg};
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_yaml::Value;
|
||||
use uuid::Uuid;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
|
||||
use otto_eventbus::client::*;
|
||||
use otto_eventbus::*;
|
||||
|
||||
fn main() {
|
||||
pretty_env_logger::init();
|
||||
|
||||
let matches = App::new("travis processor")
|
||||
.arg(
|
||||
Arg::with_name("filename")
|
||||
.short("f")
|
||||
.long("filename")
|
||||
.value_name("FILE")
|
||||
.required(true)
|
||||
.help("File")
|
||||
.takes_value(true),
|
||||
)
|
||||
.get_matches();
|
||||
let filename = matches.value_of("filename").unwrap_or(".travis-ci.yml");
|
||||
let contents = fs::read_to_string(filename).expect("Something went wrong reading the file");
|
||||
let pipeline = serde_yaml::from_str::<TravisConfig>(&contents)
|
||||
.expect("Failed to deserialize the yaml file into a TravisConfig");
|
||||
|
||||
let mut output = PipelineManifest { tasks: vec![] };
|
||||
|
||||
let mut caps = HashMap::new();
|
||||
|
||||
if pipeline.sudo {
|
||||
caps.insert("docker_run".to_string(), Value::Bool(false));
|
||||
} else {
|
||||
caps.insert("docker_run".to_string(), Value::Bool(true));
|
||||
}
|
||||
|
||||
let mut task = Task {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
capabilities: caps,
|
||||
ops: vec![],
|
||||
};
|
||||
|
||||
task.ops.push(Op {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
op_type: OpType::BeginContext,
|
||||
// Cheap quick hack to get a simple hashmap here
|
||||
data: serde_yaml::from_str(r#"{ name: "Travis" }"#).unwrap(),
|
||||
});
|
||||
|
||||
for script in pipeline.script.iter() {
|
||||
let mut data = HashMap::new();
|
||||
data.insert("script".to_string(), Value::String(script.to_string()));
|
||||
data.insert(
|
||||
"timeout_s".to_string(),
|
||||
Value::Number(serde_yaml::Number::from(300)),
|
||||
);
|
||||
data.insert("env".to_string(), Value::Null);
|
||||
task.ops.push(Op {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
op_type: OpType::RunProcess,
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
task.ops.push(Op {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
op_type: OpType::EndContext,
|
||||
// Cheap quick hack to get a simple hashmap here
|
||||
data: serde_yaml::from_str(r#"{ name: "Travis" }"#).unwrap(),
|
||||
});
|
||||
|
||||
output.tasks.push(task);
|
||||
|
||||
info!(
|
||||
"{}",
|
||||
serde_yaml::to_string(&output).expect("Failed to serialize manifest")
|
||||
);
|
||||
|
||||
let sys = System::new("name");
|
||||
Arbiter::spawn(async {
|
||||
let client = connect("http://127.0.0.1:8000/ws/", "processor-travis-ci").await;
|
||||
info!("Client created: {:?}", client);
|
||||
|
||||
let input = InputMessage {
|
||||
meta: Meta::new("tasks.for_auction".to_string()),
|
||||
msg: Input::Publish {
|
||||
payload: serde_json::to_value(output).unwrap(),
|
||||
},
|
||||
};
|
||||
client.do_send(input);
|
||||
|
||||
/*
|
||||
* Disconnecting as soon as we send Input doesn't work well, because the client seems to
|
||||
* terminate before it can actually send messages over.
|
||||
*/
|
||||
//client.do_send(Disconnect {});
|
||||
//info!("Disconnected");
|
||||
//System::current().stop();
|
||||
});
|
||||
sys.run().unwrap();
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Serialize)]
|
||||
struct TravisConfig {
|
||||
sudo: bool,
|
||||
script: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, PartialEq, Serialize)]
|
||||
enum OpType {
|
||||
#[serde(rename = "BEGINCTX")]
|
||||
BeginContext,
|
||||
#[serde(rename = "RUNPROC")]
|
||||
RunProcess,
|
||||
#[serde(rename = "ENDCTX")]
|
||||
EndContext,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Serialize)]
|
||||
struct Op {
|
||||
id: String,
|
||||
#[serde(rename = "type")]
|
||||
op_type: OpType,
|
||||
data: HashMap<String, Value>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Serialize)]
|
||||
struct Task {
|
||||
id: String,
|
||||
capabilities: HashMap<String, Value>,
|
||||
ops: Vec<Op>,
|
||||
}
|
||||
#[derive(Deserialize, Debug, Serialize)]
|
||||
struct PipelineManifest {
|
||||
// meta: Meta,
|
||||
tasks: Vec<Task>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn deser_simple() {
|
||||
let yaml = r#"
|
||||
---
|
||||
sudo: false
|
||||
script:
|
||||
- echo "Hello World"
|
||||
- env
|
||||
"#;
|
||||
let c = serde_yaml::from_str::<TravisConfig>(yaml).unwrap();
|
||||
assert!(!c.sudo);
|
||||
assert_eq!(c.script.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deser_yaml() {
|
||||
let yaml = r#"
|
||||
---
|
||||
meta:
|
||||
tasks:
|
||||
- id: 0x1
|
||||
capabilities:
|
||||
docker_run: true
|
||||
ops:
|
||||
- id: 1
|
||||
type: BEGINCTX
|
||||
data:
|
||||
name: 'Travis'
|
||||
- id: 2
|
||||
type: RUNPROC
|
||||
data:
|
||||
script: 'echo "Hello World"'
|
||||
env:
|
||||
timeout_s: 300
|
||||
- id: 3
|
||||
type: ENDCTX
|
||||
data:
|
||||
name: 'Travis'
|
||||
"#;
|
||||
let c = serde_yaml::from_str::<PipelineManifest>(yaml).unwrap();
|
||||
assert_eq!(c.tasks.len(), 1);
|
||||
assert_eq!(c.tasks[0].ops.len(), 3);
|
||||
assert_eq!(c.tasks[0].ops[0].op_type, OpType::BeginContext);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue