Return all event collections with latest event on SSE connection (#10)

This commit is contained in:
Bevan Hunt 2020-01-28 14:37:02 -08:00 committed by GitHub
parent 2a3a681633
commit 124f259e2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 27 deletions

18
Cargo.lock generated
View File

@ -99,7 +99,7 @@ dependencies = [
[[package]]
name = "broker"
version = "3.1.2"
version = "4.0.0"
dependencies = [
"bcrypt 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"broker-ntp 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -107,6 +107,7 @@ dependencies = [
"envy 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"go-flag 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"json 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonwebtoken 7.0.0-beta.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -316,6 +317,11 @@ name = "dtoa"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "either"
version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "encoding_rs"
version = "0.8.22"
@ -679,6 +685,14 @@ dependencies = [
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "itertools"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "itoa"
version = "0.4.5"
@ -2015,6 +2029,7 @@ dependencies = [
"checksum custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9"
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
"checksum dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e"
"checksum either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3"
"checksum encoding_rs 0.8.22 (registry+https://github.com/rust-lang/crates.io-index)" = "cd8d03faa7fe0c1431609dfad7bbe827af30f82e1e2ae6f7ee4fca6bd764bc28"
"checksum env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "aafcde04e90a5226a6443b7aabdb016ba2f8307c847d524724bd9b346dd1a2d3"
"checksum envy 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f938a4abd5b75fe3737902dbc2e79ca142cc1526827a9e40b829a086758531a9"
@ -2055,6 +2070,7 @@ dependencies = [
"checksum indexmap 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b54058f0a6ff80b6803da8faf8997cde53872b38f4023728f6830b06cd3c0dc"
"checksum input_buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf"
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
"checksum itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f56a2d0bc861f9165be4eb3442afd3c236d8a98afd426f65d92324ae1091a484"
"checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e"
"checksum js-sys 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "7889c7c36282151f6bf465be4700359318aef36baa951462382eae49e9577cf9"
"checksum json 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9a38661a28126f8621fb246611288ae28935ddf180f5e21f2d0fbfe5e4131dbe"

View File

@ -1,6 +1,6 @@
[package]
name = "broker"
version = "3.1.2"
version = "4.0.0"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018"
license = "MIT"
@ -28,6 +28,7 @@ envy = "0.4"
lazy_static = "1.4"
crossbeam = "0.7"
broker-ntp = "0.0.1"
itertools = "0.8"
[dev-dependencies]
reqwest = { version = "0.10", features = ["json"] }

View File

@ -210,5 +210,6 @@ pub async fn main() {
### Migrations
- from 2.0 to 3.0: the sse endpoint is now secure and requires to use the [broker-client](https://www.npmjs.com/package/broker-client) library
- from 3.0 to 4.0: the sse endpoint now returns all events with all collections with the latest collection event rather than just the latest event data for all event types
- from 2.0 to 3.0: the sse endpoint is now secure and requires the use of the [broker-client](https://www.npmjs.com/package/broker-client) library
- from 1.0 to 2.0: the optional API endpoints URLs have been changed but have the same functionality

View File

@ -1,6 +1,6 @@
name: broker # you probably want to 'snapcraft register <name>'
base: core18 # the base snap is the execution environment for this snap
version: '3.1.2' # just for humans, typically '1.2+git' or '1.3.2'
version: '4.0.0' # just for humans, typically '1.2+git' or '1.3.2'
summary: Real-time Zero-Code API Server # 79 char long summary
description: |
The purpose of this library is to be your real-time zero-code API server.

View File

@ -12,6 +12,7 @@ use std::convert::Infallible;
use std::time::Duration;
use lazy_static::lazy_static;
use crossbeam::channel::unbounded;
use itertools::Itertools;
lazy_static! {
static ref CHANNEL: HashMap<String, (crossbeam::channel::Sender<SSE>, crossbeam::channel::Receiver<SSE>)> = {
@ -468,12 +469,12 @@ pub async fn broker() {
let vals : HashMap<String, Event> = tree.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
if k.contains("_v_") {
let json : Event = serde_json::from_str(&v).unwrap();
if !json.published && !json.cancelled {
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let evt : Event = serde_json::from_str(&v).unwrap();
if !evt.published && !evt.cancelled {
let now = get_cloudflare_time();
if json.timestamp <= now {
if evt.timestamp <= now {
return true
} else {
return false
@ -488,9 +489,9 @@ pub async fn broker() {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let json : Event = serde_json::from_str(&v).unwrap();
let json_cloned = json.clone();
(k, json_cloned)
let evt : Event = serde_json::from_str(&v).unwrap();
let evt_cloned = evt.clone();
(k, evt_cloned)
}).collect();
for (k, v) in vals {
@ -506,10 +507,6 @@ pub async fn broker() {
let _ = tx.send(sse).unwrap();
let tree_cloned = tree.clone();
let _ = tokio::spawn(async move {
let _ = tree_cloned.compare_and_swap(old_json.event.as_bytes(), None as Option<&[u8]>, Some(b""));
let old_json_og = tree_cloned.get(old_json.event).unwrap().unwrap();
let old_value = std::str::from_utf8(&old_json_og).unwrap().to_owned();
let _ = tree_cloned.compare_and_swap(old_json_clone.event.as_bytes(), Some(old_value.as_bytes()), Some(serde_json::to_string(&newer_json).unwrap().as_bytes()));
let _ = tree_cloned.compare_and_swap(k, Some(serde_json::to_string(&old_json_clone).unwrap().as_bytes()), Some(serde_json::to_string(&newer_json).unwrap().as_bytes()));
let _ = tree_cloned.flush();
}).await;
@ -518,34 +515,52 @@ pub async fn broker() {
});
let sse_route = warp::path("events")
.and(auth_check)
.and(warp::get()).map(move |jwt: JWT| {
// .and(auth_check)
.and(warp::get()).map(move || {
let tree = TREE.get(&"tree".to_owned()).unwrap();
let vals: HashMap<String, String> = tree.iter().into_iter().filter(|x| {
let mut vals : Vec<Event> = tree.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
if !k.contains("_v_") && !k.contains("_u_") {
return true
if k.contains("_v_") {
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let evt : Event = serde_json::from_str(&v).unwrap();
if !evt.cancelled {
return true
} else {
return false
}
} else {
return false
return false
}
}).map(|x| {
let p = x.as_ref().unwrap();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let json : Event = serde_json::from_str(&v).unwrap();
let data : String = serde_json::to_string(&json.data).unwrap();
(json.event, data)
let evt : Event = serde_json::from_str(&v).unwrap();
evt
}).collect();
for (k, v) in vals {
vals.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
for (evt, group) in &vals.into_iter().group_by(|evt| evt.event.clone()) {
let mut events : HashMap<String, Event> = HashMap::new();
for event in group {
if evt == event.event {
events.insert(event.clone().collection_id.to_string(), event.clone());
}
}
let mut evts : Vec<Event> = Vec::new();
for (_, v) in events {
evts.push(v);
}
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: k, data: v, retry: Duration::from_millis(5000)};
let events_json = json!({"events": evts});
let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse);
}
let event_stream = interval(Duration::from_millis(100)).map(move |_| {
event_stream(jwt.check)
event_stream(true)
});
warp::sse::reply(event_stream)