webmetro/src/commands/relay.rs

142 lines
4.2 KiB
Rust

use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex, Weak};
use std::time::{SystemTime, UNIX_EPOCH, Duration};
use bytes::{Buf, Bytes};
use clap::Args;
use futures::{prelude::*, stream::FuturesUnordered, Stream};
use html_escape::encode_double_quoted_attribute;
use hyper::{
header::{CACHE_CONTROL, CONTENT_TYPE},
Body, Response,
};
use stream::iter;
use warp::reply::{html, with_header};
use warp::{self, path, Filter, Reply};
use weak_table::WeakValueHashMap;
use webmetro::{
channel::{Channel, Handle, Listener, Transmitter},
chunk::Chunk,
chunk::WebmStream,
error::WebmetroError,
fixers::{ChunkStream, ChunkTimecodeFixer},
stream_parser::StreamEbml,
};
// Soft buffering limit of 2 * 1024 * 1024 (2 MiB if the unit is bytes — presumably it is;
// confirm against `with_soft_limit`). `post_stream` passes it to both of its
// `with_soft_limit` calls: once after EBML parsing and once after WebM chunking.
const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
/// Builds the outgoing byte stream served to a listener of `channel`.
///
/// Chunks received through a [`Listener`] on the channel are wrapped in `Ok`,
/// run through a per-connection [`ChunkTimecodeFixer`], filtered by
/// `find_starting_point`, and finally flattened so that each chunk contributes
/// its individual `Bytes` pieces to the resulting stream.
fn get_stream(channel: Handle) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
    // One fixer per listener: it is moved into the closure below and mutated per chunk.
    let mut fixer = ChunkTimecodeFixer::new();

    // The listener yields bare chunks; lift them into `Result` so the `Try*` adaptors apply.
    let incoming = Listener::new(channel).map(Result::<Chunk, WebmetroError>::Ok);

    let playable = incoming
        .map_ok(move |chunk| fixer.process(chunk))
        .find_starting_point();

    // Each chunk iterates into `Bytes` buffers; emit them one by one.
    playable
        .map_ok(|chunk| iter(chunk).map(Result::<Bytes, WebmetroError>::Ok))
        .try_flatten()
}
/// Consumes an uploaded body `stream` and forwards its WebM chunks to `channel`.
///
/// The body is parsed as EBML and then grouped into WebM chunks, with
/// `BUFFER_LIMIT` applied as the soft limit at both stages. Every chunk is sent
/// through a [`Transmitter`] for the channel. The returned stream yields an
/// empty `Bytes` per forwarded chunk, so the response body carries no data;
/// errors are logged at `warn` level and still passed through to the caller.
fn post_stream(
    channel: Handle,
    stream: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin,
) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
    let transmitter = Transmitter::new(channel);

    // Upload bytes -> EBML elements -> WebM chunks, each stage soft-limited.
    let chunks = stream
        .map_err(WebmetroError::from)
        .parse_ebml()
        .with_soft_limit(BUFFER_LIMIT)
        .chunk_webm()
        .with_soft_limit(BUFFER_LIMIT);

    chunks
        .map_ok(move |chunk| {
            transmitter.send(chunk);
            // Nothing is echoed back to the source; an empty buffer keeps the stream typed.
            Bytes::new()
        })
        .inspect_err(|err| warn!("{}", err))
}
/// Wraps `body` in a response carrying the headers used for live WebM delivery.
///
/// Sets `Content-Type: video/webm`, `X-Accel-Buffering: no` (presumably to stop
/// a fronting reverse proxy from buffering the stream) and
/// `Cache-Control: no-cache, no-store`.
///
/// # Panics
///
/// Panics if the response builder reports an error when the body is attached.
fn media_response(body: Body) -> Response<Body> {
    let builder = Response::builder()
        .header(CONTENT_TYPE, "video/webm")
        .header("X-Accel-Buffering", "no")
        .header(CACHE_CONTROL, "no-cache, no-store");

    builder.body(body).unwrap()
}
/// Replies with the player stylesheet, embedded into the binary at compile time
/// from `../data/player.css`, and labels it as `text/css`.
fn player_css() -> impl Reply {
    with_header(include_str!("../data/player.css"), CONTENT_TYPE, "text/css")
}
fn player_html(channel: impl AsRef<str>) -> impl Reply {
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO).as_nanos();
let player = format!(
include_str!("../data/player.html"),
channel = encode_double_quoted_attribute(channel.as_ref()),
cachebust = timestamp
);
html(player)
}
// Command-line arguments for the relay subcommand; consumed by `run`.
// NOTE: the `///` doc comments in this block are read by clap's `Args` derive and
// surface as help text, so rewording them changes user-visible output.
/// Hosts an HTTP-based relay server
#[derive(Args, Debug)]
pub struct RelayArgs {
// `run` resolves this string with `to_socket_addrs`, so a single value may
// expand to several socket addresses, each of which gets its own listener.
/// The address:port to listen to
listen: String,
}
/// Starts the relay HTTP server and runs until every listener has finished.
///
/// Routes served:
///
/// * `HEAD /live/<name>` — media headers with an empty body;
/// * `GET /live/<name>` — the live stream of channel `<name>`;
/// * `POST` or `PUT /live/<name>` — accepts a source stream for channel `<name>`;
/// * `/watch/<name>` — the HTML player page;
/// * `/static/css` — the player stylesheet.
///
/// Channels are kept in a shared map of weak references and created on first
/// access by name.
///
/// # Errors
///
/// Returns an error if `args.listen` cannot be resolved, or resolves to no
/// socket addresses.
///
/// # Panics
///
/// A request handler panics if the channel-map mutex has been poisoned.
#[tokio::main]
pub async fn run(args: RelayArgs) -> Result<(), WebmetroError> {
    // Resolve the listen string up front; it may map to several socket addresses.
    let listen_addrs = args.listen.to_socket_addrs()?;
    info!("Binding to {:?}", listen_addrs);
    if listen_addrs.len() == 0 {
        return Err("Listen address didn't resolve".into());
    }

    // Channel registry shared by all request handlers; values are held weakly.
    let registry = Arc::new(Mutex::new(
        WeakValueHashMap::<String, Weak<Mutex<Channel>>>::new(),
    ));

    // Base filter: match `/live/<name>` and look up (or create) that channel.
    let channel = path!("live" / String).map(move |name: String| {
        let mut channels = registry.lock().unwrap();
        let handle = channels
            .entry(name.clone())
            .or_insert_with(|| Channel::new(name.clone()));
        // Release the registry lock before handing the channel to the next filter.
        drop(channels);
        (handle, name)
    });

    let head = channel.clone().and(warp::head()).map(|(_, name)| {
        info!("HEAD Request For Channel {}", name);
        media_response(Body::empty())
    });

    let get = channel.clone().and(warp::get()).map(|(handle, name)| {
        info!("Listener Connected On Channel {}", name);
        let body = Body::wrap_stream(get_stream(handle));
        media_response(body)
    });

    // Sources may upload with either POST or PUT.
    let upload_method = warp::post().or(warp::put()).unify();
    let post_put = channel
        .and(upload_method)
        .and(warp::body::stream())
        .map(|(handle, name), upload| {
            info!("Source Connected On Channel {}", name);
            let body = Body::wrap_stream(post_stream(handle, upload));
            Response::new(body)
        });

    let watch = path!("watch" / String).map(player_html);
    let css = path!("static" / "css").map(player_css);
    let routes = head.or(get).or(post_put).or(watch).or(css);

    // One server future per resolved address, all driven concurrently.
    let mut servers: FuturesUnordered<_> = listen_addrs
        .map(|addr| warp::serve(routes.clone()).try_bind(addr))
        .collect();
    while servers.next().await.is_some() {}

    Ok(())
}