Have utilities read stdin via tokio_io wrapper

This commit is contained in:
Tangent 128 2018-04-14 18:18:50 -04:00
parent e77a3d0e98
commit a847d62b34
6 changed files with 26 additions and 47 deletions

1
Cargo.lock generated
View file

@ -598,6 +598,7 @@ dependencies = [
"hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)",
"odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]

View file

@ -10,3 +10,4 @@ futures = "0.1.20"
hyper = "0.11.25" hyper = "0.11.25"
odds = { version = "0.3.1", features = ["std-vec"] } odds = { version = "0.3.1", features = ["std-vec"] }
tokio = "0.1.5" tokio = "0.1.5"
tokio-io = "0.1.6"

View file

@ -1,12 +1,9 @@
use std::{ use std::error::Error;
error::Error,
io
};
use clap::{App, AppSettings, ArgMatches, SubCommand}; use clap::{App, AppSettings, ArgMatches, SubCommand};
use futures::Async; use futures::Async;
use super::StdinStream; use super::stdin_stream;
use webmetro::{ use webmetro::{
stream_parser::StreamEbml, stream_parser::StreamEbml,
webm::{ webm::{
@ -23,8 +20,7 @@ pub fn options() -> App<'static, 'static> {
pub fn run(_args: &ArgMatches) -> Result<(), Box<Error>> { pub fn run(_args: &ArgMatches) -> Result<(), Box<Error>> {
let stdin = io::stdin(); let mut events = stdin_stream().parse_ebml();
let mut events = StdinStream::new(stdin.lock()).parse_ebml();
// stdin is sync so Async::NotReady will never happen // stdin is sync so Async::NotReady will never happen
while let Ok(Async::Ready(Some(element))) = events.poll_event() { while let Ok(Async::Ready(Some(element))) = events.poll_event() {

View file

@ -7,7 +7,7 @@ use std::{
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::prelude::*; use futures::prelude::*;
use super::StdinStream; use super::stdin_stream;
use webmetro::{ use webmetro::{
chunk::{ chunk::{
Chunk, Chunk,
@ -28,9 +28,8 @@ pub fn options() -> App<'static, 'static> {
pub fn run(args: &ArgMatches) -> Result<(), Box<Error>> { pub fn run(args: &ArgMatches) -> Result<(), Box<Error>> {
let stdin = io::stdin();
let mut chunk_stream: Box<Stream<Item = Chunk, Error = WebmetroError>> = Box::new( let mut chunk_stream: Box<Stream<Item = Chunk, Error = WebmetroError>> = Box::new(
StdinStream::new(stdin.lock()) stdin_stream()
.parse_ebml() .parse_ebml()
.chunk_webm() .chunk_webm()
.fix_timecodes() .fix_timecodes()

View file

@ -1,11 +1,19 @@
use std::io::{ use std::io::{
StdinLock, Error as IoError,
prelude::* stdin,
Stdin
}; };
use futures::{ use futures::{
Async, prelude::*,
stream::Stream stream::MapErr
};
use tokio_io::{
io::AllowStdIo,
codec::{
BytesCodec,
FramedRead
}
}; };
use webmetro::error::WebmetroError; use webmetro::error::WebmetroError;
@ -13,37 +21,10 @@ pub mod dump;
pub mod filter; pub mod filter;
pub mod relay; pub mod relay;
/// A hackish adapter that makes chunks of bytes from stdin available as a Stream; /// An adapter that makes chunks of bytes from stdin available as a Stream;
/// is NOT actually async, and just uses blocking read. Buffers aren't optimized either /// is NOT actually async, and just uses blocking read. Don't use more than
/// and copy more than necessary. /// one at once, who knows who gets which bytes.
pub struct StdinStream<'a> { pub fn stdin_stream() -> MapErr<FramedRead<AllowStdIo<Stdin>, BytesCodec>, fn(IoError) -> WebmetroError> {
buf_reader: StdinLock<'a>, FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new())
read_bytes: usize .map_err(WebmetroError::IoError)
}
impl<'a> StdinStream<'a> {
pub fn new(lock: StdinLock<'a>) -> Self {
StdinStream {
buf_reader: lock,
read_bytes: 0
}
}
}
impl<'a> Stream for StdinStream<'a> {
type Item = Vec<u8>;
type Error = WebmetroError;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
self.buf_reader.consume(self.read_bytes);
let read_bytes = &mut self.read_bytes;
self.buf_reader.fill_buf().map(|slice| {
*read_bytes = slice.len();
if *read_bytes > 0 {
Async::Ready(Some(Into::<Vec<u8>>::into(slice)))
} else {
Async::Ready(None)
}
}).map_err(WebmetroError::IoError)
}
} }

View file

@ -2,6 +2,7 @@
extern crate futures; extern crate futures;
extern crate hyper; extern crate hyper;
extern crate tokio; extern crate tokio;
extern crate tokio_io;
extern crate webmetro; extern crate webmetro;
mod commands; mod commands;