Start breaking the handle-client task into its own module

This commit is contained in:
Tangent Wantwight 2021-02-13 15:00:11 -05:00
parent 2c4753e1b9
commit 3f6b55dcc4
4 changed files with 69 additions and 52 deletions

View file

@ -4,10 +4,6 @@ extern crate log;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use future::ready; use future::ready;
use futures::prelude::*; use futures::prelude::*;
use net::{
server::{run_client, Handle, Server},
ClientMessage, ServerMessage,
};
use rust_embed::RustEmbed; use rust_embed::RustEmbed;
use serde_json::{from_str, to_string}; use serde_json::{from_str, to_string};
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
@ -19,6 +15,12 @@ use warp::{
use ws::{Message, WebSocket}; use ws::{Message, WebSocket};
use path::Tail; use path::Tail;
use net::{
agent::run_client,
server::{Handle, Server},
ClientMessage, ServerMessage,
};
pub mod net; pub mod net;
#[derive(StructOpt)] #[derive(StructOpt)]

43
src/net/agent.rs Normal file
View file

@ -0,0 +1,43 @@
use anyhow::{Error, Result};
use futures::{channel::mpsc::*, prelude::*, Stream};
use super::server::Handle;
use super::*;
pub async fn run_client(
handle: Handle,
socket: &mut (impl Stream<Item = Result<ClientMessage, Error>>
+ Sink<ServerMessage, Error = Error>
+ Send
+ Unpin),
) -> Result<()> {
let (sender, mut receiver) = channel(CHANNEL_BUFFER);
// register player
let player_id = handle.lock().await.add_player(sender, &handle).await?;
// main message loop
let result: Result<()> = async {
loop {
tokio::select! {
client_message = socket.next() => {
match client_message {
Some(msg) =>
handle.lock().await.process_message(player_id, msg?),
None => break Ok(()),
}
},
Some(server_message) = receiver.next() => {
socket.send(server_message).await?
},
else => break Ok(()),
}
}
}
.await;
// deregister player, whether normally or due to error
handle.lock().await.remove_player(player_id);
result
}

View file

@ -2,8 +2,15 @@ use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use std::time::Duration; use std::time::Duration;
pub mod agent;
pub mod server; pub mod server;
/// There is a point at which a client falls far enough behind
/// that it's probably not worth trying to catch them up; for now,
/// implement this as a buffer size limit and disconnect a client if
/// the cap is reached. More elegant solutions may be reached in the future.
const CHANNEL_BUFFER: usize = 200;
/// Roughly 30 fps /// Roughly 30 fps
pub static TICK_LENGTH: Duration = Duration::from_millis(33); pub static TICK_LENGTH: Duration = Duration::from_millis(33);

View file

@ -1,17 +1,16 @@
use super::*; use anyhow::{Context, Result};
use anyhow::{Context, Error, Result};
use future::{abortable, AbortHandle}; use future::{abortable, AbortHandle};
use futures::{channel::mpsc::*, lock::Mutex, prelude::*}; use futures::{
channel::mpsc::*,
lock::{Mutex, MutexGuard},
prelude::*,
};
use serde_json::json; use serde_json::json;
use std::sync::Arc; use std::sync::Arc;
use stream::iter; use stream::iter;
use tokio::time::{interval, Instant}; use tokio::time::{interval, Instant};
/// There is a point at which a client falls far enough behind use super::*;
/// that it's probably not worth trying to catch them up; for now,
/// implement this as a buffer size limit and disconnect a client if
/// the cap is reached. More elegant solutions may be reached in the future.
const CHANNEL_BUFFER: usize = 200;
pub async fn greet(sink: &mut Sender<ServerMessage>, player_id: Option<PlayerId>) -> Result<()> { pub async fn greet(sink: &mut Sender<ServerMessage>, player_id: Option<PlayerId>) -> Result<()> {
let mut greeting = iter(vec![ let mut greeting = iter(vec![
@ -48,6 +47,12 @@ pub struct Handle {
server: Arc<Mutex<Server>>, server: Arc<Mutex<Server>>,
} }
impl Handle {
pub async fn lock<'a>(&'a self) -> MutexGuard<'a, Server> {
self.server.lock().await
}
}
impl Server { impl Server {
pub fn create() -> Handle { pub fn create() -> Handle {
Handle { Handle {
@ -202,43 +207,3 @@ impl Server {
} }
} }
} }
pub async fn run_client(
handle: Handle,
socket: &mut (impl Stream<Item = Result<ClientMessage, Error>>
+ Sink<ServerMessage, Error = Error>
+ Send
+ Unpin),
) -> Result<()> {
let (sender, mut receiver) = channel(CHANNEL_BUFFER);
// register player
let player_id = handle
.server
.lock()
.await
.add_player(sender, &handle)
.await?;
// main message loop
let result: Result<()> = async { loop {
tokio::select! {
client_message = socket.next() => {
match client_message {
Some(msg) =>
handle.server.lock().await.process_message(player_id, msg?),
None => break Ok(()),
}
},
Some(server_message) = receiver.next() => {
socket.send(server_message).await?
},
else => break Ok(()),
}
}}.await;
// deregister player, whether normally or due to error
handle.server.lock().await.remove_player(player_id);
result
}