This commit is contained in:
Bevan Hunt 2020-01-06 21:48:29 -08:00
parent 166fb58786
commit dfdee36486
4 changed files with 77 additions and 89 deletions

12
Cargo.lock generated
View File

@ -386,7 +386,7 @@ dependencies = [
[[package]]
name = "broker"
version = "0.5.1"
version = "0.6.0"
dependencies = [
"actix-cors 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -396,6 +396,7 @@ dependencies = [
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"envy 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"json 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonwebtoken 6.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
@ -845,6 +846,14 @@ dependencies = [
"winreg 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "itertools"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "itoa"
version = "0.4.4"
@ -1693,6 +1702,7 @@ dependencies = [
"checksum indexmap 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712d7b3ea5827fcb9d4fda14bf4da5f136f0db2ae9c8f4bd4e2d1c6fde4e6db2"
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
"checksum ipconfig 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "aa79fa216fbe60834a9c0737d7fcd30425b32d1c58854663e24d4c4b328ed83f"
"checksum itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f56a2d0bc861f9165be4eb3442afd3c236d8a98afd426f65d92324ae1091a484"
"checksum itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f"
"checksum json 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3ca41abbeb7615d56322a984e63be5e5d0a117dfaca86c14393e32a762ccac1"
"checksum jsonwebtoken 6.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a81d1812d731546d2614737bee92aa071d37e9afa1409bc374da9e5e70e70b22"

View File

@ -1,6 +1,6 @@
[package]
name = "broker"
version = "0.5.1"
version = "0.6.0"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018"
license = "MIT"
@ -27,3 +27,4 @@ chrono = { version = "0.4.10", features = ["serde"] }
uuid = { version = "0.8.1", features = ["serde", "v4"] }
bcrypt = "0.6.1"
jsonwebtoken = "6.0.1"
itertools = "0.8.2"

View File

@ -68,9 +68,9 @@ will return
- authenticated endpoint
- POST JSON to insert an event
```json
{"event":{...}, "timestamp":{...}, "data":{...}}
{"event":{...}, "id":{...}, "timestamp":{...}, "data":{...}}
```
- where {...} is for the event a string, timestamp is the epoch unix timestamp when you want the event to become the current event, and data is any JSON you want
- where {...} is for the event a string, id is a uuid v4 you provide, timestamp is the epoch unix timestamp when you want the event to become the current event, and data is any JSON you want
will return
```json
@ -78,12 +78,6 @@ will return
```
- where {...} is the uuid (string) of the event
```html
/events/{event}
```
- authenticated endpoint
- do a GET request where {event} is the name of the event you want the events queue (sorted by ascending timestamp)
```html
/events/{id}/cancel
```
@ -100,8 +94,7 @@ will return
* Handles future events via Epoch UNIX timestamp
* Stateful immutable event persistence
* Insert event via JSON POST request
* Sync latest events on SSE client connection
* Event log via GET request
* Sync latest events with audit trail on SSE client connection
* Event cancellation via GET request
### Use

View File

@ -10,6 +10,8 @@ use uuid::Uuid;
use serde_json::json;
use bcrypt::{DEFAULT_COST, hash, verify};
use jsonwebtoken::{encode, decode, Header, Validation};
use itertools::Itertools;
use std::iter::FromIterator;
#[derive(Deserialize, Debug)]
pub struct Config {
@ -23,6 +25,31 @@ struct MyData {
db: sled::Db
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Events (Vec<HashMap<String, HashMap<uuid::Uuid, Vec<Event>>>>);
impl Events {
fn new() -> Events {
Events(Vec::new())
}
fn add(&mut self, elem: HashMap<String, HashMap<uuid::Uuid, Vec<Event>>>) {
self.0.push(elem);
}
}
impl std::iter::FromIterator<HashMap<String, HashMap<uuid::Uuid, Vec<Event>>>> for Events {
fn from_iter<I: IntoIterator<Item=HashMap<String, HashMap<uuid::Uuid, Vec<Event>>>>>(iter: I) -> Self {
let mut c = Events::new();
for i in iter {
c.add(i);
}
c
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Token {
jwt: String
@ -53,6 +80,7 @@ struct UserForm {
struct Event {
id: uuid::Uuid,
user_id: uuid::Uuid,
evt_id: uuid::Uuid,
event: String,
timestamp: i64,
published: bool,
@ -63,19 +91,14 @@ struct Event {
#[derive(Debug, Serialize, Deserialize, Clone)]
struct EventForm {
id: uuid::Uuid,
event: String,
timestamp: i64,
data: serde_json::Value,
}
#[derive(Debug, Serialize, Deserialize)]
struct EventPath {
event: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct EventSubPath {
event: String,
struct Path {
id: String,
}
@ -87,63 +110,6 @@ struct Claims {
exp: usize,
}
async fn collection(data: web::Data<MyData>, path: web::Path<EventPath>, req: HttpRequest) -> Result<HttpResponse, Error> {
// get origin env var
let config = envy::from_env::<Config>().unwrap();
let secret = config.secret;
// verify jwt
let headers = req.headers();
let mut check : i32 = 0;
for (k, v) in headers {
if k == "Authorization" {
let token = v.to_str().unwrap().to_owned();
let parts = token.split(" ");
for part in parts {
if part != "Bearer" {
let _ = match decode::<Claims>(&part, secret.as_ref(), &Validation::default()) {
Ok(c) => {
check = check + 1;
c
},
Err(err) => match *err.kind() {
_ => return Ok(HttpResponse::Unauthorized().json(""))
},
};
}
}
}
}
// if no auth header
if check == 0 {
return Ok(HttpResponse::Unauthorized().json(""))
}
// turn iVec(s) to String(s) and make HashMap
let mut records: Vec<Event> = data.db.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
let versioned = format!("{}_v_", path.event);
if k.contains(&versioned) {
return true;
} else {
return false;
}
}).map(|x| {
let p = x.unwrap();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let j : Event = serde_json::from_str(&v).unwrap();
j
}).collect();
records.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
// return data to json response as 200
Ok(HttpResponse::Ok().json(records))
}
async fn new_client(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>) -> impl Responder {
// get origin env var
@ -204,10 +170,11 @@ async fn insert(data: web::Data<MyData>, json: web::Json<EventForm>, req: HttpRe
let user_id = uuid::Uuid::parse_str(&user_id_str).unwrap();
let id = Uuid::new_v4();
let j = Event{id: id, published: false, cancelled: false, data: json_clone.data, event: json_clone.event, timestamp: json.timestamp, user_id: user_id};
let j = Event{id: id, published: false, cancelled: false, data: json_clone.data, event: json_clone.event, timestamp: json.timestamp, user_id: user_id, evt_id: json.id};
let new_value_string = serde_json::to_string(&j).unwrap();
let new_value = new_value_string.as_bytes();
let versioned = format!("{}_v_{}", json.event, id.to_string());
let versioned = format!("_v_{}", id.to_string());
let _ = data.db.compare_and_swap(versioned, None as Option<&[u8]>, Some(new_value.clone()));
let _ = web::block(move || data.db.flush()).await;
@ -227,7 +194,7 @@ async fn insert(data: web::Data<MyData>, json: web::Json<EventForm>, req: HttpRe
Ok(HttpResponse::Unauthorized().json(""))
}
async fn cancel(data: web::Data<MyData>, path: web::Path<EventSubPath>, req: HttpRequest) -> Result<HttpResponse, Error> {
async fn cancel(data: web::Data<MyData>, path: web::Path<Path>, req: HttpRequest) -> Result<HttpResponse, Error> {
// get origin env var
let config = envy::from_env::<Config>().unwrap();
@ -262,8 +229,7 @@ async fn cancel(data: web::Data<MyData>, path: web::Path<EventSubPath>, req: Htt
}
let id = &path.id;
let event = &path.event;
let versioned = format!("{}_v_{}", event, id);
let versioned = format!("_v_{}", id);
let g = data.db.get(&versioned.as_bytes()).unwrap().unwrap();
let v = std::str::from_utf8(&g).unwrap().to_owned();
@ -402,20 +368,39 @@ pub async fn broker_run(origin: String) -> std::result::Result<(), std::io::Erro
(k, json_cloned)
}).collect();
let vals_cloned = vals.clone();
for (k, v) in vals {
let old_json = v.clone();
let old_json_clone = old_json.clone();
let mut new_json = v.clone();
new_json.published = true;
let _ = tree_cloned.compare_and_swap(old_json.event.as_bytes(), None as Option<&[u8]>, Some(b""));
let old_json_og = tree_cloned.get(old_json.event).unwrap().unwrap();
let old_value = std::str::from_utf8(&old_json_og).unwrap().to_owned();
let _ = tree_cloned.compare_and_swap(old_json_clone.event.as_bytes(), Some(old_value.as_bytes()), Some(serde_json::to_string(&new_json).unwrap().as_bytes()));
let _ = tree_cloned.compare_and_swap(k, Some(serde_json::to_string(&old_json_clone).unwrap().as_bytes()), Some(serde_json::to_string(&new_json).unwrap().as_bytes()));
let _ = tree_cloned.flush();
broadcast(new_json.event, serde_json::to_string(&new_json.data).unwrap(), events_cloned.clone());
}
}
let eventz : Vec<Event> = vals_cloned.iter().map(|(_x, y)| {
y.clone()
}).collect();
let groups : Events = eventz.into_iter()
.group_by(|evt| evt.event)
.into_iter()
.map(|(event, group)| {
// let evts : Vec<Event> = group.into_iter().map(|x| { x.clone() }).collect();
let groups = group.into_iter().group_by(|evt| evt.evt_id);
let mut x = HashMap::new();
x.insert(event, groups);
x
})
.collect();
}
// for (k, v) in grouped {
// broadcast(k, serde_json::to_string(&v).unwrap(), events_cloned.clone());
// }
});
x.thread();
@ -435,8 +420,7 @@ pub async fn broker_run(origin: String) -> std::result::Result<(), std::io::Erro
.data(MyData{ db: tree_actix.clone() })
.route("/insert", web::post().to(insert))
.route("/events", web::get().to(new_client))
.route("/events/{event}", web::get().to(collection))
.route("/events/{event}/{id}/cancel", web::get().to(cancel))
.route("/events/{id}/cancel", web::get().to(cancel))
.route("/users", web::post().to(user_create))
.route("/login", web::post().to(login))
})