smol/examples/chat-server.rs

112 lines
3.4 KiB
Rust
Raw Normal View History

2020-04-20 15:45:14 +00:00
//! A TCP chat server.
//!
//! First start a server:
//!
//! ```
//! cargo run --example chat-server
//! ```
//!
//! Then start clients:
//!
//! ```
//! cargo run --example chat-client
//! ```
2020-04-01 19:40:27 +00:00
use std::collections::HashMap;
2020-07-15 09:37:50 +00:00
use std::net::{SocketAddr, TcpListener, TcpStream};
2020-04-01 19:40:27 +00:00
2020-07-14 19:19:01 +00:00
use async_channel::{bounded, Receiver, Sender};
2020-07-15 09:37:50 +00:00
use async_dup::Arc;
2020-08-26 21:59:49 +00:00
use smol::{io, prelude::*, Async};
2020-04-20 15:45:14 +00:00
2020-04-26 17:37:29 +00:00
/// An event on the chat server.
2020-04-20 15:45:14 +00:00
enum Event {
2020-04-26 17:37:29 +00:00
/// A client has joined.
2020-07-15 09:37:50 +00:00
Join(SocketAddr, Arc<Async<TcpStream>>),
2020-04-26 17:37:29 +00:00
/// A client has left.
2020-04-01 19:40:27 +00:00
Leave(SocketAddr),
2020-04-26 17:37:29 +00:00
/// A client sent a message.
2020-04-01 19:40:27 +00:00
Message(SocketAddr, String),
}
2020-04-20 15:45:14 +00:00
/// Dispatches events to clients.
async fn dispatch(receiver: Receiver<Event>) -> io::Result<()> {
// Currently active clients.
2020-07-15 09:37:50 +00:00
let mut map = HashMap::<SocketAddr, Arc<Async<TcpStream>>>::new();
2020-04-01 19:40:27 +00:00
2020-04-20 15:45:14 +00:00
// Receive incoming events.
2020-07-14 19:19:01 +00:00
while let Ok(event) = receiver.recv().await {
2020-04-20 15:45:14 +00:00
// Process the event and format a message to send to clients.
let output = match event {
Event::Join(addr, stream) => {
2020-04-01 19:40:27 +00:00
map.insert(addr, stream);
format!("{} has joined\n", addr)
}
2020-04-20 15:45:14 +00:00
Event::Leave(addr) => {
2020-04-01 19:40:27 +00:00
map.remove(&addr);
format!("{} has left\n", addr)
}
2020-04-20 15:45:14 +00:00
Event::Message(addr, msg) => format!("{} says: {}\n", addr, msg),
2020-04-01 19:40:27 +00:00
};
2020-04-20 15:45:14 +00:00
// Display the event in the server process.
2020-04-01 19:40:27 +00:00
print!("{}", output);
2020-04-20 15:45:14 +00:00
// Send the event to all active clients.
2020-04-01 19:40:27 +00:00
for stream in map.values_mut() {
2020-04-21 15:51:07 +00:00
// Ignore errors because the client might disconnect at any point.
2020-09-13 11:41:18 +00:00
stream.write_all(output.as_bytes()).await.ok();
2020-04-01 19:40:27 +00:00
}
}
Ok(())
}
2020-04-20 15:45:14 +00:00
/// Reads messages from the client and forwards them to the dispatcher task.
2020-07-15 09:37:50 +00:00
async fn read_messages(sender: Sender<Event>, client: Arc<Async<TcpStream>>) -> io::Result<()> {
let addr = client.get_ref().peer_addr()?;
2020-07-19 22:55:35 +00:00
let mut lines = io::BufReader::new(client).lines();
2020-04-01 19:40:27 +00:00
while let Some(line) = lines.next().await {
2020-04-20 15:45:14 +00:00
let line = line?;
2020-09-13 11:41:18 +00:00
sender.send(Event::Message(addr, line)).await.ok();
2020-04-01 19:40:27 +00:00
}
Ok(())
}
fn main() -> io::Result<()> {
2020-08-26 21:59:49 +00:00
smol::block_on(async {
2020-04-20 15:45:14 +00:00
// Create a listener for incoming client connections.
2020-07-15 09:37:50 +00:00
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 6000))?;
2020-04-20 15:45:14 +00:00
// Intro messages.
2020-07-15 09:37:50 +00:00
println!("Listening on {}", listener.get_ref().local_addr()?);
2020-04-20 15:45:14 +00:00
println!("Start a chat client now!\n");
2020-04-01 19:40:27 +00:00
2020-04-20 15:45:14 +00:00
// Spawn a background task that dispatches events to clients.
2020-07-14 19:19:01 +00:00
let (sender, receiver) = bounded(100);
2020-08-26 21:59:49 +00:00
smol::spawn(dispatch(receiver)).detach();
2020-04-01 19:40:27 +00:00
loop {
2020-04-20 15:45:14 +00:00
// Accept the next connection.
2020-04-01 19:40:27 +00:00
let (stream, addr) = listener.accept().await?;
2020-07-15 09:37:50 +00:00
let client = Arc::new(stream);
2020-04-01 19:40:27 +00:00
let sender = sender.clone();
2020-04-20 15:45:14 +00:00
// Spawn a background task reading messages from the client.
2020-08-26 21:59:49 +00:00
smol::spawn(async move {
2020-04-20 15:45:14 +00:00
// Client starts with a `Join` event.
2020-09-13 11:41:18 +00:00
sender.send(Event::Join(addr, client.clone())).await.ok();
2020-04-20 15:45:14 +00:00
// Read messages from the client and ignore I/O errors when the client quits.
2020-09-13 11:41:18 +00:00
read_messages(sender.clone(), client).await.ok();
2020-04-20 15:45:14 +00:00
// Client ends with a `Leave` event.
2020-09-13 11:41:18 +00:00
sender.send(Event::Leave(addr)).await.ok();
2020-04-01 19:40:27 +00:00
})
.detach();
}
})
}