fix sse broadcasting (#11)

This commit is contained in:
Bevan Hunt 2020-02-02 19:11:11 -08:00 committed by GitHub
parent 3def918c20
commit a5e0a241f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 22 deletions

71
Cargo.lock generated
View File

@ -27,6 +27,11 @@ name = "arc-swap"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "atomic-option"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "atty"
version = "0.2.14"
@ -108,11 +113,12 @@ dependencies = [
[[package]]
name = "broker"
version = "4.2.3"
version = "4.2.4"
dependencies = [
"Inflector 0.11.4 (registry+https://github.com/rust-lang/crates.io-index)",
"bcrypt 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"broker-ntp 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bus 2.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"envy 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -157,6 +163,17 @@ name = "bumpalo"
version = "3.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bus"
version = "2.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"atomic-option 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot_core 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "byte-tools"
version = "0.3.1"
@ -260,6 +277,14 @@ dependencies = [
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.0"
@ -299,6 +324,15 @@ dependencies = [
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-utils"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.0"
@ -779,6 +813,11 @@ name = "matches"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "memchr"
version = "2.3.0"
@ -1017,6 +1056,21 @@ dependencies = [
"parking_lot_core 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "parking_lot_core"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "parking_lot_core"
version = "0.7.0"
@ -1546,6 +1600,14 @@ dependencies = [
"parking_lot 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "smallvec"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "smallvec"
version = "1.1.0"
@ -2010,6 +2072,7 @@ dependencies = [
"checksum aho-corasick 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "58fb5e95d83b38284460a5fda7d6470aa0b8844d283a0b614b8535e880800d2d"
"checksum anyhow 1.0.26 (registry+https://github.com/rust-lang/crates.io-index)" = "7825f6833612eb2414095684fcf6c635becf3ce97fe48cf6421321e93bfbd53c"
"checksum arc-swap 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d7b8a9123b8027467bce0099fe556c628a53c8d83df0507084c31e9ba2e39aff"
"checksum atomic-option 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593"
"checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
"checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2"
"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
@ -2023,6 +2086,7 @@ dependencies = [
"checksum broker-ntp 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "037d0f8fa2a4e20313da3d34d3d40606e6366fdc1458f664fc03edc89aa34292"
"checksum buf_redux 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f"
"checksum bumpalo 3.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5fb8038c1ddc0a5f73787b130f4cc75151e96ed33e417fde765eb5a81e3532f4"
"checksum bus 2.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d90bf1d967e005a3e722e964051e8d544e298942a4b4d4a3f87d9ebc997ace61"
"checksum byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
"checksum byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5"
"checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
@ -2037,10 +2101,12 @@ dependencies = [
"checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e"
"checksum crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa"
"checksum crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "acec9a3b0b3559f15aee4f90746c4e5e293b701c0f7d3925d24e01645267b68c"
"checksum crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca"
"checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac"
"checksum crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db"
"checksum crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6"
"checksum crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4"
"checksum custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9"
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
@ -2097,6 +2163,7 @@ dependencies = [
"checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
"checksum memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3197e20c7edb283f87c071ddfc7a2cca8f8e0b888c242959846a6fce03c72223"
"checksum memoffset 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "75189eb85871ea5c2e2c15abbdd541185f63b408415e5051f5cac122d8c774b9"
"checksum mime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ba626b8a6de5da682e1caa06bdb42a335aee5a84db8e5046a3e8ab17ba0a3ae0"
@ -2121,6 +2188,7 @@ dependencies = [
"checksum openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
"checksum openssl-sys 0.9.53 (registry+https://github.com/rust-lang/crates.io-index)" = "465d16ae7fc0e313318f7de5cecf57b2fbe7511fd213978b457e1c96ff46736f"
"checksum parking_lot 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "92e98c49ab0b7ce5b222f2cc9193fc4efe11c6d0bd4f648e374684a6857b1cfc"
"checksum parking_lot_core 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cb88cb1cb3790baa6776844f968fea3be44956cf184fa1be5a03341f5491278c"
"checksum parking_lot_core 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7582838484df45743c8434fbff785e8edf260c28748353d44bc0da32e0ceabf1"
"checksum pem 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a1581760c757a756a41f0ee3ff01256227bdf64cb752839779b95ffb01c59793"
"checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
@ -2181,6 +2249,7 @@ dependencies = [
"checksum siphasher 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
"checksum sled 0.30.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb8c32cb0e34e67ad74fae1a77f4635d0cc7ffc873088a0136f3c4849336d71"
"checksum smallvec 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)" = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6"
"checksum smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44e59e0c9fa00817912ae6e4e6e3c4fe04455e75699d06eedc7d85917ed8e8f4"
"checksum socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85"
"checksum sourcefile 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4bf77cb82ba8453b42b6ae1d692e4cdc92f9a47beaf89a847c8be83f4e328ad3"

View File

@ -1,6 +1,6 @@
[package]
name = "broker"
version = "4.2.3"
version = "4.2.4"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018"
license = "MIT"
@ -27,6 +27,7 @@ go-flag = "0.1"
envy = "0.4"
lazy_static = "1.4"
crossbeam = "0.7"
bus = "2.2"
broker-ntp = "0.0.1"
Inflector = "0.11"
json-patch = "0.2"

View File

@ -1,6 +1,6 @@
name: broker # you probably want to 'snapcraft register <name>'
base: core18 # the base snap is the execution environment for this snap
version: '4.2.3' # just for humans, typically '1.2+git' or '1.3.2'
version: '4.2.4' # just for humans, typically '1.2+git' or '1.3.2'
summary: Real-time Zero-Code API Server # 79 char long summary
description: |
The purpose of this library is to be your real-time zero-code API server.

View File

@ -11,21 +11,13 @@ use jsonwebtoken::{encode, decode, Header, Validation, EncodingKey, DecodingKey}
use std::convert::Infallible;
use std::time::Duration;
use lazy_static::lazy_static;
use bus::Bus;
use crossbeam::channel::unbounded;
use inflector::Inflector;
use json_patch::merge;
use std::sync::{Arc, Mutex};
lazy_static! {
static ref CHANNEL: HashMap<String, (crossbeam::channel::Sender<SSE>, crossbeam::channel::Receiver<SSE>)> = {
let (tx, rx) = unbounded();
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: "internal_status".to_owned(), data: "connected".to_owned(), retry: Duration::from_millis(5000)};
let _ = tx.send(sse);
let mut m = HashMap::new();
m.insert("chan".to_owned(), (tx, rx));
m
};
static ref TREE: HashMap<String, sled::Db> = {
let configure = config();
let tree = sled::open(configure.save_path).unwrap();
@ -379,10 +371,9 @@ fn insert(tree: sled::Db, user_id_str: String, evt: EventForm) -> String {
json!({"event": j}).to_string()
}
fn event_stream(allowed: bool) -> Result<impl ServerSentEvent, Infallible> {
fn event_stream(rx: crossbeam::channel::Receiver<SSE>, allowed: bool) -> Result<impl ServerSentEvent, Infallible> {
if allowed {
let (_, rx) = CHANNEL.get(&"chan".to_owned()).unwrap();
let sse = match rx.try_recv() {
Ok(sse) => sse,
Err(_) => {
@ -465,6 +456,11 @@ pub async fn broker() {
}
});
// set up fan-out
let mix_tx = Bus::new(100);
let tx = Arc::new(Mutex::new(mix_tx));
let tx2 = tx.clone();
let _ = tokio::spawn(async move {
loop {
let tree = TREE.get(&"tree".to_owned()).unwrap();
@ -583,17 +579,26 @@ pub async fn broker() {
let guid = Uuid::new_v4().to_string();
let events_json = json!({"events": evts, "columns": colz, "rows": rows});
let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse);
if &evt == &new_json.event {
let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)};
tx2.lock().unwrap().broadcast(sse);
}
}
}
}
});
let with_sender = warp::any().map(move || tx.clone());
let sse_route = warp::path("events")
.and(auth_check)
.and(warp::get()).map(move |jwt: JWT| {
.and(with_sender)
.and(warp::get()).map(move |jwt: JWT, tx_main: Arc<Mutex<bus::Bus<SSE>>>| {
let mut rx_main = tx_main.lock().unwrap().add_rx();
let (tx, rx) = unbounded();
let tree = TREE.get(&"tree".to_owned()).unwrap();
let mut vals : Vec<Event> = tree.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
@ -670,12 +675,22 @@ pub async fn broker() {
let guid = Uuid::new_v4().to_string();
let events_json = json!({"events": evts, "columns": colz, "rows": rows});
let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse);
}
let event_stream = interval(Duration::from_millis(100)).map(move |_| {
event_stream(jwt.check)
let event_stream = interval(Duration::from_millis(1000)).map(move |_| {
let sse = match rx_main.try_recv() {
Ok(sse) => sse,
Err(_) => {
let guid = Uuid::new_v4().to_string();
let polling = json!({"status": "polling"});
SSE{id: guid, event: "internal_status".to_owned(), data: polling.to_string(), retry: Duration::from_millis(5000)}
}
};
let _ = tx.send(sse);
event_stream(rx.clone(), jwt.check)
});
warp::sse::reply(event_stream)