use anyhow::{Context, Result}; use future::{abortable, AbortHandle}; use futures::{ channel::mpsc::*, lock::{Mutex, MutexGuard}, prelude::*, }; use serde_json::json; use std::sync::Arc; use stream::iter; use tokio::time::{interval, Instant}; use super::*; pub async fn greet(sink: &mut Sender, player_id: Option) -> Result<()> { let mut greeting = iter(vec![ ServerMessage::Meta { meta: Meta { version: "Unstable", helo: Some("Dedicated base2020 server".into()), }, }, ServerMessage::SetState { player_id, state: json!({}), }, ]) .map(Ok); sink.send_all(&mut greeting) .await .context("Greeting client") } pub struct PlayerState { sender: Sender, input: PlayerInput, } pub struct Server { players: Vec>, heartbeat_task: Option, } #[derive(Clone)] pub struct Handle { server: Arc>, } impl Handle { pub async fn lock<'a>(&'a self) -> MutexGuard<'a, Server> { self.server.lock().await } } impl Server { pub fn create() -> Handle { Handle { server: Arc::new(Mutex::new(Server { players: Vec::new(), heartbeat_task: None, })), } } /// connect a client to this server /// TODO: currently needs to be passed a handle to this server instance, which feels flakey. pub async fn add_player( &mut self, mut sender: Sender, handle: &Handle, ) -> Result { // TODO: limit total number of players // allot player ID let player_id = self .players .iter() .position(|slot| slot.is_none()) .unwrap_or_else(|| { self.players.push(None); self.players.len() - 1 }); // connect player greet(&mut sender, Some(player_id)).await?; self.players[player_id] = Some(PlayerState { sender, input: json!([]), }); info!("Client#{} connected", player_id); // ensure server task is running self.spinup(handle); Ok(player_id) } pub fn input(&mut self, player: PlayerId, local_input: Value) { if let Some(Some(player)) = self.players.get_mut(player) { player.input = local_input; } } pub fn set_state(&mut self, new_state: Value) { self.broadcast(ServerMessage::SetState { player_id: None, state: new_state, }) } fn tick(&mut self, tick: Instant) { trace!("Tick {:?}", tick); let total_input = self .players .iter() .map(|player| match player { Some(PlayerState { input, .. }) => input.clone(), None => json!([]), }) .collect(); self.broadcast(ServerMessage::Input { total_input }); } fn broadcast(&mut self, msg: ServerMessage) { // iterate by index instead of iterator, because we need to call // remove_player(&mut self) in the error case for slot in 0..self.players.len() { if let Some(ref mut player) = self.players[slot] { let msg_for_player = Server::postprocess_message(slot, &msg); // don't poll ready; we give the channel enough buffer that an overflow indicates // the client has fallen hopelessly behind. if player.sender.start_send(msg_for_player).is_err() { info!("Client#{} fell behind", slot); self.remove_player(slot); } } } } /// given a reference to a ServerMessage, do any per-player "customization" that // may be called for, returning an owned instance fn postprocess_message(player: PlayerId, msg: &ServerMessage) -> ServerMessage { match msg { ServerMessage::SetState { player_id: _, state, } => ServerMessage::SetState { player_id: Some(player), state: state.clone(), }, msg => msg.clone(), } } pub fn remove_player(&mut self, player_id: PlayerId) { if player_id < self.players.len() && self.players[player_id].is_some() { self.players[player_id] = None; info!("Client#{} disconnected", player_id); } else { error!( "Tried to disconnect Client#{} but there was no record for them", player_id ); } // check if no players left if self.players.iter().all(Option::is_none) { self.shutdown(); } } /// Start the heartbeat task, if it's not running /// TODO: currently needs to be passed a handle to this server instance, which feels flakey. fn spinup(&mut self, server_handle: &Handle) { if let None = self.heartbeat_task { info!("Starting heartbeat task"); // Take a reference to the server so the server tick task can access its state + send messages; // A strong reference is fine, because we manually abort (& drop) this task when // no more players are connected. let server_handle = server_handle.clone(); let (task, abort_handle) = abortable(async move { info!("Heartbeat task started"); let mut clock = interval(TICK_LENGTH); loop { let tick = clock.tick().await; server_handle.server.lock().await.tick(tick); } }); self.heartbeat_task = Some(abort_handle); tokio::spawn(task); } } /// Stop any active heartbeat task fn shutdown(&mut self) { if let Some(handle) = self.heartbeat_task.take() { info!("Stopping heartbeat task"); handle.abort(); } } }