Remove everything actix in the eventbus, slowly replacing with warp

As of now, the only thing that is "back" is the index route, nothing else has
yet to be restored
This commit is contained in:
R Tyler Croy 2020-01-17 23:27:08 -08:00
parent 5b8538f38f
commit 0b067780a5
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
10 changed files with 63 additions and 1134 deletions

839
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -13,12 +13,8 @@ name = "otto-ebc"
path = "src/cli/main.rs"
[dependencies]
# Actix provides the web and websocket basis we sit on top of
actix = "~0.9.0"
actix-web = "~2.0.0"
actix-web-actors = "~2.0.0"
actix-rt = "~1.0.0"
actix-http = "~1.0.1"
warp = { version = "~0.2.0", features = ["websocket"] }
tokio = { version = "~0.2", features = ["full"] }
handlebars = "~2.0.2"
# Need to embed static resources into the binary for serving at runtime
@ -40,19 +36,9 @@ pretty_env_logger = "~0.3.1"
serde = { version = "~1.0.103", features = ["rc"] }
serde_json = "~1.0.0"
# Needed for websockets in cli.rs
tungstenite = "~0.9.2"
url = "~2.1.0"
# Needed for the nice bits in the cli
rustyline = "~5.0.6"
# Used for the websocket client
actix-codec = "~0.2.0"
awc = "~1.0.1"
futures = "~0.3.1"
bytes = "~0.5.3"
[dev-dependencies]
regex = "1"

View File

@ -1,6 +1,6 @@
/**
fn main() {}
/*
* The CLI is meant to be used for manual testing and verification of the eventbus only.
*/
extern crate rustyline;
use rustyline::error::ReadlineError;
@ -69,3 +69,4 @@ fn main() {
}
rl.save_history(history).unwrap();
}
*/

View File

@ -1,6 +1,5 @@
/**!
/*
* This module is the eventbus client
*/
use actix::io::SinkWrite;
use actix::*;
use actix_codec::Framed;
@ -184,3 +183,4 @@ impl actix::io::WriteHandler<WsProtocolError> for EventBusClient {}
#[cfg(test)]
mod test {}
*/

View File

