move client logic into one async fn

This commit is contained in:
Tangent Wantwight 2020-05-25 22:17:44 -04:00
parent f83aab0c9e
commit 319f082d53
2 changed files with 36 additions and 23 deletions

View file

@ -1,10 +1,11 @@
// TODO: try doing without this every now and then, hopefully it's eventually unnecessary
#![type_length_limit="1272053"]
#[macro_use] #[macro_use]
extern crate log; extern crate log;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use future::ready; use future::ready;
use futures::prelude::*; use futures::prelude::*;
use futures::try_join;
use net::{server::greet, ClientMessage, ServerMessage}; use net::{server::greet, ClientMessage, ServerMessage};
use serde_json::{from_str, to_string}; use serde_json::{from_str, to_string};
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
@ -58,7 +59,7 @@ async fn main() -> Result<()> {
async fn handle_socket(websocket: WebSocket) -> Result<()> { async fn handle_socket(websocket: WebSocket) -> Result<()> {
let (sink, source) = websocket.split(); let (sink, source) = websocket.split();
let mut sink = sink.with(|msg: ServerMessage| { let sink = sink.with(|msg: ServerMessage| {
ready( ready(
to_string(&msg) to_string(&msg)
.context("JSON encoding shouldn't fail") .context("JSON encoding shouldn't fail")
@ -66,9 +67,7 @@ async fn handle_socket(websocket: WebSocket) -> Result<()> {
) )
}); });
greet(&mut sink).await?; let source = source.map_err(Into::into).try_filter_map(|msg| {
let mut source = source.map_err(Into::into).try_filter_map(|msg| {
ready(match msg.to_str() { ready(match msg.to_str() {
Ok(json) => from_str::<ClientMessage>(json) Ok(json) => from_str::<ClientMessage>(json)
.context("Parsing JSON") .context("Parsing JSON")
@ -80,14 +79,5 @@ async fn handle_socket(websocket: WebSocket) -> Result<()> {
}) })
}); });
let input_task = async { run_client(source, sink).await
loop {
match source.next().await {
Some(Ok(msg)) => debug!("Client message: {:?}", &msg),
Some(Err(error)) => return Err(error),
None => break Ok(()),
}
}
};
try_join!(input_task).map(|((),)| ())
} }

View file

@ -1,11 +1,13 @@
use anyhow::{Context, Error, Result};
use super::*; use super::*;
use futures::prelude::*; use anyhow::{Context, Error, Result};
use futures::{channel::mpsc::*, future::try_join, lock::Mutex, prelude::*};
use serde_json::json; use serde_json::json;
use std::sync::Arc;
use stream::iter; use stream::iter;
pub async fn greet<T>(sink: &mut T) -> Result<()> pub async fn greet<T>(sink: &mut T) -> Result<()>
where T: Sink<ServerMessage, Error = Error> + Unpin where
T: Sink<ServerMessage, Error = Error> + Unpin,
{ {
let mut greeting = iter(vec![ let mut greeting = iter(vec![
ServerMessage::Meta { ServerMessage::Meta {
@ -14,12 +16,33 @@ where T: Sink<ServerMessage, Error = Error> + Unpin
helo: Some("Dedicated base2020 server".into()), helo: Some("Dedicated base2020 server".into()),
}, },
}, },
ServerMessage::SetState { ServerMessage::SetState { u: 0, s: json!({}) },
u: 0,
s: json!({}),
},
]) ])
.map(Ok); .map(Ok);
sink.send_all(&mut greeting).await.context("Greeting client") sink.send_all(&mut greeting)
.await
.context("Greeting client")
}
pub async fn run_client<I, O>(mut source: I, mut sink: O) -> Result<()>
where
I: Stream<Item = Result<ClientMessage, Error>> + Unpin,
O: Sink<ServerMessage, Error = Error> + Unpin,
{
let output_task = async {
greet(&mut sink).await?;
Ok::<(), Error>(())
};
let input_task = async {
loop {
match source.next().await {
Some(Ok(msg)) => debug!("Client message: {:?}", &msg),
Some(Err(error)) => return Err(error),
None => break Ok(()),
}
}
};
try_join(output_task, input_task).await.map(|((), ())| ())
} }