Port send subcommand to core futures & alpha hyper (fixes --throttle)

This commit is contained in:
Tangent 128 2019-10-16 00:15:44 -04:00
parent 8de9ffdb21
commit 32c72e1ee8
4 changed files with 125 additions and 49 deletions

89
Cargo.lock generated
View file

@ -337,6 +337,26 @@ dependencies = [
"tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "h2"
version = "0.2.0-alpha.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-sink-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"string 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-sync 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "headers"
version = "0.2.3"
@ -382,6 +402,15 @@ dependencies = [
"tokio-buf 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "http-body"
version = "0.2.0-alpha.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "httparse"
version = "1.3.4"
@ -416,6 +445,36 @@ dependencies = [
"want 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "hyper"
version = "0.13.0-alpha.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-channel-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
"h2 0.2.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"http-body 0.2.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-net 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-sync 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.3.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-make 0.3.0-alpha.2a (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.3.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)",
"want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "idna"
version = "0.2.0"
@ -1436,6 +1495,20 @@ dependencies = [
"tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tower-make"
version = "0.3.0-alpha.2a"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"tokio-io 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.3.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tower-service"
version = "0.3.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "tracing"
version = "0.1.9"
@ -1589,6 +1662,15 @@ dependencies = [
"try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "warp"
version = "0.1.20"
@ -1635,6 +1717,7 @@ dependencies = [
"futures-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.12.35 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.13.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)",
"matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1726,12 +1809,15 @@ dependencies = [
"checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
"checksum getrandom 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "473a1265acc8ff1e808cd0a1af8cee3c2ee5200916058a2ca113c29f2d903571"
"checksum h2 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462"
"checksum h2 0.2.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0f107db1419ef8271686187b1a5d47c6431af4a7f4d98b495e7b7fc249bb0a78"
"checksum headers 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "882ca7d8722f33ce2c2db44f95425d6267ed59ca96ce02acbe58320054ceb642"
"checksum headers-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "967131279aaa9f7c20c7205b45a391638a83ab118e6509b2d0ccbe08de044237"
"checksum http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "372bcb56f939e449117fb0869c2e8fd8753a8223d92a172c6e808cf123a5b6e4"
"checksum http-body 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6741c859c1b2463a423a1dbce98d418e6c3c3fc720fb0d45528657320920292d"
"checksum http-body 0.2.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1f3aef6f3de2bd8585f5b366f3f550b5774500b4764d00cf00f903c95749eec3"
"checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
"checksum hyper 0.12.35 (registry+https://github.com/rust-lang/crates.io-index)" = "9dbe6ed1438e1f8ad955a4701e9a944938e9519f6888d12d8558b645e247d5f6"
"checksum hyper 0.13.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "2d05aa523087ac0b9d8b93dd80d5d482a697308ed3b0dca7b0667511a7fa7cdc"
"checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9"
"checksum indexmap 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a61202fbe46c4a951e9404a720a0180bcf3212c750d735cb5c4ba4dc551299f3"
"checksum input_buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf"
@ -1842,6 +1928,8 @@ dependencies = [
"checksum tokio-timer 0.3.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)" = "b97c1587fe71018eb245a4a9daa13a5a3b681bbc1f7fdadfe24720e141472c13"
"checksum tokio-udp 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f02298505547f73e60f568359ef0d016d5acd6e830ab9bc7c4a5b3403440121b"
"checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445"
"checksum tower-make 0.3.0-alpha.2a (registry+https://github.com/rust-lang/crates.io-index)" = "316d47dd40cde4ac5d88110eaf9a10a4e2a68612d9c056cd2aa24e37dcb484cd"
"checksum tower-service 0.3.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)" = "63ff37396cd966ce43bea418bfa339f802857495f797dafa00bea5b7221ebdfa"
"checksum tracing 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c21ff9457accc293386c20e8f754d0b059e67e325edf2284f04230d125d7e5ff"
"checksum tracing-attributes 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "3ff978fd9c9afe2cc9c671e247713421c6406b3422305cbdce5de695d3ab4c3c"
"checksum tracing-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "528c8ebaaa16cdac34795180b046c031775b0d56402704d98c096788f33d646a"
@ -1862,6 +1950,7 @@ dependencies = [
"checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a"
"checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd"
"checksum want 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b6395efa4784b027708f7451087e647ec73cc74f5d9bc2e418404248d679a230"
"checksum want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
"checksum warp 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "3921463c44f680d24f1273ea55efd985f31206a22a02dee207a2ec72684285ca"
"checksum wasi 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b89c3ce4ce14bdc6fb6beaf9ec7928ca331de5df7e5ea278375642a2f478570d"
"checksum weak-table 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a5862bb244c852a56c6f3c39668ff181271bda44513ef30d2073a3eedd9898d"

View file

@ -12,6 +12,7 @@ futures = "0.1.29"
futures3 = { package = "futures-preview", version="0.3.0-alpha", features = ["compat"] }
http = "0.1.18"
hyper = "0.12.35"
hyper13 = { package = "hyper", version="0.13.0-alpha.4", features = ["unstable-stream"] }
matches = "0.1.8"
odds = { version = "0.3.1", features = ["std-vec"] }
tokio = "0.1.22"

View file

@ -1,33 +1,15 @@
use clap::{App, Arg, ArgMatches, SubCommand};
use futures::{
prelude::*
};
use futures3::prelude::*;
use futures3::compat::{
Compat,
};
use hyper::{
Body,
Client,
client::HttpConnector,
Request
};
use tokio::runtime::Runtime;
use hyper13::{client::HttpConnector, Body, Client, Request};
use std::io::{stdout, Write};
use tokio2::runtime::Runtime;
use super::{
stdin_stream
};
use super::stdin_stream;
use webmetro::{
chunk::{
Chunk,
WebmStream
},
chunk::{Chunk, WebmStream},
error::WebmetroError,
fixers::{
ChunkTimecodeFixer,
Throttle,
},
stream_parser::StreamEbml
fixers::{ChunkTimecodeFixer, Throttle},
stream_parser::StreamEbml,
};
pub fn options() -> App<'static, 'static> {
@ -41,46 +23,49 @@ pub fn options() -> App<'static, 'static> {
.help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
}
type BoxedChunkStream = Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin>;
type BoxedChunkStream = Box<
dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError>
+ Send
+ Sync
+ Unpin,
>;
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let mut timecode_fixer = ChunkTimecodeFixer::new();
let mut chunk_stream: BoxedChunkStream = Box::new(
stdin_stream()
.parse_ebml()
.chunk_webm()
.map_ok(move |chunk| timecode_fixer.process(chunk))
.parse_ebml()
.chunk_webm()
.map_ok(move |chunk| timecode_fixer.process(chunk)),
);
let url_str = match args.value_of("url") {
Some(url) => String::from(url),
_ => return Err("Listen address wasn't provided".into())
_ => return Err("Listen address wasn't provided".into()),
};
if args.is_present("throttle") {
chunk_stream = Box::new(Throttle::new(chunk_stream));
}
let request_payload = Body::wrap_stream(Compat::new(chunk_stream.map_ok(
|webm_chunk| webm_chunk.into_bytes()
).map_err(|err| {
eprintln!("{}", &err);
err
})));
let chunk_stream = chunk_stream
.map_ok(|webm_chunk| webm_chunk.into_bytes())
.map_err(|err| {
eprintln!("{}", &err);
err
});
let request_payload = Body::wrap_stream(chunk_stream);
let request = Request::put(url_str)
.body(request_payload)
.map_err(WebmetroError::from)?;
let request = Request::put(url_str).body(request_payload)?;
let client = Client::builder().build(HttpConnector::new());
let client = Client::builder().build(HttpConnector::new(1));
let future = client.request(request)
.and_then(|response| {
response.into_body().for_each(|_chunk| {
Ok(())
})
Runtime::new().unwrap().block_on(async {
let response = client.request(request).await?;
let mut response_stream = response.into_body();
while let Some(response_chunk) = response_stream.next().await.transpose()? {
stdout().write_all(&response_chunk)?;
}
Ok(())
})
.map_err(WebmetroError::from);
Runtime::new().unwrap().block_on(future)
}

View file

@ -6,6 +6,7 @@ custom_error!{pub WebmetroError
EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}",
HttpError{source: http::Error} = "HTTP error: {source}",
HyperError{source: hyper::Error} = "Hyper error: {source}",
Hyper13Error{source: hyper13::Error} = "Hyper error: {source}",
IoError{source: std::io::Error} = "IO error: {source}",
TimerError{source: tokio::timer::Error} = "Timer error: {source}",
WarpError{source: warp::Error} = "Warp error: {source}",