#[macro_use] extern crate serde_json; use async_std::task; use async_std::net::{SocketAddr, TcpListener, TcpStream}; use async_tungstenite::tungstenite::protocol::Message; use futures::pin_mut; use futures::prelude::*; use futures::channel::mpsc::{unbounded, UnboundedSender}; use glob::glob; use handlebars::Handlebars; use log::*; use rodio::Source; use serde::Deserialize; use tide::{Body, Request, StatusCode}; use std::collections::HashMap; use std::fs::File; use std::io::BufReader; use std::sync::{Arc, Mutex}; use std::time::Duration; type Tx = UnboundedSender; #[derive(Debug)] struct WebsocketState { /** * Boolean for the timer to check in on periodically */ run_timer: bool, peers: HashMap, } type SafeState = Arc>; /** * Websocket events */ mod events { use serde::Deserialize; #[derive(Debug, Deserialize)] pub struct StartTimer { pub action: String, pub actor: String, pub time: u64, } #[derive(Debug, Deserialize)] pub struct Simple { pub action: String, pub actor: String, } } /** * Simple struct to deserialize some query parameters */ #[derive(Debug, Deserialize)] struct Query { admin: String, } async fn index(req: Request<()>) -> Result { let mut hb = Handlebars::new(); if let Err(failure) = hb.register_templates_directory(".hbs", "views") { error!("Failed to render: {:?}", failure); return Err(tide::Error::from_str(StatusCode::InternalServerError, "Could not render templates")) } let mut admin = false; if let Ok(query) = req.query::() { info!("admin: {:?}", query); admin = query.admin == "rtyler"; } let mut sounds = vec![]; for sound in glob("sounds/*.wav").expect("Failed to glob sounds") { if let Ok(sound) = sound { if let Some(name) = sound.as_path().file_stem() { // factory whistle is a special admin only sound ^_^ if name != "factorywhistle" { sounds.push(name.to_os_string().into_string().unwrap()); } } } } info!("sounds: {:?}", sounds); let view = hb.render("index", &json!({"sounds": sounds, "admin" : admin})) .expect("Failed to render"); let mut body = Body::from_string(view); body.set_mime("text/html"); Ok(body) } async fn play(req: Request<()>) -> tide::Result { if let Ok(sound) = req.param::("sound") { play_sound(sound); Ok(tide::Redirect::new("/").into()) } else { Err(tide::Error::from_str(StatusCode::NotFound, "Could not find sound")) } } fn play_sound(sound: String) { let device = rodio::default_output_device().unwrap(); let file = File::open(format!("sounds/{}.wav", sound)).unwrap(); let source = rodio::Decoder::new(BufReader::new(file)).unwrap(); if sound == "factorywhistle" { rodio::play_raw(&device, source.amplify(1.5).convert_samples()); } else if sound == "cheer" { rodio::play_raw(&device, source.amplify(0.7).convert_samples()); } else { rodio::play_raw(&device, source.convert_samples()); } } /** * This is stupid, but I want to make sure I can stop the timer at any point */ fn countdown(time_left: Duration, state: SafeState) { task::spawn(async move { task::sleep(Duration::from_secs(1)).await; let mut real_state = state.lock().unwrap(); if ! real_state.run_timer { return; } let time_left = time_left.as_secs() - 1; debug!("Countdown: {:?}", time_left); if time_left == 0 { play_sound("factorywhistle".to_string()); error!("Time is up!"); real_state.run_timer = false; let broadcast_recipients = real_state.peers .iter() .map(|(_, ws_sink)| ws_sink); for recp in broadcast_recipients { recp.unbounded_send(Message::Text(r#"{"action":"stop_timer","actor":"system"}"#.to_string())).unwrap(); } } else { countdown(Duration::from_secs(time_left), state.clone()); } }); } async fn handle_websocket(state: SafeState, raw_stream: TcpStream, addr: SocketAddr) { info!("Incoming TCP connection from: {}", addr); let ws_stream = async_tungstenite::accept_async(raw_stream) .await; if ws_stream.is_err() { error!("Failed to accept connection: {:?}", ws_stream); return; } let ws_stream = ws_stream.unwrap(); info!("WebSocket connection established: {}", addr); // Insert the write part of this peer to the peer map. let (tx, rx) = unbounded(); state.lock().unwrap().peers.insert(addr, tx); let (outgoing, incoming) = ws_stream.split(); let broadcast_incoming = incoming .try_filter(|msg| { // Broadcasting a Close message from one client // will close the other clients. future::ready(!msg.is_close()) }) .try_for_each(|msg| { info!( "Received a message from {}: {}", addr, msg.to_text().unwrap() ); // Putting this in a block so that peers is dropped before we handle other messages // that need to lock our state { let peers = &state.lock().unwrap().peers; // We want to broadcast the message to everyone except ourselves. let broadcast_recipients = peers .iter() .filter(|(peer_addr, _)| peer_addr != &&addr) .map(|(_, ws_sink)| ws_sink); for recp in broadcast_recipients { recp.unbounded_send(msg.clone()).unwrap(); } } if let Ok(start) = serde_json::from_str::(msg.to_text().unwrap()) { let time_left = Duration::from_secs(start.time * 60); { state.lock().unwrap().run_timer = true; } countdown(time_left, state.clone()); } if let Ok(simple) = serde_json::from_str::(msg.to_text().unwrap()) { if simple.action == "stop_timer" { state.lock().unwrap().run_timer = false; } } future::ok(()) }); let receive_from_others = rx.map(Ok).forward(outgoing); pin_mut!(broadcast_incoming, receive_from_others); future::select(broadcast_incoming, receive_from_others).await; println!("{} disconnected", &addr); state.lock().unwrap().peers.remove(&addr); } #[async_std::main] async fn main() -> Result<(), tide::Error> { pretty_env_logger::init(); let mut app = tide::new(); app.at("/play/:sound").post(play); app.at("/").get(index); app.at("/assets").serve_dir("assets/"); task::spawn(async move { let addr = "0.0.0.0:9078"; let state = SafeState::new(Mutex::new(WebsocketState { run_timer: false, peers: HashMap::new(), })); // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(&addr).await; let listener = try_socket.expect("Failed to bind"); info!("Listening on: {}", addr); while let Ok((stream, addr)) = listener.accept().await { task::spawn(handle_websocket(state.clone(), stream, addr)); } }); Ok(app.listen("0.0.0.0:9077").await?) }