mirror of https://github.com/apibillme/broker
add audit endpoint and fix form submission
This commit is contained in:
parent
7f794a9de6
commit
1ae7a8664d
|
@ -407,6 +407,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
dependencies = [
|
||||
"num-integer 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"num-traits 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
|
@ -1245,11 +1246,12 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "sse-server"
|
||||
version = "0.0.1"
|
||||
version = "0.0.2"
|
||||
dependencies = [
|
||||
"actix-cors 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"envy 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "sse-server"
|
||||
version = "0.0.1"
|
||||
version = "0.0.2"
|
||||
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
|
@ -23,3 +23,4 @@ json = "0.12.0"
|
|||
sled = "0.30.3"
|
||||
envy = "0.4.0"
|
||||
env_logger = "0.6"
|
||||
chrono = { version = "0.4.10", features = ["serde"] }
|
||||
|
|
4
Makefile
4
Makefile
|
@ -1,4 +1,6 @@
|
|||
build:
|
||||
PORT=8080 cargo run
|
||||
rusty:
|
||||
curl -S -v --header "Content-Type: application/json" POST --data '{"event":"user", "data":{"user":"Rusty"}}' http://localhost:8080/insert
|
||||
curl -S -v --header "Content-Type: application/json" POST --data '{"event":"user", "data":{"user":"Rusty"}}' http://localhost:8080/insert
|
||||
client:
|
||||
cd example && npm i && npm start
|
||||
|
|
|
@ -19,7 +19,7 @@ async fn main() -> std::result::Result<(), std::io::Error> {
|
|||
|
||||
## Example
|
||||
|
||||
- run ``` make ``` and ``` cd example && npm i && npm start ``` to run example
|
||||
- run ``` make ``` and ``` make client ``` in two different terminal windows to run example
|
||||
|
||||
## Example
|
||||
|
||||
|
|
|
@ -71,12 +71,12 @@ function App() {
|
|||
<Comments />
|
||||
</SSEProvider>
|
||||
</Typography>
|
||||
<form onChange={handleSubmit(onSubmit)}>
|
||||
<form>
|
||||
<label htmlFor="user">Name: </label>
|
||||
<DebounceInput
|
||||
name="user"
|
||||
minLength={2}
|
||||
debounceTimeout={300}
|
||||
debounceTimeout={500}
|
||||
onChange={handleSubmit(onSubmit)}
|
||||
inputRef={register({})}
|
||||
/>
|
||||
|
|
55
src/lib.rs
55
src/lib.rs
|
@ -5,6 +5,7 @@ use std::sync::Mutex;
|
|||
use sled;
|
||||
use actix_cors::Cors;
|
||||
use std::collections::HashMap;
|
||||
use chrono::prelude::*;
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct Config {
|
||||
|
@ -21,18 +22,59 @@ struct JSON {
|
|||
data: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct Path {
|
||||
record: String
|
||||
}
|
||||
|
||||
async fn audit(data: web::Data<MyData>, path: web::Path<Path>) -> Result<HttpResponse, Error> {
|
||||
|
||||
// get iter to loop through all keys in db
|
||||
let iter = data.db.iter();
|
||||
|
||||
// turn iVec(s) to String(s) and make HashMap
|
||||
let records: HashMap<String, serde_json::value::Value> = iter.into_iter().filter(|x| {
|
||||
let p = x.as_ref().unwrap();
|
||||
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
|
||||
if k.contains(&path.record) && k.contains("_v_") {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}).map(|x| {
|
||||
let p = x.unwrap();
|
||||
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
|
||||
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
|
||||
let j : serde_json::Value = serde_json::from_str(&v).unwrap_or_default();
|
||||
(k, j)
|
||||
}).collect();
|
||||
|
||||
// return data to json response as 200
|
||||
Ok(HttpResponse::Ok().json(records))
|
||||
}
|
||||
|
||||
async fn new_client(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>) -> impl Responder {
|
||||
|
||||
// get iter to loop through all keys in db
|
||||
let iter = data.db.iter();
|
||||
|
||||
// turn iVec(s) to String(s) and make HashMap to pass to sse
|
||||
let vals: HashMap<String, String> = iter.into_iter().map(|x| {
|
||||
// turn iVec(s) to String(s) and make HashMap
|
||||
let vals: HashMap<String, String> = iter.into_iter().filter(|x| {
|
||||
let p = x.as_ref().unwrap();
|
||||
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
|
||||
if k.contains("_v_") {
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}).map(|x| {
|
||||
let p = x.unwrap();
|
||||
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
|
||||
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
|
||||
(k, v)
|
||||
}).collect();
|
||||
|
||||
// create new client for sse with hashmap of initial values
|
||||
let rx = broad.lock().unwrap().new_client(vals);
|
||||
|
||||
// create sse endpoint
|
||||
|
@ -44,7 +86,7 @@ async fn new_client(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>
|
|||
|
||||
async fn insert(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>, json: web::Json<JSON>) -> Result<HttpResponse, Error> {
|
||||
|
||||
// write blank value if no value for key exists so that get later doesn't crash
|
||||
// write blank value if no value for key exists (to make sure that get works later)
|
||||
let data_cloned = data.clone();
|
||||
let _ = data_cloned.db.compare_and_swap(json.0.event.clone(), None as Option<&[u8]>, Some(b""));
|
||||
let _ = web::block(move || data_cloned.db.flush()).await;
|
||||
|
@ -56,7 +98,10 @@ async fn insert(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>, j
|
|||
let old_value_string = std::str::from_utf8(&old_value_buffer).unwrap();
|
||||
let old_value = old_value_string.clone().as_bytes();
|
||||
|
||||
// write old and new values to db
|
||||
// write new value to db and write old value to prefixed epoch version (e.g. user_v_1577831518)
|
||||
let now = Utc::now().timestamp();
|
||||
let versioned = format!("{}_v_{}", json.0.event, now);
|
||||
let _ = data.db.compare_and_swap(versioned.clone(), None as Option<&[u8]>, Some(new_value.clone()));
|
||||
let _ = data.db.compare_and_swap(json.0.event.clone(), Some(old_value.clone()), Some(new_value.clone()));
|
||||
let _ = web::block(move || data.db.flush()).await;
|
||||
|
||||
|
@ -98,6 +143,7 @@ pub async fn sse_start(origin: String) -> std::result::Result<(), std::io::Error
|
|||
.data(MyData{ db: tree.clone() })
|
||||
.route("/insert", web::post().to(insert))
|
||||
.route("/events", web::get().to(new_client))
|
||||
.route("/audit/{record}", web::get().to(audit))
|
||||
})
|
||||
.bind(ip).unwrap()
|
||||
.run()
|
||||
|
@ -119,6 +165,7 @@ pub async fn sse_start(origin: String) -> std::result::Result<(), std::io::Error
|
|||
.data(MyData{ db: tree.clone() })
|
||||
.route("/insert", web::post().to(insert))
|
||||
.route("/events", web::get().to(new_client))
|
||||
.route("/audit/{record}", web::get().to(audit))
|
||||
})
|
||||
.bind(ip).unwrap()
|
||||
.run()
|
||||
|
|
Loading…
Reference in New Issue