Make control flow of chunker state machine slightly clearer.
This commit is contained in:
parent
2bdbe21a73
commit
409e6eb029
1 changed files with 53 additions and 52 deletions
105
src/chunk.rs
105
src/chunk.rs
|
@ -100,7 +100,10 @@ impl<'a, S: EbmlEventSource> Stream for WebmChunker<S>
|
||||||
|
|
||||||
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
|
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
|
||||||
loop {
|
loop {
|
||||||
let (return_value, next_state) = match self.state {
|
let mut return_value = None;
|
||||||
|
let mut new_state = None;
|
||||||
|
|
||||||
|
match self.state {
|
||||||
ChunkerState::BuildingHeader(ref mut buffer) => {
|
ChunkerState::BuildingHeader(ref mut buffer) => {
|
||||||
match self.source.poll_event() {
|
match self.source.poll_event() {
|
||||||
Err(passthru) => return Err(ChunkingError::OtherError(passthru)),
|
Err(passthru) => return Err(ChunkingError::OtherError(passthru)),
|
||||||
|
@ -109,23 +112,22 @@ impl<'a, S: EbmlEventSource> Stream for WebmChunker<S>
|
||||||
Ok(Async::Ready(Some(WebmElement::Cluster))) => {
|
Ok(Async::Ready(Some(WebmElement::Cluster))) => {
|
||||||
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
||||||
let header_chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())};
|
let header_chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())};
|
||||||
(
|
|
||||||
Ok(Async::Ready(Some(header_chunk))),
|
return_value = Some(Ok(Async::Ready(Some(header_chunk))));
|
||||||
ChunkerState::BuildingCluster(
|
new_state = Some(ChunkerState::BuildingCluster(
|
||||||
ClusterHead::new(0),
|
ClusterHead::new(0),
|
||||||
Cursor::new(Vec::new())
|
Cursor::new(Vec::new())
|
||||||
)
|
));
|
||||||
)
|
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(Some(WebmElement::Info))) => continue,
|
Ok(Async::Ready(Some(WebmElement::Info))) => {},
|
||||||
Ok(Async::Ready(Some(WebmElement::Void))) => continue,
|
Ok(Async::Ready(Some(WebmElement::Void))) => {},
|
||||||
Ok(Async::Ready(Some(element @ _))) => {
|
Ok(Async::Ready(Some(element))) => {
|
||||||
match encode_webm_element(element, buffer) {
|
match encode_webm_element(element, buffer) {
|
||||||
Ok(_) => continue,
|
Ok(_) => {},
|
||||||
Err(err) => (
|
Err(err) => {
|
||||||
Err(ChunkingError::IoError(err)),
|
return_value = Some(Err(ChunkingError::IoError(err)));
|
||||||
ChunkerState::End
|
new_state = Some(ChunkerState::End);
|
||||||
)
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -137,14 +139,12 @@ impl<'a, S: EbmlEventSource> Stream for WebmChunker<S>
|
||||||
Ok(Async::Ready(Some(WebmElement::Cluster))) => {
|
Ok(Async::Ready(Some(WebmElement::Cluster))) => {
|
||||||
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
||||||
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
||||||
(
|
|
||||||
Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))),
|
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))));
|
||||||
ChunkerState::EmittingClusterBody(liberated_buffer.into_inner())
|
new_state = Some(ChunkerState::EmittingClusterBody(liberated_buffer.into_inner()));
|
||||||
)
|
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(Some(WebmElement::Timecode(timecode)))) => {
|
Ok(Async::Ready(Some(WebmElement::Timecode(timecode)))) => {
|
||||||
cluster_head.update_timecode(timecode);
|
cluster_head.update_timecode(timecode);
|
||||||
continue;
|
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(Some(WebmElement::SimpleBlock(ref block)))) => {
|
Ok(Async::Ready(Some(WebmElement::SimpleBlock(ref block)))) => {
|
||||||
if (block.flags & 0b10000000) != 0 {
|
if (block.flags & 0b10000000) != 0 {
|
||||||
|
@ -153,59 +153,60 @@ impl<'a, S: EbmlEventSource> Stream for WebmChunker<S>
|
||||||
}
|
}
|
||||||
cluster_head.observe_simpleblock_timecode(block.timecode);
|
cluster_head.observe_simpleblock_timecode(block.timecode);
|
||||||
match encode_webm_element(WebmElement::SimpleBlock(*block), buffer) {
|
match encode_webm_element(WebmElement::SimpleBlock(*block), buffer) {
|
||||||
Ok(_) => continue,
|
Ok(_) => {},
|
||||||
Err(err) => (
|
Err(err) => {
|
||||||
Err(ChunkingError::IoError(err)),
|
return_value = Some(Err(ChunkingError::IoError(err)));
|
||||||
ChunkerState::End
|
new_state = Some(ChunkerState::End);
|
||||||
)
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(Some(WebmElement::Info))) => continue,
|
Ok(Async::Ready(Some(WebmElement::Info))) => {},
|
||||||
Ok(Async::Ready(Some(WebmElement::Void))) => continue,
|
Ok(Async::Ready(Some(WebmElement::Void))) => {},
|
||||||
Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => continue,
|
Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
|
||||||
Ok(Async::Ready(Some(element @ _))) => {
|
Ok(Async::Ready(Some(element))) => {
|
||||||
match encode_webm_element(element, buffer) {
|
match encode_webm_element(element, buffer) {
|
||||||
Ok(_) => continue,
|
Ok(_) => {},
|
||||||
Err(err) => (
|
Err(err) => {
|
||||||
Err(ChunkingError::IoError(err)),
|
return_value = Some(Err(ChunkingError::IoError(err)));
|
||||||
ChunkerState::End
|
new_state = Some(ChunkerState::End);
|
||||||
)
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
// flush final Cluster on end of stream
|
// flush final Cluster on end of stream
|
||||||
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
||||||
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
||||||
(
|
|
||||||
Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))),
|
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))));
|
||||||
ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner())
|
new_state = Some(ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner()));
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
ChunkerState::EmittingClusterBody(ref mut buffer) => {
|
ChunkerState::EmittingClusterBody(ref mut buffer) => {
|
||||||
let liberated_buffer = mem::replace(buffer, Vec::new());
|
let liberated_buffer = mem::replace(buffer, Vec::new());
|
||||||
(
|
|
||||||
Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)}))),
|
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)}))));
|
||||||
ChunkerState::BuildingCluster(
|
new_state = Some(ChunkerState::BuildingCluster(
|
||||||
ClusterHead::new(0),
|
ClusterHead::new(0),
|
||||||
Cursor::new(Vec::new())
|
Cursor::new(Vec::new())
|
||||||
)
|
));
|
||||||
)
|
|
||||||
},
|
},
|
||||||
ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
|
ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
|
||||||
// flush final Cluster on end of stream
|
// flush final Cluster on end of stream
|
||||||
let liberated_buffer = mem::replace(buffer, Vec::new());
|
let liberated_buffer = mem::replace(buffer, Vec::new());
|
||||||
(
|
|
||||||
Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)}))),
|
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)}))));
|
||||||
ChunkerState::End
|
new_state = Some(ChunkerState::End);
|
||||||
)
|
|
||||||
},
|
},
|
||||||
ChunkerState::End => return Ok(Async::Ready(None))
|
ChunkerState::End => return Ok(Async::Ready(None))
|
||||||
};
|
};
|
||||||
|
|
||||||
self.state = next_state;
|
if let Some(new_state) = new_state {
|
||||||
return return_value;
|
self.state = new_state;
|
||||||
|
}
|
||||||
|
if let Some(return_value) = return_value {
|
||||||
|
return return_value;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue