Remove unneeded struct

This commit is contained in:
Tangent 128 2018-10-21 22:29:36 -04:00
parent 01bb2c2e19
commit f1c1114835
2 changed files with 38 additions and 47 deletions

View File

@ -30,8 +30,10 @@ pub struct Channel {
listeners: Vec<Sender<Chunk>> listeners: Vec<Sender<Chunk>>
} }
pub type Handle = Arc<Mutex<Channel>>;
impl Channel { impl Channel {
pub fn new() -> Arc<Mutex<Channel>> { pub fn new() -> Handle {
Arc::new(Mutex::new(Channel { Arc::new(Mutex::new(Channel {
header_chunk: None, header_chunk: None,
listeners: Vec::new() listeners: Vec::new()
@ -40,11 +42,11 @@ impl Channel {
} }
pub struct Transmitter { pub struct Transmitter {
channel: Arc<Mutex<Channel>> channel: Handle
} }
impl Transmitter { impl Transmitter {
pub fn new(channel_arc: Arc<Mutex<Channel>>) -> Self { pub fn new(channel_arc: Handle) -> Self {
Transmitter { Transmitter {
channel: channel_arc channel: channel_arc
} }
@ -77,12 +79,12 @@ impl Sink for Transmitter {
pub struct Listener { pub struct Listener {
/// not used in operation, but its refcount keeps the channel alive when there's no Transmitter /// not used in operation, but its refcount keeps the channel alive when there's no Transmitter
_channel: Arc<Mutex<Channel>>, _channel: Handle,
receiver: Receiver<Chunk> receiver: Receiver<Chunk>
} }
impl Listener { impl Listener {
pub fn new(channel_arc: Arc<Mutex<Channel>>) -> Self { pub fn new(channel_arc: Handle) -> Self {
let (mut sender, receiver) = mpsc_channel(5); let (mut sender, receiver) = mpsc_channel(5);
{ {

View File

@ -1,9 +1,5 @@
use std::error::Error; use std::error::Error;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::sync::{
Arc,
Mutex
};
use bytes::{Bytes, Buf}; use bytes::{Bytes, Buf};
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
@ -28,6 +24,7 @@ use warp::{
use webmetro::{ use webmetro::{
channel::{ channel::{
Channel, Channel,
Handle,
Listener, Listener,
Transmitter Transmitter
}, },
@ -39,37 +36,29 @@ use webmetro::{
const BUFFER_LIMIT: usize = 2 * 1024 * 1024; const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
struct RelayServer(Arc<Mutex<Channel>>); fn get_stream(channel: Handle) -> impl Stream<Item = Bytes, Error = WebmetroError> {
Listener::new(channel)
.fix_timecodes()
.find_starting_point()
.map(|webm_chunk| webm_chunk.into_bytes())
.map_err(|err| match err {})
}
impl RelayServer { fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = impl Error + Send + Sync + 'static>) -> impl Stream<Item = Bytes, Error = WebmetroError> {
fn get_channel(&self) -> Arc<Mutex<Channel>> { let source = stream
self.0.clone() .map_err(WebmetroError::from_err)
} .parse_ebml().with_soft_limit(BUFFER_LIMIT)
.chunk_webm().with_soft_limit(BUFFER_LIMIT);
let sink = Transmitter::new(channel);
fn get_stream(&self) -> impl Stream<Item = Bytes, Error = WebmetroError> { source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
Listener::new(self.get_channel()) .into_stream()
.fix_timecodes() .map(|_| empty())
.find_starting_point() .map_err(|err| {
.map(|webm_chunk| webm_chunk.into_bytes()) println!("[Warning] {}", err);
.map_err(|err| match err {}) err
} })
.flatten()
fn post_stream(&self, stream: impl Stream<Item = impl Buf, Error = impl Error + Send + Sync + 'static>) -> impl Stream<Item = Bytes, Error = WebmetroError> {
let source = stream
.map_err(WebmetroError::from_err)
.parse_ebml().with_soft_limit(BUFFER_LIMIT)
.chunk_webm().with_soft_limit(BUFFER_LIMIT);
let sink = Transmitter::new(self.get_channel());
source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
.into_stream()
.map(|_| empty())
.map_err(|err| {
println!("[Warning] {}", err);
err
})
.flatten()
}
} }
fn media_response(body: Body) -> Response<Body> { fn media_response(body: Body) -> Response<Body> {
@ -95,19 +84,19 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?; let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?;
let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?; let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?;
let relay_server = path!("live").map(move || RelayServer(single_channel.clone())); let channel = path!("live").map(move || single_channel.clone());
let head = relay_server.clone().and(warp::head()) let head = channel.clone().and(warp::head())
.map(|_| media_response(Body::empty())); .map(|_| media_response(Body::empty()));
let get = relay_server.clone().and(warp::get2()) let get = channel.clone().and(warp::get2())
.map(|server: RelayServer| media_response(Body::wrap_stream(server.get_stream()))); .map(|channel| media_response(Body::wrap_stream(get_stream(channel))));
let post_put = relay_server.clone().and(warp::post2().or(warp::put2()).unify()) let post_put = channel.clone().and(warp::post2().or(warp::put2()).unify())
.and(warp::body::stream()).map(|server: RelayServer, stream| { .and(warp::body::stream()).map(|channel, stream| {
println!("[Info] Source Connected"); println!("[Info] Source Connected");
Response::new(Body::wrap_stream(server.post_stream(stream))) Response::new(Body::wrap_stream(post_stream(channel, stream)))
}); });
let routes = head let routes = head
.or(get) .or(get)