From 739862bc3597db7be99b338ca1691248efac94d5 Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Sun, 15 Apr 2018 01:43:23 -0400 Subject: [PATCH] Add send subcommand for uploading WebM --- src/commands/filter.rs | 2 +- src/commands/mod.rs | 1 + src/commands/send.rs | 79 ++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 4 +++ 4 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 src/commands/send.rs diff --git a/src/commands/filter.rs b/src/commands/filter.rs index a276d88..1cdecde 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -22,7 +22,7 @@ pub fn options() -> App<'static, 'static> { .about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.") .arg(Arg::with_name("throttle") .long("throttle") - .help("Slow down output to \"realtime\" speed as determined by the timestamps (useful for streaming)")) + .help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) } pub fn run(args: &ArgMatches) -> Box + Send> { diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 1daf601..9ebb8b4 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -23,6 +23,7 @@ use webmetro::error::WebmetroError; pub mod dump; pub mod filter; pub mod relay; +pub mod send; /// An adapter that makes chunks of bytes from stdin available as a Stream; /// is NOT actually async, and just uses blocking read. Don't use more than diff --git a/src/commands/send.rs b/src/commands/send.rs new file mode 100644 index 0000000..c070444 --- /dev/null +++ b/src/commands/send.rs @@ -0,0 +1,79 @@ +use clap::{App, Arg, ArgMatches, SubCommand}; +use futures::{ + future, + prelude::* +}; +use hyper::{ + Error as HyperError, + Method, + client::{ + Config, + Request + } +}; +use tokio_core::reactor::{ + Handle +}; + +use super::{ + stdin_stream, + to_hyper_error +}; +use webmetro::{ + chunk::{ + Chunk, + WebmStream + }, + error::WebmetroError, + fixers::ChunkStream, + stream_parser::StreamEbml +}; + +pub fn options() -> App<'static, 'static> { + SubCommand::with_name("send") + .about("PUTs WebM from stdin to a relay server.") + .arg(Arg::with_name("url") + .help("The location to upload to") + .required(true)) + .arg(Arg::with_name("throttle") + .long("throttle") + .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) +} + +type BoxedChunkStream = Box>; +type BoxedHyperStream = Box>; + +pub fn run(handle: Handle, args: &ArgMatches) -> Box> { + let mut chunk_stream: BoxedChunkStream = Box::new( + stdin_stream() + .parse_ebml() + .chunk_webm() + .fix_timecodes() + ); + + let url_str = match args.value_of("url") { + Some(url) => String::from(url), + _ => return Box::new(Err(WebmetroError::from_str("Listen address wasn't provided")).into_future()) + }; + + if args.is_present("throttle") { + chunk_stream = Box::new(chunk_stream.throttle()); + } + + let request_body_stream = Box::new(chunk_stream.map_err(to_hyper_error)) as BoxedHyperStream; + + Box::new(future::lazy(move || { + url_str.parse().map_err(WebmetroError::from_err) + }).and_then(move |uri| { + let client = Config::default() + .body::() + .build(&handle); + + let mut request: Request = Request::new(Method::Put, uri); + request.set_body(request_body_stream); + + client.request(request) + .map(|_response| ()) + .map_err(WebmetroError::from_err) + })) +} diff --git a/src/main.rs b/src/main.rs index 71abbe0..a1232ef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,7 @@ use webmetro::error::WebmetroError; use commands::{ relay, filter, + send, dump }; @@ -23,9 +24,11 @@ fn options() -> App<'static, 'static> { App::new("webmetro") .version(crate_version!()) .about("Utilities for broadcasting & relaying live WebM video/audio streams") + .setting(AppSettings::DisableHelpSubcommand) .setting(AppSettings::VersionlessSubcommands) .subcommand(relay::options()) .subcommand(filter::options()) + .subcommand(send::options()) .subcommand(dump::options()) } @@ -38,6 +41,7 @@ fn main() { tokio_run(core, match args.subcommand() { ("filter", Some(sub_args)) => box_up(filter::run(sub_args)), ("relay", Some(sub_args)) => box_up(relay::run(sub_args)), + ("send", Some(sub_args)) => box_up(send::run(handle, sub_args)), ("dump", Some(sub_args)) => box_up(dump::run(sub_args)), _ => box_up(futures::lazy(|| { options().print_help().unwrap();