Update Tokio/Hyper/Warp-related dependencies

This commit is contained in:
Tangent Wantwight 2022-05-22 00:56:53 -04:00
parent fc4e0577c8
commit 15520f26bd
6 changed files with 527 additions and 693 deletions

1114
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -6,18 +6,17 @@ edition = "2018"
[dependencies] [dependencies]
byteorder = "1" byteorder = "1"
bytes = "^0.5" bytes = "1"
clap = "^2.33" clap = "^2.33"
custom_error = "^1.7" custom_error = "^1.7"
env_logger = "^0.9" env_logger = "^0.9"
futures = "^0.3" futures = "^0.3"
http = "^0.2" http = "^0.2"
hyper = "^0.13" hyper = "^0.14"
log = "^0.4.8" log = "^0.4.8"
matches = "^0.1" matches = "^0.1"
tokio = { version="^0.2", features = ["io-std", "tcp", "macros", "rt-threaded", "time"] } pin-project = "1"
tokio-util = "^0.3" tokio = { version="^1.18", features = ["io-std", "macros", "net", "rt", "rt-multi-thread", "time"] }
#tokio = { version="^1.18", features = ["io-std", "macros", "net", "rt", "time"] } tokio-util = { version="^0.7", features=["codec"] }
#tokio-util = { version="^0.6.9", features=["codec"] } warp = "^0.3"
warp = "^0.2"
weak-table = "^0.3" weak-table = "^0.3"

View File

@ -1,4 +1,4 @@
use std::{io, io::prelude::*}; use std::{io, io::prelude::*, pin::Pin};
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::prelude::*; use futures::prelude::*;
@ -22,8 +22,8 @@ pub fn options() -> App<'static, 'static> {
#[tokio::main] #[tokio::main]
pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut timecode_fixer = ChunkTimecodeFixer::new();
let mut chunk_stream: Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send + Unpin> = let mut chunk_stream: Pin<Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send>> =
Box::new( Box::pin(
stdin_stream() stdin_stream()
.parse_ebml() .parse_ebml()
.chunk_webm() .chunk_webm()
@ -31,7 +31,7 @@ pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
); );
if args.is_present("throttle") { if args.is_present("throttle") {
chunk_stream = Box::new(Throttle::new(chunk_stream)); chunk_stream = Box::pin(Throttle::new(chunk_stream));
} }
while let Some(chunk) = chunk_stream.next().await { while let Some(chunk) = chunk_stream.next().await {

View File

@ -2,7 +2,7 @@ use bytes::Bytes;
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::prelude::*; use futures::prelude::*;
use hyper::{client::HttpConnector, Body, Client, Request}; use hyper::{client::HttpConnector, Body, Client, Request};
use std::io::{stdout, Write}; use std::{io::{stdout, Write}, pin::Pin};
use stream::iter; use stream::iter;
use super::{parse_time, stdin_stream}; use super::{parse_time, stdin_stream};
@ -34,7 +34,7 @@ pub fn options() -> App<'static, 'static> {
.help("Stop uploading after approximately n seconds of content")) .help("Stop uploading after approximately n seconds of content"))
} }
type BoxedChunkStream = Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send + Sync + Unpin>; type BoxedChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send + Sync>>;
#[tokio::main] #[tokio::main]
pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
@ -49,7 +49,7 @@ pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
// build pipeline // build pipeline
let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut timecode_fixer = ChunkTimecodeFixer::new();
let mut chunk_stream: BoxedChunkStream = Box::new( let mut chunk_stream: BoxedChunkStream = Box::pin(
stdin_stream() stdin_stream()
.parse_ebml() .parse_ebml()
.chunk_webm() .chunk_webm()
@ -58,7 +58,7 @@ pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
); );
if args.is_present("throttle") { if args.is_present("throttle") {
chunk_stream = Box::new(Throttle::new(chunk_stream)); chunk_stream = Box::pin(Throttle::new(chunk_stream));
} }
let chunk_stream = chunk_stream let chunk_stream = chunk_stream

View File

@ -263,7 +263,7 @@ pub trait FromEbml<'a>: Sized {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bytes::{BytesMut, buf::ext::BufMutExt}; use bytes::BytesMut;
use crate::ebml::*; use crate::ebml::*;
use crate::ebml::EbmlError::{CorruptVarint, UnknownElementId}; use crate::ebml::EbmlError::{CorruptVarint, UnknownElementId};
use crate::ebml::Varint::{Unknown, Value}; use crate::ebml::Varint::{Unknown, Value};

View File

