use std::time::{Duration, Instant}; use futures::prelude::*; use tokio::timer::Delay; use crate::chunk::Chunk; use crate::error::WebmetroError; pub struct ChunkTimecodeFixer { stream: S, current_offset: u64, last_observed_timecode: u64, assumed_duration: u64 } impl> Stream for ChunkTimecodeFixer { type Item = S::Item; type Error = S::Error; fn poll(&mut self) -> Result>, Self::Error> { let mut poll_chunk = self.stream.poll(); match poll_chunk { Ok(Async::Ready(Some(Chunk::ClusterHead(ref mut cluster_head)))) => { let start = cluster_head.start; if start < self.last_observed_timecode { let next_timecode = self.last_observed_timecode + self.assumed_duration; self.current_offset = next_timecode - start; } cluster_head.update_timecode(start + self.current_offset); self.last_observed_timecode = cluster_head.end; }, _ => {} }; poll_chunk } } pub struct StartingPointFinder { stream: S, seen_header: bool, seen_keyframe: bool } impl> Stream for StartingPointFinder { type Item = S::Item; type Error = S::Error; fn poll(&mut self) -> Result>, Self::Error> { loop { return match self.stream.poll() { Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head)))) => { if cluster_head.keyframe { self.seen_keyframe = true; } if self.seen_keyframe { Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head)))) } else { continue; } }, chunk @ Ok(Async::Ready(Some(Chunk::ClusterBody {..}))) => { if self.seen_keyframe { chunk } else { continue; } }, chunk @ Ok(Async::Ready(Some(Chunk::Headers {..}))) => { if self.seen_header { // new stream starting, we don't need a new header but should wait for a safe spot to resume self.seen_keyframe = false; continue; } else { self.seen_header = true; chunk } }, chunk => chunk } }; } } pub struct Throttle { stream: S, start_time: Instant, sleep: Delay } impl> Stream for Throttle { type Item = S::Item; type Error = WebmetroError; fn poll(&mut self) -> Result>, WebmetroError> { match self.sleep.poll() { Err(err) => return Err(WebmetroError::Unknown(Box::new(err))), Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(())) => { /* can continue */ } } let next_chunk = self.stream.poll(); if let Ok(Async::Ready(Some(Chunk::ClusterHead(ref cluster_head)))) = next_chunk { // snooze until real time has "caught up" to the stream let offset = Duration::from_millis(cluster_head.end); self.sleep.reset(self.start_time + offset); } next_chunk } } pub trait ChunkStream where Self : Sized + Stream { fn fix_timecodes(self) -> ChunkTimecodeFixer { ChunkTimecodeFixer { stream: self, current_offset: 0, last_observed_timecode: 0, assumed_duration: 33 } } fn find_starting_point(self) -> StartingPointFinder { StartingPointFinder { stream: self, seen_header: false, seen_keyframe: false } } fn throttle(self) -> Throttle { let now = Instant::now(); Throttle { stream: self, start_time: now, sleep: Delay::new(now) } } } impl> ChunkStream for T {}