Refactor the EventBusClient code into the eventus crate where it belongs

Now that I have something simple and functioning, this no longer needs to be
in the auctioneer

The fun work begins now, supporting custom messages known only to the auctioneer
for its needs, but passing those through EventBusClient
This commit is contained in:
R Tyler Croy 2020-01-11 16:16:54 -08:00
parent 274b5020aa
commit 7066789a3b
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
7 changed files with 179 additions and 161 deletions

9
Cargo.lock generated
View File

@ -1310,16 +1310,11 @@ name = "otto-auctioneer"
version = "0.1.0"
dependencies = [
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"otto-eventbus 0.1.0",
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1332,13 +1327,17 @@ name = "otto-eventbus"
version = "0.1.0"
dependencies = [
"actix 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-codec 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-http 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"handlebars 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"pretty_env_logger 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -10,12 +10,6 @@ actix = "~0.9.0"
actix-http = "~1.0.1"
actix-rt = "~1.0.0"
actix-web = "~2.0.0"
# Used for the websocket client
actix-codec = "~0.2.0"
actix-web-actors = "~2.0.0"
awc = "~1.0.1"
futures = "~0.3.1"
bytes = "~0.5.3"
# Handling command line options
clap = { version = "~2.33.0", features = ["yaml"] }

View File

@ -4,26 +4,13 @@
extern crate actix;
extern crate actix_http;
extern crate actix_web;
extern crate awc;
extern crate pretty_env_logger;
use actix::io::SinkWrite;
use actix::*;
use actix_codec::Framed;
use actix_web::{middleware, web};
use actix_web::{App, HttpResponse, HttpServer};
use awc::{
error::WsProtocolError,
ws::{Codec, Frame, Message},
BoxedSocket, Client,
};
use bytes::Bytes;
use futures::stream::{SplitSink, StreamExt};
use log::{error, info};
use log::debug;
use std::time::Duration;
use otto_eventbus::*;
use otto_eventbus::client::*;
/**
* The index handler for the root of the Auctioneer web interface
@ -36,27 +23,8 @@ async fn route_index() -> HttpResponse {
async fn main() -> std::io::Result<()> {
pretty_env_logger::init();
/*
* Creating a awc::Client to handle the first part of our WebSocket client bootstrap
*/
let (response, framed) = Client::new()
.ws("http://127.0.0.1:8000/ws/")
.connect()
.await
.map_err(|e| {
error!("Error: {}", e);
})
.unwrap();
info!("{:?}", response);
let (sink, stream) = framed.split();
let _addr = EventBusClient::create(|ctx| {
EventBusClient::add_stream(stream, ctx);
EventBusClient {
sink: SinkWrite::new(sink, ctx),
id: "auctioneer",
}
});
let client = connect("http://127.0.0.1:8000/ws/").await;
debug!("Client created: {:?}", client);
HttpServer::new(move || {
App::new()
@ -69,118 +37,6 @@ async fn main() -> std::io::Result<()> {
.await
}
/**
* An EventBusClient is capable of connecting to, reading messages from, and sending messages to
* the eventbus.
*/
struct EventBusClient {
/**
* The sink is a writable object which can send messages back to the EventBus
*/
sink: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
/**
* String identifier for the client
*
* This should be persisted between invocations of the process if the client
* should have any semblence of persistence with its messages
*/
id: &'static str,
}
#[derive(Message)]
#[rtype(result = "()")]
struct ClientCommand(String);
impl Actor for EventBusClient {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
let input = InputMessage {
meta: Meta::default(),
msg: Input::Connect {
name: self.id.to_string(),
},
};
self.sink
.write(Message::Text(serde_json::to_string(&input).unwrap()))
.unwrap();
// start heartbeats otherwise server will disconnect after 10 seconds
self.hb(ctx)
}
fn stopped(&mut self, _: &mut Context<Self>) {
info!("Disconnected");
}
}
impl EventBusClient {
fn hb(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| {
act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap();
act.hb(ctx);
// client should also check for a timeout here, similar to the
// server code
});
}
}
/// Handle stdin commands
impl Handler<ClientCommand> for EventBusClient {
type Result = ();
fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context<Self>) {
self.sink.write(Message::Text(msg.0)).unwrap();
}
}
impl Handler<OutputMessage> for EventBusClient {
type Result = ();
fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context<Self>) {
/*
* For heartbeats we really don't need to do anything
*/
}
}
/// Handle server websocket messages
impl StreamHandler<Result<Frame, WsProtocolError>> for EventBusClient {
fn handle(&mut self, msg: Result<Frame, WsProtocolError>, ctx: &mut Context<Self>) {
if let Ok(Frame::Text(txt)) = msg {
/*
* We have received _some_ message from the eventbus, let's try to
* decode it!
*/
let msg = serde_json::from_slice::<OutputMessage>(&txt);
match msg {
Ok(output) => {
/*
* Dispatch the message basically back to ourself for easy
* handling
*/
ctx.address().do_send(output);
}
Err(e) => {
error!("Received invalid message: {}", e);
}
}
}
}
fn started(&mut self, _ctx: &mut Context<Self>) {
info!("Connected");
}
fn finished(&mut self, ctx: &mut Context<Self>) {
info!("Server disconnected");
ctx.stop()
}
}
impl actix::io::WriteHandler<WsProtocolError> for EventBusClient {}
#[cfg(test)]
mod test {
use super::*;

View File

@ -10,7 +10,7 @@ path = "src/server/main.rs"
[[bin]]
name = "otto-ebc"
path = "src/cli.rs"
path = "src/cli/main.rs"
[dependencies]
# Actix provides the web and websocket basis we sit on top of
@ -47,5 +47,12 @@ url = "~2.1.0"
rustyline = "~5.0.6"
# Used for the websocket client
actix-codec = "~0.2.0"
awc = "~1.0.1"
futures = "~0.3.1"
bytes = "~0.5.3"
[dev-dependencies]
regex = "1"

161
eventbus/src/client.rs Normal file
View File

@ -0,0 +1,161 @@
/**!
* This module is the eventbus client
*/
use actix::io::SinkWrite;
use actix::*;
use actix_codec::Framed;
use awc::{
error::WsProtocolError,
ws::{Codec, Frame, Message},
BoxedSocket, Client,
};
use bytes::Bytes;
use futures::stream::{SplitSink, StreamExt};
use log::{error, info};
use std::time::Duration;
use crate::*;
/**
* An EventBusClient is capable of connecting to, reading messages from, and sending messages to
* the eventbus.
*/
pub struct EventBusClient {
/**
* The sink is a writable object which can send messages back to the EventBus
*/
sink: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
/**
* String identifier for the client
*
* This should be persisted between invocations of the process if the client
* should have any semblence of persistence with its messages
*/
id: &'static str,
}
/**
* Implementation of the Debug trait so that the EventBusClient can be included
* in logging statements and print each instance's `id`
*/
impl std::fmt::Debug for EventBusClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "EventBusClient<id:{}>", self.id)
}
}
/**
* connect will create begin the connection process and start the EventBusClient
* actor.
*
* The caller will need to await in order to get the EventBusClient
*/
pub async fn connect(ws: &str) -> Addr<EventBusClient> {
/*
* Creating a awc::Client to handle the first part of our WebSocket client bootstrap
*/
let (response, framed) = Client::new()
.ws(ws)
.connect()
.await
.map_err(|e| {
error!("Error: {}", e);
})
.unwrap();
info!("{:?}", response);
let (sink, stream) = framed.split();
EventBusClient::create(|ctx| {
EventBusClient::add_stream(stream, ctx);
EventBusClient {
sink: SinkWrite::new(sink, ctx),
id: "auctioneer",
}
})
}
impl EventBusClient {
fn hb(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| {
act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap();
act.hb(ctx);
// client should also check for a timeout here, similar to the
// server code
});
}
}
impl Actor for EventBusClient {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
let input = InputMessage {
meta: Meta::default(),
msg: Input::Connect {
name: self.id.to_string(),
},
};
self.sink
.write(Message::Text(serde_json::to_string(&input).unwrap()))
.unwrap();
// start heartbeats otherwise server will disconnect after 10 seconds
self.hb(ctx)
}
fn stopped(&mut self, _: &mut Context<Self>) {
info!("Disconnected");
}
}
impl Handler<OutputMessage> for EventBusClient {
type Result = ();
fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context<Self>) {
/*
* For heartbeats we really don't need to do anything
*/
}
}
/// Handle server websocket messages
impl StreamHandler<Result<Frame, WsProtocolError>> for EventBusClient {
fn handle(&mut self, msg: Result<Frame, WsProtocolError>, ctx: &mut Context<Self>) {
if let Ok(Frame::Text(txt)) = msg {
/*
* We have received _some_ message from the eventbus, let's try to
* decode it!
*/
let msg = serde_json::from_slice::<OutputMessage>(&txt);
match msg {
Ok(output) => {
/*
* Dispatch the message basically back to ourself for easy
* handling
*/
ctx.address().do_send(output);
}
Err(e) => {
error!("Received invalid message: {}", e);
}
}
}
}
fn started(&mut self, _ctx: &mut Context<Self>) {
info!("Connected");
}
fn finished(&mut self, ctx: &mut Context<Self>) {
info!("Server disconnected");
ctx.stop()
}
}
impl actix::io::WriteHandler<WsProtocolError> for EventBusClient {}
#[cfg(test)]
mod test {
}

View File

@ -12,6 +12,8 @@ use serde_json::Value;
use std::sync::Arc;
pub mod client;
/**
* Default function for deserialize/serialize of times, always defaults to 1970-01-01
*/
@ -142,7 +144,6 @@ impl Default for Meta {
}
}
#[cfg(test)]
mod test {
use super::*;