diff --git a/src/commands/filter.rs b/src/commands/filter.rs index 663a2bb..4e6d7bd 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -13,6 +13,7 @@ use webmetro::{ Chunk, WebmStream }, + error::WebmetroError, fixers::ChunkStream, stream_parser::StreamEbml }; @@ -28,11 +29,10 @@ pub fn options() -> App<'static, 'static> { pub fn run(args: &ArgMatches) -> Result<(), Box> { let stdin = io::stdin(); - let mut chunk_stream: Box>> = Box::new( + let mut chunk_stream: Box> = Box::new( StdinStream::new(stdin.lock()) .parse_ebml() .chunk_webm() - .map_err(|err| Box::new(err) as Box) .fix_timecodes() ); @@ -40,8 +40,9 @@ pub fn run(args: &ArgMatches) -> Result<(), Box> { chunk_stream = Box::new(chunk_stream.throttle()); } - let result = chunk_stream.fold((), |_, chunk| { + chunk_stream.fold((), |_, chunk| { io::stdout().write_all(chunk.as_ref()) - }).wait(); - result + }).wait()?; + + Ok(()) } diff --git a/src/fixers.rs b/src/fixers.rs index 5925c26..dc6fd24 100644 --- a/src/fixers.rs +++ b/src/fixers.rs @@ -1,9 +1,10 @@ -use std::time::Instant; +use std::time::{Duration, Instant}; -use futures::Async; -use futures::stream::Stream; +use futures::prelude::*; +use tokio::timer::Delay; use chunk::Chunk; +use error::WebmetroError; pub struct ChunkTimecodeFixer { stream: S, @@ -86,16 +87,29 @@ impl> Stream for StartingPointFinder pub struct Throttle { stream: S, - start_time: Instant + start_time: Instant, + sleep: Delay } -impl> Stream for Throttle +impl> Stream for Throttle { type Item = S::Item; - type Error = S::Error; + type Error = WebmetroError; - fn poll(&mut self) -> Result>, Self::Error> { - self.stream.poll() + fn poll(&mut self) -> Result>, WebmetroError> { + match self.sleep.poll() { + Err(err) => return Err(WebmetroError::Unknown(Box::new(err))), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => { /* can continue */ } + } + + let next_chunk = self.stream.poll(); + if let Ok(Async::Ready(Some(Chunk::ClusterHead(ref cluster_head)))) = next_chunk { + // snooze until real time has "caught up" to the stream + let offset = Duration::from_millis(cluster_head.end); + self.sleep.reset(self.start_time + offset); + } + next_chunk } } @@ -118,9 +132,11 @@ pub trait ChunkStream where Self : Sized + Stream { } fn throttle(self) -> Throttle { + let now = Instant::now(); Throttle { stream: self, - start_time: Instant::now() + start_time: now, + sleep: Delay::new(now) } } } diff --git a/src/lib.rs b/src/lib.rs index 1f064cb..b15d184 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ extern crate bytes; extern crate futures; extern crate odds; +extern crate tokio; pub mod ebml; pub mod error; diff --git a/src/main.rs b/src/main.rs index d2f70d8..eb6d5ab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate clap; extern crate futures; extern crate hyper; +extern crate tokio; extern crate webmetro; mod commands;