add new endpoints

This commit is contained in:
Bevan Hunt 2020-01-03 17:01:35 -08:00
parent afd6a859bf
commit 6650fbaa99
4 changed files with 85 additions and 2 deletions

10
Cargo.lock generated
View File

@ -354,6 +354,7 @@ dependencies = [
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"clokwerk 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"envy 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -431,6 +432,14 @@ dependencies = [
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "clokwerk"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "cloudabi"
version = "0.0.3"
@ -1542,6 +1551,7 @@ dependencies = [
"checksum cc 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)" = "f52a465a666ca3d838ebbf08b241383421412fe7ebb463527bba275526d89f76"
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
"checksum chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "31850b4a4d6bae316f7a09e691c944c28299298837edc0a03f755618c23cbc01"
"checksum clokwerk 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "705530849622d83d6028de6491520e8b00fccd26309e5df63295686d95465500"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum copyless 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "6ff9c56c9fb2a49c05ef0e431485a22400af20d33226dc0764d891d09e724127"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"

View File

@ -24,3 +24,4 @@ sled = "0.30.3"
envy = "0.4.0"
env_logger = "0.6"
chrono = { version = "0.4.10", features = ["serde"] }
clokwerk = "0.3.0"

View File

@ -16,6 +16,14 @@ struct MyData {
db: sled::Db
}
#[derive(Debug, Serialize, Deserialize)]
struct Scheduled {
event: String,
timestamp: i64,
data: serde_json::Value,
}
#[derive(Debug, Serialize, Deserialize)]
struct JSON {
event: String,
@ -112,6 +120,61 @@ async fn insert(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>, j
Ok(HttpResponse::Ok().json(json.0.data))
}
async fn future(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>, json: web::Json<Scheduled>) -> Result<HttpResponse, Error> {
// get new value from json
let new_value_string = serde_json::to_string(&json.0.data).unwrap();
let new_value = new_value_string.as_bytes();
// write new value to prefixed epoch version (e.g. user_f_1577831518)
let versioned = format!("{}_f_{}", json.0.event, json.timestamp);
let _ = data.db.compare_and_swap(versioned.clone(), None as Option<&[u8]>, Some(new_value.clone()));
let _ = web::block(move || data.db.flush()).await;
// write new event and data to sse
broadcast(json.0.event.clone(), serde_json::to_string(&json.0.data).unwrap(), broad.clone()).await;
// return data to json response as 200
Ok(HttpResponse::Ok().json(json.0.data))
}
async fn process(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>) -> Result<HttpResponse, Error> {
// get iter to loop through all keys in db
let data_cloned = data.db.clone();
let iter = data_cloned.iter();
// turn iVec(s) to String(s) and make HashMap
let vals : HashMap<String, Scheduled> = iter.into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
if k.contains("_f_") {
return true
} else {
return false
}}).map(|x| {
let p = x.unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let json : Scheduled = serde_json::from_str(&v).unwrap();
(k, json)
}).collect();
for (k, json) in vals {
let now = Utc::now().timestamp();
if json.timestamp <= now {
broadcast(json.event.clone(), serde_json::to_string(&json.data).unwrap(), broad.clone()).await;
let data_clone = data_cloned.clone();
let _ = data_clone.remove(&k);
let _ = web::block(move || data_clone.flush()).await;
}
}
// return data to json response as 200
Ok(HttpResponse::Ok().json(&"{}"))
}
pub async fn broker_run(origin: String) -> std::result::Result<(), std::io::Error> {
// set actix web env vars
std::env::set_var("RUST_LOG", "actix_web=debug,actix_server=info");
@ -125,6 +188,11 @@ pub async fn broker_run(origin: String) -> std::result::Result<(), std::io::Erro
let tree = sled::open("./tmp/data").unwrap();
let data = Broadcaster::create();
// setup future key (event) in db
let tree_cloned = tree.clone();
let _ = tree_cloned.compare_and_swap(&"future", None as Option<&[u8]>, Some(b""));
let _ = web::block(move || tree_cloned.flush()).await;
// create actix web server with CORS, data, and routes - handle wildcard origins
if origin == "*" {
HttpServer::new(move || {
@ -144,6 +212,8 @@ pub async fn broker_run(origin: String) -> std::result::Result<(), std::io::Erro
.route("/insert", web::post().to(insert))
.route("/events", web::get().to(new_client))
.route("/audit/{record}", web::get().to(audit))
.route("/future", web::post().to(future))
.route("/process", web::get().to(process))
})
.bind(ip).unwrap()
.run()
@ -166,6 +236,8 @@ pub async fn broker_run(origin: String) -> std::result::Result<(), std::io::Erro
.route("/insert", web::post().to(insert))
.route("/events", web::get().to(new_client))
.route("/audit/{record}", web::get().to(audit))
.route("/future", web::post().to(future))
.route("/process", web::get().to(process))
})
.bind(ip).unwrap()
.run()

View File

@ -2,6 +2,6 @@ mod lib;
use lib::{broker_run};
#[actix_rt::main]
async fn main() -> std::result::Result<(), std::io::Error> {
broker_run("*".to_owned()).await
async fn main() -> () {
let _ = broker_run("*".to_owned()).await;
}