simplify channel streams

This commit is contained in:
Tangent Wantwight 2020-05-09 00:05:49 -04:00
parent 4b923ebed5
commit 5a6d1e764d
2 changed files with 26 additions and 67 deletions

View file

@ -1,22 +1,10 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{ use std::sync::{Arc, Mutex};
Context, use std::task::{Context, Poll};
Poll
};
use std::sync::{
Arc,
Mutex
};
use futures::{ use futures::{
channel::mpsc::{ channel::mpsc::{channel as mpsc_channel, Receiver, Sender},
channel as mpsc_channel,
Sender,
Receiver
},
Sink,
Stream, Stream,
never::Never,
}; };
use odds::vec::VecExt; use odds::vec::VecExt;
@ -30,7 +18,7 @@ use crate::chunk::Chunk;
pub struct Channel { pub struct Channel {
pub name: String, pub name: String,
header_chunk: Option<Chunk>, header_chunk: Option<Chunk>,
listeners: Vec<Sender<Chunk>> listeners: Vec<Sender<Chunk>>,
} }
pub type Handle = Arc<Mutex<Channel>>; pub type Handle = Arc<Mutex<Channel>>;
@ -40,67 +28,39 @@ impl Channel {
Arc::new(Mutex::new(Channel { Arc::new(Mutex::new(Channel {
name, name,
header_chunk: None, header_chunk: None,
listeners: Vec::new() listeners: Vec::new(),
})) }))
} }
} }
pub struct Transmitter { pub struct Transmitter {
channel: Handle channel: Handle,
} }
impl Transmitter { impl Transmitter {
pub fn new(channel_arc: Handle) -> Self { pub fn new(channel_arc: Handle) -> Self {
Transmitter { Transmitter {
channel: channel_arc channel: channel_arc,
}
} }
} }
impl Sink<Chunk> for Transmitter { pub fn send(&self, chunk: Chunk) {
type Error = Never; // never errors, slow clients are simply dropped
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Never>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, chunk: Chunk) -> Result<(), Never> {
let mut channel = self.channel.lock().expect("Locking channel"); let mut channel = self.channel.lock().expect("Locking channel");
if let Chunk::Headers { .. } = chunk { if let Chunk::Headers { .. } = chunk {
channel.header_chunk = Some(chunk.clone()); channel.header_chunk = Some(chunk.clone());
} }
channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok()); channel
.listeners
Ok(()) .retain_mut(|listener| listener.start_send(chunk.clone()).is_ok());
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
let mut channel = self.channel.lock().expect("Locking channel");
let mut result = Poll::Ready(Ok(()));
// just disconnect any erroring listeners
channel.listeners.retain_mut(|listener| match Pin::new(listener).poll_flush(cx) {
Poll::Pending => {result = Poll::Pending; true},
Poll::Ready(Ok(())) => true,
Poll::Ready(Err(_)) => false,
});
result
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
// don't actually disconnect listeners, since other sources may want to transmit to this channel;
// just ensure we've sent everything we can out
self.poll_flush(cx)
} }
} }
pub struct Listener { pub struct Listener {
/// not used in operation, but its refcount keeps the channel alive when there's no Transmitter /// not used in operation, but its refcount keeps the channel alive when there's no Transmitter
_channel: Handle, _channel: Handle,
receiver: Receiver<Chunk> receiver: Receiver<Chunk>,
} }
impl Listener { impl Listener {
@ -111,7 +71,9 @@ impl Listener {
let mut channel = channel_arc.lock().expect("Locking channel"); let mut channel = channel_arc.lock().expect("Locking channel");
if let Some(ref chunk) = channel.header_chunk { if let Some(ref chunk) = channel.header_chunk {
sender.start_send(chunk.clone()).expect("Queuing existing header chunk"); sender
.start_send(chunk.clone())
.expect("Queuing existing header chunk");
} }
channel.listeners.push(sender); channel.listeners.push(sender);
@ -119,7 +81,7 @@ impl Listener {
Listener { Listener {
_channel: channel_arc, _channel: channel_arc,
receiver: receiver receiver: receiver,
} }
} }
} }

View file

@ -8,7 +8,6 @@ use std::sync::{
use bytes::{Bytes, Buf}; use bytes::{Bytes, Buf};
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::{App, Arg, ArgMatches, SubCommand};
use futures::{ use futures::{
never::Never,
prelude::*, prelude::*,
Stream, Stream,
stream::FuturesUnordered, stream::FuturesUnordered,
@ -53,22 +52,20 @@ fn get_stream(channel: Handle) -> impl Stream<Item = Result<Bytes, WebmetroError
.map_ok(move |chunk| timecode_fixer.process(chunk)) .map_ok(move |chunk| timecode_fixer.process(chunk))
.find_starting_point() .find_starting_point()
.map_ok(|webm_chunk| webm_chunk.into_bytes()) .map_ok(|webm_chunk| webm_chunk.into_bytes())
.map_err(|err: Never| match err {})
} }
fn post_stream(channel: Handle, stream: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin) -> impl Stream<Item = Result<Bytes, WebmetroError>> { fn post_stream(channel: Handle, stream: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
let source = stream let channel = Transmitter::new(channel);
stream
.map_err(WebmetroError::from) .map_err(WebmetroError::from)
.parse_ebml().with_soft_limit(BUFFER_LIMIT) .parse_ebml().with_soft_limit(BUFFER_LIMIT)
.chunk_webm().with_soft_limit(BUFFER_LIMIT); .chunk_webm().with_soft_limit(BUFFER_LIMIT)
let sink = Transmitter::new(channel); .map_ok(move |chunk| {
channel.send(chunk);
source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) Bytes::new()
.into_stream() })
.map_ok(|_| Bytes::new()) .inspect_err(|err| {
.map_err(|err| { warn!("{}", err)
warn!("{}", err);
err
}) })
} }