Refactor the eventbus crate a bit to allow for client and server separation

This should make it a bit easier to incorporate the necessary client code that
all other services will want to import
This commit is contained in:
R Tyler Croy 2019-12-30 08:14:00 -08:00
parent 950fec403a
commit ee4aa60618
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
5 changed files with 27 additions and 18 deletions

View File

@ -4,6 +4,10 @@ version = "0.1.0"
authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
edition = "2018"
[[bin]]
name = "otto-eventbus"
path = "src/server/main.rs"
[dependencies]
actix = "~0.9.0"
# Grabbing an alpha version with async/await support.

View File

@ -9,9 +9,6 @@ use actix::Message;
use serde::{Deserialize, Serialize};
use serde_json::Value;
pub mod bus;
pub mod client;
#[derive(Serialize, Deserialize, Debug, Message)]
#[serde(tag = "command", rename_all = "camelCase")]
#[rtype(result = "()")]

View File

@ -10,6 +10,9 @@ use serde_json;
use std::sync::Arc;
use otto_eventbus::Command;
use crate::*;
/*
* Define the Websocket Actor needed for Actix
*
@ -17,26 +20,26 @@ use std::sync::Arc;
* keep track of this connection and its configuration
*/
pub struct WSClient {
events: Addr<crate::bus::EventBus>,
events: Addr<eventbus::EventBus>,
}
impl WSClient {
pub fn new(eb: Addr<crate::bus::EventBus>) -> Self {
pub fn new(eb: Addr<eventbus::EventBus>) -> Self {
Self { events: eb }
}
fn handle_text(&self, text: String, ctx: &<WSClient as Actor>::Context) {
let command = serde_json::from_str::<crate::Command>(&text);
let command = serde_json::from_str::<Command>(&text);
match command {
Ok(c) => {
// Since we have a Command, what kind?
match c {
crate::Command::Subscribe { client, channel } => {
Command::Subscribe { client, channel } => {
info!("Subscribing {} to {}", client, channel);
// Sent it along to the bus
// TODO: This should not use do_send which ignores errors
self.events.do_send(crate::bus::Subscribe {
self.events.do_send(eventbus::Subscribe {
to: channel,
addr: ctx.address(),
});
@ -54,10 +57,10 @@ impl WSClient {
/**
* Handle Basic eventbus messages by serializing them over to the websocket
*/
impl Handler<Arc<crate::Command>> for WSClient {
impl Handler<Arc<Command>> for WSClient {
type Result = ();
fn handle(&mut self, msg: Arc<crate::Command>, ctx: &mut Self::Context) {
fn handle(&mut self, msg: Arc<Command>, ctx: &mut Self::Context) {
ctx.text(serde_json::to_string(&msg).unwrap());
}
}
@ -70,7 +73,7 @@ impl Actor for WSClient {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let sub = crate::bus::Subscribe {
let sub = eventbus::Subscribe {
to: "all".to_owned(),
addr: ctx.address(),
};

View File

@ -8,13 +8,15 @@ use std::sync::Arc;
use log::{error, info};
use crate::*;
/*
* NOTE: I would like for the bus module not to know anything at all about the clients.
*
* At the moment I believe that would require a bit more type and generics surgery
* than I am currently willing to expend on the problem
*/
type ClientId = Addr<crate::client::WSClient>;
type ClientId = Addr<connection::WSClient>;
#[derive(Message)]
#[rtype(result = "()")]

View File

@ -24,7 +24,10 @@ use std::sync::Arc;
use std::thread;
use std::time::Duration;
use otto_eventbus::*;
use otto_eventbus::Command;
pub mod connection;
pub mod eventbus;
/**
* Templates is a rust-embed struct which will contain all the files embedded from the
@ -43,7 +46,7 @@ struct Templates;
struct Static;
struct AppState {
bus: Addr<bus::EventBus>,
bus: Addr<eventbus::EventBus>,
// Handlebars uses a repository for the compiled templates. This object must be
// shared between the application threads, and is therefore passed to the
// Application Builder as an atomic reference-counted pointer.
@ -76,7 +79,7 @@ async fn ws_index(
stream: web::Payload,
state: web::Data<AppState>,
) -> Result<HttpResponse, Error> {
let actor = client::WSClient::new(state.bus.clone());
let actor = connection::WSClient::new(state.bus.clone());
let res = ws::start(actor, &r, stream);
trace!("{:?}", res.as_ref().unwrap());
res
@ -119,14 +122,14 @@ async fn main() -> std::io::Result<()> {
.get::<Vec<String>>("channels.stateful")
.expect("Failed to load `channels.stateful` configuration, which must be an array");
let events = bus::EventBus::with_channels(stateless, stateful).start();
let events = eventbus::EventBus::with_channels(stateless, stateful).start();
let bus = events.clone();
thread::spawn(move || loop {
let pulse = format!("heartbeat {}", Local::now());
trace!("sending pulse: {}", pulse);
let event = crate::bus::Event {
e: Arc::new(crate::Command::Heartbeat),
let event = eventbus::Event {
e: Arc::new(Command::Heartbeat),
channel: "all".to_string(),
};
bus.do_send(event);