Convert parser & chunker APIs to work with Futures 0.3-style streams

This commit is contained in:
Tangent 128 2019-10-21 03:18:51 -04:00
parent eda2e4f7be
commit bc8e45936b
7 changed files with 139 additions and 118 deletions

View file

@ -1,8 +1,11 @@
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use futures::{Async, Stream}; use futures::{Async};
use futures3::prelude::*;
use std::{ use std::{
io::Cursor, io::Cursor,
mem mem,
pin::Pin,
task::{Context, Poll, Poll::*},
}; };
use crate::stream_parser::EbmlStreamingParser; use crate::stream_parser::EbmlStreamingParser;
use crate::error::WebmetroError; use crate::error::WebmetroError;
@ -128,22 +131,22 @@ fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usiz
encode_webm_element(element, buffer).map_err(|err| err.into()) encode_webm_element(element, buffer).map_err(|err| err.into())
} }
impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<S> impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for WebmChunker<S>
{ {
type Item = Chunk; type Item = Result<Chunk, WebmetroError>;
type Error = WebmetroError;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, WebmetroError> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
let mut chunker = self.get_mut();
loop { loop {
let mut return_value = None; let mut return_value = None;
let mut new_state = None; let mut new_state = None;
match self.state { match chunker.state {
ChunkerState::BuildingHeader(ref mut buffer) => { ChunkerState::BuildingHeader(ref mut buffer) => {
match self.source.poll_event() { match chunker.source.poll_event(cx) {
Err(passthru) => return Err(passthru.into()), Err(passthru) => return Ready(Some(Err(passthru))),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Pending,
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(None)) => return Ready(None),
Ok(Async::Ready(Some(WebmElement::Cluster))) => { Ok(Async::Ready(Some(WebmElement::Cluster))) => {
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())}; let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())};
@ -158,7 +161,7 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<
Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Void))) => {},
Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
Ok(Async::Ready(Some(element))) => { Ok(Async::Ready(Some(element))) => {
encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| { encode(element, buffer, chunker.buffer_size_limit).unwrap_or_else(|err| {
return_value = Some(Err(err)); return_value = Some(Err(err));
new_state = Some(ChunkerState::End); new_state = Some(ChunkerState::End);
}); });
@ -166,16 +169,16 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<
} }
}, },
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
match self.source.poll_event() { match chunker.source.poll_event(cx) {
Err(passthru) => return Err(passthru.into()), Err(passthru) => return Ready(Some(Err(passthru))),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Pending,
Ok(Async::Ready(Some(element @ WebmElement::EbmlHead))) Ok(Async::Ready(Some(element @ WebmElement::EbmlHead)))
| Ok(Async::Ready(Some(element @ WebmElement::Segment))) => { | Ok(Async::Ready(Some(element @ WebmElement::Segment))) => {
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
let mut new_header_cursor = Cursor::new(Vec::new()); let mut new_header_cursor = Cursor::new(Vec::new());
match encode(element, &mut new_header_cursor, self.buffer_size_limit) { match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) {
Ok(_) => { Ok(_) => {
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))));
new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{ new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{
@ -205,7 +208,7 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<
cluster_head.keyframe = true; cluster_head.keyframe = true;
} }
cluster_head.observe_simpleblock_timecode(block.timecode); cluster_head.observe_simpleblock_timecode(block.timecode);
encode(WebmElement::SimpleBlock(*block), buffer, self.buffer_size_limit).unwrap_or_else(|err| { encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit).unwrap_or_else(|err| {
return_value = Some(Err(err)); return_value = Some(Err(err));
new_state = Some(ChunkerState::End); new_state = Some(ChunkerState::End);
}); });
@ -214,7 +217,7 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<
Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Void))) => {},
Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
Ok(Async::Ready(Some(element))) => { Ok(Async::Ready(Some(element))) => {
encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| { encode(element, buffer, chunker.buffer_size_limit).unwrap_or_else(|err| {
return_value = Some(Err(err)); return_value = Some(Err(err));
new_state = Some(ChunkerState::End); new_state = Some(ChunkerState::End);
}); });
@ -252,14 +255,19 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})))); return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))));
new_state = Some(ChunkerState::End); new_state = Some(ChunkerState::End);
}, },
ChunkerState::End => return Ok(Async::Ready(None)) ChunkerState::End => return Ready(None)
}; };
if let Some(new_state) = new_state { if let Some(new_state) = new_state {
self.state = new_state; chunker.state = new_state;
} }
if let Some(return_value) = return_value { if let Some(return_value) = return_value {
return return_value; return match return_value {
Ok(Async::Ready(Some(chunk))) => Ready(Some(Ok(chunk))),
Ok(Async::Ready(None)) => Ready(None),
Ok(Async::NotReady) => Pending,
Err(err) => Ready(Some(Err(err))),
};
} }
} }
} }

