From 972a88c35bde5ee15b359d01a4b057b7755b356f Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Fri, 6 Oct 2017 00:17:18 -0400 Subject: [PATCH] Handle errors in chunking code in some fashion --- src/bin/loop_server.rs | 6 +++++- src/chunk.rs | 38 +++++++++++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/bin/loop_server.rs b/src/bin/loop_server.rs index 23a90f2..2d9a27e 100644 --- a/src/bin/loop_server.rs +++ b/src/bin/loop_server.rs @@ -4,7 +4,7 @@ extern crate lab_ebml; use futures::future::FutureResult; use futures::stream::{iter, Stream}; -use lab_ebml::chunk::{Chunk, WebmStream}; +use lab_ebml::chunk::{Chunk, WebmStream, ChunkingError}; use lab_ebml::Schema; use lab_ebml::timecode_fixer::ChunkStream; use lab_ebml::webm::*; @@ -33,6 +33,10 @@ impl Service for WebmServer { .chunk_webm() .chain(iter(Webm.parse(SRC_FILE).into_iter().map(|x| Ok(x))).chunk_webm()) .fix_timecodes() + .map_err(|err| match err { + ChunkingError::IoError(io_err) => hyper::Error::Io(io_err), + ChunkingError::OtherError(otx_err) => otx_err + }) .boxed(); Response::new() .with_header(ContentType("video/webm".parse().unwrap())) diff --git a/src/chunk.rs b/src/chunk.rs index f85c1f0..f42c925 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -81,6 +81,11 @@ enum ChunkerState { End } +pub enum ChunkingError { + IoError(::std::io::Error), + OtherError(E) +} + pub struct WebmChunker { stream: S, state: ChunkerState @@ -89,14 +94,14 @@ pub struct WebmChunker { impl<'a, S: Stream>> Stream for WebmChunker { type Item = Chunk; - type Error = S::Error; + type Error = ChunkingError; fn poll(&mut self) -> Result>, Self::Error> { loop { let (return_value, next_state) = match self.state { ChunkerState::BuildingHeader(ref mut buffer) => { match self.stream.poll() { - Err(passthru) => return Err(passthru), + Err(passthru) => return Err(ChunkingError::OtherError(passthru)), Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(Some(WebmElement::Cluster))) => { @@ -111,14 +116,19 @@ impl<'a, S: Stream>> Stream for WebmChunker ) }, Ok(Async::Ready(Some(element @ _))) => { - encode_webm_element(&element, buffer); - continue; + match encode_webm_element(&element, buffer) { + Ok(_) => continue, + Err(err) => ( + Err(ChunkingError::IoError(err)), + ChunkerState::End + ) + } } } }, ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { match self.stream.poll() { - Err(passthru) => return Err(passthru), + Err(passthru) => return Err(ChunkingError::OtherError(passthru)), Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(Some(WebmElement::Cluster))) => { let cluster_head_chunk = mem::replace(cluster_head, Chunk::>::new_cluster_head(0)); @@ -138,15 +148,25 @@ impl<'a, S: Stream>> Stream for WebmChunker cluster_head.mark_keyframe(true); } cluster_head.observe_simpleblock_timecode(block.timecode); - encode_webm_element(&WebmElement::SimpleBlock(*block), buffer); - continue; + match encode_webm_element(&WebmElement::SimpleBlock(*block), buffer) { + Ok(_) => continue, + Err(err) => ( + Err(ChunkingError::IoError(err)), + ChunkerState::End + ) + } }, Ok(Async::Ready(Some(WebmElement::Info))) => continue, Ok(Async::Ready(Some(WebmElement::Void))) => continue, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => continue, Ok(Async::Ready(Some(element @ _))) => { - encode_webm_element(&element, buffer); - continue; + match encode_webm_element(&element, buffer) { + Ok(_) => continue, + Err(err) => ( + Err(ChunkingError::IoError(err)), + ChunkerState::End + ) + } }, Ok(Async::Ready(None)) => { // flush final Cluster on end of stream