From 161c9de4722a97a6c37f8f66adbd7b8d7453cf54 Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Sun, 21 Oct 2018 19:09:37 -0400 Subject: [PATCH] Make EbmlStreamingParser operate on Buf items instead of AsRef --- src/commands/mod.rs | 16 +++++++--------- src/commands/relay.rs | 5 +++-- src/stream_parser.rs | 33 ++++++++++++++++++++------------- 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 7e1abfd..e6decf5 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,13 +1,10 @@ -use std::io::{ - Error as IoError, - stdin, - Stdin -}; +use std::io::stdin; -use futures::{ - prelude::*, - stream::MapErr +use bytes::{ + Buf, + IntoBuf }; +use futures::prelude::*; use tokio_io::io::AllowStdIo; use tokio_codec::{ BytesCodec, @@ -23,7 +20,8 @@ pub mod send; /// An adapter that makes chunks of bytes from stdin available as a Stream; /// is NOT actually async, and just uses blocking read. Don't use more than /// one at once, who knows who gets which bytes. -pub fn stdin_stream() -> MapErr, BytesCodec>, fn(IoError) -> WebmetroError> { +pub fn stdin_stream() -> impl Stream { FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new()) + .map(|bytes| bytes.into_buf()) .map_err(WebmetroError::IoError) } diff --git a/src/commands/relay.rs b/src/commands/relay.rs index 6257afb..a2acc65 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -1,10 +1,11 @@ +use std::error::Error; use std::net::ToSocketAddrs; use std::sync::{ Arc, Mutex }; -use bytes::Bytes; +use bytes::{Bytes, Buf}; use clap::{App, Arg, ArgMatches, SubCommand}; use futures::{ Future, @@ -62,7 +63,7 @@ impl RelayServer { .map_err(|err| match err {}) } - fn post_stream(&self, stream: Body) -> impl Stream { + fn post_stream(&self, stream: impl Stream) -> impl Stream { let source = stream .map_err(WebmetroError::from_err) .parse_ebml().with_soft_limit(BUFFER_LIMIT) diff --git a/src/stream_parser.rs b/src/stream_parser.rs index 0d3c9cb..4dcf2fe 100644 --- a/src/stream_parser.rs +++ b/src/stream_parser.rs @@ -1,10 +1,17 @@ -use bytes::BytesMut; -use bytes::BufMut; -use futures::Async; -use futures::stream::Stream; +use bytes::{ + Buf, + BufMut, + BytesMut +}; +use futures::{ + Async, + stream::Stream +}; -use ebml::EbmlEventSource; -use ebml::FromEbml; +use ebml::{ + EbmlEventSource, + FromEbml +}; use error::WebmetroError; pub struct EbmlStreamingParser { @@ -24,7 +31,7 @@ impl EbmlStreamingParser { } } -pub trait StreamEbml where Self: Sized + Stream, Self::Item: AsRef<[u8]> { +pub trait StreamEbml where Self: Sized + Stream, Self::Item: Buf { fn parse_ebml(self) -> EbmlStreamingParser { EbmlStreamingParser { stream: self, @@ -35,9 +42,9 @@ pub trait StreamEbml where Self: Sized + Stream, Self::Item: AsRef<[u8]> { } } -impl, S: Stream> StreamEbml for S {} +impl> StreamEbml for S {} -impl, S: Stream> EbmlStreamingParser { +impl> EbmlStreamingParser { pub fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result>, WebmetroError> { // release buffer from previous event self.buffer.advance(self.last_read); @@ -67,9 +74,9 @@ impl, S: Stream> EbmlStreamingPa } match self.stream.poll() { - Ok(Async::Ready(Some(chunk))) => { - self.buffer.reserve(chunk.as_ref().len()); - self.buffer.put_slice(chunk.as_ref()); + Ok(Async::Ready(Some(buf))) => { + self.buffer.reserve(buf.remaining()); + self.buffer.put(buf); // ok can retry decoding now }, other => return other.map(|async| async.map(|_| None)) @@ -78,7 +85,7 @@ impl, S: Stream> EbmlStreamingPa } } -impl, S: Stream> EbmlEventSource for EbmlStreamingParser { +impl> EbmlEventSource for EbmlStreamingParser { type Error = WebmetroError; fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result>, WebmetroError> {