From 43acf93fa92d28c6ca0106b5875f7674f1cebb69 Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Fri, 11 Sep 2020 20:34:40 -0400 Subject: [PATCH] Fuse ClusterHead & ClusterBody chunks into one chunk type (there is still a technical body chunk variant for iteration purposes, but it won't be produced by the parser) --- src/chunk.rs | 36 +++++++++++++++--------------------- src/fixers.rs | 15 ++++----------- 2 files changed, 19 insertions(+), 32 deletions(-) diff --git a/src/chunk.rs b/src/chunk.rs index b368690..cc0838d 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -58,15 +58,16 @@ pub enum Chunk { Headers { bytes: Bytes }, - ClusterHead(ClusterHead), - ClusterBody { - bytes: Bytes - }, + Cluster(ClusterHead, Bytes), + // for iteration only + #[doc(hidden)] + RemainingBody(Bytes), + // for iteration only + #[doc(hidden)] Empty } -pub struct Iter(Chunk); - +// TODO: make an external iterator type so we can remove Chunk::RemainingBody & Chunk::Empty impl Iterator for Chunk { type Item = Bytes; @@ -77,12 +78,13 @@ impl Iterator for Chunk { *self = Chunk::Empty; Some(bytes) }, - Chunk::ClusterHead(ClusterHead {bytes, ..}) => { + Chunk::Cluster(ClusterHead {bytes, ..}, body) => { let bytes = mem::replace(bytes, BytesMut::new()); - *self = Chunk::Empty; + let body = mem::replace(body, Bytes::new()); + *self = Chunk::RemainingBody(body); Some(bytes.freeze()) }, - Chunk::ClusterBody {bytes, ..} => { + Chunk::RemainingBody(bytes) => { let bytes = mem::replace(bytes, Bytes::new()); *self = Chunk::Empty; Some(bytes) @@ -104,7 +106,6 @@ pub struct WebmChunker { source: EbmlStreamingParser, buffer_size_limit: Option, state: ChunkerState, - pending_chunk: Option, } impl WebmChunker { @@ -135,9 +136,6 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { let mut chunker = self.get_mut(); - if chunker.pending_chunk.is_some() { - return Ready(chunker.pending_chunk.take().map(Ok)); - } loop { match chunker.state { ChunkerState::BuildingHeader(ref mut buffer) => { @@ -180,9 +178,8 @@ where let mut new_header_cursor = Cursor::new(Vec::new()); match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) { Ok(_) => { - chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())}); chunker.state = ChunkerState::BuildingHeader(new_header_cursor); - return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); + return Ready(Some(Ok(Chunk::Cluster(liberated_cluster_head, Bytes::from(liberated_buffer.into_inner()))))); }, Err(err) => { chunker.state = ChunkerState::End; @@ -194,8 +191,7 @@ where let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())}); - return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); + return Ready(Some(Ok(Chunk::Cluster(liberated_cluster_head, Bytes::from(liberated_buffer.into_inner()))))); }, WebmElement::Timecode(timecode) => { cluster_head.update_timecode(timecode); @@ -226,9 +222,8 @@ where let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())}); chunker.state = ChunkerState::End; - return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); + return Ready(Some(Ok(Chunk::Cluster(liberated_cluster_head, Bytes::from(liberated_buffer.into_inner()))))); } } }, @@ -249,8 +244,7 @@ impl WebmStream for EbmlStreamingParser { WebmChunker { source: self, buffer_size_limit: None, - state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())), - pending_chunk: None + state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) } } } diff --git a/src/fixers.rs b/src/fixers.rs index 0c02272..9605d34 100644 --- a/src/fixers.rs +++ b/src/fixers.rs @@ -30,7 +30,7 @@ impl ChunkTimecodeFixer { } pub fn process(&mut self, mut chunk: Chunk) -> Chunk { match chunk { - Chunk::ClusterHead(ref mut cluster_head) => { + Chunk::Cluster(ref mut cluster_head, _) => { let start = cluster_head.start; if start < self.last_observed_timecode { let next_timecode = self.last_observed_timecode + self.assumed_duration; @@ -59,20 +59,13 @@ impl + Unpin> Stream for StartingPointFinder fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { loop { return match self.stream.try_poll_next_unpin(cx) { - Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head)))) => { + Poll::Ready(Some(Ok(Chunk::Cluster(cluster_head, cluster_body)))) => { if cluster_head.keyframe { self.seen_keyframe = true; } if self.seen_keyframe { - Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head)))) - } else { - continue; - } - }, - chunk @ Poll::Ready(Some(Ok(Chunk::ClusterBody {..}))) => { - if self.seen_keyframe { - chunk + Poll::Ready(Some(Ok(Chunk::Cluster(cluster_head, cluster_body)))) } else { continue; } @@ -121,7 +114,7 @@ impl + Unpin> Stream for Throttle } let next_chunk = self.stream.try_poll_next_unpin(cx); - if let Poll::Ready(Some(Ok(Chunk::ClusterHead(ref cluster_head)))) = next_chunk { + if let Poll::Ready(Some(Ok(Chunk::Cluster(ref cluster_head, _)))) = next_chunk { // we have actual data, so start the clock if we haven't yet let start_time = self.start_time.get_or_insert_with(Instant::now); // snooze until real time has "caught up" to the stream