From ff8d4e912661449e1ac7a55d6e9603211c660711 Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Sun, 8 Apr 2018 02:31:00 -0400 Subject: [PATCH] Break out "find starting point" fixer operator --- src/bin/loop_server.rs | 2 +- src/fixers.rs | 69 +++++++++++++++++++++++++++++++++++------- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/src/bin/loop_server.rs b/src/bin/loop_server.rs index cdfca0e..2d38daa 100644 --- a/src/bin/loop_server.rs +++ b/src/bin/loop_server.rs @@ -32,7 +32,7 @@ impl Service for WebmServer { (&Get, "/loop") => { let stream: BodyStream> = Box::new( repeat::<&[u8], ()>(SRC_FILE).take(5) - .parse_ebml().chunk_webm().fix_timecodes() + .parse_ebml().chunk_webm().fix_timecodes().find_starting_point() .map_err(|err| match err { ChunkingError::IoError(io_err) => hyper::Error::Io(io_err), ChunkingError::OtherError(_) => hyper::Error::Incomplete diff --git a/src/fixers.rs b/src/fixers.rs index 4d73050..4957781 100644 --- a/src/fixers.rs +++ b/src/fixers.rs @@ -7,8 +7,7 @@ pub struct ChunkTimecodeFixer { stream: S, current_offset: u64, last_observed_timecode: u64, - assumed_duration: u64, - seen_header: bool + assumed_duration: u64 } impl> Stream for ChunkTimecodeFixer @@ -29,27 +28,75 @@ impl> Stream for ChunkTimecodeFixer cluster_head.update_timecode(start + self.current_offset); self.last_observed_timecode = cluster_head.end; }, - Ok(Async::Ready(Some(Chunk::Headers {..}))) => { - if self.seen_header { - return self.poll(); - } else { - self.seen_header = true; - } - }, _ => {} }; poll_chunk } } +pub struct StartingPointFinder { + stream: S, + seen_header: bool, + seen_keyframe: bool +} + +impl> Stream for StartingPointFinder +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Result>, Self::Error> { + loop { + return match self.stream.poll() { + Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head)))) => { + if cluster_head.keyframe { + self.seen_keyframe = true; + } + + if self.seen_keyframe { + Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head)))) + } else { + continue; + } + }, + chunk @ Ok(Async::Ready(Some(Chunk::ClusterBody {..}))) => { + if self.seen_keyframe { + chunk + } else { + continue; + } + }, + chunk @ Ok(Async::Ready(Some(Chunk::Headers {..}))) => { + if self.seen_header { + // new stream starting, we don't need a new header but should wait for a safe spot to resume + self.seen_keyframe = false; + continue; + } else { + self.seen_header = true; + chunk + } + }, + chunk => chunk + } + }; + } +} + pub trait ChunkStream where Self : Sized + Stream { fn fix_timecodes(self) -> ChunkTimecodeFixer { ChunkTimecodeFixer { stream: self, current_offset: 0, last_observed_timecode: 0, - assumed_duration: 33, - seen_header: false + assumed_duration: 33 + } + } + + fn find_starting_point(self) -> StartingPointFinder { + StartingPointFinder { + stream: self, + seen_header: false, + seen_keyframe: false } } }