Merge branch 'nightly' - async stable now

commit 77401e9f51
Author: Tangent Wantwight
Date:   2019-11-17 22:44:13 -05:00
15 changed files with 1281 additions and 620 deletions

Cargo.lock (generated, 1018 lines changed): diff suppressed because it is too large.

Cargo.toml

@@ -7,13 +7,14 @@ edition = "2018"
 [dependencies]
 bytes = "0.4.12"
 clap = "2.33.0"
-custom_error = "1.6.0"
-futures = "0.1.28"
-http = "0.1.17"
-hyper = "0.12.31"
+custom_error = "1.7"
+futures = "0.1.29"
+futures3 = { package = "futures-preview", version="0.3.0-alpha", features = ["compat"] }
+http = "0.1.18"
+hyper = "0.12.35"
+hyper13 = { package = "hyper", version="0.13.0-alpha.4", features = ["unstable-stream"] }
+matches = "0.1.8"
 odds = { version = "0.3.1", features = ["std-vec"] }
-tokio = "0.1.22"
-tokio-codec = "0.1.1"
-tokio-io = "0.1.12"
-warp = "0.1.16"
+tokio2 = { package = "tokio", version="0.2.0-alpha.6" }
+warp = "0.1.20"
 weak-table = "0.2.3"
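
Note: the dependency set above deliberately carries both futures 0.1 ("futures") and the 0.3 preview ("futures3" with its "compat" feature), plus hyper 0.12 alongside the hyper 0.13 alpha, so migrated code can keep talking to not-yet-migrated APIs. Below is a minimal sketch of the bridging pattern those features enable; the `bridge` helper and its types are illustrative only, not code from this commit (relay.rs further down uses the same Compat/Compat01As03 adapters):

    // Sketch, assuming the dependency names declared above; `bridge` is a
    // hypothetical helper, not part of this repository.
    use futures3::compat::{Compat, Compat01As03};

    fn bridge<S>(stream01: S) -> Compat<Compat01As03<S>>
    where
        S: futures::Stream<Item = bytes::Bytes, Error = std::io::Error>,
    {
        // futures 0.1 -> 0.3: the adapter yields Result<Bytes, io::Error> items
        let stream03 = Compat01As03::new(stream01);
        // futures 0.3 -> 0.1: wrap back up for APIs (hyper 0.12 / warp 0.1)
        // that still consume futures 0.1 streams
        Compat::new(stream03)
    }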

src/channel.rs

@@ -1,25 +1,27 @@
+use std::pin::Pin;
+use std::task::{
+    Context,
+    Poll
+};
 use std::sync::{
     Arc,
     Mutex
 };
-use futures::{
-    Async,
-    AsyncSink,
-    Sink,
-    Stream,
-    sync::mpsc::{
+use futures3::{
+    channel::mpsc::{
         channel as mpsc_channel,
         Sender,
         Receiver
-    }
+    },
+    Sink,
+    Stream,
+    Never
 };
 use odds::vec::VecExt;
 
 use crate::chunk::Chunk;
 
-pub enum Never {}
-
 /// A collection of listeners to a stream of WebM chunks.
 /// Sending a chunk may fail due to a client being disconnected,
 /// or simply failing to keep up with the stream buffer. In either
@@ -55,11 +57,14 @@ impl Transmitter {
     }
 }
 
-impl Sink for Transmitter {
-    type SinkItem = Chunk;
-    type SinkError = Never; // never errors, slow clients are simply dropped
+impl Sink<Chunk> for Transmitter {
+    type Error = Never; // never errors, slow clients are simply dropped
 
-    fn start_send(&mut self, chunk: Chunk) -> Result<AsyncSink<Chunk>, Never> {
+    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Never>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn start_send(self: Pin<&mut Self>, chunk: Chunk) -> Result<(), Never> {
         let mut channel = self.channel.lock().expect("Locking channel");
 
         if let Chunk::Headers { .. } = chunk {
@@ -68,14 +73,27 @@ impl Sink for Transmitter {
         channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok());
-        Ok(AsyncSink::Ready)
+        Ok(())
     }
 
-    fn poll_complete(&mut self) -> Result<Async<()>, Never> {
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
         let mut channel = self.channel.lock().expect("Locking channel");
+        let mut result = Poll::Ready(Ok(()));
 
-        channel.listeners.retain_mut(|listener| listener.poll_complete().is_ok());
+        // just disconnect any erroring listeners
+        channel.listeners.retain_mut(|listener| match Pin::new(listener).poll_flush(cx) {
+            Poll::Pending => {result = Poll::Pending; true},
+            Poll::Ready(Ok(())) => true,
+            Poll::Ready(Err(_)) => false,
+        });
 
-        Ok(Async::Ready(()))
+        result
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
+        // don't actually disconnect listeners, since other sources may want to transmit to this channel;
+        // just ensure we've sent everything we can out
+        self.poll_flush(cx)
     }
 }
@@ -108,9 +126,9 @@ impl Listener {
 impl Stream for Listener {
     type Item = Chunk;
-    type Error = Never; // no transmitter errors are exposed to the listeners
 
-    fn poll(&mut self) -> Result<Async<Option<Chunk>>, Never> {
-        Ok(self.receiver.poll().expect("Channel receiving can't error"))
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Chunk>> {
+        let receiver = &mut self.get_mut().receiver;
+        Pin::new(receiver).poll_next(cx)
     }
 }
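
With Transmitter now implementing the std-future Sink<Chunk> (poll_ready/start_send/poll_flush/poll_close) and Listener implementing Stream, a chunk producer can drive the channel with futures 0.3 combinators. A hypothetical usage sketch, not taken from this commit (relay.rs below still wraps the Transmitter in CompatSink because warp and hyper 0.12 remain on futures 0.1):

    // Hypothetical sketch of pushing chunks into the new Sink<Chunk> impl;
    // `pump` is illustrative only. Chunk and Transmitter are this crate's types.
    use futures3::prelude::*;
    use futures3::Never;

    async fn pump(
        mut chunks: impl Stream<Item = Chunk> + Unpin,
        mut transmitter: Transmitter,
    ) -> Result<(), Never> {
        while let Some(chunk) = chunks.next().await {
            // send() drives poll_ready, start_send, and poll_flush in turn
            transmitter.send(chunk).await?;
        }
        Ok(())
    }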

src/chunk.rs

@@ -1,10 +1,12 @@
-use bytes::Bytes;
-use futures::{Async, Stream};
+use bytes::{Buf, Bytes};
+use futures3::prelude::*;
 use std::{
     io::Cursor,
-    mem
+    mem,
+    pin::Pin,
+    task::{Context, Poll, Poll::*},
 };
 
-use crate::ebml::EbmlEventSource;
+use crate::stream_parser::EbmlStreamingParser;
 use crate::error::WebmetroError;
 use crate::webm::*;
@@ -103,7 +105,7 @@ enum ChunkerState {
 }
 
 pub struct WebmChunker<S> {
-    source: S,
+    source: EbmlStreamingParser<S>,
     buffer_size_limit: Option<usize>,
     state: ChunkerState
 }
@@ -128,146 +130,144 @@ fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usiz
     encode_webm_element(element, buffer).map_err(|err| err.into())
 }
 
-impl<S: EbmlEventSource> Stream for WebmChunker<S>
-where S::Error: Into<WebmetroError>
-{
-    type Item = Chunk;
-    type Error = WebmetroError;
-
-    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, WebmetroError> {
-        loop {
-            let mut return_value = None;
-            let mut new_state = None;
-
-            match self.state {
-                ChunkerState::BuildingHeader(ref mut buffer) => {
-                    match self.source.poll_event() {
-                        Err(passthru) => return Err(passthru.into()),
-                        Ok(Async::NotReady) => return Ok(Async::NotReady),
-                        Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
-                        Ok(Async::Ready(Some(WebmElement::Cluster))) => {
-                            let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
-                            let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())};
-
-                            return_value = Some(Ok(Async::Ready(Some(header_chunk))));
-                            new_state = Some(ChunkerState::BuildingCluster(
-                                ClusterHead::new(0),
-                                Cursor::new(Vec::new())
-                            ));
-                        },
-                        Ok(Async::Ready(Some(WebmElement::Info))) => {},
-                        Ok(Async::Ready(Some(WebmElement::Void))) => {},
-                        Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
-                        Ok(Async::Ready(Some(element))) => {
-                            encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| {
-                                return_value = Some(Err(err));
-                                new_state = Some(ChunkerState::End);
-                            });
-                        }
-                    }
-                },
-                ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
-                    match self.source.poll_event() {
-                        Err(passthru) => return Err(passthru.into()),
-                        Ok(Async::NotReady) => return Ok(Async::NotReady),
-                        Ok(Async::Ready(Some(element @ WebmElement::EbmlHead)))
-                        | Ok(Async::Ready(Some(element @ WebmElement::Segment))) => {
-                            let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
-                            let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
-
-                            let mut new_header_cursor = Cursor::new(Vec::new());
-                            match encode(element, &mut new_header_cursor, self.buffer_size_limit) {
-                                Ok(_) => {
-                                    return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))));
-                                    new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{
-                                        body: liberated_buffer.into_inner(),
-                                        new_header: new_header_cursor
-                                    });
-                                },
-                                Err(err) => {
-                                    return_value = Some(Err(err));
-                                    new_state = Some(ChunkerState::End);
-                                }
-                            }
-                        }
-                        Ok(Async::Ready(Some(WebmElement::Cluster))) => {
-                            let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
-                            let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
-
-                            return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))));
-                            new_state = Some(ChunkerState::EmittingClusterBody(liberated_buffer.into_inner()));
-                        },
-                        Ok(Async::Ready(Some(WebmElement::Timecode(timecode)))) => {
-                            cluster_head.update_timecode(timecode);
-                        },
-                        Ok(Async::Ready(Some(WebmElement::SimpleBlock(ref block)))) => {
-                            if (block.flags & 0b10000000) != 0 {
-                                // TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster
-                                cluster_head.keyframe = true;
-                            }
-                            cluster_head.observe_simpleblock_timecode(block.timecode);
-                            encode(WebmElement::SimpleBlock(*block), buffer, self.buffer_size_limit).unwrap_or_else(|err| {
-                                return_value = Some(Err(err));
-                                new_state = Some(ChunkerState::End);
-                            });
-                        },
-                        Ok(Async::Ready(Some(WebmElement::Info))) => {},
-                        Ok(Async::Ready(Some(WebmElement::Void))) => {},
-                        Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
-                        Ok(Async::Ready(Some(element))) => {
-                            encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| {
-                                return_value = Some(Err(err));
-                                new_state = Some(ChunkerState::End);
-                            });
-                        },
-                        Ok(Async::Ready(None)) => {
-                            // flush final Cluster on end of stream
-                            let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
-                            let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
-
-                            return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))));
-                            new_state = Some(ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner()));
-                        }
-                    }
-                },
-                ChunkerState::EmittingClusterBody(ref mut buffer) => {
-                    let liberated_buffer = mem::replace(buffer, Vec::new());
-
-                    return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))));
-                    new_state = Some(ChunkerState::BuildingCluster(
-                        ClusterHead::new(0),
-                        Cursor::new(Vec::new())
-                    ));
-                },
-                ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => {
-                    let liberated_body = mem::replace(body, Vec::new());
-                    let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new()));
-
-                    return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)}))));
-                    new_state = Some(ChunkerState::BuildingHeader(liberated_header_cursor));
-                },
-                ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
-                    // flush final Cluster on end of stream
-                    let liberated_buffer = mem::replace(buffer, Vec::new());
-
-                    return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))));
-                    new_state = Some(ChunkerState::End);
-                },
-                ChunkerState::End => return Ok(Async::Ready(None))
-            };
-
-            if let Some(new_state) = new_state {
-                self.state = new_state;
-            }
-            if let Some(return_value) = return_value {
-                return return_value;
-            }
-        }
-    }
-}
-
-pub trait WebmStream where Self: Sized + EbmlEventSource {
-    fn chunk_webm(self) -> WebmChunker<Self> {
+impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for WebmChunker<S>
+{
+    type Item = Result<Chunk, WebmetroError>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
+        let mut chunker = self.get_mut();
+        loop {
+            match chunker.state {
+                ChunkerState::BuildingHeader(ref mut buffer) => {
+                    match chunker.source.poll_event(cx) {
+                        Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))),
+                        Pending => return Pending,
+                        Ready(None) => return Ready(None),
+                        Ready(Some(Ok(element))) => match element {
+                            WebmElement::Cluster => {
+                                let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
+                                let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())};
+
+                                chunker.state = ChunkerState::BuildingCluster(
+                                    ClusterHead::new(0),
+                                    Cursor::new(Vec::new())
+                                );
+                                return Ready(Some(Ok(header_chunk)));
+                            },
+                            WebmElement::Info => {},
+                            WebmElement::Void => {},
+                            WebmElement::Unknown(_) => {},
+                            element => {
+                                if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) {
+                                    chunker.state = ChunkerState::End;
+                                    return Ready(Some(Err(err)));
+                                }
+                            }
+                        }
+                    }
+                },
+                ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
+                    match chunker.source.poll_event(cx) {
+                        Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))),
+                        Pending => return Pending,
+                        Ready(Some(Ok(element))) => match element {
+                            WebmElement::EbmlHead | WebmElement::Segment => {
+                                let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
+                                let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
+
+                                let mut new_header_cursor = Cursor::new(Vec::new());
+                                match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) {
+                                    Ok(_) => {
+                                        chunker.state = ChunkerState::EmittingClusterBodyBeforeNewHeader{
+                                            body: liberated_buffer.into_inner(),
+                                            new_header: new_header_cursor
+                                        };
+                                        return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
+                                    },
+                                    Err(err) => {
+                                        chunker.state = ChunkerState::End;
+                                        return Ready(Some(Err(err)));
+                                    }
+                                }
+                            },
+                            WebmElement::Cluster => {
+                                let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
+                                let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
+
+                                chunker.state = ChunkerState::EmittingClusterBody(liberated_buffer.into_inner());
+                                return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
+                            },
+                            WebmElement::Timecode(timecode) => {
+                                cluster_head.update_timecode(timecode);
+                            },
+                            WebmElement::SimpleBlock(ref block) => {
+                                if (block.flags & 0b10000000) != 0 {
+                                    // TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster
+                                    cluster_head.keyframe = true;
+                                }
+                                cluster_head.observe_simpleblock_timecode(block.timecode);
+                                if let Err(err) = encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit) {
+                                    chunker.state = ChunkerState::End;
+                                    return Ready(Some(Err(err)));
+                                }
+                            },
+                            WebmElement::Info => {},
+                            WebmElement::Void => {},
+                            WebmElement::Unknown(_) => {},
+                            element => {
+                                if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) {
+                                    chunker.state = ChunkerState::End;
+                                    return Ready(Some(Err(err)));
+                                }
+                            },
+                        },
+                        Ready(None) => {
+                            // flush final Cluster on end of stream
+                            let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
+                            let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
+
+                            chunker.state = ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner());
+                            return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
+                        }
+                    }
+                },
+                ChunkerState::EmittingClusterBody(ref mut buffer) => {
+                    let liberated_buffer = mem::replace(buffer, Vec::new());
+
+                    chunker.state = ChunkerState::BuildingCluster(
+                        ClusterHead::new(0),
+                        Cursor::new(Vec::new())
+                    );
+                    return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
+                },
+                ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => {
+                    let liberated_body = mem::replace(body, Vec::new());
+                    let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new()));
+
+                    chunker.state = ChunkerState::BuildingHeader(liberated_header_cursor);
+                    return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)})));
+                },
+                ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
+                    // flush final Cluster on end of stream
+                    let liberated_buffer = mem::replace(buffer, Vec::new());
+
+                    chunker.state = ChunkerState::End;
+                    return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
+                },
+                ChunkerState::End => return Ready(None)
+            };
+        }
+    }
+}
+
+pub trait WebmStream {
+    type Stream;
+    fn chunk_webm(self) -> WebmChunker<Self::Stream>;
+}
+
+impl<S: Stream> WebmStream for EbmlStreamingParser<S> {
+    type Stream = S;
+    fn chunk_webm(self) -> WebmChunker<S> {
         WebmChunker {
             source: self,
             buffer_size_limit: None,
@@ -276,8 +276,6 @@ pub trait WebmStream where Self: Sized + EbmlEventSource {
     }
 }
 
-impl<T: EbmlEventSource> WebmStream for T {}
-
 #[cfg(test)]
 mod tests {

src/commands/dump.rs

@@ -1,5 +1,5 @@
 use clap::{App, AppSettings, ArgMatches, SubCommand};
-use futures::prelude::*;
+use tokio2::runtime::Runtime;
 
 use super::stdin_stream;
 use webmetro::{
@@ -21,15 +21,15 @@ pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> {
     let mut events = stdin_stream().parse_ebml();
 
-    // stdin is sync so Async::NotReady will never happen
-    while let Ok(Async::Ready(Some(element))) = events.poll_event() {
-        match element {
-            // suppress printing byte arrays
-            Tracks(slice) => println!("Tracks[{}]", slice.len()),
-            SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode),
-            other => println!("{:?}", other)
-        }
-    }
-
-    Ok(())
+    Runtime::new().unwrap().block_on(async {
+        while let Some(element) = events.next().await? {
+            match element {
+                // suppress printing byte arrays
+                Tracks(slice) => println!("Tracks[{}]", slice.len()),
+                SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode),
+                other => println!("{:?}", other)
+            }
+        }
+        Ok(())
+    })
 }

src/commands/filter.rs

@@ -4,8 +4,9 @@
 };
 use clap::{App, Arg, ArgMatches, SubCommand};
-use futures::prelude::*;
-use tokio::runtime::Runtime;
+use futures3::prelude::*;
+use futures3::future::ready;
+use tokio2::runtime::Runtime;
 
 use super::stdin_stream;
 use webmetro::{
@@ -14,7 +15,10 @@ use webmetro::{
         WebmStream
     },
     error::WebmetroError,
-    fixers::ChunkStream,
+    fixers::{
+        ChunkTimecodeFixer,
+        Throttle,
+    },
     stream_parser::StreamEbml
 };
@@ -27,18 +31,19 @@
 }
 
 pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
-    let mut chunk_stream: Box<Stream<Item = Chunk, Error = WebmetroError> + Send> = Box::new(
+    let mut timecode_fixer = ChunkTimecodeFixer::new();
+    let mut chunk_stream: Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin> = Box::new(
         stdin_stream()
         .parse_ebml()
         .chunk_webm()
-        .fix_timecodes()
+        .map_ok(move |chunk| timecode_fixer.process(chunk))
     );
 
     if args.is_present("throttle") {
-        chunk_stream = Box::new(chunk_stream.throttle());
+        chunk_stream = Box::new(Throttle::new(chunk_stream));
     }
 
-    Runtime::new().unwrap().block_on(chunk_stream.for_each(|chunk| {
-        io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from)
+    Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| {
+        ready(io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from))
     }))
 }

src/commands/mod.rs

@@ -1,15 +1,7 @@
-use std::io::stdin;
-use bytes::{
-    Buf,
-    IntoBuf
-};
-use futures::prelude::*;
-use tokio_io::io::AllowStdIo;
-use tokio_codec::{
-    BytesCodec,
-    FramedRead
-};
+use std::io::Cursor;
+use bytes::Bytes;
+use futures3::TryStreamExt;
 
 use webmetro::error::WebmetroError;
 
 pub mod dump;
@@ -20,8 +12,13 @@ pub mod send;
 /// An adapter that makes chunks of bytes from stdin available as a Stream;
 /// is NOT actually async, and just uses blocking read. Don't use more than
 /// one at once, who knows who gets which bytes.
-pub fn stdin_stream() -> impl Stream<Item = impl Buf, Error = WebmetroError> {
-    FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new())
-    .map(|bytes| bytes.into_buf())
-    .map_err(WebmetroError::from)
+pub fn stdin_stream() -> impl futures3::TryStream<
+    Item = Result<Cursor<Bytes>, WebmetroError>,
+    Ok = Cursor<Bytes>,
+    Error = WebmetroError,
+> + Sized
+  + Unpin {
+    tokio2::codec::FramedRead::new(tokio2::io::stdin(), tokio2::codec::BytesCodec::new())
+        .map_ok(|bytes| Cursor::new(bytes.freeze()))
+        .map_err(WebmetroError::from)
 }

src/commands/relay.rs

@@ -13,6 +13,15 @@ use futures::{
     Sink,
     stream::empty
 };
+use futures3::{
+    compat::{
+        Compat,
+        CompatSink,
+        Compat01As03,
+    },
+    Never,
+    prelude::*,
+};
 use hyper::{
     Body,
     Response,
@@ -38,28 +47,32 @@ use webmetro::{
     },
     chunk::WebmStream,
     error::WebmetroError,
-    fixers::ChunkStream,
+    fixers::{
+        ChunkStream,
+        ChunkTimecodeFixer,
+    },
     stream_parser::StreamEbml
 };
 
 const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
 
 fn get_stream(channel: Handle) -> impl Stream<Item = Bytes, Error = WebmetroError> {
-    Listener::new(channel)
-    .fix_timecodes()
+    let mut timecode_fixer = ChunkTimecodeFixer::new();
+    Compat::new(Listener::new(channel).map(|c| Ok(c))
+    .map_ok(move |chunk| timecode_fixer.process(chunk))
     .find_starting_point()
-    .map(|webm_chunk| webm_chunk.into_bytes())
-    .map_err(|err| match err {})
+    .map_ok(|webm_chunk| webm_chunk.into_bytes())
+    .map_err(|err: Never| match err {}))
 }
 
 fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = warp::Error>) -> impl Stream<Item = Bytes, Error = WebmetroError> {
-    let source = stream
-        .map_err(WebmetroError::from)
+    let source = Compat01As03::new(stream
+        .map_err(WebmetroError::from))
         .parse_ebml().with_soft_limit(BUFFER_LIMIT)
         .chunk_webm().with_soft_limit(BUFFER_LIMIT);
-    let sink = Transmitter::new(channel);
+    let sink = CompatSink::new(Transmitter::new(channel));
 
-    source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
+    Compat::new(source).forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
     .into_stream()
     .map(|_| empty())
     .map_err(|err| {
src/commands/send.rs

@@ -1,26 +1,15 @@
 use clap::{App, Arg, ArgMatches, SubCommand};
-use futures::{
-    prelude::*
-};
-use hyper::{
-    Body,
-    Client,
-    client::HttpConnector,
-    Request
-};
-use tokio::runtime::Runtime;
+use futures3::prelude::*;
+use hyper13::{client::HttpConnector, Body, Client, Request};
+use std::io::{stdout, Write};
+use tokio2::runtime::Runtime;
 
-use super::{
-    stdin_stream
-};
+use super::stdin_stream;
 use webmetro::{
-    chunk::{
-        Chunk,
-        WebmStream
-    },
+    chunk::{Chunk, WebmStream},
     error::WebmetroError,
-    fixers::ChunkStream,
-    stream_parser::StreamEbml
+    fixers::{ChunkTimecodeFixer, Throttle},
+    stream_parser::StreamEbml,
 };
@@ -34,45 +23,49 @@
         .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
 }
 
-type BoxedChunkStream = Box<Stream<Item = Chunk, Error = WebmetroError> + Send>;
+type BoxedChunkStream = Box<
+    dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError>
+        + Send
+        + Sync
+        + Unpin,
+>;
 
 pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
+    let mut timecode_fixer = ChunkTimecodeFixer::new();
     let mut chunk_stream: BoxedChunkStream = Box::new(
         stdin_stream()
         .parse_ebml()
         .chunk_webm()
-        .fix_timecodes()
+        .map_ok(move |chunk| timecode_fixer.process(chunk)),
    );
 
     let url_str = match args.value_of("url") {
         Some(url) => String::from(url),
-        _ => return Err("Listen address wasn't provided".into())
+        _ => return Err("Listen address wasn't provided".into()),
     };
 
     if args.is_present("throttle") {
-        chunk_stream = Box::new(chunk_stream.throttle());
+        chunk_stream = Box::new(Throttle::new(chunk_stream));
    }
 
-    let request_payload = Body::wrap_stream(chunk_stream.map(
-        |webm_chunk| webm_chunk.into_bytes()
-    ).map_err(|err| {
-        eprintln!("{}", &err);
-        err
-    }));
+    let chunk_stream = chunk_stream
+        .map_ok(|webm_chunk| webm_chunk.into_bytes())
+        .map_err(|err| {
+            eprintln!("{}", &err);
+            err
+        });
+
+    let request_payload = Body::wrap_stream(chunk_stream);
 
-    let request = Request::put(url_str)
-    .body(request_payload)
-    .map_err(WebmetroError::from)?;
-    let client = Client::builder().build(HttpConnector::new(1));
+    let request = Request::put(url_str).body(request_payload)?;
+    let client = Client::builder().build(HttpConnector::new());
 
-    let future = client.request(request)
-    .and_then(|response| {
-        response.into_body().for_each(|_chunk| {
-            Ok(())
-        })
-    })
-    .map_err(WebmetroError::from);
-
-    Runtime::new().unwrap().block_on(future)
+    Runtime::new().unwrap().block_on(async {
+        let response = client.request(request).await?;
+        let mut response_stream = response.into_body();
+        while let Some(response_chunk) = response_stream.next().await.transpose()? {
+            stdout().write_all(&response_chunk)?;
+        }
+        Ok(())
+    })
 }

src/ebml.rs

@@ -1,7 +1,6 @@
 use bytes::{BigEndian, ByteOrder, BufMut};
 use custom_error::custom_error;
 use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom};
-use futures::Async;
 
 pub const EBML_HEAD_ID: u64 = 0x0A45DFA3;
 pub const DOC_TYPE_ID: u64 = 0x0282;
@@ -197,6 +196,12 @@ pub fn encode_integer<T: Write>(tag: u64, value: u64, output: &mut T) -> IoResul
     output.write_all(&buffer.get_ref()[..])
 }
 
+pub struct EbmlLayout {
+    pub element_id: u64,
+    pub body_offset: usize,
+    pub element_len: usize,
+}
+
 pub trait FromEbml<'a>: Sized {
     /// Indicates if this tag's contents should be treated as a blob,
    /// or if the tag header should be reported as an event and with further
@@ -210,13 +215,14 @@ pub trait FromEbml<'a>: Sized {
     /// references into the given buffer.
     fn decode(element_id: u64, bytes: &'a[u8]) -> Result<Self, EbmlError>;
 
-    /// Check if enough space exists in the given buffer for decode_element() to
-    /// be successful; parsing errors will be returned eagerly.
-    fn check_space(bytes: &[u8]) -> Result<Option<usize>, EbmlError> {
+    /// Check if enough space exists in the given buffer to decode an element;
+    /// it will not actually call `decode` or try to construct an instance,
+    /// but EBML errors with the next tag header will be returned eagerly.
+    fn check_space(bytes: &[u8]) -> Result<Option<EbmlLayout>, EbmlError> {
         match decode_tag(bytes) {
             Ok(None) => Ok(None),
             Err(err) => Err(err),
-            Ok(Some((element_id, payload_size_tag, tag_size))) => {
+            Ok(Some((element_id, payload_size_tag, body_offset))) => {
                 let should_unwrap = Self::should_unwrap(element_id);
 
                 let payload_size = match (should_unwrap, payload_size_tag) {
@@ -225,12 +231,16 @@
                     (false, Varint::Value(size)) => size as usize
                 };
 
-                let element_size = tag_size + payload_size;
-                if element_size > bytes.len() {
+                let element_len = body_offset + payload_size;
+                if element_len > bytes.len() {
                     // need to read more still
                     Ok(None)
                 } else {
-                    Ok(Some(element_size))
+                    Ok(Some(EbmlLayout {
+                        element_id,
+                        body_offset,
+                        element_len
+                    }))
                 }
             }
         }
@@ -238,26 +248,11 @@
     /// Attempt to construct an instance of this type from the given byte slice
     fn decode_element(bytes: &'a[u8]) -> Result<Option<(Self, usize)>, EbmlError> {
-        match decode_tag(bytes) {
-            Ok(None) => Ok(None),
-            Err(err) => Err(err),
-            Ok(Some((element_id, payload_size_tag, tag_size))) => {
-                let should_unwrap = Self::should_unwrap(element_id);
-
-                let payload_size = match (should_unwrap, payload_size_tag) {
-                    (true, _) => 0,
-                    (false, Varint::Unknown) => return Err(EbmlError::UnknownElementLength),
-                    (false, Varint::Value(size)) => size as usize
-                };
-
-                let element_size = tag_size + payload_size;
-                if element_size > bytes.len() {
-                    // need to read more still
-                    return Ok(None);
-                }
-
-                match Self::decode(element_id, &bytes[tag_size..element_size]) {
-                    Ok(element) => Ok(Some((element, element_size))),
+        match Self::check_space(bytes)? {
+            None => Ok(None),
+            Some(info) => {
+                match Self::decode(info.element_id, &bytes[info.body_offset..info.element_len]) {
+                    Ok(element) => Ok(Some((element, info.element_len))),
                     Err(error) => Err(error)
                 }
             }
@@ -265,11 +260,6 @@
     }
 }
 
-pub trait EbmlEventSource {
-    type Error;
-    fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, Self::Error>;
-}
-
 #[cfg(test)]
 mod tests {
     use bytes::{BytesMut};

src/error.rs

@@ -3,12 +3,12 @@ use custom_error::custom_error;
 custom_error!{pub WebmetroError
     ResourcesExceeded = "resources exceeded",
-    EbmlError{source: crate::ebml::EbmlError} = "EBML error",
-    HttpError{source: http::Error} = "HTTP error",
-    HyperError{source: hyper::Error} = "Hyper error",
-    IoError{source: std::io::Error} = "IO error",
-    TimerError{source: tokio::timer::Error} = "Timer error",
-    WarpError{source: warp::Error} = "Warp error",
+    EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}",
+    HttpError{source: http::Error} = "HTTP error: {source}",
+    HyperError{source: hyper::Error} = "Hyper error: {source}",
+    Hyper13Error{source: hyper13::Error} = "Hyper error: {source}",
+    IoError{source: std::io::Error} = "IO error: {source}",
+    WarpError{source: warp::Error} = "Warp error: {source}",
     ApplicationError{message: String} = "{message}"
 }

src/fixers.rs

@@ -1,27 +1,36 @@
+use std::pin::Pin;
+use std::task::{
+    Context,
+    Poll
+};
 use std::time::{Duration, Instant};
-use futures::prelude::*;
-use tokio::timer::Delay;
+use futures3::prelude::*;
+use tokio2::timer::{
+    delay,
+    Delay
+};
 
 use crate::chunk::Chunk;
 use crate::error::WebmetroError;
 
-pub struct ChunkTimecodeFixer<S> {
-    stream: S,
+pub struct ChunkTimecodeFixer {
     current_offset: u64,
     last_observed_timecode: u64,
     assumed_duration: u64
 }
 
-impl<S: Stream<Item = Chunk>> Stream for ChunkTimecodeFixer<S>
-{
-    type Item = S::Item;
-    type Error = S::Error;
-
-    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
-        let mut poll_chunk = self.stream.poll();
-        match poll_chunk {
-            Ok(Async::Ready(Some(Chunk::ClusterHead(ref mut cluster_head)))) => {
+impl ChunkTimecodeFixer {
+    pub fn new() -> ChunkTimecodeFixer {
+        ChunkTimecodeFixer {
+            current_offset: 0,
+            last_observed_timecode: 0,
+            assumed_duration: 33
+        }
+    }
+    pub fn process<'a>(&mut self, mut chunk: Chunk) -> Chunk {
+        match chunk {
+            Chunk::ClusterHead(ref mut cluster_head) => {
                 let start = cluster_head.start;
                 if start < self.last_observed_timecode {
                     let next_timecode = self.last_observed_timecode + self.assumed_duration;
@@ -30,10 +39,10 @@ impl<S: Stream<Item = Chunk>> Stream for ChunkTimecodeFixer<S>
                 cluster_head.update_timecode(start + self.current_offset);
                 self.last_observed_timecode = cluster_head.end;
-            },
+            }
             _ => {}
-        };
-        poll_chunk
+        }
+        chunk
     }
 }
@@ -43,33 +52,32 @@ pub struct StartingPointFinder<S> {
     seen_keyframe: bool
 }
 
-impl<S: Stream<Item = Chunk>> Stream for StartingPointFinder<S>
+impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S>
 {
-    type Item = S::Item;
-    type Error = S::Error;
+    type Item = Result<Chunk, S::Error>;
 
-    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> {
         loop {
-            return match self.stream.poll() {
-                Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head)))) => {
+            return match self.stream.try_poll_next_unpin(cx) {
+                Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head)))) => {
                     if cluster_head.keyframe {
                         self.seen_keyframe = true;
                     }
 
                     if self.seen_keyframe {
-                        Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head))))
+                        Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head))))
                     } else {
                         continue;
                     }
                 },
-                chunk @ Ok(Async::Ready(Some(Chunk::ClusterBody {..}))) => {
+                chunk @ Poll::Ready(Some(Ok(Chunk::ClusterBody {..}))) => {
                     if self.seen_keyframe {
                         chunk
                     } else {
                         continue;
                     }
                 },
-                chunk @ Ok(Async::Ready(Some(Chunk::Headers {..}))) => {
+                chunk @ Poll::Ready(Some(Ok(Chunk::Headers {..}))) => {
                     if self.seen_header {
                         // new stream starting, we don't need a new header but should wait for a safe spot to resume
                         self.seen_keyframe = false;
@@ -91,37 +99,46 @@ pub struct Throttle<S> {
     sleep: Delay
 }
 
-impl<S: Stream<Item = Chunk, Error = WebmetroError>> Stream for Throttle<S>
+impl<S> Throttle<S> {
+    pub fn new(wrap: S) -> Throttle<S> {
+        let now = Instant::now();
+        Throttle {
+            stream: wrap,
+            start_time: now,
+            sleep: delay(now)
+        }
+    }
+}
+
+impl<S: TryStream<Ok = Chunk, Error = WebmetroError> + Unpin> Stream for Throttle<S>
 {
-    type Item = S::Item;
-    type Error = WebmetroError;
+    type Item = Result<Chunk, WebmetroError>;
 
-    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, WebmetroError> {
-        match self.sleep.poll() {
-            Err(err) => return Err(err.into()),
-            Ok(Async::NotReady) => return Ok(Async::NotReady),
-            Ok(Async::Ready(())) => { /* can continue */ }
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
+        match self.sleep.poll_unpin(cx) {
+            Poll::Pending => return Poll::Pending,
+            Poll::Ready(()) => { /* can continue */ },
         }
 
-        let next_chunk = self.stream.poll();
-        if let Ok(Async::Ready(Some(Chunk::ClusterHead(ref cluster_head)))) = next_chunk {
+        let next_chunk = self.stream.try_poll_next_unpin(cx);
+        if let Poll::Ready(Some(Ok(Chunk::ClusterHead(ref cluster_head)))) = next_chunk {
             // snooze until real time has "caught up" to the stream
             let offset = Duration::from_millis(cluster_head.end);
-            self.sleep.reset(self.start_time + offset);
+            let sleep_until = self.start_time + offset;
+            self.sleep.reset(sleep_until);
         }
         next_chunk
     }
 }
 
-pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
-    fn fix_timecodes(self) -> ChunkTimecodeFixer<Self> {
-        ChunkTimecodeFixer {
-            stream: self,
-            current_offset: 0,
-            last_observed_timecode: 0,
-            assumed_duration: 33
-        }
-    }
+pub trait ChunkStream where Self : Sized + TryStream<Ok = Chunk> {
+    /*fn fix_timecodes(self) -> Map<_> {
+        let fixer = ;
+        self.map(move |chunk| {
+            fixer.process(chunk);
+            chunk
+        })
+    }*/
 
     fn find_starting_point(self) -> StartingPointFinder<Self> {
         StartingPointFinder {
@@ -132,13 +149,8 @@ pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
     }
 
     fn throttle(self) -> Throttle<Self> {
-        let now = Instant::now();
-        Throttle {
-            stream: self,
-            start_time: now,
-            sleep: Delay::new(now)
-        }
+        Throttle::new(self)
     }
 }
 
-impl<T: Stream<Item = Chunk>> ChunkStream for T {}
+impl<T: TryStream<Ok = Chunk>> ChunkStream for T {}

src/lib.rs

@@ -1,8 +1,8 @@
 pub mod ebml;
 pub mod error;
 pub mod iterator;
-pub mod slice;
 pub mod stream_parser;
 pub mod chunk;
@@ -15,18 +15,6 @@ pub use crate::ebml::{EbmlError, FromEbml};
 #[cfg(test)]
 mod tests {
-    use futures::future::{ok, Future};
-
     pub const TEST_FILE: &'static [u8] = include_bytes!("data/test1.webm");
     pub const ENCODE_WEBM_TEST_FILE: &'static [u8] = include_bytes!("data/encode_webm_test.webm");
-
-    #[test]
-    fn hello_futures() {
-        let my_future = ok::<String, ()>("Hello".into())
-            .map(|hello| hello + ", Futures!");
-
-        let string_result = my_future.wait().unwrap();
-        assert_eq!(string_result, "Hello, Futures!");
-    }
 }

src/slice.rs (deleted)

@@ -1,18 +0,0 @@
-use futures::Async;
-
-use crate::ebml::EbmlError;
-use crate::ebml::EbmlEventSource;
-use crate::ebml::FromEbml;
-
-pub struct EbmlSlice<'a>(pub &'a [u8]);
-
-impl<'b> EbmlEventSource for EbmlSlice<'b> {
-    type Error = EbmlError;
-
-    fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, EbmlError> {
-        T::decode_element(self.0).map(|option| option.map(|(element, element_size)| {
-            self.0 = &self.0[element_size..];
-            element
-        })).map(Async::Ready)
-    }
-}

src/stream_parser.rs

@@ -1,24 +1,15 @@
-use bytes::{
-    Buf,
-    BufMut,
-    BytesMut
-};
-use futures::{
-    Async,
-    stream::Stream
-};
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use futures3::stream::{Stream, StreamExt, TryStream};
+use std::task::{Context, Poll};
 
-use crate::ebml::{
-    EbmlEventSource,
-    FromEbml
-};
+use crate::ebml::FromEbml;
 use crate::error::WebmetroError;
 
 pub struct EbmlStreamingParser<S> {
     stream: S,
     buffer: BytesMut,
     buffer_size_limit: Option<usize>,
-    last_read: usize
+    borrowed: Bytes,
 }
 
 impl<S> EbmlStreamingParser<S> {
@@ -31,70 +22,183 @@ impl<S> EbmlStreamingParser<S> {
     }
 }
 
-pub trait StreamEbml where Self: Sized + Stream, Self::Item: Buf {
+pub trait StreamEbml: Sized + TryStream + Unpin
+where
+    Self: Sized + TryStream + Unpin,
+    Self::Ok: Buf,
+{
     fn parse_ebml(self) -> EbmlStreamingParser<Self> {
         EbmlStreamingParser {
             stream: self,
             buffer: BytesMut::new(),
             buffer_size_limit: None,
-            last_read: 0
+            borrowed: Bytes::new(),
         }
     }
 }
 
-impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> StreamEbml for S {}
-impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingParser<S> {
-    pub fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, WebmetroError> {
-        // release buffer from previous event
-        self.buffer.advance(self.last_read);
-        self.last_read = 0;
-
-        loop {
-            match T::check_space(&self.buffer) {
-                Ok(None) => {
-                    // need to refill buffer, below
-                },
-                other => return other.map_err(WebmetroError::from).and_then(move |_| {
-                    match T::decode_element(&self.buffer) {
-                        Err(err) => Err(err.into()),
-                        Ok(None) => panic!("Buffer was supposed to have enough data to parse element, somehow did not."),
-                        Ok(Some((element, element_size))) => {
-                            self.last_read = element_size;
-                            Ok(Async::Ready(Some(element)))
-                        }
-                    }
-                })
-            }
-
-            if let Some(limit) = self.buffer_size_limit {
-                if limit <= self.buffer.len() {
-                    return Err(WebmetroError::ResourcesExceeded);
-                }
-            }
-
-            match self.stream.poll() {
-                Ok(Async::Ready(Some(buf))) => {
-                    self.buffer.reserve(buf.remaining());
-                    self.buffer.put(buf);
-                    // ok can retry decoding now
-                },
-                other => return other.map(|async_status| async_status.map(|_| None))
-            }
-        }
-    }
-}
-
-impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> EbmlEventSource for EbmlStreamingParser<S> {
-    type Error = WebmetroError;
-
-    fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, WebmetroError> {
-        return EbmlStreamingParser::poll_event(self);
-    }
-}
+impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> StreamEbml for S {}
+
+impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
+    pub fn poll_event<'a, T: FromEbml<'a>>(
+        &'a mut self,
+        cx: &mut Context,
+    ) -> Poll<Option<Result<T, WebmetroError>>> {
+        loop {
+            match T::check_space(&self.buffer)? {
+                None => {
+                    // need to refill buffer, below
+                }
+                Some(info) => {
+                    let mut bytes = self.buffer.split_to(info.element_len).freeze();
+                    bytes.advance(info.body_offset);
+                    self.borrowed = bytes;
+                    return Poll::Ready(Some(T::decode(
+                        info.element_id,
+                        &self.borrowed,
+                    ).map_err(Into::into)));
+                }
+            }
+
+            if let Some(limit) = self.buffer_size_limit {
+                if limit <= self.buffer.len() {
+                    return Poll::Ready(Some(Err(WebmetroError::ResourcesExceeded)));
+                }
+            }
+
+            match self.stream.poll_next_unpin(cx)? {
+                Poll::Ready(Some(buf)) => {
+                    self.buffer.reserve(buf.remaining());
+                    self.buffer.put(buf);
+                    // ok can retry decoding now
+                }
+                Poll::Ready(None) => return Poll::Ready(None),
+                Poll::Pending => return Poll::Pending,
+            }
+        }
+    }
+}
+
+impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
+    pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> {
+        loop {
+            if let Some(info) = T::check_space(&self.buffer)? {
+                let mut bytes = self.buffer.split_to(info.element_len).freeze();
+                bytes.advance(info.body_offset);
+                self.borrowed = bytes;
+                return Ok(Some(T::decode(info.element_id, &self.borrowed)?));
+            }
+
+            if let Some(limit) = self.buffer_size_limit {
+                if limit <= self.buffer.len() {
+                    // hit our buffer limit and still nothing parsed
+                    return Err(WebmetroError::ResourcesExceeded);
+                }
+            }
+
+            match self.stream.next().await.transpose()? {
+                Some(refill) => {
+                    self.buffer.reserve(refill.remaining());
+                    self.buffer.put(refill);
+                }
+                None => {
+                    // Nothing left, we're done
+                    return Ok(None);
+                }
+            }
+        }
+    }
+}
 
 #[cfg(test)]
 mod tests {
-    //#[test]
+    use bytes::IntoBuf;
+    use futures3::{future::poll_fn, stream::StreamExt, FutureExt};
+    use matches::assert_matches;
+    use std::task::Poll::*;
+
+    use crate::stream_parser::*;
+    use crate::tests::ENCODE_WEBM_TEST_FILE;
+    use crate::webm::*;
+
+    #[test]
+    fn stream_webm_test() {
+        poll_fn(|cx| {
+            let pieces = vec![
+                &ENCODE_WEBM_TEST_FILE[0..20],
+                &ENCODE_WEBM_TEST_FILE[20..40],
+                &ENCODE_WEBM_TEST_FILE[40..],
+            ];
+
+            let mut stream_parser = futures3::stream::iter(pieces.iter())
+                .map(|bytes| Ok(bytes.into_buf()))
+                .parse_ebml();
+
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ready(Some(Ok(WebmElement::EbmlHead)))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ready(Some(Ok(WebmElement::Segment)))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ready(Some(Ok(WebmElement::Tracks(_))))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ready(Some(Ok(WebmElement::Cluster)))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ready(Some(Ok(WebmElement::Timecode(0))))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ready(Some(Ok(WebmElement::SimpleBlock(_))))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ready(Some(Ok(WebmElement::Cluster)))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ready(Some(Ok(WebmElement::Timecode(1000))))
+            );
+
+            std::task::Poll::Ready(())
+        })
+        .now_or_never()
+        .expect("Test tried to block on I/O");
+    }
+
+    #[test]
+    fn async_webm_test() {
+        let pieces = vec![
+            &ENCODE_WEBM_TEST_FILE[0..20],
+            &ENCODE_WEBM_TEST_FILE[20..40],
+            &ENCODE_WEBM_TEST_FILE[40..],
+        ];
+
+        async {
+            let mut parser = futures3::stream::iter(pieces.iter())
+                .map(|bytes| Ok(bytes.into_buf()))
+                .parse_ebml();
+
+            assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead));
+            assert_matches!(parser.next().await?, Some(WebmElement::Segment));
+            assert_matches!(parser.next().await?, Some(WebmElement::Tracks(_)));
+            assert_matches!(parser.next().await?, Some(WebmElement::Cluster));
+            assert_matches!(parser.next().await?, Some(WebmElement::Timecode(0)));
+            assert_matches!(parser.next().await?, Some(WebmElement::SimpleBlock(_)));
+            assert_matches!(parser.next().await?, Some(WebmElement::Cluster));
+            assert_matches!(parser.next().await?, Some(WebmElement::Timecode(1000)));
+
+            Result::<(), WebmetroError>::Ok(())
+        }
+        .now_or_never()
+        .expect("Test tried to block on I/O")
+        .expect("Parse failed");
+    }
 }
} }