diff --git a/src/channel.rs b/src/channel.rs index 76a9ec2..e52c4a5 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,22 +1,10 @@ use std::pin::Pin; -use std::task::{ - Context, - Poll -}; -use std::sync::{ - Arc, - Mutex -}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use futures::{ - channel::mpsc::{ - channel as mpsc_channel, - Sender, - Receiver - }, - Sink, + channel::mpsc::{channel as mpsc_channel, Receiver, Sender}, Stream, - never::Never, }; use odds::vec::VecExt; @@ -30,7 +18,7 @@ use crate::chunk::Chunk; pub struct Channel { pub name: String, header_chunk: Option, - listeners: Vec> + listeners: Vec>, } pub type Handle = Arc>; @@ -40,67 +28,39 @@ impl Channel { Arc::new(Mutex::new(Channel { name, header_chunk: None, - listeners: Vec::new() + listeners: Vec::new(), })) } } pub struct Transmitter { - channel: Handle + channel: Handle, } impl Transmitter { pub fn new(channel_arc: Handle) -> Self { Transmitter { - channel: channel_arc + channel: channel_arc, } } -} -impl Sink for Transmitter { - type Error = Never; // never errors, slow clients are simply dropped - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, chunk: Chunk) -> Result<(), Never> { + pub fn send(&self, chunk: Chunk) { let mut channel = self.channel.lock().expect("Locking channel"); if let Chunk::Headers { .. } = chunk { channel.header_chunk = Some(chunk.clone()); } - channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok()); - - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let mut channel = self.channel.lock().expect("Locking channel"); - let mut result = Poll::Ready(Ok(())); - - // just disconnect any erroring listeners - channel.listeners.retain_mut(|listener| match Pin::new(listener).poll_flush(cx) { - Poll::Pending => {result = Poll::Pending; true}, - Poll::Ready(Ok(())) => true, - Poll::Ready(Err(_)) => false, - }); - - result - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - // don't actually disconnect listeners, since other sources may want to transmit to this channel; - // just ensure we've sent everything we can out - self.poll_flush(cx) + channel + .listeners + .retain_mut(|listener| listener.start_send(chunk.clone()).is_ok()); } } pub struct Listener { /// not used in operation, but its refcount keeps the channel alive when there's no Transmitter _channel: Handle, - receiver: Receiver + receiver: Receiver, } impl Listener { @@ -111,7 +71,9 @@ impl Listener { let mut channel = channel_arc.lock().expect("Locking channel"); if let Some(ref chunk) = channel.header_chunk { - sender.start_send(chunk.clone()).expect("Queuing existing header chunk"); + sender + .start_send(chunk.clone()) + .expect("Queuing existing header chunk"); } channel.listeners.push(sender); @@ -119,7 +81,7 @@ impl Listener { Listener { _channel: channel_arc, - receiver: receiver + receiver: receiver, } } } diff --git a/src/commands/relay.rs b/src/commands/relay.rs index 5c5a53b..eacd3d1 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -8,7 +8,6 @@ use std::sync::{ use bytes::{Bytes, Buf}; use clap::{App, Arg, ArgMatches, SubCommand}; use futures::{ - never::Never, prelude::*, Stream, stream::FuturesUnordered, @@ -53,23 +52,21 @@ fn get_stream(channel: Handle) -> impl Stream> + Unpin) -> impl Stream> { - let source = stream + let channel = Transmitter::new(channel); + stream .map_err(WebmetroError::from) .parse_ebml().with_soft_limit(BUFFER_LIMIT) - .chunk_webm().with_soft_limit(BUFFER_LIMIT); - let sink = Transmitter::new(channel); - - source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) - .into_stream() - .map_ok(|_| Bytes::new()) - .map_err(|err| { - warn!("{}", err); - err - }) + .chunk_webm().with_soft_limit(BUFFER_LIMIT) + .map_ok(move |chunk| { + channel.send(chunk); + Bytes::new() + }) + .inspect_err(|err| { + warn!("{}", err) + }) } fn media_response(body: Body) -> Response {