Make the heartbeat task call the server's tick() method regularly.

The plumbing to get the tick task a reference to a server handle is a bit ugly;
some of the server methods should probably move to the handle impl instead.
This commit is contained in:
Tangent Wantwight 2020-06-08 00:02:15 -04:00
parent 3f07748abd
commit c199e14435
1 changed files with 38 additions and 16 deletions

View File

@ -1,10 +1,11 @@
use super::*; use super::*;
use anyhow::{Context, Error, Result}; use anyhow::{Context, Error, Result};
use future::{abortable, AbortHandle};
use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*}; use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*};
use serde_json::json; use serde_json::json;
use std::sync::Arc; use std::sync::Arc;
use stream::iter; use stream::iter;
use future::{AbortHandle, abortable}; use tokio::time::{interval, Instant};
/// There is a point at which a client falls far enough behind /// There is a point at which a client falls far enough behind
/// that it's probably not worth trying to catch them up; for now, /// that it's probably not worth trying to catch them up; for now,
@ -56,11 +57,13 @@ impl Server {
} }
} }
pub fn process_message(&mut self, player: usize, msg: ClientMessage) { /// connect a client to this server
debug!("Client#{} message: {:?}", player, &msg); /// TODO: currently needs to be passed a handle to this server instance, which feels flakey.
} pub async fn add_player(
&mut self,
pub async fn add_player(&mut self, mut sender: Sender<ServerMessage>) -> Result<PlayerId> { mut sender: Sender<ServerMessage>,
handle: &Handle,
) -> Result<PlayerId> {
// TODO: limit total number of players // TODO: limit total number of players
// allot player ID // allot player ID
@ -79,17 +82,28 @@ impl Server {
info!("Client#{} connected", player_id); info!("Client#{} connected", player_id);
// ensure server task is running // ensure server task is running
self.spinup(); self.spinup(handle);
Ok(player_id) Ok(player_id)
} }
pub fn process_message(&mut self, player: usize, msg: ClientMessage) {
trace!("Client#{} message: {:?}", player, &msg);
}
fn tick(&mut self, tick: Instant) {
trace!("Tick {:?}", tick)
}
pub fn remove_player(&mut self, player_id: PlayerId) { pub fn remove_player(&mut self, player_id: PlayerId) {
if player_id < self.players.len() && self.players[player_id].is_some() { if player_id < self.players.len() && self.players[player_id].is_some() {
self.players[player_id] = None; self.players[player_id] = None;
info!("Client#{} disconnected", player_id); info!("Client#{} disconnected", player_id);
} else { } else {
error!("Tried to disconnect Client#{} but there was no record for them", player_id); error!(
"Tried to disconnect Client#{} but there was no record for them",
player_id
);
} }
// check if no players left // check if no players left
@ -99,13 +113,22 @@ impl Server {
} }
/// Start the heartbeat task, if it's not running /// Start the heartbeat task, if it's not running
fn spinup(&mut self) { /// TODO: currently needs to be passed a handle to this server instance, which feels flakey.
fn spinup(&mut self, server_handle: &Handle) {
if let None = self.heartbeat_task { if let None = self.heartbeat_task {
info!("Starting heartbeat task"); info!("Starting heartbeat task");
let (task, handle) = abortable(async { // Take a reference to the server so the server tick task can access its state + send messages;
// A strong reference is fine, because we manually abort (& drop) this task when
// no more players are connected.
let server_handle = server_handle.clone();
let (task, abort_handle) = abortable(async move {
info!("Heartbeat task started"); info!("Heartbeat task started");
let mut ticks = interval(TICK_LENGTH);
while let Some(tick) = ticks.next().await {
server_handle.server.lock().await.tick(tick);
}
}); });
self.heartbeat_task = Some(handle); self.heartbeat_task = Some(abort_handle);
tokio::spawn(task); tokio::spawn(task);
} }
} }
@ -127,7 +150,7 @@ pub async fn run_client(
let (sender, receiver) = channel(CHANNEL_BUFFER); let (sender, receiver) = channel(CHANNEL_BUFFER);
// register player // register player
let player_id = handle.server.lock().await.add_player(sender).await?; let player_id = handle.server.lock().await.add_player(sender, &handle).await?;
let (output_task, output_handle) = abortable(receiver.map(Ok).forward(sink)); let (output_task, output_handle) = abortable(receiver.map(Ok).forward(sink));
@ -137,17 +160,16 @@ pub async fn run_client(
Some(msg) => handle.server.lock().await.process_message(player_id, msg), Some(msg) => handle.server.lock().await.process_message(player_id, msg),
None => break, None => break,
} }
}; }
// stop the sink so it won't error // stop the sink so it won't error
output_handle.abort(); output_handle.abort();
Ok(()) Ok(())
}; };
// intentional aborting is not a reportable error // intentional aborting is not a reportable error
let output_task = output_task let output_task = output_task.map(|result| result.unwrap_or(Ok(())));
.map(|result| result.unwrap_or(Ok(())));
let result = try_join(output_task, input_task).await.map(|((),())| ()); let result = try_join(output_task, input_task).await.map(|((), ())| ());
// deregister player, whether normally or due to error // deregister player, whether normally or due to error
handle.server.lock().await.remove_player(player_id); handle.server.lock().await.remove_player(player_id);