Add send subcommand for uploading WebM

This commit is contained in:
Tangent 128 2018-04-15 01:43:23 -04:00
parent 885681f009
commit 739862bc35
4 changed files with 85 additions and 1 deletions

View file

@ -22,7 +22,7 @@ pub fn options() -> App<'static, 'static> {
.about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.") .about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.")
.arg(Arg::with_name("throttle") .arg(Arg::with_name("throttle")
.long("throttle") .long("throttle")
.help("Slow down output to \"realtime\" speed as determined by the timestamps (useful for streaming)")) .help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
} }
pub fn run(args: &ArgMatches) -> Box<Future<Item=(), Error=WebmetroError> + Send> { pub fn run(args: &ArgMatches) -> Box<Future<Item=(), Error=WebmetroError> + Send> {

View file

@ -23,6 +23,7 @@ use webmetro::error::WebmetroError;
pub mod dump; pub mod dump;
pub mod filter; pub mod filter;
pub mod relay; pub mod relay;
pub mod send;
/// An adapter that makes chunks of bytes from stdin available as a Stream; /// An adapter that makes chunks of bytes from stdin available as a Stream;
/// is NOT actually async, and just uses blocking read. Don't use more than /// is NOT actually async, and just uses blocking read. Don't use more than

79
src/commands/send.rs Normal file
View file

@ -0,0 +1,79 @@
use clap::{App, Arg, ArgMatches, SubCommand};
use futures::{
future,
prelude::*
};
use hyper::{
Error as HyperError,
Method,
client::{
Config,
Request
}
};
use tokio_core::reactor::{
Handle
};
use super::{
stdin_stream,
to_hyper_error
};
use webmetro::{
chunk::{
Chunk,
WebmStream
},
error::WebmetroError,
fixers::ChunkStream,
stream_parser::StreamEbml
};
pub fn options() -> App<'static, 'static> {
SubCommand::with_name("send")
.about("PUTs WebM from stdin to a relay server.")
.arg(Arg::with_name("url")
.help("The location to upload to")
.required(true))
.arg(Arg::with_name("throttle")
.long("throttle")
.help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
}
type BoxedChunkStream = Box<Stream<Item = Chunk, Error = WebmetroError>>;
type BoxedHyperStream = Box<Stream<Item = Chunk, Error = HyperError>>;
pub fn run(handle: Handle, args: &ArgMatches) -> Box<Future<Item=(), Error=WebmetroError>> {
let mut chunk_stream: BoxedChunkStream = Box::new(
stdin_stream()
.parse_ebml()
.chunk_webm()
.fix_timecodes()
);
let url_str = match args.value_of("url") {
Some(url) => String::from(url),
_ => return Box::new(Err(WebmetroError::from_str("Listen address wasn't provided")).into_future())
};
if args.is_present("throttle") {
chunk_stream = Box::new(chunk_stream.throttle());
}
let request_body_stream = Box::new(chunk_stream.map_err(to_hyper_error)) as BoxedHyperStream;
Box::new(future::lazy(move || {
url_str.parse().map_err(WebmetroError::from_err)
}).and_then(move |uri| {
let client = Config::default()
.body::<BoxedHyperStream>()
.build(&handle);
let mut request: Request<BoxedHyperStream> = Request::new(Method::Put, uri);
request.set_body(request_body_stream);
client.request(request)
.map(|_response| ())
.map_err(WebmetroError::from_err)
}))
}

View file

@ -16,6 +16,7 @@ use webmetro::error::WebmetroError;
use commands::{ use commands::{
relay, relay,
filter, filter,
send,
dump dump
}; };
@ -23,9 +24,11 @@ fn options() -> App<'static, 'static> {
App::new("webmetro") App::new("webmetro")
.version(crate_version!()) .version(crate_version!())
.about("Utilities for broadcasting & relaying live WebM video/audio streams") .about("Utilities for broadcasting & relaying live WebM video/audio streams")
.setting(AppSettings::DisableHelpSubcommand)
.setting(AppSettings::VersionlessSubcommands) .setting(AppSettings::VersionlessSubcommands)
.subcommand(relay::options()) .subcommand(relay::options())
.subcommand(filter::options()) .subcommand(filter::options())
.subcommand(send::options())
.subcommand(dump::options()) .subcommand(dump::options())
} }
@ -38,6 +41,7 @@ fn main() {
tokio_run(core, match args.subcommand() { tokio_run(core, match args.subcommand() {
("filter", Some(sub_args)) => box_up(filter::run(sub_args)), ("filter", Some(sub_args)) => box_up(filter::run(sub_args)),
("relay", Some(sub_args)) => box_up(relay::run(sub_args)), ("relay", Some(sub_args)) => box_up(relay::run(sub_args)),
("send", Some(sub_args)) => box_up(send::run(handle, sub_args)),
("dump", Some(sub_args)) => box_up(dump::run(sub_args)), ("dump", Some(sub_args)) => box_up(dump::run(sub_args)),
_ => box_up(futures::lazy(|| { _ => box_up(futures::lazy(|| {
options().print_help().unwrap(); options().print_help().unwrap();