Compare commits
4 Commits
96227d96d5
...
bc8e6e7cec
Author | SHA1 | Date |
---|---|---|
R Tyler Croy | bc8e6e7cec | |
R Tyler Croy | e21f112ed6 | |
R Tyler Croy | c7d50516ac | |
R Tyler Croy | 6ccb9d841d |
|
@ -1606,8 +1606,14 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"async-std",
|
||||
"log",
|
||||
"otto-agent",
|
||||
"otto-models",
|
||||
"pretty_env_logger 0.4.0",
|
||||
"serde 1.0.117",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"tide 0.15.0",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
20
Makefile
20
Makefile
|
@ -8,21 +8,25 @@
|
|||
|
||||
################################################################################
|
||||
## Phony targets
|
||||
.PHONY: apispecs clean diagram help steps release run
|
||||
|
||||
release:
|
||||
run: ## Convenience target for running services
|
||||
./scripts/shoreman
|
||||
|
||||
release: ## Build release binaries of everything
|
||||
cargo build --release
|
||||
# Strip all the executables for size, does impact debug symbols
|
||||
find target/release -type f -executable -exec strip {} \;
|
||||
|
||||
steps: release
|
||||
steps: release ## Package up all the stdlib steps as tarballs with osp
|
||||
for dir in $$(find stdlib -maxdepth 1 -type d | tail -n +2); do \
|
||||
echo ">> Packaging $$dir"; \
|
||||
./target/release/osp $$dir; \
|
||||
done;
|
||||
|
||||
apispecs:
|
||||
schemathesis run ./services/parser/apispec.yml --base-url=http://localhost:7672 --checks all
|
||||
schemathesis run ./services/local-orchestrator/apispec.yml --base-url=http://localhost:7673 --checks all
|
||||
apispecs: ## Run the OpenAPI-based specification tests, requires servers to be running already
|
||||
schemathesis run ./services/local-orchestrator/apispec.yml --base-url=http://localhost:7673 --checks all --hypothesis-suppress-health-check too_slow
|
||||
schemathesis run ./services/parser/apispec.yml --base-url=http://localhost:7672 --checks all --hypothesis-suppress-health-check too_slow
|
||||
|
||||
test: contrib/shunit2/shunit2 ## Run the acceptance tests for steps
|
||||
set -e
|
||||
|
@ -47,9 +51,11 @@ clean: ## Clean all temporary/working files
|
|||
diagram: system.png system.dot ## Generate the diagrams describing otto
|
||||
dot -Tpng -o system.png system.dot
|
||||
|
||||
################################################################################
|
||||
|
||||
|
||||
################################################################################
|
||||
contrib/shunit2/shunit2:
|
||||
git submodule update --init
|
||||
|
||||
################################################################################
|
||||
|
||||
.PHONY: apispecs clean diagram help steps release
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use async_std::sync::Receiver;
|
||||
use log::*;
|
||||
use otto_models::*;
|
||||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
@ -12,6 +12,15 @@ use uuid::Uuid;
|
|||
pub mod control;
|
||||
pub mod step;
|
||||
|
||||
/**
|
||||
* The format of the invocation file for the agent
|
||||
*/
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct Invocation {
|
||||
pub pipeline: Uuid,
|
||||
pub steps: Vec<otto_models::Step>,
|
||||
}
|
||||
|
||||
/**
|
||||
* Log is a data structure which captures the necessary metadata for logging a single line
|
||||
*/
|
||||
|
|
|
@ -4,10 +4,8 @@
|
|||
* Most of the logic _should_ be contained within lib.rs and the surrounding modules
|
||||
*/
|
||||
use async_std::sync::channel;
|
||||
use serde::Deserialize;
|
||||
use std::fs::File;
|
||||
use std::path::Path;
|
||||
use uuid::Uuid;
|
||||
|
||||
use otto_agent::*;
|
||||
|
||||
|
@ -19,15 +17,6 @@ use otto_agent::*;
|
|||
*/
|
||||
const MAX_CONTROL_MSGS: usize = 64;
|
||||
|
||||
/**
|
||||
* The format of the invocation file for the agent
|
||||
*/
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
struct Invocation {
|
||||
pipeline: Uuid,
|
||||
steps: Vec<otto_models::Step>,
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure the directory exists by making it or panicking
|
||||
*/
|
||||
|
|
|
@ -140,12 +140,6 @@ pub enum StepParameters {
|
|||
Keyword(HashMap<String, Value>),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
struct MockStep {
|
||||
symbol: String,
|
||||
parameters: StepParameters,
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a UUID v4 for use in structs, etc
|
||||
*
|
||||
|
|
|
@ -7,5 +7,11 @@ edition = "2018"
|
|||
[dependencies]
|
||||
async-std = { version = "~1.7", features = ["attributes"]}
|
||||
log = "~0.4.11"
|
||||
otto-agent = { path = "../../agent" }
|
||||
otto-models = { path = "../../models" }
|
||||
pretty_env_logger = "~0.4.0"
|
||||
serde = {version = "~1.0.117", features = ["rc", "derive"]}
|
||||
serde_json = "~1.0.59"
|
||||
tempfile = "~3.1.0"
|
||||
tide = "~0.15.0"
|
||||
uuid = { version = "~0.8.1", features = ["v4", "serde"]}
|
||||
|
|
|
@ -30,5 +30,58 @@ paths:
|
|||
'200':
|
||||
description: 'A successful healthcheck'
|
||||
content:
|
||||
application/json: {}
|
||||
'application/json': {}
|
||||
'/v1/run':
|
||||
post:
|
||||
operationId: RunWorkload
|
||||
description: |
|
||||
The primary interface for the orchestrator which allows external services to
|
||||
provision an agent to run the specified workload.
|
||||
|
||||
This endpoint is not _synchronous_ insofar that it will enqueue the
|
||||
workload. It will not block until the workload has completed execution
|
||||
for hopefully obvious reasons.
|
||||
requestBody:
|
||||
required: true
|
||||
content:
|
||||
'application/json':
|
||||
schema:
|
||||
$ref: '#/components/schemas/RunWorkloadRequest'
|
||||
responses:
|
||||
'200':
|
||||
description: 'Successfully enqueued the workload with the orchestrator'
|
||||
|
||||
'422':
|
||||
description: 'Unprocessable data, usually not JSON or not UTF-6 encoded'
|
||||
|
||||
components:
|
||||
schemas:
|
||||
RunWorkloadRequest:
|
||||
description: |
|
||||
The primary APi payload for the orchestrator, which includes the workloads
|
||||
that should be provisioned and executed by the orchestrator.
|
||||
|
||||
THe list of contexts should _only_ those that can be safely executed by one
|
||||
agent in sequence.
|
||||
type: object
|
||||
required:
|
||||
- pipeline
|
||||
- contexts
|
||||
properties:
|
||||
pipeline:
|
||||
type: string
|
||||
format: uuid
|
||||
contexts:
|
||||
type: array
|
||||
example:
|
||||
pipeline: '9edc4483-a78a-480f-8e06-2726db1ddf24'
|
||||
contexts:
|
||||
- uuid: '8109f601-12e8-4621-96c6-11baff409d93'
|
||||
properties:
|
||||
name: 'Build'
|
||||
steps:
|
||||
- uuid: '6193b9b1-c6be-4c18-9bb8-1aeead5e7d14'
|
||||
context: '8109f601-12e8-4621-96c6-11baff409d93'
|
||||
symbol: 'sh'
|
||||
parameters:
|
||||
- 'ls'
|
||||
|
|
|
@ -1,75 +0,0 @@
|
|||
use log::*;
|
||||
use std::path::PathBuf;
|
||||
use tide::Request;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct State {
|
||||
pub upload_dir: PathBuf,
|
||||
}
|
||||
|
||||
async fn put_object(req: Request<State>) -> tide::Result {
|
||||
use async_std::{fs::OpenOptions, io};
|
||||
let key = req.url().path();
|
||||
|
||||
info!("Uploading: {:#?} into {:#?}", key, req.state().upload_dir);
|
||||
|
||||
/*
|
||||
* A path will normally come in like /SomeFile.xlsx and Path::push will see
|
||||
* that as a new absolute file which doesn't _join_ but instead overwrites
|
||||
*/
|
||||
let key = key.strip_prefix("/").unwrap_or(key);
|
||||
let fs_path = req.state().upload_dir.join(key);
|
||||
|
||||
/*
|
||||
* In the case of nested keys, we need to create the layout on disk
|
||||
*/
|
||||
if let Some(parent) = fs_path.parent() {
|
||||
std::fs::create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.open(&fs_path)
|
||||
.await?;
|
||||
|
||||
let bytes_written = io::copy(req, file).await?;
|
||||
|
||||
tide::log::info!("file written", {
|
||||
bytes: bytes_written,
|
||||
path: fs_path.canonicalize()?.to_str()
|
||||
});
|
||||
|
||||
Ok("{}".into())
|
||||
}
|
||||
|
||||
async fn get_object(req: Request<State>) -> tide::Result {
|
||||
use tide::{Body, Response};
|
||||
|
||||
let key = req.url().path();
|
||||
info!("Fetching: {:#?} from{:#?}", key, req.state().upload_dir);
|
||||
|
||||
let key = key.strip_prefix("/").unwrap_or(key);
|
||||
let fs_path = req.state().upload_dir.join(key);
|
||||
|
||||
if fs_path.exists() {
|
||||
Ok(Response::builder(200)
|
||||
.body(Body::from_file(&fs_path).await?)
|
||||
.build())
|
||||
} else {
|
||||
Err(tide::Error::from_str(404, "Failed to locate key"))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn app(mut upload_dir: PathBuf) -> tide::Server<State> {
|
||||
upload_dir = std::fs::canonicalize(upload_dir).expect("Unable to canonicalize the upload_dir");
|
||||
let state = State { upload_dir };
|
||||
let mut app = tide::with_state(state);
|
||||
app.at("/*").put(put_object);
|
||||
app.at("/*").get(get_object);
|
||||
app.at("/").get(|_| async { Ok("Hello, world!") });
|
||||
app
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {}
|
|
@ -1,7 +1,51 @@
|
|||
/*
|
||||
* The local orchestrator doesn't do much
|
||||
*/
|
||||
use async_std::task;
|
||||
use log::*;
|
||||
use serde::Deserialize;
|
||||
use tide::Request;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
struct RunWorkload {
|
||||
pipeline: Uuid,
|
||||
contexts: Vec<otto_models::Context>,
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is the core of the local-orchestrator in that it takes a
|
||||
* context and will spawn an agent to run it.
|
||||
*
|
||||
*/
|
||||
fn run_context(pipeline: &Uuid, ctx: &otto_models::Context) -> std::io::Result<()> {
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::process::Command;
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
let mut file = NamedTempFile::new()?;
|
||||
let invocation = otto_agent::Invocation {
|
||||
pipeline: *pipeline,
|
||||
steps: ctx.steps.clone(),
|
||||
};
|
||||
|
||||
if let Err(failure) = serde_json::to_writer(&mut file, &invocation) {
|
||||
error!("Failed to write temporary file for agent: {:#?}", failure);
|
||||
return Err(Error::new(
|
||||
ErrorKind::InvalidData,
|
||||
"Could not write temporary file",
|
||||
));
|
||||
}
|
||||
|
||||
if let Ok(output) = Command::new("otto-agent").arg(file.path()).output() {
|
||||
info!("output: {:?}", output);
|
||||
} else {
|
||||
// TODO
|
||||
error!("Failed to run agent");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn healthcheck(_req: Request<()>) -> tide::Result {
|
||||
Ok(tide::Response::builder(200)
|
||||
|
@ -10,6 +54,23 @@ async fn healthcheck(_req: Request<()>) -> tide::Result {
|
|||
.build())
|
||||
}
|
||||
|
||||
async fn run_workload(mut req: Request<()>) -> tide::Result {
|
||||
let run: RunWorkload = req.body_json().await?;
|
||||
debug!("Received RunWorkload: {:#?}", run);
|
||||
|
||||
task::spawn(async move {
|
||||
println!("Running workload: {:#?}", run);
|
||||
for ctx in run.contexts.iter() {
|
||||
run_context(&run.pipeline, ctx);
|
||||
}
|
||||
});
|
||||
|
||||
Ok(tide::Response::builder(200)
|
||||
.body("{}")
|
||||
.content_type("application/json")
|
||||
.build())
|
||||
}
|
||||
|
||||
#[async_std::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
use std::{env, net::TcpListener, os::unix::io::FromRawFd};
|
||||
|
@ -17,6 +78,7 @@ async fn main() -> std::io::Result<()> {
|
|||
|
||||
let mut app = tide::new();
|
||||
app.at("/health").get(healthcheck);
|
||||
app.at("/v1/run").post(run_workload);
|
||||
|
||||
if let Some(fd) = env::var("LISTEN_FD").ok().and_then(|fd| fd.parse().ok()) {
|
||||
app.listen(unsafe { TcpListener::from_raw_fd(fd) }).await?;
|
||||
|
@ -25,3 +87,6 @@ async fn main() -> std::io::Result<()> {
|
|||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {}
|
||||
|
|
|
@ -31,7 +31,7 @@ paths:
|
|||
'200':
|
||||
description: 'A successful healthcheck'
|
||||
content:
|
||||
application/json: {}
|
||||
'application/json': {}
|
||||
|
||||
'/v1/parse':
|
||||
post:
|
||||
|
@ -44,7 +44,7 @@ paths:
|
|||
description: 'A string payload in the Otto Pipeline syntax'
|
||||
required: true
|
||||
content:
|
||||
text/plain:
|
||||
'text/plain':
|
||||
schema:
|
||||
type: string
|
||||
examples:
|
||||
|
@ -64,13 +64,13 @@ paths:
|
|||
'200':
|
||||
description: 'Successfully parsed'
|
||||
content:
|
||||
application/json:
|
||||
'application/json':
|
||||
schema:
|
||||
$ref: '#/components/schemas/ParsePipelineResponse'
|
||||
'400':
|
||||
description: 'Failed to parse the pipeline for some reason'
|
||||
content:
|
||||
application/json:
|
||||
'application/json':
|
||||
schema:
|
||||
$ref: '#/components/schemas/ParsePipelineFailure'
|
||||
'422':
|
||||
|
|
|
@ -17,7 +17,7 @@ async fn parse(mut req: Request<()>) -> tide::Result {
|
|||
match parsed {
|
||||
Err(e) => {
|
||||
use pest::error::*;
|
||||
error!("Failed to parse: {:#?}", e);
|
||||
error!("Failed to parse: {:?}", e);
|
||||
|
||||
let variant = match e.variant {
|
||||
ErrorVariant::CustomError { message } => message,
|
||||
|
|
Loading…
Reference in New Issue