Compare commits
3 Commits
bc8e6e7cec
...
8b9674af3e
Author | SHA1 | Date |
---|---|---|
R Tyler Croy | 8b9674af3e | |
R Tyler Croy | 97088dff12 | |
R Tyler Croy | 308c516e47 |
2
Procfile
2
Procfile
|
@ -3,7 +3,7 @@
|
|||
#
|
||||
|
||||
object-store: OTTO_OBJECT_DIR=tmp/objects RUST_LOG=debug ./target/debug/otto-object-store
|
||||
orchestrator: RUST_LOG=debug ./target/debug/otto-local-orchestrator
|
||||
orchestrator: RUST_LOG=debug STEPS_DIR=$PWD/tmp PATH=$PWD/target/debug:$PATH otto-local-orchestrator
|
||||
parser: RUST_LOG=debug ./target/debug/otto-parser
|
||||
|
||||
# vim: ft=sh
|
||||
|
|
|
@ -53,9 +53,10 @@ struct LoadedManifest {
|
|||
manifest: osp::Manifest,
|
||||
path: PathBuf,
|
||||
}
|
||||
fn load_manifests_for(
|
||||
|
||||
fn load_manifests_for_symbols(
|
||||
steps_dir: &str,
|
||||
steps: &Vec<Step>,
|
||||
symbols: Vec<String>,
|
||||
) -> std::io::Result<HashMap<String, LoadedManifest>> {
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
|
@ -72,28 +73,32 @@ fn load_manifests_for(
|
|||
|
||||
let mut manifests = HashMap::new();
|
||||
|
||||
for step in steps.iter() {
|
||||
let manifest_file = dir.join(&step.symbol).join("manifest.yml");
|
||||
for step in symbols.iter() {
|
||||
let manifest_file = dir.join(step).join("manifest.yml");
|
||||
|
||||
if manifest_file.is_file() {
|
||||
let file = File::open(manifest_file)?;
|
||||
if let Ok(manifest) = serde_yaml::from_reader::<File, osp::Manifest>(file) {
|
||||
let manifest = LoadedManifest {
|
||||
manifest,
|
||||
path: dir.join(&step.symbol).to_path_buf(),
|
||||
path: dir.join(step).to_path_buf(),
|
||||
};
|
||||
manifests.insert(step.symbol.clone(), manifest);
|
||||
manifests.insert(step.clone(), manifest);
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
"{}/manifest.yml does not exist, step cannot execute",
|
||||
step.symbol
|
||||
);
|
||||
warn!("{}/manifest.yml does not exist, step cannot execute", step);
|
||||
}
|
||||
}
|
||||
Ok(manifests)
|
||||
}
|
||||
|
||||
fn load_manifests_for(
|
||||
steps_dir: &str,
|
||||
steps: &Vec<Step>,
|
||||
) -> std::io::Result<HashMap<String, LoadedManifest>> {
|
||||
load_manifests_for_symbols(steps_dir, steps.iter().map(|s| s.symbol.clone()).collect())
|
||||
}
|
||||
|
||||
/**
|
||||
* This conveninece function will just generate the endpoint with the object store URL for the
|
||||
* given pipeline
|
||||
|
@ -105,6 +110,32 @@ fn object_endpoint_for(uuid: &Uuid) -> step::Endpoint {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert positional StepParameters to keywords based on the manifest for the step
|
||||
*
|
||||
* This function will clone the parameters
|
||||
*/
|
||||
fn positional_to_keyword(args: &StepParameters, manifest: &osp::Manifest) -> StepParameters {
|
||||
match args {
|
||||
StepParameters::Keyword(_) => args.clone(),
|
||||
StepParameters::Positional(args) => {
|
||||
let mut kwargs = HashMap::new();
|
||||
for (i, arg) in args.iter().enumerate() {
|
||||
if i >= manifest.parameters.len() {
|
||||
error!(
|
||||
"Too many positional parameters for the step! ({})",
|
||||
manifest.symbol
|
||||
);
|
||||
break;
|
||||
}
|
||||
let key = manifest.parameters[i].name.clone();
|
||||
kwargs.insert(key, arg.clone());
|
||||
}
|
||||
StepParameters::Keyword(kwargs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The run method is the "core" of the agent which will run a series of steps
|
||||
* passed in.
|
||||
|
@ -167,7 +198,7 @@ pub fn run(
|
|||
};
|
||||
let invocation: step::Invocation<StepParameters> = step::Invocation {
|
||||
configuration,
|
||||
parameters: step.parameters.clone(),
|
||||
parameters: positional_to_keyword(&step.parameters, &runner.manifest),
|
||||
};
|
||||
|
||||
serde_json::to_writer(&mut file, &invocation)
|
||||
|
@ -260,4 +291,40 @@ mod tests {
|
|||
load_manifests_for("../stdlib", &vec![step]).expect("Failed to look into stdlib?");
|
||||
assert!(manifests.len() > 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pos_to_keyword() {
|
||||
use serde_json::Value;
|
||||
let arg = Value::String("ps".to_string());
|
||||
let parameters = StepParameters::Positional(vec![arg.clone()]);
|
||||
let manifests = load_manifests_for_symbols("../stdlib", vec!["sh".to_string()])
|
||||
.expect("Failed to look into stdlib?");
|
||||
let loaded = manifests.get("sh").expect("Must have a `sh` manifest");
|
||||
|
||||
let kwargs = positional_to_keyword(¶meters, &loaded.manifest);
|
||||
match kwargs {
|
||||
StepParameters::Positional(_) => assert!(false),
|
||||
StepParameters::Keyword(kw) => {
|
||||
assert_eq!(kw.get("script"), Some(&arg));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn too_many_pos_to_keyword() {
|
||||
use serde_json::Value;
|
||||
let arg = Value::String("hi".to_string());
|
||||
let parameters = StepParameters::Positional(vec![arg.clone(), arg.clone()]);
|
||||
let manifests = load_manifests_for_symbols("../stdlib", vec!["echo".to_string()])
|
||||
.expect("Failed to look into stdlib?");
|
||||
let loaded = manifests.get("echo").expect("Must have a `echo` manifest");
|
||||
|
||||
let kwargs = positional_to_keyword(¶meters, &loaded.manifest);
|
||||
match kwargs {
|
||||
StepParameters::Positional(_) => assert!(false),
|
||||
StepParameters::Keyword(kw) => {
|
||||
assert_eq!(kw.get("message"), Some(&arg));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -171,6 +171,22 @@ mod tests {
|
|||
assert_eq!(step.symbol, "sh");
|
||||
}
|
||||
|
||||
/*
|
||||
* https://github.com/rtyler/otto/issues/42
|
||||
*/
|
||||
#[test]
|
||||
fn deserialize_positional_issue_42() {
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
struct Pipeline {
|
||||
pipeline: String,
|
||||
steps: Vec<Step>,
|
||||
}
|
||||
let buf = r#"{"pipeline":"fdbebdcf-ad5c-49e5-890f-aef294b476c5","steps":[{"uuid":"f619073f-4129-4d30-a94f-f61af164a6d8","context":"fdbebdcf-ad5c-49e5-890f-aef294b476c5","symbol":"sh","parameters":["pwd"]}]}"#;
|
||||
let pipeline = serde_json::from_str::<Pipeline>(&buf).expect("Failed to deserialize");
|
||||
|
||||
assert_eq!(pipeline.steps[0].symbol, "sh");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_kwargs() {
|
||||
let buf = r#"
|
||||
|
|
|
@ -72,11 +72,11 @@ pub struct Entrypoint {
|
|||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct Parameter {
|
||||
name: String,
|
||||
required: bool,
|
||||
pub name: String,
|
||||
pub required: bool,
|
||||
#[serde(rename = "type")]
|
||||
p_type: ParameterType,
|
||||
description: String,
|
||||
pub p_type: ParameterType,
|
||||
pub description: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
|
|
|
@ -18,7 +18,7 @@ struct RunWorkload {
|
|||
* context and will spawn an agent to run it.
|
||||
*
|
||||
*/
|
||||
fn run_context(pipeline: &Uuid, ctx: &otto_models::Context) -> std::io::Result<()> {
|
||||
fn run_context(pipeline: &Uuid, ctx: &otto_models::Context) -> std::io::Result<bool> {
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::process::Command;
|
||||
use tempfile::NamedTempFile;
|
||||
|
@ -29,6 +29,8 @@ fn run_context(pipeline: &Uuid, ctx: &otto_models::Context) -> std::io::Result<(
|
|||
steps: ctx.steps.clone(),
|
||||
};
|
||||
|
||||
println!("{}", serde_json::to_string(&invocation).unwrap());
|
||||
|
||||
if let Err(failure) = serde_json::to_writer(&mut file, &invocation) {
|
||||
error!("Failed to write temporary file for agent: {:#?}", failure);
|
||||
return Err(Error::new(
|
||||
|
@ -38,13 +40,16 @@ fn run_context(pipeline: &Uuid, ctx: &otto_models::Context) -> std::io::Result<(
|
|||
}
|
||||
|
||||
if let Ok(output) = Command::new("otto-agent").arg(file.path()).output() {
|
||||
info!("output: {:?}", output);
|
||||
use std::io::{stdout, Write};
|
||||
stdout().write(&output.stdout);
|
||||
|
||||
return Ok(output.status.success());
|
||||
} else {
|
||||
// TODO
|
||||
error!("Failed to run agent");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn healthcheck(_req: Request<()>) -> tide::Result {
|
||||
|
@ -56,12 +61,21 @@ async fn healthcheck(_req: Request<()>) -> tide::Result {
|
|||
|
||||
async fn run_workload(mut req: Request<()>) -> tide::Result {
|
||||
let run: RunWorkload = req.body_json().await?;
|
||||
debug!("Received RunWorkload: {:#?}", run);
|
||||
debug!("Received RunWorkload: {:?}", run);
|
||||
|
||||
task::spawn(async move {
|
||||
println!("Running workload: {:#?}", run);
|
||||
for ctx in run.contexts.iter() {
|
||||
run_context(&run.pipeline, ctx);
|
||||
match run_context(&run.pipeline, ctx) {
|
||||
Ok(success) => {
|
||||
if !success {
|
||||
return;
|
||||
}
|
||||
debug!("Context succeeded, continuing");
|
||||
}
|
||||
Err(_) => {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -74,7 +88,7 @@ async fn run_workload(mut req: Request<()>) -> tide::Result {
|
|||
#[async_std::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
use std::{env, net::TcpListener, os::unix::io::FromRawFd};
|
||||
tide::log::start();
|
||||
pretty_env_logger::init();
|
||||
|
||||
let mut app = tide::new();
|
||||
app.at("/health").get(healthcheck);
|
||||
|
|
Loading…
Reference in New Issue