From 495ca0f55544415d19d18162146b58386fc69e9d Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Sat, 31 Mar 2018 19:36:57 -0400 Subject: [PATCH] Stream buffer/parser experiment, lifetime issues are snarling though --- src/lib.rs | 1 + src/webm_stream.rs | 93 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 src/webm_stream.rs diff --git a/src/lib.rs b/src/lib.rs index 635ee82..a3a7abe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ extern crate futures; pub mod chunk; pub mod ebml; mod iterator; +pub mod webm_stream; pub mod timecode_fixer; pub mod webm; diff --git a/src/webm_stream.rs b/src/webm_stream.rs new file mode 100644 index 0000000..5e0996d --- /dev/null +++ b/src/webm_stream.rs @@ -0,0 +1,93 @@ +use bytes::BytesMut; +use bytes::BufMut; +use futures::Async; +use futures::stream::Stream; + +use ebml::*; +use webm::*; + +pub enum ParsingError { + EbmlError(::ebml::Error), + OtherError(E) +} + +pub struct WebmStream { + stream: S, + buffer: BytesMut, + last_read: usize +} + +impl, S: Stream> WebmStream { + pub fn new(stream: S) -> Self { + WebmStream { + stream: stream, + buffer: BytesMut::new(), + last_read: 0 + } + } + + pub fn try_decode(&mut self) -> Result>, ParsingError> { + match WebmElement::decode_element(&self.buffer) { + Err(err) => return Err(ParsingError::EbmlError(err)), + Ok(None) => { + // need to refill buffer + return Ok(Async::NotReady); + }, + Ok(Some((element, element_size))) => { + self.last_read += element_size; + return Ok(Async::Ready(Some(element))) + } + }; + } + + pub fn can_decode(&mut self) -> bool { + match self.try_decode() { + Ok(Async::NotReady) => false, + _ => true + } + } + + pub fn poll_event2<'a>(&'a mut self) -> Result>>, ParsingError> { + // release buffer from previous event + self.buffer.advance(self.last_read); + self.last_read = 0; + + loop { + if self.can_decode() { + return self.try_decode() + } + + match self.stream.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + Ok(Async::Ready(Some(chunk))) => { + self.buffer.reserve(chunk.as_ref().len()); + self.buffer.put_slice(chunk.as_ref()); + // ok can retry decoding now + } + Err(err) => return Err(ParsingError::OtherError(err)) + }; + } + } +} + +/*fn umm<'a, I: AsRef<[u8]>, S: Stream>(webm_stream: &'a mut WebmStream) + -> Result>>, ParsingError> +{ + return webmStream.poll_event(); +}*/ + +/*impl<'a, I: AsRef<[u8]>, S: Stream> EbmlEventSource<'a> for WebmStream { + type Event = WebmElement<'a>; + type Error = ParsingError; + + fn poll_event(&'a mut self) -> Result>>, Self::Error> { + return self.poll_event2(); + } +}*/ + +#[cfg(test)] +mod tests { + //#[test] + +}