This commit is contained in:
Bevan Hunt 2020-01-23 09:50:11 -08:00
parent 2de574bb9f
commit 4bc57312a2
3 changed files with 23 additions and 11 deletions

1
Cargo.lock generated
View File

@ -117,6 +117,7 @@ dependencies = [
"go-flag 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"json 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonwebtoken 6.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"portal 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -28,3 +28,4 @@ reqwest = { version = "0.10", features = ["json", "blocking"] }
go-flag = "0.1"
envy = "0.4"
crossbeam = "0.7.3"
lazy_static = "1.4"

View File

@ -9,8 +9,20 @@ use jsonwebtoken::{encode, decode, Header, Validation};
use crossbeam::channel::unbounded;
use std::convert::Infallible;
use std::time::Duration;
use futures::stream::{Stream};
use std::sync::{Mutex, Arc, RwLock};
use std::sync::{Mutex, Arc};
use lazy_static::lazy_static;
lazy_static! {
static ref CHANNEL: HashMap<String, (crossbeam::channel::Sender<SSE>, crossbeam::channel::Receiver<SSE>)> = {
let (tx, rx) = unbounded();
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: "internal_status".to_owned(), data: "connected".to_owned(), retry: Duration::from_millis(5000)};
let _ = tx.send(sse);
let mut m = HashMap::new();
m.insert("chan".to_owned(), (tx, rx));
m
};
}
#[derive(Debug, Clone)]
pub struct SSE {
@ -234,7 +246,6 @@ async fn main() {
let configure = config();
let tree = sled::open(configure.save_path).unwrap();
let tree_clone = tree.clone();
let tree_clone1 = tree.clone();
let tree_clone2 = tree.clone();
let tree_clone3 = Arc::new(Mutex::new(tree.clone()));
@ -288,8 +299,6 @@ async fn main() {
}
});
let (tx, rx) = unbounded();
let rxx = Arc::new(RwLock::new(rx));
let tree_clone = tree_clone3.lock().unwrap().clone();
let _ = tokio::spawn(async move {
@ -327,6 +336,7 @@ async fn main() {
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: new_json.event, data: serde_json::to_string(&new_json.data).unwrap(), retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse).unwrap();
let tree_cloned = tree_clone.clone();
let _ = tokio::spawn(async move {
@ -345,8 +355,6 @@ async fn main() {
let tree_clone = tree_clone3.lock().unwrap().clone();
let tree_clone2 = tree_clone.clone();
let rx = rx.clone();
let tx = tx.clone();
let vals: HashMap<String, String> = tree_clone2.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
@ -367,16 +375,18 @@ async fn main() {
for (k, v) in vals {
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: k, data: v, retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse);
}
let foo = rx.iter().map(|sse| {
let (_, rx) = CHANNEL.get(&"chan".to_owned()).unwrap();
let messages = rx.iter().map(|sse| {
event_stream(sse)
}
});
let events = futures::stream::iter(messages);
portal::sse::reply(portal::sse::keep_alive().interval(Duration::from_secs(5)).text("ping".to_string()).stream()))
portal::sse::reply(portal::sse::keep_alive().interval(Duration::from_secs(5)).text("ping".to_string()).stream(events))
});
let cors = portal::cors()