From 03e45128395e6a6b221b6aa2063319f2f257b6fb Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Sun, 24 May 2020 20:53:30 -0400 Subject: [PATCH] Move greeting into its own function, tidy up error path. --- src/main.rs | 51 +++++++++++++++-------------------------------- src/net/mod.rs | 2 ++ src/net/server.rs | 25 +++++++++++++++++++++++ 3 files changed, 43 insertions(+), 35 deletions(-) create mode 100644 src/net/server.rs diff --git a/src/main.rs b/src/main.rs index 37e742b..57d8ee4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,10 +4,10 @@ extern crate log; use anyhow::{Context, Error, Result}; use future::ok; use futures::prelude::*; -use net::{ClientMessage, Meta, ServerMessage}; -use serde_json::{from_str, json, to_string, Value}; +use net::{ClientMessage, ServerMessage, server::greet}; +use serde_json::{from_str, to_string, Value}; use std::net::ToSocketAddrs; -use stream::{iter, FuturesUnordered}; +use stream::FuturesUnordered; use structopt::StructOpt; use warp::{serve, ws, ws::Ws, Filter}; use ws::{Message, WebSocket}; @@ -29,7 +29,11 @@ async fn main() -> Result<()> { let args = Args::from_args(); // dispatch websockets - let socket_handler = ws().map(|upgrade: Ws| upgrade.on_upgrade(handle_socket)); + let socket_handler = ws().map(|upgrade: Ws| upgrade.on_upgrade(|ws| async { + if let Err(error) = handle_socket(ws).await { + warn!("Websocket connection lost: {:#}", error); + } + })); let addrs = args .listen @@ -46,7 +50,7 @@ async fn main() -> Result<()> { Ok(()) } -async fn handle_socket(websocket: WebSocket) { +async fn handle_socket(websocket: WebSocket) -> Result<()> { let (sink, mut source) = websocket.split(); let mut sink = sink.with(|msg: ServerMessage, Value>| { @@ -54,43 +58,20 @@ async fn handle_socket(websocket: WebSocket) { ok::(Message::text(json)) }); - let mut greeting = iter(vec![ - ServerMessage::Meta { - m: Meta { - version: "Unstable", - helo: Some("Dedicated base2020 server".into()), - }, - }, - ServerMessage::SetState { - u: 0, - s: json!({}), - }, - ]) - .map(Ok); + greet(&mut sink).await?; - if let Err(err) = sink.send_all(&mut greeting).await { - warn!("Websocket send error: {:#}", err); - return; - } - - let error: Option = loop { + loop { match source.next().await { Some(Ok(msg)) => match msg.to_str() { Ok(json) => { - match from_str::>(json).context("Parsing JSON") { - Ok(msg) => { - debug!("Client message: {:?}", &msg); - } - Err(error) => break Some(error), - } + let msg = from_str::>(json).context("Parsing JSON")?; + debug!("Client message: {:?}", &msg); } Err(()) => debug!("Non-text message"), }, - Some(Err(error)) => break Some(error.into()), - None => break None, + Some(Err(error)) => return Err(error.into()), + None => break, } }; - if let Some(error) = error { - warn!("Websocket connection lost: {:#}", error); - } + Ok(()) } diff --git a/src/net/mod.rs b/src/net/mod.rs index 4477556..cf4bdb3 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; +pub mod server; + #[derive(Deserialize, Debug)] #[serde(tag = "t")] pub enum ClientMessage { diff --git a/src/net/server.rs b/src/net/server.rs new file mode 100644 index 0000000..a8c2e4c --- /dev/null +++ b/src/net/server.rs @@ -0,0 +1,25 @@ +use anyhow::{Context, Error, Result}; +use super::*; +use futures::prelude::*; +use serde_json::{Value, json}; +use stream::iter; + +pub async fn greet(sink: &mut T) -> Result<()> +where T: Sink, Value>, Error = Error> + Unpin +{ + let mut greeting = iter(vec![ + ServerMessage::Meta { + m: Meta { + version: "Unstable", + helo: Some("Dedicated base2020 server".into()), + }, + }, + ServerMessage::SetState { + u: 0, + s: json!({}), + }, + ]) + .map(Ok); + + sink.send_all(&mut greeting).await.context("Greeting client") +}