Provide the basics for an EventBusClient #2

Manually merged
rtyler merged 12 commits from resource-allocation into master 2020-01-14 05:44:41 +00:00
5 changed files with 63 additions and 18 deletions
Showing only changes of commit 0f3582db01 - Show all commits

1
Cargo.lock generated
View File

@ -1317,7 +1317,6 @@ dependencies = [
"actix-web-actors 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"awc 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -31,6 +31,3 @@ serde = { version = "~1.0.103", features = ["rc"] }
serde_json = "~1.0.0"
otto-eventbus = { path = "../eventbus" }
# Used for formatting times in messages
chrono = { version = "~0.4.10", features = ["serde"] }

View File

@ -5,7 +5,6 @@ extern crate actix;
extern crate actix_http;
extern crate actix_web;
extern crate awc;
extern crate chrono;
extern crate pretty_env_logger;
use actix::io::SinkWrite;
@ -19,7 +18,6 @@ use awc::{
BoxedSocket, Client,
};
use bytes::Bytes;
use chrono::Utc;
use futures::stream::{SplitSink, StreamExt};
use log::{error, info};
@ -54,7 +52,10 @@ async fn main() -> std::io::Result<()> {
let (sink, stream) = framed.split();
let _addr = EventBusClient::create(|ctx| {
EventBusClient::add_stream(stream, ctx);
EventBusClient(SinkWrite::new(sink, ctx))
EventBusClient {
sink: SinkWrite::new(sink, ctx),
id: "auctioneer",
}
});
HttpServer::new(move || {
@ -68,7 +69,24 @@ async fn main() -> std::io::Result<()> {
.await
}
struct EventBusClient(SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>);
/**
* An EventBusClient is capable of connecting to, reading messages from, and sending messages to
* the eventbus.
*/
struct EventBusClient {
/**
* The sink is a writable object which can send messages back to the EventBus
*/
sink: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
/**
* String identifier for the client
*
* This should be persisted between invocations of the process if the client
* should have any semblence of persistence with its messages
*/
id: &'static str,
}
#[derive(Message)]
#[rtype(result = "()")]
@ -79,15 +97,12 @@ impl Actor for EventBusClient {
fn started(&mut self, ctx: &mut Context<Self>) {
let input = InputMessage {
meta: Meta {
channel: "".to_string(),
ts: Utc::now(),
},
meta: Meta::default(),
msg: Input::Connect {
name: "auctioneer".to_string(),
name: self.id.to_string(),
},
};
self.0
self.sink
.write(Message::Text(serde_json::to_string(&input).unwrap()))
.unwrap();
// start heartbeats otherwise server will disconnect after 10 seconds
@ -102,7 +117,7 @@ impl Actor for EventBusClient {
impl EventBusClient {
fn hb(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| {
act.0.write(Message::Ping(Bytes::from_static(b""))).unwrap();
act.sink.write(Message::Ping(Bytes::from_static(b""))).unwrap();
act.hb(ctx);
// client should also check for a timeout here, similar to the
@ -116,7 +131,7 @@ impl Handler<ClientCommand> for EventBusClient {
type Result = ();
fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context<Self>) {
self.0.write(Message::Text(msg.0)).unwrap();
self.sink.write(Message::Text(msg.0)).unwrap();
}
}

View File

@ -20,6 +20,14 @@ fn epoch() -> DateTime<Utc> {
return Utc.ymd(1970, 1, 1).and_hms_milli(0, 0, 1, 444);
}
/**
* Default function for the deserialization of an empty channel, will just create an empty string
*/
fn default_channel() -> String {
return "".to_owned();
}
/**
* The `Meta` struct contains the necessary metadata about a message which is being sent over the
* wire
@ -29,6 +37,7 @@ fn epoch() -> DateTime<Utc> {
#[derive(Serialize, Deserialize, Debug, Message)]
#[rtype(result = "()")]
pub struct Meta {
#[serde(default = "default_channel")]
pub channel: String,
#[serde(default = "epoch")]
pub ts: DateTime<Utc>,
@ -120,3 +129,29 @@ pub struct InputMessage {
pub msg: Input,
pub meta: Meta,
}
/**
* The Default trait for `Meta` will create a functionally empty Meta struct
*/
impl Default for Meta {
fn default() -> Meta {
Meta {
channel: default_channel(),
ts: epoch(),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use chrono::Utc;
#[test]
fn test_default_for_meta() {
let m = Meta::default();
assert_eq!(m.channel, "");
assert!(m.ts < Utc::now());
}
}

View File

@ -169,5 +169,4 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
}
#[cfg(test)]
mod test {
}
mod test {}