diff --git a/Cargo.lock b/Cargo.lock index 662bc3a..6a24bb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,15 +11,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi", -] - [[package]] name = "atty" version = "0.2.14" @@ -97,17 +88,41 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "2.34.0" +version = "3.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +checksum = "d2dbdf4bdacb33466e854ce889eee8dfd5729abf7ccd7664d0a2d60cd384440b" dependencies = [ - "ansi_term", "atty", "bitflags", + "clap_derive", + "clap_lex", + "indexmap", + "lazy_static", "strsim", + "termcolor", "textwrap", - "unicode-width", - "vec_map", +] + +[[package]] +name = "clap_derive" +version = "3.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25320346e922cffe59c0bbc5410c8d8784509efb321488971081313cb1e1a33c" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a37c35f1112dad5e6e0b1adaff798507497a18fceeb30cceb3bae7d1427b9213" +dependencies = [ + "os_str_bytes", ] [[package]] @@ -352,6 +367,12 @@ dependencies = [ "http", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -562,6 +583,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "os_str_bytes" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435" + [[package]] name = "percent-encoding" version = "2.1.0" @@ -606,6 +633,30 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.39" @@ -784,9 +835,9 @@ dependencies = [ [[package]] name = "strsim" -version = "0.8.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" @@ -824,12 +875,9 @@ dependencies = [ [[package]] name = "textwrap" -version = "0.11.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" -dependencies = [ - "unicode-width", -] +checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" @@ -1056,12 +1104,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-width" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" - [[package]] name = "url" version = "2.2.2" @@ -1080,12 +1122,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" -[[package]] -name = "vec_map" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" - [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index a704e85..2789dbd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] byteorder = "1" bytes = "1" -clap = "^2.33" +clap = { version="^3.1.18", features=["cargo", "derive"] } custom_error = "^1.7" env_logger = "^0.9" futures = "^0.3" diff --git a/src/commands/dump.rs b/src/commands/dump.rs index e4ddfa7..dfa3781 100644 --- a/src/commands/dump.rs +++ b/src/commands/dump.rs @@ -1,32 +1,26 @@ -use clap::{App, AppSettings, ArgMatches, SubCommand}; +use clap::Args; use super::stdin_stream; use webmetro::{ error::WebmetroError, stream_parser::StreamEbml, - webm::{ - SimpleBlock, - WebmElement::* - } + webm::{SimpleBlock, WebmElement::*}, }; -pub fn options() -> App<'static, 'static> { - SubCommand::with_name("dump") - .setting(AppSettings::Hidden) - .about("Dumps WebM parsing events from parsing stdin") -} +/// Dumps WebM parsing events from parsing stdin +#[derive(Args, Debug)] +pub struct DumpArgs; #[tokio::main] -pub async fn run(_args: &ArgMatches) -> Result<(), WebmetroError> { - +pub async fn run(_args: DumpArgs) -> Result<(), WebmetroError> { let mut events = stdin_stream().parse_ebml(); while let Some(element) = events.next().await? { match element { // suppress printing byte arrays Tracks(slice) => println!("Tracks[{}]", slice.len()), - SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode), - other => println!("{:?}", other) + SimpleBlock(SimpleBlock { timecode, .. }) => println!("SimpleBlock@{}", timecode), + other => println!("{:?}", other), } } Ok(()) diff --git a/src/commands/filter.rs b/src/commands/filter.rs index 16c12ac..a120e0c 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -1,6 +1,6 @@ use std::{io, io::prelude::*, pin::Pin}; -use clap::{App, Arg, ArgMatches, SubCommand}; +use clap::Args; use futures::prelude::*; use super::stdin_stream; @@ -11,16 +11,16 @@ use webmetro::{ stream_parser::StreamEbml, }; -pub fn options() -> App<'static, 'static> { - SubCommand::with_name("filter") - .about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.") - .arg(Arg::with_name("throttle") - .long("throttle") - .help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) +/// Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does. +#[derive(Args, Debug)] +pub struct FilterArgs { + /// Slow down output to "real time" speed as determined by the timestamps (useful for streaming static files) + #[clap(long, short)] + throttle: bool, } #[tokio::main] -pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { +pub async fn run(args: FilterArgs) -> Result<(), WebmetroError> { let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut chunk_stream: Pin> + Send>> = Box::pin( @@ -30,7 +30,7 @@ pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { .map_ok(move |chunk| timecode_fixer.process(chunk)), ); - if args.is_present("throttle") { + if args.throttle { chunk_stream = Box::pin(Throttle::new(chunk_stream)); } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 662a1de..2022473 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -17,14 +17,11 @@ pub fn stdin_stream() -> impl Stream> + Siz FramedRead::new(tokio::io::stdin(), BytesCodec::new()).map_ok(|bytes| bytes.freeze()) } -pub fn parse_time(arg: Option<&str>) -> Result, WebmetroError> { - match arg { - Some(string) => match string.parse() { - Ok(secs) => Ok(Some(Duration::from_secs(secs))), - Err(err) => Err(WebmetroError::ApplicationError { - message: err.to_string(), - }), - }, - None => Ok(None), +pub fn parse_time(arg: &str) -> Result { + match arg.parse() { + Ok(secs) => Ok(Duration::from_secs(secs)), + Err(err) => Err(WebmetroError::ApplicationError { + message: err.to_string(), + }), } } diff --git a/src/commands/relay.rs b/src/commands/relay.rs index 2ff2c2b..b02979b 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -1,74 +1,53 @@ use std::net::ToSocketAddrs; -use std::sync::{ - Arc, - Mutex, - Weak -}; +use std::sync::{Arc, Mutex, Weak}; -use bytes::{Bytes, Buf}; -use clap::{App, Arg, ArgMatches, SubCommand}; -use futures::{ - prelude::*, - Stream, - stream::FuturesUnordered, -}; +use bytes::{Buf, Bytes}; +use clap::Args; +use futures::{prelude::*, stream::FuturesUnordered, Stream}; use hyper::{ - Body, - Response, - header::{ - CACHE_CONTROL, - CONTENT_TYPE - } + header::{CACHE_CONTROL, CONTENT_TYPE}, + Body, Response, }; use stream::iter; -use warp::{ - self, - Filter, - path -}; -use weak_table::{ - WeakValueHashMap -}; +use warp::{self, path, Filter}; +use weak_table::WeakValueHashMap; use webmetro::{ - channel::{ - Channel, - Handle, - Listener, - Transmitter - }, + channel::{Channel, Handle, Listener, Transmitter}, + chunk::Chunk, chunk::WebmStream, error::WebmetroError, - fixers::{ - ChunkStream, - ChunkTimecodeFixer, - }, - stream_parser::StreamEbml -, chunk::Chunk}; + fixers::{ChunkStream, ChunkTimecodeFixer}, + stream_parser::StreamEbml, +}; const BUFFER_LIMIT: usize = 2 * 1024 * 1024; fn get_stream(channel: Handle) -> impl Stream> { let mut timecode_fixer = ChunkTimecodeFixer::new(); - Listener::new(channel).map(|c| Result::::Ok(c)) - .map_ok(move |chunk| timecode_fixer.process(chunk)) - .find_starting_point() - .map_ok(|webm_chunk| iter(webm_chunk).map(Result::::Ok)) - .try_flatten() + Listener::new(channel) + .map(|c| Result::::Ok(c)) + .map_ok(move |chunk| timecode_fixer.process(chunk)) + .find_starting_point() + .map_ok(|webm_chunk| iter(webm_chunk).map(Result::::Ok)) + .try_flatten() } -fn post_stream(channel: Handle, stream: impl Stream> + Unpin) -> impl Stream> { +fn post_stream( + channel: Handle, + stream: impl Stream> + Unpin, +) -> impl Stream> { let channel = Transmitter::new(channel); stream .map_err(WebmetroError::from) - .parse_ebml().with_soft_limit(BUFFER_LIMIT) - .chunk_webm().with_soft_limit(BUFFER_LIMIT) + .parse_ebml() + .with_soft_limit(BUFFER_LIMIT) + .chunk_webm() + .with_soft_limit(BUFFER_LIMIT) .map_ok(move |chunk| { channel.send(chunk); Bytes::new() }) - .inspect_err(|err| { - warn!("{}", err) - }) + .inspect_err(|err| warn!("{}", err)) } fn media_response(body: Body) -> Response { @@ -80,18 +59,19 @@ fn media_response(body: Body) -> Response { .unwrap() } -pub fn options() -> App<'static, 'static> { - SubCommand::with_name("relay") - .about("Hosts an HTTP-based relay server") - .arg(Arg::with_name("listen") - .help("The address:port to listen to") - .required(true)) +/// Hosts an HTTP-based relay server +#[derive(Args, Debug)] +pub struct RelayArgs { + /// The address:port to listen to + listen: String, } #[tokio::main] -pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { - let channel_map = Arc::new(Mutex::new(WeakValueHashMap::>>::new())); - let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?; +pub async fn run(args: RelayArgs) -> Result<(), WebmetroError> { + let channel_map = Arc::new(Mutex::new( + WeakValueHashMap::>>::new(), + )); + let addr_str = args.listen; let addrs = addr_str.to_socket_addrs()?; info!("Binding to {:?}", addrs); @@ -100,37 +80,40 @@ pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { } let channel = path!("live" / String).map(move |name: String| { - let channel = channel_map.lock().unwrap() + let channel = channel_map + .lock() + .unwrap() .entry(name.clone()) .or_insert_with(|| Channel::new(name.clone())); (channel, name) }); - let head = channel.clone().and(warp::head()) - .map(|(_, name)| { - info!("HEAD Request For Channel {}", name); - media_response(Body::empty()) - }); + let head = channel.clone().and(warp::head()).map(|(_, name)| { + info!("HEAD Request For Channel {}", name); + media_response(Body::empty()) + }); - let get = channel.clone().and(warp::get()) - .map(|(channel, name)| { - info!("Listener Connected On Channel {}", name); - media_response(Body::wrap_stream(get_stream(channel))) - }); + let get = channel.clone().and(warp::get()).map(|(channel, name)| { + info!("Listener Connected On Channel {}", name); + media_response(Body::wrap_stream(get_stream(channel))) + }); - let post_put = channel.clone().and(warp::post().or(warp::put()).unify()) - .and(warp::body::stream()).map(|(channel, name), stream| { + let post_put = channel + .clone() + .and(warp::post().or(warp::put()).unify()) + .and(warp::body::stream()) + .map(|(channel, name), stream| { info!("Source Connected On Channel {}", name); Response::new(Body::wrap_stream(post_stream(channel, stream))) }); - let routes = head - .or(get) - .or(post_put); + let routes = head.or(get).or(post_put); - let mut server_futures: FuturesUnordered<_> = addrs.map(|addr| warp::serve(routes.clone()).try_bind(addr)).collect(); + let mut server_futures: FuturesUnordered<_> = addrs + .map(|addr| warp::serve(routes.clone()).try_bind(addr)) + .collect(); - while let Some(_) = server_futures.next().await {}; + while let Some(_) = server_futures.next().await {} Ok(()) } diff --git a/src/commands/send.rs b/src/commands/send.rs index 20b51e5..c4378d4 100644 --- a/src/commands/send.rs +++ b/src/commands/send.rs @@ -1,8 +1,12 @@ use bytes::Bytes; -use clap::{App, Arg, ArgMatches, SubCommand}; +use clap::Args; use futures::prelude::*; use hyper::{client::HttpConnector, Body, Client, Request}; -use std::{io::{stdout, Write}, pin::Pin}; +use std::{ + io::{stdout, Write}, + pin::Pin, + time::Duration, +}; use stream::iter; use super::{parse_time, stdin_stream}; @@ -13,39 +17,30 @@ use webmetro::{ stream_parser::StreamEbml, }; -pub fn options() -> App<'static, 'static> { - SubCommand::with_name("send") - .about("PUTs WebM from stdin to a relay server.") - .arg(Arg::with_name("url") - .help("The location to upload to") - .required(true)) - .arg(Arg::with_name("throttle") - .long("throttle") - .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) - .arg(Arg::with_name("skip") - .takes_value(true) - .short("s") - .long("skip") - .help("Skip approximately n seconds of content before uploading or throttling")) - .arg(Arg::with_name("take") - .takes_value(true) - .short("t") - .long("take") - .help("Stop uploading after approximately n seconds of content")) -} - type BoxedChunkStream = Pin> + Send + Sync>>; -#[tokio::main] -pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { - // parse args - let url_str = match args.value_of("url") { - Some(url) => String::from(url), - _ => return Err("Listen address wasn't provided".into()), - }; +/// PUTs WebM from stdin to a relay server. +#[derive(Args, Debug)] +pub struct SendArgs { + /// The location to upload to + url: String, + /// Slow down upload to "real time" speed as determined by the timestamps (useful for streaming static files) + #[clap(long)] + throttle: bool, + /// Skip approximately n seconds of content before uploading or throttling + #[clap(long, short, parse(try_from_str = parse_time))] + skip: Option, + /// Stop uploading after approximately n seconds of content + #[clap(long, short, parse(try_from_str = parse_time))] + take: Option, +} - let start_time = parse_time(args.value_of("skip"))?.map_or(0, |s| s.as_millis()); - let stop_time = parse_time(args.value_of("take"))?.map_or(std::u128::MAX, |t| t.as_millis() + start_time); +#[tokio::main] +pub async fn run(args: SendArgs) -> Result<(), WebmetroError> { + let start_time = args.skip.map_or(0, |s| s.as_millis()); + let stop_time = args + .take + .map_or(std::u128::MAX, |t| t.as_millis() + start_time); // build pipeline let mut timecode_fixer = ChunkTimecodeFixer::new(); @@ -57,7 +52,7 @@ pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { .try_filter(move |chunk| future::ready(chunk.overlaps(start_time, stop_time))), ); - if args.is_present("throttle") { + if args.throttle { chunk_stream = Box::pin(Throttle::new(chunk_stream)); } @@ -71,7 +66,7 @@ pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { let request_payload = Body::wrap_stream(chunk_stream); - let request = Request::put(url_str).body(request_payload)?; + let request = Request::put(args.url).body(request_payload)?; let client = Client::builder().build(HttpConnector::new()); let response = client.request(request).await?; diff --git a/src/main.rs b/src/main.rs index 3dd8696..32b6fbd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,44 +1,37 @@ - -#[macro_use] extern crate log; +#[macro_use] +extern crate log; mod commands; -use clap::{App, AppSettings, crate_version}; +use clap::{Parser, Subcommand}; -use crate::commands::{ - relay, - filter, - send, - dump -}; +/// Utilities for broadcasting & relaying live WebM video/audio streams +#[derive(Parser, Debug)] +#[clap(version)] +struct Args { + #[clap(subcommand)] + command: Command, +} -fn options() -> App<'static, 'static> { - App::new("webmetro") - .version(crate_version!()) - .about("Utilities for broadcasting & relaying live WebM video/audio streams") - .setting(AppSettings::DisableHelpSubcommand) - .setting(AppSettings::VersionlessSubcommands) - .subcommand(relay::options()) - .subcommand(filter::options()) - .subcommand(send::options()) - .subcommand(dump::options()) +#[derive(Subcommand, Debug)] +enum Command { + Dump(commands::dump::DumpArgs), + Filter(commands::filter::FilterArgs), + Relay(commands::relay::RelayArgs), + Send(commands::send::SendArgs), } fn main() { env_logger::init(); - let args = options().get_matches(); + let args = Args::parse(); - match args.subcommand() { - ("filter", Some(sub_args)) => filter::run(sub_args), - ("relay", Some(sub_args)) => relay::run(sub_args), - ("send", Some(sub_args)) => send::run(sub_args), - ("dump", Some(sub_args)) => dump::run(sub_args), - _ => { - options().print_help().unwrap(); - println!(""); - Ok(()) - } - }.unwrap_or_else(|err| { + match args.command { + Command::Dump(args) => commands::dump::run(args), + Command::Filter(args) => commands::filter::run(args), + Command::Relay(args) => commands::relay::run(args), + Command::Send(args) => commands::send::run(args), + } + .unwrap_or_else(|err| { error!("{}", err); }); }