diff --git a/src/channel.rs b/src/channel.rs index 80d7725..aa9f10d 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -30,8 +30,10 @@ pub struct Channel { listeners: Vec> } +pub type Handle = Arc>; + impl Channel { - pub fn new() -> Arc> { + pub fn new() -> Handle { Arc::new(Mutex::new(Channel { header_chunk: None, listeners: Vec::new() @@ -40,11 +42,11 @@ impl Channel { } pub struct Transmitter { - channel: Arc> + channel: Handle } impl Transmitter { - pub fn new(channel_arc: Arc>) -> Self { + pub fn new(channel_arc: Handle) -> Self { Transmitter { channel: channel_arc } @@ -77,12 +79,12 @@ impl Sink for Transmitter { pub struct Listener { /// not used in operation, but its refcount keeps the channel alive when there's no Transmitter - _channel: Arc>, + _channel: Handle, receiver: Receiver } impl Listener { - pub fn new(channel_arc: Arc>) -> Self { + pub fn new(channel_arc: Handle) -> Self { let (mut sender, receiver) = mpsc_channel(5); { diff --git a/src/commands/relay.rs b/src/commands/relay.rs index 7ebebe5..26810b7 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -1,9 +1,5 @@ use std::error::Error; use std::net::ToSocketAddrs; -use std::sync::{ - Arc, - Mutex -}; use bytes::{Bytes, Buf}; use clap::{App, Arg, ArgMatches, SubCommand}; @@ -28,6 +24,7 @@ use warp::{ use webmetro::{ channel::{ Channel, + Handle, Listener, Transmitter }, @@ -39,37 +36,29 @@ use webmetro::{ const BUFFER_LIMIT: usize = 2 * 1024 * 1024; -struct RelayServer(Arc>); +fn get_stream(channel: Handle) -> impl Stream { + Listener::new(channel) + .fix_timecodes() + .find_starting_point() + .map(|webm_chunk| webm_chunk.into_bytes()) + .map_err(|err| match err {}) +} -impl RelayServer { - fn get_channel(&self) -> Arc> { - self.0.clone() - } +fn post_stream(channel: Handle, stream: impl Stream) -> impl Stream { + let source = stream + .map_err(WebmetroError::from_err) + .parse_ebml().with_soft_limit(BUFFER_LIMIT) + .chunk_webm().with_soft_limit(BUFFER_LIMIT); + let sink = Transmitter::new(channel); - fn get_stream(&self) -> impl Stream { - Listener::new(self.get_channel()) - .fix_timecodes() - .find_starting_point() - .map(|webm_chunk| webm_chunk.into_bytes()) - .map_err(|err| match err {}) - } - - fn post_stream(&self, stream: impl Stream) -> impl Stream { - let source = stream - .map_err(WebmetroError::from_err) - .parse_ebml().with_soft_limit(BUFFER_LIMIT) - .chunk_webm().with_soft_limit(BUFFER_LIMIT); - let sink = Transmitter::new(self.get_channel()); - - source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) - .into_stream() - .map(|_| empty()) - .map_err(|err| { - println!("[Warning] {}", err); - err - }) - .flatten() - } + source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) + .into_stream() + .map(|_| empty()) + .map_err(|err| { + println!("[Warning] {}", err); + err + }) + .flatten() } fn media_response(body: Body) -> Response { @@ -95,19 +84,19 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?; let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?; - let relay_server = path!("live").map(move || RelayServer(single_channel.clone())); + let channel = path!("live").map(move || single_channel.clone()); - let head = relay_server.clone().and(warp::head()) + let head = channel.clone().and(warp::head()) .map(|_| media_response(Body::empty())); - let get = relay_server.clone().and(warp::get2()) - .map(|server: RelayServer| media_response(Body::wrap_stream(server.get_stream()))); + let get = channel.clone().and(warp::get2()) + .map(|channel| media_response(Body::wrap_stream(get_stream(channel)))); - let post_put = relay_server.clone().and(warp::post2().or(warp::put2()).unify()) - .and(warp::body::stream()).map(|server: RelayServer, stream| { - println!("[Info] Source Connected"); - Response::new(Body::wrap_stream(server.post_stream(stream))) - }); + let post_put = channel.clone().and(warp::post2().or(warp::put2()).unify()) + .and(warp::body::stream()).map(|channel, stream| { + println!("[Info] Source Connected"); + Response::new(Body::wrap_stream(post_stream(channel, stream))) + }); let routes = head .or(get)