Very simple and naive agent launching in the local orchestrator

Nothing here is suitable yet, but we're getting there.

Run with:

curl -XPOST -d @test-runworkload.json http://localhost:7673/v1/run

The JSON file:

{
    "pipeline": "2265b5d0-1f70-46de-bf50-f1050e9fac9a",
    "contexts" : [
        {
            "uuid" :"3ce1f6fb-79ca-4564-a47e-98265f53ef7f",
            "properties" : {},
            "steps" : [
                {
                    "symbol":  "sh",
                    "uuid": "5599cffb-f23a-4e0f-a0b9-f74654641b2b",
                    "context": "3ce1f6fb-79ca-4564-a47e-98265f53ef7f",
                    "parameters" : {
                        "script" : "pwd"
                    }
                }
            ]
        }
    ]
}
This commit is contained in:
R Tyler Croy 2020-11-22 15:50:34 -08:00
parent e21f112ed6
commit bc8e6e7cec
7 changed files with 60 additions and 96 deletions

2
Cargo.lock generated
View File

@ -1606,10 +1606,12 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-std", "async-std",
"log", "log",
"otto-agent",
"otto-models", "otto-models",
"pretty_env_logger 0.4.0", "pretty_env_logger 0.4.0",
"serde 1.0.117", "serde 1.0.117",
"serde_json", "serde_json",
"tempfile",
"tide 0.15.0", "tide 0.15.0",
"uuid", "uuid",
] ]

View File

@ -1,7 +1,7 @@
use async_std::sync::Receiver; use async_std::sync::Receiver;
use log::*; use log::*;
use otto_models::*; use otto_models::*;
use serde::Serialize; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -12,6 +12,15 @@ use uuid::Uuid;
pub mod control; pub mod control;
pub mod step; pub mod step;
/**
* The format of the invocation file for the agent
*/
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Invocation {
pub pipeline: Uuid,
pub steps: Vec<otto_models::Step>,
}
/** /**
* Log is a data structure which captures the necessary metadata for logging a single line * Log is a data structure which captures the necessary metadata for logging a single line
*/ */

View File

@ -4,10 +4,8 @@
* Most of the logic _should_ be contained within lib.rs and the surrounding modules * Most of the logic _should_ be contained within lib.rs and the surrounding modules
*/ */
use async_std::sync::channel; use async_std::sync::channel;
use serde::Deserialize;
use std::fs::File; use std::fs::File;
use std::path::Path; use std::path::Path;
use uuid::Uuid;
use otto_agent::*; use otto_agent::*;
@ -19,15 +17,6 @@ use otto_agent::*;
*/ */
const MAX_CONTROL_MSGS: usize = 64; const MAX_CONTROL_MSGS: usize = 64;
/**
* The format of the invocation file for the agent
*/
#[derive(Clone, Debug, Deserialize)]
struct Invocation {
pipeline: Uuid,
steps: Vec<otto_models::Step>,
}
/** /**
* Ensure the directory exists by making it or panicking * Ensure the directory exists by making it or panicking
*/ */

View File

