Compare commits

...

2 Commits

Author SHA1 Message Date
R Tyler Croy d92a72ec7d Restructuring the models to allow for parallel execution of contexts
This will lay the ground work for more complex pipeline verbs like fanout and
parallel

This also changes the invocation file format for the agent to basically:

    ---
    pipeline: 'some-uuid'
    steps:
      - symbol: sh
        context: 'some-uuid'
        parameters:

This means that the agents will be a little more stupid when it comes to the
overall pipeline structure
2020-11-14 15:43:28 -08:00
R Tyler Croy bd41fecab3 Slight cleanup/refactoring 2020-11-11 19:59:16 -08:00
6 changed files with 99 additions and 48 deletions

View File

@ -89,9 +89,9 @@ fn load_manifests_for(
* This conveninece function will just generate the endpoint with the object store URL for the
* given pipeline
*/
fn object_endpoint_for(pipeline: &Pipeline) -> step::Endpoint {
fn object_endpoint_for(uuid: &Uuid) -> step::Endpoint {
step::Endpoint {
url: url::Url::parse(&format!("http://localhost:7671/{}", pipeline.uuid))
url: url::Url::parse(&format!("http://localhost:7671/{}", uuid))
.expect("Failed for prepare the object endpoint for a pipeline"),
}
}
@ -104,17 +104,18 @@ fn object_endpoint_for(pipeline: &Pipeline) -> step::Endpoint {
*/
pub fn run(
steps_dir: &str,
pipeline: &Pipeline,
steps: &Vec<Step>,
pipeline: Uuid,
controller: Option<Receiver<control::Request>>,
) -> std::io::Result<Status> {
let manifests = load_manifests_for(steps_dir, &pipeline.steps)?;
let manifests = load_manifests_for(steps_dir, steps)?;
// XXX: hacks
let mut endpoints = HashMap::new();
endpoints.insert("objects".to_string(), object_endpoint_for(pipeline));
endpoints.insert("objects".to_string(), object_endpoint_for(&pipeline));
// Now that things are valid and collected, let's executed
for step in pipeline.steps.iter() {
for step in steps.iter() {
if let Some(ref ctl) = controller {
while !ctl.is_empty() {
if let Ok(msg) = ctl.try_recv() {
@ -138,7 +139,7 @@ pub fn run(
// TODO: This is going to be wrong on nested steps
let sock = control::agent_socket();
let configuration = step::Configuration {
pipeline: pipeline.uuid,
pipeline: pipeline,
uuid: step.uuid,
ipc: sock,
endpoints: endpoints.clone(),

View File

@ -4,7 +4,9 @@
* Most of the logic _should_ be contained within lib.rs and the surrounding modules
*/
use async_std::sync::channel;
use serde::Deserialize;
use std::fs::File;
use uuid::Uuid;
use otto_agent::*;
@ -16,6 +18,15 @@ use otto_agent::*;
*/
const MAX_CONTROL_MSGS: usize = 64;
/**
* The format of the invocation file for the agent
*/
#[derive(Clone, Debug, Deserialize)]
struct Invocation {
pipeline: Uuid,
steps: Vec<otto_models::Step>,
}
#[async_std::main]
async fn main() -> std::io::Result<()> {
pretty_env_logger::init();
@ -29,7 +40,7 @@ async fn main() -> std::io::Result<()> {
let file = File::open(&args[1])?;
let (sender, receiver) = channel(MAX_CONTROL_MSGS);
match serde_yaml::from_reader::<File, otto_models::Pipeline>(file) {
match serde_yaml::from_reader::<File, Invocation>(file) {
Err(e) => {
panic!("Failed to parse parameters file: {:#?}", e);
}
@ -39,7 +50,8 @@ async fn main() -> std::io::Result<()> {
control::run(sender).await.expect("Failed to bind control?");
});
run(&steps_dir, &invoke, Some(receiver)).expect("Failed to run pipeline");
run(&steps_dir, &invoke.steps, invoke.pipeline, Some(receiver))
.expect("Failed to run pipeline");
}
};
Ok(())

View File

@ -10,20 +10,52 @@ use uuid::Uuid;
pub struct Pipeline {
#[serde(default = "generate_uuid")]
pub uuid: Uuid,
pub contexts: Vec<Context>,
pub steps: Vec<Step>,
pub batches: Vec<Batch>,
}
impl Default for Pipeline {
fn default() -> Self {
Self {
uuid: generate_uuid(),
contexts: vec![],
steps: vec![],
batches: vec![],
}
}
}
/**
* A batch is a collection of contexts that should be executed in a given mode.
*
* This structure basically allows for Otto to execute batches of contexts in parallel, or in various
* other flows.
*/
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Batch {
pub mode: BatchMode,
pub contexts: Vec<Context>,
}
impl Default for Batch {
fn default() -> Self {
Self {
mode: BatchMode::Linear,
contexts: vec![],
}
}
}
/**
* The mode in which an orchestrator should execute the batch
*/
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum BatchMode {
/// Each context should be executed in order
Linear,
/// Each context should be executed in parallel completely independent of each other
Parallel,
/// Each context should be executed in parallel but all should cancel on the first failure
Fanout,
}
/**
* Possible statuses that a Pipeline can have
*
@ -43,7 +75,7 @@ pub enum Status {
/**
* A context is some bucket of variables and configuration within a pipeline
* this will most frequently be a "stage" in the conventional sense
* this will most frequently be a "stage" in the conventional pipeline
*/
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Context {
@ -51,6 +83,7 @@ pub struct Context {
pub uuid: Uuid,
pub properties: HashMap<String, String>,
pub environment: Option<HashMap<String, String>>,
pub steps: Vec<Step>,
}
impl Default for Context {
@ -59,6 +92,7 @@ impl Default for Context {
uuid: generate_uuid(),
properties: HashMap::default(),
environment: None,
steps: vec![],
}
}
}

View File

@ -10,8 +10,7 @@ async fn main() -> std::io::Result<()> {
if let Some(fd) = env::var("LISTEN_FD").ok().and_then(|fd| fd.parse().ok()) {
app.listen(unsafe { TcpListener::from_raw_fd(fd) }).await?;
}
else {
} else {
app.listen("http://localhost:7671").await?;
}
Ok(())

View File

@ -4,7 +4,8 @@ extern crate pest_derive;
use log::*;
use otto_models::*;
use pest::iterators::Pairs;
use pest::error::Error as PestError;
use pest::iterators::{Pair, Pairs};
use pest::Parser;
use uuid::Uuid;
@ -16,7 +17,7 @@ struct PipelineParser;
* This function will attempt to fully parse the buffer as if it were a complete
* pipeline file.
*/
pub fn parse_pipeline_string(buffer: &str) -> Result<Pipeline, pest::error::Error<Rule>> {
pub fn parse_pipeline_string(buffer: &str) -> Result<Pipeline, PestError<Rule>> {
let mut parser = PipelineParser::parse(Rule::pipeline, buffer)?;
let mut pipeline = Pipeline::default();
@ -27,20 +28,29 @@ pub fn parse_pipeline_string(buffer: &str) -> Result<Pipeline, pest::error::Erro
while let Some(parsed) = parsed.next() {
match parsed.as_rule() {
Rule::steps => {
pipeline.steps.extend(parse_steps(&mut parsed.into_inner(), pipeline.uuid));
},
let mut ctx = Context::default();
ctx.steps
.extend(parse_steps(&mut parsed.into_inner(), pipeline.uuid));
pipeline.batches.push(Batch {
mode: BatchMode::Linear,
contexts: vec![ctx],
});
}
Rule::stage => {
let (ctx, mut steps) = parse_stage(&mut parsed.into_inner());
pipeline.contexts.push(ctx);
pipeline.steps.append(&mut steps);
},
_ => {},
let ctx = parse_stage(&mut parsed.into_inner());
pipeline.batches.push(Batch {
mode: BatchMode::Linear,
contexts: vec![ctx],
});
}
_ => {}
}
}
},
}
_ => {}
}
};
}
Ok(pipeline)
}
@ -65,8 +75,6 @@ fn parse_str(parser: &mut pest::iterators::Pair<Rule>) -> String {
* In the case of orphan steps, the uuid should be the pipeline's uuid
*/
fn parse_steps(parser: &mut Pairs<Rule>, uuid: Uuid) -> Vec<Step> {
use pest::iterators::Pair;
let mut steps = vec![];
while let Some(parsed) = parser.next() {
@ -88,9 +96,8 @@ fn parse_steps(parser: &mut Pairs<Rule>, uuid: Uuid) -> Vec<Step> {
steps
}
fn parse_stage(parser: &mut Pairs<Rule>) -> (Context, Vec<Step>) {
fn parse_stage(parser: &mut Pairs<Rule>) -> Context {
let mut stage = Context::default();
let mut steps: Vec<Step> = vec![];
debug!("stage: {:?}", parser);
@ -117,12 +124,12 @@ fn parse_stage(parser: &mut Pairs<Rule>) -> (Context, Vec<Step>) {
}
Rule::steps => {
let mut inner = parsed.into_inner();
steps.extend(parse_steps(&mut inner, stage.uuid));
stage.steps.extend(parse_steps(&mut inner, stage.uuid));
}
_ => {}
}
}
(stage, steps)
stage
}
#[cfg(test)]
@ -227,9 +234,10 @@ mod tests {
let pipeline = parse_pipeline_string(&buf).expect("Failed to parse");
assert!(!pipeline.uuid.is_nil());
assert_eq!(pipeline.contexts.len(), 1);
assert!(pipeline.contexts[0].properties.contains_key("name"));
assert_eq!(pipeline.steps.len(), 1);
assert_eq!(pipeline.batches.len(), 1);
let context = &pipeline.batches[0].contexts[0];
assert!(context.properties.contains_key("name"));
assert_eq!(context.steps.len(), 1);
}
#[test]
@ -253,8 +261,7 @@ mod tests {
let pipeline = parse_pipeline_string(&buf).expect("Failed to parse");
assert!(!pipeline.uuid.is_nil());
assert_eq!(pipeline.contexts.len(), 2);
assert_eq!(pipeline.steps.len(), 3);
assert_eq!(pipeline.batches.len(), 2);
}
#[test]
@ -267,7 +274,6 @@ mod tests {
}"#;
let pipeline = parse_pipeline_string(&buf).expect("Failed to parse");
assert!(!pipeline.uuid.is_nil());
assert_eq!(pipeline.contexts.len(), 0);
assert_eq!(pipeline.steps.len(), 1);
assert_eq!(pipeline.batches.len(), 1);
}
}

View File

@ -19,14 +19,13 @@ fn main() -> std::io::Result<()> {
std::env::set_current_dir(&invoke.parameters.directory)
.expect("Failed to set current directory, perhaps it doesn't exist");
// Construct a somewhat fabricated pipeline configuration
let pipeline = otto_models::Pipeline {
uuid: invoke.configuration.pipeline,
contexts: vec![],
steps: invoke.parameters.block,
};
let status = otto_agent::run(&steps_dir, &pipeline, None).unwrap();
let status = otto_agent::run(
&steps_dir,
&invoke.parameters.block,
invoke.configuration.pipeline,
None,
)
.unwrap();
// Pass our block-scoped status back up to the caller
std::process::exit(status as i32);
}