Handle errors in chunking code in some fashion

This commit is contained in:
Tangent 128 2017-10-06 00:17:18 -04:00
parent 59a179f9e1
commit 972a88c35b
2 changed files with 34 additions and 10 deletions

View file

@ -4,7 +4,7 @@ extern crate lab_ebml;
use futures::future::FutureResult;
use futures::stream::{iter, Stream};
use lab_ebml::chunk::{Chunk, WebmStream};
use lab_ebml::chunk::{Chunk, WebmStream, ChunkingError};
use lab_ebml::Schema;
use lab_ebml::timecode_fixer::ChunkStream;
use lab_ebml::webm::*;
@ -33,6 +33,10 @@ impl Service for WebmServer {
.chunk_webm()
.chain(iter(Webm.parse(SRC_FILE).into_iter().map(|x| Ok(x))).chunk_webm())
.fix_timecodes()
.map_err(|err| match err {
ChunkingError::IoError(io_err) => hyper::Error::Io(io_err),
ChunkingError::OtherError(otx_err) => otx_err
})
.boxed();
Response::new()
.with_header(ContentType("video/webm".parse().unwrap()))

View file

@ -81,6 +81,11 @@ enum ChunkerState {
End
}
pub enum ChunkingError<E> {
IoError(::std::io::Error),
OtherError(E)
}
pub struct WebmChunker<S> {
stream: S,
state: ChunkerState
@ -89,14 +94,14 @@ pub struct WebmChunker<S> {
impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
{
type Item = Chunk;
type Error = S::Error;
type Error = ChunkingError<S::Error>;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
loop {
let (return_value, next_state) = match self.state {
ChunkerState::BuildingHeader(ref mut buffer) => {
match self.stream.poll() {
Err(passthru) => return Err(passthru),
Err(passthru) => return Err(ChunkingError::OtherError(passthru)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(WebmElement::Cluster))) => {
@ -111,14 +116,19 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
)
},
Ok(Async::Ready(Some(element @ _))) => {
encode_webm_element(&element, buffer);
continue;
match encode_webm_element(&element, buffer) {
Ok(_) => continue,
Err(err) => (
Err(ChunkingError::IoError(err)),
ChunkerState::End
)
}
}
}
},
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
match self.stream.poll() {
Err(passthru) => return Err(passthru),
Err(passthru) => return Err(ChunkingError::OtherError(passthru)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(WebmElement::Cluster))) => {
let cluster_head_chunk = mem::replace(cluster_head, Chunk::<Vec<u8>>::new_cluster_head(0));
@ -138,15 +148,25 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
cluster_head.mark_keyframe(true);
}
cluster_head.observe_simpleblock_timecode(block.timecode);
encode_webm_element(&WebmElement::SimpleBlock(*block), buffer);
continue;
match encode_webm_element(&WebmElement::SimpleBlock(*block), buffer) {
Ok(_) => continue,
Err(err) => (
Err(ChunkingError::IoError(err)),
ChunkerState::End
)
}
},
Ok(Async::Ready(Some(WebmElement::Info))) => continue,
Ok(Async::Ready(Some(WebmElement::Void))) => continue,
Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => continue,
Ok(Async::Ready(Some(element @ _))) => {
encode_webm_element(&element, buffer);
continue;
match encode_webm_element(&element, buffer) {
Ok(_) => continue,
Err(err) => (
Err(ChunkingError::IoError(err)),
ChunkerState::End
)
}
},
Ok(Async::Ready(None)) => {
// flush final Cluster on end of stream