@ -140,12 +140,6 @@ pub enum StepParameters {
Keyword(HashMap<String, Value>), Keyword(HashMap<String, Value>),
} }
#[derive(Clone, Debug, Deserialize, Serialize)]
struct MockStep {
symbol: String,
parameters: StepParameters,
}
/** /**
* Generate a UUID v4 for use in structs, etc * Generate a UUID v4 for use in structs, etc
* *

View File

@ -7,9 +7,11 @@ edition = "2018"
[dependencies] [dependencies]
async-std = { version = "~1.7", features = ["attributes"]} async-std = { version = "~1.7", features = ["attributes"]}
log = "~0.4.11" log = "~0.4.11"
otto-agent = { path = "../../agent" }
otto-models = { path = "../../models" } otto-models = { path = "../../models" }
pretty_env_logger = "~0.4.0" pretty_env_logger = "~0.4.0"
serde = {version = "~1.0.117", features = ["rc", "derive"]} serde = {version = "~1.0.117", features = ["rc", "derive"]}
serde_json = "~1.0.59" serde_json = "~1.0.59"
uuid = { version = "~0.8.1", features = ["v4", "serde"]} tempfile = "~3.1.0"
tide = "~0.15.0" tide = "~0.15.0"
uuid = { version = "~0.8.1", features = ["v4", "serde"]}

View File

@ -1,75 +0,0 @@
use log::*;
use std::path::PathBuf;
use tide::Request;
#[derive(Clone, Debug)]
pub struct State {
pub upload_dir: PathBuf,
}
async fn put_object(req: Request<State>) -> tide::Result {
use async_std::{fs::OpenOptions, io};
let key = req.url().path();
info!("Uploading: {:#?} into {:#?}", key, req.state().upload_dir);
/*
* A path will normally come in like /SomeFile.xlsx and Path::push will see
* that as a new absolute file which doesn't _join_ but instead overwrites
*/
let key = key.strip_prefix("/").unwrap_or(key);
let fs_path = req.state().upload_dir.join(key);
/*
* In the case of nested keys, we need to create the layout on disk
*/
if let Some(parent) = fs_path.parent() {
std::fs::create_dir_all(parent)?;
}
let file = OpenOptions::new()
.create(true)
.write(true)
.open(&fs_path)
.await?;
let bytes_written = io::copy(req, file).await?;
tide::log::info!("file written", {
bytes: bytes_written,
path: fs_path.canonicalize()?.to_str()
});
Ok("{}".into())
}
async fn get_object(req: Request<State>) -> tide::Result {
use tide::{Body, Response};
let key = req.url().path();
info!("Fetching: {:#?} from{:#?}", key, req.state().upload_dir);
let key = key.strip_prefix("/").unwrap_or(key);
let fs_path = req.state().upload_dir.join(key);
if fs_path.exists() {
Ok(Response::builder(200)
.body(Body::from_file(&fs_path).await?)
.build())
} else {
Err(tide::Error::from_str(404, "Failed to locate key"))
}
}
pub fn app(mut upload_dir: PathBuf) -> tide::Server<State> {
upload_dir = std::fs::canonicalize(upload_dir).expect("Unable to canonicalize the upload_dir");
let state = State { upload_dir };
let mut app = tide::with_state(state);
app.at("/*").put(put_object);
app.at("/*").get(get_object);
app.at("/").get(|_| async { Ok("Hello, world!") });
app
}
#[cfg(test)]
mod tests {}

View File

@ -1,6 +1,7 @@
/* /*
* The local orchestrator doesn't do much * The local orchestrator doesn't do much
*/ */
use async_std::task;
use log::*; use log::*;
use serde::Deserialize; use serde::Deserialize;
use tide::Request; use tide::Request;
@ -12,6 +13,40 @@ struct RunWorkload {
contexts: Vec<otto_models::Context>, contexts: Vec<otto_models::Context>,
} }
/**
* This function is the core of the local-orchestrator in that it takes a
* context and will spawn an agent to run it.
*
*/
fn run_context(pipeline: &Uuid, ctx: &otto_models::Context) -> std::io::Result<()> {
use std::io::{Error, ErrorKind};
use std::process::Command;
use tempfile::NamedTempFile;
let mut file = NamedTempFile::new()?;
let invocation = otto_agent::Invocation {
pipeline: *pipeline,
steps: ctx.steps.clone(),
};
if let Err(failure) = serde_json::to_writer(&mut file, &invocation) {
error!("Failed to write temporary file for agent: {:#?}", failure);
return Err(Error::new(
ErrorKind::InvalidData,
"Could not write temporary file",
));
}
if let Ok(output) = Command::new("otto-agent").arg(file.path()).output() {
info!("output: {:?}", output);
} else {
// TODO
error!("Failed to run agent");
}
Ok(())
}
async fn healthcheck(_req: Request<()>) -> tide::Result { async fn healthcheck(_req: Request<()>) -> tide::Result {
Ok(tide::Response::builder(200) Ok(tide::Response::builder(200)
.body("{}") .body("{}")
@ -20,10 +55,15 @@ async fn healthcheck(_req: Request<()>) -> tide::Result {
} }
async fn run_workload(mut req: Request<()>) -> tide::Result { async fn run_workload(mut req: Request<()>) -> tide::Result {
let run: RunWorkload = req.body_json().await?; let run: RunWorkload = req.body_json().await?;
debug!("Received RunWorkload: {:#?}", run); debug!("Received RunWorkload: {:#?}", run);
// TODO: do something actually useful :D task::spawn(async move {
println!("Running workload: {:#?}", run);
for ctx in run.contexts.iter() {
run_context(&run.pipeline, ctx);
}
});
Ok(tide::Response::builder(200) Ok(tide::Response::builder(200)
.body("{}") .body("{}")
@ -47,3 +87,6 @@ async fn main() -> std::io::Result<()> {
} }
Ok(()) Ok(())
} }
#[cfg(test)]
mod tests {}