WIP tweaking

This commit is contained in:
R Tyler Croy 2020-01-26 15:33:15 -08:00
parent a6d008841e
commit 2dac86d95f
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
1 changed files with 33 additions and 81 deletions

View File

@ -28,7 +28,7 @@ use warp::ws::{Message, WebSocket};
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
@ -120,51 +120,27 @@ fn load_templates(hb: &mut Handlebars) {
}
}
//let mut bus_rx = self.bus.receiver_for(named).unwrap();
//loop {
// match bus_rx.recv().await {
// Ok(ev) => {
// info!("Need to dispatch: {:?}", ev);
// let meta = msg::Meta::new("all".to_string());
struct Connection {
tx: SplitSink<WebSocket, Message>,
bus: Arc<Bus>,
channels: HashMap<String, Receiver<SendableEvent>>,
}
// let em = msg::OutputMessage {
// meta,
// msg: ev.m.clone(),
// };
// info!("dispatching output message: {:?}", em);
impl Connection {
fn new(tx: SplitSink<WebSocket, Message>, bus: Arc<Bus>) -> Connection {
Connection {
tx,
bus,
channels: HashMap::new(),
}
}
fn subscribe(&self, named: &str) {
let mut bus_rx = self.bus.receiver_for(named).unwrap();
tokio::task::spawn(async move {
loop {
match bus_rx.recv().await {
Ok(ev) => {
info!("Need to dispatch: {:?}", ev);
let meta = msg::Meta::new("all".to_string());
let em = msg::OutputMessage {
meta,
msg: ev.m.clone(),
};
info!("dispatching output message: {:?}", em);
self.tx.send(Message::text(serde_json::to_string(&em).unwrap())).await;
},
Err(err) => {
error!("Failed to listen to channel: {:?}", err);
},
}
}
});
}
fn dispatch(&self, _ev: Arc<msg::Output>) {
}
}
// // TODO
// //self.tx.send(Message::text(serde_json::to_string(&em).unwrap())).await;
// },
// Err(err) => {
// error!("Failed to listen to channel: {:?}", err);
// },
// }
//}
#[tokio::main]
@ -183,15 +159,15 @@ async fn main() {
bus.stateful(settings
.get::<Vec<String>>("channels.stateful")
.expect("Failed to load `channels.stateful`"));
let event_bus = Arc::new(bus);
let hb_bus = event_bus.clone();
// Create a filter for hte eventbus
let event_bus = warp::any().map(move || event_bus.clone());
let seconds = settings
.get("heartbeat")
.expect("Invalid `heartbeat` configuration, must be an integer");
let b = Arc::new(bus);
let b1 = b.clone();
let b2 = b.clone();
thread::spawn(move || loop {
let ts = Local::now();
let pulse = format!("heartbeat {}", ts);
@ -200,53 +176,26 @@ async fn main() {
let e = Event {
m: Arc::new(hb),
};
b1.send(&"all".to_string(), Arc::new(e));
hb_bus.send(&"all".to_string(), Arc::new(e));
thread::sleep(Duration::from_secs(seconds));
});
let index = warp::path::end().and(with_render(hb)).and_then(index);
let ws = warp::path("ws")
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
/*
* Cloning again just to propagate the pointer, bleh
*/
let b3 = b2.clone();
.and(event_bus.clone())
.map(move |ws: warp::ws::Ws, bus: Arc<Bus>| {
// And then our closure will be called when it completes...
ws.on_upgrade(move |websocket| {
ws.on_upgrade(move |websocket| {
info!("Connection established for {:?}", websocket);
// Just echo all messages back...
let (tx, rx) = websocket.split();
let con = Connection::new(tx, b3);
con.subscribe(CHANNEL_ALL);
tokio::task::spawn(rx.for_each(|item| {
info!("Item received: {:?}", item);
future::ready(())
}));
future::ready(())
/*
* Task for receiving events from the bus that we are interested
* in, and then forwarding them along to the connected sink
*/
//tokio::task::spawn(async move {
// loop {
// if let Ok(bus_event) = erx.recv().await {
// let meta = msg::Meta::new("all".to_string());
// let em = msg::OutputMessage {
// meta,
// msg: bus_event.m.clone(),
// };
// info!("dispatching output message: {:?}", em);
// tx.send(Message::text(serde_json::to_string(&em).unwrap())).await;
// }
// else {
// error!("Failed to access a bus event in the websocket client loop");
// }
// }
//});
})
});
@ -254,6 +203,9 @@ async fn main() {
warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}
struct Clients {
}
#[cfg(test)]
mod tests {
use super::*;