Stub out throttle filter

This commit is contained in:
Tangent 128 2018-04-12 23:29:12 -04:00
parent 413375102e
commit 6434db0f82
2 changed files with 34 additions and 3 deletions

View file

@ -4,7 +4,7 @@ use std::{
io::prelude::* io::prelude::*
}; };
use clap::{App, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::Stream; use futures::Stream;
use super::StdinStream; use super::StdinStream;
@ -20,12 +20,15 @@ use webmetro::{
pub fn options() -> App<'static, 'static> { pub fn options() -> App<'static, 'static> {
SubCommand::with_name("filter") SubCommand::with_name("filter")
.about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.") .about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.")
.arg(Arg::with_name("throttle")
.long("throttle")
.help("Slow down output to \"realtime\" speed as determined by the timestamps (useful for streaming)"))
} }
pub fn run(_args: &ArgMatches) -> Result<(), Box<Error>> { pub fn run(args: &ArgMatches) -> Result<(), Box<Error>> {
let stdin = io::stdin(); let stdin = io::stdin();
let chunk_stream: Box<Stream<Item = Chunk, Error = Box<Error>>> = Box::new( let mut chunk_stream: Box<Stream<Item = Chunk, Error = Box<Error>>> = Box::new(
StdinStream::new(stdin.lock()) StdinStream::new(stdin.lock())
.parse_ebml() .parse_ebml()
.chunk_webm() .chunk_webm()
@ -33,6 +36,10 @@ pub fn run(_args: &ArgMatches) -> Result<(), Box<Error>> {
.fix_timecodes() .fix_timecodes()
); );
if args.is_present("throttle") {
chunk_stream = Box::new(chunk_stream.throttle());
}
let stdout = io::stdout(); let stdout = io::stdout();
let mut stdout_writer = stdout.lock(); let mut stdout_writer = stdout.lock();
for chunk in chunk_stream.wait() { for chunk in chunk_stream.wait() {

View file

@ -1,3 +1,5 @@
use std::time::Instant;
use futures::Async; use futures::Async;
use futures::stream::Stream; use futures::stream::Stream;
@ -82,6 +84,21 @@ impl<S: Stream<Item = Chunk>> Stream for StartingPointFinder<S>
} }
} }
pub struct Throttle<S> {
stream: S,
start_time: Instant
}
impl<S: Stream<Item = Chunk>> Stream for Throttle<S>
{
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
self.stream.poll()
}
}
pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> { pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
fn fix_timecodes(self) -> ChunkTimecodeFixer<Self> { fn fix_timecodes(self) -> ChunkTimecodeFixer<Self> {
ChunkTimecodeFixer { ChunkTimecodeFixer {
@ -99,6 +116,13 @@ pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
seen_keyframe: false seen_keyframe: false
} }
} }
fn throttle(self) -> Throttle<Self> {
Throttle {
stream: self,
start_time: Instant::now()
}
}
} }
impl<T: Stream<Item = Chunk>> ChunkStream for T {} impl<T: Stream<Item = Chunk>> ChunkStream for T {}