Compare commits

..

No commits in common. "master" and "chunk_refactor" have entirely different histories.

21 changed files with 1967 additions and 1293 deletions

View file

@ -1,17 +1,4 @@
## v0.3.1-dev
- Teach filter subcommand to recognize --skip and --take options
- MSRV now rustc 1.61
- forget a channel's initialization segment when no transmitter is active. This improves behavior when a channel is occasionally used for streams with different codecs.
- Add INFO logging for channel creation/garbage-collection
- Start throttle timing on first data instead of throttle creation (improves cases where the source is slow to start)
- Teach send subcommand to recognize --skip and --take options
## v0.3.0
- update internals to v0.2 of `warp` and `tokio`; no remaining code relies on `futures` 0.1
## v0.2.2 ## v0.2.2
- use the `log` and `env_logger` crates for logging; the `RUST_LOG` environment variable configures the logging level.
- see [the env_logger documentation](https://docs.rs/env_logger/*/env_logger/) for more information
- support listening on multiple addresses if given a DNS name instead of an IP address. All bindings reference the same namespace for channels, but this allows, e.g., binding to both IPv4 and IPv6 `localhost`. - support listening on multiple addresses if given a DNS name instead of an IP address. All bindings reference the same namespace for channels, but this allows, e.g., binding to both IPv4 and IPv6 `localhost`.
- released November 20, 2019 - released November 20, 2019

2145
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,24 +1,25 @@
[package] [package]
name = "webmetro" name = "webmetro"
version = "0.3.1-dev" version = "0.2.3-dev"
authors = ["Tangent 128 <Tangent128@gmail.com>"] authors = ["Tangent 128 <Tangent128@gmail.com>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
byteorder = "1" bytes = "^0.4.12"
bytes = "1" clap = "^2.33.0"
clap = { version="^3.1.18", features=["cargo", "derive"] }
custom_error = "^1.7" custom_error = "^1.7"
env_logger = "^0.9" env_logger = "^0.7"
futures = "^0.3" futures = "0.1.29"
http = "^0.2" futures3 = { package = "futures-preview", version="0.3.0-alpha", features = ["compat"] }
html-escape = "0.2.13" http = "^0.1.18"
hyper = "^0.14" hyper = "^0.12.35"
hyper13 = { package = "hyper", version="0.13.0-alpha.4", features = ["unstable-stream"] }
log = "^0.4.8" log = "^0.4.8"
matches = "^0.1" matches = "^0.1.8"
pin-project = "1" odds = { version = "0.3.1", features = ["std-vec"] }
tokio = { version="^1.18", features = ["io-std", "macros", "net", "rt", "rt-multi-thread", "time"] } tokio = "0.1.22"
tokio-util = { version="^0.7", features=["codec"] } tokio2 = { package = "tokio", version="0.2.0-alpha.6" }
urlencoding = "2.1.2" tokio-codec = "0.1.1"
warp = "^0.3" tokio-io = "0.1.12"
weak-table = "^0.3" warp = "0.1.20"
weak-table = "^0.2.3"

View file

@ -1,8 +0,0 @@
#!/bin/sh
# this will probably work with docker just as well as podman
podman run --rm \
-v ..:/usr/src/build/ \
-v ./buster-target:/usr/src/build/webmetro/target \
-w /usr/src/build/webmetro \
rust:1-buster cargo build --release

View file

@ -1 +0,0 @@
*

View file

@ -1,11 +1,24 @@
use std::pin::Pin; use std::pin::Pin;
use std::sync::{Arc, Mutex}; use std::task::{
use std::task::{Context, Poll}; Context,
Poll
use futures::{
channel::mpsc::{channel as mpsc_channel, Receiver, Sender},
Stream,
}; };
use std::sync::{
Arc,
Mutex
};
use futures3::{
channel::mpsc::{
channel as mpsc_channel,
Sender,
Receiver
},
Sink,
Stream,
Never
};
use odds::vec::VecExt;
use crate::chunk::Chunk; use crate::chunk::Chunk;
@ -17,66 +30,77 @@ use crate::chunk::Chunk;
pub struct Channel { pub struct Channel {
pub name: String, pub name: String,
header_chunk: Option<Chunk>, header_chunk: Option<Chunk>,
listeners: Vec<Sender<Chunk>>, listeners: Vec<Sender<Chunk>>
} }
pub type Handle = Arc<Mutex<Channel>>; pub type Handle = Arc<Mutex<Channel>>;
impl Channel { impl Channel {
pub fn new(name: String) -> Handle { pub fn new(name: String) -> Handle {
info!("Opening Channel {}", name);
Arc::new(Mutex::new(Channel { Arc::new(Mutex::new(Channel {
name, name,
header_chunk: None, header_chunk: None,
listeners: Vec::new(), listeners: Vec::new()
})) }))
} }
} }
impl Drop for Channel {
fn drop(&mut self) {
info!("Closing Channel {}", self.name);
}
}
pub struct Transmitter { pub struct Transmitter {
channel: Handle, channel: Handle
} }
impl Transmitter { impl Transmitter {
pub fn new(channel_arc: Handle) -> Self { pub fn new(channel_arc: Handle) -> Self {
Transmitter { Transmitter {
channel: channel_arc, channel: channel_arc
} }
} }
}
pub fn send(&self, chunk: Chunk) { impl Sink<Chunk> for Transmitter {
type Error = Never; // never errors, slow clients are simply dropped
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Never>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, chunk: Chunk) -> Result<(), Never> {
let mut channel = self.channel.lock().expect("Locking channel"); let mut channel = self.channel.lock().expect("Locking channel");
if let Chunk::Headers { .. } = chunk { if let Chunk::Headers { .. } = chunk {
channel.header_chunk = Some(chunk.clone()); channel.header_chunk = Some(chunk.clone());
} }
channel channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok());
.listeners
.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok());
}
}
impl Drop for Transmitter { Ok(())
fn drop(&mut self) { }
if let Ok(mut channel) = self.channel.lock() {
// when disconnecting, clean up the header chunk so subsequent fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
// clients don't get a potentially incorrect initialization segment let mut channel = self.channel.lock().expect("Locking channel");
channel.header_chunk = None; let mut result = Poll::Ready(Ok(()));
}
// just disconnect any erroring listeners
channel.listeners.retain_mut(|listener| match Pin::new(listener).poll_flush(cx) {
Poll::Pending => {result = Poll::Pending; true},
Poll::Ready(Ok(())) => true,
Poll::Ready(Err(_)) => false,
});
result
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
// don't actually disconnect listeners, since other sources may want to transmit to this channel;
// just ensure we've sent everything we can out
self.poll_flush(cx)
} }
} }
pub struct Listener { pub struct Listener {
/// not used in operation, but its refcount keeps the channel alive when there's no Transmitter /// not used in operation, but its refcount keeps the channel alive when there's no Transmitter
_channel: Handle, _channel: Handle,
receiver: Receiver<Chunk>, receiver: Receiver<Chunk>
} }
impl Listener { impl Listener {
@ -87,9 +111,7 @@ impl Listener {
let mut channel = channel_arc.lock().expect("Locking channel"); let mut channel = channel_arc.lock().expect("Locking channel");
if let Some(ref chunk) = channel.header_chunk { if let Some(ref chunk) = channel.header_chunk {
sender sender.start_send(chunk.clone()).expect("Queuing existing header chunk");
.start_send(chunk.clone())
.expect("Queuing existing header chunk");
} }
channel.listeners.push(sender); channel.listeners.push(sender);
@ -97,7 +119,7 @@ impl Listener {
Listener { Listener {
_channel: channel_arc, _channel: channel_arc,
receiver: receiver, receiver: receiver
} }
} }
} }

