async-tungstenite/tests/communication.rs

118 lines
3.7 KiB
Rust

#![cfg(feature = "handshake")]
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use async_tungstenite::{accept_async, client_async, WebSocketStream};
use futures::prelude::*;
use log::*;
use tungstenite::Message;
async fn run_connection<S>(
connection: WebSocketStream<S>,
msg_tx: futures::channel::oneshot::Sender<Vec<Message>>,
) where
S: AsyncRead + AsyncWrite + Unpin,
{
info!("Running connection");
let mut connection = connection;
let mut messages = vec![];
while let Some(message) = connection.next().await {
info!("Message received");
let message = message.expect("Failed to get message");
messages.push(message);
}
msg_tx.send(messages).expect("Failed to send results");
}
#[async_std::test]
async fn communication() {
let _ = env_logger::try_init();
let (con_tx, con_rx) = futures::channel::oneshot::channel();
let (msg_tx, msg_rx) = futures::channel::oneshot::channel();
let f = async move {
let listener = TcpListener::bind("127.0.0.1:12345").await.unwrap();
info!("Server ready");
con_tx.send(()).unwrap();
info!("Waiting on next connection");
let (connection, _) = listener.accept().await.expect("No connections to accept");
let stream = accept_async(connection).await;
let stream = stream.expect("Failed to handshake with connection");
run_connection(stream, msg_tx).await;
};
task::spawn(f);
info!("Waiting for server to be ready");
con_rx.await.expect("Server not ready");
let tcp = TcpStream::connect("127.0.0.1:12345")
.await
.expect("Failed to connect");
let url = "ws://localhost:12345/";
let (mut stream, _) = client_async(url, tcp)
.await
.expect("Client failed to connect");
for i in 1..10 {
info!("Sending message");
stream
.send(Message::Text(format!("{}", i)))
.await
.expect("Failed to send message");
}
stream.close(None).await.expect("Failed to close");
info!("Waiting for response messages");
let messages = msg_rx.await.expect("Failed to receive messages");
assert_eq!(messages.len(), 10);
}
#[async_std::test]
async fn split_communication() {
let _ = env_logger::try_init();
let (con_tx, con_rx) = futures::channel::oneshot::channel();
let (msg_tx, msg_rx) = futures::channel::oneshot::channel();
let f = async move {
let listener = TcpListener::bind("127.0.0.1:12346").await.unwrap();
info!("Server ready");
con_tx.send(()).unwrap();
info!("Waiting on next connection");
let (connection, _) = listener.accept().await.expect("No connections to accept");
let stream = accept_async(connection).await;
let stream = stream.expect("Failed to handshake with connection");
run_connection(stream, msg_tx).await;
};
task::spawn(f);
info!("Waiting for server to be ready");
con_rx.await.expect("Server not ready");
let tcp = TcpStream::connect("127.0.0.1:12346")
.await
.expect("Failed to connect");
let url = url::Url::parse("ws://localhost:12345/").unwrap();
let (stream, _) = client_async(url, tcp)
.await
.expect("Client failed to connect");
let (mut tx, _rx) = stream.split();
for i in 1..10 {
info!("Sending message");
tx.send(Message::Text(format!("{}", i)))
.await
.expect("Failed to send message");
}
tx.close().await.expect("Failed to close");
info!("Waiting for response messages");
let messages = msg_rx.await.expect("Failed to receive messages");
assert_eq!(messages.len(), 10);
}