From 3bc46210e4b190aa4a3d1131b3c3b2ae6b8d70b9 Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Fri, 11 Sep 2020 22:57:23 -0400 Subject: [PATCH] Teach send subcommand to recognize --skip and --take options --- CHANGELOG.md | 1 + src/chunk.rs | 9 +++++++++ src/commands/mod.rs | 18 ++++++++++++++++-- src/commands/send.rs | 29 ++++++++++++++++++++++------- 4 files changed, 48 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0ffb37..808f144 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ - forget a channel's initialization segment when no transmitter is active. This improves behavior when a channel is occasionally used for streams with different codecs. - Add INFO logging for channel creation/garbage-collection - Start throttle timing on first data instead of throttle creation (improves cases where the source is slow to start) +- Teach send subcommand to recognize --skip and --take options ## v0.3.0 - update internals to v0.2 of `warp` and `tokio`; no remaining code relies on `futures` 0.1 diff --git a/src/chunk.rs b/src/chunk.rs index cc0838d..69dca20 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -67,6 +67,15 @@ pub enum Chunk { Empty } +impl Chunk { + pub fn overlaps(&self, start: u128, stop: u128) -> bool { + match self { + Chunk::Cluster(head, _) => head.start as u128 <= stop && head.end as u128 >= start, + _ => true, + } + } +} + // TODO: make an external iterator type so we can remove Chunk::RemainingBody & Chunk::Empty impl Iterator for Chunk { type Item = Bytes; diff --git a/src/commands/mod.rs b/src/commands/mod.rs index c442c07..662a1de 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,6 +1,9 @@ +use std::time::Duration; + use bytes::Bytes; use futures::{Stream, TryStreamExt}; use tokio_util::codec::{BytesCodec, FramedRead}; +use webmetro::error::WebmetroError; pub mod dump; pub mod filter; @@ -11,6 +14,17 @@ pub mod send; /// is NOT actually async, and just uses blocking read. Don't use more than /// one at once, who knows who gets which bytes. pub fn stdin_stream() -> impl Stream> + Sized + Unpin { - FramedRead::new(tokio::io::stdin(), BytesCodec::new()) - .map_ok(|bytes| bytes.freeze()) + FramedRead::new(tokio::io::stdin(), BytesCodec::new()).map_ok(|bytes| bytes.freeze()) +} + +pub fn parse_time(arg: Option<&str>) -> Result, WebmetroError> { + match arg { + Some(string) => match string.parse() { + Ok(secs) => Ok(Some(Duration::from_secs(secs))), + Err(err) => Err(WebmetroError::ApplicationError { + message: err.to_string(), + }), + }, + None => Ok(None), + } } diff --git a/src/commands/send.rs b/src/commands/send.rs index a65395a..26344e4 100644 --- a/src/commands/send.rs +++ b/src/commands/send.rs @@ -5,7 +5,7 @@ use hyper::{client::HttpConnector, Body, Client, Request}; use std::io::{stdout, Write}; use stream::iter; -use super::stdin_stream; +use super::{parse_time, stdin_stream}; use webmetro::{ chunk::{Chunk, WebmStream}, error::WebmetroError, @@ -22,26 +22,41 @@ pub fn options() -> App<'static, 'static> { .arg(Arg::with_name("throttle") .long("throttle") .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) + .arg(Arg::with_name("skip") + .takes_value(true) + .short("s") + .long("skip") + .help("Skip approximately n seconds of content before uploading or throttling")) + .arg(Arg::with_name("take") + .takes_value(true) + .short("t") + .long("take") + .help("Stop uploading after approximately n seconds of content")) } type BoxedChunkStream = Box> + Send + Sync + Unpin>; #[tokio::main] pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { + // parse args + let url_str = match args.value_of("url") { + Some(url) => String::from(url), + _ => return Err("Listen address wasn't provided".into()), + }; + + let start_time = parse_time(args.value_of("skip"))?.map_or(0, |s| s.as_millis()); + let stop_time = parse_time(args.value_of("take"))?.map_or(std::u128::MAX, |t| t.as_millis() + start_time); + // build pipeline let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut chunk_stream: BoxedChunkStream = Box::new( stdin_stream() .parse_ebml() .chunk_webm() - .map_ok(move |chunk| timecode_fixer.process(chunk)), + .map_ok(move |chunk| timecode_fixer.process(chunk)) + .try_filter(move |chunk| future::ready(chunk.overlaps(start_time, stop_time))), ); - let url_str = match args.value_of("url") { - Some(url) => String::from(url), - _ => return Err("Listen address wasn't provided".into()), - }; - if args.is_present("throttle") { chunk_stream = Box::new(Throttle::new(chunk_stream)); }