Experimenting with a Connection struct

This commit is contained in:
R Tyler Croy 2020-01-20 20:24:04 -08:00
parent 34b2a970fa
commit a6d008841e
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
3 changed files with 83 additions and 43 deletions

View File

@ -17,6 +17,7 @@ use std::sync::Arc;
* The maximum number of items in transit for each channel
*/
const MAX_CHANNEL_QUEUE: usize = 16;
pub static CHANNEL_ALL: &str = "all";
#[derive(Debug, PartialEq)]
pub struct Event {
@ -30,7 +31,7 @@ impl Default for Event {
}
}
type SendableEvent = Arc<Event>;
pub type SendableEvent = Arc<Event>;
/**
* A channel is named and typed with the type of messages it should be carrying
@ -139,7 +140,7 @@ impl Bus {
/**
* Create a new receiver for the named channel
*/
pub fn receiver_for(&self, channel: &String) -> Result<Receiver<SendableEvent>, &str> {
pub fn receiver_for(&self, channel: &str) -> Result<Receiver<SendableEvent>, &str> {
debug!("receiver_for({})", channel);
if let Some(c) = self.channels.get(channel) {
Ok(c.sender.subscribe())
@ -156,7 +157,6 @@ mod tests {
#[test]
fn test_bus_send() {
pretty_env_logger::init();
let mut b = Bus::new();
let ch = "test".to_string();
b.stateless(vec!["test".to_string()]);
@ -166,7 +166,7 @@ mod tests {
let p = Arc::new(e);
if let Ok(value) = b.send(&ch, p.clone()) {
let value = rx.try_recv().unwrap();
//assert_eq!(p.m.id, value.m.id);
assert_eq!(p, value);
} else {
assert!(false);
}

View File

@ -17,13 +17,16 @@ extern crate serde_json;
use chrono::Local;
use futures::future;
use futures::{FutureExt, StreamExt, SinkExt};
use futures::stream::{SplitStream, SplitSink};
use handlebars::Handlebars;
use log::{debug, error, info, trace};
use serde::Serialize;
use tokio::sync::broadcast::Receiver;
use warp::Filter;
use warp::reject::Rejection;
use warp::ws::{Message, WebSocket};
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::thread;
@ -117,21 +120,52 @@ fn load_templates(hb: &mut Handlebars) {
}
}
/*
* TODO: This is an idea for later
trait WarpRouter {
fn routes() -> String;
struct Connection {
tx: SplitSink<WebSocket, Message>,
bus: Arc<Bus>,
channels: HashMap<String, Receiver<SendableEvent>>,
}
struct Router;
impl WarpRouter for Router {
fn routes() -> String {
// TODO: Refactor the routes out of the main so that the warp main can be re-used across
// otto services
"".to_string()
impl Connection {
fn new(tx: SplitSink<WebSocket, Message>, bus: Arc<Bus>) -> Connection {
Connection {
tx,
bus,
channels: HashMap::new(),
}
}
fn subscribe(&self, named: &str) {
let mut bus_rx = self.bus.receiver_for(named).unwrap();
tokio::task::spawn(async move {
loop {
match bus_rx.recv().await {
Ok(ev) => {
info!("Need to dispatch: {:?}", ev);
let meta = msg::Meta::new("all".to_string());
let em = msg::OutputMessage {
meta,
msg: ev.m.clone(),
};
info!("dispatching output message: {:?}", em);
self.tx.send(Message::text(serde_json::to_string(&em).unwrap())).await;
},
Err(err) => {
error!("Failed to listen to channel: {:?}", err);
},
}
}
});
}
fn dispatch(&self, _ev: Arc<msg::Output>) {
}
}
*/
#[tokio::main]
async fn main() {
@ -172,48 +206,51 @@ async fn main() {
let index = warp::path::end().and(with_render(hb)).and_then(index);
let ws = warp::path("ws")
// The `ws()` filter will prepare the Websocket handshake.
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
/*
* Cloning again just to propagate the pointer, bleh
*/
let b3 = b2.clone();
// And then our closure will be called when it completes...
ws.on_upgrade(move |websocket| {
info!("Connection established for {:?}", websocket);
// Just echo all messages back...
let (mut tx, rx) = websocket.split();
//if let Ok(bus_rx) = b3.receiver_for(&"all".to_string()) {
// bus_rx.forward(tx).map(|result| {
// info!("forwarded: {}", result);
// });
// info!("hi");
//}
let mut erx = b3.receiver_for(&"all".to_string()).unwrap();
tokio::task::spawn(async move {
loop {
if let Ok(bus_event) = erx.recv().await {
let meta = msg::Meta::new("all".to_string());
let em = msg::OutputMessage {
meta,
msg: bus_event.m.clone(),
};
info!("dispatching output message: {:?}", em);
tx.send(Message::text(serde_json::to_string(&em).unwrap())).await;
}
else {
error!("Failed to access a bus event in the websocket client loop");
}
}
});
let (tx, rx) = websocket.split();
let con = Connection::new(tx, b3);
con.subscribe(CHANNEL_ALL);
tokio::task::spawn(rx.for_each(|item| {
info!("Item received: {:?}", item);
future::ready(())
}));
future::ready(())
/*
* Task for receiving events from the bus that we are interested
* in, and then forwarding them along to the connected sink
*/
//tokio::task::spawn(async move {
// loop {
// if let Ok(bus_event) = erx.recv().await {
// let meta = msg::Meta::new("all".to_string());
// let em = msg::OutputMessage {
// meta,
// msg: bus_event.m.clone(),
// };
// info!("dispatching output message: {:?}", em);
// tx.send(Message::text(serde_json::to_string(&em).unwrap())).await;
// }
// else {
// error!("Failed to access a bus event in the websocket client loop");
// }
// }
//});
})
});
let routes = warp::get().and(index.or(ws));
let routes = warp::get().and(index.or(ws));
warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}

View File

@ -79,14 +79,17 @@
};
sock.send(JSON.stringify(m));
};
sock.onerror = (err) => {
console.error('WebSocket error, reconnecting..');
sock.close();
};
sock.onclose = (event) => {
console.log('WebSocket connection lost, reconnecting..');
connectSocket();
}
sock.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(data);