diff --git a/src/main.rs b/src/main.rs index 97ac35f..ec67a8f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,11 +18,40 @@ use std::collections::HashMap; use std::fs::File; use std::io::BufReader; use std::sync::{Arc, Mutex}; - +use std::time::Duration; type Tx = UnboundedSender; -type PeerMap = Arc>>; +#[derive(Debug)] +struct WebsocketState { + /** + * Boolean for the timer to check in on periodically + */ + run_timer: bool, + peers: HashMap, +} + +type SafeState = Arc>; + +/** + * Websocket events + */ +mod events { + use serde::Deserialize; + + #[derive(Debug, Deserialize)] + pub struct StartTimer { + pub action: String, + pub actor: String, + pub time: u64, + } + + #[derive(Debug, Deserialize)] + pub struct Simple { + pub action: String, + pub actor: String, + } +} /** * Simple struct to deserialize some query parameters @@ -70,10 +99,7 @@ async fn index(req: Request<()>) -> Result { async fn play(req: Request<()>) -> tide::Result { if let Ok(sound) = req.param::("sound") { - let device = rodio::default_output_device().unwrap(); - let file = File::open(format!("sounds/{}.wav", sound)).unwrap(); - let source = rodio::Decoder::new(BufReader::new(file)).unwrap(); - rodio::play_raw(&device, source.convert_samples()); + play_sound(sound); Ok(tide::Redirect::new("/").into()) } else { @@ -81,7 +107,47 @@ async fn play(req: Request<()>) -> tide::Result { } } -async fn handle_websocket(peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr) { +fn play_sound(sound: String) { + let device = rodio::default_output_device().unwrap(); + let file = File::open(format!("sounds/{}.wav", sound)).unwrap(); + let source = rodio::Decoder::new(BufReader::new(file)).unwrap(); + rodio::play_raw(&device, source.convert_samples()); +} + +/** + * This is stupid, but I want to make sure I can stop the timer at any point + */ +fn countdown(time_left: Duration, state: SafeState) { + task::spawn(async move { + task::sleep(Duration::from_secs(1)).await; + + let mut real_state = state.lock().unwrap(); + if ! real_state.run_timer { + return; + } + let time_left = time_left.as_secs() - 1; + debug!("Countdown: {:?}", time_left); + + if time_left == 0 { + play_sound("factorywhistle".to_string()); + error!("Time is up!"); + + real_state.run_timer = false; + let broadcast_recipients = real_state.peers + .iter() + .map(|(_, ws_sink)| ws_sink); + + for recp in broadcast_recipients { + recp.unbounded_send(Message::Text(r#"{"action":"stop_timer","actor":"system"}"#.to_string())).unwrap(); + } + } + else { + countdown(Duration::from_secs(time_left), state.clone()); + } + }); +} + +async fn handle_websocket(state: SafeState, raw_stream: TcpStream, addr: SocketAddr) { println!("Incoming TCP connection from: {}", addr); let ws_stream = async_tungstenite::accept_async(raw_stream) @@ -91,7 +157,7 @@ async fn handle_websocket(peer_map: PeerMap, raw_stream: TcpStream, addr: Socket // Insert the write part of this peer to the peer map. let (tx, rx) = unbounded(); - peer_map.lock().unwrap().insert(addr, tx); + state.lock().unwrap().peers.insert(addr, tx); let (outgoing, incoming) = ws_stream.split(); @@ -107,20 +173,35 @@ async fn handle_websocket(peer_map: PeerMap, raw_stream: TcpStream, addr: Socket addr, msg.to_text().unwrap() ); - let peers = peer_map.lock().unwrap(); - if let Ok(json) = serde_json::from_str::(msg.to_text().unwrap()) { - info!("JSON: {:?}", json); + // Putting this in a block so that peers is dropped before we handle other messages + // that need to lock our state + { + let peers = &state.lock().unwrap().peers; + + + // We want to broadcast the message to everyone except ourselves. + let broadcast_recipients = peers + .iter() + .filter(|(peer_addr, _)| peer_addr != &&addr) + .map(|(_, ws_sink)| ws_sink); + + for recp in broadcast_recipients { + recp.unbounded_send(msg.clone()).unwrap(); + } } - // We want to broadcast the message to everyone except ourselves. - let broadcast_recipients = peers - .iter() - .filter(|(peer_addr, _)| peer_addr != &&addr) - .map(|(_, ws_sink)| ws_sink); - - for recp in broadcast_recipients { - recp.unbounded_send(msg.clone()).unwrap(); + if let Ok(start) = serde_json::from_str::(msg.to_text().unwrap()) { + let time_left = Duration::from_secs(start.time * 60); + { + state.lock().unwrap().run_timer = true; + } + countdown(time_left, state.clone()); + } + if let Ok(simple) = serde_json::from_str::(msg.to_text().unwrap()) { + if simple.action == "stop_timer" { + state.lock().unwrap().run_timer = false; + } } future::ok(()) @@ -132,7 +213,7 @@ async fn handle_websocket(peer_map: PeerMap, raw_stream: TcpStream, addr: Socket future::select(broadcast_incoming, receive_from_others).await; println!("{} disconnected", &addr); - peer_map.lock().unwrap().remove(&addr); + state.lock().unwrap().peers.remove(&addr); } #[async_std::main] @@ -146,7 +227,10 @@ async fn main() -> Result<(), tide::Error> { task::spawn(async move { let addr = "0.0.0.0:9078"; - let state = PeerMap::new(Mutex::new(HashMap::new())); + let state = SafeState::new(Mutex::new(WebsocketState { + run_timer: false, + peers: HashMap::new(), + })); // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(&addr).await; let listener = try_socket.expect("Failed to bind"); diff --git a/views/index.hbs b/views/index.hbs index 95d9ecc..8b3b5cd 100644 --- a/views/index.hbs +++ b/views/index.hbs @@ -67,7 +67,7 @@ window.socket = socket; socket.onopen = (event) => { - console.log('Websocket connected! 😺') + console.log('Websocket connected!') socket.send(JSON.stringify({ action: "connected", show: true, @@ -75,7 +75,7 @@ })); } socket.onerror = (err) => { - console.error('😿 Websocket encountered an error:', err) + console.error('Websocket encountered an error:', err) socket.close() } socket.onclose = (event) => { @@ -107,7 +107,6 @@ m.appendChild(document.createTextNode(message)); messages.insertBefore(m, messages.firstChild); } - --> @@ -175,6 +174,7 @@
+
{{else}} {{/if}}