diff --git a/src/channel.rs b/src/channel.rs index ed10c5f..9e901de 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,20 +1,23 @@ +use std::pin::Pin; +use std::task::{ + Context, + Poll +}; use std::sync::{ Arc, Mutex }; -use futures::{ - Async, - AsyncSink, - Sink, - Stream, - sync::mpsc::{ +use futures3::{ + channel::mpsc::{ channel as mpsc_channel, Sender, Receiver - } + }, + Sink, + Stream, + Never }; -use futures3::Never; use odds::vec::VecExt; use crate::chunk::Chunk; @@ -54,11 +57,14 @@ impl Transmitter { } } -impl Sink for Transmitter { - type SinkItem = Chunk; - type SinkError = Never; // never errors, slow clients are simply dropped +impl Sink for Transmitter { + type Error = Never; // never errors, slow clients are simply dropped - fn start_send(&mut self, chunk: Chunk) -> Result, Never> { + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, chunk: Chunk) -> Result<(), Never> { let mut channel = self.channel.lock().expect("Locking channel"); if let Chunk::Headers { .. } = chunk { @@ -67,14 +73,34 @@ impl Sink for Transmitter { channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok()); - Ok(AsyncSink::Ready) + Ok(()) } - fn poll_complete(&mut self) -> Result, Never> { + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut channel = self.channel.lock().expect("Locking channel"); + let mut result = Poll::Ready(Ok(())); + + // just disconnect any erroring listeners + channel.listeners.retain_mut(|listener| match Pin::new(listener).poll_flush(cx) { + Poll::Pending => {result = Poll::Pending; true}, + Poll::Ready(Ok(())) => true, + Poll::Ready(Err(_)) => false, + }); + + result + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut channel = self.channel.lock().expect("Locking channel"); - channel.listeners.retain_mut(|listener| listener.poll_complete().is_ok()); + // there's no useful error we can offer here, just give everything a chance to try closing + channel.listeners.retain_mut(|listener| Pin::new(listener).poll_close(cx).is_pending()); - Ok(Async::Ready(())) + return if channel.listeners.len() > 0 { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } } } @@ -107,9 +133,9 @@ impl Listener { impl Stream for Listener { type Item = Chunk; - type Error = Never; // no transmitter errors are exposed to the listeners - fn poll(&mut self) -> Result>, Never> { - Ok(self.receiver.poll().expect("Channel receiving can't error")) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let receiver = &mut self.get_mut().receiver; + Pin::new(receiver).poll_next(cx) } } diff --git a/src/commands/relay.rs b/src/commands/relay.rs index bae0a6e..302fe38 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -13,6 +13,14 @@ use futures::{ Sink, stream::empty }; +use futures3::{ + compat::{ + Compat, + CompatSink, + }, + Never, + StreamExt +}; use hyper::{ Body, Response, @@ -45,11 +53,11 @@ use webmetro::{ const BUFFER_LIMIT: usize = 2 * 1024 * 1024; fn get_stream(channel: Handle) -> impl Stream { - Listener::new(channel) + Compat::new(Listener::new(channel).map(|c| Ok(c))) .fix_timecodes() .find_starting_point() .map(|webm_chunk| webm_chunk.into_bytes()) - .map_err(|err| match err {}) + .map_err(|err: Never| match err {}) } fn post_stream(channel: Handle, stream: impl Stream) -> impl Stream { @@ -57,7 +65,7 @@ fn post_stream(channel: Handle, stream: impl Stream WebmetroError {match err {}})) .into_stream()