Switch to processing connections via unified message select-loop

This commit is contained in:
Tangent Wantwight 2020-07-09 22:02:30 -04:00
parent 9b6c54900c
commit f624148464

View file

@ -1,7 +1,7 @@
use super::*; use super::*;
use anyhow::{Context, Error, Result}; use anyhow::{Context, Error, Result};
use future::{abortable, AbortHandle}; use future::{abortable, AbortHandle};
use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*}; use futures::{channel::mpsc::*, lock::Mutex, prelude::*};
use serde_json::json; use serde_json::json;
use std::sync::Arc; use std::sync::Arc;
use stream::iter; use stream::iter;
@ -105,7 +105,7 @@ impl Server {
player_id: None, player_id: None,
state: new_state, state: new_state,
}), }),
ClientMessage::GetState {..} => {} ClientMessage::GetState { .. } => {}
} }
} }
@ -204,10 +204,12 @@ impl Server {
pub async fn run_client( pub async fn run_client(
handle: Handle, handle: Handle,
socket: &mut (impl Stream<Item = Result<ClientMessage, Error>> + Sink<ServerMessage, Error = Error> + Send + Unpin), socket: &mut (impl Stream<Item = Result<ClientMessage, Error>>
+ Sink<ServerMessage, Error = Error>
+ Send
+ Unpin),
) -> Result<()> { ) -> Result<()> {
let (sink, mut source) = socket.split(); let (sender, mut receiver) = channel(CHANNEL_BUFFER);
let (sender, receiver) = channel(CHANNEL_BUFFER);
// register player // register player
let player_id = handle let player_id = handle
@ -217,24 +219,22 @@ pub async fn run_client(
.add_player(sender, &handle) .add_player(sender, &handle)
.await?; .await?;
let (output_task, output_handle) = abortable(receiver.map(Ok).forward(sink)); // main message loop
let result: Result<()> = async { loop {
let input_task = async { tokio::select! {
loop { client_message = socket.next() => {
match source.try_next().await? { match client_message {
Some(msg) => handle.server.lock().await.process_message(player_id, msg), Some(msg) =>
None => break, handle.server.lock().await.process_message(player_id, msg?),
} None => break Ok(()),
}
},
Some(server_message) = receiver.next() => {
socket.send(server_message).await?
},
else => break Ok(()),
} }
// stop the sink so it won't error }}.await;
output_handle.abort();
Ok(())
};
// intentional aborting is not a reportable error
let output_task = output_task.map(|result| result.unwrap_or(Ok(())));
let result = try_join(output_task, input_task).await.map(|((), ())| ());
// deregister player, whether normally or due to error // deregister player, whether normally or due to error
handle.server.lock().await.remove_player(player_id); handle.server.lock().await.remove_player(player_id);