Remove warp in favor of tide and async-fungstenite

Unfortunately I could not make peace with tokio, which is a good library but
just not for me in this project. Instead, switching gears to 100% async
This commit is contained in:
R Tyler Croy 2020-03-07 17:56:19 -08:00
parent 6b3f313478
commit d1276bbcc8
6 changed files with 910 additions and 205 deletions

783
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -13,9 +13,14 @@ name = "otto-ebc"
path = "src/cli/main.rs"
[dependencies]
warp = { version = "~0.2.0", features = ["websocket"] }
tokio = { version = "~0.2.0", features = ["full"] }
futures = { version = "~0.3.0", default-features = false, features = ["alloc"] }
async-std = "~1.5.0"
# Library used for implementing HTTP APIs
tide = "~0.6.0"
mime = "~0.3.16"
http = "~0.1.19"
# Used for managing the websocket channels
async-tungstenite = "~0.4.0"
futures = "~0.3.4"
handlebars = "~2.0.2"
# Need to embed static resources into the binary for serving at runtime

View File

@ -8,7 +8,6 @@ pub mod client;
pub mod msg;
use log::*;
use tokio::sync::broadcast::{channel, Receiver, SendError, Sender};
use std::collections::HashMap;
use std::sync::Arc;
@ -33,6 +32,9 @@ impl Default for Event {
pub type SendableEvent = Arc<Event>;
/*
/**
* A channel is named and typed with the type of messages it should be carrying
*/
@ -50,8 +52,6 @@ impl Channel {
}
pub fn new(name: String, stateful: bool) -> Self {
let (sender, receiver) = tokio::sync::broadcast::channel(MAX_CHANNEL_QUEUE);
Channel {
name,
stateful,
@ -210,3 +210,5 @@ mod tests {
assert!(!c.is_stateful());
}
}
*/

View File

@ -5,26 +5,30 @@
* The main module for otto-eventbus simply sets up the responders and the main
* server loop that the eventbus uses.
*/
extern crate async_std;
extern crate config;
extern crate futures;
extern crate http;
extern crate log;
extern crate mime;
extern crate pretty_env_logger;
#[macro_use]
extern crate rust_embed;
#[macro_use]
extern crate serde_json;
extern crate tide;
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use async_tungstenite::*;
use chrono::Local;
use futures::future;
use futures::stream::{SplitSink, SplitStream};
use futures::{FutureExt, SinkExt, StreamExt};
use config::Config;
use futures::StreamExt;
use handlebars::Handlebars;
use http::status::StatusCode;
use log::{debug, error, info, trace};
use serde::Serialize;
use tokio::sync::broadcast::Receiver;
use warp::reject::Rejection;
use warp::ws::{Message, WebSocket};
use warp::Filter;
use tide::{Request, Response, Server};
use std::collections::HashMap;
use std::convert::Infallible;
@ -50,35 +54,39 @@ struct Templates;
#[folder = "$CARGO_MANIFEST_DIR/static"]
struct Static;
async fn index(hb: Arc<Handlebars>) -> Result<impl warp::Reply, Rejection> {
let data = json!({
"version" : option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"),
});
if let Ok(view) = hb.render("index.html", &data) {
Ok(warp::reply::html(view))
} else {
error!("Failed to render");
Ok(warp::reply::html("Fail".to_string()))
}
struct State {
hb: Arc<Handlebars>,
conf: Arc<Config>,
}
/**
* Return a filter which will proppogate a Handlebars Arc
*/
fn with_render(
hb: Arc<Handlebars>,
) -> impl Filter<Extract = (Arc<Handlebars>,), Error = Infallible> + Clone {
warp::any().map(move || hb.clone())
async fn index(ctx: Request<State>) -> Response {
let conf = &ctx.state().conf;
let bind = conf.get_str("ws.bind").expect("Could not locate ws.bind");
let port = conf.get_int("ws.port").expect("Could not locate ws.port");
let res = Response::new(200);
let data = json!({
"version" : option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"),
"ws" : format!("{}:{}", bind, port),
});
if let Ok(view) = ctx.state().hb.render("index.html", &data) {
return res.body_string(view).set_mime(mime::TEXT_HTML);
} else {
error!("Failed to render");
return res
.set_status(StatusCode::INTERNAL_SERVER_ERROR)
.body_string("Internal server error".to_string());
}
}
/**
* Load the hierarchy of settings and return the configuration object
*/
fn load_settings() -> config::Config {
let embedded_settings =
Static::get("eventbus.yml").expect("Failed to load built-in/static settings");
let defaults = std::str::from_utf8(embedded_settings.as_ref()).unwrap();
fn load_configuration() -> Config {
let embedded = Static::get("eventbus.yml").expect("Failed to load built-in/static settings");
let defaults = std::str::from_utf8(embedded.as_ref()).unwrap();
/*
* Load our settings in the priority order of:
*
@ -88,23 +96,25 @@ fn load_settings() -> config::Config {
*
* Each layer overriding properties from the last
*/
let mut settings = config::Config::default();
settings
.merge(config::File::from_str(defaults, config::FileFormat::Yaml))
let mut conf = Config::default();
conf.merge(config::File::from_str(defaults, config::FileFormat::Yaml))
.unwrap()
.merge(config::File::with_name("eventbus").required(false))
.unwrap()
.merge(config::Environment::with_prefix("OTTO_EB"))
.unwrap();
let motd: String = settings
let motd: String = conf
.get("motd")
.expect("Configuration had no `motd` setting");
debug!("configured motd: {}", motd);
return settings;
return conf;
}
/**
* Load the Handlebars templates needed for presenting the web UI
*/
fn load_templates(hb: &mut Handlebars) {
for t in Templates::iter() {
if !t.ends_with(".html") {
@ -122,127 +132,115 @@ fn load_templates(hb: &mut Handlebars) {
}
}
#[tokio::main]
async fn main() {
/**
* Handle a given WebSocket connection
*/
async fn handle_connection(stream: TcpStream) {
info!("handle_connection: {:?}", stream);
let addr = stream
.peer_addr()
.expect("connected streams should have a peer address");
info!("Peer address: {}", addr);
let ws_stream = async_tungstenite::accept_async(stream)
.await
.expect("Error during the websocket handshake occurred");
info!("New WebSocket connection: {}", addr);
let (write, read) = ws_stream.split();
read.forward(write)
.await
.expect("Failed to forward message")
}
/**
* Create the WebSocket listener for accepting commands and pushing events
*/
async fn serve_ws(conf: Arc<config::Config>) -> Result<(), std::io::Error> {
let bind: String = conf
.get("ws.bind")
.expect("Invalid `ws.bind` configuration, must be a string (e.g. '127.0.0.1')");
let port: u64 = conf
.get("ws.port")
.expect("Invalid `ws.port` configuration, must be an integer");
let listener = TcpListener::bind(format!("{}:{}", bind, port))
.await
.expect(format!("Failed to bind to {}", port).as_str());
info!("Listening for WebSocket connections on {}:{}", bind, port);
while let Ok((stream, _)) = listener.accept().await {
task::spawn(handle_connection(stream));
}
Ok(())
}
/**
* Create the tide HTTP listener for handling conventional web requests
*
* This is mostly used for serving up the web UI for the eventbus
*/
async fn serve_http(conf: Arc<config::Config>, state: State) -> Result<(), std::io::Error> {
let bind: String = conf
.get("http.bind")
.expect("Invalid `http.bind` configuration, must be a string (e.g. '127.0.0.1')");
let port: u64 = conf
.get("http.port")
.expect("Invalid `http.port` configuration, must be an integer");
let mut app = Server::with_state(state);
app.at("/").get(index);
info!("Listening for HTTP connections on {}:{}", bind, port);
app.listen(format!("{}:{}", bind, port)).await?;
info!("HTTP listener exiting..");
Ok(())
}
fn main() {
pretty_env_logger::init();
let settings = load_settings();
info!("Initializing the Otto Eventbus");
let config = load_configuration();
let conf = Arc::new(config);
let mut hb = Handlebars::new();
load_templates(&mut hb);
let hb = Arc::new(hb);
let state = State {
hb: hb,
conf: conf.clone(),
};
let mut bus = Bus::new();
bus.stateless(
settings
.get::<Vec<String>>("channels.stateless")
.expect("Failed to load `channels.stateless`"),
);
bus.stateful(
settings
.get::<Vec<String>>("channels.stateful")
.expect("Failed to load `channels.stateful`"),
);
let event_bus = Arc::new(bus);
let hb_bus = event_bus.clone();
// Create a filter for hte eventbus
let event_bus = warp::any().map(move || event_bus.clone());
let seconds = settings
let seconds: u64 = conf
.get("heartbeat")
.expect("Invalid `heartbeat` configuration, must be an integer");
thread::spawn(move || loop {
let ts = Local::now();
let pulse = format!("heartbeat {}", ts);
info!("sending pulse: {}", pulse);
let hb = msg::Output::Heartbeat {};
let e = Event { m: Arc::new(hb) };
hb_bus.send(&"all".to_string(), Arc::new(e));
thread::sleep(Duration::from_secs(seconds));
});
task::spawn(serve_ws(conf.clone()));
task::block_on(serve_http(conf.clone(), state));
let index = warp::path::end().and(with_render(hb)).and_then(index);
let ws = warp::path("ws").and(warp::ws()).and(event_bus.clone()).map(
move |ws: warp::ws::Ws, bus: Arc<Bus>| {
// And then our closure will be called when it completes...
ws.on_upgrade(move |websocket| {
info!("Connection established for {:?}", websocket);
let (tx, rx) = websocket.split();
// Clone a reference of the bus for this WebSocket connection
let bus_conn = bus.clone();
tokio::task::spawn(async {
let mut conn = Connection::new(tx, rx, bus_conn);
conn.runloop().await;
});
future::ready(())
})
},
);
let routes = warp::get().and(index.or(ws));
warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
//thread::spawn(move || loop {
// let ts = Local::now();
// let pulse = format!("heartbeat {}", ts);
// info!("sending pulse: {}", pulse);
// let hb = msg::Output::Heartbeat {};
// let e = Event { m: Arc::new(hb) };
// hb_bus.send(&"all".to_string(), Arc::new(e));
// thread::sleep(Duration::from_secs(seconds));
//});
}
type WsOutput = SplitSink<WebSocket, Message>;
type WsInput = SplitStream<WebSocket>;
#[derive(Debug)]
struct Connection {
tx: WsOutput,
rx: WsInput,
bus: Arc<Bus>,
}
impl Connection {
fn new(tx: WsOutput, rx: WsInput, bus: Arc<Bus>) -> Self {
Connection {
tx,
rx,
bus,
}
}
async fn runloop(&mut self) {
let reader = self.rx.by_ref().for_each(|item| {
info!("Item received: {:?}", item);
future::ready(())
});
let writer = tokio::task::spawn(async {
/*
* NOTE: we need to wait for messages to come in on some channel here?
* busy loop
*/
debug!("Starting writer task for connection {:?}", self);
let bus_rx = self.bus.receiver_for("all").unwrap();
loop {
match bus_rx.recv().await {
Ok(ev) => {
info!("Need to dispatch: {:?}", ev);
},
Err(err) => {
error!("Failed to listen to channel: {:?}", err);
},
}
}
});
// TODO handle errors on the join
future::join(reader, writer).await;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_load_settings() {
let c = load_settings();
fn test_load_conf() {
let c = load_configuration();
let motd: String = c.get("motd").unwrap();
let pulse: u64 = c.get("heartbeat").unwrap();
assert!(motd.len() > 0);

View File

@ -1,7 +1,20 @@
# This file contains the default configuration for otto-eventbus
---
motd: 'Welcome to the otto-eventbus, beep beep'
# the http section configures the HTTP (RESTful) listener
http:
bind: 127.0.0.1
port: 9310
# the ws section configures the WebSocket only listener
ws:
bind: 127.0.0.1
# The ws.port is expected to conventionally be at http.port + 1
port: 9311
heartbeat: 60
channels:
stateful:
- audit

View File

@ -60,14 +60,22 @@
<h1>Otto Eventbus 🤖</h1>
</center>
<div>
Configured to hit a WebSocket connection at {{ws}}
</div>
<div class="footer">
v{{version}}
</div>
<script type="text/javascript">
<!--
window.wsDelay = 1000;
function connectSocket() {
const sock = new WebSocket('ws://localhost:8000/ws');
const sock = new WebSocket("ws://{{ws}}/");
sock.onopen = (event) => {
// Reset our timeout interval
window.wsDelay = 1000;
console.log('WebSocket connected');
const m = {
meta: {
@ -82,17 +90,27 @@
sock.onerror = (err) => {
console.error('WebSocket error, reconnecting..');
sock.close();
};
sock.onclose = (event) => {
console.log('WebSocket connection lost, reconnecting..');
connectSocket();
setTimeout(() => {
window.wsDelay = window.wsDelay * 2;
console.log(`reconnection delay: ${window.wsDelay}`);
connectSocket();
}, window.wsDelay);
}
sock.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(data);
try {
const data = JSON.parse(event.data);
console.log(data);
} catch (err) {
console.error("Did not receive JSON data");
console.error(event.data);
}
};
window.sock = sock;
}