diff --git a/src/commands/dump.rs b/src/commands/dump.rs index 7a83a69..8b691e6 100644 --- a/src/commands/dump.rs +++ b/src/commands/dump.rs @@ -1,10 +1,9 @@ -use std::error::Error; - use clap::{App, AppSettings, ArgMatches, SubCommand}; -use futures::Async; +use futures::prelude::*; use super::stdin_stream; use webmetro::{ + error::WebmetroError, stream_parser::StreamEbml, webm::{ SimpleBlock, @@ -18,7 +17,7 @@ pub fn options() -> App<'static, 'static> { .about("Dumps WebM parsing events from parsing stdin") } -pub fn run(_args: &ArgMatches) -> Result<(), Box> { +pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> { let mut events = stdin_stream().parse_ebml(); diff --git a/src/commands/filter.rs b/src/commands/filter.rs index 2062d01..a276d88 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -1,12 +1,10 @@ use std::{ - error::Error, io, io::prelude::* }; use clap::{App, Arg, ArgMatches, SubCommand}; use futures::prelude::*; -use tokio; use super::stdin_stream; use webmetro::{ @@ -27,8 +25,7 @@ pub fn options() -> App<'static, 'static> { .help("Slow down output to \"realtime\" speed as determined by the timestamps (useful for streaming)")) } -pub fn run(args: &ArgMatches) -> Result<(), Box> { - +pub fn run(args: &ArgMatches) -> Box + Send> { let mut chunk_stream: Box + Send> = Box::new( stdin_stream() .parse_ebml() @@ -40,12 +37,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Box> { chunk_stream = Box::new(chunk_stream.throttle()); } - tokio::run(chunk_stream.for_each(|chunk| { + Box::new(chunk_stream.for_each(|chunk| { io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::IoError) - }).map_err(|err| { - println!("Error: {}", err); - ::std::process::exit(1); - })); - - Ok(()) + })) } diff --git a/src/commands/relay.rs b/src/commands/relay.rs index 6c5b29b..88e116d 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -122,12 +122,19 @@ pub fn options() -> App<'static, 'static> { .required(true)) } -pub fn run(args: &ArgMatches) -> Result<(), Box> { +pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { let single_channel = Channel::new(); let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?; let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?; - Http::new().bind(&addr, move || Ok(RelayServer(single_channel.clone())))?.run()?; + Http::new() + .bind(&addr, move || { + Ok(RelayServer(single_channel.clone())) + }) + .map_err(|err| WebmetroError::Unknown(Box::new(err)))? + .run() + .map_err(|err| WebmetroError::Unknown(Box::new(err)))?; + Ok(()) } diff --git a/src/error.rs b/src/error.rs index 050f147..d0c754d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -53,3 +53,10 @@ impl From> for WebmetroError { WebmetroError::Unknown(err) } } + +impl<'a> From<&'a str> for WebmetroError { + fn from(err: &'a str) -> WebmetroError { + let error: Box = err.into(); + WebmetroError::Unknown(error) + } +} diff --git a/src/main.rs b/src/main.rs index 9b5cd7d..5278bc9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,9 @@ extern crate webmetro; mod commands; use clap::{App, AppSettings}; +use futures::prelude::*; +use webmetro::error::WebmetroError; + use commands::{ relay, filter, @@ -27,17 +30,26 @@ fn options() -> App<'static, 'static> { fn main() { let args = options().get_matches(); - match args.subcommand() { - ("filter", Some(sub_args)) => filter::run(sub_args), - ("relay", Some(sub_args)) => relay::run(sub_args), - ("dump", Some(sub_args)) => dump::run(sub_args), - _ => { + tokio_run(match args.subcommand() { + ("filter", Some(sub_args)) => box_up(filter::run(sub_args)), + ("relay", Some(sub_args)) => box_up(relay::run(sub_args)), + ("dump", Some(sub_args)) => box_up(dump::run(sub_args)), + _ => box_up(futures::lazy(|| { options().print_help().unwrap(); println!(""); Ok(()) - } - }.unwrap_or_else(|err| { - println!("Error: {}", err); - ::std::process::exit(1); + })) }); } + +fn tokio_run(task: Box + Send + 'static>) { + tokio::run(task.into_future().map_err(|err| { + eprintln!("Error: {}", err); + ::std::process::exit(1); + })); +} + +fn box_up>(task: F) -> Box + Send + 'static> +where F::Future: Send + 'static { + Box::new(task.into_future()) +}