use super::*;
use anyhow::{Context, Error, Result};
use futures::{
    channel::mpsc::*,
    future::{abortable, AbortHandle},
    lock::Mutex,
    prelude::*,
    stream::iter,
};
use serde_json::json;
use std::sync::Arc;
use tokio::time::{interval, Instant};

/// There is a point at which a client falls far enough behind that it's
/// probably not worth trying to catch them up; for now, implement this as a
/// buffer size limit and disconnect a client if the cap is reached. A more
/// elegant solution may come later.
const CHANNEL_BUFFER: usize = 200;

pub async fn greet(sink: &mut Sender<ServerMessage>, player_id: Option<PlayerId>) -> Result<()> {
    let mut greeting = iter(vec![
        ServerMessage::Meta {
            meta: Meta {
                version: "Unstable",
                helo: Some("Dedicated base2020 server".into()),
            },
        },
        ServerMessage::SetState {
            player_id,
            state: json!({}),
        },
    ])
    .map(Ok);
    sink.send_all(&mut greeting)
        .await
        .context("Greeting client")
}

pub struct PlayerState {
    sender: Sender<ServerMessage>,
    input: PlayerInput,
}

pub struct Server {
    players: Vec<Option<PlayerState>>,
    heartbeat_task: Option<AbortHandle>,
}

#[derive(Clone)]
pub struct Handle {
    server: Arc<Mutex<Server>>,
}

impl Server {
    pub fn create() -> Handle {
        Handle {
            server: Arc::new(Mutex::new(Server {
                players: Vec::new(),
                heartbeat_task: None,
            })),
        }
    }

    /// Connect a client to this server.
    /// TODO: currently needs to be passed a handle to this server instance, which feels flaky.
    pub async fn add_player(
        &mut self,
        mut sender: Sender<ServerMessage>,
        handle: &Handle,
    ) -> Result<PlayerId> {
        // TODO: limit total number of players

        // allocate a player ID, reusing the first free slot if one exists
        let player_id = self
            .players
            .iter()
            .position(|slot| slot.is_none())
            .unwrap_or_else(|| {
                self.players.push(None);
                self.players.len() - 1
            });

        // connect player
        greet(&mut sender, Some(player_id)).await?;
        self.players[player_id] = Some(PlayerState {
            sender,
            input: json!([]),
        });
        info!("Client#{} connected", player_id);

        // ensure the heartbeat task is running
        self.spinup(handle);

        Ok(player_id)
    }

    pub fn process_message(&mut self, player: PlayerId, msg: ClientMessage) {
        trace!("Client#{} message: {:?}", player, &msg);
        match msg {
            ClientMessage::Input { local_input } => {
                if let Some(Some(player)) = self.players.get_mut(player) {
                    player.input = local_input;
                }
            }
            // for now, anybody can set the state
            ClientMessage::SetState { new_state } => self.broadcast(ServerMessage::SetState {
                player_id: None,
                state: new_state,
            }),
            ClientMessage::GetState { .. } => {}
        }
    }

    fn tick(&mut self, tick: Instant) {
        trace!("Tick {:?}", tick);
        let total_input = self
            .players
            .iter()
            .map(|player| match player {
                Some(PlayerState { input, .. }) => input.clone(),
                None => json!([]),
            })
            .collect();
        self.broadcast(ServerMessage::Input { total_input });
    }

    fn broadcast(&mut self, msg: ServerMessage) {
        // iterate by index instead of by iterator, because we need to call
        // remove_player(&mut self) in the error case
        for slot in 0..self.players.len() {
            if let Some(ref mut player) = self.players[slot] {
                let msg_for_player = Server::postprocess_message(slot, &msg);
                // don't poll for readiness; we give the channel enough buffer that an
                // overflow indicates the client has fallen hopelessly behind
                if player.sender.start_send(msg_for_player).is_err() {
                    info!("Client#{} fell behind", slot);
                    self.remove_player(slot);
                }
            }
        }
    }

    /// Given a reference to a ServerMessage, do any per-player "customization"
    /// that may be called for, returning an owned instance.
    fn postprocess_message(player: PlayerId, msg: &ServerMessage) -> ServerMessage {
        match msg {
            ServerMessage::SetState {
                player_id: _,
                state,
            } => ServerMessage::SetState {
                player_id: Some(player),
                state: state.clone(),
            },
            msg => msg.clone(),
        }
    }

    pub fn remove_player(&mut self, player_id: PlayerId) {
        if player_id < self.players.len() && self.players[player_id].is_some() {
            self.players[player_id] = None;
            info!("Client#{} disconnected", player_id);
        } else {
            error!(
                "Tried to disconnect Client#{} but there was no record for them",
                player_id
            );
        }

        // if no players are left, stop the heartbeat task
        if self.players.iter().all(Option::is_none) {
            self.shutdown();
        }
    }

    /// Start the heartbeat task, if it's not already running.
    /// TODO: currently needs to be passed a handle to this server instance, which feels flaky.
    fn spinup(&mut self, server_handle: &Handle) {
        if self.heartbeat_task.is_none() {
            info!("Starting heartbeat task");
            // Take a reference to the server so the tick task can access its state and
            // send messages. A strong reference is fine, because we manually abort
            // (and drop) this task when no more players are connected.
            let server_handle = server_handle.clone();
            let (task, abort_handle) = abortable(async move {
                info!("Heartbeat task started");
                let mut clock = interval(TICK_LENGTH);
                loop {
                    let tick = clock.tick().await;
                    server_handle.server.lock().await.tick(tick);
                }
            });
            self.heartbeat_task = Some(abort_handle);
            tokio::spawn(task);
        }
    }

    /// Stop any active heartbeat task.
    fn shutdown(&mut self) {
        if let Some(handle) = self.heartbeat_task.take() {
            info!("Stopping heartbeat task");
            handle.abort();
        }
    }
}

pub async fn run_client(
    handle: Handle,
    socket: &mut (impl Stream<Item = Result<ClientMessage>>
              + Sink<ServerMessage, Error = Error>
              + Send
              + Unpin),
) -> Result<()> {
    let (sender, mut receiver) = channel(CHANNEL_BUFFER);

    // register player
    let player_id = handle
        .server
        .lock()
        .await
        .add_player(sender, &handle)
        .await?;

    // main message loop
    let result: Result<()> = async {
        loop {
            tokio::select! {
                client_message = socket.next() => {
                    match client_message {
                        Some(msg) => handle.server.lock().await.process_message(player_id, msg?),
                        None => break Ok(()),
                    }
                },
                Some(server_message) = receiver.next() => {
                    socket.send(server_message).await?
                },
                else => break Ok(()),
            }
        }
    }
    .await;

    // deregister player, whether we exited normally or due to an error
    handle.server.lock().await.remove_player(player_id);
    result
}
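
// A minimal sketch of the connect / broadcast / disconnect flow, driven through the
// same in-memory mpsc channel that `run_client` hands to `add_player`, so no real
// socket or codec is needed. It assumes tokio's test macro is enabled alongside the
// `select!`/`spawn` features already used above, and that `new_state` accepts any
// `serde_json` value, as `greet`'s `json!({})` suggests; the test name is illustrative.
#[cfg(test)]
mod sketch {
    use super::*;

    #[tokio::test]
    async fn connect_broadcast_disconnect() -> Result<()> {
        let handle = Server::create();
        let (sender, mut receiver) = channel(CHANNEL_BUFFER);

        // Registering the player queues the greeting (Meta + SetState) on `receiver`
        // and spins up the heartbeat task.
        let player_id = handle
            .server
            .lock()
            .await
            .add_player(sender, &handle)
            .await?;

        // A SetState from any client is rebroadcast to every connected player, with
        // `player_id` rewritten per recipient by `postprocess_message`.
        handle.server.lock().await.process_message(
            player_id,
            ClientMessage::SetState {
                new_state: json!({}),
            },
        );

        // Drain whatever has been queued so far: the two greeting messages, the
        // rebroadcast SetState, and possibly some heartbeat Input messages.
        let mut queued = 0;
        while let Ok(Some(_msg)) = receiver.try_next() {
            queued += 1;
        }
        assert!(queued >= 3);

        // Removing the last player shuts the heartbeat task back down.
        handle.server.lock().await.remove_player(player_id);
        Ok(())
    }
}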