Move greeting into its own function, tidy up error path.

This commit is contained in:
Tangent Wantwight 2020-05-24 20:53:30 -04:00
parent cf513ace26
commit 03e4512839
3 changed files with 43 additions and 35 deletions

View file

@ -4,10 +4,10 @@ extern crate log;
use anyhow::{Context, Error, Result}; use anyhow::{Context, Error, Result};
use future::ok; use future::ok;
use futures::prelude::*; use futures::prelude::*;
use net::{ClientMessage, Meta, ServerMessage}; use net::{ClientMessage, ServerMessage, server::greet};
use serde_json::{from_str, json, to_string, Value}; use serde_json::{from_str, to_string, Value};
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use stream::{iter, FuturesUnordered}; use stream::FuturesUnordered;
use structopt::StructOpt; use structopt::StructOpt;
use warp::{serve, ws, ws::Ws, Filter}; use warp::{serve, ws, ws::Ws, Filter};
use ws::{Message, WebSocket}; use ws::{Message, WebSocket};
@ -29,7 +29,11 @@ async fn main() -> Result<()> {
let args = Args::from_args(); let args = Args::from_args();
// dispatch websockets // dispatch websockets
let socket_handler = ws().map(|upgrade: Ws| upgrade.on_upgrade(handle_socket)); let socket_handler = ws().map(|upgrade: Ws| upgrade.on_upgrade(|ws| async {
if let Err(error) = handle_socket(ws).await {
warn!("Websocket connection lost: {:#}", error);
}
}));
let addrs = args let addrs = args
.listen .listen
@ -46,7 +50,7 @@ async fn main() -> Result<()> {
Ok(()) Ok(())
} }
async fn handle_socket(websocket: WebSocket) { async fn handle_socket(websocket: WebSocket) -> Result<()> {
let (sink, mut source) = websocket.split(); let (sink, mut source) = websocket.split();
let mut sink = sink.with(|msg: ServerMessage<Vec<Value>, Value>| { let mut sink = sink.with(|msg: ServerMessage<Vec<Value>, Value>| {
@ -54,43 +58,20 @@ async fn handle_socket(websocket: WebSocket) {
ok::<Message, Error>(Message::text(json)) ok::<Message, Error>(Message::text(json))
}); });
let mut greeting = iter(vec![ greet(&mut sink).await?;
ServerMessage::Meta {
m: Meta {
version: "Unstable",
helo: Some("Dedicated base2020 server".into()),
},
},
ServerMessage::SetState {
u: 0,
s: json!({}),
},
])
.map(Ok);
if let Err(err) = sink.send_all(&mut greeting).await { loop {
warn!("Websocket send error: {:#}", err);
return;
}
let error: Option<Error> = loop {
match source.next().await { match source.next().await {
Some(Ok(msg)) => match msg.to_str() { Some(Ok(msg)) => match msg.to_str() {
Ok(json) => { Ok(json) => {
match from_str::<ClientMessage<Value, Value>>(json).context("Parsing JSON") { let msg = from_str::<ClientMessage<Value, Value>>(json).context("Parsing JSON")?;
Ok(msg) => { debug!("Client message: {:?}", &msg);
debug!("Client message: {:?}", &msg);
}
Err(error) => break Some(error),
}
} }
Err(()) => debug!("Non-text message"), Err(()) => debug!("Non-text message"),
}, },
Some(Err(error)) => break Some(error.into()), Some(Err(error)) => return Err(error.into()),
None => break None, None => break,
} }
}; };
if let Some(error) = error { Ok(())
warn!("Websocket connection lost: {:#}", error);
}
} }

View file

@ -1,5 +1,7 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub mod server;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
#[serde(tag = "t")] #[serde(tag = "t")]
pub enum ClientMessage<I, S> { pub enum ClientMessage<I, S> {

25
src/net/server.rs Normal file
View file

@ -0,0 +1,25 @@
use anyhow::{Context, Error, Result};
use super::*;
use futures::prelude::*;
use serde_json::{Value, json};
use stream::iter;
pub async fn greet<T>(sink: &mut T) -> Result<()>
where T: Sink<ServerMessage<Vec<Value>, Value>, Error = Error> + Unpin
{
let mut greeting = iter(vec![
ServerMessage::Meta {
m: Meta {
version: "Unstable",
helo: Some("Dedicated base2020 server".into()),
},
},
ServerMessage::SetState {
u: 0,
s: json!({}),
},
])
.map(Ok);
sink.send_all(&mut greeting).await.context("Greeting client")
}