From 3f6b55dcc4762df81090d436a64e852a892d5fd3 Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Sat, 13 Feb 2021 15:00:11 -0500 Subject: [PATCH] Start breaking the handle-client task into its own module --- src/main.rs | 10 ++++---- src/net/agent.rs | 43 +++++++++++++++++++++++++++++++++ src/net/mod.rs | 7 ++++++ src/net/server.rs | 61 ++++++++++------------------------------------- 4 files changed, 69 insertions(+), 52 deletions(-) create mode 100644 src/net/agent.rs diff --git a/src/main.rs b/src/main.rs index 56b830b..48ae34b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,10 +4,6 @@ extern crate log; use anyhow::{Context, Result}; use future::ready; use futures::prelude::*; -use net::{ - server::{run_client, Handle, Server}, - ClientMessage, ServerMessage, -}; use rust_embed::RustEmbed; use serde_json::{from_str, to_string}; use std::net::ToSocketAddrs; @@ -19,6 +15,12 @@ use warp::{ use ws::{Message, WebSocket}; use path::Tail; +use net::{ + agent::run_client, + server::{Handle, Server}, + ClientMessage, ServerMessage, +}; + pub mod net; #[derive(StructOpt)] diff --git a/src/net/agent.rs b/src/net/agent.rs new file mode 100644 index 0000000..ee261d3 --- /dev/null +++ b/src/net/agent.rs @@ -0,0 +1,43 @@ +use anyhow::{Error, Result}; +use futures::{channel::mpsc::*, prelude::*, Stream}; + +use super::server::Handle; +use super::*; + +pub async fn run_client( + handle: Handle, + socket: &mut (impl Stream> + + Sink + + Send + + Unpin), +) -> Result<()> { + let (sender, mut receiver) = channel(CHANNEL_BUFFER); + + // register player + let player_id = handle.lock().await.add_player(sender, &handle).await?; + + // main message loop + let result: Result<()> = async { + loop { + tokio::select! { + client_message = socket.next() => { + match client_message { + Some(msg) => + handle.lock().await.process_message(player_id, msg?), + None => break Ok(()), + } + }, + Some(server_message) = receiver.next() => { + socket.send(server_message).await? + }, + else => break Ok(()), + } + } + } + .await; + + // deregister player, whether normally or due to error + handle.lock().await.remove_player(player_id); + + result +} diff --git a/src/net/mod.rs b/src/net/mod.rs index db7e4f6..635c0fb 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -2,8 +2,15 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::time::Duration; +pub mod agent; pub mod server; +/// There is a point at which a client falls far enough behind +/// that it's probably not worth trying to catch them up; for now, +/// implement this as a buffer size limit and disconnect a client if +/// the cap is reached. More elegant solutions may be reached in the future. +const CHANNEL_BUFFER: usize = 200; + /// Roughly 30 fps pub static TICK_LENGTH: Duration = Duration::from_millis(33); diff --git a/src/net/server.rs b/src/net/server.rs index b0edd57..5f72e48 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -1,17 +1,16 @@ -use super::*; -use anyhow::{Context, Error, Result}; +use anyhow::{Context, Result}; use future::{abortable, AbortHandle}; -use futures::{channel::mpsc::*, lock::Mutex, prelude::*}; +use futures::{ + channel::mpsc::*, + lock::{Mutex, MutexGuard}, + prelude::*, +}; use serde_json::json; use std::sync::Arc; use stream::iter; use tokio::time::{interval, Instant}; -/// There is a point at which a client falls far enough behind -/// that it's probably not worth trying to catch them up; for now, -/// implement this as a buffer size limit and disconnect a client if -/// the cap is reached. More elegant solutions may be reached in the future. -const CHANNEL_BUFFER: usize = 200; +use super::*; pub async fn greet(sink: &mut Sender, player_id: Option) -> Result<()> { let mut greeting = iter(vec![ @@ -48,6 +47,12 @@ pub struct Handle { server: Arc>, } +impl Handle { + pub async fn lock<'a>(&'a self) -> MutexGuard<'a, Server> { + self.server.lock().await + } +} + impl Server { pub fn create() -> Handle { Handle { @@ -202,43 +207,3 @@ impl Server { } } } - -pub async fn run_client( - handle: Handle, - socket: &mut (impl Stream> - + Sink - + Send - + Unpin), -) -> Result<()> { - let (sender, mut receiver) = channel(CHANNEL_BUFFER); - - // register player - let player_id = handle - .server - .lock() - .await - .add_player(sender, &handle) - .await?; - - // main message loop - let result: Result<()> = async { loop { - tokio::select! { - client_message = socket.next() => { - match client_message { - Some(msg) => - handle.server.lock().await.process_message(player_id, msg?), - None => break Ok(()), - } - }, - Some(server_message) = receiver.next() => { - socket.send(server_message).await? - }, - else => break Ok(()), - } - }}.await; - - // deregister player, whether normally or due to error - handle.server.lock().await.remove_player(player_id); - - result -}