View file

@ -1,5 +1,7 @@
use clap::{App, AppSettings, ArgMatches, SubCommand}; use clap::{App, AppSettings, ArgMatches, SubCommand};
use futures::prelude::*; use futures::Async;
use futures3::future::{FutureExt, poll_fn};
use std::task::Poll;
use super::stdin_stream; use super::stdin_stream;
use webmetro::{ use webmetro::{
@ -21,15 +23,17 @@ pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> {
let mut events = stdin_stream().parse_ebml(); let mut events = stdin_stream().parse_ebml();
// stdin is sync so Async::NotReady will never happen Ok(poll_fn(|cx| {
while let Ok(Async::Ready(Some(element))) = events.poll_event() { // stdin is sync so Async::NotReady will never happen on this tokio version
match element { while let Ok(Async::Ready(Some(element))) = events.poll_event(cx) {
// suppress printing byte arrays match element {
Tracks(slice) => println!("Tracks[{}]", slice.len()), // suppress printing byte arrays
SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode), Tracks(slice) => println!("Tracks[{}]", slice.len()),
other => println!("{:?}", other) SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode),
other => println!("{:?}", other)
}
} }
}
Ok(()) Poll::Ready(())
}).now_or_never().expect("Stdin should never go async"))
} }

View file

@ -4,12 +4,9 @@ use std::{
}; };
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::prelude::*; use futures3::prelude::*;
use futures3::compat::{ use futures3::future::ready;
Compat, use tokio2::runtime::Runtime;
Compat01As03
};
use tokio::runtime::Runtime;
use super::stdin_stream; use super::stdin_stream;
use webmetro::{ use webmetro::{
@ -19,8 +16,8 @@ use webmetro::{
}, },
error::WebmetroError, error::WebmetroError,
fixers::{ fixers::{
ChunkStream,
ChunkTimecodeFixer, ChunkTimecodeFixer,
Throttle,
}, },
stream_parser::StreamEbml stream_parser::StreamEbml
}; };
@ -35,18 +32,18 @@ pub fn options() -> App<'static, 'static> {
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut timecode_fixer = ChunkTimecodeFixer::new();
let mut chunk_stream: Box<dyn Stream<Item = Chunk, Error = WebmetroError> + Send> = Box::new( let mut chunk_stream: Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin> = Box::new(
stdin_stream() stdin_stream()
.parse_ebml() .parse_ebml()
.chunk_webm() .chunk_webm()
.map(move |chunk| timecode_fixer.process(chunk)) .map_ok(move |chunk| timecode_fixer.process(chunk))
); );
if args.is_present("throttle") { if args.is_present("throttle") {
chunk_stream = Box::new(Compat::new(Compat01As03::new(chunk_stream).throttle())); chunk_stream = Box::new(Throttle::new(chunk_stream));
} }
Runtime::new().unwrap().block_on(chunk_stream.for_each(|chunk| { Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| {
io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from) ready(io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from))
})) }))
} }

View file

@ -1,15 +1,7 @@
use std::io::stdin; use std::io::Cursor;
use bytes::{ use bytes::Bytes;
Buf, use futures3::TryStreamExt;
IntoBuf
};
use futures::prelude::*;
use tokio_io::io::AllowStdIo;
use tokio_codec::{
BytesCodec,
FramedRead
};
use webmetro::error::WebmetroError; use webmetro::error::WebmetroError;
pub mod dump; pub mod dump;
@ -20,8 +12,13 @@ pub mod send;
/// An adapter that makes chunks of bytes from stdin available as a Stream; /// An adapter that makes chunks of bytes from stdin available as a Stream;
/// is NOT actually async, and just uses blocking read. Don't use more than /// is NOT actually async, and just uses blocking read. Don't use more than
/// one at once, who knows who gets which bytes. /// one at once, who knows who gets which bytes.
pub fn stdin_stream() -> impl Stream<Item = impl Buf, Error = WebmetroError> { pub fn stdin_stream() -> impl futures3::TryStream<
FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new()) Item = Result<Cursor<Bytes>, WebmetroError>,
.map(|bytes| bytes.into_buf()) Ok = Cursor<Bytes>,
.map_err(WebmetroError::from) Error = WebmetroError,
> + Sized
+ Unpin {
tokio2::codec::FramedRead::new(tokio2::io::stdin(), tokio2::codec::BytesCodec::new())
.map_ok(|bytes| Cursor::new(bytes.freeze()))
.map_err(WebmetroError::from)
} }

