Split "parsing" and "processing" phases of input stream

This commit is contained in:
Tangent Wantwight 2020-05-25 20:06:50 -04:00
parent 2f64708e4e
commit f83aab0c9e

View file

@ -56,7 +56,7 @@ async fn main() -> Result<()> {
} }
async fn handle_socket(websocket: WebSocket) -> Result<()> { async fn handle_socket(websocket: WebSocket) -> Result<()> {
let (sink, mut source) = websocket.split(); let (sink, source) = websocket.split();
let mut sink = sink.with(|msg: ServerMessage| { let mut sink = sink.with(|msg: ServerMessage| {
ready( ready(
@ -68,17 +68,23 @@ async fn handle_socket(websocket: WebSocket) -> Result<()> {
greet(&mut sink).await?; greet(&mut sink).await?;
let mut source = source.map_err(Into::into).try_filter_map(|msg| {
ready(match msg.to_str() {
Ok(json) => from_str::<ClientMessage>(json)
.context("Parsing JSON")
.map(Some),
Err(()) => {
debug!("Non-text message");
Ok(None)
}
})
});
let input_task = async { let input_task = async {
loop { loop {
match source.next().await { match source.next().await {
Some(Ok(msg)) => match msg.to_str() { Some(Ok(msg)) => debug!("Client message: {:?}", &msg),
Ok(json) => { Some(Err(error)) => return Err(error),
let msg = from_str::<ClientMessage>(json).context("Parsing JSON")?;
debug!("Client message: {:?}", &msg);
}
Err(()) => debug!("Non-text message"),
},
Some(Err(error)) => return Err(error.into()),
None => break Ok(()), None => break Ok(()),
} }
} }