parent
b2011e85e1
commit
508346498d
|
@ -1600,6 +1600,16 @@ dependencies = [
|
|||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "otto-local-orchestrator"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-std",
|
||||
"log",
|
||||
"pretty_env_logger 0.4.0",
|
||||
"tide 0.15.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "otto-models"
|
||||
version = "0.1.0"
|
||||
|
|
|
@ -7,6 +7,7 @@ members = [
|
|||
|
||||
"services/auctioneer",
|
||||
"services/eventbus",
|
||||
"services/local-orchestrator",
|
||||
"services/object-store",
|
||||
"services/parser",
|
||||
|
||||
|
|
2
Procfile
2
Procfile
|
@ -3,5 +3,7 @@
|
|||
#
|
||||
|
||||
object-store: OTTO_OBJECT_DIR=tmp/objects RUST_LOG=debug ./target/debug/otto-object-store
|
||||
orchestrator: RUST_LOG=debug ./target/debug/otto-local-orchestrator
|
||||
parser: RUST_LOG=debug ./target/debug/otto-parser
|
||||
|
||||
# vim: ft=sh
|
||||
|
|
|
@ -23,5 +23,9 @@ This document captures some details around developing these Otto services.
|
|||
| Parser
|
||||
| Service which parses .otto files and spits out the Otto intermediate execution format.
|
||||
|
||||
| 7673
|
||||
| Orchestrator
|
||||
| Service which can provision environments capable of executing agents.
|
||||
|
||||
|===
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
target/
|
|
@ -0,0 +1,11 @@
|
|||
[package]
|
||||
name = "otto-local-orchestrator"
|
||||
version = "0.1.0"
|
||||
authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
async-std = { version = "~1.7", features = ["attributes"]}
|
||||
log = "~0.4.11"
|
||||
pretty_env_logger = "~0.4.0"
|
||||
tide = "~0.15.0"
|
|
@ -0,0 +1,12 @@
|
|||
= Local Orchestrator
|
||||
|
||||
The Otto Local Orchestrator is the simplest implementation of an orchestrator
|
||||
in that it simply receives the orchestration payload and launches the
|
||||
`otto-agent` locally.
|
||||
|
||||
|
||||
.Environnment Variables
|
||||
|===
|
||||
| Name | Default | Description
|
||||
|
||||
|===
|
|
@ -0,0 +1,75 @@
|
|||
use log::*;
|
||||
use std::path::PathBuf;
|
||||
use tide::Request;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct State {
|
||||
pub upload_dir: PathBuf,
|
||||
}
|
||||
|
||||
async fn put_object(req: Request<State>) -> tide::Result {
|
||||
use async_std::{fs::OpenOptions, io};
|
||||
let key = req.url().path();
|
||||
|
||||
info!("Uploading: {:#?} into {:#?}", key, req.state().upload_dir);
|
||||
|
||||
/*
|
||||
* A path will normally come in like /SomeFile.xlsx and Path::push will see
|
||||
* that as a new absolute file which doesn't _join_ but instead overwrites
|
||||
*/
|
||||
let key = key.strip_prefix("/").unwrap_or(key);
|
||||
let fs_path = req.state().upload_dir.join(key);
|
||||
|
||||
/*
|
||||
* In the case of nested keys, we need to create the layout on disk
|
||||
*/
|
||||
if let Some(parent) = fs_path.parent() {
|
||||
std::fs::create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.open(&fs_path)
|
||||
.await?;
|
||||
|
||||
let bytes_written = io::copy(req, file).await?;
|
||||
|
||||
tide::log::info!("file written", {
|
||||
bytes: bytes_written,
|
||||
path: fs_path.canonicalize()?.to_str()
|
||||
});
|
||||
|
||||
Ok("{}".into())
|
||||
}
|
||||
|
||||
async fn get_object(req: Request<State>) -> tide::Result {
|
||||
use tide::{Body, Response};
|
||||
|
||||
let key = req.url().path();
|
||||
info!("Fetching: {:#?} from{:#?}", key, req.state().upload_dir);
|
||||
|
||||
let key = key.strip_prefix("/").unwrap_or(key);
|
||||
let fs_path = req.state().upload_dir.join(key);
|
||||
|
||||
if fs_path.exists() {
|
||||
Ok(Response::builder(200)
|
||||
.body(Body::from_file(&fs_path).await?)
|
||||
.build())
|
||||
} else {
|
||||
Err(tide::Error::from_str(404, "Failed to locate key"))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn app(mut upload_dir: PathBuf) -> tide::Server<State> {
|
||||
upload_dir = std::fs::canonicalize(upload_dir).expect("Unable to canonicalize the upload_dir");
|
||||
let state = State { upload_dir };
|
||||
let mut app = tide::with_state(state);
|
||||
app.at("/*").put(put_object);
|
||||
app.at("/*").get(get_object);
|
||||
app.at("/").get(|_| async { Ok("Hello, world!") });
|
||||
app
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {}
|
|
@ -0,0 +1,18 @@
|
|||
/*
|
||||
* The local orchestrator doesn't do much
|
||||
*/
|
||||
|
||||
#[async_std::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
use std::{env, net::TcpListener, os::unix::io::FromRawFd};
|
||||
tide::log::start();
|
||||
|
||||
let app = tide::new();
|
||||
|
||||
if let Some(fd) = env::var("LISTEN_FD").ok().and_then(|fd| fd.parse().ok()) {
|
||||
app.listen(unsafe { TcpListener::from_raw_fd(fd) }).await?;
|
||||
} else {
|
||||
app.listen("http://localhost:7673").await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in New Issue