View file

@ -17,6 +17,7 @@ use futures3::{
compat::{ compat::{
Compat, Compat,
CompatSink, CompatSink,
Compat01As03,
}, },
Never, Never,
prelude::*, prelude::*,
@ -66,13 +67,13 @@ fn get_stream(channel: Handle) -> impl Stream<Item = Bytes, Error = WebmetroErro
} }
fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = warp::Error>) -> impl Stream<Item = Bytes, Error = WebmetroError> { fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = warp::Error>) -> impl Stream<Item = Bytes, Error = WebmetroError> {
let source = stream let source = Compat01As03::new(stream
.map_err(WebmetroError::from) .map_err(WebmetroError::from))
.parse_ebml().with_soft_limit(BUFFER_LIMIT) .parse_ebml().with_soft_limit(BUFFER_LIMIT)
.chunk_webm().with_soft_limit(BUFFER_LIMIT); .chunk_webm().with_soft_limit(BUFFER_LIMIT);
let sink = CompatSink::new(Transmitter::new(channel)); let sink = CompatSink::new(Transmitter::new(channel));
source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) Compat::new(source).forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
.into_stream() .into_stream()
.map(|_| empty()) .map(|_| empty())
.map_err(|err| { .map_err(|err| {

View file

@ -2,9 +2,9 @@ use clap::{App, Arg, ArgMatches, SubCommand};
use futures::{ use futures::{
prelude::* prelude::*
}; };
use futures3::prelude::*;
use futures3::compat::{ use futures3::compat::{
Compat, Compat,
Compat01As03
}; };
use hyper::{ use hyper::{
Body, Body,
@ -24,8 +24,8 @@ use webmetro::{
}, },
error::WebmetroError, error::WebmetroError,
fixers::{ fixers::{
ChunkStream,
ChunkTimecodeFixer, ChunkTimecodeFixer,
Throttle,
}, },
stream_parser::StreamEbml stream_parser::StreamEbml
}; };
@ -41,7 +41,7 @@ pub fn options() -> App<'static, 'static> {
.help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
} }
type BoxedChunkStream = Box<dyn Stream<Item = Chunk, Error = WebmetroError> + Send>; type BoxedChunkStream = Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin>;
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut timecode_fixer = ChunkTimecodeFixer::new();
@ -49,7 +49,7 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
stdin_stream() stdin_stream()
.parse_ebml() .parse_ebml()
.chunk_webm() .chunk_webm()
.map(move |chunk| timecode_fixer.process(chunk)) .map_ok(move |chunk| timecode_fixer.process(chunk))
); );
let url_str = match args.value_of("url") { let url_str = match args.value_of("url") {
@ -58,15 +58,15 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
}; };
if args.is_present("throttle") { if args.is_present("throttle") {
chunk_stream = Box::new(Compat::new(Compat01As03::new(chunk_stream).throttle())); chunk_stream = Box::new(Throttle::new(chunk_stream));
} }
let request_payload = Body::wrap_stream(chunk_stream.map( let request_payload = Body::wrap_stream(Compat::new(chunk_stream.map_ok(
|webm_chunk| webm_chunk.into_bytes() |webm_chunk| webm_chunk.into_bytes()
).map_err(|err| { ).map_err(|err| {
eprintln!("{}", &err); eprintln!("{}", &err);
err err
})); })));
let request = Request::put(url_str) let request = Request::put(url_str)

View file

