Compare commits

...

3 Commits

Author SHA1 Message Date
R Tyler Croy 8b9674af3e Ensure the agent is always converting positional parameters to keyword arguments
Steps are always expecting keyword arguments, but for developer convenience they
can use positional arguments.

This commit properly converts those positional arguments.

Run with my local integration test:

    #!/usr/bin/env ruby

    require 'json'
    require 'net/http'

    pipeline = nil

    # Parse it
    Net::HTTP.start('localhost', 7672) do |http|
        otto = File.read('Ottofile')
        response = http.post('/v1/parse', otto)

        if response.code.to_i != 200
            puts 'Failed to parse file'
            exit 1
        end

        pipeline = JSON.parse(response.read_body)
    end

    # Hit the local-orchestrator
    Net::HTTP.start('localhost', 7673) do |http|
        contexts = []
        pipeline['batches'].each do |batch|
            contexts += (batch['contexts'])
        end

        payload = JSON.dump({
            :pipeline => pipeline['uuid'],
            :contexts => contexts,
        })

        puts payload
        res = http.post('/v1/run', payload)

        if res.code.to_i != 200
            puts "Failed to orchestrate! #{res.code} #{res.read_body}"
            exit 1
        end
        puts 'Enqueued'
    end

Fixes #42
2020-11-25 21:24:55 -08:00
R Tyler Croy 97088dff12 Run the local-orchestrator inside the tmp directory 2020-11-25 21:18:41 -08:00
R Tyler Croy 308c516e47 Run sequential contexts in the local-orchestrator if there was a success 2020-11-22 20:02:00 -08:00
5 changed files with 120 additions and 23 deletions

View File

@ -3,7 +3,7 @@
#
object-store: OTTO_OBJECT_DIR=tmp/objects RUST_LOG=debug ./target/debug/otto-object-store
orchestrator: RUST_LOG=debug ./target/debug/otto-local-orchestrator
orchestrator: RUST_LOG=debug STEPS_DIR=$PWD/tmp PATH=$PWD/target/debug:$PATH otto-local-orchestrator
parser: RUST_LOG=debug ./target/debug/otto-parser
# vim: ft=sh

View File

