Convert channel.rs to futures-preview

This commit is contained in:
Tangent 128 2019-10-08 23:35:13 -04:00
parent 5cd28a6cdc
commit cfb56f1281
2 changed files with 56 additions and 22 deletions

View file

@ -1,20 +1,23 @@
use std::pin::Pin;
use std::task::{
Context,
Poll
};
use std::sync::{ use std::sync::{
Arc, Arc,
Mutex Mutex
}; };
use futures::{ use futures3::{
Async, channel::mpsc::{
AsyncSink,
Sink,
Stream,
sync::mpsc::{
channel as mpsc_channel, channel as mpsc_channel,
Sender, Sender,
Receiver Receiver
} },
Sink,
Stream,
Never
}; };
use futures3::Never;
use odds::vec::VecExt; use odds::vec::VecExt;
use crate::chunk::Chunk; use crate::chunk::Chunk;
@ -54,11 +57,14 @@ impl Transmitter {
} }
} }
impl Sink for Transmitter { impl Sink<Chunk> for Transmitter {
type SinkItem = Chunk; type Error = Never; // never errors, slow clients are simply dropped
type SinkError = Never; // never errors, slow clients are simply dropped
fn start_send(&mut self, chunk: Chunk) -> Result<AsyncSink<Chunk>, Never> { fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Never>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, chunk: Chunk) -> Result<(), Never> {
let mut channel = self.channel.lock().expect("Locking channel"); let mut channel = self.channel.lock().expect("Locking channel");
if let Chunk::Headers { .. } = chunk { if let Chunk::Headers { .. } = chunk {
@ -67,14 +73,34 @@ impl Sink for Transmitter {
channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok()); channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok());
Ok(AsyncSink::Ready) Ok(())
} }
fn poll_complete(&mut self) -> Result<Async<()>, Never> {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
let mut channel = self.channel.lock().expect("Locking channel");
let mut result = Poll::Ready(Ok(()));
// just disconnect any erroring listeners
channel.listeners.retain_mut(|listener| match Pin::new(listener).poll_flush(cx) {
Poll::Pending => {result = Poll::Pending; true},
Poll::Ready(Ok(())) => true,
Poll::Ready(Err(_)) => false,
});
result
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
let mut channel = self.channel.lock().expect("Locking channel"); let mut channel = self.channel.lock().expect("Locking channel");
channel.listeners.retain_mut(|listener| listener.poll_complete().is_ok()); // there's no useful error we can offer here, just give everything a chance to try closing
channel.listeners.retain_mut(|listener| Pin::new(listener).poll_close(cx).is_pending());
Ok(Async::Ready(())) return if channel.listeners.len() > 0 {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
} }
} }
@ -107,9 +133,9 @@ impl Listener {
impl Stream for Listener { impl Stream for Listener {
type Item = Chunk; type Item = Chunk;
type Error = Never; // no transmitter errors are exposed to the listeners
fn poll(&mut self) -> Result<Async<Option<Chunk>>, Never> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Chunk>> {
Ok(self.receiver.poll().expect("Channel receiving can't error")) let receiver = &mut self.get_mut().receiver;
Pin::new(receiver).poll_next(cx)
} }
} }

View file

@ -13,6 +13,14 @@ use futures::{
Sink, Sink,
stream::empty stream::empty
}; };
use futures3::{
compat::{
Compat,
CompatSink,
},
Never,
StreamExt
};
use hyper::{ use hyper::{
Body, Body,
Response, Response,
@ -45,11 +53,11 @@ use webmetro::{
const BUFFER_LIMIT: usize = 2 * 1024 * 1024; const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
fn get_stream(channel: Handle) -> impl Stream<Item = Bytes, Error = WebmetroError> { fn get_stream(channel: Handle) -> impl Stream<Item = Bytes, Error = WebmetroError> {
Listener::new(channel) Compat::new(Listener::new(channel).map(|c| Ok(c)))
.fix_timecodes() .fix_timecodes()
.find_starting_point() .find_starting_point()
.map(|webm_chunk| webm_chunk.into_bytes()) .map(|webm_chunk| webm_chunk.into_bytes())
.map_err(|err| match err {}) .map_err(|err: Never| match err {})
} }
fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = warp::Error>) -> impl Stream<Item = Bytes, Error = WebmetroError> { fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = warp::Error>) -> impl Stream<Item = Bytes, Error = WebmetroError> {
@ -57,7 +65,7 @@ fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = war
.map_err(WebmetroError::from) .map_err(WebmetroError::from)
.parse_ebml().with_soft_limit(BUFFER_LIMIT) .parse_ebml().with_soft_limit(BUFFER_LIMIT)
.chunk_webm().with_soft_limit(BUFFER_LIMIT); .chunk_webm().with_soft_limit(BUFFER_LIMIT);
let sink = Transmitter::new(channel); let sink = CompatSink::new(Transmitter::new(channel));
source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
.into_stream() .into_stream()