Implement chunking Clusters, including flushing the last one

This commit is contained in:
Tangent 128 2017-10-04 01:49:41 -04:00
parent 9c9db1c505
commit 639dc50c35

View file

@ -76,7 +76,9 @@ enum ChunkerState {
BuildingHeader(Cursor<Vec<u8>>), BuildingHeader(Cursor<Vec<u8>>),
// WIP ClusterHead & body buffer // WIP ClusterHead & body buffer
BuildingCluster(Chunk, Cursor<Vec<u8>>), BuildingCluster(Chunk, Cursor<Vec<u8>>),
EmittingClusterBody(Chunk) EmittingClusterBody(Vec<u8>),
EmittingFinalClusterBody(Vec<u8>),
End
} }
pub struct WebmChunker<S> { pub struct WebmChunker<S> {
@ -99,9 +101,9 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(WebmElement::Cluster))) => { Ok(Async::Ready(Some(WebmElement::Cluster))) => {
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
let chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())}; let header_chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())};
( (
Ok(Async::Ready(Some(chunk))), Ok(Async::Ready(Some(header_chunk))),
ChunkerState::BuildingCluster( ChunkerState::BuildingCluster(
Chunk::<Vec<u8>>::new_cluster_head(0), Chunk::<Vec<u8>>::new_cluster_head(0),
Cursor::new(Vec::new()) Cursor::new(Vec::new())
@ -115,11 +117,63 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
} }
}, },
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
return Ok(Async::Ready(None)); match self.stream.poll() {
Err(passthru) => return Err(passthru),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(WebmElement::Cluster))) => {
let cluster_head_chunk = mem::replace(cluster_head, Chunk::<Vec<u8>>::new_cluster_head(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
(
Ok(Async::Ready(Some(cluster_head_chunk))),
ChunkerState::EmittingClusterBody(liberated_buffer.into_inner())
)
},
Ok(Async::Ready(Some(WebmElement::Timecode(timecode)))) => {
cluster_head.update_timecode(timecode);
continue;
},
Ok(Async::Ready(Some(WebmElement::SimpleBlock(ref block)))) => {
cluster_head.observe_simpleblock_timecode(block.timecode);
encode_webm_element(&WebmElement::SimpleBlock(*block), buffer);
continue;
},
Ok(Async::Ready(Some(WebmElement::Info))) => continue,
Ok(Async::Ready(Some(WebmElement::Void))) => continue,
Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => continue,
Ok(Async::Ready(Some(element @ _))) => {
encode_webm_element(&element, buffer);
continue;
},
Ok(Async::Ready(None)) => {
// flush final Cluster on end of stream
let cluster_head_chunk = mem::replace(cluster_head, Chunk::<Vec<u8>>::new_cluster_head(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
(
Ok(Async::Ready(Some(cluster_head_chunk))),
ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner())
)
}
}
}, },
ChunkerState::EmittingClusterBody(ref mut buffer) => { ChunkerState::EmittingClusterBody(ref mut buffer) => {
return Ok(Async::Ready(None)); let liberated_buffer = mem::replace(buffer, Vec::new());
} (
Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)}))),
ChunkerState::BuildingCluster(
Chunk::<Vec<u8>>::new_cluster_head(0),
Cursor::new(Vec::new())
)
)
},
ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
// flush final Cluster on end of stream
let liberated_buffer = mem::replace(buffer, Vec::new());
(
Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)}))),
ChunkerState::End
)
},
ChunkerState::End => return Ok(Async::Ready(None))
}; };
self.state = next_state; self.state = next_state;