From 413375102e89ab503595f6419372d969c38384a6 Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Thu, 12 Apr 2018 02:03:46 -0400 Subject: [PATCH] Implement core of filter subcommand --- src/commands/filter.rs | 43 ++++++++++++++++++++++++++++++++++++++++++ src/commands/mod.rs | 1 + src/main.rs | 3 +++ 3 files changed, 47 insertions(+) create mode 100644 src/commands/filter.rs diff --git a/src/commands/filter.rs b/src/commands/filter.rs new file mode 100644 index 0000000..2e85c53 --- /dev/null +++ b/src/commands/filter.rs @@ -0,0 +1,43 @@ +use std::{ + error::Error, + io, + io::prelude::* +}; + +use clap::{App, ArgMatches, SubCommand}; +use futures::Stream; + +use super::StdinStream; +use webmetro::{ + chunk::{ + Chunk, + WebmStream + }, + fixers::ChunkStream, + stream_parser::StreamEbml +}; + +pub fn options() -> App<'static, 'static> { + SubCommand::with_name("filter") + .about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.") +} + +pub fn run(_args: &ArgMatches) -> Result<(), Box> { + + let stdin = io::stdin(); + let chunk_stream: Box>> = Box::new( + StdinStream::new(stdin.lock()) + .parse_ebml() + .chunk_webm() + .map_err(|err| Box::new(err) as Box) + .fix_timecodes() + ); + + let stdout = io::stdout(); + let mut stdout_writer = stdout.lock(); + for chunk in chunk_stream.wait() { + stdout_writer.write_all(chunk?.as_ref())?; + } + + Ok(()) +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 3ff2c18..a4ca1a1 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -10,6 +10,7 @@ use futures::{ }; pub mod dump; +pub mod filter; pub mod relay; /// A hackish adapter that makes chunks of bytes from stdin available as a Stream; diff --git a/src/main.rs b/src/main.rs index d91be28..d2f70d8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ mod commands; use clap::{App, AppSettings}; use commands::{ relay, + filter, dump }; @@ -17,6 +18,7 @@ fn options() -> App<'static, 'static> { .about("Utilities for broadcasting & relaying live WebM video/audio streams") .setting(AppSettings::VersionlessSubcommands) .subcommand(relay::options()) + .subcommand(filter::options()) .subcommand(dump::options()) } @@ -24,6 +26,7 @@ fn main() { let args = options().get_matches(); match args.subcommand() { + ("filter", Some(sub_args)) => filter::run(sub_args), ("relay", Some(sub_args)) => relay::run(sub_args), ("dump", Some(sub_args)) => dump::run(sub_args), _ => {