Have subcommands control launching the runtime.

This commit is contained in:
Tangent 128 2018-10-20 23:02:24 -04:00
parent 713a7d4741
commit 5d25d5adb7
4 changed files with 35 additions and 49 deletions

View file

@ -5,6 +5,7 @@ use std::{
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::prelude::*; use futures::prelude::*;
use tokio::runtime::Runtime;
use super::stdin_stream; use super::stdin_stream;
use webmetro::{ use webmetro::{
@ -25,7 +26,7 @@ pub fn options() -> App<'static, 'static> {
.help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) .help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
} }
pub fn run(args: &ArgMatches) -> impl Future<Item=(), Error=WebmetroError> + Send { pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let mut chunk_stream: Box<Stream<Item = Chunk, Error = WebmetroError> + Send> = Box::new( let mut chunk_stream: Box<Stream<Item = Chunk, Error = WebmetroError> + Send> = Box::new(
stdin_stream() stdin_stream()
.parse_ebml() .parse_ebml()
@ -37,7 +38,7 @@ pub fn run(args: &ArgMatches) -> impl Future<Item=(), Error=WebmetroError> + Sen
chunk_stream = Box::new(chunk_stream.throttle()); chunk_stream = Box::new(chunk_stream.throttle());
} }
chunk_stream.for_each(|chunk| { Runtime::new().unwrap().block_on_all(chunk_stream.for_each(|chunk| {
io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::IoError) io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::IoError)
}) }))
} }

View file

@ -25,7 +25,6 @@ use hyper::{
Method, Method,
Request, Request,
Response, Response,
rt,
Server, Server,
service::Service, service::Service,
header::{ header::{
@ -33,6 +32,7 @@ use hyper::{
CONTENT_TYPE CONTENT_TYPE
} }
}; };
use tokio::runtime::Runtime;
use webmetro::{ use webmetro::{
channel::{ channel::{
Channel, Channel,
@ -138,14 +138,9 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?; let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?;
let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?; let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?;
rt::run(Server::bind(&addr) Runtime::new().unwrap().block_on_all(Server::bind(&addr)
.serve(move || { .serve(move || {
ok::<_, WebmetroError>(RelayServer(single_channel.clone())) ok::<_, WebmetroError>(RelayServer(single_channel.clone()))
}) }).map_err(|err| WebmetroError::Unknown(Box::new(err)))
.map_err(|err| { )
println!("[Error] {}", err);
})
);
Ok(())
} }

View file

@ -1,6 +1,5 @@
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::{ use futures::{
future,
prelude::* prelude::*
}; };
use hyper::{ use hyper::{
@ -8,6 +7,7 @@ use hyper::{
client::HttpConnector, client::HttpConnector,
Request Request
}; };
use tokio::runtime::Runtime;
use super::{ use super::{
stdin_stream, stdin_stream,
@ -36,7 +36,7 @@ pub fn options() -> App<'static, 'static> {
type BoxedChunkStream = Box<Stream<Item = Chunk, Error = WebmetroError> + Send>; type BoxedChunkStream = Box<Stream<Item = Chunk, Error = WebmetroError> + Send>;
pub fn run(args: &ArgMatches) -> Box<Future<Item=(), Error=WebmetroError> + Send> { pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let mut chunk_stream: BoxedChunkStream = Box::new( let mut chunk_stream: BoxedChunkStream = Box::new(
stdin_stream() stdin_stream()
.parse_ebml() .parse_ebml()
@ -46,7 +46,7 @@ pub fn run(args: &ArgMatches) -> Box<Future<Item=(), Error=WebmetroError> + Send
let url_str = match args.value_of("url") { let url_str = match args.value_of("url") {
Some(url) => String::from(url), Some(url) => String::from(url),
_ => return Box::new(Err(WebmetroError::from_str("Listen address wasn't provided")).into_future()) _ => return Err(WebmetroError::from_str("Listen address wasn't provided"))
}; };
if args.is_present("throttle") { if args.is_present("throttle") {
@ -58,18 +58,19 @@ pub fn run(args: &ArgMatches) -> Box<Future<Item=(), Error=WebmetroError> + Send
err err
})); }));
Box::new(future::lazy(move || {
Request::put(url_str) let request = Request::put(url_str)
.body(request_payload) .body(request_payload)
.map_err(WebmetroError::from_err) .map_err(WebmetroError::from_err)?;
}).and_then(|request| {
let client = Client::builder().build(HttpConnector::new(1)); let client = Client::builder().build(HttpConnector::new(1));
client.request(request) let future = client.request(request)
.and_then(|response| { .and_then(|response| {
response.into_body().for_each(|_chunk| { response.into_body().for_each(|_chunk| {
Ok(()) Ok(())
}) })
}) })
.map_err(WebmetroError::from_err) .map_err(WebmetroError::from_err);
}))
Runtime::new().unwrap().block_on_all(future)
} }

View file

@ -2,6 +2,7 @@
extern crate futures; extern crate futures;
extern crate http; extern crate http;
extern crate hyper; extern crate hyper;
extern crate tokio;
extern crate tokio_codec; extern crate tokio_codec;
extern crate tokio_io; extern crate tokio_io;
extern crate webmetro; extern crate webmetro;
@ -9,9 +10,6 @@ extern crate webmetro;
mod commands; mod commands;
use clap::{App, AppSettings}; use clap::{App, AppSettings};
use futures::prelude::*;
use hyper::rt;
use webmetro::error::WebmetroError;
use commands::{ use commands::{
relay, relay,
@ -36,25 +34,16 @@ fn main() {
let args = options().get_matches(); let args = options().get_matches();
match args.subcommand() { match args.subcommand() {
("filter", Some(sub_args)) => { tokio_run(filter::run(sub_args)); }, ("filter", Some(sub_args)) => filter::run(sub_args),
("relay", Some(sub_args)) => { relay::run(sub_args).unwrap_or_else(handle_error); }, ("relay", Some(sub_args)) => relay::run(sub_args),
("send", Some(sub_args)) => { tokio_run(send::run(sub_args)); }, ("send", Some(sub_args)) => send::run(sub_args),
("dump", Some(sub_args)) => { dump::run(sub_args).unwrap_or_else(handle_error); }, ("dump", Some(sub_args)) => dump::run(sub_args),
_ => { _ => {
options().print_help().unwrap(); options().print_help().unwrap();
println!(""); println!("");
Ok(())
} }
}; }.unwrap_or_else(|err| {
} eprintln!("Error: {}", err);
});
fn handle_error(err: WebmetroError) {
eprintln!("Error: {}", err);
}
fn tokio_run<T: IntoFuture<Item=(), Error=WebmetroError> + Send>(task: T)
where T::Future: Send + 'static {
rt::run(task.into_future().map_err(|err| {
handle_error(err);
::std::process::exit(1);
}));
} }