View file

@ -1,23 +1,24 @@
use crate::error::WebmetroError; use bytes::{Buf, Bytes};
use crate::stream_parser::EbmlStreamingParser; use futures3::prelude::*;
use crate::webm::*;
use bytes::{Buf, Bytes, BytesMut};
use futures::prelude::*;
use std::{ use std::{
io::Cursor, io::Cursor,
mem, mem,
pin::Pin, pin::Pin,
task::{Context, Poll, Poll::*}, task::{Context, Poll, Poll::*},
}; };
use crate::stream_parser::EbmlStreamingParser;
use crate::error::WebmetroError;
use crate::webm::*;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ClusterHead { pub struct ClusterHead {
pub keyframe: bool, pub keyframe: bool,
pub start: u64, pub start: u64,
pub end: u64, pub end: u64,
/// a Cluster tag and a Timecode tag together take at most 15 bytes; /// space for a Cluster tag and a Timecode tag
/// fortuitously, 15 bytes can be inlined in a Bytes handle even on 32-bit systems /// TODO: consider using a BytesMut here for simplicity
bytes: BytesMut, bytes: [u8;16],
bytes_used: u8
} }
impl ClusterHead { impl ClusterHead {
@ -26,7 +27,8 @@ impl ClusterHead {
keyframe: false, keyframe: false,
start: 0, start: 0,
end: 0, end: 0,
bytes: BytesMut::with_capacity(15), bytes: [0;16],
bytes_used: 0
}; };
cluster_head.update_timecode(timecode); cluster_head.update_timecode(timecode);
cluster_head cluster_head
@ -35,14 +37,11 @@ impl ClusterHead {
let delta = self.end - self.start; let delta = self.end - self.start;
self.start = timecode; self.start = timecode;
self.end = self.start + delta; self.end = self.start + delta;
let mut buffer = [0; 15]; let mut cursor = Cursor::new(self.bytes.as_mut());
let mut cursor = Cursor::new(buffer.as_mut());
// buffer is sized so these should never fail // buffer is sized so these should never fail
encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap(); encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap();
encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap(); encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap();
self.bytes.clear(); self.bytes_used = cursor.position() as u8;
let len = cursor.position() as usize;
self.bytes.extend_from_slice(&buffer[..len]);
} }
pub fn observe_simpleblock_timecode(&mut self, timecode: i16) { pub fn observe_simpleblock_timecode(&mut self, timecode: i16) {
let absolute_timecode = self.start + (timecode as u64); let absolute_timecode = self.start + (timecode as u64);
@ -52,52 +51,41 @@ impl ClusterHead {
} }
} }
impl AsRef<[u8]> for ClusterHead {
fn as_ref(&self) -> &[u8] {
self.bytes[..self.bytes_used as usize].as_ref()
}
}
/// A chunk of WebM data /// A chunk of WebM data
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Chunk { pub enum Chunk {
Headers { bytes: Bytes }, Headers {
Cluster(ClusterHead, Bytes), bytes: Bytes
},
ClusterHead(ClusterHead),
ClusterBody {
bytes: Bytes
}
} }
impl Chunk { impl Chunk {
pub fn overlaps(&self, start: u128, stop: u128) -> bool { /// converts this chunk of data into a Bytes object, perhaps to send over the network
pub fn into_bytes(self) -> Bytes {
match self { match self {
Chunk::Cluster(head, _) => head.start as u128 <= stop && head.end as u128 >= start, Chunk::Headers {bytes, ..} => bytes,
_ => true, Chunk::ClusterHead(cluster_head) => Bytes::from(cluster_head.as_ref()),
Chunk::ClusterBody {bytes, ..} => bytes
} }
} }
} }
impl IntoIterator for Chunk { impl AsRef<[u8]> for Chunk {
type Item = Bytes; fn as_ref(&self) -> &[u8] {
type IntoIter = Iter;
fn into_iter(self) -> Self::IntoIter {
match self { match self {
Chunk::Headers { bytes } => Iter::Buffer(bytes), &Chunk::Headers {ref bytes, ..} => bytes.as_ref(),
Chunk::Cluster(head, bytes) => Iter::Cluster(head, bytes), &Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(),
} &Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref()
}
}
pub enum Iter {
Cluster(ClusterHead, Bytes),
Buffer(Bytes),
Empty,
}
impl Iterator for Iter {
type Item = Bytes;
fn next(&mut self) -> Option<Bytes> {
let iter = mem::replace(self, Iter::Empty);
match iter {
Iter::Cluster(ClusterHead { bytes: head, .. }, body) => {
*self = Iter::Buffer(body);
Some(head.freeze())
}
Iter::Buffer(bytes) => Some(bytes),
Iter::Empty => None,
} }
} }
} }
@ -107,13 +95,14 @@ enum ChunkerState {
BuildingHeader(Cursor<Vec<u8>>), BuildingHeader(Cursor<Vec<u8>>),
// ClusterHead & body buffer // ClusterHead & body buffer
BuildingCluster(ClusterHead, Cursor<Vec<u8>>), BuildingCluster(ClusterHead, Cursor<Vec<u8>>),
End, End
} }
pub struct WebmChunker<S> { pub struct WebmChunker<S> {
source: EbmlStreamingParser<S>, source: EbmlStreamingParser<S>,
buffer_size_limit: Option<usize>, buffer_size_limit: Option<usize>,
state: ChunkerState, state: ChunkerState,
pending_chunk: Option<Chunk>,
} }
impl<S> WebmChunker<S> { impl<S> WebmChunker<S> {
@ -126,11 +115,7 @@ impl<S> WebmChunker<S> {
} }
} }
fn encode( fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usize>) -> Result<(), WebmetroError> {
element: WebmElement,
buffer: &mut Cursor<Vec<u8>>,
limit: Option<usize>,
) -> Result<(), WebmetroError> {
if let Some(limit) = limit { if let Some(limit) = limit {
if limit <= buffer.get_ref().len() { if limit <= buffer.get_ref().len() {
return Err(WebmetroError::ResourcesExceeded); return Err(WebmetroError::ResourcesExceeded);
@ -140,17 +125,15 @@ fn encode(
encode_webm_element(element, buffer).map_err(|err| err.into()) encode_webm_element(element, buffer).map_err(|err| err.into())
} }
impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> Stream for WebmChunker<S> impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for WebmChunker<S>
where
WebmetroError: From<E>,
{ {
type Item = Result<Chunk, WebmetroError>; type Item = Result<Chunk, WebmetroError>;
fn poll_next( fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Chunk, WebmetroError>>> {
let mut chunker = self.get_mut(); let mut chunker = self.get_mut();
if chunker.pending_chunk.is_some() {
return Ready(chunker.pending_chunk.take().map(Ok));
}
loop { loop {
match chunker.state { match chunker.state {
ChunkerState::BuildingHeader(ref mut buffer) => { ChunkerState::BuildingHeader(ref mut buffer) => {
@ -160,117 +143,92 @@ where
Ready(None) => return Ready(None), Ready(None) => return Ready(None),
Ready(Some(Ok(element))) => match element { Ready(Some(Ok(element))) => match element {
WebmElement::Cluster => { WebmElement::Cluster => {
let liberated_buffer = let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
mem::replace(buffer, Cursor::new(Vec::new())); let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())};
let header_chunk = Chunk::Headers {
bytes: Bytes::from(liberated_buffer.into_inner()),
};
chunker.state = ChunkerState::BuildingCluster( chunker.state = ChunkerState::BuildingCluster(
ClusterHead::new(0), ClusterHead::new(0),
Cursor::new(Vec::new()), Cursor::new(Vec::new())
); );
return Ready(Some(Ok(header_chunk))); return Ready(Some(Ok(header_chunk)));
} },
WebmElement::Info => {} WebmElement::Info => {},
WebmElement::Void => {} WebmElement::Void => {},
WebmElement::Unknown(_) => {} WebmElement::Unknown(_) => {},
element => { element => {
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) {
{
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
} }
}, }
} }
} },
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
match chunker.source.poll_event(cx) { match chunker.source.poll_event(cx) {
Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))), Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))),
Pending => return Pending, Pending => return Pending,
Ready(Some(Ok(element))) => match element { Ready(Some(Ok(element))) => match element {
WebmElement::EbmlHead | WebmElement::Segment => { WebmElement::EbmlHead | WebmElement::Segment => {
let liberated_cluster_head = let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
let liberated_buffer =
mem::replace(buffer, Cursor::new(Vec::new()));
let mut new_header_cursor = Cursor::new(Vec::new()); let mut new_header_cursor = Cursor::new(Vec::new());
match encode( match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) {
element,
&mut new_header_cursor,
chunker.buffer_size_limit,
) {
Ok(_) => { Ok(_) => {
chunker.state = chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())});
ChunkerState::BuildingHeader(new_header_cursor); chunker.state = ChunkerState::BuildingHeader(new_header_cursor);
return Ready(Some(Ok(Chunk::Cluster( return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
liberated_cluster_head, },
Bytes::from(liberated_buffer.into_inner()),
))));
}
Err(err) => { Err(err) => {
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
} }
} },
WebmElement::Cluster => { WebmElement::Cluster => {
let liberated_cluster_head = let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
let liberated_buffer =
mem::replace(buffer, Cursor::new(Vec::new()));
return Ready(Some(Ok(Chunk::Cluster( chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())});
liberated_cluster_head, return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
Bytes::from(liberated_buffer.into_inner()), },
))));
}
WebmElement::Timecode(timecode) => { WebmElement::Timecode(timecode) => {
cluster_head.update_timecode(timecode); cluster_head.update_timecode(timecode);
} },
WebmElement::SimpleBlock(ref block) => { WebmElement::SimpleBlock(ref block) => {
if (block.flags & 0b10000000) != 0 { if (block.flags & 0b10000000) != 0 {
// TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster // TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster
cluster_head.keyframe = true; cluster_head.keyframe = true;
} }
cluster_head.observe_simpleblock_timecode(block.timecode); cluster_head.observe_simpleblock_timecode(block.timecode);
if let Err(err) = encode( if let Err(err) = encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit) {
WebmElement::SimpleBlock(*block),
buffer,
chunker.buffer_size_limit,
) {
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
} },
WebmElement::Info => {} WebmElement::Info => {},
WebmElement::Void => {} WebmElement::Void => {},
WebmElement::Unknown(_) => {} WebmElement::Unknown(_) => {},
element => { element => {
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) {
{
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
} },
}, },
Ready(None) => { Ready(None) => {
// flush final Cluster on end of stream // flush final Cluster on end of stream
let liberated_cluster_head = let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())});
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Ok(Chunk::Cluster( return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
liberated_cluster_head,
Bytes::from(liberated_buffer.into_inner()),
))));
} }
} }
} },
ChunkerState::End => return Ready(None), ChunkerState::End => return Ready(None)
}; };
} }
} }
@ -288,6 +246,7 @@ impl<S: Stream> WebmStream for EbmlStreamingParser<S> {
source: self, source: self,
buffer_size_limit: None, buffer_size_limit: None,
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())), state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())),
pending_chunk: None
} }
} }
} }

