diff --git a/src/net/server.rs b/src/net/server.rs index 0db53b9..204e2a1 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -1,7 +1,7 @@ use super::*; use anyhow::{Context, Error, Result}; use future::{abortable, AbortHandle}; -use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*}; +use futures::{channel::mpsc::*, lock::Mutex, prelude::*}; use serde_json::json; use std::sync::Arc; use stream::iter; @@ -105,7 +105,7 @@ impl Server { player_id: None, state: new_state, }), - ClientMessage::GetState {..} => {} + ClientMessage::GetState { .. } => {} } } @@ -204,10 +204,12 @@ impl Server { pub async fn run_client( handle: Handle, - socket: &mut (impl Stream> + Sink + Send + Unpin), + socket: &mut (impl Stream> + + Sink + + Send + + Unpin), ) -> Result<()> { - let (sink, mut source) = socket.split(); - let (sender, receiver) = channel(CHANNEL_BUFFER); + let (sender, mut receiver) = channel(CHANNEL_BUFFER); // register player let player_id = handle @@ -217,24 +219,22 @@ pub async fn run_client( .add_player(sender, &handle) .await?; - let (output_task, output_handle) = abortable(receiver.map(Ok).forward(sink)); - - let input_task = async { - loop { - match source.try_next().await? { - Some(msg) => handle.server.lock().await.process_message(player_id, msg), - None => break, - } + // main message loop + let result: Result<()> = async { loop { + tokio::select! { + client_message = socket.next() => { + match client_message { + Some(msg) => + handle.server.lock().await.process_message(player_id, msg?), + None => break Ok(()), + } + }, + Some(server_message) = receiver.next() => { + socket.send(server_message).await? + }, + else => break Ok(()), } - // stop the sink so it won't error - output_handle.abort(); - Ok(()) - }; - - // intentional aborting is not a reportable error - let output_task = output_task.map(|result| result.unwrap_or(Ok(()))); - - let result = try_join(output_task, input_task).await.map(|((), ())| ()); + }}.await; // deregister player, whether normally or due to error handle.server.lock().await.remove_player(player_id);