@ -1,5 +1,7 @@
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{stream::Stream, Async}; use futures::Async;
use futures3::stream::{Stream, StreamExt, TryStream};
use std::task::{Context, Poll};
use crate::ebml::FromEbml; use crate::ebml::FromEbml;
use crate::error::WebmetroError; use crate::error::WebmetroError;
@ -21,10 +23,10 @@ impl<S> EbmlStreamingParser<S> {
} }
} }
pub trait StreamEbml pub trait StreamEbml: Sized + TryStream + Unpin
where where
Self: Sized + Stream, Self: Sized + TryStream + Unpin,
Self::Item: Buf, Self::Ok: Buf,
{ {
fn parse_ebml(self) -> EbmlStreamingParser<Self> { fn parse_ebml(self) -> EbmlStreamingParser<Self> {
EbmlStreamingParser { EbmlStreamingParser {
@ -36,11 +38,12 @@ where
} }
} }
impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> StreamEbml for S {} impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> StreamEbml for S {}
impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingParser<S> { impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
pub fn poll_event<'a, T: FromEbml<'a>>( pub fn poll_event<'a, T: FromEbml<'a>>(
&'a mut self, &'a mut self,
cx: &mut Context,
) -> Result<Async<Option<T>>, WebmetroError> { ) -> Result<Async<Option<T>>, WebmetroError> {
loop { loop {
match T::check_space(&self.buffer)? { match T::check_space(&self.buffer)? {
@ -64,13 +67,14 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingParser<S>
} }
} }
match self.stream.poll()? { match self.stream.poll_next_unpin(cx)? {
Async::Ready(Some(buf)) => { Poll::Ready(Some(buf)) => {
self.buffer.reserve(buf.remaining()); self.buffer.reserve(buf.remaining());
self.buffer.put(buf); self.buffer.put(buf);
// ok can retry decoding now // ok can retry decoding now
} }
other => return Ok(other.map(|_| None)), Poll::Ready(None) => return Ok(Async::Ready(None)),
Poll::Pending => return Ok(Async::NotReady),
} }
} }
} }
@ -79,8 +83,12 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingParser<S>
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bytes::IntoBuf; use bytes::IntoBuf;
use futures::prelude::*;
use futures::Async::*; use futures::Async::*;
use futures3::{
future::poll_fn,
stream::StreamExt,
FutureExt,
};
use matches::assert_matches; use matches::assert_matches;
use crate::stream_parser::*; use crate::stream_parser::*;
@ -89,47 +97,53 @@ mod tests {
#[test] #[test]
fn stream_webm_test() { fn stream_webm_test() {
let pieces = vec![ poll_fn(|cx| {
&ENCODE_WEBM_TEST_FILE[0..20], let pieces = vec![
&ENCODE_WEBM_TEST_FILE[20..40], &ENCODE_WEBM_TEST_FILE[0..20],
&ENCODE_WEBM_TEST_FILE[40..], &ENCODE_WEBM_TEST_FILE[20..40],
]; &ENCODE_WEBM_TEST_FILE[40..],
];
let mut stream_parser = futures::stream::iter_ok(pieces.iter()) let mut stream_parser = futures3::stream::iter(pieces.iter())
.map(|bytes| bytes.into_buf()) .map(|bytes| Ok(bytes.into_buf()))
.parse_ebml(); .parse_ebml();
assert_matches!( assert_matches!(
stream_parser.poll_event(), stream_parser.poll_event(cx),
Ok(Ready(Some(WebmElement::EbmlHead))) Ok(Ready(Some(WebmElement::EbmlHead)))
); );
assert_matches!( assert_matches!(
stream_parser.poll_event(), stream_parser.poll_event(cx),
Ok(Ready(Some(WebmElement::Segment))) Ok(Ready(Some(WebmElement::Segment)))
); );
assert_matches!( assert_matches!(
stream_parser.poll_event(), stream_parser.poll_event(cx),
Ok(Ready(Some(WebmElement::Tracks(_)))) Ok(Ready(Some(WebmElement::Tracks(_))))
); );
assert_matches!( assert_matches!(
stream_parser.poll_event(), stream_parser.poll_event(cx),
Ok(Ready(Some(WebmElement::Cluster))) Ok(Ready(Some(WebmElement::Cluster)))
); );
assert_matches!( assert_matches!(
stream_parser.poll_event(), stream_parser.poll_event(cx),
Ok(Ready(Some(WebmElement::Timecode(0)))) Ok(Ready(Some(WebmElement::Timecode(0))))
); );
assert_matches!( assert_matches!(
stream_parser.poll_event(), stream_parser.poll_event(cx),
Ok(Ready(Some(WebmElement::SimpleBlock(_)))) Ok(Ready(Some(WebmElement::SimpleBlock(_))))
); );
assert_matches!( assert_matches!(
stream_parser.poll_event(), stream_parser.poll_event(cx),
Ok(Ready(Some(WebmElement::Cluster))) Ok(Ready(Some(WebmElement::Cluster)))
); );
assert_matches!( assert_matches!(
stream_parser.poll_event(), stream_parser.poll_event(cx),
Ok(Ready(Some(WebmElement::Timecode(1000)))) Ok(Ready(Some(WebmElement::Timecode(1000))))
); );
std::task::Poll::Ready(())
})
.now_or_never()
.expect("Test succeeded without blocking");
} }
} }