@ -1,23 +1,16 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{ use std::task::{Context, Poll};
Context,
Poll
};
use futures::prelude::*; use futures::prelude::*;
use tokio::time::{ use pin_project::pin_project;
delay_until, use tokio::time::{sleep_until, Duration, Instant, Sleep};
Delay,
Duration,
Instant,
};
use crate::chunk::Chunk; use crate::chunk::Chunk;
pub struct ChunkTimecodeFixer { pub struct ChunkTimecodeFixer {
current_offset: u64, current_offset: u64,
last_observed_timecode: u64, last_observed_timecode: u64,
assumed_duration: u64 assumed_duration: u64,
} }
impl ChunkTimecodeFixer { impl ChunkTimecodeFixer {
@ -25,7 +18,7 @@ impl ChunkTimecodeFixer {
ChunkTimecodeFixer { ChunkTimecodeFixer {
current_offset: 0, current_offset: 0,
last_observed_timecode: 0, last_observed_timecode: 0,
assumed_duration: 33 assumed_duration: 33,
} }
} }
pub fn process(&mut self, mut chunk: Chunk) -> Chunk { pub fn process(&mut self, mut chunk: Chunk) -> Chunk {
@ -49,14 +42,16 @@ impl ChunkTimecodeFixer {
pub struct StartingPointFinder<S> { pub struct StartingPointFinder<S> {
stream: S, stream: S,
seen_header: bool, seen_header: bool,
seen_keyframe: bool seen_keyframe: bool,
} }
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S> impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S> {
{
type Item = Result<Chunk, S::Error>; type Item = Result<Chunk, S::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> { fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Chunk, S::Error>>> {
loop { loop {
return match self.stream.try_poll_next_unpin(cx) { return match self.stream.try_poll_next_unpin(cx) {
Poll::Ready(Some(Ok(Chunk::Cluster(cluster_head, cluster_body)))) => { Poll::Ready(Some(Ok(Chunk::Cluster(cluster_head, cluster_body)))) => {
@ -69,8 +64,8 @@ impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S>
} else { } else {
continue; continue;
} }
}, }
chunk @ Poll::Ready(Some(Ok(Chunk::Headers {..}))) => { chunk @ Poll::Ready(Some(Ok(Chunk::Headers { .. }))) => {
if self.seen_header { if self.seen_header {
// new stream starting, we don't need a new header but should wait for a safe spot to resume // new stream starting, we don't need a new header but should wait for a safe spot to resume
self.seen_keyframe = false; self.seen_keyframe = false;
@ -79,17 +74,20 @@ impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S>
self.seen_header = true; self.seen_header = true;
chunk chunk
} }
}, }
chunk => chunk chunk => chunk,
} };
}; }
} }
} }
#[pin_project]
pub struct Throttle<S> { pub struct Throttle<S> {
#[pin]
stream: S, stream: S,
start_time: Option<Instant>, start_time: Option<Instant>,
sleep: Delay #[pin]
sleep: Sleep,
} }
impl<S> Throttle<S> { impl<S> Throttle<S> {
@ -98,36 +96,45 @@ impl<S> Throttle<S> {
Throttle { Throttle {
stream: wrap, stream: wrap,
start_time: None, start_time: None,
sleep: delay_until(now) sleep: sleep_until(now),
} }
} }
} }
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for Throttle<S> impl<S: TryStream<Ok = Chunk> + Unpin> Stream for Throttle<S> {
{
type Item = Result<Chunk, S::Error>; type Item = Result<Chunk, S::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> { fn poll_next(
match self.sleep.poll_unpin(cx) { self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Chunk, S::Error>>> {
let mut this = self.project();
match this.sleep.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(()) => { /* can continue */ }, Poll::Ready(()) => { /* can continue */ }
} }
let next_chunk = self.stream.try_poll_next_unpin(cx); let next_chunk = this.stream.try_poll_next_unpin(cx);
if let Poll::Ready(Some(Ok(Chunk::Cluster(ref cluster_head, _)))) = next_chunk { if let Poll::Ready(Some(Ok(Chunk::Cluster(ref cluster_head, _)))) = next_chunk {
let offset = Duration::from_millis(cluster_head.end); let offset = Duration::from_millis(cluster_head.end);
// we have actual data, so start the clock if we haven't yet; // we have actual data, so start the clock if we haven't yet;
// if we're starting the clock now, though, don't insert delays if the first chunk happens to start after zero // if we're starting the clock now, though, don't insert delays if the first chunk happens to start after zero
let start_time = self.start_time.get_or_insert_with(|| Instant::now() - offset); let start_time = this
.start_time
.get_or_insert_with(|| Instant::now() - offset);
// snooze until real time has "caught up" to the stream // snooze until real time has "caught up" to the stream
let sleep_until = *start_time + offset; let sleep_until = *start_time + offset;
self.sleep.reset(sleep_until); this.sleep.reset(sleep_until);
} }
next_chunk next_chunk
} }
} }
pub trait ChunkStream where Self : Sized + TryStream<Ok = Chunk> { pub trait ChunkStream
where
Self: Sized + TryStream<Ok = Chunk>,
{
/*fn fix_timecodes(self) -> Map<_> { /*fn fix_timecodes(self) -> Map<_> {
let fixer = ; let fixer = ;
self.map(move |chunk| { self.map(move |chunk| {
@ -140,7 +147,7 @@ pub trait ChunkStream where Self : Sized + TryStream<Ok = Chunk> {
StartingPointFinder { StartingPointFinder {
stream: self, stream: self,
seen_header: false, seen_header: false,
seen_keyframe: false seen_keyframe: false,
} }
} }