View file

@ -1,27 +1,35 @@
use clap::Args; use clap::{App, AppSettings, ArgMatches, SubCommand};
use tokio2::runtime::Runtime;
use super::stdin_stream; use super::stdin_stream;
use webmetro::{ use webmetro::{
error::WebmetroError, error::WebmetroError,
stream_parser::StreamEbml, stream_parser::StreamEbml,
webm::{SimpleBlock, WebmElement::*}, webm::{
SimpleBlock,
WebmElement::*
}
}; };
/// Dumps WebM parsing events from parsing stdin pub fn options() -> App<'static, 'static> {
#[derive(Args, Debug)] SubCommand::with_name("dump")
pub struct DumpArgs; .setting(AppSettings::Hidden)
.about("Dumps WebM parsing events from parsing stdin")
}
pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> {
#[tokio::main]
pub async fn run(_args: DumpArgs) -> Result<(), WebmetroError> {
let mut events = stdin_stream().parse_ebml(); let mut events = stdin_stream().parse_ebml();
while let Some(element) = events.next().await? { Runtime::new().unwrap().block_on(async {
match element { while let Some(element) = events.next().await? {
// suppress printing byte arrays match element {
Tracks(slice) => println!("Tracks[{}]", slice.len()), // suppress printing byte arrays
SimpleBlock(SimpleBlock { timecode, .. }) => println!("SimpleBlock@{}", timecode), Tracks(slice) => println!("Tracks[{}]", slice.len()),
other => println!("{:?}", other), SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode),
other => println!("{:?}", other)
}
} }
} Ok(())
Ok(()) })
} }

View file

@ -1,53 +1,49 @@
use std::{io, io::prelude::*, pin::Pin, time::Duration}; use std::{
io,
use clap::Args; io::prelude::*
use futures::prelude::*;
use super::{parse_time, stdin_stream};
use webmetro::{
chunk::{Chunk, WebmStream},
error::WebmetroError,
fixers::{ChunkTimecodeFixer, Throttle},
stream_parser::StreamEbml,
}; };
/// Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does. use clap::{App, Arg, ArgMatches, SubCommand};
#[derive(Args, Debug)] use futures3::prelude::*;
pub struct FilterArgs { use futures3::future::ready;
/// Slow down output to "real time" speed as determined by the timestamps (useful for streaming static files) use tokio2::runtime::Runtime;
#[clap(long)]
throttle: bool, use super::stdin_stream;
/// Skip approximately n seconds of content before uploading or throttling use webmetro::{
#[clap(long, short, parse(try_from_str = parse_time))] chunk::{
skip: Option<Duration>, Chunk,
/// Stop uploading after approximately n seconds of content WebmStream
#[clap(long, short, parse(try_from_str = parse_time))] },
take: Option<Duration>, error::WebmetroError,
fixers::{
ChunkTimecodeFixer,
Throttle,
},
stream_parser::StreamEbml
};
pub fn options() -> App<'static, 'static> {
SubCommand::with_name("filter")
.about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.")
.arg(Arg::with_name("throttle")
.long("throttle")
.help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
} }
#[tokio::main] pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
pub async fn run(args: FilterArgs) -> Result<(), WebmetroError> {
let start_time = args.skip.map_or(0, |s| s.as_millis());
let stop_time = args
.take
.map_or(std::u128::MAX, |t| t.as_millis() + start_time);
let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut timecode_fixer = ChunkTimecodeFixer::new();
let mut chunk_stream: Pin<Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send>> = let mut chunk_stream: Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin> = Box::new(
Box::pin( stdin_stream()
stdin_stream() .parse_ebml()
.parse_ebml() .chunk_webm()
.chunk_webm() .map_ok(move |chunk| timecode_fixer.process(chunk))
.map_ok(move |chunk| timecode_fixer.process(chunk)) );
.try_filter(move |chunk| future::ready(chunk.overlaps(start_time, stop_time))),
);
if args.throttle { if args.is_present("throttle") {
chunk_stream = Box::pin(Throttle::new(chunk_stream)); chunk_stream = Box::new(Throttle::new(chunk_stream));
} }
while let Some(chunk) = chunk_stream.next().await { Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| {
chunk?.into_iter().try_for_each(|buffer| io::stdout().write_all(&buffer))?; ready(io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from))
} }))
Ok(())
} }

