From 319f082d53e4914242f4296ec0718e5930981c68 Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Mon, 25 May 2020 22:17:44 -0400 Subject: [PATCH] move client logic into one async fn --- src/main.rs | 20 +++++--------------- src/net/server.rs | 39 +++++++++++++++++++++++++++++++-------- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6027f07..0ff6d44 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,11 @@ +// TODO: try doing without this every now and then, hopefully it's eventually unnecessary +#![type_length_limit="1272053"] #[macro_use] extern crate log; use anyhow::{Context, Result}; use future::ready; use futures::prelude::*; -use futures::try_join; use net::{server::greet, ClientMessage, ServerMessage}; use serde_json::{from_str, to_string}; use std::net::ToSocketAddrs; @@ -58,7 +59,7 @@ async fn main() -> Result<()> { async fn handle_socket(websocket: WebSocket) -> Result<()> { let (sink, source) = websocket.split(); - let mut sink = sink.with(|msg: ServerMessage| { + let sink = sink.with(|msg: ServerMessage| { ready( to_string(&msg) .context("JSON encoding shouldn't fail") @@ -66,9 +67,7 @@ async fn handle_socket(websocket: WebSocket) -> Result<()> { ) }); - greet(&mut sink).await?; - - let mut source = source.map_err(Into::into).try_filter_map(|msg| { + let source = source.map_err(Into::into).try_filter_map(|msg| { ready(match msg.to_str() { Ok(json) => from_str::(json) .context("Parsing JSON") @@ -80,14 +79,5 @@ async fn handle_socket(websocket: WebSocket) -> Result<()> { }) }); - let input_task = async { - loop { - match source.next().await { - Some(Ok(msg)) => debug!("Client message: {:?}", &msg), - Some(Err(error)) => return Err(error), - None => break Ok(()), - } - } - }; - try_join!(input_task).map(|((),)| ()) + run_client(source, sink).await } diff --git a/src/net/server.rs b/src/net/server.rs index 503c44e..4f048db 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -1,11 +1,13 @@ -use anyhow::{Context, Error, Result}; use super::*; -use futures::prelude::*; +use anyhow::{Context, Error, Result}; +use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*}; use serde_json::json; +use std::sync::Arc; use stream::iter; pub async fn greet(sink: &mut T) -> Result<()> -where T: Sink + Unpin +where + T: Sink + Unpin, { let mut greeting = iter(vec![ ServerMessage::Meta { @@ -14,12 +16,33 @@ where T: Sink + Unpin helo: Some("Dedicated base2020 server".into()), }, }, - ServerMessage::SetState { - u: 0, - s: json!({}), - }, + ServerMessage::SetState { u: 0, s: json!({}) }, ]) .map(Ok); - sink.send_all(&mut greeting).await.context("Greeting client") + sink.send_all(&mut greeting) + .await + .context("Greeting client") +} + +pub async fn run_client(mut source: I, mut sink: O) -> Result<()> +where + I: Stream> + Unpin, + O: Sink + Unpin, +{ + let output_task = async { + greet(&mut sink).await?; + Ok::<(), Error>(()) + }; + + let input_task = async { + loop { + match source.next().await { + Some(Ok(msg)) => debug!("Client message: {:?}", &msg), + Some(Err(error)) => return Err(error), + None => break Ok(()), + } + } + }; + try_join(output_task, input_task).await.map(|((), ())| ()) }