Make EbmlStreamingParser operate on Buf items instead of AsRef<u8>

This commit is contained in:
Tangent 128 2018-10-21 19:09:37 -04:00
parent a848502103
commit 161c9de472
3 changed files with 30 additions and 24 deletions

View file

@ -1,13 +1,10 @@
use std::io::{ use std::io::stdin;
Error as IoError,
stdin,
Stdin
};
use futures::{ use bytes::{
prelude::*, Buf,
stream::MapErr IntoBuf
}; };
use futures::prelude::*;
use tokio_io::io::AllowStdIo; use tokio_io::io::AllowStdIo;
use tokio_codec::{ use tokio_codec::{
BytesCodec, BytesCodec,
@ -23,7 +20,8 @@ pub mod send;
/// An adapter that makes chunks of bytes from stdin available as a Stream; /// An adapter that makes chunks of bytes from stdin available as a Stream;
/// is NOT actually async, and just uses blocking read. Don't use more than /// is NOT actually async, and just uses blocking read. Don't use more than
/// one at once, who knows who gets which bytes. /// one at once, who knows who gets which bytes.
pub fn stdin_stream() -> MapErr<FramedRead<AllowStdIo<Stdin>, BytesCodec>, fn(IoError) -> WebmetroError> { pub fn stdin_stream() -> impl Stream<Item = impl Buf, Error = WebmetroError> {
FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new()) FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new())
.map(|bytes| bytes.into_buf())
.map_err(WebmetroError::IoError) .map_err(WebmetroError::IoError)
} }

View file

@ -1,10 +1,11 @@
use std::error::Error;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::sync::{ use std::sync::{
Arc, Arc,
Mutex Mutex
}; };
use bytes::Bytes; use bytes::{Bytes, Buf};
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::{ use futures::{
Future, Future,
@ -62,7 +63,7 @@ impl RelayServer {
.map_err(|err| match err {}) .map_err(|err| match err {})
} }
fn post_stream(&self, stream: Body) -> impl Stream<Item = Bytes, Error = WebmetroError> { fn post_stream(&self, stream: impl Stream<Item = impl Buf, Error = impl Error + Send + Sync + 'static>) -> impl Stream<Item = Bytes, Error = WebmetroError> {
let source = stream let source = stream
.map_err(WebmetroError::from_err) .map_err(WebmetroError::from_err)
.parse_ebml().with_soft_limit(BUFFER_LIMIT) .parse_ebml().with_soft_limit(BUFFER_LIMIT)

View file

@ -1,10 +1,17 @@
use bytes::BytesMut; use bytes::{
use bytes::BufMut; Buf,
use futures::Async; BufMut,
use futures::stream::Stream; BytesMut
};
use futures::{
Async,
stream::Stream
};
use ebml::EbmlEventSource; use ebml::{
use ebml::FromEbml; EbmlEventSource,
FromEbml
};
use error::WebmetroError; use error::WebmetroError;
pub struct EbmlStreamingParser<S> { pub struct EbmlStreamingParser<S> {
@ -24,7 +31,7 @@ impl<S> EbmlStreamingParser<S> {
} }
} }
pub trait StreamEbml where Self: Sized + Stream, Self::Item: AsRef<[u8]> { pub trait StreamEbml where Self: Sized + Stream, Self::Item: Buf {
fn parse_ebml(self) -> EbmlStreamingParser<Self> { fn parse_ebml(self) -> EbmlStreamingParser<Self> {
EbmlStreamingParser { EbmlStreamingParser {
stream: self, stream: self,
@ -35,9 +42,9 @@ pub trait StreamEbml where Self: Sized + Stream, Self::Item: AsRef<[u8]> {
} }
} }
impl<I: AsRef<[u8]>, S: Stream<Item = I, Error = WebmetroError>> StreamEbml for S {} impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> StreamEbml for S {}
impl<I: AsRef<[u8]>, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingParser<S> { impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingParser<S> {
pub fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, WebmetroError> { pub fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, WebmetroError> {
// release buffer from previous event // release buffer from previous event
self.buffer.advance(self.last_read); self.buffer.advance(self.last_read);
@ -67,9 +74,9 @@ impl<I: AsRef<[u8]>, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingPa
} }
match self.stream.poll() { match self.stream.poll() {
Ok(Async::Ready(Some(chunk))) => { Ok(Async::Ready(Some(buf))) => {
self.buffer.reserve(chunk.as_ref().len()); self.buffer.reserve(buf.remaining());
self.buffer.put_slice(chunk.as_ref()); self.buffer.put(buf);
// ok can retry decoding now // ok can retry decoding now
}, },
other => return other.map(|async| async.map(|_| None)) other => return other.map(|async| async.map(|_| None))
@ -78,7 +85,7 @@ impl<I: AsRef<[u8]>, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingPa
} }
} }
impl<I: AsRef<[u8]>, S: Stream<Item = I, Error = WebmetroError>> EbmlEventSource for EbmlStreamingParser<S> { impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> EbmlEventSource for EbmlStreamingParser<S> {
type Error = WebmetroError; type Error = WebmetroError;
fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, WebmetroError> { fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, WebmetroError> {