Initial commit

Still working
This commit is contained in:
R Tyler Croy 2020-06-07 15:17:25 -07:00
commit 1e42eb0640
7 changed files with 527 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
Cargo.lock

21
Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "meows"
version = "0.1.0"
authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
edition = "2018"
[dependencies]
async-std = "~1.6.0"
async-tungstenite = "~0.5.0"
futures = "~0.3.5"
lazy_static = "1.4.0"
log = "~0.4.8"
tungstenite = "~0.10.1"
serde = { version = "~1.0.111", features = ["rc"] }
serde_derive = "~1.0.106"
serde_json = "~1.0.53"
smol = "~0.1.11"
[dev-dependencies]
pretty_env_logger = "~0.3.1"

6
README.adoc Normal file
View File

@ -0,0 +1,6 @@
= Message Exchange Over Web Sockets 🐈
This crate implements the simple pattern needed for implementing
service-to-service message exchange over WebSocket connections

68
examples/index.html Normal file
View File

@ -0,0 +1,68 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1.0">
<title>Meows</title>
</head>
<body>
<center>
<h1>Meows</h1>
<div style="width: 50%">
<form onsubmit="return sendMessage();">
<input type="text" width="100" id="input"/>
</form>
<div id="messages">
</div>
</div>
</center>
<script type="text/javascript">
<!--
function sendMessage() {
let msg = document.getElementById('input').value;
console.log(`Invoking send message for ${msg}`);
window.sock.send(msg);
document.getElementById('input').value = '';
return false;
}
window.wsDelay = 1000;
function connectSocket() {
const sock = new WebSocket("ws://127.0.0.1:8105/");
sock.onopen = (event) => {
// Reset our timeout interval
window.wsDelay = 10000;
console.log('WebSocket connected');
};
sock.onerror = (err) => {
console.error('WebSocket error, reconnecting..');
sock.close();
};
sock.onclose = (event) => {
console.log('WebSocket connection lost, reconnecting..');
setTimeout(() => {
window.wsDelay = window.wsDelay * 2;
console.log(`reconnection delay: ${window.wsDelay}`);
connectSocket();
}, window.wsDelay);
}
sock.onmessage = (event) => {
console.log(`Received response: ${event.data}`);
const buffer = document.getElementById('messages');
buffer.insertBefore(document.createElement('br'), buffer.firstChild);
buffer.insertBefore(document.createTextElement(event.data), buffer.firstChild);
};
window.sock = sock;
}
connectSocket();
-->
</script>
</body>
</html>

48
examples/simple.rs Normal file
View File

@ -0,0 +1,48 @@
/**
* The simple server just handles a ping/pong message via a websocket
*/
extern crate futures;
#[macro_use]
extern crate meows;
extern crate pretty_env_logger;
#[macro_use]
extern crate serde_derive;
use async_tungstenite::WebSocketStream;
use futures::prelude::*;
use meows::*;
use smol;
use std::net::TcpStream;
#[derive(Debug, Deserialize, Serialize)]
struct Ping {
msg: String,
}
impl meows::Handler for Ping {
fn handle(real: Ping) -> Result<(), std::io::Error> {
info!("Ping handler: {:?}", real);
Ok(())
}
}
struct Echo;
impl Echo {
async fn handle(message: String) -> tungstenite::Message {
info!("Message received! {}", message);
tungstenite::Message::text(message)
}
}
fn main() -> Result<(), std::io::Error> {
pretty_env_logger::init();
default_meows!(Echo);
meows!("ping" => Ping);
println!("Starting simple ping/pong websocket server with meows");
let server = meows::Server { };
smol::run(async move {
server.serve("127.0.0.1:8105".to_string()).await
})
}

249
src/lib.rs Normal file
View File

