diff --git a/src/chunk.rs b/src/chunk.rs index 20b29cf..b8ce3f2 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -76,7 +76,9 @@ enum ChunkerState { BuildingHeader(Cursor>), // WIP ClusterHead & body buffer BuildingCluster(Chunk, Cursor>), - EmittingClusterBody(Chunk) + EmittingClusterBody(Vec), + EmittingFinalClusterBody(Vec), + End } pub struct WebmChunker { @@ -99,9 +101,9 @@ impl<'a, S: Stream>> Stream for WebmChunker Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(Some(WebmElement::Cluster))) => { let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - let chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())}; + let header_chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())}; ( - Ok(Async::Ready(Some(chunk))), + Ok(Async::Ready(Some(header_chunk))), ChunkerState::BuildingCluster( Chunk::>::new_cluster_head(0), Cursor::new(Vec::new()) @@ -115,11 +117,63 @@ impl<'a, S: Stream>> Stream for WebmChunker } }, ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { - return Ok(Async::Ready(None)); + match self.stream.poll() { + Err(passthru) => return Err(passthru), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(Some(WebmElement::Cluster))) => { + let cluster_head_chunk = mem::replace(cluster_head, Chunk::>::new_cluster_head(0)); + let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); + ( + Ok(Async::Ready(Some(cluster_head_chunk))), + ChunkerState::EmittingClusterBody(liberated_buffer.into_inner()) + ) + }, + Ok(Async::Ready(Some(WebmElement::Timecode(timecode)))) => { + cluster_head.update_timecode(timecode); + continue; + }, + Ok(Async::Ready(Some(WebmElement::SimpleBlock(ref block)))) => { + cluster_head.observe_simpleblock_timecode(block.timecode); + encode_webm_element(&WebmElement::SimpleBlock(*block), buffer); + continue; + }, + Ok(Async::Ready(Some(WebmElement::Info))) => continue, + Ok(Async::Ready(Some(WebmElement::Void))) => continue, + Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => continue, + Ok(Async::Ready(Some(element @ _))) => { + encode_webm_element(&element, buffer); + continue; + }, + Ok(Async::Ready(None)) => { + // flush final Cluster on end of stream + let cluster_head_chunk = mem::replace(cluster_head, Chunk::>::new_cluster_head(0)); + let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); + ( + Ok(Async::Ready(Some(cluster_head_chunk))), + ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner()) + ) + } + } }, ChunkerState::EmittingClusterBody(ref mut buffer) => { - return Ok(Async::Ready(None)); - } + let liberated_buffer = mem::replace(buffer, Vec::new()); + ( + Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)}))), + ChunkerState::BuildingCluster( + Chunk::>::new_cluster_head(0), + Cursor::new(Vec::new()) + ) + ) + }, + ChunkerState::EmittingFinalClusterBody(ref mut buffer) => { + // flush final Cluster on end of stream + let liberated_buffer = mem::replace(buffer, Vec::new()); + ( + Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)}))), + ChunkerState::End + ) + }, + ChunkerState::End => return Ok(Async::Ready(None)) }; self.state = next_state;