diff --git a/Cargo.lock b/Cargo.lock index e679726..559258a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -598,6 +598,7 @@ dependencies = [ "hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)", "odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f2d5d16..99d0239 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,4 @@ futures = "0.1.20" hyper = "0.11.25" odds = { version = "0.3.1", features = ["std-vec"] } tokio = "0.1.5" +tokio-io = "0.1.6" diff --git a/src/commands/dump.rs b/src/commands/dump.rs index fa2fb40..7a83a69 100644 --- a/src/commands/dump.rs +++ b/src/commands/dump.rs @@ -1,12 +1,9 @@ -use std::{ - error::Error, - io -}; +use std::error::Error; use clap::{App, AppSettings, ArgMatches, SubCommand}; use futures::Async; -use super::StdinStream; +use super::stdin_stream; use webmetro::{ stream_parser::StreamEbml, webm::{ @@ -23,8 +20,7 @@ pub fn options() -> App<'static, 'static> { pub fn run(_args: &ArgMatches) -> Result<(), Box> { - let stdin = io::stdin(); - let mut events = StdinStream::new(stdin.lock()).parse_ebml(); + let mut events = stdin_stream().parse_ebml(); // stdin is sync so Async::NotReady will never happen while let Ok(Async::Ready(Some(element))) = events.poll_event() { diff --git a/src/commands/filter.rs b/src/commands/filter.rs index 4e6d7bd..5025afc 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -7,7 +7,7 @@ use std::{ use clap::{App, Arg, ArgMatches, SubCommand}; use futures::prelude::*; -use super::StdinStream; +use super::stdin_stream; use webmetro::{ chunk::{ Chunk, @@ -28,9 +28,8 @@ pub fn options() -> App<'static, 'static> { pub fn run(args: &ArgMatches) -> Result<(), Box> { - let stdin = io::stdin(); let mut chunk_stream: Box> = Box::new( - StdinStream::new(stdin.lock()) + stdin_stream() .parse_ebml() .chunk_webm() .fix_timecodes() diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 46f739b..a7654ce 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,11 +1,19 @@ use std::io::{ - StdinLock, - prelude::* + Error as IoError, + stdin, + Stdin }; use futures::{ - Async, - stream::Stream + prelude::*, + stream::MapErr +}; +use tokio_io::{ + io::AllowStdIo, + codec::{ + BytesCodec, + FramedRead + } }; use webmetro::error::WebmetroError; @@ -13,37 +21,10 @@ pub mod dump; pub mod filter; pub mod relay; -/// A hackish adapter that makes chunks of bytes from stdin available as a Stream; -/// is NOT actually async, and just uses blocking read. Buffers aren't optimized either -/// and copy more than necessary. -pub struct StdinStream<'a> { - buf_reader: StdinLock<'a>, - read_bytes: usize -} - -impl<'a> StdinStream<'a> { - pub fn new(lock: StdinLock<'a>) -> Self { - StdinStream { - buf_reader: lock, - read_bytes: 0 - } - } -} - -impl<'a> Stream for StdinStream<'a> { - type Item = Vec; - type Error = WebmetroError; - - fn poll(&mut self) -> Result>, Self::Error> { - self.buf_reader.consume(self.read_bytes); - let read_bytes = &mut self.read_bytes; - self.buf_reader.fill_buf().map(|slice| { - *read_bytes = slice.len(); - if *read_bytes > 0 { - Async::Ready(Some(Into::>::into(slice))) - } else { - Async::Ready(None) - } - }).map_err(WebmetroError::IoError) - } +/// An adapter that makes chunks of bytes from stdin available as a Stream; +/// is NOT actually async, and just uses blocking read. Don't use more than +/// one at once, who knows who gets which bytes. +pub fn stdin_stream() -> MapErr, BytesCodec>, fn(IoError) -> WebmetroError> { + FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new()) + .map_err(WebmetroError::IoError) } diff --git a/src/main.rs b/src/main.rs index eb6d5ab..3e76ad8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ extern crate futures; extern crate hyper; extern crate tokio; +extern crate tokio_io; extern crate webmetro; mod commands;