Refactored the work execution into its own task governed by a bounded channel

The bounded channel prevents too many things from trying to be executed, but at
the moment the API will not return a 409 unless there are *two* subsequent
requests while work is being done
This commit is contained in:
R Tyler Croy 2023-01-28 18:35:37 -08:00
parent 433239f47d
commit eea8fbf8a9
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
2 changed files with 82 additions and 31 deletions

View File

@ -39,7 +39,7 @@ paths:
commands:
- script: 'echo "Hi"'
responses:
200:
201:
description: 'Successfully accepted the commands for execution'
content:
application/json:

View File

@ -3,8 +3,11 @@ extern crate serde_json;
use std::path::PathBuf;
use async_std::channel::{bounded, Receiver, Sender};
use dotenv::dotenv;
use janky::CommandRequest;
use log::*;
use uuid::Uuid;
const AGENT_LOGS_DIR: &str = "agent-logs";
@ -16,22 +19,22 @@ mod routes {
/**
* GET /
*/
pub async fn index(_req: Request<()>) -> Result<Body, tide::Error> {
pub async fn index(_req: Request<crate::State>) -> Result<Body, tide::Error> {
Ok("Hello World from the Janky Agent".into())
}
pub mod api {
use crate::caps::*;
use crate::AGENT_LOGS_DIR;
use crate::*;
use janky::{CommandRequest, CommandResponse};
use log::*;
use tide::{Body, Request};
use tide::{Body, Request, Response, StatusCode};
use url::Url;
use uuid::Uuid;
use std::path::Path;
pub fn register(app: &mut tide::Server<()>) {
pub fn register(app: &mut tide::Server<State>) {
app.at("/api/v1/capabilities").get(get_caps);
app.at("/api/v1/execute").put(execute);
}
@ -41,7 +44,14 @@ mod routes {
*
* This will take in the commands to actually execute
*/
pub async fn execute(mut req: Request<()>) -> Result<Body, tide::Error> {
pub async fn execute(mut req: Request<State>) -> Result<Response, tide::Error> {
// If we cannot accept work right now return an HTTP 409
if req.state().channel.is_full() {
let mut response = Response::new(StatusCode::Conflict);
response.set_body("{}");
return Ok(response);
}
let c: CommandRequest = req.body_json().await?;
debug!("Commands to exec: {:?}", c);
let uuid = Uuid::new_v4();
@ -51,28 +61,12 @@ mod routes {
std::fs::create_dir(log_dir.clone());
let log_file_path = log_dir.join("console.log");
let log_file = std::fs::File::create(log_file_path.clone()).unwrap();
let mut bufw = std::io::BufWriter::new(log_file);
for command in c.commands.iter() {
use os_pipe::pipe;
use std::io::{BufRead, BufReader, Write};
use std::process::Command;
let mut cmd = Command::new("sh");
cmd.args(["-xec", &command.script]);
let (mut reader, writer) = pipe().unwrap();
let writer_clone = writer.try_clone().unwrap();
cmd.stdout(writer);
cmd.stderr(writer_clone);
let mut handle = cmd.spawn()?;
drop(cmd);
debug!("executing: {}", &command.script);
std::io::copy(&mut reader, &mut bufw);
let status = handle.wait()?;
debug!("status of {}: {:?}", &command.script, status);
}
let work = Work {
task: uuid,
log_file: log_file_path.clone(),
command: c,
};
req.state().channel.send(work).await;
let response = CommandResponse {
uuid,
@ -83,13 +77,15 @@ mod routes {
.join(&format!("../../{}", log_file_path.display()))
.unwrap(),
};
Ok(Body::from_json(&response)?)
let mut http_response = Response::new(StatusCode::Created);
http_response.set_body(Body::from_json(&response)?);
Ok(http_response)
}
/*
* GET /capabilities
*/
pub async fn get_caps(_req: Request<()>) -> Result<Body, tide::Error> {
pub async fn get_caps(_req: Request<State>) -> Result<Body, tide::Error> {
let response = json!({
"caps" : [
Git::has_capability(),
@ -102,11 +98,66 @@ mod routes {
}
}
/*
* Struct to encapsulate execution from a request handler to the worker thread
*/
#[derive(Clone, Debug)]
struct Work {
task: Uuid,
log_file: PathBuf,
command: CommandRequest,
}
/*
* State struct just carries data into Tide request handlers
*/
#[derive(Clone, Debug)]
pub struct State {
channel: Sender<Work>,
}
/*
* The worker function just does a busy loop executing Work
*/
async fn worker(receiver: Receiver<Work>) {
debug!("Worker thread starting");
while let Ok(work) = receiver.recv().await {
let log_file = std::fs::File::create(&work.log_file).unwrap();
let mut bufw = std::io::BufWriter::new(log_file);
debug!("Starting to execute the commands");
for command in work.command.commands.iter() {
debug!("Command: {:?}", command);
use os_pipe::pipe;
use std::io::{BufRead, BufReader, Write};
use std::process::Command;
let mut cmd = Command::new("sh");
cmd.args(["-xec", &command.script]);
let (mut reader, writer) = pipe().expect("Failed to create pipe");
let writer_clone = writer.try_clone().expect("Failed to clone writer pipe");
cmd.stdout(writer);
cmd.stderr(writer_clone);
let mut handle = cmd.spawn().expect("Failed to launch command");
drop(cmd);
debug!("executing: {}", &command.script);
std::io::copy(&mut reader, &mut bufw);
let status = handle.wait().expect("Failed to wait on handle");
debug!("status of {}: {:?}", &command.script, status);
}
}
}
#[async_std::main]
async fn main() -> Result<(), tide::Error> {
pretty_env_logger::init();
dotenv().ok();
let mut app = tide::new();
let (sender, receiver) = bounded(1);
async_std::task::spawn(worker(receiver));
let state = State { channel: sender };
let mut app = tide::with_state(state);
#[cfg(not(debug_assertions))]
{