Compare commits
2 Commits
414ad11f8e
...
d92a72ec7d
Author | SHA1 | Date |
---|---|---|
R Tyler Croy | d92a72ec7d | |
R Tyler Croy | bd41fecab3 |
|
@ -89,9 +89,9 @@ fn load_manifests_for(
|
|||
* This conveninece function will just generate the endpoint with the object store URL for the
|
||||
* given pipeline
|
||||
*/
|
||||
fn object_endpoint_for(pipeline: &Pipeline) -> step::Endpoint {
|
||||
fn object_endpoint_for(uuid: &Uuid) -> step::Endpoint {
|
||||
step::Endpoint {
|
||||
url: url::Url::parse(&format!("http://localhost:7671/{}", pipeline.uuid))
|
||||
url: url::Url::parse(&format!("http://localhost:7671/{}", uuid))
|
||||
.expect("Failed for prepare the object endpoint for a pipeline"),
|
||||
}
|
||||
}
|
||||
|
@ -104,17 +104,18 @@ fn object_endpoint_for(pipeline: &Pipeline) -> step::Endpoint {
|
|||
*/
|
||||
pub fn run(
|
||||
steps_dir: &str,
|
||||
pipeline: &Pipeline,
|
||||
steps: &Vec<Step>,
|
||||
pipeline: Uuid,
|
||||
controller: Option<Receiver<control::Request>>,
|
||||
) -> std::io::Result<Status> {
|
||||
let manifests = load_manifests_for(steps_dir, &pipeline.steps)?;
|
||||
let manifests = load_manifests_for(steps_dir, steps)?;
|
||||
|
||||
// XXX: hacks
|
||||
let mut endpoints = HashMap::new();
|
||||
endpoints.insert("objects".to_string(), object_endpoint_for(pipeline));
|
||||
endpoints.insert("objects".to_string(), object_endpoint_for(&pipeline));
|
||||
|
||||
// Now that things are valid and collected, let's executed
|
||||
for step in pipeline.steps.iter() {
|
||||
for step in steps.iter() {
|
||||
if let Some(ref ctl) = controller {
|
||||
while !ctl.is_empty() {
|
||||
if let Ok(msg) = ctl.try_recv() {
|
||||
|
@ -138,7 +139,7 @@ pub fn run(
|
|||
// TODO: This is going to be wrong on nested steps
|
||||
let sock = control::agent_socket();
|
||||
let configuration = step::Configuration {
|
||||
pipeline: pipeline.uuid,
|
||||
pipeline: pipeline,
|
||||
uuid: step.uuid,
|
||||
ipc: sock,
|
||||
endpoints: endpoints.clone(),
|
||||
|
|
|
@ -4,7 +4,9 @@
|
|||
* Most of the logic _should_ be contained within lib.rs and the surrounding modules
|
||||
*/
|
||||
use async_std::sync::channel;
|
||||
use serde::Deserialize;
|
||||
use std::fs::File;
|
||||
use uuid::Uuid;
|
||||
|
||||
use otto_agent::*;
|
||||
|
||||
|
@ -16,6 +18,15 @@ use otto_agent::*;
|
|||
*/
|
||||
const MAX_CONTROL_MSGS: usize = 64;
|
||||
|
||||
/**
|
||||
* The format of the invocation file for the agent
|
||||
*/
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
struct Invocation {
|
||||
pipeline: Uuid,
|
||||
steps: Vec<otto_models::Step>,
|
||||
}
|
||||
|
||||
#[async_std::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
pretty_env_logger::init();
|
||||
|
@ -29,7 +40,7 @@ async fn main() -> std::io::Result<()> {
|
|||
let file = File::open(&args[1])?;
|
||||
let (sender, receiver) = channel(MAX_CONTROL_MSGS);
|
||||
|
||||
match serde_yaml::from_reader::<File, otto_models::Pipeline>(file) {
|
||||
match serde_yaml::from_reader::<File, Invocation>(file) {
|
||||
Err(e) => {
|
||||
panic!("Failed to parse parameters file: {:#?}", e);
|
||||
}
|
||||
|
@ -39,7 +50,8 @@ async fn main() -> std::io::Result<()> {
|
|||
control::run(sender).await.expect("Failed to bind control?");
|
||||
});
|
||||
|
||||
run(&steps_dir, &invoke, Some(receiver)).expect("Failed to run pipeline");
|
||||
run(&steps_dir, &invoke.steps, invoke.pipeline, Some(receiver))
|
||||
.expect("Failed to run pipeline");
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
|
|
|
@ -10,20 +10,52 @@ use uuid::Uuid;
|
|||
pub struct Pipeline {
|
||||
#[serde(default = "generate_uuid")]
|
||||
pub uuid: Uuid,
|
||||
pub contexts: Vec<Context>,
|
||||
pub steps: Vec<Step>,
|
||||
pub batches: Vec<Batch>,
|
||||
}
|
||||
|
||||
impl Default for Pipeline {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
uuid: generate_uuid(),
|
||||
contexts: vec![],
|
||||
steps: vec![],
|
||||
batches: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A batch is a collection of contexts that should be executed in a given mode.
|
||||
*
|
||||
* This structure basically allows for Otto to execute batches of contexts in parallel, or in various
|
||||
* other flows.
|
||||
*/
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct Batch {
|
||||
pub mode: BatchMode,
|
||||
pub contexts: Vec<Context>,
|
||||
}
|
||||
|
||||
impl Default for Batch {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
mode: BatchMode::Linear,
|
||||
contexts: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The mode in which an orchestrator should execute the batch
|
||||
*/
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub enum BatchMode {
|
||||
/// Each context should be executed in order
|
||||
Linear,
|
||||
/// Each context should be executed in parallel completely independent of each other
|
||||
Parallel,
|
||||
/// Each context should be executed in parallel but all should cancel on the first failure
|
||||
Fanout,
|
||||
}
|
||||
|
||||
/**
|
||||
* Possible statuses that a Pipeline can have
|
||||
*
|
||||
|
@ -43,7 +75,7 @@ pub enum Status {
|
|||
|
||||
/**
|
||||
* A context is some bucket of variables and configuration within a pipeline
|
||||
* this will most frequently be a "stage" in the conventional sense
|
||||
* this will most frequently be a "stage" in the conventional pipeline
|
||||
*/
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct Context {
|
||||
|
@ -51,6 +83,7 @@ pub struct Context {
|
|||
pub uuid: Uuid,
|
||||
pub properties: HashMap<String, String>,
|
||||
pub environment: Option<HashMap<String, String>>,
|
||||
pub steps: Vec<Step>,
|
||||
}
|
||||
|
||||
impl Default for Context {
|
||||
|
@ -59,6 +92,7 @@ impl Default for Context {
|
|||
uuid: generate_uuid(),
|
||||
properties: HashMap::default(),
|
||||
environment: None,
|
||||
steps: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,8 +10,7 @@ async fn main() -> std::io::Result<()> {
|
|||
|
||||
if let Some(fd) = env::var("LISTEN_FD").ok().and_then(|fd| fd.parse().ok()) {
|
||||
app.listen(unsafe { TcpListener::from_raw_fd(fd) }).await?;
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
app.listen("http://localhost:7671").await?;
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
@ -4,7 +4,8 @@ extern crate pest_derive;
|
|||
|
||||
use log::*;
|
||||
use otto_models::*;
|
||||
use pest::iterators::Pairs;
|
||||
use pest::error::Error as PestError;
|
||||
use pest::iterators::{Pair, Pairs};
|
||||
use pest::Parser;
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -16,7 +17,7 @@ struct PipelineParser;
|
|||
* This function will attempt to fully parse the buffer as if it were a complete
|
||||
* pipeline file.
|
||||
*/
|
||||
pub fn parse_pipeline_string(buffer: &str) -> Result<Pipeline, pest::error::Error<Rule>> {
|
||||
pub fn parse_pipeline_string(buffer: &str) -> Result<Pipeline, PestError<Rule>> {
|
||||
let mut parser = PipelineParser::parse(Rule::pipeline, buffer)?;
|
||||
let mut pipeline = Pipeline::default();
|
||||
|
||||
|
@ -27,20 +28,29 @@ pub fn parse_pipeline_string(buffer: &str) -> Result<Pipeline, pest::error::Erro
|
|||
while let Some(parsed) = parsed.next() {
|
||||
match parsed.as_rule() {
|
||||
Rule::steps => {
|
||||
pipeline.steps.extend(parse_steps(&mut parsed.into_inner(), pipeline.uuid));
|
||||
},
|
||||
let mut ctx = Context::default();
|
||||
ctx.steps
|
||||
.extend(parse_steps(&mut parsed.into_inner(), pipeline.uuid));
|
||||
|
||||
pipeline.batches.push(Batch {
|
||||
mode: BatchMode::Linear,
|
||||
contexts: vec![ctx],
|
||||
});
|
||||
}
|
||||
Rule::stage => {
|
||||
let (ctx, mut steps) = parse_stage(&mut parsed.into_inner());
|
||||
pipeline.contexts.push(ctx);
|
||||
pipeline.steps.append(&mut steps);
|
||||
},
|
||||
_ => {},
|
||||
let ctx = parse_stage(&mut parsed.into_inner());
|
||||
pipeline.batches.push(Batch {
|
||||
mode: BatchMode::Linear,
|
||||
contexts: vec![ctx],
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(pipeline)
|
||||
}
|
||||
|
@ -65,8 +75,6 @@ fn parse_str(parser: &mut pest::iterators::Pair<Rule>) -> String {
|
|||
* In the case of orphan steps, the uuid should be the pipeline's uuid
|
||||
*/
|
||||
fn parse_steps(parser: &mut Pairs<Rule>, uuid: Uuid) -> Vec<Step> {
|
||||
use pest::iterators::Pair;
|
||||
|
||||
let mut steps = vec![];
|
||||
|
||||
while let Some(parsed) = parser.next() {
|
||||
|
@ -88,9 +96,8 @@ fn parse_steps(parser: &mut Pairs<Rule>, uuid: Uuid) -> Vec<Step> {
|
|||
steps
|
||||
}
|
||||
|
||||
fn parse_stage(parser: &mut Pairs<Rule>) -> (Context, Vec<Step>) {
|
||||
fn parse_stage(parser: &mut Pairs<Rule>) -> Context {
|
||||
let mut stage = Context::default();
|
||||
let mut steps: Vec<Step> = vec![];
|
||||
|
||||
debug!("stage: {:?}", parser);
|
||||
|
||||
|
@ -117,12 +124,12 @@ fn parse_stage(parser: &mut Pairs<Rule>) -> (Context, Vec<Step>) {
|
|||
}
|
||||
Rule::steps => {
|
||||
let mut inner = parsed.into_inner();
|
||||
steps.extend(parse_steps(&mut inner, stage.uuid));
|
||||
stage.steps.extend(parse_steps(&mut inner, stage.uuid));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
(stage, steps)
|
||||
stage
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -227,9 +234,10 @@ mod tests {
|
|||
|
||||
let pipeline = parse_pipeline_string(&buf).expect("Failed to parse");
|
||||
assert!(!pipeline.uuid.is_nil());
|
||||
assert_eq!(pipeline.contexts.len(), 1);
|
||||
assert!(pipeline.contexts[0].properties.contains_key("name"));
|
||||
assert_eq!(pipeline.steps.len(), 1);
|
||||
assert_eq!(pipeline.batches.len(), 1);
|
||||
let context = &pipeline.batches[0].contexts[0];
|
||||
assert!(context.properties.contains_key("name"));
|
||||
assert_eq!(context.steps.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -253,8 +261,7 @@ mod tests {
|
|||
|
||||
let pipeline = parse_pipeline_string(&buf).expect("Failed to parse");
|
||||
assert!(!pipeline.uuid.is_nil());
|
||||
assert_eq!(pipeline.contexts.len(), 2);
|
||||
assert_eq!(pipeline.steps.len(), 3);
|
||||
assert_eq!(pipeline.batches.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -267,7 +274,6 @@ mod tests {
|
|||
}"#;
|
||||
let pipeline = parse_pipeline_string(&buf).expect("Failed to parse");
|
||||
assert!(!pipeline.uuid.is_nil());
|
||||
assert_eq!(pipeline.contexts.len(), 0);
|
||||
assert_eq!(pipeline.steps.len(), 1);
|
||||
assert_eq!(pipeline.batches.len(), 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,14 +19,13 @@ fn main() -> std::io::Result<()> {
|
|||
std::env::set_current_dir(&invoke.parameters.directory)
|
||||
.expect("Failed to set current directory, perhaps it doesn't exist");
|
||||
|
||||
// Construct a somewhat fabricated pipeline configuration
|
||||
let pipeline = otto_models::Pipeline {
|
||||
uuid: invoke.configuration.pipeline,
|
||||
contexts: vec![],
|
||||
steps: invoke.parameters.block,
|
||||
};
|
||||
|
||||
let status = otto_agent::run(&steps_dir, &pipeline, None).unwrap();
|
||||
let status = otto_agent::run(
|
||||
&steps_dir,
|
||||
&invoke.parameters.block,
|
||||
invoke.configuration.pipeline,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
// Pass our block-scoped status back up to the caller
|
||||
std::process::exit(status as i32);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue