Shuffle messages in and out of the websockets and the eventbus

This has the basics of working, but the Event objects are properly formed to
share the right msgs
This commit is contained in:
R Tyler Croy 2020-01-19 23:53:21 -08:00
parent d837eb8af1
commit bf662d72d5
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
3 changed files with 249 additions and 58 deletions

View File

@ -7,27 +7,65 @@
pub mod client;
pub mod msg;
use log::*;
use tokio::sync::broadcast::{channel, Receiver, SendError, Sender};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast::{channel, Receiver, Sender};
/**
* The maximum number of items in transit for each channel
*/
const MAX_CHANNEL_QUEUE: usize = 16;
#[derive(Clone, Debug, PartialEq)]
pub struct Event {
pub id: i64,
}
type SendableEvent = Arc<Event>;
/**
* A channel is named and typed with the type of messages it should be carrying
*/
#[derive(Debug)]
struct Channel {
name: String,
send: Sender<Message>,
recv: Receiver<Message>,
stateful: bool,
sender: Sender<Arc<Event>>,
receiver: Receiver<Arc<Event>>,
}
struct Message {}
impl Channel {
fn send(&self, msg: Message) {}
fn recv(&self, msg: Message) {}
fn send(&self, ev: SendableEvent) -> Result<usize, SendError<SendableEvent>> {
self.sender.send(ev)
}
pub fn new(name: String, stateful: bool) -> Self {
let (sender, receiver) = tokio::sync::broadcast::channel(MAX_CHANNEL_QUEUE);
Channel {
name,
stateful,
sender,
receiver,
}
}
pub fn stateful(name: String) -> Channel {
Channel::new(name, true)
}
pub fn stateless(name: String) -> Channel {
Channel::new(name, false)
}
pub fn is_stateful(&self) -> bool {
self.stateful == true
}
}
struct Bus {
pub struct Bus {
/**
* Channels are named and can implement a number of different types. This should
* allow the Bus to handle different channels with different message payloads
@ -36,7 +74,131 @@ struct Bus {
channels: HashMap<String, Channel>,
}
impl Bus {
pub fn new() -> Bus {
Bus {
channels: HashMap::new(),
}
}
/**
* Configure the bus with a number of stateless channels
*
* Stateless channels are not intended to be persisted by the eventbus
*/
pub fn stateless(&mut self, channels: Vec<String>) -> &mut Bus {
for channel in channels.iter() {
self.channels.insert(
channel.to_string(),
Channel::new(channel.to_string(), false),
);
}
self
}
/**
* Configure the bus with a number of stateful channels
*/
pub fn stateful(&mut self, channels: Vec<String>) -> &mut Bus {
for channel in channels.iter() {
self.channels
.insert(channel.to_string(), Channel::new(channel.to_string(), true));
}
self
}
/**
* Determine whether the named channel is configured in thebus
*/
pub fn has_channel(&self, channel: &str) -> bool {
self.channels.contains_key(channel)
}
/**
* Send an event to the named channel
*/
pub fn send(
&self,
channel: &String,
ev: SendableEvent,
) -> Result<usize, SendError<SendableEvent>> {
if let Some(c) = self.channels.get(channel) {
c.send(ev)
} else {
Err(SendError(ev))
}
}
/**
* Create a new receiver for the named channel
*/
pub fn receiver_for(&self, channel: &String) -> Result<Receiver<SendableEvent>, &str> {
debug!("receiver_for({})", channel);
if let Some(c) = self.channels.get(channel) {
Ok(c.sender.subscribe())
} else {
error!("Failed to get channel");
Err("Fail")
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bus_send() {
pretty_env_logger::init();
let mut b = Bus::new();
let ch = "test".to_string();
b.stateless(vec!["test".to_string()]);
if let Ok(mut rx) = b.receiver_for(&ch) {
let e = Event { id: 1 };
let p = Arc::new(e);
if let Ok(value) = b.send(&ch, p.clone()) {
let value = rx.try_recv().unwrap();
assert_eq!(p.id, value.id);
} else {
assert!(false);
}
} else {
/*
* This branch should never execute unless the test should fail
*/
assert!(false);
}
}
#[test]
fn test_bus_stateless() {
let mut b = Bus::new();
b.stateless(vec!["test".to_string()]);
assert!(b.has_channel("test"));
}
#[test]
fn test_bus_stateful() {
let mut b = Bus::new();
b.stateful(vec!["test".to_string()]);
assert!(b.has_channel("test"));
}
#[test]
fn test_channel_ctor() {
let c = Channel::new("test".to_string(), false);
}
#[test]
fn test_channel_stateful() {
let c = Channel::stateful("test".to_string());
assert!(c.is_stateful());
}
#[test]
fn test_channel_stateless() {
let c = Channel::stateless("test".to_string());
assert!(!c.is_stateful());
}
}

View File

@ -15,6 +15,7 @@ extern crate rust_embed;
extern crate serde_json;
use chrono::Local;
use futures::future;
use futures::{FutureExt, StreamExt};
use handlebars::Handlebars;
use log::{debug, error, info, trace};
@ -115,28 +116,88 @@ fn load_templates(hb: &mut Handlebars) {
}
}
/*
* TODO: This is an idea for later
trait WarpRouter {
fn routes() -> String;
}
struct Router;
impl WarpRouter for Router {
fn routes() -> String {
// TODO: Refactor the routes out of the main so that the warp main can be re-used across
// otto services
"".to_string()
}
}
*/
#[tokio::main]
async fn main() {
pretty_env_logger::init();
let settings = load_settings();
let mut hb = Handlebars::new();
load_templates(&mut hb);
let hb = Arc::new(hb);
let _settings = load_settings();
let mut bus = Bus::new();
bus.stateless(settings
.get::<Vec<String>>("channels.stateless")
.expect("Failed to load `channels.stateless`"));
bus.stateful(settings
.get::<Vec<String>>("channels.stateful")
.expect("Failed to load `channels.stateful`"));
let seconds = settings
.get("heartbeat")
.expect("Invalid `heartbeat` configuration, must be an integer");
let b = Arc::new(bus);
let b1 = b.clone();
let b2 = b.clone();
thread::spawn(move || loop {
let ts = Local::now();
let pulse = format!("heartbeat {}", ts);
info!("sending pulse: {}", pulse);
let e = Event { id: ts.timestamp(), };
b1.send(&"all".to_string(), Arc::new(e));
//let event = eventbus::Event {
// e: Arc::new(Output::Heartbeat),
// channel: Arc::new("all".to_string()),
//};
//bus.do_send(event);
thread::sleep(Duration::from_secs(seconds));
});
let index = warp::path::end().and(with_render(hb)).and_then(index);
let ws = warp::path("ws")
// The `ws()` filter will prepare the Websocket handshake.
.and(warp::ws())
.map(|ws: warp::ws::Ws| {
.map(move |ws: warp::ws::Ws| {
let b3 = b2.clone();
// And then our closure will be called when it completes...
ws.on_upgrade(|websocket| {
ws.on_upgrade(move |websocket| {
// Just echo all messages back...
let (tx, rx) = websocket.split();
rx.forward(tx).map(|result| {
if let Err(e) = result {
error!("websocket error: {:?}", e);
//if let Ok(bus_rx) = b3.receiver_for(&"all".to_string()) {
// bus_rx.forward(tx).map(|result| {
// info!("forwarded: {}", result);
// });
// info!("hi");
//}
let mut erx = b3.receiver_for(&"all".to_string()).unwrap();
tokio::task::spawn(async move {
loop {
let t = erx.recv().await;
info!("t: {:?}", t);
}
})
});
tokio::task::spawn(rx.for_each(|item| {
info!("Item received: {:?}", item);
future::ready(())
}));
future::ready(())
})
});
let routes = warp::get().and(index.or(ws));
@ -148,8 +209,6 @@ async fn main() {
mod tests {
use super::*;
use regex::Regex;
#[test]
fn test_load_settings() {
let c = load_settings();
@ -158,44 +217,4 @@ mod tests {
assert!(motd.len() > 0);
assert_eq!(pulse, 60);
}
/**
* This test just ensures that the server can come online properly and render its index handler
* properly.
*
* It doesn't really test much useful, but does ensure that critical failures in the eventbus
* can sometimes be prevented
*/
#[tokio::test]
async fn test_basic_http() {
/*
let events = eventbus::EventBus::with_channels(vec![], vec![]).start();
let state = AppState {
bus: events,
hb: Arc::new(Handlebars::new()),
};
let wd = web::Data::new(state);
let srv = test::start(move || {
App::new()
.app_data(wd.clone())
.route("/", web::get().to(index))
});
let req = srv.get("/");
let mut response = req.send().await.unwrap();
assert!(response.status().is_success());
let re = Regex::new(r"(v\d\.\d\.\d)").unwrap();
let body = response.body().await.unwrap();
let buffer = String::from_utf8(body.to_vec()).unwrap();
let matches = re.captures(&buffer).unwrap();
let version = matches.get(1).unwrap();
assert_eq!(
version.as_str(),
format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"))
);
*/
}
}

View File

@ -67,7 +67,17 @@
<!--
function connectSocket() {
const sock = new WebSocket('ws://localhost:8000/ws');
sock.onopen = (event) => console.log('WebSocket connected');
sock.onopen = (event) => {
console.log('WebSocket connected');
const m = {
meta: {
},
msg: {
name: "eventbus-homepage",
},
};
sock.send(JSON.stringify(m));
};
sock.onerror = (err) => {
console.error('WebSocket error, reconnecting..');
sock.close();