diff --git a/src/chunk.rs b/src/chunk.rs index 09645f4..7ccdfa4 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -75,9 +75,13 @@ impl> AsRef<[u8]> for Chunk { #[derive(Debug)] enum ChunkerState { BuildingHeader(Cursor>), - // WIP ClusterHead & body buffer + // ClusterHead & body buffer BuildingCluster(ClusterHead, Cursor>), EmittingClusterBody(Vec), + EmittingClusterBodyBeforeNewHeader { + body: Vec, + new_header: Cursor> + }, EmittingFinalClusterBody(Vec), End } @@ -136,6 +140,26 @@ impl Stream for WebmChunker match self.source.poll_event() { Err(passthru) => return Err(ChunkingError::OtherError(passthru)), Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(Some(element @ WebmElement::EbmlHead))) + | Ok(Async::Ready(Some(element @ WebmElement::Segment))) => { + let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); + let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); + + let mut new_header_cursor = Cursor::new(Vec::new()); + match encode_webm_element(element, &mut new_header_cursor) { + Ok(_) => { + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); + new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{ + body: liberated_buffer.into_inner(), + new_header: new_header_cursor + }); + }, + Err(err) => { + return_value = Some(Err(ChunkingError::IoError(err))); + new_state = Some(ChunkerState::End); + } + } + } Ok(Async::Ready(Some(WebmElement::Cluster))) => { let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); @@ -191,6 +215,13 @@ impl Stream for WebmChunker Cursor::new(Vec::new()) )); }, + ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => { + let liberated_body = mem::replace(body, Vec::new()); + let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new())); + + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_body)})))); + new_state = Some(ChunkerState::BuildingHeader(liberated_header_cursor)); + }, ChunkerState::EmittingFinalClusterBody(ref mut buffer) => { // flush final Cluster on end of stream let liberated_buffer = mem::replace(buffer, Vec::new());