Simplify/sanify some types

This commit is contained in:
Tangent Wantwight 2020-05-08 21:15:18 -04:00
parent 00ec517e78
commit c422a1c3f3
6 changed files with 40 additions and 66 deletions

View File

@ -141,7 +141,9 @@ fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usiz
encode_webm_element(element, buffer).map_err(|err| err.into()) encode_webm_element(element, buffer).map_err(|err| err.into())
} }
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for WebmChunker<S> impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> Stream for WebmChunker<S>
where
WebmetroError: From<E>,
{ {
type Item = Result<Chunk, WebmetroError>; type Item = Result<Chunk, WebmetroError>;

View File

@ -1,23 +1,14 @@
use std::{ use std::{io, io::prelude::*};
io,
io::prelude::*
};
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::prelude::*; use futures::prelude::*;
use super::stdin_stream; use super::stdin_stream;
use webmetro::{ use webmetro::{
chunk::{ chunk::{Chunk, WebmStream},
Chunk,
WebmStream
},
error::WebmetroError, error::WebmetroError,
fixers::{ fixers::{ChunkTimecodeFixer, Throttle},
ChunkTimecodeFixer, stream_parser::StreamEbml,
Throttle,
},
stream_parser::StreamEbml
}; };
pub fn options() -> App<'static, 'static> { pub fn options() -> App<'static, 'static> {
@ -31,21 +22,20 @@ pub fn options() -> App<'static, 'static> {
#[tokio::main] #[tokio::main]
pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut timecode_fixer = ChunkTimecodeFixer::new();
let mut chunk_stream: Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin> = Box::new( let mut chunk_stream: Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send + Unpin> =
stdin_stream() Box::new(
.parse_ebml() stdin_stream()
.chunk_webm() .parse_ebml()
.map_ok(move |chunk| timecode_fixer.process(chunk)) .chunk_webm()
); .map_ok(move |chunk| timecode_fixer.process(chunk)),
);
if args.is_present("throttle") { if args.is_present("throttle") {
chunk_stream = Box::new(Throttle::new(chunk_stream)); chunk_stream = Box::new(Throttle::new(chunk_stream));
} }
while let Some(chunk) = chunk_stream.next().await { while let Some(chunk) = chunk_stream.next().await {
chunk?.try_for_each(|buffer| chunk?.try_for_each(|buffer| io::stdout().write_all(&buffer))?;
io::stdout().write_all(&buffer) }
)?;
};
Ok(()) Ok(())
} }

View File

@ -1,9 +1,6 @@
use std::io::Cursor;
use bytes::Bytes; use bytes::Bytes;
use futures::{TryStream, TryStreamExt}; use futures::{Stream, TryStreamExt};
use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_util::codec::{BytesCodec, FramedRead};
use webmetro::error::WebmetroError;
pub mod dump; pub mod dump;
pub mod filter; pub mod filter;
@ -13,13 +10,7 @@ pub mod send;
/// An adapter that makes chunks of bytes from stdin available as a Stream; /// An adapter that makes chunks of bytes from stdin available as a Stream;
/// is NOT actually async, and just uses blocking read. Don't use more than /// is NOT actually async, and just uses blocking read. Don't use more than
/// one at once, who knows who gets which bytes. /// one at once, who knows who gets which bytes.
pub fn stdin_stream() -> impl TryStream< pub fn stdin_stream() -> impl Stream<Item = Result<Bytes, std::io::Error>> + Sized + Unpin {
Item = Result<Cursor<Bytes>, WebmetroError>,
Ok = Cursor<Bytes>,
Error = WebmetroError,
> + Sized
+ Unpin {
FramedRead::new(tokio::io::stdin(), BytesCodec::new()) FramedRead::new(tokio::io::stdin(), BytesCodec::new())
.map_ok(|bytes| Cursor::new(bytes.freeze())) .map_ok(|bytes| bytes.freeze())
.map_err(WebmetroError::from)
} }

View File

@ -22,12 +22,7 @@ pub fn options() -> App<'static, 'static> {
.help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
} }
type BoxedChunkStream = Box< type BoxedChunkStream = Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send + Sync + Unpin>;
dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError>
+ Send
+ Sync
+ Unpin,
>;
#[tokio::main] #[tokio::main]
pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> {

View File

@ -13,7 +13,6 @@ use tokio::time::{
}; };
use crate::chunk::Chunk; use crate::chunk::Chunk;
use crate::error::WebmetroError;
pub struct ChunkTimecodeFixer { pub struct ChunkTimecodeFixer {
current_offset: u64, current_offset: u64,
@ -29,7 +28,7 @@ impl ChunkTimecodeFixer {
assumed_duration: 33 assumed_duration: 33
} }
} }
pub fn process<'a>(&mut self, mut chunk: Chunk) -> Chunk { pub fn process(&mut self, mut chunk: Chunk) -> Chunk {
match chunk { match chunk {
Chunk::ClusterHead(ref mut cluster_head) => { Chunk::ClusterHead(ref mut cluster_head) => {
let start = cluster_head.start; let start = cluster_head.start;
@ -111,11 +110,11 @@ impl<S> Throttle<S> {
} }
} }
impl<S: TryStream<Ok = Chunk, Error = WebmetroError> + Unpin> Stream for Throttle<S> impl<S: TryStream<Ok = Chunk> + Unpin> Stream for Throttle<S>
{ {
type Item = Result<Chunk, WebmetroError>; type Item = Result<Chunk, S::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> {
match self.sleep.poll_unpin(cx) { match self.sleep.poll_unpin(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(()) => { /* can continue */ }, Poll::Ready(()) => { /* can continue */ },

View File

@ -1,5 +1,5 @@
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::stream::{Stream, StreamExt, TryStream}; use futures::stream::{Stream, StreamExt};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::ebml::FromEbml; use crate::ebml::FromEbml;
@ -22,11 +22,7 @@ impl<S> EbmlStreamingParser<S> {
} }
} }
pub trait StreamEbml: Sized + TryStream + Unpin pub trait StreamEbml: Sized {
where
Self: Sized + TryStream + Unpin,
Self::Ok: Buf,
{
fn parse_ebml(self) -> EbmlStreamingParser<Self> { fn parse_ebml(self) -> EbmlStreamingParser<Self> {
EbmlStreamingParser { EbmlStreamingParser {
stream: self, stream: self,
@ -37,9 +33,13 @@ where
} }
} }
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> StreamEbml for S {} impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> StreamEbml for S where WebmetroError: From<E>
{}
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> { impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> EbmlStreamingParser<S>
where
WebmetroError: From<E>,
{
pub fn poll_event<'a, T: FromEbml<'a>>( pub fn poll_event<'a, T: FromEbml<'a>>(
&'a mut self, &'a mut self,
cx: &mut Context, cx: &mut Context,
@ -53,10 +53,9 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
let mut bytes = self.buffer.split_to(info.element_len).freeze(); let mut bytes = self.buffer.split_to(info.element_len).freeze();
bytes.advance(info.body_offset); bytes.advance(info.body_offset);
self.borrowed = bytes; self.borrowed = bytes;
return Poll::Ready(Some(T::decode( return Poll::Ready(Some(
info.element_id, T::decode(info.element_id, &self.borrowed).map_err(Into::into),
&self.borrowed, ));
).map_err(Into::into)));
} }
} }
@ -77,9 +76,7 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
} }
} }
} }
}
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> { pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> {
loop { loop {
if let Some(info) = T::check_space(&self.buffer)? { if let Some(info) = T::check_space(&self.buffer)? {
@ -130,7 +127,7 @@ mod tests {
]; ];
let mut stream_parser = futures::stream::iter(pieces.iter()) let mut stream_parser = futures::stream::iter(pieces.iter())
.map(|bytes| Ok(&bytes[..])) .map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..]))
.parse_ebml(); .parse_ebml();
assert_matches!( assert_matches!(
@ -182,7 +179,7 @@ mod tests {
async { async {
let mut parser = futures::stream::iter(pieces.iter()) let mut parser = futures::stream::iter(pieces.iter())
.map(|bytes| Ok(&bytes[..])) .map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..]))
.parse_ebml(); .parse_ebml();
assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead)); assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead));
@ -196,8 +193,8 @@ mod tests {
Result::<(), WebmetroError>::Ok(()) Result::<(), WebmetroError>::Ok(())
} }
.now_or_never() .now_or_never()
.expect("Test tried to block on I/O") .expect("Test tried to block on I/O")
.expect("Parse failed"); .expect("Parse failed");
} }
} }