diff --git a/src/chunk.rs b/src/chunk.rs index bbd5df4..7775b45 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -138,9 +138,6 @@ impl> + Unpin> Stream for Webm fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { let mut chunker = self.get_mut(); loop { - let mut return_value = None; - let mut new_state = None; - match chunker.state { ChunkerState::BuildingHeader(ref mut buffer) => { match chunker.source.poll_event(cx) { @@ -151,20 +148,20 @@ impl> + Unpin> Stream for Webm let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())}; - return_value = Some(Ok(Async::Ready(Some(header_chunk)))); - new_state = Some(ChunkerState::BuildingCluster( + chunker.state = ChunkerState::BuildingCluster( ClusterHead::new(0), Cursor::new(Vec::new()) - )); + ); + return Ready(Some(Ok(header_chunk))); }, Ok(Async::Ready(Some(WebmElement::Info))) => {}, Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(element))) => { - encode(element, buffer, chunker.buffer_size_limit).unwrap_or_else(|err| { - return_value = Some(Err(err)); - new_state = Some(ChunkerState::End); - }); + if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) { + chunker.state = ChunkerState::End; + return Ready(Some(Err(err))); + } } } }, @@ -180,15 +177,15 @@ impl> + Unpin> Stream for Webm let mut new_header_cursor = Cursor::new(Vec::new()); match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) { Ok(_) => { - return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); - new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{ + chunker.state = ChunkerState::EmittingClusterBodyBeforeNewHeader{ body: liberated_buffer.into_inner(), new_header: new_header_cursor - }); + }; + return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); }, Err(err) => { - return_value = Some(Err(err)); - new_state = Some(ChunkerState::End); + chunker.state = ChunkerState::End; + return Ready(Some(Err(err))); } } } @@ -196,8 +193,8 @@ impl> + Unpin> Stream for Webm let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); - new_state = Some(ChunkerState::EmittingClusterBody(liberated_buffer.into_inner())); + chunker.state = ChunkerState::EmittingClusterBody(liberated_buffer.into_inner()); + return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); }, Ok(Async::Ready(Some(WebmElement::Timecode(timecode)))) => { cluster_head.update_timecode(timecode); @@ -208,67 +205,55 @@ impl> + Unpin> Stream for Webm cluster_head.keyframe = true; } cluster_head.observe_simpleblock_timecode(block.timecode); - encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit).unwrap_or_else(|err| { - return_value = Some(Err(err)); - new_state = Some(ChunkerState::End); - }); + if let Err(err) = encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit) { + chunker.state = ChunkerState::End; + return Ready(Some(Err(err))); + } }, Ok(Async::Ready(Some(WebmElement::Info))) => {}, Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(element))) => { - encode(element, buffer, chunker.buffer_size_limit).unwrap_or_else(|err| { - return_value = Some(Err(err)); - new_state = Some(ChunkerState::End); - }); + if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) { + chunker.state = ChunkerState::End; + return Ready(Some(Err(err))); + } }, Ok(Async::Ready(None)) => { // flush final Cluster on end of stream let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); - new_state = Some(ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner())); + chunker.state = ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner()); + return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); } } }, ChunkerState::EmittingClusterBody(ref mut buffer) => { let liberated_buffer = mem::replace(buffer, Vec::new()); - return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})))); - new_state = Some(ChunkerState::BuildingCluster( + chunker.state = ChunkerState::BuildingCluster( ClusterHead::new(0), Cursor::new(Vec::new()) - )); + ); + return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))); }, ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => { let liberated_body = mem::replace(body, Vec::new()); let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new())); - return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)})))); - new_state = Some(ChunkerState::BuildingHeader(liberated_header_cursor)); + chunker.state = ChunkerState::BuildingHeader(liberated_header_cursor); + return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)}))); }, ChunkerState::EmittingFinalClusterBody(ref mut buffer) => { // flush final Cluster on end of stream let liberated_buffer = mem::replace(buffer, Vec::new()); - return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})))); - new_state = Some(ChunkerState::End); + chunker.state = ChunkerState::End; + return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))); }, ChunkerState::End => return Ready(None) }; - - if let Some(new_state) = new_state { - chunker.state = new_state; - } - if let Some(return_value) = return_value { - return match return_value { - Ok(Async::Ready(Some(chunk))) => Ready(Some(Ok(chunk))), - Ok(Async::Ready(None)) => Ready(None), - Ok(Async::NotReady) => Pending, - Err(err) => Ready(Some(Err(err))), - }; - } } } }