base2020/src/net/server.rs

157 lines
4.4 KiB
Rust

use super::*;
use anyhow::{Context, Error, Result};
use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*};
use serde_json::json;
use std::sync::Arc;
use stream::iter;
use future::{AbortHandle, abortable};
/// Capacity passed to `channel` when `run_client` creates a client's
/// outgoing `ServerMessage` queue.
///
/// There is a point at which a client falls far enough behind that it is
/// probably not worth trying to catch them up. For now the intent is to
/// model this as a buffer size limit and to disconnect a client once the
/// cap is reached. More elegant solutions may be adopted in the future.
///
/// NOTE(review): no code visible in this file disconnects a client when the
/// buffer fills; confirm where (or whether) that policy is enforced.
const CHANNEL_BUFFER: usize = 200;
/// Sends the initial greeting to a newly connected client.
///
/// Two messages are queued on `sink`, in order:
/// 1. a `ServerMessage::Meta` carrying the version string and a `helo` banner;
/// 2. a `ServerMessage::SetState` with `u: Some(0)` and an empty JSON object.
///
/// # Errors
///
/// Returns the channel's send error, wrapped with the context
/// `"Greeting client"`, if either message cannot be delivered.
pub async fn greet(sink: &mut Sender<ServerMessage>) -> Result<()> {
    let meta = ServerMessage::Meta {
        m: Meta {
            version: "Unstable",
            helo: Some("Dedicated base2020 server".into()),
        },
    };
    let initial_state = ServerMessage::SetState {
        u: Some(0),
        s: json!({}),
    };

    // `send_all` consumes a stream of `Result`s, hence the `map(Ok)`.
    let mut greeting = iter(vec![meta, initial_state]).map(Ok);
    let delivered = sink.send_all(&mut greeting).await;
    delivered.context("Greeting client")
}
/// Per-player record stored in a `Server` slot while that player is connected.
pub struct PlayerState {
/// Sending half of the player's `ServerMessage` channel. `run_client`
/// creates the channel and forwards the receiving half to the client's sink.
sender: Sender<ServerMessage>,
}
/// Server state: the table of connected players and the heartbeat task handle.
pub struct Server {
/// Player slots indexed by player ID. `None` marks a free slot, which
/// `add_player` reuses before growing the vector.
players: Vec<Option<PlayerState>>,
/// Abort handle for the spawned heartbeat task. `None` until `spinup`
/// starts one, and again after `shutdown` has taken and aborted it.
heartbeat_task: Option<AbortHandle>,
}
/// Cloneable handle to a shared `Server`.
///
/// Cloning copies the inner `Arc`, so every clone locks the same `Server`.
#[derive(Clone)]
pub struct Handle {
/// The server behind an async mutex (`futures::lock::Mutex`); lock it to
/// call `Server` methods.
server: Arc<Mutex<Server>>,
}
impl Server {
    /// Creates a server with no players and no heartbeat task, wrapped in a
    /// shareable [`Handle`].
    pub fn create() -> Handle {
        Handle {
            server: Arc::new(Mutex::new(Server {
                players: Vec::new(),
                heartbeat_task: None,
            })),
        }
    }

    /// Handles one message received from `player`.
    ///
    /// Currently this only logs the message at debug level.
    pub fn process_message(&mut self, player: PlayerId, msg: ClientMessage) {
        debug!("Client#{} message: {:?}", player, &msg);
    }

    /// Greets a new client and registers it in the player table.
    ///
    /// The lowest free slot is reused if there is one; otherwise the table
    /// grows by one. After registration the heartbeat task is started if it
    /// is not already running. Returns the ID (slot index) given to the player.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`greet`] if the greeting cannot be sent. In
    /// that case the player table is left untouched.
    pub async fn add_player(&mut self, mut sender: Sender<ServerMessage>) -> Result<PlayerId> {
        // TODO: limit total number of players

        // Greet before allocating, so a failed greeting never leaves a
        // freshly pushed empty slot behind.
        greet(&mut sender).await?;

        // allot player ID and connect player
        let state = PlayerState { sender };
        let free_slot = self.players.iter().position(Option::is_none);
        let player_id = match free_slot {
            Some(index) => {
                self.players[index] = Some(state);
                index
            }
            None => {
                self.players.push(Some(state));
                self.players.len() - 1
            }
        };
        info!("Client#{} connected", player_id);

        // ensure server task is running
        self.spinup();
        Ok(player_id)
    }

    /// Frees the slot held by `player_id`.
    ///
    /// Logs an error, without panicking, when the ID is out of range or the
    /// slot is already empty. In every case the heartbeat task is stopped
    /// once no players remain.
    pub fn remove_player(&mut self, player_id: PlayerId) {
        match self.players.get_mut(player_id) {
            Some(slot) if slot.is_some() => {
                *slot = None;
                info!("Client#{} disconnected", player_id);
            }
            _ => {
                error!("Tried to disconnect Client#{} but there was no record for them", player_id);
            }
        }

        // check if no players left
        if self.players.iter().all(Option::is_none) {
            self.shutdown();
        }
    }

    /// Start the heartbeat task, if it's not running
    fn spinup(&mut self) {
        if self.heartbeat_task.is_none() {
            info!("Starting heartbeat task");
            let (task, handle) = abortable(async {
                info!("Heartbeat task started");
            });
            // Keep the abort handle so `shutdown` can cancel the spawned task.
            self.heartbeat_task = Some(handle);
            tokio::spawn(task);
        }
    }

    /// Stop any active heartbeat task
    fn shutdown(&mut self) {
        // `take` clears the field so a later `spinup` starts a fresh task.
        if let Some(handle) = self.heartbeat_task.take() {
            info!("Stopping heartbeat task");
            handle.abort();
        }
    }
}
/// Drives one client connection from registration to disconnection.
///
/// A bounded channel of `CHANNEL_BUFFER` messages is created for the client.
/// Its sending half is registered with the server through `add_player`, and
/// its receiving half is forwarded to `sink`. Concurrently, every message read
/// from `source` is passed to `Server::process_message`.
///
/// When `source` ends, the forwarding task is aborted on purpose; that abort
/// is not treated as an error. Once the player is registered, it is always
/// deregistered before returning, whether the connection ended normally or
/// with an error.
///
/// # Errors
///
/// Returns the error from `add_player` if registration fails, or the first
/// error produced by reading from `source` or writing to `sink`.
pub async fn run_client(
    handle: Handle,
    source: &mut (impl Stream<Item = Result<ClientMessage, Error>> + Send + Unpin),
    sink: &mut (impl Sink<ServerMessage, Error = Error> + Send + Unpin),
) -> Result<()> {
    let (sender, receiver) = channel(CHANNEL_BUFFER);

    // register player
    let player_id = {
        let mut server = handle.server.lock().await;
        server.add_player(sender).await?
    };

    let (forward_outgoing, stop_outgoing) = abortable(receiver.map(Ok).forward(sink));

    let read_incoming = async {
        while let Some(msg) = source.try_next().await? {
            handle.server.lock().await.process_message(player_id, msg);
        }
        // stop the sink so it won't error
        stop_outgoing.abort();
        Ok(())
    };

    // intentional aborting is not a reportable error
    let forward_outgoing = forward_outgoing.map(|outcome| match outcome {
        Ok(forwarded) => forwarded,
        Err(_aborted) => Ok(()),
    });

    let joined = try_join(forward_outgoing, read_incoming).await;

    // deregister player, whether normally or due to error
    handle.server.lock().await.remove_player(player_id);

    joined.map(|((), ())| ())
}