View file

@ -1,8 +1,7 @@
use std::time::Duration; use std::io::Cursor;
use bytes::Bytes; use bytes::Bytes;
use futures::{Stream, TryStreamExt}; use futures3::TryStreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};
use webmetro::error::WebmetroError; use webmetro::error::WebmetroError;
pub mod dump; pub mod dump;
@ -13,15 +12,13 @@ pub mod send;
/// An adapter that makes chunks of bytes from stdin available as a Stream; /// An adapter that makes chunks of bytes from stdin available as a Stream;
/// is NOT actually async, and just uses blocking read. Don't use more than /// is NOT actually async, and just uses blocking read. Don't use more than
/// one at once, who knows who gets which bytes. /// one at once, who knows who gets which bytes.
pub fn stdin_stream() -> impl Stream<Item = Result<Bytes, std::io::Error>> + Sized + Unpin { pub fn stdin_stream() -> impl futures3::TryStream<
FramedRead::new(tokio::io::stdin(), BytesCodec::new()).map_ok(|bytes| bytes.freeze()) Item = Result<Cursor<Bytes>, WebmetroError>,
} Ok = Cursor<Bytes>,
Error = WebmetroError,
pub fn parse_time(arg: &str) -> Result<Duration, WebmetroError> { > + Sized
match arg.parse() { + Unpin {
Ok(secs) => Ok(Duration::from_secs(secs)), tokio2::codec::FramedRead::new(tokio2::io::stdin(), tokio2::codec::BytesCodec::new())
Err(err) => Err(WebmetroError::ApplicationError { .map_ok(|bytes| Cursor::new(bytes.freeze()))
message: err.to_string(), .map_err(WebmetroError::from)
}),
}
} }

View file

@ -1,56 +1,85 @@
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex, Weak}; use std::sync::{
use std::time::{SystemTime, UNIX_EPOCH, Duration}; Arc,
Mutex,
use bytes::{Buf, Bytes}; Weak
use clap::Args; };
use futures::{prelude::*, stream::FuturesUnordered, Stream};
use html_escape::encode_double_quoted_attribute; use bytes::{Bytes, Buf};
use hyper::{ use clap::{App, Arg, ArgMatches, SubCommand};
header::{CACHE_CONTROL, CONTENT_TYPE}, use futures::{
Body, Response, Future,
Stream,
Sink,
stream::empty
};
use futures3::{
compat::{
Compat,
CompatSink,
Compat01As03,
},
Never,
prelude::*,
};
use hyper::{
Body,
Response,
header::{
CACHE_CONTROL,
CONTENT_TYPE
}
};
use warp::{
self,
Filter,
path
};
use weak_table::{
WeakValueHashMap
}; };
use stream::iter;
use warp::reply::{html, with_header};
use warp::{self, path, Filter, Reply};
use weak_table::WeakValueHashMap;
use webmetro::{ use webmetro::{
channel::{Channel, Handle, Listener, Transmitter}, channel::{
chunk::Chunk, Channel,
Handle,
Listener,
Transmitter
},
chunk::WebmStream, chunk::WebmStream,
error::WebmetroError, error::WebmetroError,
fixers::{ChunkStream, ChunkTimecodeFixer}, fixers::{
stream_parser::StreamEbml, ChunkStream,
ChunkTimecodeFixer,
},
stream_parser::StreamEbml
}; };
const BUFFER_LIMIT: usize = 2 * 1024 * 1024; const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
fn get_stream(channel: Handle) -> impl Stream<Item = Result<Bytes, WebmetroError>> { fn get_stream(channel: Handle) -> impl Stream<Item = Bytes, Error = WebmetroError> {
let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut timecode_fixer = ChunkTimecodeFixer::new();
Listener::new(channel) Compat::new(Listener::new(channel).map(|c| Ok(c))
.map(|c| Result::<Chunk, WebmetroError>::Ok(c)) .map_ok(move |chunk| timecode_fixer.process(chunk))
.map_ok(move |chunk| timecode_fixer.process(chunk)) .find_starting_point()
.find_starting_point() .map_ok(|webm_chunk| webm_chunk.into_bytes())
.map_ok(|webm_chunk| iter(webm_chunk).map(Result::<Bytes, WebmetroError>::Ok)) .map_err(|err: Never| match err {}))
.try_flatten()
} }
fn post_stream( fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = warp::Error>) -> impl Stream<Item = Bytes, Error = WebmetroError> {
channel: Handle, let source = Compat01As03::new(stream
stream: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin, .map_err(WebmetroError::from))
) -> impl Stream<Item = Result<Bytes, WebmetroError>> { .parse_ebml().with_soft_limit(BUFFER_LIMIT)
let channel = Transmitter::new(channel); .chunk_webm().with_soft_limit(BUFFER_LIMIT);
stream let sink = CompatSink::new(Transmitter::new(channel));
.map_err(WebmetroError::from)
.parse_ebml() Compat::new(source).forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
.with_soft_limit(BUFFER_LIMIT) .into_stream()
.chunk_webm() .map(|_| empty())
.with_soft_limit(BUFFER_LIMIT) .map_err(|err| {
.map_ok(move |chunk| { warn!("{}", err);
channel.send(chunk); err
Bytes::new() })
}) .flatten()
.inspect_err(|err| warn!("{}", err))
} }
fn media_response(body: Body) -> Response<Body> { fn media_response(body: Body) -> Response<Body> {
@ -62,34 +91,17 @@ fn media_response(body: Body) -> Response<Body> {
.unwrap() .unwrap()
} }
fn player_css() -> impl Reply { pub fn options() -> App<'static, 'static> {
let css = include_str!("../data/player.css"); SubCommand::with_name("relay")
with_header(css, CONTENT_TYPE, "text/css") .about("Hosts an HTTP-based relay server")
.arg(Arg::with_name("listen")
.help("The address:port to listen to")
.required(true))
} }
fn player_html(channel: impl AsRef<str>) -> impl Reply { pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO).as_nanos(); let channel_map = Arc::new(Mutex::new(WeakValueHashMap::<String, Weak<Mutex<Channel>>>::new()));
let player = format!( let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?;
include_str!("../data/player.html"),
channel = encode_double_quoted_attribute(channel.as_ref()),
cachebust = timestamp
);
html(player)
}
/// Hosts an HTTP-based relay server
#[derive(Args, Debug)]
pub struct RelayArgs {
/// The address:port to listen to
listen: String,
}
#[tokio::main]
pub async fn run(args: RelayArgs) -> Result<(), WebmetroError> {
let channel_map = Arc::new(Mutex::new(
WeakValueHashMap::<String, Weak<Mutex<Channel>>>::new(),
));
let addr_str = args.listen;
let addrs = addr_str.to_socket_addrs()?; let addrs = addr_str.to_socket_addrs()?;
info!("Binding to {:?}", addrs); info!("Binding to {:?}", addrs);
@ -98,44 +110,39 @@ pub async fn run(args: RelayArgs) -> Result<(), WebmetroError> {
} }
let channel = path!("live" / String).map(move |name: String| { let channel = path!("live" / String).map(move |name: String| {
let channel = channel_map let channel = channel_map.lock().unwrap()
.lock()
.unwrap()
.entry(name.clone()) .entry(name.clone())
.or_insert_with(|| Channel::new(name.clone())); .or_insert_with(|| Channel::new(name.clone()));
(channel, name) (channel, name)
}); });
let head = channel.clone().and(warp::head()).map(|(_, name)| { let head = channel.clone().and(warp::head())
info!("HEAD Request For Channel {}", name); .map(|(_, name)| {
media_response(Body::empty()) info!("HEAD Request For Channel {}", name);
}); media_response(Body::empty())
});
let get = channel.clone().and(warp::get()).map(|(channel, name)| { let get = channel.clone().and(warp::get2())
info!("Listener Connected On Channel {}", name); .map(|(channel, name)| {
media_response(Body::wrap_stream(get_stream(channel))) info!("Listener Connected On Channel {}", name);
}); media_response(Body::wrap_stream(get_stream(channel)))
});
let post_put = channel let post_put = channel.clone().and(warp::post2().or(warp::put2()).unify())
.clone() .and(warp::body::stream()).map(|(channel, name), stream| {
.and(warp::post().or(warp::put()).unify())
.and(warp::body::stream())
.map(|(channel, name), stream| {
info!("Source Connected On Channel {}", name); info!("Source Connected On Channel {}", name);
Response::new(Body::wrap_stream(post_stream(channel, stream))) Response::new(Body::wrap_stream(post_stream(channel, stream)))
}); });
let live = head.or(get).or(post_put); let routes = head
let watch = path!("watch" / String).map(player_html); .or(get)
let css = path!("static" / "css").map(player_css); .or(post_put);
let routes = live.or(watch).or(css); let mut rt = tokio::runtime::Runtime::new()?;
let mut server_futures: FuturesUnordered<_> = addrs for do_serve in addrs.map(|addr| warp::serve(routes.clone()).try_bind(addr)) {
.map(|addr| warp::serve(routes.clone()).try_bind(addr)) rt.spawn(do_serve);
.collect(); }
while let Some(_) = server_futures.next().await {} rt.shutdown_on_idle().wait().map_err(|_| "Shutdown error.".into())
Ok(())
} }

