Give all subcommands option of returning a future to run in a Tokio context

This commit is contained in:
Tangent 128 2018-04-14 22:34:46 -04:00
parent 0075e52f7b
commit 49541347a4
5 changed files with 43 additions and 26 deletions

View file

@ -1,10 +1,9 @@
use std::error::Error;
use clap::{App, AppSettings, ArgMatches, SubCommand}; use clap::{App, AppSettings, ArgMatches, SubCommand};
use futures::Async; use futures::prelude::*;
use super::stdin_stream; use super::stdin_stream;
use webmetro::{ use webmetro::{
error::WebmetroError,
stream_parser::StreamEbml, stream_parser::StreamEbml,
webm::{ webm::{
SimpleBlock, SimpleBlock,
@ -18,7 +17,7 @@ pub fn options() -> App<'static, 'static> {
.about("Dumps WebM parsing events from parsing stdin") .about("Dumps WebM parsing events from parsing stdin")
} }
pub fn run(_args: &ArgMatches) -> Result<(), Box<Error>> { pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> {
let mut events = stdin_stream().parse_ebml(); let mut events = stdin_stream().parse_ebml();

View file

@ -1,12 +1,10 @@
use std::{ use std::{
error::Error,
io, io,
io::prelude::* io::prelude::*
}; };
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::prelude::*; use futures::prelude::*;
use tokio;
use super::stdin_stream; use super::stdin_stream;
use webmetro::{ use webmetro::{
@ -27,8 +25,7 @@ pub fn options() -> App<'static, 'static> {
.help("Slow down output to \"realtime\" speed as determined by the timestamps (useful for streaming)")) .help("Slow down output to \"realtime\" speed as determined by the timestamps (useful for streaming)"))
} }
pub fn run(args: &ArgMatches) -> Result<(), Box<Error>> { pub fn run(args: &ArgMatches) -> Box<Future<Item=(), Error=WebmetroError> + Send> {
let mut chunk_stream: Box<Stream<Item = Chunk, Error = WebmetroError> + Send> = Box::new( let mut chunk_stream: Box<Stream<Item = Chunk, Error = WebmetroError> + Send> = Box::new(
stdin_stream() stdin_stream()
.parse_ebml() .parse_ebml()
@ -40,12 +37,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Box<Error>> {
chunk_stream = Box::new(chunk_stream.throttle()); chunk_stream = Box::new(chunk_stream.throttle());
} }
tokio::run(chunk_stream.for_each(|chunk| { Box::new(chunk_stream.for_each(|chunk| {
io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::IoError) io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::IoError)
}).map_err(|err| { }))
println!("Error: {}", err);
::std::process::exit(1);
}));
Ok(())
} }

View file

@ -122,12 +122,19 @@ pub fn options() -> App<'static, 'static> {
.required(true)) .required(true))
} }
pub fn run(args: &ArgMatches) -> Result<(), Box<Error>> { pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let single_channel = Channel::new(); let single_channel = Channel::new();
let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?; let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?;
let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?; let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?;
Http::new().bind(&addr, move || Ok(RelayServer(single_channel.clone())))?.run()?; Http::new()
.bind(&addr, move || {
Ok(RelayServer(single_channel.clone()))
})
.map_err(|err| WebmetroError::Unknown(Box::new(err)))?
.run()
.map_err(|err| WebmetroError::Unknown(Box::new(err)))?;
Ok(()) Ok(())
} }

View file

@ -53,3 +53,10 @@ impl From<Box<Error + Send>> for WebmetroError {
WebmetroError::Unknown(err) WebmetroError::Unknown(err)
} }
} }
impl<'a> From<&'a str> for WebmetroError {
fn from(err: &'a str) -> WebmetroError {
let error: Box<Error + Send + Sync> = err.into();
WebmetroError::Unknown(error)
}
}

View file

@ -8,6 +8,9 @@ extern crate webmetro;
mod commands; mod commands;
use clap::{App, AppSettings}; use clap::{App, AppSettings};
use futures::prelude::*;
use webmetro::error::WebmetroError;
use commands::{ use commands::{
relay, relay,
filter, filter,
@ -27,17 +30,26 @@ fn options() -> App<'static, 'static> {
fn main() { fn main() {
let args = options().get_matches(); let args = options().get_matches();
match args.subcommand() { tokio_run(match args.subcommand() {
("filter", Some(sub_args)) => filter::run(sub_args), ("filter", Some(sub_args)) => box_up(filter::run(sub_args)),
("relay", Some(sub_args)) => relay::run(sub_args), ("relay", Some(sub_args)) => box_up(relay::run(sub_args)),
("dump", Some(sub_args)) => dump::run(sub_args), ("dump", Some(sub_args)) => box_up(dump::run(sub_args)),
_ => { _ => box_up(futures::lazy(|| {
options().print_help().unwrap(); options().print_help().unwrap();
println!(""); println!("");
Ok(()) Ok(())
} }))
}.unwrap_or_else(|err| {
println!("Error: {}", err);
::std::process::exit(1);
}); });
} }
fn tokio_run(task: Box<Future<Item=(), Error=WebmetroError> + Send + 'static>) {
tokio::run(task.into_future().map_err(|err| {
eprintln!("Error: {}", err);
::std::process::exit(1);
}));
}
fn box_up<F: IntoFuture<Item=(), Error=WebmetroError>>(task: F) -> Box<Future<Item=(), Error=WebmetroError> + Send + 'static>
where F::Future: Send + 'static {
Box::new(task.into_future())
}