@ -53,9 +53,10 @@ struct LoadedManifest {
manifest: osp::Manifest,
path: PathBuf,
}
fn load_manifests_for(
fn load_manifests_for_symbols(
steps_dir: &str,
steps: &Vec<Step>,
symbols: Vec<String>,
) -> std::io::Result<HashMap<String, LoadedManifest>> {
use std::io::{Error, ErrorKind};
@ -72,28 +73,32 @@ fn load_manifests_for(
let mut manifests = HashMap::new();
for step in steps.iter() {
let manifest_file = dir.join(&step.symbol).join("manifest.yml");
for step in symbols.iter() {
let manifest_file = dir.join(step).join("manifest.yml");
if manifest_file.is_file() {
let file = File::open(manifest_file)?;
if let Ok(manifest) = serde_yaml::from_reader::<File, osp::Manifest>(file) {
let manifest = LoadedManifest {
manifest,
path: dir.join(&step.symbol).to_path_buf(),
path: dir.join(step).to_path_buf(),
};
manifests.insert(step.symbol.clone(), manifest);
manifests.insert(step.clone(), manifest);
}
} else {
warn!(
"{}/manifest.yml does not exist, step cannot execute",
step.symbol
);
warn!("{}/manifest.yml does not exist, step cannot execute", step);
}
}
Ok(manifests)
}
fn load_manifests_for(
steps_dir: &str,
steps: &Vec<Step>,
) -> std::io::Result<HashMap<String, LoadedManifest>> {
load_manifests_for_symbols(steps_dir, steps.iter().map(|s| s.symbol.clone()).collect())
}
/**
* This conveninece function will just generate the endpoint with the object store URL for the
* given pipeline
@ -105,6 +110,32 @@ fn object_endpoint_for(uuid: &Uuid) -> step::Endpoint {
}
}
/**
* Convert positional StepParameters to keywords based on the manifest for the step
*
* This function will clone the parameters
*/
fn positional_to_keyword(args: &StepParameters, manifest: &osp::Manifest) -> StepParameters {
match args {
StepParameters::Keyword(_) => args.clone(),
StepParameters::Positional(args) => {
let mut kwargs = HashMap::new();
for (i, arg) in args.iter().enumerate() {
if i >= manifest.parameters.len() {
error!(
"Too many positional parameters for the step! ({})",
manifest.symbol
);
break;
}
let key = manifest.parameters[i].name.clone();
kwargs.insert(key, arg.clone());
}
StepParameters::Keyword(kwargs)
}
}
}
/**
* The run method is the "core" of the agent which will run a series of steps
* passed in.
@ -167,7 +198,7 @@ pub fn run(
};
let invocation: step::Invocation<StepParameters> = step::Invocation {
configuration,
parameters: step.parameters.clone(),
parameters: positional_to_keyword(&step.parameters, &runner.manifest),
};
serde_json::to_writer(&mut file, &invocation)
@ -260,4 +291,40 @@ mod tests {
load_manifests_for("../stdlib", &vec![step]).expect("Failed to look into stdlib?");
assert!(manifests.len() > 0);
}
#[test]
fn pos_to_keyword() {
use serde_json::Value;
let arg = Value::String("ps".to_string());
let parameters = StepParameters::Positional(vec![arg.clone()]);
let manifests = load_manifests_for_symbols("../stdlib", vec!["sh".to_string()])
.expect("Failed to look into stdlib?");
let loaded = manifests.get("sh").expect("Must have a `sh` manifest");
let kwargs = positional_to_keyword(&parameters, &loaded.manifest);
match kwargs {
StepParameters::Positional(_) => assert!(false),
StepParameters::Keyword(kw) => {
assert_eq!(kw.get("script"), Some(&arg));
}
}
}
#[test]
fn too_many_pos_to_keyword() {
use serde_json::Value;
let arg = Value::String("hi".to_string());
let parameters = StepParameters::Positional(vec![arg.clone(), arg.clone()]);
let manifests = load_manifests_for_symbols("../stdlib", vec!["echo".to_string()])
.expect("Failed to look into stdlib?");
let loaded = manifests.get("echo").expect("Must have a `echo` manifest");
let kwargs = positional_to_keyword(&parameters, &loaded.manifest);
match kwargs {
StepParameters::Positional(_) => assert!(false),
StepParameters::Keyword(kw) => {
assert_eq!(kw.get("message"), Some(&arg));
}
}
}
}

View File

@ -171,6 +171,22 @@ mod tests {
assert_eq!(step.symbol, "sh");
}
/*
* https://github.com/rtyler/otto/issues/42
*/
#[test]
fn deserialize_positional_issue_42() {
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Pipeline {
pipeline: String,
steps: Vec<Step>,
}
let buf = r#"{"pipeline":"fdbebdcf-ad5c-49e5-890f-aef294b476c5","steps":[{"uuid":"f619073f-4129-4d30-a94f-f61af164a6d8","context":"fdbebdcf-ad5c-49e5-890f-aef294b476c5","symbol":"sh","parameters":["pwd"]}]}"#;
let pipeline = serde_json::from_str::<Pipeline>(&buf).expect("Failed to deserialize");
assert_eq!(pipeline.steps[0].symbol, "sh");
}
#[test]
fn deserialize_kwargs() {
let buf = r#"

View File

@ -72,11 +72,11 @@ pub struct Entrypoint {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Parameter {
name: String,
required: bool,
pub name: String,
pub required: bool,
#[serde(rename = "type")]
p_type: ParameterType,
description: String,
pub p_type: ParameterType,
pub description: String,
}
#[derive(Clone, Debug, Deserialize, Serialize)]

View File

@ -18,7 +18,7 @@ struct RunWorkload {
* context and will spawn an agent to run it.
*
*/
fn run_context(pipeline: &Uuid, ctx: &otto_models::Context) -> std::io::Result<()> {
fn run_context(pipeline: &Uuid, ctx: &otto_models::Context) -> std::io::Result<bool> {
use std::io::{Error, ErrorKind};
use std::process::Command;
use tempfile::NamedTempFile;
@ -29,6 +29,8 @@ fn run_context(pipeline: &Uuid, ctx: &otto_models::Context) -> std::io::Result<(
steps: ctx.steps.clone(),
};
println!("{}", serde_json::to_string(&invocation).unwrap());
if let Err(failure) = serde_json::to_writer(&mut file, &invocation) {
error!("Failed to write temporary file for agent: {:#?}", failure);
return Err(Error::new(
@ -38,13 +40,16 @@ fn run_context(pipeline: &Uuid, ctx: &otto_models::Context) -> std::io::Result<(
}
if let Ok(output) = Command::new("otto-agent").arg(file.path()).output() {
info!("output: {:?}", output);
use std::io::{stdout, Write};
stdout().write(&output.stdout);
return Ok(output.status.success());
} else {
// TODO
error!("Failed to run agent");
}
Ok(())
Ok(false)
}
async fn healthcheck(_req: Request<()>) -> tide::Result {
@ -56,12 +61,21 @@ async fn healthcheck(_req: Request<()>) -> tide::Result {
async fn run_workload(mut req: Request<()>) -> tide::Result {
let run: RunWorkload = req.body_json().await?;
debug!("Received RunWorkload: {:#?}", run);
debug!("Received RunWorkload: {:?}", run);
task::spawn(async move {
println!("Running workload: {:#?}", run);
for ctx in run.contexts.iter() {
run_context(&run.pipeline, ctx);
match run_context(&run.pipeline, ctx) {
Ok(success) => {
if !success {
return;
}
debug!("Context succeeded, continuing");
}
Err(_) => {
return;
}
}
}
});
@ -74,7 +88,7 @@ async fn run_workload(mut req: Request<()>) -> tide::Result {
#[async_std::main]
async fn main() -> std::io::Result<()> {
use std::{env, net::TcpListener, os::unix::io::FromRawFd};
tide::log::start();
pretty_env_logger::init();
let mut app = tide::new();
app.at("/health").get(healthcheck);