@ -1,3 +1,6 @@
#![allow(unused_imports)]
#![allow(dead_code)]
/**
* The msg module contains common message definitions for serialization and deserialization of data
* across the eventbus
@ -5,7 +8,6 @@
extern crate serde;
extern crate serde_json;
use actix::Message;
use chrono::prelude::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -36,8 +38,7 @@ fn default_channel() -> String {
*
* It is not intended to carry message contents itself, but rather information about the message
*/
#[derive(Serialize, Deserialize, Debug, Message)]
#[rtype(result = "()")]
#[derive(Serialize, Deserialize, Debug)]
pub struct Meta {
#[serde(default = "default_channel")]
pub channel: String,
@ -61,9 +62,8 @@ impl Meta {
* The Output enums are all meant to capture the types of messages which can be received from the
* eventbus.
*/
#[derive(Serialize, Deserialize, Debug, Message)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type", rename_all = "camelCase")]
#[rtype(result = "()")]
pub enum Output {
Heartbeat,
Message {
@ -81,8 +81,7 @@ pub enum Output {
* Clients should be prepared to handle each of these messages coming over the channels they
* subscribe to.
*/
#[derive(Serialize, Deserialize, Debug, Message)]
#[rtype(result = "()")]
#[derive(Serialize, Deserialize, Debug)]
pub struct OutputMessage {
pub msg: Arc<Output>,
pub meta: Meta,
@ -93,9 +92,8 @@ pub struct OutputMessage {
* as "inputs."
*
*/
#[derive(Serialize, Deserialize, Debug, Message)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type", rename_all = "camelCase")]
#[rtype(result = "()")]
pub enum Input {
/**
* A Connect message must be sent for the client to start receiving messages
@ -140,8 +138,7 @@ pub enum Input {
* This struct should never be constructed except by client websocket handlers just prior to
* writing to an active websocket
*/
#[derive(Serialize, Deserialize, Debug, Message)]
#[rtype(result = "()")]
#[derive(Serialize, Deserialize, Debug)]
pub struct InputMessage {
pub msg: Input,
pub meta: Meta,

View File

@ -1,172 +0,0 @@
/**
* The client module contains all the logic for handling the client connections to the eventbus.
*
* It does _not_ contain a client implementation to the eventbus however
*/
use actix::*;
use actix_web_actors::ws;
use chrono::prelude::Utc;
use log::{error, info, trace};
use serde_json;
use std::sync::Arc;
use crate::*;
/*
* Define the Websocket Actor needed for Actix
*
* This actor will contain state for each connection, and can also have some helper functions to
* keep track of this connection and its configuration
*/
pub struct WSClient {
events: Addr<eventbus::EventBus>,
}
/**
* The WSClient is an actor responsible for the stateful behavior of each websocket
* connected client
*/
impl WSClient {
/**
* Construct the WSClient with the given EventBus actor to communicate with
*/
pub fn new(eb: Addr<eventbus::EventBus>) -> Self {
Self { events: eb }
}
/**
* handle_connect() takes care of the Input::Connect message and will ensure
* that the client is connected to the "all" channel as well as its inbox"
*/
fn handle_connect(&self, client_name: String, ctx: &mut <WSClient as Actor>::Context) {
self.events.do_send(eventbus::Subscribe {
to: "all".to_owned(),
addr: ctx.address(),
});
/*
* Both of these message sends are creating their own String to represent
* the inbox channel.
*
* This SHOULD be fixed, but will require some more thinking about how
* the lifetimes of Channel objects should work
*/
self.events.do_send(eventbus::CreateChannel {
channel: eventbus::Channel {
name: format!("inbox.{}", client_name),
stateful: true,
},
});
self.events.do_send(eventbus::Subscribe {
to: format!("inbox.{}", client_name),
addr: ctx.address(),
});
}
/**
* handle_text handles _all_ incoming messages from the websocket connection,
* it is responsible for translating those JSON messages into something the
* eventbus can pass around internally
*/
fn handle_text(&self, text: String, ctx: &mut <WSClient as Actor>::Context) {
let command = serde_json::from_str::<InputMessage>(&text);
match command {
Ok(c) => {
// Since we have a Command, what kind?
match c {
InputMessage { msg, meta } => {
match msg {
Input::Connect { name } => {
info!("Received connect for client named: {}", name);
self.handle_connect(name, ctx)
}
Input::Publish { payload } => {
info!("received publish: {:?}", payload);
self.events.do_send(eventbus::Event {
e: Arc::new(Output::Message { payload: payload }),
channel: Arc::new(meta.channel),
});
}
Input::Subscribe { client } => {
info!("Subscribing {} to {}", client, meta.channel);
// Sent it along to the bus
// TODO: This should not use do_send which ignores errors
self.events.do_send(eventbus::Subscribe {
to: meta.channel,
addr: ctx.address(),
});
}
_ => (),
};
}
};
}
Err(e) => {
error!("Error parsing message from client: {:?}", e);
}
}
}
}
/**
* Handle eventbus Output messages by serializing them over to the websocket
*/
impl Handler<eventbus::Event> for WSClient {
type Result = ();
/**
* The `handle` function will be invoked when the WSClient actor receives a message which is
* intended to be sent to the client via a WebSocket.
*
* The handler will serialize the Output, and add additional metadata for the client
*/
fn handle(&mut self, event: eventbus::Event, ctx: &mut Self::Context) {
let meta = Meta {
channel: event.channel.to_string(),
ts: Utc::now(),
};
let out = OutputMessage { msg: event.e, meta };
// TODO: error
ctx.text(serde_json::to_string(&out).unwrap());
}
}
/**
* Implement the Actor trait, which ensures we can plug into the actix-web-actors tooling for
* handling websocket connections
*/
impl Actor for WSClient {
type Context = ws::WebsocketContext<Self>;
}
/**
* Handler for the ws::Message message for the WSClient actor
*
* This handler will be called for every message from a websocket client
*/
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg {
Err(_) => {
ctx.stop();
return;
}
Ok(msg) => msg,
};
trace!("WebSocket message received: {:?}", msg);
match msg {
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => self.handle_text(text, ctx),
ws::Message::Binary(bin) => ctx.binary(bin),
_ => (),
}
}
}
#[cfg(test)]
mod test {}

View File

@ -1,7 +1,6 @@
/**
* The bus module contains the actual eventbus actor
*/
use actix::*;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
@ -10,13 +9,7 @@ use log::{error, info};
use crate::*;
/*
* NOTE: I would like for the bus module not to know anything at all about the clients.
*
* At the moment I believe that would require a bit more type and generics surgery
* than I am currently willing to expend on the problem
*/
type ClientId = Addr<connection::WSClient>;
type ClientId = u64;
/**
* The Channel struct is used as an internal representation of each channel that
@ -38,28 +31,20 @@ pub struct Channel {
* It is used primarily for creating new channels on-demand, such as those needed
* for new client inboxes
*/
#[derive(Message)]
#[rtype(result = "()")]
pub struct CreateChannel {
pub channel: Channel,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct Subscribe {
pub to: String,
pub addr: ClientId,
}
#[derive(Message, Clone)]
#[rtype(result = "()")]
pub struct Event {
pub e: Arc<crate::Output>,
pub channel: Arc<String>,
}
#[derive(Message)]
#[rtype(usize)]
pub struct Unsubscribe {
pub addr: ClientId,
}
@ -90,18 +75,6 @@ pub struct EventBus {
channels: HashMap<Channel, HashSet<ClientId>>,
}
/**
*
* The Actor trait for the Eventbus allows it to act as an actor in the actix system
*/
impl Actor for EventBus {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
info!("Starting Eventbus with channels {:?}", self.channels.keys());
}
}
impl EventBus {
/**
* Configure the EventBus instance with channels
@ -131,6 +104,7 @@ impl EventBus {
}
}
/*
impl Handler<CreateChannel> for EventBus {
type Result = ();
@ -197,6 +171,7 @@ impl Handler<Event> for EventBus {
}
}
}
*/
#[cfg(test)]
mod test {

View File

@ -1,10 +1,10 @@
#![allow(unused_imports)]
#![allow(dead_code)]
/**
* The main module for otto-eventbus simply sets up the responders and the main
* server loop that the eventbus uses.
*/
extern crate actix;
extern crate actix_http;
extern crate actix_web;
extern crate config;
extern crate log;
extern crate pretty_env_logger;
@ -13,21 +13,19 @@ extern crate rust_embed;
#[macro_use]
extern crate serde_json;
use actix::{Actor, Addr};
use actix_web::{middleware, web};
use actix_web::{App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use chrono::Local;
use handlebars::Handlebars;
use log::{info, trace};
use log::{error, info, trace};
use serde::Serialize;
use warp::Filter;
use std::convert::Infallible;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use otto_eventbus::*;
pub mod connection;
pub mod eventbus;
/**
@ -46,50 +44,45 @@ struct Templates;
#[folder = "$CARGO_MANIFEST_DIR/static"]
struct Static;
struct AppState {
bus: Addr<eventbus::EventBus>,
// Handlebars uses a repository for the compiled templates. This object must be
// shared between the application threads, and is therefore passed to the
// Application Builder as an atomic reference-counted pointer.
hb: Arc<Handlebars>,
}
/**
* index serves up the homepage which is not really functional, but at least shows lost users
* something
*/
async fn index(state: web::Data<AppState>) -> HttpResponse {
async fn index(hb: Arc<Handlebars>) -> Result<impl warp::Reply, Infallible> {
let data = json!({
"version" : option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"),
});
let template = Templates::get("index.html").unwrap();
let body = state
.hb
.render_template(std::str::from_utf8(template.as_ref()).unwrap(), &data)
.expect("Failed to render the index.html template!");
HttpResponse::Ok().body(body)
if let Ok(view) = hb.render("index.html", &data) {
Ok(warp::reply::html(view))
}
else {
error!("Failed to render");
Ok(warp::reply::html("Fail".to_string()))
}
}
/**
* ws_index is the handler for all websocket connections, all it is responsible for doing is
* handling the inbound connection and creating a new WSClient for the connection
*/
async fn ws_index(
r: HttpRequest,
stream: web::Payload,
state: web::Data<AppState>,
) -> Result<HttpResponse, Error> {
let actor = connection::WSClient::new(state.bus.clone());
let res = ws::start(actor, &r, stream);
trace!("{:?}", res.as_ref().unwrap());
res
fn with_render(hb: Arc<Handlebars>) -> impl Filter<Extract = (Arc<Handlebars>,), Error=Infallible> + Clone {
warp::any().map(move || hb.clone())
}
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
#[tokio::main]
async fn main() {
pretty_env_logger::init();
let mut hb = Handlebars::new();
for t in Templates::iter() {
let template = Templates::get(&t)
.expect("Somehow we iterated Templates but didn't get one? How is this possible!");
let buf = std::str::from_utf8(template.as_ref())
.expect(format!("Unable to convert {} to a string buffer", &t).as_str());
hb.register_template_string(&t, buf)
.expect(format!("Failed to register {} as a Handlebars template", &t).as_str());
info!("Registered handlebars template: {}", &t);
}
let hb = Arc::new(hb);
let routes = warp::any().and(with_render(hb)).and_then(index);
warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
/*
let embedded_settings = Static::get("eventbus.yml").unwrap();
let defaults = std::str::from_utf8(embedded_settings.as_ref()).unwrap();
/*
@ -139,30 +132,13 @@ async fn main() -> std::io::Result<()> {
.expect("Invalid `heartbeat` configuration, must be an integer");
thread::sleep(Duration::from_secs(seconds));
});
*/
let state = AppState {
bus: events,
hb: Arc::new(Handlebars::new()),
};
let wd = web::Data::new(state);
HttpServer::new(move || {
App::new()
.app_data(wd.clone())
.wrap(middleware::Compress::default())
.wrap(middleware::Logger::default())
.route("/", web::get().to(index))
.route("/ws/", web::get().to(ws_index))
})
.bind("127.0.0.1:8000")?
.run()
.await
}
#[cfg(test)]
mod test {
mod tests {
use super::*;
use actix_web::{test, web, App};
use regex::Regex;
@ -173,8 +149,9 @@ mod test {
* It doesn't really test much useful, but does ensure that critical failures in the eventbus
* can sometimes be prevented
*/
#[actix_rt::test]
#[tokio::test]
async fn test_basic_http() {
/*
let events = eventbus::EventBus::with_channels(vec![], vec![]).start();
let state = AppState {
bus: events,
@ -202,5 +179,6 @@ mod test {
version.as_str(),
format!("v{}", option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"))
);
*/
}
}

View File

@ -11,7 +11,6 @@ serde = "~1.0.103"
serde_json = "~1.0.0"
serde_yaml = "~0.8.11"
actix = "~0.9.0"
log = "~0.4.8"
pretty_env_logger = "~0.3.1"

View File

@ -1,11 +1,9 @@
extern crate actix;
extern crate clap;
extern crate pretty_env_logger;
extern crate serde;
extern crate serde_yaml;
extern crate uuid;
use actix::*;
use clap::{App, Arg};
use log::*;
use serde::{Deserialize, Serialize};
@ -89,6 +87,7 @@ fn main() {
serde_yaml::to_string(&output).expect("Failed to serialize manifest")
);
/*
let sys = System::new("name");
Arbiter::spawn(async {
let client = connect("http://127.0.0.1:8000/ws/", "processor-travis-ci").await;
@ -111,6 +110,7 @@ fn main() {
//System::current().stop();
});
sys.run().unwrap();
*/
}
#[derive(Deserialize, Debug, Serialize)]