This commit is contained in:
Bevan Hunt 2020-01-23 04:20:26 -08:00
parent 12d8066f75
commit 0961b0058f
2 changed files with 71 additions and 62 deletions

6
Cargo.lock generated
View File

@ -117,7 +117,7 @@ dependencies = [
"go-flag 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"json 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonwebtoken 6.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"portal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"portal 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1043,7 +1043,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "portal"
version = "0.2.0"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2033,7 +2033,7 @@ dependencies = [
"checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae"
"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587"
"checksum pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)" = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677"
"checksum portal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b5fc229ded1b410f364f5a71da644f18ff55bf603d65b80071b605270230d7cf"
"checksum portal 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7d41af5850b7c32eed34380bfc9d27b44ea90debdc428fb70153471e5d9aee31"
"checksum ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b"
"checksum pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "717ee476b1690853d222af4634056d830b5197ffd747726a9a1eee6da9f49074"
"checksum proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)" = "ecd45702f76d6d3c75a80564378ae228a85f0b59d2f3ed43c91b4a69eb2ebfc5"

View File

@ -237,6 +237,7 @@ async fn main() {
let tree_clone = tree.clone();
let tree_clone1 = tree.clone();
let tree_clone2 = tree.clone();
let tree_clone3 = Arc::new(Mutex::new(tree.clone()));
let user_create_route = portal::post()
.and(portal::path("users"))
@ -287,12 +288,66 @@ async fn main() {
}
});
let (mut tx, rx) = channel(100);
let recv_arc = Arc::new(Mutex::new(rx));
let tree_clone = tree_clone.clone();
let (tx, rx) = channel(100);
let rx_arc = Arc::new(Mutex::new(rx));
let tx_arc = Arc::new(Mutex::new(tx));
let tx_arc2 = tx_arc.clone();
tokio::spawn(async move {
let _ = tokio::spawn(async move {
let mut tx_send = tx_arc.lock().unwrap().clone();
loop {
let vals : HashMap<String, Event> = tree_clone.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
if k.contains("_v_") {
let json : Event = serde_json::from_str(&v).unwrap();
let now = Utc::now().timestamp();
if json.timestamp <= now && !json.published && !json.cancelled {
return true
} else {
return false
}
} else {
return false
}
}).map(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let json : Event = serde_json::from_str(&v).unwrap();
let json_cloned = json.clone();
(k, json_cloned)
}).collect();
for (k, v) in vals {
let old_json = v.clone();
let old_json_clone = old_json.clone();
let mut new_json = v.clone();
new_json.published = true;
let newer_json = new_json.clone();
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: new_json.event, data: serde_json::to_string(&new_json.data).unwrap(), retry: Duration::from_millis(5000)};
let _ = tx_send.send(sse).await;
let tree_cloned = tree_clone.clone();
let _ = tokio::spawn(async move {
let _ = tree_cloned.compare_and_swap(old_json.event.as_bytes(), None as Option<&[u8]>, Some(b""));
let old_json_og = tree_cloned.get(old_json.event).unwrap().unwrap();
let old_value = std::str::from_utf8(&old_json_og).unwrap().to_owned();
let _ = tree_cloned.compare_and_swap(old_json_clone.event.as_bytes(), Some(old_value.as_bytes()), Some(serde_json::to_string(&newer_json).unwrap().as_bytes()));
let _ = tree_cloned.compare_and_swap(k, Some(serde_json::to_string(&old_json_clone).unwrap().as_bytes()), Some(serde_json::to_string(&newer_json).unwrap().as_bytes()));
let _ = tree_cloned.flush();
}).await;
}
}
});
let sse = portal::path("events").and(portal::get()).map(move || {
let tree_clone = tree_clone3.lock().unwrap().clone();
let mut tx_send = tx_arc2.lock().unwrap().clone();
let _ = tokio::spawn(async move {
let vals: HashMap<String, String> = tree_clone.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
@ -312,68 +367,22 @@ async fn main() {
for (k, v) in vals {
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: k, data: v, retry: Duration::from_millis(5000)};
let _ = tx.send(sse).await;
let _ = tx_send.send(sse).await;
}
let _ = tokio::spawn(async move {
loop {
let vals : HashMap<String, Event> = tree_clone.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
if k.contains("_v_") {
let json : Event = serde_json::from_str(&v).unwrap();
let now = Utc::now().timestamp();
if json.timestamp <= now && !json.published && !json.cancelled {
return true
} else {
return false
}
} else {
return false
}
}).map(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let json : Event = serde_json::from_str(&v).unwrap();
let json_cloned = json.clone();
(k, json_cloned)
}).collect();
for (k, v) in vals {
let old_json = v.clone();
let old_json_clone = old_json.clone();
let mut new_json = v.clone();
new_json.published = true;
let newer_json = new_json.clone();
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: new_json.event, data: serde_json::to_string(&new_json.data).unwrap(), retry: Duration::from_millis(5000)};
let _ = tx.send(sse).await;
let tree_cloned = tree_clone.clone();
let _ = tokio::spawn(async move {
let _ = tree_cloned.compare_and_swap(old_json.event.as_bytes(), None as Option<&[u8]>, Some(b""));
let old_json_og = tree_cloned.get(old_json.event).unwrap().unwrap();
let old_value = std::str::from_utf8(&old_json_og).unwrap().to_owned();
let _ = tree_cloned.compare_and_swap(old_json_clone.event.as_bytes(), Some(old_value.as_bytes()), Some(serde_json::to_string(&newer_json).unwrap().as_bytes()));
let _ = tree_cloned.compare_and_swap(k, Some(serde_json::to_string(&old_json_clone).unwrap().as_bytes()), Some(serde_json::to_string(&newer_json).unwrap().as_bytes()));
let _ = tree_cloned.flush();
}).await;
}
}
}).await;
});
let sse = portal::path("events").and(portal::get()).map(move || {
let messages = recv_arc.lock().unwrap().clone();
let messages = rx_arc.lock().unwrap().clone();
let events = messages.map(|sse| {
event_stream(sse)
});
portal::sse::reply(portal::sse::keep_alive().interval(Duration::from_secs(5)).text("ping".to_string()).stream(events))
});
let routes = portal::any().and(login_route).or(user_create_route).or(insert_route).or(sse);
let cors = portal::cors()
.allow_any_origin()
.allow_methods(vec!["GET", "POST"]);
let routes = portal::any().and(login_route).or(user_create_route).or(insert_route).or(sse).with(cors);
portal::serve(routes).run(([0, 0, 0, 0], 8080)).await
}