use bytes::BytesMut; use bytes::BufMut; use futures::Async; use futures::stream::Stream; use ebml::*; use webm::*; pub enum ParsingError { EbmlError(::ebml::Error), OtherError(E) } pub struct WebmStream { stream: S, buffer: BytesMut, last_read: usize } impl, S: Stream> WebmStream { pub fn new(stream: S) -> Self { WebmStream { stream: stream, buffer: BytesMut::new(), last_read: 0 } } pub fn try_decode(&mut self) -> Result>, ParsingError> { match WebmElement::decode_element(&self.buffer) { Err(err) => return Err(ParsingError::EbmlError(err)), Ok(None) => { // need to refill buffer return Ok(Async::NotReady); }, Ok(Some((element, element_size))) => { self.last_read += element_size; return Ok(Async::Ready(Some(element))) } }; } pub fn can_decode(&mut self) -> bool { match self.try_decode() { Ok(Async::NotReady) => false, _ => true } } pub fn poll_event<'a>(&'a mut self) -> Result>>, ParsingError> { // release buffer from previous event self.buffer.advance(self.last_read); self.last_read = 0; loop { if self.can_decode() { return self.try_decode() } match self.stream.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(Some(chunk))) => { self.buffer.reserve(chunk.as_ref().len()); self.buffer.put_slice(chunk.as_ref()); // ok can retry decoding now } Err(err) => return Err(ParsingError::OtherError(err)) }; } } } impl, S: Stream> WebmEventSource for WebmStream { type Error = ParsingError; fn poll_event<'a>(&'a mut self) -> Result>>, Self::Error> { return WebmStream::poll_event(self); } } #[cfg(test)] mod tests { //#[test] }