webmetro/src/channel.rs

113 lines
2.8 KiB
Rust
Raw Permalink Normal View History

2019-10-09 03:35:13 +00:00
use std::pin::Pin;
2020-05-09 04:05:49 +00:00
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures::{
2020-05-09 04:05:49 +00:00
channel::mpsc::{channel as mpsc_channel, Receiver, Sender},
2019-10-09 03:35:13 +00:00
Stream,
};
2018-12-22 20:03:19 +00:00
use crate::chunk::Chunk;
/// A collection of listeners to a stream of WebM chunks.
/// Sending a chunk may fail due to a client being disconnected,
/// or simply failing to keep up with the stream buffer. In either
/// case, there's nothing practical the server can do to recover,
/// so the failing client is just dropped from the listener list.
pub struct Channel {
pub name: String,
header_chunk: Option<Chunk>,
2020-05-09 04:05:49 +00:00
listeners: Vec<Sender<Chunk>>,
}
2018-10-22 02:29:36 +00:00
/// Shared, mutex-guarded reference to a [`Channel`].
///
/// Both `Transmitter` and `Listener` hold one of these, so the channel stays
/// alive for as long as any of them exists.
pub type Handle = Arc<Mutex<Channel>>;
impl Channel {
    /// Creates an empty channel called `name` and returns a shared [`Handle`] to it.
    ///
    /// The new channel has no stored header chunk and no listeners. An
    /// info-level log message announces the opening.
    pub fn new(name: String) -> Handle {
        info!("Opening Channel {}", name);
        Arc::new(Mutex::new(Channel {
            name,
            header_chunk: None,
            listeners: Vec::new(),
        }))
    }
}
impl Drop for Channel {
    /// Emits an info-level log message naming the channel being torn down.
    fn drop(&mut self) {
        let channel_name = &self.name;
        info!("Closing Channel {}", channel_name);
    }
}
pub struct Transmitter {
2020-05-09 04:05:49 +00:00
channel: Handle,
}
impl Transmitter {
2018-10-22 02:29:36 +00:00
pub fn new(channel_arc: Handle) -> Self {
Transmitter {
2020-05-09 04:05:49 +00:00
channel: channel_arc,
}
}
2019-10-09 03:35:13 +00:00
2020-05-09 04:05:49 +00:00
pub fn send(&self, chunk: Chunk) {
let mut channel = self.channel.lock().expect("Locking channel");
if let Chunk::Headers { .. } = chunk {
channel.header_chunk = Some(chunk.clone());
}
2020-05-09 04:05:49 +00:00
channel
.listeners
.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok());
}
}
impl Drop for Transmitter {
    /// Best-effort cleanup: forgets the stored header chunk when the
    /// transmitter goes away. Nothing happens if the lock cannot be acquired.
    fn drop(&mut self) {
        if let Ok(mut guard) = self.channel.lock() {
            // A later client must not be handed an initialization segment
            // that may no longer match whatever is transmitted next.
            guard.header_chunk.take();
        }
    }
}
pub struct Listener {
/// not used in operation, but its refcount keeps the channel alive when there's no Transmitter
2018-10-22 02:29:36 +00:00
_channel: Handle,
2020-05-09 04:05:49 +00:00
receiver: Receiver<Chunk>,
}
impl Listener {
2018-10-22 02:29:36 +00:00
pub fn new(channel_arc: Handle) -> Self {
let (mut sender, receiver) = mpsc_channel(5);
{
let mut channel = channel_arc.lock().expect("Locking channel");
if let Some(ref chunk) = channel.header_chunk {
2020-05-09 04:05:49 +00:00
sender
.start_send(chunk.clone())
.expect("Queuing existing header chunk");
}
channel.listeners.push(sender);
}
Listener {
_channel: channel_arc,
2020-05-09 04:05:49 +00:00
receiver: receiver,
}
}
}
impl Stream for Listener {
type Item = Chunk;
2019-10-09 03:35:13 +00:00
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Chunk>> {
let receiver = &mut self.get_mut().receiver;
Pin::new(receiver).poll_next(cx)
}
}