Implement some JANKY AF data access layers

Things I've learned today:

* The [sqlx
  documentation](https://docs.rs/sqlx/0.6.2/sqlx/sqlite/types/index.html#uuid)
  is *lying* about its `uuid` support. Basically `query_as!` does not
  ser/deserialize `Uuid` properly in/out of Sqlite with `TEXT` _or_ `BLOB`
* There are [no useful
  examples](https://github.com/launchbadge/sqlx/issues/1014) of doing nested
  struct queries in sqlx at the moment
This commit is contained in:
R Tyler Croy 2023-01-29 17:28:40 -08:00
parent 8c63bf6095
commit 09ed646c76
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
8 changed files with 322 additions and 110 deletions

View File

@ -1 +1 @@
{"openapi":"3.0.0","info":{"description":"Janky Server API defintion\n","version":"1.0.0","title":"Janky APIs","contact":{"email":"rtyler+janky@brokenco.de"},"license":{"name":"AGPL v3.0","url":"https://www.gnu.org/licenses/agpl-3.0.en.html"}},"servers":[{"url":"http://localhost:8000/api/v1","description":"Local dev server (APIv1)"}],"paths":{"/projects/{name}":{"post":{"summary":"Trigger execution for this project","description":null,"parameters":[{"in":"path","name":"name","required":true,"example":"janky","schema":{"type":"string"}}],"responses":{"404":{"summary":"No project configured by that name"},"200":{"summary":"Execution has been triggered"}}}}}}
{"openapi":"3.0.0","info":{"description":"Janky Agent API defintion\n","version":"1.0.0","title":"Janky APIs","contact":{"email":"rtyler+janky@brokenco.de"},"license":{"name":"AGPL v3.0","url":"https://www.gnu.org/licenses/agpl-3.0.en.html"}},"servers":[{"url":"http://localhost:9000/api/v1","description":"Local dev agent (APIv1)"}],"paths":{"/capabilities":{"get":{"summary":"Retrieve a list of capabilities of this agent","description":null,"responses":{"200":{"description":"Getting capabilities","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CapsResponse"}}}}}}},"/execute":{"put":{"summary":"Execute a series of commands on this agent","description":null,"requestBody":{"content":{"application/json":{"schema":{"$ref":"#/components/schemas/CommandRequest"},"example":{"commands":[{"script":"echo \"Hi\""}]}}}},"responses":{"201":{"description":"Successfully accepted the commands for execution","content":{"application/json":{"schema":{"$ref":"#/components/schemas/CommandResponse"}}}},"409":{"description":"Returned when the agent is busy with another series of commands"}}}}},"components":{"schemas":{"CapsResponse":{"type":"object","properties":{"caps":{"type":"array","items":{"$ref":"#/components/schemas/Capability"}}}},"Capability":{"type":"object","properties":{"name":{"type":"string"},"path":{"type":"string"},"data":{"type":"object"}}},"Command":{"type":"object","properties":{"script":{"type":"string","description":"A script that can be exec()'d on the agent"}}},"CommandRequest":{"type":"object","properties":{"commands":{"type":"array","items":{"$ref":"#/components/schemas/Command"}}}},"CommandResponse":{"type":"object","properties":{"uuid":{"type":"string","format":"uuid"},"stream":{"description":"URL to streaming WebSockets logs","type":"string","format":"url"},"task":{"description":"URL to the task metadata","type":"string","format":"url"},"log":{"description":"URL to the raw log of the task run","type":"string","format":"url"}}}}}}

View File

@ -1,9 +0,0 @@
CREATE TABLE agents (
id INTEGER PRIMARY KEY,
uuid TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
capabilities TEXT,
url TEXT NOT NUll,
created_at TEXT NOT NULL
);
CREATE UNIQUE INDEX uuid_idx ON agents(uuid);

View File

