fix dup sse event issue

This commit is contained in:
Bevan Hunt 2020-01-29 08:42:11 -08:00
parent 2687306431
commit 170d3fdfc0
4 changed files with 21 additions and 29 deletions

18
Cargo.lock generated
View File

@ -99,7 +99,7 @@ dependencies = [
[[package]]
name = "broker"
version = "4.1.0"
version = "4.1.1"
dependencies = [
"bcrypt 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"broker-ntp 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -107,7 +107,6 @@ dependencies = [
"envy 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"go-flag 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"json 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonwebtoken 7.0.0-beta.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -317,11 +316,6 @@ name = "dtoa"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "either"
version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "encoding_rs"
version = "0.8.22"
@ -685,14 +679,6 @@ dependencies = [
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "itertools"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "itoa"
version = "0.4.5"
@ -2029,7 +2015,6 @@ dependencies = [
"checksum custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9"
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
"checksum dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e"
"checksum either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3"
"checksum encoding_rs 0.8.22 (registry+https://github.com/rust-lang/crates.io-index)" = "cd8d03faa7fe0c1431609dfad7bbe827af30f82e1e2ae6f7ee4fca6bd764bc28"
"checksum env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "aafcde04e90a5226a6443b7aabdb016ba2f8307c847d524724bd9b346dd1a2d3"
"checksum envy 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f938a4abd5b75fe3737902dbc2e79ca142cc1526827a9e40b829a086758531a9"
@ -2070,7 +2055,6 @@ dependencies = [
"checksum indexmap 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b54058f0a6ff80b6803da8faf8997cde53872b38f4023728f6830b06cd3c0dc"
"checksum input_buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf"
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
"checksum itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f56a2d0bc861f9165be4eb3442afd3c236d8a98afd426f65d92324ae1091a484"
"checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e"
"checksum js-sys 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "7889c7c36282151f6bf465be4700359318aef36baa951462382eae49e9577cf9"
"checksum json 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9a38661a28126f8621fb246611288ae28935ddf180f5e21f2d0fbfe5e4131dbe"

View File

@ -1,6 +1,6 @@
[package]
name = "broker"
version = "4.1.0"
version = "4.1.1"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018"
license = "MIT"
@ -28,7 +28,6 @@ envy = "0.4"
lazy_static = "1.4"
crossbeam = "0.7"
broker-ntp = "0.0.1"
itertools = "0.8"
[dev-dependencies]
reqwest = { version = "0.10", features = ["json"] }

View File

@ -1,6 +1,6 @@
name: broker # you probably want to 'snapcraft register <name>'
base: core18 # the base snap is the execution environment for this snap
version: '4.1.0' # just for humans, typically '1.2+git' or '1.3.2'
version: '4.1.1' # just for humans, typically '1.2+git' or '1.3.2'
summary: Real-time Zero-Code API Server # 79 char long summary
description: |
The purpose of this library is to be your real-time zero-code API server.

View File

@ -1,7 +1,7 @@
use tokio::stream::StreamExt;
use tokio::time::interval;
use std::iter::Iterator;
use std::collections::HashMap;
use std::collections::{HashSet, HashMap};
use serde_derive::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;
@ -12,7 +12,6 @@ use std::convert::Infallible;
use std::time::Duration;
use lazy_static::lazy_static;
use crossbeam::channel::unbounded;
use itertools::Itertools;
lazy_static! {
static ref CHANNEL: HashMap<String, (crossbeam::channel::Sender<SSE>, crossbeam::channel::Receiver<SSE>)> = {
@ -532,9 +531,14 @@ pub async fn broker() {
vals.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
for (evt, group) in &vals.into_iter().group_by(|evt| evt.event.clone()) {
let mut uniques : HashSet<String> = HashSet::new();
for evt in &vals {
uniques.insert(evt.clone().event);
}
for evt in uniques {
let mut events : HashMap<String, Event> = HashMap::new();
for event in group {
for event in &vals {
if evt == event.event {
events.insert(event.clone().collection_id.to_string(), event.clone());
}
@ -543,6 +547,7 @@ pub async fn broker() {
for (_, v) in events {
evts.push(v);
}
let guid = Uuid::new_v4().to_string();
let events_json = json!({"events": evts});
let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)};
@ -554,8 +559,7 @@ pub async fn broker() {
});
let sse_route = warp::path("events")
.and(auth_check)
.and(warp::get()).map(move |jwt: JWT| {
.and(warp::get()).map(move || {
let tree = TREE.get(&"tree".to_owned()).unwrap();
let mut vals : Vec<Event> = tree.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
@ -579,10 +583,14 @@ pub async fn broker() {
}).collect();
vals.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
let mut uniques : HashSet<String> = HashSet::new();
for evt in &vals {
uniques.insert(evt.clone().event);
}
for (evt, group) in &vals.into_iter().group_by(|evt| evt.event.clone()) {
for evt in uniques {
let mut events : HashMap<String, Event> = HashMap::new();
for event in group {
for event in &vals {
if evt == event.event {
events.insert(event.clone().collection_id.to_string(), event.clone());
}
@ -591,6 +599,7 @@ pub async fn broker() {
for (_, v) in events {
evts.push(v);
}
let guid = Uuid::new_v4().to_string();
let events_json = json!({"events": evts});
let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)};
@ -599,7 +608,7 @@ pub async fn broker() {
}
let event_stream = interval(Duration::from_millis(100)).map(move |_| {
event_stream(jwt.check)
event_stream(true)
});
warp::sse::reply(event_stream)