Add support for sending a termination signal to the server loop

See examples/terminate.rs, but this basically allows for using Meows easily
inside of a test infrastructure.
This commit is contained in:
R Tyler Croy 2020-07-05 07:56:23 -07:00
parent 71e48ad9e9
commit c49b9a09d7
4 changed files with 139 additions and 23 deletions

View File

@ -11,7 +11,6 @@ keywords = ["websocket", "async"]
readme = "README.md"
[dependencies]
async-channel = "~1.1.1"
async-tungstenite = "~0.5.0"
futures = "~0.3.5"
log = "~0.4.8"

View File

@ -9,6 +9,7 @@ extern crate pretty_env_logger;
#[macro_use]
extern crate serde_derive;
use futures::sink::SinkExt;
use meows::*;
use smol;
use std::sync::Arc;
@ -25,7 +26,7 @@ async fn handle_ping(mut req: Request<(), ()>) -> Option<Message> {
if let Some(ping) = req.from_value::<Ping>() {
info!("Ping received with message: {}", ping.msg);
for i in 1..5 {
for i in 1..5_u64 {
req.sink.send(
Message::text(format!("pong {}", i))
).await;

47
examples/terminate.rs Normal file
View File

@ -0,0 +1,47 @@
/**
* This example demonstrates terminating the server from another task or thread
* inside the same process
*/
#[macro_use]
extern crate meows;
extern crate pretty_env_logger;
use futures::sink::SinkExt;
use meows::*;
use smol;
use std::time::Duration;
use std::sync::Arc;
/**
* The default handler for unknown strings is to do nothing,
* this handler will instead send whatever message we get back to the client
*/
async fn default_echo(message: String, _state: Arc<()>) -> Option<Message> {
info!("Default echo: {}", message);
Some(Message::text(message))
}
/**
* THe main is pretty simple, just fire up the Meows server and set the handlers
*/
fn main() -> Result<(), std::io::Error> {
pretty_env_logger::init();
println!("Starting simple ping/pong websocket server with meows");
let mut server = meows::Server::new();
server.default(default_echo);
let mut controller = server.get_control_channel();
smol::Task::spawn(async move {
info!("Counting to three, then you have to go to your room mister!");
for _ in 1..10 {
let dur = Duration::from_secs(1);
smol::Timer::after(dur).await;
}
controller.send(Control::Terminate).await;
}).detach();
smol::run(async move { server.serve("127.0.0.1:8105".to_string()).await })
}

View File

@ -1,5 +1,13 @@
/*
* This is apparently helpful/necessary for dealing with recursion wackiness
* with the use of futures::select! inside of the `serve` function.
*
* This should be removed at a later point when somebody understands better than
* I do currently, wth should happen.
*/
#![recursion_limit="256"]
/**
* Meows is a simpmle library for making it easy to implement websocket message
* Meows is a simple library for making it easy to implement websocket message
* handlers, built on top of async-tungstenite and the async ecosystem
*/
@ -10,7 +18,9 @@ extern crate smol;
use async_tungstenite::WebSocketStream;
use futures::future::BoxFuture;
use futures::channel::mpsc::{channel, Sender, Receiver};
use futures::prelude::*;
use futures::sink::SinkExt;
use log::*;
use serde::de::DeserializeOwned;
use smol::{Async, Task};
@ -68,7 +78,7 @@ pub struct Request<ServerState, ClientState> {
pub env: Envelope,
pub state: Arc<ServerState>,
pub client_state: Arc<RwLock<ClientState>>,
pub sink: async_channel::Sender<Message>,
pub sink: Sender<Message>,
}
impl<ServerState, ClientState> Request<ServerState, ClientState> {
@ -115,10 +125,38 @@ where
type Callback<ServerState, ClientState> = Arc<Box<dyn Endpoint<ServerState, ClientState>>>;
type DefaultCallback<ServerState, ClientState> = Arc<Box<dyn DefaultEndpoint<ServerState, ClientState>>>;
/**
* The Control enum contains all the control commands that another task/thread
* can send to the meows::Server in order to control some of its behaviors
*/
#[derive(Debug)]
pub enum Control {
// Terminate the server loop
Terminate,
}
#[derive(Debug)]
struct Controller {
tx: Sender<Control>,
rx: Receiver<Control>,
}
impl Default for Controller {
fn default() -> Self {
let (tx, rx) = channel::<Control>(1);
Self {
tx,
rx,
}
}
}
/**
* The Server is the primary means of listening for messages
*/
pub struct Server<ServerState, ClientState> {
control: Controller,
state: Arc<ServerState>,
handlers: Arc<RwLock<HashMap<String, Callback<ServerState, ClientState>>>>,
default: DefaultCallback<ServerState, ClientState>,
@ -131,6 +169,7 @@ impl<ServerState: 'static + Send + Sync, ClientState: 'static + Default + Send +
*/
pub fn with_state(state: ServerState) -> Self {
Server {
control: Controller::default(),
state: Arc::new(state),
handlers: Arc::new(RwLock::new(HashMap::default())),
default: Arc::new(Box::new(Server::<ServerState, ClientState>::default_handler)),
@ -189,6 +228,16 @@ impl<ServerState: 'static + Send + Sync, ClientState: 'static + Default + Send +
self.default = Arc::new(Box::new(invoke));
}
/**
* Retrieve a cloned reference to the contrroller's sender
*
* This allows the caller to send messages to the control loop from other
* tasks and threads
*/
pub fn get_control_channel(&self) -> Sender<Control> {
return self.control.tx.clone();
}
/**
* Default handler which is used if the user doesn't specify a handler
* that should be used for messages Meows doesn't understand
@ -213,27 +262,46 @@ impl<ServerState: 'static + Send + Sync, ClientState: 'static + Default + Send +
* }
* ```
*/
pub async fn serve(&self, listen_on: String) -> Result<(), std::io::Error> {
pub async fn serve(&mut self, listen_on: String) -> Result<(), std::io::Error> {
debug!("Starting to listen on: {}", &listen_on);
let listener = Async::<TcpListener>::bind(listen_on)?;
loop {
let (stream, _) = listener.accept().await?;
match async_tungstenite::accept_async(stream).await {
Ok(ws) => {
let state = self.state.clone();
let handlers = self.handlers.clone();
let default = self.default.clone();
Task::spawn(async move {
Server::<ServerState, ClientState>::handle_connection(state, default, handlers, ws)
.await;
})
.detach();
}
Err(e) => {
error!("Failed to process WebSocket handshake: {}", e);
}
// Beware the recursion issues here, see:
// <https://stackoverflow.com/questions/56930108/select-from-a-list-of-sockets-using-futures>
futures::select! {
res = listener.accept().fuse() => {
let (stream, _) = res?;
match async_tungstenite::accept_async(stream).await {
Ok(ws) => {
let state = self.state.clone();
let handlers = self.handlers.clone();
let default = self.default.clone();
Task::spawn(async move {
Server::<ServerState, ClientState>::handle_connection(state, default, handlers, ws)
.await;
})
.detach();
},
Err(e) => {
error!("Failed to process WebSocket handshake: {}", e);
},
}
},
// XXX: This is ending up dominating the loop, need to figure out a way to
// try-recv() in a block like this
ctrl = self.control.rx.next().fuse() => {
info!("receiving control socket: {:?}", ctrl);
match ctrl {
Some(Control::Terminate) => {
// Exit out of our accept loop
return Ok(());
},
other => {
warn!("Unhandled message on control channel: {:?}", other);
}
}
},
}
}
}
@ -260,10 +328,10 @@ impl<ServerState: 'static + Send + Sync, ClientState: 'static + Default + Send +
* function itself.
*/
let (mut writer, mut reader) = stream.split();
let (channel_tx, channel_rx) = async_channel::unbounded::<Message>();
let (mut channel_tx, mut channel_rx) = channel::<Message>(1024);
Task::spawn(async move {
while let Ok(outgoing) = channel_rx.recv().await {
while let Some(outgoing) = channel_rx.next().await {
trace!("Must send {:?} to the socket", outgoing);
writer.send(outgoing).await;
}
@ -327,6 +395,7 @@ impl<ServerState: 'static + Send + Sync, ClientState: 'static + Default + Send +
impl Server<(), ()> {
pub fn new() -> Self {
Server {
control: Controller::default(),
state: Arc::new(()),
handlers: Arc::new(RwLock::new(HashMap::default())),
default: Arc::new(Box::new(Server::<(), ()>::default_handler)),