Provide the basics for an EventBusClient #2

Manually merged
rtyler merged 12 commits from resource-allocation into master 2020-01-14 05:44:41 +00:00
1 changed files with 28 additions and 16 deletions
Showing only changes of commit e0a9b0c870 - Show all commits

View File

@ -8,20 +8,20 @@ extern crate awc;
extern crate chrono;
extern crate pretty_env_logger;
use actix::io::SinkWrite;
use actix::*;
use actix_codec::Framed;
use actix_web::{middleware, web};
use actix_web::{App, HttpResponse, HttpServer};
use awc::{
error::WsProtocolError,
ws::{Codec, Frame, Message},
BoxedSocket, Client,
};
use actix::io::SinkWrite;
use actix_web::{middleware, web};
use actix_web::{App, HttpResponse, HttpServer};
use bytes::Bytes;
use chrono::Utc;
use futures::stream::{SplitSink, StreamExt};
use log::{info, error};
use log::{error, info};
use std::time::Duration;
@ -52,7 +52,7 @@ async fn main() -> std::io::Result<()> {
info!("{:?}", response);
let (sink, stream) = framed.split();
let addr = EventBusClient::create(|ctx| {
let _addr = EventBusClient::create(|ctx| {
EventBusClient::add_stream(stream, ctx);
EventBusClient(SinkWrite::new(sink, ctx))
});
@ -87,7 +87,9 @@ impl Actor for EventBusClient {
name: "auctioneer".to_string(),
},
};
self.0.write(Message::Text(serde_json::to_string(&input).unwrap())).unwrap();
self.0
.write(Message::Text(serde_json::to_string(&input).unwrap()))
.unwrap();
// start heartbeats otherwise server will disconnect after 10 seconds
self.hb(ctx)
}
@ -118,9 +120,19 @@ impl Handler<ClientCommand> for EventBusClient {
}
}
impl Handler<OutputMessage> for EventBusClient {
type Result = ();
fn handle(&mut self, _output: OutputMessage, _ctx: &mut Context<Self>) {
/*
* For heartbeats we really don't need to do anything
*/
}
}
/// Handle server websocket messages
impl StreamHandler<Result<Frame, WsProtocolError>> for EventBusClient {
fn handle(&mut self, msg: Result<Frame, WsProtocolError>, _: &mut Context<Self>) {
fn handle(&mut self, msg: Result<Frame, WsProtocolError>, ctx: &mut Context<Self>) {
if let Ok(Frame::Text(txt)) = msg {
/*
* We have received _some_ message from the eventbus, let's try to
@ -128,12 +140,16 @@ impl StreamHandler<Result<Frame, WsProtocolError>> for EventBusClient {
*/
let msg = serde_json::from_slice::<OutputMessage>(&txt);
match msg {
Ok(msg) => {
info!("Received valid message: {:?}", msg);
},
Ok(output) => {
/*
* Dispatch the message basically back to ourself for easy
* handling
*/
ctx.address().do_send(output);
}
Err(e) => {
error!("Received invalid message: {}", e);
},
}
}
}
}
@ -150,7 +166,6 @@ impl StreamHandler<Result<Frame, WsProtocolError>> for EventBusClient {
impl actix::io::WriteHandler<WsProtocolError> for EventBusClient {}
#[cfg(test)]
mod test {
use super::*;
@ -162,10 +177,7 @@ mod test {
*/
#[actix_rt::test]
async fn test_basic_http() {
let srv = test::start(move || {
App::new()
.route("/", web::get().to(route_index))
});
let srv = test::start(move || App::new().route("/", web::get().to(route_index)));
let req = srv.get("/");
let response = req.send().await.unwrap();