@ -0,0 +1,249 @@
/**
* Meows is a simpmle library for making it easy to implement websocket message
* handlers, built on top of async-tungstenite and the async ecosystem
*/
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate smol;
use async_tungstenite::WebSocketStream;
use futures::stream::*;
use log::*;
use serde::de::DeserializeOwned;
use smol::{Async, Task};
use std::collections::HashMap;
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
type DefaultDispatch = Arc<dyn Fn(String) -> (Box<dyn std::future::Future<Output=tungstenite::Message> + Unpin>) + Send + Sync>;
/**
* The internal mechanism for keeping track of message handlers
*/
pub struct Registry {
dispatchers: HashMap<String, Dispatch>,
default: Option<DefaultDispatch>,
}
impl Registry {
/**
* Insert a handler into the registry
*/
pub fn insert(&mut self, key: String, value: Dispatch) {
self.dispatchers.insert(key, value);
}
/**
* Retrieve a registered handler
*/
pub fn get(&self, key: &String) -> Option<&Dispatch> {
self.dispatchers.get(key)
}
/**
* Set the default dispatch handler
*/
pub fn set_default(&mut self, handler: DefaultDispatch) {
self.default = Some(handler);
}
}
impl Default for Registry {
fn default() -> Registry {
Registry {
dispatchers: HashMap::new(),
default: None,
}
}
}
lazy_static! {
pub static ref REGISTRY: Arc<Mutex<Registry>> = {
Arc::new(Mutex::new(Registry::default()))
};
}
#[allow(unused_macros)]
#[macro_export]
macro_rules! meows {
($($e:expr), * => $($t:ty), *) => {
$(
meows::REGISTRY.lock().expect("Failed to unlock meows registry")
.insert($e.to_string(), meows::Dispatch::new(|v| { <$t>::handle_value::<$t>(v); }));
)*
}
}
#[allow(unused_macros)]
#[macro_export]
macro_rules! default_meows {
($T:ty) => {
meows::REGISTRY.lock().expect("Failed to unlock meows registry")
.set_default(
Arc::new(
|m| Box::new(<$T>::handle(m))
)
);
}
}
/**
* The Envelope handles the serialization/deserialization of the outer part of a
* websocket message.
*
* All websocket messages are expected to have the basic format of:
* ```json
* {
* "type" : "foo",
* "value": {}
* }
* ```
* The contents of `value` can be completely arbitrary and are expected to be
* deserializable into whatever the `type` value string is , e.g. `Foo` in this
* example.
*/
#[derive(Debug, Deserialize, Serialize)]
pub struct Envelope {
#[serde(rename = "type")]
ttype: String,
value: serde_json::Value,
}
/**
* The Handler trait must be applied to any structs which are expected to
* be deserialized and instantiated to handle an incoming message
*
* For example
*
* ```
* # #[macro_use] extern crate serde_derive; fn main() {
* use meows::Handler;
*
* #[derive(Debug, Deserialize, Serialize)]
* struct Hello {
* friend: String,
* }
* impl meows::Handler for Hello {
* fn handle(r: Self) -> Result<(), std::io::Error> {
* println!("Handling the hello for: {:?}", r);
* Ok(())
* }
* }
* let value = serde_json::from_str(r#"{"friend":"ferris"}"#).unwrap();
* assert!(Hello::handle_value::<Hello>(value).is_ok());
* # }
* ```
*/
pub trait Handler: Send + Sync {
/**
* convert will take the given Value and attempt to convert it to the trait implementer's type
*
* If the conversion cannot be done properly, None will be returned
*/
fn convert(v: serde_json::Value) -> Option<Self> where Self: std::marker::Sized + serde::de::DeserializeOwned {
serde_json::from_value::<Self>(v).map_or(None, |res| Some(res))
}
/**
* Implementors should implement the handle function which will be invoked
* with a deserialized/constructed version of the struct itself
*/
fn handle(r: Self) -> Result<(), std::io::Error>;
/**
* handle must be implemented by the trait implementer and should
* do something novel with the value
*/
fn handle_value<T: Handler + DeserializeOwned>(v: serde_json::Value) -> Result<(), std::io::Error> {
if let Some(real) = T::convert(v) {
return Handler::handle(real);
}
Err(std::io::Error::new(std::io::ErrorKind::Other, "oh no!"))
}
}
/**
* Dispatch is a struct which exists solely to help Box some closures into the
* registry.
*/
pub struct Dispatch {
f: Box<dyn Fn(serde_json::Value) + Send + Sync>,
}
impl Dispatch {
pub fn new<F>(f: F) -> Self
where
F: Fn(serde_json::Value) + 'static + Send + Sync,
{
Self { f: Box::new(f) }
}
}
/**
* The Server is the primary means of listening for messages
*/
pub struct Server {
}
impl Server {
pub async fn serve(&self, listen_on: String) -> Result<(), std::io::Error> {
debug!("Starting to listen on: {}", &listen_on);
let listener = Async::<TcpListener>::bind(listen_on)?;
loop {
let (stream, _) = listener.accept().await?;
match async_tungstenite::accept_async(stream).await {
Ok(ws) => {
Task::spawn(async move {
Server::handle_connection(ws).await;
}).detach();
},
Err(e) => {
error!("Failed to process WebSocket handshake: {}", e);
}
}
}
}
async fn handle_connection(mut stream: WebSocketStream<Async<TcpStream>>) -> Result<(), std::io::Error> {
while let Some(raw) = stream.next().await {
trace!("WebSocket message received: {:?}", raw);
match raw {
Ok(message) => {
let message = message.to_string();
if let Ok(value) = serde_json::from_str::<serde_json::Value>(&message) {
debug!("Value deserialized: {}", value);
}
else {
/*
* If we didn't have a specific handler, try to invoke the default handler
* if it exists
*/
let default = match &REGISTRY.lock().unwrap().default {
Some(d) => Some(d.clone()),
None => None,
};
if default.is_some() {
(default.unwrap())(message).await;
}
}
},
Err(e) => {
error!("Error receiving message: {}", e);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}

133
src/server.rs Normal file
View File

@ -0,0 +1,133 @@
//! A WebSocket+TLS echo server based on `async-tungstenite` and `async-native-tls`.
//!
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example websocket-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example websocket-client
//! ```
use std::net::{TcpListener, TcpStream};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use anyhow::{Context as _, Result};
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use async_tungstenite::WebSocketStream;
use futures::prelude::*;
use smol::{Async, Task};
use tungstenite::Message;
/// Echoes messages from the client back to it.
async fn echo(mut stream: WsStream) -> Result<()> {
let msg = stream.next().await.context("expected a message")??;
stream.send(Message::text(msg.to_string())).await?;
Ok(())
}
/// Listens for incoming connections and serves them.
async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Result<()> {
let host = match &tls {
None => format!("ws://{}", listener.get_ref().local_addr()?),
Some(_) => format!("wss://{}", listener.get_ref().local_addr()?),
};
println!("Listening on {}", host);
loop {
// Accept the next connection.
let (stream, _) = listener.accept().await?;
println!("Accepted client: {}", stream.get_ref().peer_addr()?);
match &tls {
None => {
let stream = WsStream::Plain(async_tungstenite::accept_async(stream).await?);
Task::spawn(echo(stream)).unwrap().detach();
}
Some(tls) => {
// In case of WSS, establish a secure TLS connection first.
let stream = tls.accept(stream).await?;
let stream = WsStream::Tls(async_tungstenite::accept_async(stream).await?);
Task::spawn(echo(stream)).unwrap().detach();
}
}
}
}
fn main() -> Result<()> {
// Initialize TLS with the local certificate, private key, and password.
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create an executor thread pool.
for _ in 0..num_cpus::get().max(1) {
thread::spawn(|| smol::run(future::pending::<()>()));
}
// Start WS and WSS servers.
smol::block_on(async {
let ws = listen(Async::<TcpListener>::bind("127.0.0.1:9000")?, None);
let wss = listen(Async::<TcpListener>::bind("127.0.0.1:9001")?, Some(tls));
future::try_join(ws, wss).await?;
Ok(())
})
}
/// A WebSocket or WebSocket+TLS connection.
enum WsStream {
/// A plain WebSocket connection.
Plain(WebSocketStream<Async<TcpStream>>),
/// A WebSocket connection secured by TLS.
Tls(WebSocketStream<TlsStream<Async<TcpStream>>>),
}
impl Sink<Message> for WsStream {
type Error = tungstenite::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self {
WsStream::Plain(s) => Pin::new(s).poll_ready(cx),
WsStream::Tls(s) => Pin::new(s).poll_ready(cx),
}
}
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
match &mut *self {
WsStream::Plain(s) => Pin::new(s).start_send(item),
WsStream::Tls(s) => Pin::new(s).start_send(item),
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self {
WsStream::Plain(s) => Pin::new(s).poll_flush(cx),
WsStream::Tls(s) => Pin::new(s).poll_flush(cx),
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self {
WsStream::Plain(s) => Pin::new(s).poll_close(cx),
WsStream::Tls(s) => Pin::new(s).poll_close(cx),
}
}
}
impl Stream for WsStream {
type Item = tungstenite::Result<Message>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match &mut *self {
WsStream::Plain(s) => Pin::new(s).poll_next(cx),
WsStream::Tls(s) => Pin::new(s).poll_next(cx),
}
}
}