142 lines
4.2 KiB
Rust
142 lines
4.2 KiB
Rust
use std::net::ToSocketAddrs;
|
|
use std::sync::{Arc, Mutex, Weak};
|
|
use std::time::{SystemTime, UNIX_EPOCH, Duration};
|
|
|
|
use bytes::{Buf, Bytes};
|
|
use clap::Args;
|
|
use futures::{prelude::*, stream::FuturesUnordered, Stream};
|
|
use html_escape::encode_double_quoted_attribute;
|
|
use hyper::{
|
|
header::{CACHE_CONTROL, CONTENT_TYPE},
|
|
Body, Response,
|
|
};
|
|
use stream::iter;
|
|
use warp::reply::{html, with_header};
|
|
use warp::{self, path, Filter, Reply};
|
|
use weak_table::WeakValueHashMap;
|
|
use webmetro::{
|
|
channel::{Channel, Handle, Listener, Transmitter},
|
|
chunk::Chunk,
|
|
chunk::WebmStream,
|
|
error::WebmetroError,
|
|
fixers::{ChunkStream, ChunkTimecodeFixer},
|
|
stream_parser::StreamEbml,
|
|
};
|
|
|
|
const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
|
|
|
|
fn get_stream(channel: Handle) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
|
|
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
|
Listener::new(channel)
|
|
.map(|c| Result::<Chunk, WebmetroError>::Ok(c))
|
|
.map_ok(move |chunk| timecode_fixer.process(chunk))
|
|
.find_starting_point()
|
|
.map_ok(|webm_chunk| iter(webm_chunk).map(Result::<Bytes, WebmetroError>::Ok))
|
|
.try_flatten()
|
|
}
|
|
|
|
fn post_stream(
|
|
channel: Handle,
|
|
stream: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin,
|
|
) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
|
|
let channel = Transmitter::new(channel);
|
|
stream
|
|
.map_err(WebmetroError::from)
|
|
.parse_ebml()
|
|
.with_soft_limit(BUFFER_LIMIT)
|
|
.chunk_webm()
|
|
.with_soft_limit(BUFFER_LIMIT)
|
|
.map_ok(move |chunk| {
|
|
channel.send(chunk);
|
|
Bytes::new()
|
|
})
|
|
.inspect_err(|err| warn!("{}", err))
|
|
}
|
|
|
|
fn media_response(body: Body) -> Response<Body> {
|
|
Response::builder()
|
|
.header(CONTENT_TYPE, "video/webm")
|
|
.header("X-Accel-Buffering", "no")
|
|
.header(CACHE_CONTROL, "no-cache, no-store")
|
|
.body(body)
|
|
.unwrap()
|
|
}
|
|
|
|
fn player_css() -> impl Reply {
|
|
let css = include_str!("../data/player.css");
|
|
with_header(css, CONTENT_TYPE, "text/css")
|
|
}
|
|
|
|
fn player_html(channel: impl AsRef<str>) -> impl Reply {
|
|
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO).as_nanos();
|
|
let player = format!(
|
|
include_str!("../data/player.html"),
|
|
channel = encode_double_quoted_attribute(channel.as_ref()),
|
|
cachebust = timestamp
|
|
);
|
|
html(player)
|
|
}
|
|
|
|
/// Hosts an HTTP-based relay server
|
|
#[derive(Args, Debug)]
|
|
pub struct RelayArgs {
|
|
/// The address:port to listen to
|
|
listen: String,
|
|
}
|
|
|
|
#[tokio::main]
|
|
pub async fn run(args: RelayArgs) -> Result<(), WebmetroError> {
|
|
let channel_map = Arc::new(Mutex::new(
|
|
WeakValueHashMap::<String, Weak<Mutex<Channel>>>::new(),
|
|
));
|
|
let addr_str = args.listen;
|
|
|
|
let addrs = addr_str.to_socket_addrs()?;
|
|
info!("Binding to {:?}", addrs);
|
|
if addrs.len() == 0 {
|
|
return Err("Listen address didn't resolve".into());
|
|
}
|
|
|
|
let channel = path!("live" / String).map(move |name: String| {
|
|
let channel = channel_map
|
|
.lock()
|
|
.unwrap()
|
|
.entry(name.clone())
|
|
.or_insert_with(|| Channel::new(name.clone()));
|
|
(channel, name)
|
|
});
|
|
|
|
let head = channel.clone().and(warp::head()).map(|(_, name)| {
|
|
info!("HEAD Request For Channel {}", name);
|
|
media_response(Body::empty())
|
|
});
|
|
|
|
let get = channel.clone().and(warp::get()).map(|(channel, name)| {
|
|
info!("Listener Connected On Channel {}", name);
|
|
media_response(Body::wrap_stream(get_stream(channel)))
|
|
});
|
|
|
|
let post_put = channel
|
|
.clone()
|
|
.and(warp::post().or(warp::put()).unify())
|
|
.and(warp::body::stream())
|
|
.map(|(channel, name), stream| {
|
|
info!("Source Connected On Channel {}", name);
|
|
Response::new(Body::wrap_stream(post_stream(channel, stream)))
|
|
});
|
|
|
|
let live = head.or(get).or(post_put);
|
|
let watch = path!("watch" / String).map(player_html);
|
|
let css = path!("static" / "css").map(player_css);
|
|
|
|
let routes = live.or(watch).or(css);
|
|
|
|
let mut server_futures: FuturesUnordered<_> = addrs
|
|
.map(|addr| warp::serve(routes.clone()).try_bind(addr))
|
|
.collect();
|
|
|
|
while let Some(_) = server_futures.next().await {}
|
|
|
|
Ok(())
|
|
}
|