diff --git a/src/chunk.rs b/src/chunk.rs index 2813ce3..bbd5df4 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,8 +1,11 @@ use bytes::{Buf, Bytes}; -use futures::{Async, Stream}; +use futures::{Async}; +use futures3::prelude::*; use std::{ io::Cursor, - mem + mem, + pin::Pin, + task::{Context, Poll, Poll::*}, }; use crate::stream_parser::EbmlStreamingParser; use crate::error::WebmetroError; @@ -128,22 +131,22 @@ fn encode(element: WebmElement, buffer: &mut Cursor>, limit: Option> Stream for WebmChunker +impl> + Unpin> Stream for WebmChunker { - type Item = Chunk; - type Error = WebmetroError; + type Item = Result; - fn poll(&mut self) -> Result>, WebmetroError> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { + let mut chunker = self.get_mut(); loop { let mut return_value = None; let mut new_state = None; - match self.state { + match chunker.state { ChunkerState::BuildingHeader(ref mut buffer) => { - match self.source.poll_event() { - Err(passthru) => return Err(passthru.into()), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + match chunker.source.poll_event(cx) { + Err(passthru) => return Ready(Some(Err(passthru))), + Ok(Async::NotReady) => return Pending, + Ok(Async::Ready(None)) => return Ready(None), Ok(Async::Ready(Some(WebmElement::Cluster))) => { let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())}; @@ -158,7 +161,7 @@ impl> Stream for WebmChunker< Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(element))) => { - encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| { + encode(element, buffer, chunker.buffer_size_limit).unwrap_or_else(|err| { return_value = Some(Err(err)); new_state = Some(ChunkerState::End); }); @@ -166,16 +169,16 @@ impl> Stream for WebmChunker< } }, ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { - match self.source.poll_event() { - Err(passthru) => return Err(passthru.into()), - Ok(Async::NotReady) => return Ok(Async::NotReady), + match chunker.source.poll_event(cx) { + Err(passthru) => return Ready(Some(Err(passthru))), + Ok(Async::NotReady) => return Pending, Ok(Async::Ready(Some(element @ WebmElement::EbmlHead))) | Ok(Async::Ready(Some(element @ WebmElement::Segment))) => { let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let mut new_header_cursor = Cursor::new(Vec::new()); - match encode(element, &mut new_header_cursor, self.buffer_size_limit) { + match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) { Ok(_) => { return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{ @@ -205,7 +208,7 @@ impl> Stream for WebmChunker< cluster_head.keyframe = true; } cluster_head.observe_simpleblock_timecode(block.timecode); - encode(WebmElement::SimpleBlock(*block), buffer, self.buffer_size_limit).unwrap_or_else(|err| { + encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit).unwrap_or_else(|err| { return_value = Some(Err(err)); new_state = Some(ChunkerState::End); }); @@ -214,7 +217,7 @@ impl> Stream for WebmChunker< Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(element))) => { - encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| { + encode(element, buffer, chunker.buffer_size_limit).unwrap_or_else(|err| { return_value = Some(Err(err)); new_state = Some(ChunkerState::End); }); @@ -252,14 +255,19 @@ impl> Stream for WebmChunker< return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})))); new_state = Some(ChunkerState::End); }, - ChunkerState::End => return Ok(Async::Ready(None)) + ChunkerState::End => return Ready(None) }; if let Some(new_state) = new_state { - self.state = new_state; + chunker.state = new_state; } if let Some(return_value) = return_value { - return return_value; + return match return_value { + Ok(Async::Ready(Some(chunk))) => Ready(Some(Ok(chunk))), + Ok(Async::Ready(None)) => Ready(None), + Ok(Async::NotReady) => Pending, + Err(err) => Ready(Some(Err(err))), + }; } } } diff --git a/src/commands/dump.rs b/src/commands/dump.rs index 8b691e6..55b8d69 100644 --- a/src/commands/dump.rs +++ b/src/commands/dump.rs @@ -1,5 +1,7 @@ use clap::{App, AppSettings, ArgMatches, SubCommand}; -use futures::prelude::*; +use futures::Async; +use futures3::future::{FutureExt, poll_fn}; +use std::task::Poll; use super::stdin_stream; use webmetro::{ @@ -21,15 +23,17 @@ pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> { let mut events = stdin_stream().parse_ebml(); - // stdin is sync so Async::NotReady will never happen - while let Ok(Async::Ready(Some(element))) = events.poll_event() { - match element { - // suppress printing byte arrays - Tracks(slice) => println!("Tracks[{}]", slice.len()), - SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode), - other => println!("{:?}", other) + Ok(poll_fn(|cx| { + // stdin is sync so Async::NotReady will never happen on this tokio version + while let Ok(Async::Ready(Some(element))) = events.poll_event(cx) { + match element { + // suppress printing byte arrays + Tracks(slice) => println!("Tracks[{}]", slice.len()), + SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode), + other => println!("{:?}", other) + } } - } - Ok(()) + Poll::Ready(()) + }).now_or_never().expect("Stdin should never go async")) } diff --git a/src/commands/filter.rs b/src/commands/filter.rs index c5d907d..40c271f 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -4,12 +4,9 @@ use std::{ }; use clap::{App, Arg, ArgMatches, SubCommand}; -use futures::prelude::*; -use futures3::compat::{ - Compat, - Compat01As03 -}; -use tokio::runtime::Runtime; +use futures3::prelude::*; +use futures3::future::ready; +use tokio2::runtime::Runtime; use super::stdin_stream; use webmetro::{ @@ -19,8 +16,8 @@ use webmetro::{ }, error::WebmetroError, fixers::{ - ChunkStream, ChunkTimecodeFixer, + Throttle, }, stream_parser::StreamEbml }; @@ -35,18 +32,18 @@ pub fn options() -> App<'static, 'static> { pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { let mut timecode_fixer = ChunkTimecodeFixer::new(); - let mut chunk_stream: Box + Send> = Box::new( + let mut chunk_stream: Box, Ok = Chunk, Error = WebmetroError> + Send + Unpin> = Box::new( stdin_stream() .parse_ebml() .chunk_webm() - .map(move |chunk| timecode_fixer.process(chunk)) + .map_ok(move |chunk| timecode_fixer.process(chunk)) ); if args.is_present("throttle") { - chunk_stream = Box::new(Compat::new(Compat01As03::new(chunk_stream).throttle())); + chunk_stream = Box::new(Throttle::new(chunk_stream)); } - Runtime::new().unwrap().block_on(chunk_stream.for_each(|chunk| { - io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from) + Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| { + ready(io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from)) })) } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 3acf871..f59349e 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,15 +1,7 @@ -use std::io::stdin; +use std::io::Cursor; -use bytes::{ - Buf, - IntoBuf -}; -use futures::prelude::*; -use tokio_io::io::AllowStdIo; -use tokio_codec::{ - BytesCodec, - FramedRead -}; +use bytes::Bytes; +use futures3::TryStreamExt; use webmetro::error::WebmetroError; pub mod dump; @@ -20,8 +12,13 @@ pub mod send; /// An adapter that makes chunks of bytes from stdin available as a Stream; /// is NOT actually async, and just uses blocking read. Don't use more than /// one at once, who knows who gets which bytes. -pub fn stdin_stream() -> impl Stream { - FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new()) - .map(|bytes| bytes.into_buf()) - .map_err(WebmetroError::from) +pub fn stdin_stream() -> impl futures3::TryStream< + Item = Result, WebmetroError>, + Ok = Cursor, + Error = WebmetroError, +> + Sized + + Unpin { + tokio2::codec::FramedRead::new(tokio2::io::stdin(), tokio2::codec::BytesCodec::new()) + .map_ok(|bytes| Cursor::new(bytes.freeze())) + .map_err(WebmetroError::from) } diff --git a/src/commands/relay.rs b/src/commands/relay.rs index ae4820c..fd1c4bc 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -17,6 +17,7 @@ use futures3::{ compat::{ Compat, CompatSink, + Compat01As03, }, Never, prelude::*, @@ -66,13 +67,13 @@ fn get_stream(channel: Handle) -> impl Stream) -> impl Stream { - let source = stream - .map_err(WebmetroError::from) + let source = Compat01As03::new(stream + .map_err(WebmetroError::from)) .parse_ebml().with_soft_limit(BUFFER_LIMIT) .chunk_webm().with_soft_limit(BUFFER_LIMIT); let sink = CompatSink::new(Transmitter::new(channel)); - source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) + Compat::new(source).forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) .into_stream() .map(|_| empty()) .map_err(|err| { diff --git a/src/commands/send.rs b/src/commands/send.rs index 4bf9fa3..eb02d7a 100644 --- a/src/commands/send.rs +++ b/src/commands/send.rs @@ -2,9 +2,9 @@ use clap::{App, Arg, ArgMatches, SubCommand}; use futures::{ prelude::* }; +use futures3::prelude::*; use futures3::compat::{ Compat, - Compat01As03 }; use hyper::{ Body, @@ -24,8 +24,8 @@ use webmetro::{ }, error::WebmetroError, fixers::{ - ChunkStream, ChunkTimecodeFixer, + Throttle, }, stream_parser::StreamEbml }; @@ -41,7 +41,7 @@ pub fn options() -> App<'static, 'static> { .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) } -type BoxedChunkStream = Box + Send>; +type BoxedChunkStream = Box, Ok = Chunk, Error = WebmetroError> + Send + Unpin>; pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { let mut timecode_fixer = ChunkTimecodeFixer::new(); @@ -49,7 +49,7 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { stdin_stream() .parse_ebml() .chunk_webm() - .map(move |chunk| timecode_fixer.process(chunk)) + .map_ok(move |chunk| timecode_fixer.process(chunk)) ); let url_str = match args.value_of("url") { @@ -58,15 +58,15 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { }; if args.is_present("throttle") { - chunk_stream = Box::new(Compat::new(Compat01As03::new(chunk_stream).throttle())); + chunk_stream = Box::new(Throttle::new(chunk_stream)); } - let request_payload = Body::wrap_stream(chunk_stream.map( + let request_payload = Body::wrap_stream(Compat::new(chunk_stream.map_ok( |webm_chunk| webm_chunk.into_bytes() ).map_err(|err| { eprintln!("{}", &err); err - })); + }))); let request = Request::put(url_str) diff --git a/src/stream_parser.rs b/src/stream_parser.rs index b0bfe20..ed4a0a4 100644 --- a/src/stream_parser.rs +++ b/src/stream_parser.rs @@ -1,5 +1,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures::{stream::Stream, Async}; +use futures::Async; +use futures3::stream::{Stream, StreamExt, TryStream}; +use std::task::{Context, Poll}; use crate::ebml::FromEbml; use crate::error::WebmetroError; @@ -21,10 +23,10 @@ impl EbmlStreamingParser { } } -pub trait StreamEbml +pub trait StreamEbml: Sized + TryStream + Unpin where - Self: Sized + Stream, - Self::Item: Buf, + Self: Sized + TryStream + Unpin, + Self::Ok: Buf, { fn parse_ebml(self) -> EbmlStreamingParser { EbmlStreamingParser { @@ -36,11 +38,12 @@ where } } -impl> StreamEbml for S {} +impl> + Unpin> StreamEbml for S {} -impl> EbmlStreamingParser { +impl> + Unpin> EbmlStreamingParser { pub fn poll_event<'a, T: FromEbml<'a>>( &'a mut self, + cx: &mut Context, ) -> Result>, WebmetroError> { loop { match T::check_space(&self.buffer)? { @@ -64,13 +67,14 @@ impl> EbmlStreamingParser } } - match self.stream.poll()? { - Async::Ready(Some(buf)) => { + match self.stream.poll_next_unpin(cx)? { + Poll::Ready(Some(buf)) => { self.buffer.reserve(buf.remaining()); self.buffer.put(buf); // ok can retry decoding now } - other => return Ok(other.map(|_| None)), + Poll::Ready(None) => return Ok(Async::Ready(None)), + Poll::Pending => return Ok(Async::NotReady), } } } @@ -79,8 +83,12 @@ impl> EbmlStreamingParser #[cfg(test)] mod tests { use bytes::IntoBuf; - use futures::prelude::*; use futures::Async::*; + use futures3::{ + future::poll_fn, + stream::StreamExt, + FutureExt, + }; use matches::assert_matches; use crate::stream_parser::*; @@ -89,47 +97,53 @@ mod tests { #[test] fn stream_webm_test() { - let pieces = vec![ - &ENCODE_WEBM_TEST_FILE[0..20], - &ENCODE_WEBM_TEST_FILE[20..40], - &ENCODE_WEBM_TEST_FILE[40..], - ]; + poll_fn(|cx| { + let pieces = vec![ + &ENCODE_WEBM_TEST_FILE[0..20], + &ENCODE_WEBM_TEST_FILE[20..40], + &ENCODE_WEBM_TEST_FILE[40..], + ]; - let mut stream_parser = futures::stream::iter_ok(pieces.iter()) - .map(|bytes| bytes.into_buf()) - .parse_ebml(); + let mut stream_parser = futures3::stream::iter(pieces.iter()) + .map(|bytes| Ok(bytes.into_buf())) + .parse_ebml(); - assert_matches!( - stream_parser.poll_event(), - Ok(Ready(Some(WebmElement::EbmlHead))) - ); - assert_matches!( - stream_parser.poll_event(), - Ok(Ready(Some(WebmElement::Segment))) - ); - assert_matches!( - stream_parser.poll_event(), - Ok(Ready(Some(WebmElement::Tracks(_)))) - ); - assert_matches!( - stream_parser.poll_event(), - Ok(Ready(Some(WebmElement::Cluster))) - ); - assert_matches!( - stream_parser.poll_event(), - Ok(Ready(Some(WebmElement::Timecode(0)))) - ); - assert_matches!( - stream_parser.poll_event(), - Ok(Ready(Some(WebmElement::SimpleBlock(_)))) - ); - assert_matches!( - stream_parser.poll_event(), - Ok(Ready(Some(WebmElement::Cluster))) - ); - assert_matches!( - stream_parser.poll_event(), - Ok(Ready(Some(WebmElement::Timecode(1000)))) - ); + assert_matches!( + stream_parser.poll_event(cx), + Ok(Ready(Some(WebmElement::EbmlHead))) + ); + assert_matches!( + stream_parser.poll_event(cx), + Ok(Ready(Some(WebmElement::Segment))) + ); + assert_matches!( + stream_parser.poll_event(cx), + Ok(Ready(Some(WebmElement::Tracks(_)))) + ); + assert_matches!( + stream_parser.poll_event(cx), + Ok(Ready(Some(WebmElement::Cluster))) + ); + assert_matches!( + stream_parser.poll_event(cx), + Ok(Ready(Some(WebmElement::Timecode(0)))) + ); + assert_matches!( + stream_parser.poll_event(cx), + Ok(Ready(Some(WebmElement::SimpleBlock(_)))) + ); + assert_matches!( + stream_parser.poll_event(cx), + Ok(Ready(Some(WebmElement::Cluster))) + ); + assert_matches!( + stream_parser.poll_event(cx), + Ok(Ready(Some(WebmElement::Timecode(1000)))) + ); + + std::task::Poll::Ready(()) + }) + .now_or_never() + .expect("Test succeeded without blocking"); } }