Use hyper::Body instead of a custom Payload type

This commit is contained in:
Tangent 128 2018-10-21 02:25:24 -04:00
parent 26930b07e0
commit 9b9b6beb54
4 changed files with 18 additions and 36 deletions

View file

@ -1,5 +1,4 @@
use std::io::{ use std::io::{
Cursor,
Error as IoError, Error as IoError,
stdin, stdin,
Stdin Stdin
@ -9,16 +8,12 @@ use futures::{
prelude::*, prelude::*,
stream::MapErr stream::MapErr
}; };
use hyper::body::Payload;
use tokio_io::io::AllowStdIo; use tokio_io::io::AllowStdIo;
use tokio_codec::{ use tokio_codec::{
BytesCodec, BytesCodec,
FramedRead FramedRead
}; };
use webmetro::{ use webmetro::error::WebmetroError;
chunk::Chunk,
error::WebmetroError,
};
pub mod dump; pub mod dump;
pub mod filter; pub mod filter;
@ -32,15 +27,3 @@ pub fn stdin_stream() -> MapErr<FramedRead<AllowStdIo<Stdin>, BytesCodec>, fn(Io
FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new()) FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new())
.map_err(WebmetroError::IoError) .map_err(WebmetroError::IoError)
} }
/// A wrapper to make a Stream of Webm chunks work as a payload for Hyper
pub struct WebmPayload<S: Send + 'static>(pub S);
impl<S: Stream<Item = Chunk, Error = WebmetroError> + Send + 'static> Payload for WebmPayload<S> {
type Data = Cursor<Chunk>;
type Error = S::Error;
fn poll_data(&mut self) -> Poll<Option<Cursor<Chunk>>, WebmetroError> {
self.0.poll().map(|async| async.map(|option| option.map(Cursor::new)))
}
}

View file

@ -1,10 +1,10 @@
use std::error::Error;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::sync::{ use std::sync::{
Arc, Arc,
Mutex Mutex
}; };
use bytes::Bytes;
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::{ use futures::{
Future, Future,
@ -39,14 +39,12 @@ use webmetro::{
Listener, Listener,
Transmitter Transmitter
}, },
chunk::{Chunk, WebmStream}, chunk::WebmStream,
error::WebmetroError, error::WebmetroError,
fixers::ChunkStream, fixers::ChunkStream,
stream_parser::StreamEbml stream_parser::StreamEbml
}; };
use super::WebmPayload;
const BUFFER_LIMIT: usize = 2 * 1024 * 1024; const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
struct RelayServer(Arc<Mutex<Channel>>); struct RelayServer(Arc<Mutex<Channel>>);
@ -56,15 +54,15 @@ impl RelayServer {
self.0.clone() self.0.clone()
} }
fn get_stream(&self) -> impl Stream<Item = Chunk, Error = WebmetroError> { fn get_stream(&self) -> impl Stream<Item = Bytes, Error = WebmetroError> {
Listener::new(self.get_channel()) Listener::new(self.get_channel())
.fix_timecodes() .fix_timecodes()
.find_starting_point() .find_starting_point()
.map(|webm_chunk| webm_chunk.into_bytes())
.map_err(|err| match err {}) .map_err(|err| match err {})
} }
fn post_stream<I: AsRef<[u8]>, S: Stream<Item = I> + Send + 'static>(&self, stream: S) -> impl Stream<Item = Chunk, Error = WebmetroError> fn post_stream(&self, stream: Body) -> impl Stream<Item = Bytes, Error = WebmetroError> {
where S::Error: Error + Send + Sync {
let source = stream let source = stream
.map_err(WebmetroError::from_err) .map_err(WebmetroError::from_err)
.parse_ebml().with_soft_limit(BUFFER_LIMIT) .parse_ebml().with_soft_limit(BUFFER_LIMIT)
@ -82,13 +80,11 @@ impl RelayServer {
} }
} }
type BoxedBodyStream = Box<Stream<Item = Chunk, Error = WebmetroError> + Send + 'static>;
impl Service for RelayServer { impl Service for RelayServer {
type ReqBody = Body; type ReqBody = Body;
type ResBody = WebmPayload<BoxedBodyStream>; type ResBody = Body;
type Error = WebmetroError; type Error = WebmetroError;
type Future = FutureResult<Response<WebmPayload<BoxedBodyStream>>, WebmetroError>; type Future = FutureResult<Response<Body>, WebmetroError>;
fn call(&mut self, request: Request<Body>) -> Self::Future { fn call(&mut self, request: Request<Body>) -> Self::Future {
let (Parts {method, uri, ..}, request_body) = request.into_parts(); let (Parts {method, uri, ..}, request_body) = request.into_parts();
@ -99,7 +95,7 @@ impl Service for RelayServer {
.header(CONTENT_TYPE, "video/webm") .header(CONTENT_TYPE, "video/webm")
.header("X-Accel-Buffering", "no") .header("X-Accel-Buffering", "no")
.header(CACHE_CONTROL, "no-cache, no-store") .header(CACHE_CONTROL, "no-cache, no-store")
.body(WebmPayload(Box::new(empty()) as BoxedBodyStream)) .body(Body::empty())
.unwrap() .unwrap()
}, },
(Method::GET, "/live") => { (Method::GET, "/live") => {
@ -107,17 +103,17 @@ impl Service for RelayServer {
.header(CONTENT_TYPE, "video/webm") .header(CONTENT_TYPE, "video/webm")
.header("X-Accel-Buffering", "no") .header("X-Accel-Buffering", "no")
.header(CACHE_CONTROL, "no-cache, no-store") .header(CACHE_CONTROL, "no-cache, no-store")
.body(WebmPayload(Box::new(self.get_stream()) as BoxedBodyStream)) .body(Body::wrap_stream(self.get_stream()))
.unwrap() .unwrap()
}, },
(Method::POST, "/live") | (Method::PUT, "/live") => { (Method::POST, "/live") | (Method::PUT, "/live") => {
println!("[Info] New source on {}", uri.path()); println!("[Info] New source on {}", uri.path());
Response::new(WebmPayload(Box::new(self.post_stream(request_body)) as BoxedBodyStream)) Response::new(Body::wrap_stream(self.post_stream(request_body)))
}, },
_ => { _ => {
Response::builder() Response::builder()
.status(StatusCode::NOT_FOUND) .status(StatusCode::NOT_FOUND)
.body(WebmPayload(Box::new(empty()) as BoxedBodyStream)) .body(Body::empty())
.unwrap() .unwrap()
} }
}) })

View file

@ -3,6 +3,7 @@ use futures::{
prelude::* prelude::*
}; };
use hyper::{ use hyper::{
Body,
Client, Client,
client::HttpConnector, client::HttpConnector,
Request Request
@ -10,8 +11,7 @@ use hyper::{
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use super::{ use super::{
stdin_stream, stdin_stream
WebmPayload
}; };
use webmetro::{ use webmetro::{
chunk::{ chunk::{
@ -53,7 +53,9 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
chunk_stream = Box::new(chunk_stream.throttle()); chunk_stream = Box::new(chunk_stream.throttle());
} }
let request_payload = WebmPayload(chunk_stream.map_err(|err| { let request_payload = Body::wrap_stream(chunk_stream.map(
|webm_chunk| webm_chunk.into_bytes()
).map_err(|err| {
eprintln!("{}", &err); eprintln!("{}", &err);
err err
})); }));

View file

@ -1,3 +1,4 @@
extern crate bytes;
#[macro_use] extern crate clap; #[macro_use] extern crate clap;
extern crate futures; extern crate futures;
extern crate http; extern crate http;