View file

@ -1,15 +1,10 @@
use bytes::Bytes; use clap::{App, Arg, ArgMatches, SubCommand};
use clap::Args; use futures3::prelude::*;
use futures::prelude::*; use hyper13::{client::HttpConnector, Body, Client, Request};
use hyper::{client::HttpConnector, Body, Client, Request}; use std::io::{stdout, Write};
use std::{ use tokio2::runtime::Runtime;
io::{stdout, Write},
pin::Pin,
time::Duration,
};
use stream::iter;
use super::{parse_time, stdin_stream}; use super::stdin_stream;
use webmetro::{ use webmetro::{
chunk::{Chunk, WebmStream}, chunk::{Chunk, WebmStream},
error::WebmetroError, error::WebmetroError,
@ -17,48 +12,44 @@ use webmetro::{
stream_parser::StreamEbml, stream_parser::StreamEbml,
}; };
type BoxedChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send + Sync>>; pub fn options() -> App<'static, 'static> {
SubCommand::with_name("send")
/// PUTs WebM from stdin to a relay server. .about("PUTs WebM from stdin to a relay server.")
#[derive(Args, Debug)] .arg(Arg::with_name("url")
pub struct SendArgs { .help("The location to upload to")
/// The location to upload to .required(true))
url: String, .arg(Arg::with_name("throttle")
/// Slow down upload to "real time" speed as determined by the timestamps (useful for streaming static files) .long("throttle")
#[clap(long)] .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
throttle: bool,
/// Skip approximately n seconds of content before uploading or throttling
#[clap(long, short, parse(try_from_str = parse_time))]
skip: Option<Duration>,
/// Stop uploading after approximately n seconds of content
#[clap(long, short, parse(try_from_str = parse_time))]
take: Option<Duration>,
} }
#[tokio::main] type BoxedChunkStream = Box<
pub async fn run(args: SendArgs) -> Result<(), WebmetroError> { dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError>
let start_time = args.skip.map_or(0, |s| s.as_millis()); + Send
let stop_time = args + Sync
.take + Unpin,
.map_or(std::u128::MAX, |t| t.as_millis() + start_time); >;
// build pipeline pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut timecode_fixer = ChunkTimecodeFixer::new();
let mut chunk_stream: BoxedChunkStream = Box::pin( let mut chunk_stream: BoxedChunkStream = Box::new(
stdin_stream() stdin_stream()
.parse_ebml() .parse_ebml()
.chunk_webm() .chunk_webm()
.map_ok(move |chunk| timecode_fixer.process(chunk)) .map_ok(move |chunk| timecode_fixer.process(chunk)),
.try_filter(move |chunk| future::ready(chunk.overlaps(start_time, stop_time))),
); );
if args.throttle { let url_str = match args.value_of("url") {
chunk_stream = Box::pin(Throttle::new(chunk_stream)); Some(url) => String::from(url),
_ => return Err("Listen address wasn't provided".into()),
};
if args.is_present("throttle") {
chunk_stream = Box::new(Throttle::new(chunk_stream));
} }
let chunk_stream = chunk_stream let chunk_stream = chunk_stream
.map_ok(|webm_chunk| iter(webm_chunk).map(Result::<Bytes, WebmetroError>::Ok)) .map_ok(|webm_chunk| webm_chunk.into_bytes())
.try_flatten()
.map_err(|err| { .map_err(|err| {
warn!("{}", &err); warn!("{}", &err);
err err
@ -66,13 +57,15 @@ pub async fn run(args: SendArgs) -> Result<(), WebmetroError> {
let request_payload = Body::wrap_stream(chunk_stream); let request_payload = Body::wrap_stream(chunk_stream);
let request = Request::put(args.url).body(request_payload)?; let request = Request::put(url_str).body(request_payload)?;
let client = Client::builder().build(HttpConnector::new()); let client = Client::builder().build(HttpConnector::new());
let response = client.request(request).await?; Runtime::new().unwrap().block_on(async {
let mut response_stream = response.into_body(); let response = client.request(request).await?;
while let Some(response_chunk) = response_stream.try_next().await? { let mut response_stream = response.into_body();
stdout().write_all(&response_chunk)?; while let Some(response_chunk) = response_stream.next().await.transpose()? {
} stdout().write_all(&response_chunk)?;
Ok(()) }
Ok(())
})
} }

View file

@ -1,20 +0,0 @@
body {
display: flex;
flex-flow: column;
align-items: center;
margin: 0;
padding: 0;
background: #111;
color: #777;
font-size: 16px;
line-height: 16px;
font-family: 'Gill Sans', 'Gill Sans MT', Calibri, 'Trebuchet MS', sans-serif;
}
section {
display: flex;
flex-flow: column;
}
video {
max-width: 100vw;
}

View file

@ -1,15 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>Watchog Stream</title>
<meta name="viewport" content="initial-scale=1">
<link rel="stylesheet" href="../static/css" />
</head>
<body>
<section>
<video controls autoplay src="../live/{channel}?t={cachebust}"></video>
<p>The stream should begin automatically when ready;
if the video stutters, try pausing it for a second or two to allow a small buffer.</p>
</section>
</body>
</html>

View file

@ -1,7 +1,6 @@
use byteorder::{BigEndian, ByteOrder}; use bytes::{BigEndian, ByteOrder, BufMut};
use bytes::{BufMut};
use custom_error::custom_error; use custom_error::custom_error;
use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom}; use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom};
pub const EBML_HEAD_ID: u64 = 0x0A45DFA3; pub const EBML_HEAD_ID: u64 = 0x0A45DFA3;
pub const DOC_TYPE_ID: u64 = 0x0282; pub const DOC_TYPE_ID: u64 = 0x0282;
@ -136,10 +135,10 @@ pub fn encode_varint<T: Write>(varint: Varint, output: &mut T) -> IoResult<()> {
} }
}; };
let mut buffer = [0; 8]; let mut buffer = Cursor::new([0; 8]);
buffer.as_mut().put_uint(number, size); buffer.put_uint_be(number, size);
return output.write_all(&buffer[..size]); return output.write_all(&buffer.get_ref()[..size]);
} }
const FOUR_FLAG: u64 = 0x10 << (8*3); const FOUR_FLAG: u64 = 0x10 << (8*3);
@ -155,10 +154,10 @@ pub fn encode_varint_4<T: Write>(varint: Varint, output: &mut T) -> IoResult<()>
Varint::Value(value) => FOUR_FLAG | value Varint::Value(value) => FOUR_FLAG | value
}; };
let mut buffer = [0; 4]; let mut buffer = Cursor::new([0; 4]);
buffer.as_mut().put_u32(number as u32); buffer.put_u32_be(number as u32);
output.write_all(&buffer) output.write_all(&buffer.get_ref()[..])
} }
pub fn encode_element<T: Write + Seek, F: Fn(&mut T) -> IoResult<X>, X>(tag: u64, output: &mut T, content: F) -> IoResult<()> { pub fn encode_element<T: Write + Seek, F: Fn(&mut T) -> IoResult<X>, X>(tag: u64, output: &mut T, content: F) -> IoResult<()> {
@ -191,10 +190,10 @@ pub fn encode_bytes<T: Write>(tag: u64, bytes: &[u8], output: &mut T) -> IoResul
pub fn encode_integer<T: Write>(tag: u64, value: u64, output: &mut T) -> IoResult<()> { pub fn encode_integer<T: Write>(tag: u64, value: u64, output: &mut T) -> IoResult<()> {
encode_tag_header(tag, Varint::Value(8), output)?; encode_tag_header(tag, Varint::Value(8), output)?;
let mut buffer = [0; 8]; let mut buffer = Cursor::new([0; 8]);
buffer.as_mut().put_u64(value); buffer.put_u64_be(value);
output.write_all(&buffer[..]) output.write_all(&buffer.get_ref()[..])
} }
pub struct EbmlLayout { pub struct EbmlLayout {
@ -263,10 +262,11 @@ pub trait FromEbml<'a>: Sized {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bytes::BytesMut; use bytes::{BytesMut};
use crate::ebml::*; use crate::ebml::*;
use crate::ebml::EbmlError::{CorruptVarint, UnknownElementId}; use crate::ebml::EbmlError::{CorruptVarint, UnknownElementId};
use crate::ebml::Varint::{Unknown, Value}; use crate::ebml::Varint::{Unknown, Value};
use std::io::Cursor;
use crate::tests::TEST_FILE; use crate::tests::TEST_FILE;
#[test] #[test]
@ -298,48 +298,44 @@ mod tests {
fn encode_varints() { fn encode_varints() {
let mut buffer = BytesMut::with_capacity(10).writer(); let mut buffer = BytesMut::with_capacity(10).writer();
let mut no_space = [0; 0]; let mut no_space = Cursor::new([0; 0]).writer();
let mut no_space_writer = no_space.as_mut().writer(); assert_eq!(no_space.get_ref().remaining_mut(), 0);
assert_eq!(no_space_writer.get_mut().remaining_mut(), 0);
let mut six_buffer = [0; 6]; let mut six_buffer = Cursor::new([0; 6]).writer();
let mut six_buffer_writer = six_buffer.as_mut().writer(); assert_eq!(six_buffer.get_ref().remaining_mut(), 6);
assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 6);
// 1 byte // 1 byte
encode_varint(Varint::Unknown, &mut buffer).unwrap(); encode_varint(Varint::Unknown, &mut buffer).unwrap();
assert_eq!(buffer.get_mut().split_to(1), &[0xFF].as_ref()); assert_eq!(buffer.get_mut().split_to(1), &[0xFF].as_ref());
assert_eq!(encode_varint(Varint::Unknown, &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Unknown, &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
encode_varint(Varint::Value(0), &mut buffer).unwrap(); encode_varint(Varint::Value(0), &mut buffer).unwrap();
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 0].as_ref()); assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 0].as_ref());
assert_eq!(encode_varint(Varint::Value(0), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(0), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
encode_varint(Varint::Value(1), &mut buffer).unwrap(); encode_varint(Varint::Value(1), &mut buffer).unwrap();
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 1].as_ref()); assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 1].as_ref());
assert_eq!(encode_varint(Varint::Value(1), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(1), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
encode_varint(Varint::Value(126), &mut buffer).unwrap(); encode_varint(Varint::Value(126), &mut buffer).unwrap();
assert_eq!(buffer.get_mut().split_to(1), &[0xF0 | 126].as_ref()); assert_eq!(buffer.get_mut().split_to(1), &[0xF0 | 126].as_ref());
assert_eq!(encode_varint(Varint::Value(126), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(126), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
// 2 bytes // 2 bytes
encode_varint(Varint::Value(127), &mut buffer).unwrap(); encode_varint(Varint::Value(127), &mut buffer).unwrap();
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 127].as_ref()); assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 127].as_ref());
assert_eq!(encode_varint(Varint::Value(127), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(127), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
encode_varint(Varint::Value(128), &mut buffer).unwrap(); encode_varint(Varint::Value(128), &mut buffer).unwrap();
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 128].as_ref()); assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 128].as_ref());
assert_eq!(encode_varint(Varint::Value(128), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(128), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
// 6 bytes // 6 bytes
assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 6); assert_eq!(six_buffer.get_ref().remaining_mut(), 6);
encode_varint(Varint::Value(0x03FFFFFFFFFE), &mut six_buffer_writer).unwrap(); encode_varint(Varint::Value(0x03FFFFFFFFFE), &mut six_buffer).unwrap();
assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 0); assert_eq!(six_buffer.get_ref().remaining_mut(), 0);
assert_eq!(&six_buffer, &[0x07, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref()); assert_eq!(&six_buffer.get_ref().get_ref(), &[0x07, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
six_buffer = Cursor::new([0; 6]).writer();
let mut six_buffer = [0; 6];
let mut six_buffer_writer = six_buffer.as_mut().writer();
// 7 bytes // 7 bytes
encode_varint(Varint::Value(0x03FFFFFFFFFF), &mut buffer).unwrap(); encode_varint(Varint::Value(0x03FFFFFFFFFF), &mut buffer).unwrap();
@ -351,8 +347,8 @@ mod tests {
encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut buffer).unwrap(); encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut buffer).unwrap();
assert_eq!(&buffer.get_mut().split_to(7), &[0x03, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref()); assert_eq!(&buffer.get_mut().split_to(7), &[0x03, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut six_buffer_writer).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut six_buffer).unwrap_err().kind(), ErrorKind::WriteZero);
// 8 bytes // 8 bytes
encode_varint(Varint::Value(0x01FFFFFFFFFFFF), &mut buffer).unwrap(); encode_varint(Varint::Value(0x01FFFFFFFFFFFF), &mut buffer).unwrap();

View file

@ -6,6 +6,7 @@ custom_error!{pub WebmetroError
EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}", EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}",
HttpError{source: http::Error} = "HTTP error: {source}", HttpError{source: http::Error} = "HTTP error: {source}",
HyperError{source: hyper::Error} = "Hyper error: {source}", HyperError{source: hyper::Error} = "Hyper error: {source}",
Hyper13Error{source: hyper13::Error} = "Hyper error: {source}",
IoError{source: std::io::Error} = "IO error: {source}", IoError{source: std::io::Error} = "IO error: {source}",
WarpError{source: warp::Error} = "Warp error: {source}", WarpError{source: warp::Error} = "Warp error: {source}",
ApplicationError{message: String} = "{message}" ApplicationError{message: String} = "{message}"

View file

@ -1,16 +1,23 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{
Context,
Poll
};
use std::time::{Duration, Instant};
use futures::prelude::*; use futures3::prelude::*;
use pin_project::pin_project; use tokio2::timer::{
use tokio::time::{sleep_until, Duration, Instant, Sleep}; delay,
Delay
};
use crate::chunk::Chunk; use crate::chunk::Chunk;
use crate::error::WebmetroError;
pub struct ChunkTimecodeFixer { pub struct ChunkTimecodeFixer {
current_offset: u64, current_offset: u64,
last_observed_timecode: u64, last_observed_timecode: u64,
assumed_duration: u64, assumed_duration: u64
} }
impl ChunkTimecodeFixer { impl ChunkTimecodeFixer {
@ -18,12 +25,12 @@ impl ChunkTimecodeFixer {
ChunkTimecodeFixer { ChunkTimecodeFixer {
current_offset: 0, current_offset: 0,
last_observed_timecode: 0, last_observed_timecode: 0,
assumed_duration: 33, assumed_duration: 33
} }
} }
pub fn process(&mut self, mut chunk: Chunk) -> Chunk { pub fn process<'a>(&mut self, mut chunk: Chunk) -> Chunk {
match chunk { match chunk {
Chunk::Cluster(ref mut cluster_head, _) => { Chunk::ClusterHead(ref mut cluster_head) => {
let start = cluster_head.start; let start = cluster_head.start;
if start < self.last_observed_timecode { if start < self.last_observed_timecode {
let next_timecode = self.last_observed_timecode + self.assumed_duration; let next_timecode = self.last_observed_timecode + self.assumed_duration;
@ -42,30 +49,35 @@ impl ChunkTimecodeFixer {
pub struct StartingPointFinder<S> { pub struct StartingPointFinder<S> {
stream: S, stream: S,
seen_header: bool, seen_header: bool,
seen_keyframe: bool, seen_keyframe: bool
} }
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S> { impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S>
{
type Item = Result<Chunk, S::Error>; type Item = Result<Chunk, S::Error>;
fn poll_next( fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> {
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Chunk, S::Error>>> {
loop { loop {
return match self.stream.try_poll_next_unpin(cx) { return match self.stream.try_poll_next_unpin(cx) {
Poll::Ready(Some(Ok(Chunk::Cluster(cluster_head, cluster_body)))) => { Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head)))) => {
if cluster_head.keyframe { if cluster_head.keyframe {
self.seen_keyframe = true; self.seen_keyframe = true;
} }
if self.seen_keyframe { if self.seen_keyframe {
Poll::Ready(Some(Ok(Chunk::Cluster(cluster_head, cluster_body)))) Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head))))
} else { } else {
continue; continue;
} }
} },
chunk @ Poll::Ready(Some(Ok(Chunk::Headers { .. }))) => { chunk @ Poll::Ready(Some(Ok(Chunk::ClusterBody {..}))) => {
if self.seen_keyframe {
chunk
} else {
continue;
}
},
chunk @ Poll::Ready(Some(Ok(Chunk::Headers {..}))) => {
if self.seen_header { if self.seen_header {
// new stream starting, we don't need a new header but should wait for a safe spot to resume // new stream starting, we don't need a new header but should wait for a safe spot to resume
self.seen_keyframe = false; self.seen_keyframe = false;
@ -74,20 +86,17 @@ impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S> {
self.seen_header = true; self.seen_header = true;
chunk chunk
} }
} },
chunk => chunk, chunk => chunk
}; }
} };
} }
} }
#[pin_project]
pub struct Throttle<S> { pub struct Throttle<S> {
#[pin]
stream: S, stream: S,
start_time: Option<Instant>, start_time: Instant,
#[pin] sleep: Delay
sleep: Sleep,
} }
impl<S> Throttle<S> { impl<S> Throttle<S> {
@ -95,46 +104,34 @@ impl<S> Throttle<S> {
let now = Instant::now(); let now = Instant::now();
Throttle { Throttle {
stream: wrap, stream: wrap,
start_time: None, start_time: now,
sleep: sleep_until(now), sleep: delay(now)
} }
} }
} }
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for Throttle<S> { impl<S: TryStream<Ok = Chunk, Error = WebmetroError> + Unpin> Stream for Throttle<S>
type Item = Result<Chunk, S::Error>; {
type Item = Result<Chunk, WebmetroError>;
fn poll_next( fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
self: Pin<&mut Self>, match self.sleep.poll_unpin(cx) {
cx: &mut Context,
) -> Poll<Option<Result<Chunk, S::Error>>> {
let mut this = self.project();
match this.sleep.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(()) => { /* can continue */ } Poll::Ready(()) => { /* can continue */ },
} }
let next_chunk = this.stream.try_poll_next_unpin(cx); let next_chunk = self.stream.try_poll_next_unpin(cx);
if let Poll::Ready(Some(Ok(Chunk::Cluster(ref cluster_head, _)))) = next_chunk { if let Poll::Ready(Some(Ok(Chunk::ClusterHead(ref cluster_head)))) = next_chunk {
let offset = Duration::from_millis(cluster_head.end);
// we have actual data, so start the clock if we haven't yet;
// if we're starting the clock now, though, don't insert delays if the first chunk happens to start after zero
let start_time = this
.start_time
.get_or_insert_with(|| Instant::now() - offset);
// snooze until real time has "caught up" to the stream // snooze until real time has "caught up" to the stream
let sleep_until = *start_time + offset; let offset = Duration::from_millis(cluster_head.end);
this.sleep.reset(sleep_until); let sleep_until = self.start_time + offset;
self.sleep.reset(sleep_until);
} }
next_chunk next_chunk
} }
} }
pub trait ChunkStream pub trait ChunkStream where Self : Sized + TryStream<Ok = Chunk> {
where
Self: Sized + TryStream<Ok = Chunk>,
{
/*fn fix_timecodes(self) -> Map<_> { /*fn fix_timecodes(self) -> Map<_> {
let fixer = ; let fixer = ;
self.map(move |chunk| { self.map(move |chunk| {
@ -147,7 +144,7 @@ where
StartingPointFinder { StartingPointFinder {
stream: self, stream: self,
seen_header: false, seen_header: false,
seen_keyframe: false, seen_keyframe: false
} }
} }

View file

@ -1,4 +1,3 @@
#[macro_use] extern crate log;
pub mod ebml; pub mod ebml;
pub mod error; pub mod error;

View file

@ -1,37 +1,44 @@
#[macro_use]
extern crate log; #[macro_use] extern crate log;
mod commands; mod commands;
use clap::{Parser, Subcommand}; use clap::{App, AppSettings, crate_version};
/// Utilities for broadcasting & relaying live WebM video/audio streams use crate::commands::{
#[derive(Parser, Debug)] relay,
#[clap(version)] filter,
struct Args { send,
#[clap(subcommand)] dump
command: Command, };
}
#[derive(Subcommand, Debug)] fn options() -> App<'static, 'static> {
enum Command { App::new("webmetro")
Dump(commands::dump::DumpArgs), .version(crate_version!())
Filter(commands::filter::FilterArgs), .about("Utilities for broadcasting & relaying live WebM video/audio streams")
Relay(commands::relay::RelayArgs), .setting(AppSettings::DisableHelpSubcommand)
Send(commands::send::SendArgs), .setting(AppSettings::VersionlessSubcommands)
.subcommand(relay::options())
.subcommand(filter::options())
.subcommand(send::options())
.subcommand(dump::options())
} }
fn main() { fn main() {
env_logger::init(); env_logger::init();
let args = Args::parse(); let args = options().get_matches();
match args.command { match args.subcommand() {
Command::Dump(args) => commands::dump::run(args), ("filter", Some(sub_args)) => filter::run(sub_args),
Command::Filter(args) => commands::filter::run(args), ("relay", Some(sub_args)) => relay::run(sub_args),
Command::Relay(args) => commands::relay::run(args), ("send", Some(sub_args)) => send::run(sub_args),
Command::Send(args) => commands::send::run(args), ("dump", Some(sub_args)) => dump::run(sub_args),
} _ => {
.unwrap_or_else(|err| { options().print_help().unwrap();
println!("");
Ok(())
}
}.unwrap_or_else(|err| {
error!("{}", err); error!("{}", err);
}); });
} }

View file

@ -1,8 +1,5 @@
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{ use futures3::stream::{Stream, StreamExt, TryStream};
stream::{Stream, StreamExt},
TryStreamExt,
};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::ebml::FromEbml; use crate::ebml::FromEbml;
@ -25,7 +22,11 @@ impl<S> EbmlStreamingParser<S> {
} }
} }
pub trait StreamEbml: Sized { pub trait StreamEbml: Sized + TryStream + Unpin
where
Self: Sized + TryStream + Unpin,
Self::Ok: Buf,
{
fn parse_ebml(self) -> EbmlStreamingParser<Self> { fn parse_ebml(self) -> EbmlStreamingParser<Self> {
EbmlStreamingParser { EbmlStreamingParser {
stream: self, stream: self,
@ -36,28 +37,29 @@ pub trait StreamEbml: Sized {
} }
} }
impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> StreamEbml for S where WebmetroError: From<E> impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> StreamEbml for S {}
{}
impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> EbmlStreamingParser<S> impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
where
WebmetroError: From<E>,
{
pub fn poll_event<'a, T: FromEbml<'a>>( pub fn poll_event<'a, T: FromEbml<'a>>(
&'a mut self, &'a mut self,
cx: &mut Context, cx: &mut Context,
) -> Poll<Option<Result<T, WebmetroError>>> { ) -> Poll<Option<Result<T, WebmetroError>>> {
loop { loop {
if let Some(info) = T::check_space(&self.buffer)? { match T::check_space(&self.buffer)? {
self.borrowed = self.buffer.split_to(info.element_len).freeze(); None => {
self.borrowed.advance(info.body_offset); // need to refill buffer, below
return Poll::Ready(Some( }
T::decode(info.element_id, &self.borrowed).map_err(Into::into), Some(info) => {
)); let mut bytes = self.buffer.split_to(info.element_len).freeze();
bytes.advance(info.body_offset);
self.borrowed = bytes;
return Poll::Ready(Some(T::decode(
info.element_id,
&self.borrowed,
).map_err(Into::into)));
}
} }
// need to refill buffer
if let Some(limit) = self.buffer_size_limit { if let Some(limit) = self.buffer_size_limit {
if limit <= self.buffer.len() { if limit <= self.buffer.len() {
return Poll::Ready(Some(Err(WebmetroError::ResourcesExceeded))); return Poll::Ready(Some(Err(WebmetroError::ResourcesExceeded)));
@ -75,12 +77,15 @@ where
} }
} }
} }
}
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> { pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> {
loop { loop {
if let Some(info) = T::check_space(&self.buffer)? { if let Some(info) = T::check_space(&self.buffer)? {
self.borrowed = self.buffer.split_to(info.element_len).freeze(); let mut bytes = self.buffer.split_to(info.element_len).freeze();
self.borrowed.advance(info.body_offset); bytes.advance(info.body_offset);
self.borrowed = bytes;
return Ok(Some(T::decode(info.element_id, &self.borrowed)?)); return Ok(Some(T::decode(info.element_id, &self.borrowed)?));
} }
@ -91,7 +96,7 @@ where
} }
} }
match self.stream.try_next().await? { match self.stream.next().await.transpose()? {
Some(refill) => { Some(refill) => {
self.buffer.reserve(refill.remaining()); self.buffer.reserve(refill.remaining());
self.buffer.put(refill); self.buffer.put(refill);
@ -107,7 +112,8 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures::{future::poll_fn, stream::StreamExt, FutureExt}; use bytes::IntoBuf;
use futures3::{future::poll_fn, stream::StreamExt, FutureExt};
use matches::assert_matches; use matches::assert_matches;
use std::task::Poll::*; use std::task::Poll::*;
@ -124,8 +130,8 @@ mod tests {
&ENCODE_WEBM_TEST_FILE[40..], &ENCODE_WEBM_TEST_FILE[40..],
]; ];
let mut stream_parser = futures::stream::iter(pieces.iter()) let mut stream_parser = futures3::stream::iter(pieces.iter())
.map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..])) .map(|bytes| Ok(bytes.into_buf()))
.parse_ebml(); .parse_ebml();
assert_matches!( assert_matches!(
@ -176,8 +182,8 @@ mod tests {
]; ];
async { async {
let mut parser = futures::stream::iter(pieces.iter()) let mut parser = futures3::stream::iter(pieces.iter())
.map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..])) .map(|bytes| Ok(bytes.into_buf()))
.parse_ebml(); .parse_ebml();
assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead)); assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead));

View file

@ -1,6 +1,5 @@
use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write, Seek}; use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek};
use byteorder::{BigEndian, ByteOrder}; use bytes::{BigEndian, BufMut, ByteOrder};
use bytes::BufMut;
use crate::ebml::*; use crate::ebml::*;
use crate::iterator::ebml_iter; use crate::iterator::ebml_iter;
use crate::iterator::EbmlIterator; use crate::iterator::EbmlIterator;
@ -104,12 +103,11 @@ pub fn encode_simple_block<T: Write>(block: SimpleBlock, output: &mut T) -> IoRe
encode_varint(Varint::Value(track), output)?; encode_varint(Varint::Value(track), output)?;
let mut buffer = [0; 3]; let mut buffer = Cursor::new([0; 3]);
let mut cursor = buffer.as_mut(); buffer.put_i16_be(timecode);
cursor.put_i16(timecode); buffer.put_u8(flags);
cursor.put_u8(flags);
output.write_all(&buffer)?; output.write_all(&buffer.get_ref()[..])?;
output.write_all(data) output.write_all(data)
} }
@ -133,7 +131,6 @@ pub fn encode_webm_element<T: Write + Seek>(element: WebmElement, output: &mut T
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::io::Cursor;
use crate::tests::{ use crate::tests::{
TEST_FILE, TEST_FILE,
ENCODE_WEBM_TEST_FILE ENCODE_WEBM_TEST_FILE