diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 2628da7..7e1abfd 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,5 +1,4 @@ use std::io::{ - Cursor, Error as IoError, stdin, Stdin @@ -9,16 +8,12 @@ use futures::{ prelude::*, stream::MapErr }; -use hyper::body::Payload; use tokio_io::io::AllowStdIo; use tokio_codec::{ BytesCodec, FramedRead }; -use webmetro::{ - chunk::Chunk, - error::WebmetroError, -}; +use webmetro::error::WebmetroError; pub mod dump; pub mod filter; @@ -32,15 +27,3 @@ pub fn stdin_stream() -> MapErr, BytesCodec>, fn(Io FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new()) .map_err(WebmetroError::IoError) } - -/// A wrapper to make a Stream of Webm chunks work as a payload for Hyper -pub struct WebmPayload(pub S); - -impl + Send + 'static> Payload for WebmPayload { - type Data = Cursor; - type Error = S::Error; - - fn poll_data(&mut self) -> Poll>, WebmetroError> { - self.0.poll().map(|async| async.map(|option| option.map(Cursor::new))) - } -} diff --git a/src/commands/relay.rs b/src/commands/relay.rs index ff11087..232cfc6 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -1,10 +1,10 @@ -use std::error::Error; use std::net::ToSocketAddrs; use std::sync::{ Arc, Mutex }; +use bytes::Bytes; use clap::{App, Arg, ArgMatches, SubCommand}; use futures::{ Future, @@ -39,14 +39,12 @@ use webmetro::{ Listener, Transmitter }, - chunk::{Chunk, WebmStream}, + chunk::WebmStream, error::WebmetroError, fixers::ChunkStream, stream_parser::StreamEbml }; -use super::WebmPayload; - const BUFFER_LIMIT: usize = 2 * 1024 * 1024; struct RelayServer(Arc>); @@ -56,15 +54,15 @@ impl RelayServer { self.0.clone() } - fn get_stream(&self) -> impl Stream { + fn get_stream(&self) -> impl Stream { Listener::new(self.get_channel()) .fix_timecodes() .find_starting_point() + .map(|webm_chunk| webm_chunk.into_bytes()) .map_err(|err| match err {}) } - fn post_stream, S: Stream + Send + 'static>(&self, stream: S) -> impl Stream - where S::Error: Error + Send + Sync { + fn post_stream(&self, stream: Body) -> impl Stream { let source = stream .map_err(WebmetroError::from_err) .parse_ebml().with_soft_limit(BUFFER_LIMIT) @@ -82,13 +80,11 @@ impl RelayServer { } } -type BoxedBodyStream = Box + Send + 'static>; - impl Service for RelayServer { type ReqBody = Body; - type ResBody = WebmPayload; + type ResBody = Body; type Error = WebmetroError; - type Future = FutureResult>, WebmetroError>; + type Future = FutureResult, WebmetroError>; fn call(&mut self, request: Request) -> Self::Future { let (Parts {method, uri, ..}, request_body) = request.into_parts(); @@ -99,7 +95,7 @@ impl Service for RelayServer { .header(CONTENT_TYPE, "video/webm") .header("X-Accel-Buffering", "no") .header(CACHE_CONTROL, "no-cache, no-store") - .body(WebmPayload(Box::new(empty()) as BoxedBodyStream)) + .body(Body::empty()) .unwrap() }, (Method::GET, "/live") => { @@ -107,17 +103,17 @@ impl Service for RelayServer { .header(CONTENT_TYPE, "video/webm") .header("X-Accel-Buffering", "no") .header(CACHE_CONTROL, "no-cache, no-store") - .body(WebmPayload(Box::new(self.get_stream()) as BoxedBodyStream)) + .body(Body::wrap_stream(self.get_stream())) .unwrap() }, (Method::POST, "/live") | (Method::PUT, "/live") => { println!("[Info] New source on {}", uri.path()); - Response::new(WebmPayload(Box::new(self.post_stream(request_body)) as BoxedBodyStream)) + Response::new(Body::wrap_stream(self.post_stream(request_body))) }, _ => { Response::builder() .status(StatusCode::NOT_FOUND) - .body(WebmPayload(Box::new(empty()) as BoxedBodyStream)) + .body(Body::empty()) .unwrap() } }) diff --git a/src/commands/send.rs b/src/commands/send.rs index 03467f3..148e9ad 100644 --- a/src/commands/send.rs +++ b/src/commands/send.rs @@ -3,6 +3,7 @@ use futures::{ prelude::* }; use hyper::{ + Body, Client, client::HttpConnector, Request @@ -10,8 +11,7 @@ use hyper::{ use tokio::runtime::Runtime; use super::{ - stdin_stream, - WebmPayload + stdin_stream }; use webmetro::{ chunk::{ @@ -53,7 +53,9 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { chunk_stream = Box::new(chunk_stream.throttle()); } - let request_payload = WebmPayload(chunk_stream.map_err(|err| { + let request_payload = Body::wrap_stream(chunk_stream.map( + |webm_chunk| webm_chunk.into_bytes() + ).map_err(|err| { eprintln!("{}", &err); err })); diff --git a/src/main.rs b/src/main.rs index e2616cb..4f45e54 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +extern crate bytes; #[macro_use] extern crate clap; extern crate futures; extern crate http;