@ -0,0 +1,24 @@
CREATE TABLE runs (
uuid TEXT NOT NULL PRIMARY KEY,
num INTEGER NOT NULL,
status INTEGER NOT NULL,
log_url TEXT NOT NULL,
definition TEXT NOT NULL,
scm_info TEXT NOT NULL,
created_at DATETIME NOT NULL DEFAULT (DATETIME('now')),
FOREIGN KEY(scm_info) REFERENCES scm_info(uuid),
FOREIGN KEY(definition) REFERENCES run_definition(uuid)
);
CREATE TABLE scm_info (
uuid TEXT NOT NULL PRIMARY KEY,
git_url TEXT NOT NULL,
ref TEXT NOT NULL,
created_at DATETIME NOT NULL DEFAULT (DATETIME('now'))
);
CREATE TABLE run_definition (
uuid TEXT NOT NULL PRIMARY KEY,
definition TEXT NOT NULL,
created_at DATETIME NOT NULL DEFAULT (DATETIME('now'))
);

View File

@ -123,7 +123,10 @@ async fn worker(receiver: Receiver<Work>) {
while let Ok(work) = receiver.recv().await {
let log_file = std::fs::File::create(&work.log_file).unwrap();
let mut bufw = std::io::BufWriter::new(log_file);
debug!("Starting to execute the commands");
debug!(
"Starting to execute the commands, output in {:?}",
&work.log_file
);
for command in work.command.commands.iter() {
debug!("Command: {:?}", command);
use os_pipe::pipe;

189
src/server/dao.rs Normal file
View File

@ -0,0 +1,189 @@
/*
* The DAO module contains all the necessary structs for interacting with the database
*/
use chrono::{DateTime, NaiveDateTime, Utc};
use sqlx::{FromRow, SqlitePool};
use url::Url;
use uuid::Uuid;
#[derive(Clone, Debug)]
struct Run {
run: RunRow,
scm_info: ScmInfo,
definition: RunDefinition,
}
#[derive(Clone, Debug)]
struct RunRow {
// Unique identifier for the Run
uuid: String,
// User-identifiable number for the Run, monotonically increasing
num: i64,
// Unix status return code from the run, zero is success
status: i64,
// Globally resolvable URL for fetching raw logs
log_url: String,
definition: String,
scm_info: String,
created_at: NaiveDateTime,
}
/* The basic implementation for Run has all the database access operations
*/
impl Run {
/*
* Create the Run in the database given the appropriate struct
*/
async fn create(run: &Run, pool: &SqlitePool) -> Result<(), sqlx::Error> {
let mut tx = pool.begin().await?;
sqlx::query!(
r#"INSERT INTO scm_info (uuid, git_url, ref, created_at) VALUES (?, ?, ?, ?)"#,
run.scm_info.uuid,
run.scm_info.git_url,
run.scm_info.r#ref,
run.scm_info.created_at
)
.execute(&mut tx)
.await;
sqlx::query!(
r#"INSERT INTO run_definition (uuid, definition, created_at) VALUES (?, ?, ?)"#,
run.definition.uuid,
run.definition.definition,
run.definition.created_at,
)
.execute(&mut tx)
.await?;
sqlx::query!(
"INSERT INTO runs (uuid, num, status, log_url, definition, scm_info) VALUES ($1, $2, $3, $4, $5, $6)",
run.run.uuid,
run.run.num,
run.run.status,
run.run.log_url,
run.definition.uuid,
run.scm_info.uuid,
)
.execute(&mut tx)
.await?;
tx.commit().await
}
/*
* Allow finding a Run by the given Uuid
*/
async fn find_by(uuid: &str, pool: &SqlitePool) -> Result<Run, sqlx::Error> {
let row = sqlx::query_as!(RunRow, "SELECT * FROM runs WHERE uuid = ?", uuid)
.fetch_one(pool)
.await?;
let scm_info = sqlx::query_as!(
ScmInfo,
"SELECT * FROM scm_info WHERE uuid = ?",
row.scm_info
)
.fetch_one(pool)
.await?;
let definition = sqlx::query_as!(
RunDefinition,
"SELECT * FROM run_definition WHERE uuid = ?",
row.definition
)
.fetch_one(pool)
.await?;
Ok(Run {
run: row,
scm_info,
definition,
})
}
}
impl Default for Run {
fn default() -> Self {
Self {
run: RunRow::default(),
scm_info: ScmInfo::default(),
definition: RunDefinition::default(),
}
}
}
impl Default for RunRow {
fn default() -> Self {
Self {
uuid: Uuid::new_v4().hyphenated().to_string(),
num: 42,
status: 0,
log_url: "https://example.com/console.log".into(),
definition: Uuid::new_v4().hyphenated().to_string(),
scm_info: Uuid::new_v4().hyphenated().to_string(),
created_at: Utc::now().naive_utc(),
}
}
}
#[derive(Clone, Debug)]
struct ScmInfo {
uuid: String,
git_url: String,
r#ref: String,
created_at: NaiveDateTime,
}
impl Default for ScmInfo {
fn default() -> Self {
Self {
uuid: Uuid::new_v4().hyphenated().to_string(),
git_url: "https://example.com/some/repo.git".into(),
r#ref: "main".into(),
created_at: Utc::now().naive_utc(),
}
}
}
#[derive(Clone, Debug)]
struct RunDefinition {
uuid: String,
definition: String,
created_at: NaiveDateTime,
}
impl Default for RunDefinition {
fn default() -> Self {
Self {
uuid: Uuid::new_v4().hyphenated().to_string(),
definition: String::new(),
created_at: Utc::now().naive_utc(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use sqlx::SqlitePool;
async fn setup_database() -> SqlitePool {
let pool = SqlitePool::connect(":memory:")
.await
.expect("Failed to setup_database()");
sqlx::migrate!()
.run(&pool)
.await
.expect("Failed to run migrations in a test");
pool
}
#[async_std::test]
async fn test_create_a_run() {
pretty_env_logger::try_init();
let pool = setup_database().await;
let run = Run::default();
let result = Run::create(&run, &pool).await.unwrap();
let fetched_run = Run::find_by(&run.run.uuid, &pool).await.unwrap();
assert_eq!(run.run.uuid, fetched_run.run.uuid);
}
}

View File

@ -17,6 +17,9 @@ use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;
use url::Url;
mod dao;
mod routes;
#[derive(Clone, Debug)]
pub struct AppState<'a> {
pub db: SqlitePool,
@ -57,104 +60,6 @@ impl AppState<'_> {
}
}
/**
* The routes module contains all the tide routes and the logic to fulfill the responses for each
* route.
*
* Modules are nested for cleaner organization here
*/
mod routes {
use crate::AppState;
use tide::{Body, Request};
/**
* GET /
*/
pub async fn index(req: Request<AppState<'_>>) -> Result<Body, tide::Error> {
let params = json!({
"page": "home",
"config" : req.state().config,
});
let mut body = req.state().render("index", &params).await?;
body.set_mime("text/html");
Ok(body)
}
pub mod api {
use log::*;
use crate::{AppState, JankyYml, Scm};
use tide::{Request, Response, StatusCode};
/**
* POST /projects/{name}
*/
pub async fn execute_project(req: Request<AppState<'_>>) -> Result<Response, tide::Error> {
let name: String = req.param("name")?.into();
let state = req.state();
if !state.config.has_project(&name) {
debug!("Could not find project named: {}", name);
return Ok(Response::new(StatusCode::NotFound));
}
if let Some(project) = state.config.projects.get(&name) {
match &project.scm {
Scm::GitHub {
owner,
repo,
scm_ref,
} => {
debug!(
"Fetching the file {} from {}/{}",
&project.filename, owner, repo
);
let res = octocrab::instance()
.repos(owner, repo)
.raw_file(
octocrab::params::repos::Commitish(scm_ref.into()),
&project.filename,
)
.await?;
let jankyfile: JankyYml = serde_yaml::from_str(&res.text().await?)?;
debug!("text: {:?}", jankyfile);
for agent in &state.agents {
if agent.can_meet(&jankyfile.needs) {
debug!("agent: {:?} can meet our needs", agent);
let commands: Vec<janky::Command> = jankyfile
.commands
.iter()
.map(|c| janky::Command::with_script(c))
.collect();
let commands = janky::CommandRequest { commands };
let client = reqwest::Client::new();
let _res = client
.put(
agent
.url
.join("/api/v1/execute")
.expect("Failed to join execute URL"),
)
.json(&commands)
.send()
.await?;
return Ok(json!({
"msg": format!("Executing on {}", &agent.url)
})
.into());
}
}
}
}
return Ok("{}".into());
}
Ok(Response::new(StatusCode::InternalServerError))
}
}
}
#[derive(Clone, Debug, Deserialize)]
struct JankyYml {
needs: Vec<String>,
@ -278,7 +183,10 @@ async fn main() -> Result<(), tide::Error> {
});
}
state.register_templates().await.expect("Failed to register handlebars templates");
state
.register_templates()
.await
.expect("Failed to register handlebars templates");
let mut app = tide::with_state(state);
#[cfg(not(debug_assertions))]

94
src/server/routes.rs Normal file
View File

@ -0,0 +1,94 @@
/**
* The routes module contains all the tide routes and the logic to fulfill the responses for each
* route.
*
* Modules are nested for cleaner organization here
*/
use crate::AppState;
use tide::{Body, Request};
/**
* GET /
*/
pub async fn index(req: Request<AppState<'_>>) -> Result<Body, tide::Error> {
let params = json!({
"page": "home",
"config" : req.state().config,
});
let mut body = req.state().render("index", &params).await?;
body.set_mime("text/html");
Ok(body)
}
pub mod api {
use crate::{AppState, JankyYml, Scm};
use log::*;
use tide::{Request, Response, StatusCode};
/**
* POST /projects/{name}
*/
pub async fn execute_project(req: Request<AppState<'_>>) -> Result<Response, tide::Error> {
let name: String = req.param("name")?.into();
let state = req.state();
if !state.config.has_project(&name) {
debug!("Could not find project named: {}", name);
return Ok(Response::new(StatusCode::NotFound));
}
if let Some(project) = state.config.projects.get(&name) {
match &project.scm {
Scm::GitHub {
owner,
repo,
scm_ref,
} => {
debug!(
"Fetching the file {} from {}/{}",
&project.filename, owner, repo
);
let res = octocrab::instance()
.repos(owner, repo)
.raw_file(
octocrab::params::repos::Commitish(scm_ref.into()),
&project.filename,
)
.await?;
let jankyfile: JankyYml = serde_yaml::from_str(&res.text().await?)?;
debug!("text: {:?}", jankyfile);
for agent in &state.agents {
if agent.can_meet(&jankyfile.needs) {
debug!("agent: {:?} can meet our needs", agent);
let commands: Vec<janky::Command> = jankyfile
.commands
.iter()
.map(|c| janky::Command::with_script(c))
.collect();
let commands = janky::CommandRequest { commands };
let client = reqwest::Client::new();
let _res = client
.put(
agent
.url
.join("/api/v1/execute")
.expect("Failed to join execute URL"),
)
.json(&commands)
.send()
.await?;
return Ok(
json!({ "msg": format!("Executing on {}", &agent.url) }).into()
);
}
}
}
}
return Ok("{}".into());
}
Ok(Response::new(StatusCode::InternalServerError))
}
}

View File

@ -34,6 +34,9 @@
No Description
</td>
<td>
<form method="POST" action="/api/v1/projects/{{@key}}">
<input type="submit" value="Execute"/>
</form>
</td>
</tr>
{{/each}}