Restructuring the models to allow for parallel execution of contexts

This will lay the groundwork for more complex pipeline verbs like fanout and
parallel

This also changes the invocation file format for the agent to basically:

    ---
    pipeline: 'some-uuid'
    steps:
      - symbol: sh
        context: 'some-uuid'
        parameters:

This means that the agents will know a little less about the
overall pipeline structure
This commit is contained in:
R Tyler Croy 2020-11-14 15:19:04 -08:00
parent bd41fecab3
commit d92a72ec7d
6 changed files with 97 additions and 45 deletions

View File

@ -89,9 +89,9 @@ fn load_manifests_for(
* This convenience function will just generate the endpoint with the object store URL for the
* given pipeline
*/
fn object_endpoint_for(pipeline: &Pipeline) -> step::Endpoint {
fn object_endpoint_for(uuid: &Uuid) -> step::Endpoint {
step::Endpoint {
url: url::Url::parse(&format!("http://localhost:7671/{}", pipeline.uuid))
url: url::Url::parse(&format!("http://localhost:7671/{}", uuid))
.expect("Failed for prepare the object endpoint for a pipeline"),
}
}
@ -104,17 +104,18 @@ fn object_endpoint_for(pipeline: &Pipeline) -> step::Endpoint {
*/
pub fn run(
steps_dir: &str,
pipeline: &Pipeline,
steps: &Vec<Step>,
pipeline: Uuid,
controller: Option<Receiver<control::Request>>,
) -> std::io::Result<Status> {
let manifests = load_manifests_for(steps_dir, &pipeline.steps)?;
let manifests = load_manifests_for(steps_dir, steps)?;
// XXX: hacks
let mut endpoints = HashMap::new();
endpoints.insert("objects".to_string(), object_endpoint_for(pipeline));
endpoints.insert("objects".to_string(), object_endpoint_for(&pipeline));
// Now that things are valid and collected, let's execute
for step in pipeline.steps.iter() {
for step in steps.iter() {
if let Some(ref ctl) = controller {
while !ctl.is_empty() {
if let Ok(msg) = ctl.try_recv() {
@ -138,7 +139,7 @@ pub fn run(
// TODO: This is going to be wrong on nested steps
let sock = control::agent_socket();
let configuration = step::Configuration {
pipeline: pipeline.uuid,
pipeline: pipeline,
uuid: step.uuid,
ipc: sock,
endpoints: endpoints.clone(),

View File

@ -4,7 +4,9 @@
* Most of the logic _should_ be contained within lib.rs and the surrounding modules
*/
use async_std::sync::channel;
use serde::Deserialize;
use std::fs::File;
use uuid::Uuid;
use otto_agent::*;
@ -16,6 +18,15 @@ use otto_agent::*;
*/
const MAX_CONTROL_MSGS: usize = 64;
/**
* The format of the invocation file for the agent
*/
#[derive(Clone, Debug, Deserialize)]
struct Invocation {
/// UUID of the pipeline this invocation belongs to
pipeline: Uuid,
/// Steps the agent should execute, in order
steps: Vec<otto_models::Step>,
}
#[async_std::main]
async fn main() -> std::io::Result<()> {
pretty_env_logger::init();
@ -29,7 +40,7 @@ async fn main() -> std::io::Result<()> {
let file = File::open(&args[1])?;
let (sender, receiver) = channel(MAX_CONTROL_MSGS);
match serde_yaml::from_reader::<File, otto_models::Pipeline>(file) {
match serde_yaml::from_reader::<File, Invocation>(file) {
Err(e) => {
panic!("Failed to parse parameters file: {:#?}", e);
}
@ -39,7 +50,8 @@ async fn main() -> std::io::Result<()> {
control::run(sender).await.expect("Failed to bind control?");
});
run(&steps_dir, &invoke, Some(receiver)).expect("Failed to run pipeline");
run(&steps_dir, &invoke.steps, invoke.pipeline, Some(receiver))
.expect("Failed to run pipeline");
}
};
Ok(())

View File

@ -10,20 +10,52 @@ use uuid::Uuid;
pub struct Pipeline {
#[serde(default = "generate_uuid")]
pub uuid: Uuid,
pub contexts: Vec<Context>,
pub steps: Vec<Step>,
pub batches: Vec<Batch>,
}
impl Default for Pipeline {
fn default() -> Self {
Self {
uuid: generate_uuid(),
contexts: vec![],
steps: vec![],
batches: vec![],
}
}
}
/**
* A batch is a collection of contexts that should be executed in a given mode.
*
* This structure basically allows for Otto to execute batches of contexts in parallel, or in various
* other flows.
*/
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Batch {
/// How the contexts in this batch should be executed (see BatchMode)
pub mode: BatchMode,
/// The contexts to execute according to `mode`
pub contexts: Vec<Context>,
}
impl Default for Batch {
fn default() -> Self {
Self {
// Linear is the conservative default: contexts run one after another
mode: BatchMode::Linear,
contexts: vec![],
}
}
}
/**
* The mode in which an orchestrator should execute the batch
*/
// NOTE(review): with derived Serialize/Deserialize these unit variants use
// serde's default externally-tagged representation (the variant name, e.g.
// "Linear") — confirm that matches the pipeline file format.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum BatchMode {
/// Each context should be executed in order
Linear,
/// Each context should be executed in parallel completely independent of each other
Parallel,
/// Each context should be executed in parallel but all should cancel on the first failure
Fanout,
}
/**
* Possible statuses that a Pipeline can have
*
@ -43,7 +75,7 @@ pub enum Status {
/**
* A context is some bucket of variables and configuration within a pipeline
* this will most frequently be a "stage" in the conventional sense
* this will most frequently be a "stage" in the conventional pipeline
*/
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Context {
@ -51,6 +83,7 @@ pub struct Context {
pub uuid: Uuid,
pub properties: HashMap<String, String>,
pub environment: Option<HashMap<String, String>>,
pub steps: Vec<Step>,
}
impl Default for Context {
@ -59,6 +92,7 @@ impl Default for Context {
uuid: generate_uuid(),
properties: HashMap::default(),
environment: None,
steps: vec![],
}
}
}

View File

@ -10,8 +10,7 @@ async fn main() -> std::io::Result<()> {
if let Some(fd) = env::var("LISTEN_FD").ok().and_then(|fd| fd.parse().ok()) {
app.listen(unsafe { TcpListener::from_raw_fd(fd) }).await?;
}
else {
} else {
app.listen("http://localhost:7671").await?;
}
Ok(())

View File

@ -4,8 +4,8 @@ extern crate pest_derive;
use log::*;
use otto_models::*;
use pest::iterators::{Pair, Pairs};
use pest::error::Error as PestError;
use pest::iterators::{Pair, Pairs};
use pest::Parser;
use uuid::Uuid;
@ -28,20 +28,29 @@ pub fn parse_pipeline_string(buffer: &str) -> Result<Pipeline, PestError<Rule>>
while let Some(parsed) = parsed.next() {
match parsed.as_rule() {
Rule::steps => {
pipeline.steps.extend(parse_steps(&mut parsed.into_inner(), pipeline.uuid));
},
let mut ctx = Context::default();
ctx.steps
.extend(parse_steps(&mut parsed.into_inner(), pipeline.uuid));
pipeline.batches.push(Batch {
mode: BatchMode::Linear,
contexts: vec![ctx],
});
}
Rule::stage => {
let (ctx, mut steps) = parse_stage(&mut parsed.into_inner());
pipeline.contexts.push(ctx);
pipeline.steps.append(&mut steps);
},
_ => {},
let ctx = parse_stage(&mut parsed.into_inner());
pipeline.batches.push(Batch {
mode: BatchMode::Linear,
contexts: vec![ctx],
});
}
_ => {}
}
}
},
}
_ => {}
}
};
}
Ok(pipeline)
}
@ -87,9 +96,8 @@ fn parse_steps(parser: &mut Pairs<Rule>, uuid: Uuid) -> Vec<Step> {
steps
}
fn parse_stage(parser: &mut Pairs<Rule>) -> (Context, Vec<Step>) {
fn parse_stage(parser: &mut Pairs<Rule>) -> Context {
let mut stage = Context::default();
let mut steps: Vec<Step> = vec![];
debug!("stage: {:?}", parser);
@ -116,12 +124,12 @@ fn parse_stage(parser: &mut Pairs<Rule>) -> (Context, Vec<Step>) {
}
Rule::steps => {
let mut inner = parsed.into_inner();
steps.extend(parse_steps(&mut inner, stage.uuid));
stage.steps.extend(parse_steps(&mut inner, stage.uuid));
}
_ => {}
}
}
(stage, steps)
stage
}
#[cfg(test)]
@ -226,9 +234,10 @@ mod tests {
let pipeline = parse_pipeline_string(&buf).expect("Failed to parse");
assert!(!pipeline.uuid.is_nil());
assert_eq!(pipeline.contexts.len(), 1);
assert!(pipeline.contexts[0].properties.contains_key("name"));
assert_eq!(pipeline.steps.len(), 1);
assert_eq!(pipeline.batches.len(), 1);
let context = &pipeline.batches[0].contexts[0];
assert!(context.properties.contains_key("name"));
assert_eq!(context.steps.len(), 1);
}
#[test]
@ -252,8 +261,7 @@ mod tests {
let pipeline = parse_pipeline_string(&buf).expect("Failed to parse");
assert!(!pipeline.uuid.is_nil());
assert_eq!(pipeline.contexts.len(), 2);
assert_eq!(pipeline.steps.len(), 3);
assert_eq!(pipeline.batches.len(), 2);
}
#[test]
@ -266,7 +274,6 @@ mod tests {
}"#;
let pipeline = parse_pipeline_string(&buf).expect("Failed to parse");
assert!(!pipeline.uuid.is_nil());
assert_eq!(pipeline.contexts.len(), 0);
assert_eq!(pipeline.steps.len(), 1);
assert_eq!(pipeline.batches.len(), 1);
}
}

View File

@ -19,14 +19,13 @@ fn main() -> std::io::Result<()> {
std::env::set_current_dir(&invoke.parameters.directory)
.expect("Failed to set current directory, perhaps it doesn't exist");
// Construct a somewhat fabricated pipeline configuration
let pipeline = otto_models::Pipeline {
uuid: invoke.configuration.pipeline,
contexts: vec![],
steps: invoke.parameters.block,
};
let status = otto_agent::run(&steps_dir, &pipeline, None).unwrap();
let status = otto_agent::run(
&steps_dir,
&invoke.parameters.block,
invoke.configuration.pipeline,
None,
)
.unwrap();
// Pass our block-scoped status back up to the caller
std::process::exit(status as i32);
}