diff --git a/src/commands/filter.rs b/src/commands/filter.rs index 1cdecde..9aa76a1 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -25,7 +25,7 @@ pub fn options() -> App<'static, 'static> { .help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) } -pub fn run(args: &ArgMatches) -> Box + Send> { +pub fn run(args: &ArgMatches) -> impl Future + Send { let mut chunk_stream: Box + Send> = Box::new( stdin_stream() .parse_ebml() @@ -37,7 +37,7 @@ pub fn run(args: &ArgMatches) -> Box + Send chunk_stream = Box::new(chunk_stream.throttle()); } - Box::new(chunk_stream.for_each(|chunk| { + chunk_stream.for_each(|chunk| { io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::IoError) - })) + }) } diff --git a/src/commands/relay.rs b/src/commands/relay.rs index 30a3b0b..baed627 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -48,8 +48,6 @@ header! { (XAccelBuffering, "X-Accel-Buffering") => [String] } const BUFFER_LIMIT: usize = 2 * 1024 * 1024; -type BodyStream = Box>; - struct RelayServer(Arc>); impl RelayServer { @@ -57,16 +55,14 @@ impl RelayServer { self.0.clone() } - fn get_stream(&self) -> BodyStream { - Box::new( - Listener::new(self.get_channel()) - .fix_timecodes() - .find_starting_point() - .map_err(|err| match err {}) - ) + fn get_stream(&self) -> impl Stream { + Listener::new(self.get_channel()) + .fix_timecodes() + .find_starting_point() + .map_err(|err| match err {}) } - fn post_stream, S: Stream + 'static>(&self, stream: S) -> BodyStream + fn post_stream, S: Stream + 'static>(&self, stream: S) -> impl Stream where S::Error: Error + Send { let source = stream .map_err(WebmetroError::from_err) @@ -74,22 +70,22 @@ impl RelayServer { .chunk_webm().with_soft_limit(BUFFER_LIMIT); let sink = Transmitter::new(self.get_channel()); - Box::new( - source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) - .into_stream() - .map(|_| empty()) - .map_err(|err| { - //TODO: log something somewhere - to_hyper_error(err) - }) - .flatten() - ) + source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) + .into_stream() + .map(|_| empty()) + .map_err(|err| { + println!("[Warning] {}", err); + to_hyper_error(err) + }) + .flatten() } } +type BoxedBodyStream = Box>; + impl Service for RelayServer { type Request = Request; - type Response = Response; + type Response = Response; type Error = HyperError; type Future = FutureResult; @@ -110,11 +106,11 @@ impl Service for RelayServer { .with_header(ContentType("video/webm".parse().unwrap())) .with_header(XAccelBuffering("no".to_string())) .with_header(CacheControl(vec![CacheDirective::NoCache, CacheDirective::NoStore])) - .with_body(self.get_stream()) + .with_body(Box::new(self.get_stream()) as BoxedBodyStream) }, (Post, "/live") | (Put, "/live") => { Response::new() - .with_body(self.post_stream(request_body)) + .with_body(Box::new(self.post_stream(request_body)) as BoxedBodyStream) }, _ => { Response::new()