From c199e144350614d99184a54d5efdbb41546026e6 Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Mon, 8 Jun 2020 00:02:15 -0400 Subject: [PATCH] Make the heartbeat task call the server's tick() method regularly. The plumbing to get the tick task a reference to a server handle is a bit ugly; some of the server methods should probably move to the handle impl instead. --- src/net/server.rs | 54 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/src/net/server.rs b/src/net/server.rs index 3e12687..3a0e6d3 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -1,10 +1,11 @@ use super::*; use anyhow::{Context, Error, Result}; +use future::{abortable, AbortHandle}; use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*}; use serde_json::json; use std::sync::Arc; use stream::iter; -use future::{AbortHandle, abortable}; +use tokio::time::{interval, Instant}; /// There is a point at which a client falls far enough behind /// that it's probably not worth trying to catch them up; for now, @@ -56,11 +57,13 @@ impl Server { } } - pub fn process_message(&mut self, player: usize, msg: ClientMessage) { - debug!("Client#{} message: {:?}", player, &msg); - } - - pub async fn add_player(&mut self, mut sender: Sender) -> Result { + /// connect a client to this server + /// TODO: currently needs to be passed a handle to this server instance, which feels flakey. + pub async fn add_player( + &mut self, + mut sender: Sender, + handle: &Handle, + ) -> Result { // TODO: limit total number of players // allot player ID @@ -79,17 +82,28 @@ impl Server { info!("Client#{} connected", player_id); // ensure server task is running - self.spinup(); + self.spinup(handle); Ok(player_id) } + pub fn process_message(&mut self, player: usize, msg: ClientMessage) { + trace!("Client#{} message: {:?}", player, &msg); + } + + fn tick(&mut self, tick: Instant) { + trace!("Tick {:?}", tick) + } + pub fn remove_player(&mut self, player_id: PlayerId) { if player_id < self.players.len() && self.players[player_id].is_some() { self.players[player_id] = None; info!("Client#{} disconnected", player_id); } else { - error!("Tried to disconnect Client#{} but there was no record for them", player_id); + error!( + "Tried to disconnect Client#{} but there was no record for them", + player_id + ); } // check if no players left @@ -99,13 +113,22 @@ impl Server { } /// Start the heartbeat task, if it's not running - fn spinup(&mut self) { + /// TODO: currently needs to be passed a handle to this server instance, which feels flakey. + fn spinup(&mut self, server_handle: &Handle) { if let None = self.heartbeat_task { info!("Starting heartbeat task"); - let (task, handle) = abortable(async { + // Take a reference to the server so the server tick task can access its state + send messages; + // A strong reference is fine, because we manually abort (& drop) this task when + // no more players are connected. + let server_handle = server_handle.clone(); + let (task, abort_handle) = abortable(async move { info!("Heartbeat task started"); + let mut ticks = interval(TICK_LENGTH); + while let Some(tick) = ticks.next().await { + server_handle.server.lock().await.tick(tick); + } }); - self.heartbeat_task = Some(handle); + self.heartbeat_task = Some(abort_handle); tokio::spawn(task); } } @@ -127,7 +150,7 @@ pub async fn run_client( let (sender, receiver) = channel(CHANNEL_BUFFER); // register player - let player_id = handle.server.lock().await.add_player(sender).await?; + let player_id = handle.server.lock().await.add_player(sender, &handle).await?; let (output_task, output_handle) = abortable(receiver.map(Ok).forward(sink)); @@ -137,17 +160,16 @@ pub async fn run_client( Some(msg) => handle.server.lock().await.process_message(player_id, msg), None => break, } - }; + } // stop the sink so it won't error output_handle.abort(); Ok(()) }; // intentional aborting is not a reportable error - let output_task = output_task - .map(|result| result.unwrap_or(Ok(()))); + let output_task = output_task.map(|result| result.unwrap_or(Ok(()))); - let result = try_join(output_task, input_task).await.map(|((),())| ()); + let result = try_join(output_task, input_task).await.map(|((), ())| ()); // deregister player, whether normally or due to error handle.server.lock().await.remove_player(player_id);