diff --git a/src/chunk.rs b/src/chunk.rs index 69dca20..f3a9323 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,3 +1,6 @@ +use crate::error::WebmetroError; +use crate::stream_parser::EbmlStreamingParser; +use crate::webm::*; use bytes::{Buf, Bytes, BytesMut}; use futures::prelude::*; use std::{ @@ -6,9 +9,6 @@ use std::{ pin::Pin, task::{Context, Poll, Poll::*}, }; -use crate::stream_parser::EbmlStreamingParser; -use crate::error::WebmetroError; -use crate::webm::*; #[derive(Clone, Debug)] pub struct ClusterHead { @@ -35,7 +35,7 @@ impl ClusterHead { let delta = self.end - self.start; self.start = timecode; self.end = self.start + delta; - let mut buffer = [0;15]; + let mut buffer = [0; 15]; let mut cursor = Cursor::new(buffer.as_mut()); // buffer is sized so these should never fail encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap(); @@ -55,16 +55,8 @@ impl ClusterHead { /// A chunk of WebM data #[derive(Clone, Debug)] pub enum Chunk { - Headers { - bytes: Bytes - }, + Headers { bytes: Bytes }, Cluster(ClusterHead, Bytes), - // for iteration only - #[doc(hidden)] - RemainingBody(Bytes), - // for iteration only - #[doc(hidden)] - Empty } impl Chunk { @@ -76,29 +68,36 @@ impl Chunk { } } -// TODO: make an external iterator type so we can remove Chunk::RemainingBody & Chunk::Empty -impl Iterator for Chunk { +impl IntoIterator for Chunk { + type Item = Bytes; + type IntoIter = Iter; + + fn into_iter(self) -> Self::IntoIter { + match self { + Chunk::Headers { bytes } => Iter::Buffer(bytes), + Chunk::Cluster(head, bytes) => Iter::Cluster(head, bytes), + } + } +} + +pub enum Iter { + Cluster(ClusterHead, Bytes), + Buffer(Bytes), + Empty, +} + +impl Iterator for Iter { type Item = Bytes; fn next(&mut self) -> Option { - match self { - Chunk::Headers {ref mut bytes, ..} => { - let bytes = mem::replace(bytes, Bytes::new()); - *self = Chunk::Empty; - Some(bytes) - }, - Chunk::Cluster(ClusterHead {bytes, ..}, body) => { - let bytes = mem::replace(bytes, BytesMut::new()); - let body = mem::replace(body, Bytes::new()); - *self = Chunk::RemainingBody(body); - Some(bytes.freeze()) - }, - Chunk::RemainingBody(bytes) => { - let bytes = mem::replace(bytes, Bytes::new()); - *self = Chunk::Empty; - Some(bytes) - }, - Chunk::Empty => None + let iter = mem::replace(self, Iter::Empty); + match iter { + Iter::Cluster(ClusterHead { bytes: head, .. }, body) => { + *self = Iter::Buffer(body); + Some(head.freeze()) + } + Iter::Buffer(bytes) => Some(bytes), + Iter::Empty => None, } } } @@ -108,7 +107,7 @@ enum ChunkerState { BuildingHeader(Cursor>), // ClusterHead & body buffer BuildingCluster(ClusterHead, Cursor>), - End + End, } pub struct WebmChunker { @@ -127,7 +126,11 @@ impl WebmChunker { } } -fn encode(element: WebmElement, buffer: &mut Cursor>, limit: Option) -> Result<(), WebmetroError> { +fn encode( + element: WebmElement, + buffer: &mut Cursor>, + limit: Option, +) -> Result<(), WebmetroError> { if let Some(limit) = limit { if limit <= buffer.get_ref().len() { return Err(WebmetroError::ResourcesExceeded); @@ -143,7 +146,10 @@ where { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { let mut chunker = self.get_mut(); loop { match chunker.state { @@ -154,89 +160,117 @@ where Ready(None) => return Ready(None), Ready(Some(Ok(element))) => match element { WebmElement::Cluster => { - let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())}; + let liberated_buffer = + mem::replace(buffer, Cursor::new(Vec::new())); + let header_chunk = Chunk::Headers { + bytes: Bytes::from(liberated_buffer.into_inner()), + }; chunker.state = ChunkerState::BuildingCluster( ClusterHead::new(0), - Cursor::new(Vec::new()) + Cursor::new(Vec::new()), ); return Ready(Some(Ok(header_chunk))); - }, - WebmElement::Info => {}, - WebmElement::Void => {}, - WebmElement::Unknown(_) => {}, + } + WebmElement::Info => {} + WebmElement::Void => {} + WebmElement::Unknown(_) => {} element => { - if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) { + if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) + { chunker.state = ChunkerState::End; return Ready(Some(Err(err))); } } - } + }, } - }, + } ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { match chunker.source.poll_event(cx) { Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))), Pending => return Pending, Ready(Some(Ok(element))) => match element { WebmElement::EbmlHead | WebmElement::Segment => { - let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); - let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); + let liberated_cluster_head = + mem::replace(cluster_head, ClusterHead::new(0)); + let liberated_buffer = + mem::replace(buffer, Cursor::new(Vec::new())); let mut new_header_cursor = Cursor::new(Vec::new()); - match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) { + match encode( + element, + &mut new_header_cursor, + chunker.buffer_size_limit, + ) { Ok(_) => { - chunker.state = ChunkerState::BuildingHeader(new_header_cursor); - return Ready(Some(Ok(Chunk::Cluster(liberated_cluster_head, Bytes::from(liberated_buffer.into_inner()))))); - }, + chunker.state = + ChunkerState::BuildingHeader(new_header_cursor); + return Ready(Some(Ok(Chunk::Cluster( + liberated_cluster_head, + Bytes::from(liberated_buffer.into_inner()), + )))); + } Err(err) => { chunker.state = ChunkerState::End; return Ready(Some(Err(err))); } } - }, + } WebmElement::Cluster => { - let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); - let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); + let liberated_cluster_head = + mem::replace(cluster_head, ClusterHead::new(0)); + let liberated_buffer = + mem::replace(buffer, Cursor::new(Vec::new())); - return Ready(Some(Ok(Chunk::Cluster(liberated_cluster_head, Bytes::from(liberated_buffer.into_inner()))))); - }, + return Ready(Some(Ok(Chunk::Cluster( + liberated_cluster_head, + Bytes::from(liberated_buffer.into_inner()), + )))); + } WebmElement::Timecode(timecode) => { cluster_head.update_timecode(timecode); - }, + } WebmElement::SimpleBlock(ref block) => { if (block.flags & 0b10000000) != 0 { // TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster cluster_head.keyframe = true; } cluster_head.observe_simpleblock_timecode(block.timecode); - if let Err(err) = encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit) { + if let Err(err) = encode( + WebmElement::SimpleBlock(*block), + buffer, + chunker.buffer_size_limit, + ) { chunker.state = ChunkerState::End; return Ready(Some(Err(err))); } - }, - WebmElement::Info => {}, - WebmElement::Void => {}, - WebmElement::Unknown(_) => {}, + } + WebmElement::Info => {} + WebmElement::Void => {} + WebmElement::Unknown(_) => {} element => { - if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) { + if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) + { chunker.state = ChunkerState::End; return Ready(Some(Err(err))); } - }, + } }, Ready(None) => { // flush final Cluster on end of stream - let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); + let liberated_cluster_head = + mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); chunker.state = ChunkerState::End; - return Ready(Some(Ok(Chunk::Cluster(liberated_cluster_head, Bytes::from(liberated_buffer.into_inner()))))); + return Ready(Some(Ok(Chunk::Cluster( + liberated_cluster_head, + Bytes::from(liberated_buffer.into_inner()), + )))); } } - }, - ChunkerState::End => return Ready(None) + } + ChunkerState::End => return Ready(None), }; } } @@ -253,7 +287,7 @@ impl WebmStream for EbmlStreamingParser { WebmChunker { source: self, buffer_size_limit: None, - state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) + state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())), } } } diff --git a/src/commands/filter.rs b/src/commands/filter.rs index 79180b6..6cd94f4 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -47,7 +47,7 @@ pub async fn run(args: FilterArgs) -> Result<(), WebmetroError> { } while let Some(chunk) = chunk_stream.next().await { - chunk?.try_for_each(|buffer| io::stdout().write_all(&buffer))?; + chunk?.into_iter().try_for_each(|buffer| io::stdout().write_all(&buffer))?; } Ok(()) }