diff --git a/src/chunk.rs b/src/chunk.rs index f4c7599..5a0dc2d 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,5 +1,6 @@ use futures::{Async, Stream}; use std::io::Cursor; +use std::mem; use std::sync::Arc; use webm::*; @@ -70,8 +71,16 @@ impl> AsRef<[u8]> for Chunk { } } +enum ChunkerState { + BuildingHeader(Cursor>), + // WIP ClusterHead & body buffer + BuildingCluster(Chunk, Cursor>), + EmittingClusterBody(Chunk) +} + pub struct WebmChunker { - stream: S + stream: S, + state: ChunkerState } impl<'a, S: Stream>> Stream for WebmChunker @@ -80,7 +89,41 @@ impl<'a, S: Stream>> Stream for WebmChunker type Error = S::Error; fn poll(&mut self) -> Result>, Self::Error> { - Ok(Async::NotReady) + loop { + let (return_value, next_state) = match self.state { + ChunkerState::BuildingHeader(ref mut buffer) => { + match self.stream.poll() { + Err(passthru) => return Err(passthru), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + Ok(Async::Ready(Some(WebmElement::Cluster))) => { + let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); + let chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())}; + ( + Ok(Async::Ready(Some(chunk))), + ChunkerState::BuildingCluster( + Chunk::>::new_cluster_head(0), + Cursor::new(Vec::new()) + ) + ) + }, + Ok(Async::Ready(Some(element @ _))) => { + encode_webm_element(&element, buffer); + continue; + } + } + }, + ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { + return Ok(Async::Ready(None)); + }, + ChunkerState::EmittingClusterBody(ref mut buffer) => { + return Ok(Async::Ready(None)); + } + }; + + self.state = next_state; + return return_value; + } } } @@ -91,7 +134,8 @@ pub trait WebmStream { impl<'a, T: Stream>> WebmStream for T { fn chunk_webm(self) -> WebmChunker { WebmChunker { - stream: self + stream: self, + state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) } } }