Abort the "send to client" task when the websocket is closed

This commit is contained in:
Tangent Wantwight 2020-06-03 00:50:12 -04:00
parent dacb031443
commit 9239c19cbe

View file

@ -4,6 +4,7 @@ use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*};
use serde_json::json; use serde_json::json;
use std::sync::Arc; use std::sync::Arc;
use stream::iter; use stream::iter;
use future::abortable;
/// There is a point at which a client falls far enough behind /// There is a point at which a client falls far enough behind
/// that it's probably not worth trying to catch them up; for now, /// that it's probably not worth trying to catch them up; for now,
@ -97,17 +98,25 @@ pub async fn run_client(
// register player // register player
let player_id = handle.server.lock().await.add_player(sender).await?; let player_id = handle.server.lock().await.add_player(sender).await?;
let output_task = receiver.map(Ok).forward(sink); let (output_task, output_handle) = abortable(receiver.map(Ok).forward(sink));
let input_task = async { let input_task = async {
loop { loop {
match source.try_next().await? { match source.try_next().await? {
Some(msg) => handle.server.lock().await.process_message(player_id, msg), Some(msg) => handle.server.lock().await.process_message(player_id, msg),
None => break Ok(()), None => break,
}
} }
}; };
let result = try_join(output_task, input_task).await.map(|_| ()); // stop the sink so it won't error
output_handle.abort();
Ok(())
};
// intentional aborting is not a reportable error
let output_task = output_task
.map(|result| result.unwrap_or(Ok(())));
let result = try_join(output_task, input_task).await.map(|((),())| ());
// deregister player, whether normally or due to error // deregister player, whether normally or due to error
handle.server.lock().await.remove_player(player_id); handle.server.lock().await.remove_player(player_id);