diff --git a/src/stream_parser.rs b/src/stream_parser.rs index de70643..a4a02fa 100644 --- a/src/stream_parser.rs +++ b/src/stream_parser.rs @@ -1,24 +1,14 @@ -use bytes::{ - Buf, - BufMut, - BytesMut -}; -use futures::{ - Async, - stream::Stream -}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use futures::{stream::Stream, Async}; -use crate::ebml::{ - EbmlEventSource, - FromEbml -}; +use crate::ebml::{EbmlEventSource, FromEbml}; use crate::error::WebmetroError; pub struct EbmlStreamingParser { stream: S, buffer: BytesMut, buffer_size_limit: Option, - last_read: usize + borrowed: Bytes, } impl EbmlStreamingParser { @@ -31,13 +21,17 @@ impl EbmlStreamingParser { } } -pub trait StreamEbml where Self: Sized + Stream, Self::Item: Buf { +pub trait StreamEbml +where + Self: Sized + Stream, + Self::Item: Buf, +{ fn parse_ebml(self) -> EbmlStreamingParser { EbmlStreamingParser { stream: self, buffer: BytesMut::new(), buffer_size_limit: None, - last_read: 0 + borrowed: Bytes::new(), } } } @@ -45,26 +39,23 @@ pub trait StreamEbml where Self: Sized + Stream, Self::Item: Buf { impl> StreamEbml for S {} impl> EbmlStreamingParser { - pub fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result>, WebmetroError> { - // release buffer from previous event - self.buffer.advance(self.last_read); - self.last_read = 0; - + pub fn poll_event<'a, T: FromEbml<'a>>( + &'a mut self, + ) -> Result>, WebmetroError> { loop { - match T::check_space(&self.buffer) { - Ok(None) => { + match T::check_space(&self.buffer)? { + None => { // need to refill buffer, below - }, - other => return other.map_err(WebmetroError::from).and_then(move |_| { - match T::decode_element(&self.buffer) { - Err(err) => Err(err.into()), - Ok(None) => panic!("Buffer was supposed to have enough data to parse element, somehow did not."), - Ok(Some((element, element_size))) => { - self.last_read = element_size; - Ok(Async::Ready(Some(element))) - } - } - }) + } + Some(info) => { + let mut bytes = self.buffer.split_to(info.element_len).freeze(); + bytes.advance(info.body_offset); + self.borrowed = bytes; + return Ok(Async::Ready(Some(T::decode( + info.element_id, + &self.borrowed, + )?))); + } } if let Some(limit) = self.buffer_size_limit { @@ -73,19 +64,21 @@ impl> EbmlStreamingParser } } - match self.stream.poll() { - Ok(Async::Ready(Some(buf))) => { + match self.stream.poll()? { + Async::Ready(Some(buf)) => { self.buffer.reserve(buf.remaining()); self.buffer.put(buf); // ok can retry decoding now - }, - other => return other.map(|async_status| async_status.map(|_| None)) + } + other => return Ok(other.map(|_| None)), } } } } -impl> EbmlEventSource for EbmlStreamingParser { +impl> EbmlEventSource + for EbmlStreamingParser +{ type Error = WebmetroError; fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result>, WebmetroError> {