From a32e28d2d254ccdd648f45d170d0a4194f1c9cb6 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 13 Jan 2020 19:41:07 -0800 Subject: [PATCH] Prototype the travis-ci processor pushing directly into the tasks.for_auction channel This doesn't help too terribly much at this point, but at least it's a means of getting a .travis-ci.yml into the system --- Cargo.lock | 4 ++ eventbus/src/client.rs | 27 +++++++++--- eventbus/src/lib.rs | 18 ++++++++ processors/travis-ci/Cargo.toml | 6 +++ processors/travis-ci/src/main.rs | 74 ++++++++++++++++++++++++-------- 5 files changed, 105 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f76d58..e7fcada 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1491,7 +1491,11 @@ dependencies = [ name = "processor-travis-ci" version = "0.1.0" dependencies = [ + "actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "otto-eventbus 0.1.0", + "pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)", "serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/eventbus/src/client.rs b/eventbus/src/client.rs index 687f734..f3e96d9 100644 --- a/eventbus/src/client.rs +++ b/eventbus/src/client.rs @@ -35,6 +35,10 @@ pub struct EventBusClient { id: &'static str, } +#[derive(Debug, Message)] +#[rtype(result = "()")] +pub struct Disconnect; + /** * Implementation of the Debug trait so that the EventBusClient can be included * in logging statements and print each instance's `id` @@ -45,10 +49,6 @@ impl std::fmt::Debug for EventBusClient { } } -pub trait EventDispatch { - fn dispatch(&self, payload: T); -} - /** * connect will create begin the connection process and start the EventBusClient * actor. @@ -96,7 +96,6 @@ impl EventBusClient { } } -//impl Actor for EventBusClient { impl Actor for EventBusClient { type Context = Context; @@ -119,6 +118,24 @@ impl Actor for EventBusClient { } } +impl Handler for EventBusClient { + type Result = (); + + fn handle(&mut self, _: Disconnect, ctx: &mut Context) { + ctx.stop() + } +} + +impl Handler for EventBusClient { + type Result = (); + + fn handle(&mut self, input: InputMessage, _ctx: &mut Context) { + self.sink + .write(Message::Text(serde_json::to_string(&input).unwrap())) + .unwrap(); + } +} + impl Handler for EventBusClient { type Result = (); diff --git a/eventbus/src/lib.rs b/eventbus/src/lib.rs index 15a97e1..4e494ba 100644 --- a/eventbus/src/lib.rs +++ b/eventbus/src/lib.rs @@ -45,6 +45,18 @@ pub struct Meta { pub ts: DateTime, } +impl Meta { + /** + * Construct a Meta struct with the current time + */ + pub fn new(channel: String) -> Self { + Meta { + channel: channel, + ts: Utc::now(), + } + } +} + /** * The Output enums are all meant to capture the types of messages which can be received from the * eventbus. @@ -158,4 +170,10 @@ mod test { assert_eq!(m.channel, ""); assert!(m.ts < Utc::now()); } + + #[test] + fn test_new_meta() { + let m = Meta::new("foo".to_string()); + assert_eq!(m.channel, "foo"); + } } diff --git a/processors/travis-ci/Cargo.toml b/processors/travis-ci/Cargo.toml index 19b9a37..2e26d71 100644 --- a/processors/travis-ci/Cargo.toml +++ b/processors/travis-ci/Cargo.toml @@ -11,8 +11,14 @@ serde = "~1.0.103" serde_json = "~1.0.0" serde_yaml = "~0.8.11" +actix = "~0.9.0" +log = "~0.4.8" +pretty_env_logger = "~0.3.1" + # Handling command line options clap = { version = "~2.33.0", features = ["yaml"] } # Needed for generating uuids uuid = { version = "~0.8.1", features = ["serde", "v4"] } + +otto-eventbus = { path = "../../eventbus" } diff --git a/processors/travis-ci/src/main.rs b/processors/travis-ci/src/main.rs index abfc907..9d2514f 100644 --- a/processors/travis-ci/src/main.rs +++ b/processors/travis-ci/src/main.rs @@ -1,9 +1,13 @@ +extern crate actix; extern crate clap; +extern crate pretty_env_logger; extern crate serde; extern crate serde_yaml; extern crate uuid; -use clap::{Arg, App}; +use actix::*; +use clap::{App, Arg}; +use log::*; use serde::{Deserialize, Serialize}; use serde_yaml::Value; use uuid::Uuid; @@ -11,31 +15,35 @@ use uuid::Uuid; use std::collections::HashMap; use std::fs; +use otto_eventbus::client::*; +use otto_eventbus::*; + fn main() { + pretty_env_logger::init(); + let matches = App::new("travis processor") - .arg(Arg::with_name("filename") - .short("f") - .long("filename") - .value_name("FILE") - .required(true) - .help("File") - .takes_value(true)) - .get_matches(); + .arg( + Arg::with_name("filename") + .short("f") + .long("filename") + .value_name("FILE") + .required(true) + .help("File") + .takes_value(true), + ) + .get_matches(); let filename = matches.value_of("filename").unwrap_or(".travis-ci.yml"); let contents = fs::read_to_string(filename).expect("Something went wrong reading the file"); let pipeline = serde_yaml::from_str::(&contents) .expect("Failed to deserialize the yaml file into a TravisConfig"); - let mut output = PipelineManifest { - tasks: vec![], - }; + let mut output = PipelineManifest { tasks: vec![] }; let mut caps = HashMap::new(); if pipeline.sudo { caps.insert("docker_run".to_string(), Value::Bool(false)); - } - else { + } else { caps.insert("docker_run".to_string(), Value::Bool(true)); } @@ -55,14 +63,16 @@ fn main() { for script in pipeline.script.iter() { let mut data = HashMap::new(); data.insert("script".to_string(), Value::String(script.to_string())); - data.insert("timeout_s".to_string(), Value::Number(serde_yaml::Number::from(300))); + data.insert( + "timeout_s".to_string(), + Value::Number(serde_yaml::Number::from(300)), + ); data.insert("env".to_string(), Value::Null); task.ops.push(Op { id: Uuid::new_v4().to_string(), op_type: OpType::RunProcess, - data - }); - + data, + }); } task.ops.push(Op { @@ -74,7 +84,33 @@ fn main() { output.tasks.push(task); - println!("{}", serde_yaml::to_string(&output).expect("Failed to serialize manifest")); + info!( + "{}", + serde_yaml::to_string(&output).expect("Failed to serialize manifest") + ); + + let sys = System::new("name"); + Arbiter::spawn(async { + let client = connect("http://127.0.0.1:8000/ws/", "processor-travis-ci").await; + info!("Client created: {:?}", client); + + let input = InputMessage { + meta: Meta::new("tasks.for_auction".to_string()), + msg: Input::Publish { + payload: serde_json::to_value(output).unwrap(), + }, + }; + client.do_send(input); + + /* + * Disconnecting as soon as we send Input doesn't work well, because the client seems to + * terminate before it can actually send messages over. + */ + //client.do_send(Disconnect {}); + //info!("Disconnected"); + //System::current().stop(); + }); + sys.run().unwrap(); } #[derive(Deserialize, Debug, Serialize)]