fix scheduler to send everything out

This commit is contained in:
Bevan Hunt 2020-01-28 16:08:50 -08:00
parent 279a027c4b
commit aedc76e946
4 changed files with 46 additions and 12 deletions

2
Cargo.lock generated
View File

@ -99,7 +99,7 @@ dependencies = [
[[package]]
name = "broker"
version = "4.0.0"
version = "4.1.0"
dependencies = [
"bcrypt 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"broker-ntp 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,6 +1,6 @@
[package]
name = "broker"
version = "4.0.0"
version = "4.1.0"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018"
license = "MIT"

View File

@ -1,6 +1,6 @@
name: broker # you probably want to 'snapcraft register <name>'
base: core18 # the base snap is the execution environment for this snap
version: '4.0.0' # just for humans, typically '1.2+git' or '1.3.2'
version: '4.1.0' # just for humans, typically '1.2+git' or '1.3.2'
summary: Real-time Zero-Code API Server # 79 char long summary
description: |
The purpose of this library is to be your real-time zero-code API server.

View File

@ -500,21 +500,55 @@ pub async fn broker() {
let old_json_clone = old_json.clone();
let mut new_json = v.clone();
new_json.published = true;
let newer_json = new_json.clone();
let newest_json = new_json.clone();
let guid = Uuid::new_v4().to_string();
let mut events : Vec<Event> = Vec::new();
events.push(newer_json);
let events = json!({"events": events});
let sse = SSE{id: guid, event: new_json.event, data: events.to_string(), retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse).unwrap();
let tree_cloned = tree.clone();
let tree_clone = tree.clone();
let _ = tokio::spawn(async move {
let _ = tree_cloned.compare_and_swap(k, Some(serde_json::to_string(&old_json_clone).unwrap().as_bytes()), Some(serde_json::to_string(&newest_json).unwrap().as_bytes()));
let _ = tree_cloned.flush();
}).await;
let mut vals : Vec<Event> = tree_clone.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
if k.contains("_v_") {
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let evt : Event = serde_json::from_str(&v).unwrap();
if !evt.cancelled {
return true
} else {
return false
}
} else {
return false
}
}).map(|x| {
let p = x.as_ref().unwrap();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let evt : Event = serde_json::from_str(&v).unwrap();
evt
}).collect();
vals.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
for (evt, group) in &vals.into_iter().group_by(|evt| evt.event.clone()) {
let mut events : HashMap<String, Event> = HashMap::new();
for event in group {
if evt == event.event {
events.insert(event.clone().collection_id.to_string(), event.clone());
}
}
let mut evts : Vec<Event> = Vec::new();
for (_, v) in events {
evts.push(v);
}
let guid = Uuid::new_v4().to_string();
let events_json = json!({"events": evts});
let sse = SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse);
}
}
}
});