// smol/examples/websocket-server.rs
//
// 117 lines, 3.7 KiB, Rust
// (repository web-view links: Raw / Normal View / History)
//
// 2020-03-29 15:59:28 +00:00
#![recursion_limit = "1024"]
use std::fs;
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use anyhow::{Context as _, Result};
use async_native_tls::{TlsAcceptor, TlsStream};
use async_tungstenite::WebSocketStream;
use futures::prelude::*;
use smol::{blocking, Async, Task};
use tungstenite::Message;
async fn serve(mut stream: WsStream, host: String) -> Result<()> {
println!("Serving {}", host);
let msg = stream.next().await.context("expected a message")??;
2020-04-01 19:40:27 +00:00
stream
.send(Message::text(format!("Server echoes: {}", msg)))
.await?;
2020-03-29 15:59:28 +00:00
Ok(())
}
// 2020-04-01 19:40:27 +00:00
/// Accepts connections on `listener` in an endless loop and spawns a detached
/// `serve` task for each one.
///
/// When `tls` is `Some`, every accepted TCP stream first goes through the TLS
/// acceptor and the WebSocket handshake runs on top of the TLS stream
/// (`wss://`); otherwise the handshake runs directly on the TCP stream
/// (`ws://`).
///
/// # Errors
///
/// Returns an error if the listener's local address cannot be read, if
/// `accept` fails, or if a TLS or WebSocket handshake fails. Because these
/// errors are propagated with `?` from inside the loop, a single failed
/// handshake ends the whole listener.
async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Result<()> {
    // Only the URL scheme depends on whether TLS is enabled.
    let scheme = if tls.is_some() { "wss" } else { "ws" };
    let host = format!("{}://{}", scheme, listener.get_ref().local_addr()?);
    println!("Listening on {}", host);

    loop {
        let (stream, _) = listener.accept().await?;

        // Perform the handshake(s) here and wrap the result so that both
        // transports can be handed to the same `serve` function.
        let stream = match &tls {
            None => WsStream::Plain(async_tungstenite::accept_async(stream).await?),
            Some(tls) => {
                let stream = tls.accept(stream).await?;
                WsStream::Tls(async_tungstenite::accept_async(stream).await?)
            }
        };

        // One clone of `host` per connection is enough; the spawned task owns it.
        // NOTE(review): `unwrap()` is called on the spawned task handle, not on a
        // `Result`; presumably it makes the task panic if `serve` returns an
        // error - confirm against smol's `Task` documentation.
        Task::spawn(serve(stream, host.clone())).unwrap().detach();
    }
}
// 2020-03-29 15:59:28 +00:00
fn main() -> Result<()> {
// Create a thread pool.
for _ in 0..num_cpus::get_physical().max(1) {
thread::spawn(|| smol::run(future::pending::<()>()));
}
smol::block_on(async {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("identity.pfx");
let identity = blocking!(fs::read(path))?;
let tls = TlsAcceptor::new(&identity[..], "password").await?;
2020-04-01 19:40:27 +00:00
let ws = listen(Async::<TcpListener>::bind("127.0.0.1:9000")?, None);
let wss = listen(Async::<TcpListener>::bind("127.0.0.1:9001")?, Some(tls));
future::try_join(ws, wss).await?;
2020-03-29 15:59:28 +00:00
2020-04-01 19:40:27 +00:00
Ok(())
2020-03-29 15:59:28 +00:00
})
}
/// A WebSocket stream over either a plain TCP connection or a TLS session.
///
/// Having one type for both transports lets `serve` handle `ws://` and
/// `wss://` connections with the same code.
enum WsStream {
    /// WebSocket running directly on an async TCP stream.
    Plain(WebSocketStream<Async<TcpStream>>),
    /// WebSocket running on a TLS stream that wraps an async TCP stream.
    Tls(WebSocketStream<TlsStream<Async<TcpStream>>>),
}
impl Sink<Message> for WsStream {
type Error = tungstenite::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self {
WsStream::Plain(s) => Pin::new(s).poll_ready(cx),
WsStream::Tls(s) => Pin::new(s).poll_ready(cx),
}
}
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
match &mut *self {
WsStream::Plain(s) => Pin::new(s).start_send(item),
WsStream::Tls(s) => Pin::new(s).start_send(item),
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self {
WsStream::Plain(s) => Pin::new(s).poll_flush(cx),
WsStream::Tls(s) => Pin::new(s).poll_flush(cx),
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut *self {
WsStream::Plain(s) => Pin::new(s).poll_close(cx),
WsStream::Tls(s) => Pin::new(s).poll_close(cx),
}
}
}
/// Yields incoming messages from whichever WebSocket stream the enum holds.
impl Stream for WsStream {
    type Item = tungstenite::Result<Message>;

    /// Delegates polling for the next message to the wrapped stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.get_mut() {
            WsStream::Plain(inner) => Pin::new(inner).poll_next(cx),
            WsStream::Tls(inner) => Pin::new(inner).poll_next(cx),
        }
    }
}