This commit is contained in:
Bevan Hunt 2020-02-02 13:06:46 -08:00
parent 638f31bf4d
commit 7255e74c50
1 changed files with 12 additions and 17 deletions

View File

@ -16,16 +16,6 @@ use inflector::Inflector;
use json_patch::merge; use json_patch::merge;
lazy_static! { lazy_static! {
static ref CHANNEL: HashMap<String, (crossbeam::channel::Sender<SSE>, crossbeam::channel::Receiver<SSE>)> = {
let (tx, rx) = unbounded();
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: "internal_status".to_owned(), data: "connected".to_owned(), retry: Duration::from_millis(5000)};
let _ = tx.send(sse);
let mut m = HashMap::new();
m.insert("chan".to_owned(), (tx, rx));
m
};
static ref TREE: HashMap<String, sled::Db> = { static ref TREE: HashMap<String, sled::Db> = {
let configure = config(); let configure = config();
let tree = sled::open(configure.save_path).unwrap(); let tree = sled::open(configure.save_path).unwrap();
@ -379,10 +369,9 @@ fn insert(tree: sled::Db, user_id_str: String, evt: EventForm) -> String {
json!({"event": j}).to_string() json!({"event": j}).to_string()
} }
fn event_stream(allowed: bool) -> Result<impl ServerSentEvent, Infallible> { fn event_stream(rx: crossbeam::channel::Receiver<SSE>, allowed: bool) -> Result<impl ServerSentEvent, Infallible> {
if allowed { if allowed {
let (_, rx) = CHANNEL.get(&"chan".to_owned()).unwrap();
let sse = match rx.try_recv() { let sse = match rx.try_recv() {
Ok(sse) => sse, Ok(sse) => sse,
Err(_) => { Err(_) => {
@ -465,6 +454,11 @@ pub async fn broker() {
} }
}); });
// setup channels
let (tx, rx) = unbounded();
let tx2 = tx.clone();
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
loop { loop {
let tree = TREE.get(&"tree".to_owned()).unwrap(); let tree = TREE.get(&"tree".to_owned()).unwrap();
@ -584,15 +578,17 @@ pub async fn broker() {
let guid = Uuid::new_v4().to_string(); let guid = Uuid::new_v4().to_string();
let events_json = json!({"events": evts, "columns": colz, "rows": rows}); let events_json = json!({"events": evts, "columns": colz, "rows": rows});
let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)}; let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap(); let _ = tx2.send(sse);
let _ = tx.send(sse);
} }
} }
} }
}); });
let with_channel = warp::any().map(move || (tx.clone(), rx.clone()));
let sse_route = warp::path("events") let sse_route = warp::path("events")
.and(warp::get()).map(move || { .and(with_channel)
.and(warp::get()).map(move |(tx, rx):(crossbeam::channel::Sender<SSE>, crossbeam::channel::Receiver<SSE>)| {
let tree = TREE.get(&"tree".to_owned()).unwrap(); let tree = TREE.get(&"tree".to_owned()).unwrap();
let mut vals : Vec<Event> = tree.iter().into_iter().filter(|x| { let mut vals : Vec<Event> = tree.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap(); let p = x.as_ref().unwrap();
@ -669,12 +665,11 @@ pub async fn broker() {
let guid = Uuid::new_v4().to_string(); let guid = Uuid::new_v4().to_string();
let events_json = json!({"events": evts, "columns": colz, "rows": rows}); let events_json = json!({"events": evts, "columns": colz, "rows": rows});
let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)}; let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse); let _ = tx.send(sse);
} }
let event_stream = interval(Duration::from_millis(100)).map(move |_| { let event_stream = interval(Duration::from_millis(100)).map(move |_| {
event_stream(true) event_stream(rx.clone(), true)
}); });
warp::sse::reply(event_stream) warp::sse::reply(event_stream)