From 5d25d5adb7a5f52ebff2be3e27f56383552376cc Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Sat, 20 Oct 2018 23:02:24 -0400 Subject: [PATCH] Have subcommands control launching the runtime. --- src/commands/filter.rs | 7 ++++--- src/commands/relay.rs | 13 ++++--------- src/commands/send.rs | 35 ++++++++++++++++++----------------- src/main.rs | 29 +++++++++-------------------- 4 files changed, 35 insertions(+), 49 deletions(-) diff --git a/src/commands/filter.rs b/src/commands/filter.rs index 9aa76a1..76b8575 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -5,6 +5,7 @@ use std::{ use clap::{App, Arg, ArgMatches, SubCommand}; use futures::prelude::*; +use tokio::runtime::Runtime; use super::stdin_stream; use webmetro::{ @@ -25,7 +26,7 @@ pub fn options() -> App<'static, 'static> { .help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) } -pub fn run(args: &ArgMatches) -> impl Future + Send { +pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { let mut chunk_stream: Box + Send> = Box::new( stdin_stream() .parse_ebml() @@ -37,7 +38,7 @@ pub fn run(args: &ArgMatches) -> impl Future + Sen chunk_stream = Box::new(chunk_stream.throttle()); } - chunk_stream.for_each(|chunk| { + Runtime::new().unwrap().block_on_all(chunk_stream.for_each(|chunk| { io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::IoError) - }) + })) } diff --git a/src/commands/relay.rs b/src/commands/relay.rs index f6d3ff0..ff11087 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -25,7 +25,6 @@ use hyper::{ Method, Request, Response, - rt, Server, service::Service, header::{ @@ -33,6 +32,7 @@ use hyper::{ CONTENT_TYPE } }; +use tokio::runtime::Runtime; use webmetro::{ channel::{ Channel, @@ -138,14 +138,9 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?; let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?; - rt::run(Server::bind(&addr) + Runtime::new().unwrap().block_on_all(Server::bind(&addr) .serve(move || { ok::<_, WebmetroError>(RelayServer(single_channel.clone())) - }) - .map_err(|err| { - println!("[Error] {}", err); - }) - ); - - Ok(()) + }).map_err(|err| WebmetroError::Unknown(Box::new(err))) + ) } diff --git a/src/commands/send.rs b/src/commands/send.rs index d9c48c3..cd6f3b8 100644 --- a/src/commands/send.rs +++ b/src/commands/send.rs @@ -1,6 +1,5 @@ use clap::{App, Arg, ArgMatches, SubCommand}; use futures::{ - future, prelude::* }; use hyper::{ @@ -8,6 +7,7 @@ use hyper::{ client::HttpConnector, Request }; +use tokio::runtime::Runtime; use super::{ stdin_stream, @@ -36,7 +36,7 @@ pub fn options() -> App<'static, 'static> { type BoxedChunkStream = Box + Send>; -pub fn run(args: &ArgMatches) -> Box + Send> { +pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { let mut chunk_stream: BoxedChunkStream = Box::new( stdin_stream() .parse_ebml() @@ -46,7 +46,7 @@ pub fn run(args: &ArgMatches) -> Box + Send let url_str = match args.value_of("url") { Some(url) => String::from(url), - _ => return Box::new(Err(WebmetroError::from_str("Listen address wasn't provided")).into_future()) + _ => return Err(WebmetroError::from_str("Listen address wasn't provided")) }; if args.is_present("throttle") { @@ -58,18 +58,19 @@ pub fn run(args: &ArgMatches) -> Box + Send err })); - Box::new(future::lazy(move || { - Request::put(url_str) - .body(request_payload) - .map_err(WebmetroError::from_err) - }).and_then(|request| { - let client = Client::builder().build(HttpConnector::new(1)); - client.request(request) - .and_then(|response| { - response.into_body().for_each(|_chunk| { - Ok(()) - }) - }) - .map_err(WebmetroError::from_err) - })) + + let request = Request::put(url_str) + .body(request_payload) + .map_err(WebmetroError::from_err)?; + + let client = Client::builder().build(HttpConnector::new(1)); + let future = client.request(request) + .and_then(|response| { + response.into_body().for_each(|_chunk| { + Ok(()) + }) + }) + .map_err(WebmetroError::from_err); + + Runtime::new().unwrap().block_on_all(future) } diff --git a/src/main.rs b/src/main.rs index e252548..e2616cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ extern crate futures; extern crate http; extern crate hyper; +extern crate tokio; extern crate tokio_codec; extern crate tokio_io; extern crate webmetro; @@ -9,9 +10,6 @@ extern crate webmetro; mod commands; use clap::{App, AppSettings}; -use futures::prelude::*; -use hyper::rt; -use webmetro::error::WebmetroError; use commands::{ relay, @@ -36,25 +34,16 @@ fn main() { let args = options().get_matches(); match args.subcommand() { - ("filter", Some(sub_args)) => { tokio_run(filter::run(sub_args)); }, - ("relay", Some(sub_args)) => { relay::run(sub_args).unwrap_or_else(handle_error); }, - ("send", Some(sub_args)) => { tokio_run(send::run(sub_args)); }, - ("dump", Some(sub_args)) => { dump::run(sub_args).unwrap_or_else(handle_error); }, + ("filter", Some(sub_args)) => filter::run(sub_args), + ("relay", Some(sub_args)) => relay::run(sub_args), + ("send", Some(sub_args)) => send::run(sub_args), + ("dump", Some(sub_args)) => dump::run(sub_args), _ => { options().print_help().unwrap(); println!(""); + Ok(()) } - }; -} - -fn handle_error(err: WebmetroError) { - eprintln!("Error: {}", err); -} - -fn tokio_run + Send>(task: T) -where T::Future: Send + 'static { - rt::run(task.into_future().map_err(|err| { - handle_error(err); - ::std::process::exit(1); - })); + }.unwrap_or_else(|err| { + eprintln!("Error: {}", err); + }); }