From e61244bce360d32109dca5c2fa00b14d3b6fb4e3 Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Thu, 12 Apr 2018 00:14:16 -0400 Subject: [PATCH] Factor StdinStream out of dump applet --- src/commands/dump.rs | 23 ++++------------------ src/commands/mod.rs | 46 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/commands/dump.rs b/src/commands/dump.rs index af94cec..fa2fb40 100644 --- a/src/commands/dump.rs +++ b/src/commands/dump.rs @@ -1,14 +1,12 @@ use std::{ error::Error, - io::{self, prelude::*} + io }; use clap::{App, AppSettings, ArgMatches, SubCommand}; -use futures::{ - Async, - stream::poll_fn -}; +use futures::Async; +use super::StdinStream; use webmetro::{ stream_parser::StreamEbml, webm::{ @@ -26,20 +24,7 @@ pub fn options() -> App<'static, 'static> { pub fn run(_args: &ArgMatches) -> Result<(), Box> { let stdin = io::stdin(); - let mut buf_reader = stdin.lock(); - let mut read_bytes = 0; - - let mut events = poll_fn(|| { - buf_reader.consume(read_bytes); - buf_reader.fill_buf().map(|slice| { - read_bytes = slice.len(); - if read_bytes > 0 { - Async::Ready(Some(Into::>::into(slice))) - } else { - Async::Ready(None) - } - }) - }).parse_ebml(); + let mut events = StdinStream::new(stdin.lock()).parse_ebml(); // stdin is sync so Async::NotReady will never happen while let Ok(Async::Ready(Some(element))) = events.poll_event() { diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 98a684e..3ff2c18 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,2 +1,48 @@ +use std::io::{ + Error as IoError, + StdinLock, + prelude::* +}; + +use futures::{ + Async, + stream::Stream +}; + pub mod dump; pub mod relay; + +/// A hackish adapter that makes chunks of bytes from stdin available as a Stream; +/// is NOT actually async, and just uses blocking read. Buffers aren't optimized either +/// and copy more than necessary. +pub struct StdinStream<'a> { + buf_reader: StdinLock<'a>, + read_bytes: usize +} + +impl<'a> StdinStream<'a> { + pub fn new(lock: StdinLock<'a>) -> Self { + StdinStream { + buf_reader: lock, + read_bytes: 0 + } + } +} + +impl<'a> Stream for StdinStream<'a> { + type Item = Vec; + type Error = IoError; + + fn poll(&mut self) -> Result>, Self::Error> { + self.buf_reader.consume(self.read_bytes); + let read_bytes = &mut self.read_bytes; + self.buf_reader.fill_buf().map(|slice| { + *read_bytes = slice.len(); + if *read_bytes > 0 { + Async::Ready(Some(Into::>::into(slice))) + } else { + Async::Ready(None) + } + }) + } +}