Compare commits
44 commits
Author | SHA1 | Date | |
---|---|---|---|
1ce026eb2e | |||
3bb6641a53 | |||
fba1829970 | |||
fc3724683a | |||
4db64df278 | |||
884aa37888 | |||
2b88d09d0f | |||
bfe7d4b1d7 | |||
15520f26bd | |||
fc4e0577c8 | |||
8cecb2166f | |||
3bc46210e4 | |||
18fb8390a0 | |||
43acf93fa9 | |||
9cc7d8064d | |||
2fb8408ebc | |||
c8124ed388 | |||
4dc8ec1bbd | |||
d585ad7b31 | |||
5a6d1e764d | |||
4b923ebed5 | |||
17bf7f0eef | |||
bb013761b1 | |||
1a6764b778 | |||
c422a1c3f3 | |||
00ec517e78 | |||
1273b4adff | |||
f4f752548e | |||
9274fabeea | |||
dbcbf2831e | |||
2400720d03 | |||
5e2e1bcf83 | |||
3da59e2d96 | |||
|
3ed514c99c | ||
|
0c48d59e3a | ||
|
3f7adb3bd6 | ||
|
ca1ade1341 | ||
|
c6a1e3daf3 | ||
|
8fa16152f8 | ||
|
ba1d921a7e | ||
|
2f129d6986 | ||
|
22c6b0cfc8 | ||
|
188bf23803 | ||
|
77401e9f51 |
21 changed files with 1334 additions and 1914 deletions
23
CHANGELOG.md
Normal file
23
CHANGELOG.md
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
## v0.3.1-dev
|
||||||
|
- Teach filter subcommand to recognize --skip and --take options
|
||||||
|
- MSRV now rustc 1.61
|
||||||
|
- forget a channel's initialization segment when no transmitter is active. This improves behavior when a channel is occasionally used for streams with different codecs.
|
||||||
|
- Add INFO logging for channel creation/garbage-collection
|
||||||
|
- Start throttle timing on first data instead of throttle creation (improves cases where the source is slow to start)
|
||||||
|
- Teach send subcommand to recognize --skip and --take options
|
||||||
|
|
||||||
|
## v0.3.0
|
||||||
|
- update internals to v0.2 of `warp` and `tokio`; no remaining code relies on `futures` 0.1
|
||||||
|
|
||||||
|
## v0.2.2
|
||||||
|
- use the `log` and `env_logger` crates for logging; the `RUST_LOG` environment variable configures the logging level.
|
||||||
|
- see [the env_logger documentation](https://docs.rs/env_logger/*/env_logger/) for more information
|
||||||
|
- support listening on multiple addresses if given a DNS name instead of an IP address. All bindings reference the same namespace for channels, but this allows, e.g., binding to both IPv4 and IPv6 `localhost`.
|
||||||
|
- released November 20, 2019
|
||||||
|
|
||||||
|
## v0.2.1
|
||||||
|
- update most internals to use `std::future`
|
||||||
|
|
||||||
|
## v0.2.0
|
||||||
|
- support proxying an arbitrary number of streams at `/live/$NAME`
|
||||||
|
- released October 27, 2018
|
2100
Cargo.lock
generated
2100
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
32
Cargo.toml
32
Cargo.toml
|
@ -1,20 +1,24 @@
|
||||||
[package]
|
[package]
|
||||||
name = "webmetro"
|
name = "webmetro"
|
||||||
version = "0.2.0"
|
version = "0.3.1-dev"
|
||||||
authors = ["Tangent 128 <Tangent128@gmail.com>"]
|
authors = ["Tangent 128 <Tangent128@gmail.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "0.4.12"
|
byteorder = "1"
|
||||||
clap = "2.33.0"
|
bytes = "1"
|
||||||
custom_error = "1.7"
|
clap = { version="^3.1.18", features=["cargo", "derive"] }
|
||||||
futures = "0.1.29"
|
custom_error = "^1.7"
|
||||||
futures3 = { package = "futures-preview", version="0.3.0-alpha", features = ["compat"] }
|
env_logger = "^0.9"
|
||||||
http = "0.1.18"
|
futures = "^0.3"
|
||||||
hyper = "0.12.35"
|
http = "^0.2"
|
||||||
hyper13 = { package = "hyper", version="0.13.0-alpha.4", features = ["unstable-stream"] }
|
html-escape = "0.2.13"
|
||||||
matches = "0.1.8"
|
hyper = "^0.14"
|
||||||
odds = { version = "0.3.1", features = ["std-vec"] }
|
log = "^0.4.8"
|
||||||
tokio2 = { package = "tokio", version="0.2.0-alpha.6" }
|
matches = "^0.1"
|
||||||
warp = "0.1.20"
|
pin-project = "1"
|
||||||
weak-table = "0.2.3"
|
tokio = { version="^1.18", features = ["io-std", "macros", "net", "rt", "rt-multi-thread", "time"] }
|
||||||
|
tokio-util = { version="^0.7", features=["codec"] }
|
||||||
|
urlencoding = "2.1.2"
|
||||||
|
warp = "^0.3"
|
||||||
|
weak-table = "^0.3"
|
||||||
|
|
8
build-buster.sh
Executable file
8
build-buster.sh
Executable file
|
@ -0,0 +1,8 @@
|
||||||
|
#!/bin/sh
|
||||||
|
# this will probably work with docker just as well as podman
|
||||||
|
|
||||||
|
podman run --rm \
|
||||||
|
-v ..:/usr/src/build/ \
|
||||||
|
-v ./buster-target:/usr/src/build/webmetro/target \
|
||||||
|
-w /usr/src/build/webmetro \
|
||||||
|
rust:1-buster cargo build --release
|
1
buster-target/.gitignore
vendored
Normal file
1
buster-target/.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
||||||
|
*
|
|
@ -1,24 +1,11 @@
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{
|
use std::sync::{Arc, Mutex};
|
||||||
Context,
|
use std::task::{Context, Poll};
|
||||||
Poll
|
|
||||||
};
|
|
||||||
use std::sync::{
|
|
||||||
Arc,
|
|
||||||
Mutex
|
|
||||||
};
|
|
||||||
|
|
||||||
use futures3::{
|
use futures::{
|
||||||
channel::mpsc::{
|
channel::mpsc::{channel as mpsc_channel, Receiver, Sender},
|
||||||
channel as mpsc_channel,
|
|
||||||
Sender,
|
|
||||||
Receiver
|
|
||||||
},
|
|
||||||
Sink,
|
|
||||||
Stream,
|
Stream,
|
||||||
Never
|
|
||||||
};
|
};
|
||||||
use odds::vec::VecExt;
|
|
||||||
|
|
||||||
use crate::chunk::Chunk;
|
use crate::chunk::Chunk;
|
||||||
|
|
||||||
|
@ -30,77 +17,66 @@ use crate::chunk::Chunk;
|
||||||
pub struct Channel {
|
pub struct Channel {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
header_chunk: Option<Chunk>,
|
header_chunk: Option<Chunk>,
|
||||||
listeners: Vec<Sender<Chunk>>
|
listeners: Vec<Sender<Chunk>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Handle = Arc<Mutex<Channel>>;
|
pub type Handle = Arc<Mutex<Channel>>;
|
||||||
|
|
||||||
impl Channel {
|
impl Channel {
|
||||||
pub fn new(name: String) -> Handle {
|
pub fn new(name: String) -> Handle {
|
||||||
|
info!("Opening Channel {}", name);
|
||||||
Arc::new(Mutex::new(Channel {
|
Arc::new(Mutex::new(Channel {
|
||||||
name,
|
name,
|
||||||
header_chunk: None,
|
header_chunk: None,
|
||||||
listeners: Vec::new()
|
listeners: Vec::new(),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for Channel {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
info!("Closing Channel {}", self.name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Transmitter {
|
pub struct Transmitter {
|
||||||
channel: Handle
|
channel: Handle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Transmitter {
|
impl Transmitter {
|
||||||
pub fn new(channel_arc: Handle) -> Self {
|
pub fn new(channel_arc: Handle) -> Self {
|
||||||
Transmitter {
|
Transmitter {
|
||||||
channel: channel_arc
|
channel: channel_arc,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl Sink<Chunk> for Transmitter {
|
pub fn send(&self, chunk: Chunk) {
|
||||||
type Error = Never; // never errors, slow clients are simply dropped
|
|
||||||
|
|
||||||
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Never>> {
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn start_send(self: Pin<&mut Self>, chunk: Chunk) -> Result<(), Never> {
|
|
||||||
let mut channel = self.channel.lock().expect("Locking channel");
|
let mut channel = self.channel.lock().expect("Locking channel");
|
||||||
|
|
||||||
if let Chunk::Headers { .. } = chunk {
|
if let Chunk::Headers { .. } = chunk {
|
||||||
channel.header_chunk = Some(chunk.clone());
|
channel.header_chunk = Some(chunk.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok());
|
channel
|
||||||
|
.listeners
|
||||||
Ok(())
|
.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
|
impl Drop for Transmitter {
|
||||||
let mut channel = self.channel.lock().expect("Locking channel");
|
fn drop(&mut self) {
|
||||||
let mut result = Poll::Ready(Ok(()));
|
if let Ok(mut channel) = self.channel.lock() {
|
||||||
|
// when disconnecting, clean up the header chunk so subsequent
|
||||||
// just disconnect any erroring listeners
|
// clients don't get a potentially incorrect initialization segment
|
||||||
channel.listeners.retain_mut(|listener| match Pin::new(listener).poll_flush(cx) {
|
channel.header_chunk = None;
|
||||||
Poll::Pending => {result = Poll::Pending; true},
|
}
|
||||||
Poll::Ready(Ok(())) => true,
|
|
||||||
Poll::Ready(Err(_)) => false,
|
|
||||||
});
|
|
||||||
|
|
||||||
result
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
|
|
||||||
// don't actually disconnect listeners, since other sources may want to transmit to this channel;
|
|
||||||
// just ensure we've sent everything we can out
|
|
||||||
self.poll_flush(cx)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Listener {
|
pub struct Listener {
|
||||||
/// not used in operation, but its refcount keeps the channel alive when there's no Transmitter
|
/// not used in operation, but its refcount keeps the channel alive when there's no Transmitter
|
||||||
_channel: Handle,
|
_channel: Handle,
|
||||||
receiver: Receiver<Chunk>
|
receiver: Receiver<Chunk>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Listener {
|
impl Listener {
|
||||||
|
@ -111,7 +87,9 @@ impl Listener {
|
||||||
let mut channel = channel_arc.lock().expect("Locking channel");
|
let mut channel = channel_arc.lock().expect("Locking channel");
|
||||||
|
|
||||||
if let Some(ref chunk) = channel.header_chunk {
|
if let Some(ref chunk) = channel.header_chunk {
|
||||||
sender.start_send(chunk.clone()).expect("Queuing existing header chunk");
|
sender
|
||||||
|
.start_send(chunk.clone())
|
||||||
|
.expect("Queuing existing header chunk");
|
||||||
}
|
}
|
||||||
|
|
||||||
channel.listeners.push(sender);
|
channel.listeners.push(sender);
|
||||||
|
@ -119,7 +97,7 @@ impl Listener {
|
||||||
|
|
||||||
Listener {
|
Listener {
|
||||||
_channel: channel_arc,
|
_channel: channel_arc,
|
||||||
receiver: receiver
|
receiver: receiver,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
234
src/chunk.rs
234
src/chunk.rs
|
@ -1,24 +1,23 @@
|
||||||
use bytes::{Buf, Bytes};
|
use crate::error::WebmetroError;
|
||||||
use futures3::prelude::*;
|
use crate::stream_parser::EbmlStreamingParser;
|
||||||
|
use crate::webm::*;
|
||||||
|
use bytes::{Buf, Bytes, BytesMut};
|
||||||
|
use futures::prelude::*;
|
||||||
use std::{
|
use std::{
|
||||||
io::Cursor,
|
io::Cursor,
|
||||||
mem,
|
mem,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll, Poll::*},
|
task::{Context, Poll, Poll::*},
|
||||||
};
|
};
|
||||||
use crate::stream_parser::EbmlStreamingParser;
|
|
||||||
use crate::error::WebmetroError;
|
|
||||||
use crate::webm::*;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct ClusterHead {
|
pub struct ClusterHead {
|
||||||
pub keyframe: bool,
|
pub keyframe: bool,
|
||||||
pub start: u64,
|
pub start: u64,
|
||||||
pub end: u64,
|
pub end: u64,
|
||||||
/// space for a Cluster tag and a Timecode tag
|
/// a Cluster tag and a Timecode tag together take at most 15 bytes;
|
||||||
/// TODO: consider using a BytesMut here for simplicity
|
/// fortuitously, 15 bytes can be inlined in a Bytes handle even on 32-bit systems
|
||||||
bytes: [u8;16],
|
bytes: BytesMut,
|
||||||
bytes_used: u8
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClusterHead {
|
impl ClusterHead {
|
||||||
|
@ -27,8 +26,7 @@ impl ClusterHead {
|
||||||
keyframe: false,
|
keyframe: false,
|
||||||
start: 0,
|
start: 0,
|
||||||
end: 0,
|
end: 0,
|
||||||
bytes: [0;16],
|
bytes: BytesMut::with_capacity(15),
|
||||||
bytes_used: 0
|
|
||||||
};
|
};
|
||||||
cluster_head.update_timecode(timecode);
|
cluster_head.update_timecode(timecode);
|
||||||
cluster_head
|
cluster_head
|
||||||
|
@ -37,11 +35,14 @@ impl ClusterHead {
|
||||||
let delta = self.end - self.start;
|
let delta = self.end - self.start;
|
||||||
self.start = timecode;
|
self.start = timecode;
|
||||||
self.end = self.start + delta;
|
self.end = self.start + delta;
|
||||||
let mut cursor = Cursor::new(self.bytes.as_mut());
|
let mut buffer = [0; 15];
|
||||||
|
let mut cursor = Cursor::new(buffer.as_mut());
|
||||||
// buffer is sized so these should never fail
|
// buffer is sized so these should never fail
|
||||||
encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap();
|
encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap();
|
||||||
encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap();
|
encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap();
|
||||||
self.bytes_used = cursor.position() as u8;
|
self.bytes.clear();
|
||||||
|
let len = cursor.position() as usize;
|
||||||
|
self.bytes.extend_from_slice(&buffer[..len]);
|
||||||
}
|
}
|
||||||
pub fn observe_simpleblock_timecode(&mut self, timecode: i16) {
|
pub fn observe_simpleblock_timecode(&mut self, timecode: i16) {
|
||||||
let absolute_timecode = self.start + (timecode as u64);
|
let absolute_timecode = self.start + (timecode as u64);
|
||||||
|
@ -51,41 +52,52 @@ impl ClusterHead {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsRef<[u8]> for ClusterHead {
|
|
||||||
fn as_ref(&self) -> &[u8] {
|
|
||||||
self.bytes[..self.bytes_used as usize].as_ref()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A chunk of WebM data
|
/// A chunk of WebM data
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum Chunk {
|
pub enum Chunk {
|
||||||
Headers {
|
Headers { bytes: Bytes },
|
||||||
bytes: Bytes
|
Cluster(ClusterHead, Bytes),
|
||||||
},
|
|
||||||
ClusterHead(ClusterHead),
|
|
||||||
ClusterBody {
|
|
||||||
bytes: Bytes
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Chunk {
|
impl Chunk {
|
||||||
/// converts this chunk of data into a Bytes object, perhaps to send over the network
|
pub fn overlaps(&self, start: u128, stop: u128) -> bool {
|
||||||
pub fn into_bytes(self) -> Bytes {
|
|
||||||
match self {
|
match self {
|
||||||
Chunk::Headers {bytes, ..} => bytes,
|
Chunk::Cluster(head, _) => head.start as u128 <= stop && head.end as u128 >= start,
|
||||||
Chunk::ClusterHead(cluster_head) => Bytes::from(cluster_head.as_ref()),
|
_ => true,
|
||||||
Chunk::ClusterBody {bytes, ..} => bytes
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsRef<[u8]> for Chunk {
|
impl IntoIterator for Chunk {
|
||||||
fn as_ref(&self) -> &[u8] {
|
type Item = Bytes;
|
||||||
|
type IntoIter = Iter;
|
||||||
|
|
||||||
|
fn into_iter(self) -> Self::IntoIter {
|
||||||
match self {
|
match self {
|
||||||
&Chunk::Headers {ref bytes, ..} => bytes.as_ref(),
|
Chunk::Headers { bytes } => Iter::Buffer(bytes),
|
||||||
&Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(),
|
Chunk::Cluster(head, bytes) => Iter::Cluster(head, bytes),
|
||||||
&Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref()
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum Iter {
|
||||||
|
Cluster(ClusterHead, Bytes),
|
||||||
|
Buffer(Bytes),
|
||||||
|
Empty,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Iterator for Iter {
|
||||||
|
type Item = Bytes;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Bytes> {
|
||||||
|
let iter = mem::replace(self, Iter::Empty);
|
||||||
|
match iter {
|
||||||
|
Iter::Cluster(ClusterHead { bytes: head, .. }, body) => {
|
||||||
|
*self = Iter::Buffer(body);
|
||||||
|
Some(head.freeze())
|
||||||
|
}
|
||||||
|
Iter::Buffer(bytes) => Some(bytes),
|
||||||
|
Iter::Empty => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -95,19 +107,13 @@ enum ChunkerState {
|
||||||
BuildingHeader(Cursor<Vec<u8>>),
|
BuildingHeader(Cursor<Vec<u8>>),
|
||||||
// ClusterHead & body buffer
|
// ClusterHead & body buffer
|
||||||
BuildingCluster(ClusterHead, Cursor<Vec<u8>>),
|
BuildingCluster(ClusterHead, Cursor<Vec<u8>>),
|
||||||
EmittingClusterBody(Vec<u8>),
|
End,
|
||||||
EmittingClusterBodyBeforeNewHeader {
|
|
||||||
body: Vec<u8>,
|
|
||||||
new_header: Cursor<Vec<u8>>
|
|
||||||
},
|
|
||||||
EmittingFinalClusterBody(Vec<u8>),
|
|
||||||
End
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WebmChunker<S> {
|
pub struct WebmChunker<S> {
|
||||||
source: EbmlStreamingParser<S>,
|
source: EbmlStreamingParser<S>,
|
||||||
buffer_size_limit: Option<usize>,
|
buffer_size_limit: Option<usize>,
|
||||||
state: ChunkerState
|
state: ChunkerState,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> WebmChunker<S> {
|
impl<S> WebmChunker<S> {
|
||||||
|
@ -120,7 +126,11 @@ impl<S> WebmChunker<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usize>) -> Result<(), WebmetroError> {
|
fn encode(
|
||||||
|
element: WebmElement,
|
||||||
|
buffer: &mut Cursor<Vec<u8>>,
|
||||||
|
limit: Option<usize>,
|
||||||
|
) -> Result<(), WebmetroError> {
|
||||||
if let Some(limit) = limit {
|
if let Some(limit) = limit {
|
||||||
if limit <= buffer.get_ref().len() {
|
if limit <= buffer.get_ref().len() {
|
||||||
return Err(WebmetroError::ResourcesExceeded);
|
return Err(WebmetroError::ResourcesExceeded);
|
||||||
|
@ -130,11 +140,16 @@ fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usiz
|
||||||
encode_webm_element(element, buffer).map_err(|err| err.into())
|
encode_webm_element(element, buffer).map_err(|err| err.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for WebmChunker<S>
|
impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> Stream for WebmChunker<S>
|
||||||
|
where
|
||||||
|
WebmetroError: From<E>,
|
||||||
{
|
{
|
||||||
type Item = Result<Chunk, WebmetroError>;
|
type Item = Result<Chunk, WebmetroError>;
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
|
fn poll_next(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context,
|
||||||
|
) -> Poll<Option<Result<Chunk, WebmetroError>>> {
|
||||||
let mut chunker = self.get_mut();
|
let mut chunker = self.get_mut();
|
||||||
loop {
|
loop {
|
||||||
match chunker.state {
|
match chunker.state {
|
||||||
|
@ -145,116 +160,117 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
|
||||||
Ready(None) => return Ready(None),
|
Ready(None) => return Ready(None),
|
||||||
Ready(Some(Ok(element))) => match element {
|
Ready(Some(Ok(element))) => match element {
|
||||||
WebmElement::Cluster => {
|
WebmElement::Cluster => {
|
||||||
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
let liberated_buffer =
|
||||||
let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())};
|
mem::replace(buffer, Cursor::new(Vec::new()));
|
||||||
|
let header_chunk = Chunk::Headers {
|
||||||
|
bytes: Bytes::from(liberated_buffer.into_inner()),
|
||||||
|
};
|
||||||
|
|
||||||
chunker.state = ChunkerState::BuildingCluster(
|
chunker.state = ChunkerState::BuildingCluster(
|
||||||
ClusterHead::new(0),
|
ClusterHead::new(0),
|
||||||
Cursor::new(Vec::new())
|
Cursor::new(Vec::new()),
|
||||||
);
|
);
|
||||||
return Ready(Some(Ok(header_chunk)));
|
return Ready(Some(Ok(header_chunk)));
|
||||||
},
|
}
|
||||||
WebmElement::Info => {},
|
WebmElement::Info => {}
|
||||||
WebmElement::Void => {},
|
WebmElement::Void => {}
|
||||||
WebmElement::Unknown(_) => {},
|
WebmElement::Unknown(_) => {}
|
||||||
element => {
|
element => {
|
||||||
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) {
|
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit)
|
||||||
|
{
|
||||||
chunker.state = ChunkerState::End;
|
chunker.state = ChunkerState::End;
|
||||||
return Ready(Some(Err(err)));
|
return Ready(Some(Err(err)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
|
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
|
||||||
match chunker.source.poll_event(cx) {
|
match chunker.source.poll_event(cx) {
|
||||||
Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))),
|
Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))),
|
||||||
Pending => return Pending,
|
Pending => return Pending,
|
||||||
Ready(Some(Ok(element))) => match element {
|
Ready(Some(Ok(element))) => match element {
|
||||||
WebmElement::EbmlHead | WebmElement::Segment => {
|
WebmElement::EbmlHead | WebmElement::Segment => {
|
||||||
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
let liberated_cluster_head =
|
||||||
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
mem::replace(cluster_head, ClusterHead::new(0));
|
||||||
|
let liberated_buffer =
|
||||||
|
mem::replace(buffer, Cursor::new(Vec::new()));
|
||||||
|
|
||||||
let mut new_header_cursor = Cursor::new(Vec::new());
|
let mut new_header_cursor = Cursor::new(Vec::new());
|
||||||
match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) {
|
match encode(
|
||||||
|
element,
|
||||||
|
&mut new_header_cursor,
|
||||||
|
chunker.buffer_size_limit,
|
||||||
|
) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
chunker.state = ChunkerState::EmittingClusterBodyBeforeNewHeader{
|
chunker.state =
|
||||||
body: liberated_buffer.into_inner(),
|
ChunkerState::BuildingHeader(new_header_cursor);
|
||||||
new_header: new_header_cursor
|
return Ready(Some(Ok(Chunk::Cluster(
|
||||||
};
|
liberated_cluster_head,
|
||||||
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
|
Bytes::from(liberated_buffer.into_inner()),
|
||||||
},
|
))));
|
||||||
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
chunker.state = ChunkerState::End;
|
chunker.state = ChunkerState::End;
|
||||||
return Ready(Some(Err(err)));
|
return Ready(Some(Err(err)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
WebmElement::Cluster => {
|
WebmElement::Cluster => {
|
||||||
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
let liberated_cluster_head =
|
||||||
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
mem::replace(cluster_head, ClusterHead::new(0));
|
||||||
|
let liberated_buffer =
|
||||||
|
mem::replace(buffer, Cursor::new(Vec::new()));
|
||||||
|
|
||||||
chunker.state = ChunkerState::EmittingClusterBody(liberated_buffer.into_inner());
|
return Ready(Some(Ok(Chunk::Cluster(
|
||||||
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
|
liberated_cluster_head,
|
||||||
},
|
Bytes::from(liberated_buffer.into_inner()),
|
||||||
|
))));
|
||||||
|
}
|
||||||
WebmElement::Timecode(timecode) => {
|
WebmElement::Timecode(timecode) => {
|
||||||
cluster_head.update_timecode(timecode);
|
cluster_head.update_timecode(timecode);
|
||||||
},
|
}
|
||||||
WebmElement::SimpleBlock(ref block) => {
|
WebmElement::SimpleBlock(ref block) => {
|
||||||
if (block.flags & 0b10000000) != 0 {
|
if (block.flags & 0b10000000) != 0 {
|
||||||
// TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster
|
// TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster
|
||||||
cluster_head.keyframe = true;
|
cluster_head.keyframe = true;
|
||||||
}
|
}
|
||||||
cluster_head.observe_simpleblock_timecode(block.timecode);
|
cluster_head.observe_simpleblock_timecode(block.timecode);
|
||||||
if let Err(err) = encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit) {
|
if let Err(err) = encode(
|
||||||
|
WebmElement::SimpleBlock(*block),
|
||||||
|
buffer,
|
||||||
|
chunker.buffer_size_limit,
|
||||||
|
) {
|
||||||
chunker.state = ChunkerState::End;
|
chunker.state = ChunkerState::End;
|
||||||
return Ready(Some(Err(err)));
|
return Ready(Some(Err(err)));
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
WebmElement::Info => {},
|
WebmElement::Info => {}
|
||||||
WebmElement::Void => {},
|
WebmElement::Void => {}
|
||||||
WebmElement::Unknown(_) => {},
|
WebmElement::Unknown(_) => {}
|
||||||
element => {
|
element => {
|
||||||
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) {
|
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit)
|
||||||
|
{
|
||||||
chunker.state = ChunkerState::End;
|
chunker.state = ChunkerState::End;
|
||||||
return Ready(Some(Err(err)));
|
return Ready(Some(Err(err)));
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
},
|
},
|
||||||
Ready(None) => {
|
Ready(None) => {
|
||||||
// flush final Cluster on end of stream
|
// flush final Cluster on end of stream
|
||||||
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
let liberated_cluster_head =
|
||||||
|
mem::replace(cluster_head, ClusterHead::new(0));
|
||||||
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
||||||
|
|
||||||
chunker.state = ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner());
|
chunker.state = ChunkerState::End;
|
||||||
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
|
return Ready(Some(Ok(Chunk::Cluster(
|
||||||
|
liberated_cluster_head,
|
||||||
|
Bytes::from(liberated_buffer.into_inner()),
|
||||||
|
))));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
ChunkerState::EmittingClusterBody(ref mut buffer) => {
|
ChunkerState::End => return Ready(None),
|
||||||
let liberated_buffer = mem::replace(buffer, Vec::new());
|
|
||||||
|
|
||||||
chunker.state = ChunkerState::BuildingCluster(
|
|
||||||
ClusterHead::new(0),
|
|
||||||
Cursor::new(Vec::new())
|
|
||||||
);
|
|
||||||
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
|
|
||||||
},
|
|
||||||
ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => {
|
|
||||||
let liberated_body = mem::replace(body, Vec::new());
|
|
||||||
let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new()));
|
|
||||||
|
|
||||||
chunker.state = ChunkerState::BuildingHeader(liberated_header_cursor);
|
|
||||||
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)})));
|
|
||||||
},
|
|
||||||
ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
|
|
||||||
// flush final Cluster on end of stream
|
|
||||||
let liberated_buffer = mem::replace(buffer, Vec::new());
|
|
||||||
|
|
||||||
chunker.state = ChunkerState::End;
|
|
||||||
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
|
|
||||||
},
|
|
||||||
ChunkerState::End => return Ready(None)
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -271,7 +287,7 @@ impl<S: Stream> WebmStream for EbmlStreamingParser<S> {
|
||||||
WebmChunker {
|
WebmChunker {
|
||||||
source: self,
|
source: self,
|
||||||
buffer_size_limit: None,
|
buffer_size_limit: None,
|
||||||
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new()))
|
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,35 +1,27 @@
|
||||||
use clap::{App, AppSettings, ArgMatches, SubCommand};
|
use clap::Args;
|
||||||
use tokio2::runtime::Runtime;
|
|
||||||
|
|
||||||
use super::stdin_stream;
|
use super::stdin_stream;
|
||||||
use webmetro::{
|
use webmetro::{
|
||||||
error::WebmetroError,
|
error::WebmetroError,
|
||||||
stream_parser::StreamEbml,
|
stream_parser::StreamEbml,
|
||||||
webm::{
|
webm::{SimpleBlock, WebmElement::*},
|
||||||
SimpleBlock,
|
|
||||||
WebmElement::*
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn options() -> App<'static, 'static> {
|
/// Dumps WebM parsing events from parsing stdin
|
||||||
SubCommand::with_name("dump")
|
#[derive(Args, Debug)]
|
||||||
.setting(AppSettings::Hidden)
|
pub struct DumpArgs;
|
||||||
.about("Dumps WebM parsing events from parsing stdin")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> {
|
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
pub async fn run(_args: DumpArgs) -> Result<(), WebmetroError> {
|
||||||
let mut events = stdin_stream().parse_ebml();
|
let mut events = stdin_stream().parse_ebml();
|
||||||
|
|
||||||
Runtime::new().unwrap().block_on(async {
|
while let Some(element) = events.next().await? {
|
||||||
while let Some(element) = events.next().await? {
|
match element {
|
||||||
match element {
|
// suppress printing byte arrays
|
||||||
// suppress printing byte arrays
|
Tracks(slice) => println!("Tracks[{}]", slice.len()),
|
||||||
Tracks(slice) => println!("Tracks[{}]", slice.len()),
|
SimpleBlock(SimpleBlock { timecode, .. }) => println!("SimpleBlock@{}", timecode),
|
||||||
SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode),
|
other => println!("{:?}", other),
|
||||||
other => println!("{:?}", other)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(())
|
}
|
||||||
})
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,49 +1,53 @@
|
||||||
use std::{
|
use std::{io, io::prelude::*, pin::Pin, time::Duration};
|
||||||
io,
|
|
||||||
io::prelude::*
|
|
||||||
};
|
|
||||||
|
|
||||||
use clap::{App, Arg, ArgMatches, SubCommand};
|
use clap::Args;
|
||||||
use futures3::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures3::future::ready;
|
|
||||||
use tokio2::runtime::Runtime;
|
|
||||||
|
|
||||||
use super::stdin_stream;
|
use super::{parse_time, stdin_stream};
|
||||||
use webmetro::{
|
use webmetro::{
|
||||||
chunk::{
|
chunk::{Chunk, WebmStream},
|
||||||
Chunk,
|
|
||||||
WebmStream
|
|
||||||
},
|
|
||||||
error::WebmetroError,
|
error::WebmetroError,
|
||||||
fixers::{
|
fixers::{ChunkTimecodeFixer, Throttle},
|
||||||
ChunkTimecodeFixer,
|
stream_parser::StreamEbml,
|
||||||
Throttle,
|
|
||||||
},
|
|
||||||
stream_parser::StreamEbml
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn options() -> App<'static, 'static> {
|
/// Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.
|
||||||
SubCommand::with_name("filter")
|
#[derive(Args, Debug)]
|
||||||
.about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.")
|
pub struct FilterArgs {
|
||||||
.arg(Arg::with_name("throttle")
|
/// Slow down output to "real time" speed as determined by the timestamps (useful for streaming static files)
|
||||||
.long("throttle")
|
#[clap(long)]
|
||||||
.help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
|
throttle: bool,
|
||||||
|
/// Skip approximately n seconds of content before uploading or throttling
|
||||||
|
#[clap(long, short, parse(try_from_str = parse_time))]
|
||||||
|
skip: Option<Duration>,
|
||||||
|
/// Stop uploading after approximately n seconds of content
|
||||||
|
#[clap(long, short, parse(try_from_str = parse_time))]
|
||||||
|
take: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
#[tokio::main]
|
||||||
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
pub async fn run(args: FilterArgs) -> Result<(), WebmetroError> {
|
||||||
let mut chunk_stream: Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin> = Box::new(
|
let start_time = args.skip.map_or(0, |s| s.as_millis());
|
||||||
stdin_stream()
|
let stop_time = args
|
||||||
.parse_ebml()
|
.take
|
||||||
.chunk_webm()
|
.map_or(std::u128::MAX, |t| t.as_millis() + start_time);
|
||||||
.map_ok(move |chunk| timecode_fixer.process(chunk))
|
|
||||||
);
|
|
||||||
|
|
||||||
if args.is_present("throttle") {
|
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
||||||
chunk_stream = Box::new(Throttle::new(chunk_stream));
|
let mut chunk_stream: Pin<Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send>> =
|
||||||
|
Box::pin(
|
||||||
|
stdin_stream()
|
||||||
|
.parse_ebml()
|
||||||
|
.chunk_webm()
|
||||||
|
.map_ok(move |chunk| timecode_fixer.process(chunk))
|
||||||
|
.try_filter(move |chunk| future::ready(chunk.overlaps(start_time, stop_time))),
|
||||||
|
);
|
||||||
|
|
||||||
|
if args.throttle {
|
||||||
|
chunk_stream = Box::pin(Throttle::new(chunk_stream));
|
||||||
}
|
}
|
||||||
|
|
||||||
Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| {
|
while let Some(chunk) = chunk_stream.next().await {
|
||||||
ready(io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from))
|
chunk?.into_iter().try_for_each(|buffer| io::stdout().write_all(&buffer))?;
|
||||||
}))
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
use std::io::Cursor;
|
use std::time::Duration;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures3::TryStreamExt;
|
use futures::{Stream, TryStreamExt};
|
||||||
|
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
use webmetro::error::WebmetroError;
|
use webmetro::error::WebmetroError;
|
||||||
|
|
||||||
pub mod dump;
|
pub mod dump;
|
||||||
|
@ -12,13 +13,15 @@ pub mod send;
|
||||||
/// An adapter that makes chunks of bytes from stdin available as a Stream;
|
/// An adapter that makes chunks of bytes from stdin available as a Stream;
|
||||||
/// is NOT actually async, and just uses blocking read. Don't use more than
|
/// is NOT actually async, and just uses blocking read. Don't use more than
|
||||||
/// one at once, who knows who gets which bytes.
|
/// one at once, who knows who gets which bytes.
|
||||||
pub fn stdin_stream() -> impl futures3::TryStream<
|
pub fn stdin_stream() -> impl Stream<Item = Result<Bytes, std::io::Error>> + Sized + Unpin {
|
||||||
Item = Result<Cursor<Bytes>, WebmetroError>,
|
FramedRead::new(tokio::io::stdin(), BytesCodec::new()).map_ok(|bytes| bytes.freeze())
|
||||||
Ok = Cursor<Bytes>,
|
}
|
||||||
Error = WebmetroError,
|
|
||||||
> + Sized
|
pub fn parse_time(arg: &str) -> Result<Duration, WebmetroError> {
|
||||||
+ Unpin {
|
match arg.parse() {
|
||||||
tokio2::codec::FramedRead::new(tokio2::io::stdin(), tokio2::codec::BytesCodec::new())
|
Ok(secs) => Ok(Duration::from_secs(secs)),
|
||||||
.map_ok(|bytes| Cursor::new(bytes.freeze()))
|
Err(err) => Err(WebmetroError::ApplicationError {
|
||||||
.map_err(WebmetroError::from)
|
message: err.to_string(),
|
||||||
|
}),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,85 +1,56 @@
|
||||||
use std::net::ToSocketAddrs;
|
use std::net::ToSocketAddrs;
|
||||||
use std::sync::{
|
use std::sync::{Arc, Mutex, Weak};
|
||||||
Arc,
|
use std::time::{SystemTime, UNIX_EPOCH, Duration};
|
||||||
Mutex,
|
|
||||||
Weak
|
|
||||||
};
|
|
||||||
|
|
||||||
use bytes::{Bytes, Buf};
|
use bytes::{Buf, Bytes};
|
||||||
use clap::{App, Arg, ArgMatches, SubCommand};
|
use clap::Args;
|
||||||
use futures::{
|
use futures::{prelude::*, stream::FuturesUnordered, Stream};
|
||||||
Future,
|
use html_escape::encode_double_quoted_attribute;
|
||||||
Stream,
|
|
||||||
Sink,
|
|
||||||
stream::empty
|
|
||||||
};
|
|
||||||
use futures3::{
|
|
||||||
compat::{
|
|
||||||
Compat,
|
|
||||||
CompatSink,
|
|
||||||
Compat01As03,
|
|
||||||
},
|
|
||||||
Never,
|
|
||||||
prelude::*,
|
|
||||||
};
|
|
||||||
use hyper::{
|
use hyper::{
|
||||||
Body,
|
header::{CACHE_CONTROL, CONTENT_TYPE},
|
||||||
Response,
|
Body, Response,
|
||||||
header::{
|
|
||||||
CACHE_CONTROL,
|
|
||||||
CONTENT_TYPE
|
|
||||||
}
|
|
||||||
};
|
|
||||||
use warp::{
|
|
||||||
self,
|
|
||||||
Filter,
|
|
||||||
path
|
|
||||||
};
|
|
||||||
use weak_table::{
|
|
||||||
WeakValueHashMap
|
|
||||||
};
|
};
|
||||||
|
use stream::iter;
|
||||||
|
use warp::reply::{html, with_header};
|
||||||
|
use warp::{self, path, Filter, Reply};
|
||||||
|
use weak_table::WeakValueHashMap;
|
||||||
use webmetro::{
|
use webmetro::{
|
||||||
channel::{
|
channel::{Channel, Handle, Listener, Transmitter},
|
||||||
Channel,
|
chunk::Chunk,
|
||||||
Handle,
|
|
||||||
Listener,
|
|
||||||
Transmitter
|
|
||||||
},
|
|
||||||
chunk::WebmStream,
|
chunk::WebmStream,
|
||||||
error::WebmetroError,
|
error::WebmetroError,
|
||||||
fixers::{
|
fixers::{ChunkStream, ChunkTimecodeFixer},
|
||||||
ChunkStream,
|
stream_parser::StreamEbml,
|
||||||
ChunkTimecodeFixer,
|
|
||||||
},
|
|
||||||
stream_parser::StreamEbml
|
|
||||||
};
|
};
|
||||||
|
|
||||||
const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
|
const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
|
||||||
|
|
||||||
fn get_stream(channel: Handle) -> impl Stream<Item = Bytes, Error = WebmetroError> {
|
fn get_stream(channel: Handle) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
|
||||||
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
||||||
Compat::new(Listener::new(channel).map(|c| Ok(c))
|
Listener::new(channel)
|
||||||
.map_ok(move |chunk| timecode_fixer.process(chunk))
|
.map(|c| Result::<Chunk, WebmetroError>::Ok(c))
|
||||||
.find_starting_point()
|
.map_ok(move |chunk| timecode_fixer.process(chunk))
|
||||||
.map_ok(|webm_chunk| webm_chunk.into_bytes())
|
.find_starting_point()
|
||||||
.map_err(|err: Never| match err {}))
|
.map_ok(|webm_chunk| iter(webm_chunk).map(Result::<Bytes, WebmetroError>::Ok))
|
||||||
|
.try_flatten()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = warp::Error>) -> impl Stream<Item = Bytes, Error = WebmetroError> {
|
fn post_stream(
|
||||||
let source = Compat01As03::new(stream
|
channel: Handle,
|
||||||
.map_err(WebmetroError::from))
|
stream: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin,
|
||||||
.parse_ebml().with_soft_limit(BUFFER_LIMIT)
|
) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
|
||||||
.chunk_webm().with_soft_limit(BUFFER_LIMIT);
|
let channel = Transmitter::new(channel);
|
||||||
let sink = CompatSink::new(Transmitter::new(channel));
|
stream
|
||||||
|
.map_err(WebmetroError::from)
|
||||||
Compat::new(source).forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
|
.parse_ebml()
|
||||||
.into_stream()
|
.with_soft_limit(BUFFER_LIMIT)
|
||||||
.map(|_| empty())
|
.chunk_webm()
|
||||||
.map_err(|err| {
|
.with_soft_limit(BUFFER_LIMIT)
|
||||||
println!("[Warning] {}", err);
|
.map_ok(move |chunk| {
|
||||||
err
|
channel.send(chunk);
|
||||||
})
|
Bytes::new()
|
||||||
.flatten()
|
})
|
||||||
|
.inspect_err(|err| warn!("{}", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn media_response(body: Body) -> Response<Body> {
|
fn media_response(body: Body) -> Response<Body> {
|
||||||
|
@ -91,48 +62,80 @@ fn media_response(body: Body) -> Response<Body> {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn options() -> App<'static, 'static> {
|
fn player_css() -> impl Reply {
|
||||||
SubCommand::with_name("relay")
|
let css = include_str!("../data/player.css");
|
||||||
.about("Hosts an HTTP-based relay server")
|
with_header(css, CONTENT_TYPE, "text/css")
|
||||||
.arg(Arg::with_name("listen")
|
|
||||||
.help("The address:port to listen to")
|
|
||||||
.required(true))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
fn player_html(channel: impl AsRef<str>) -> impl Reply {
|
||||||
let channel_map = Arc::new(Mutex::new(WeakValueHashMap::<String, Weak<Mutex<Channel>>>::new()));
|
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO).as_nanos();
|
||||||
|
let player = format!(
|
||||||
|
include_str!("../data/player.html"),
|
||||||
|
channel = encode_double_quoted_attribute(channel.as_ref()),
|
||||||
|
cachebust = timestamp
|
||||||
|
);
|
||||||
|
html(player)
|
||||||
|
}
|
||||||
|
|
||||||
let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?;
|
/// Hosts an HTTP-based relay server
|
||||||
let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?;
|
#[derive(Args, Debug)]
|
||||||
|
pub struct RelayArgs {
|
||||||
|
/// The address:port to listen to
|
||||||
|
listen: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
pub async fn run(args: RelayArgs) -> Result<(), WebmetroError> {
|
||||||
|
let channel_map = Arc::new(Mutex::new(
|
||||||
|
WeakValueHashMap::<String, Weak<Mutex<Channel>>>::new(),
|
||||||
|
));
|
||||||
|
let addr_str = args.listen;
|
||||||
|
|
||||||
|
let addrs = addr_str.to_socket_addrs()?;
|
||||||
|
info!("Binding to {:?}", addrs);
|
||||||
|
if addrs.len() == 0 {
|
||||||
|
return Err("Listen address didn't resolve".into());
|
||||||
|
}
|
||||||
|
|
||||||
let channel = path!("live" / String).map(move |name: String| {
|
let channel = path!("live" / String).map(move |name: String| {
|
||||||
let channel = channel_map.lock().unwrap()
|
let channel = channel_map
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
.entry(name.clone())
|
.entry(name.clone())
|
||||||
.or_insert_with(|| Channel::new(name.clone()));
|
.or_insert_with(|| Channel::new(name.clone()));
|
||||||
(channel, name)
|
(channel, name)
|
||||||
});
|
});
|
||||||
|
|
||||||
let head = channel.clone().and(warp::head())
|
let head = channel.clone().and(warp::head()).map(|(_, name)| {
|
||||||
.map(|(_, name)| {
|
info!("HEAD Request For Channel {}", name);
|
||||||
println!("[Info] HEAD Request For Channel {}", name);
|
media_response(Body::empty())
|
||||||
media_response(Body::empty())
|
});
|
||||||
});
|
|
||||||
|
|
||||||
let get = channel.clone().and(warp::get2())
|
let get = channel.clone().and(warp::get()).map(|(channel, name)| {
|
||||||
.map(|(channel, name)| {
|
info!("Listener Connected On Channel {}", name);
|
||||||
println!("[Info] Listener Connected On Channel {}", name);
|
media_response(Body::wrap_stream(get_stream(channel)))
|
||||||
media_response(Body::wrap_stream(get_stream(channel)))
|
});
|
||||||
});
|
|
||||||
|
|
||||||
let post_put = channel.clone().and(warp::post2().or(warp::put2()).unify())
|
let post_put = channel
|
||||||
.and(warp::body::stream()).map(|(channel, name), stream| {
|
.clone()
|
||||||
println!("[Info] Source Connected On Channel {}", name);
|
.and(warp::post().or(warp::put()).unify())
|
||||||
|
.and(warp::body::stream())
|
||||||
|
.map(|(channel, name), stream| {
|
||||||
|
info!("Source Connected On Channel {}", name);
|
||||||
Response::new(Body::wrap_stream(post_stream(channel, stream)))
|
Response::new(Body::wrap_stream(post_stream(channel, stream)))
|
||||||
});
|
});
|
||||||
|
|
||||||
let routes = head
|
let live = head.or(get).or(post_put);
|
||||||
.or(get)
|
let watch = path!("watch" / String).map(player_html);
|
||||||
.or(post_put);
|
let css = path!("static" / "css").map(player_css);
|
||||||
|
|
||||||
Ok(warp::serve(routes).run(addr))
|
let routes = live.or(watch).or(css);
|
||||||
|
|
||||||
|
let mut server_futures: FuturesUnordered<_> = addrs
|
||||||
|
.map(|addr| warp::serve(routes.clone()).try_bind(addr))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
while let Some(_) = server_futures.next().await {}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,15 @@
|
||||||
use clap::{App, Arg, ArgMatches, SubCommand};
|
use bytes::Bytes;
|
||||||
use futures3::prelude::*;
|
use clap::Args;
|
||||||
use hyper13::{client::HttpConnector, Body, Client, Request};
|
use futures::prelude::*;
|
||||||
use std::io::{stdout, Write};
|
use hyper::{client::HttpConnector, Body, Client, Request};
|
||||||
use tokio2::runtime::Runtime;
|
use std::{
|
||||||
|
io::{stdout, Write},
|
||||||
|
pin::Pin,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
use stream::iter;
|
||||||
|
|
||||||
use super::stdin_stream;
|
use super::{parse_time, stdin_stream};
|
||||||
use webmetro::{
|
use webmetro::{
|
||||||
chunk::{Chunk, WebmStream},
|
chunk::{Chunk, WebmStream},
|
||||||
error::WebmetroError,
|
error::WebmetroError,
|
||||||
|
@ -12,60 +17,62 @@ use webmetro::{
|
||||||
stream_parser::StreamEbml,
|
stream_parser::StreamEbml,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn options() -> App<'static, 'static> {
|
type BoxedChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send + Sync>>;
|
||||||
SubCommand::with_name("send")
|
|
||||||
.about("PUTs WebM from stdin to a relay server.")
|
/// PUTs WebM from stdin to a relay server.
|
||||||
.arg(Arg::with_name("url")
|
#[derive(Args, Debug)]
|
||||||
.help("The location to upload to")
|
pub struct SendArgs {
|
||||||
.required(true))
|
/// The location to upload to
|
||||||
.arg(Arg::with_name("throttle")
|
url: String,
|
||||||
.long("throttle")
|
/// Slow down upload to "real time" speed as determined by the timestamps (useful for streaming static files)
|
||||||
.help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
|
#[clap(long)]
|
||||||
|
throttle: bool,
|
||||||
|
/// Skip approximately n seconds of content before uploading or throttling
|
||||||
|
#[clap(long, short, parse(try_from_str = parse_time))]
|
||||||
|
skip: Option<Duration>,
|
||||||
|
/// Stop uploading after approximately n seconds of content
|
||||||
|
#[clap(long, short, parse(try_from_str = parse_time))]
|
||||||
|
take: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
type BoxedChunkStream = Box<
|
#[tokio::main]
|
||||||
dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError>
|
pub async fn run(args: SendArgs) -> Result<(), WebmetroError> {
|
||||||
+ Send
|
let start_time = args.skip.map_or(0, |s| s.as_millis());
|
||||||
+ Sync
|
let stop_time = args
|
||||||
+ Unpin,
|
.take
|
||||||
>;
|
.map_or(std::u128::MAX, |t| t.as_millis() + start_time);
|
||||||
|
|
||||||
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
// build pipeline
|
||||||
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
||||||
let mut chunk_stream: BoxedChunkStream = Box::new(
|
let mut chunk_stream: BoxedChunkStream = Box::pin(
|
||||||
stdin_stream()
|
stdin_stream()
|
||||||
.parse_ebml()
|
.parse_ebml()
|
||||||
.chunk_webm()
|
.chunk_webm()
|
||||||
.map_ok(move |chunk| timecode_fixer.process(chunk)),
|
.map_ok(move |chunk| timecode_fixer.process(chunk))
|
||||||
|
.try_filter(move |chunk| future::ready(chunk.overlaps(start_time, stop_time))),
|
||||||
);
|
);
|
||||||
|
|
||||||
let url_str = match args.value_of("url") {
|
if args.throttle {
|
||||||
Some(url) => String::from(url),
|
chunk_stream = Box::pin(Throttle::new(chunk_stream));
|
||||||
_ => return Err("Listen address wasn't provided".into()),
|
|
||||||
};
|
|
||||||
|
|
||||||
if args.is_present("throttle") {
|
|
||||||
chunk_stream = Box::new(Throttle::new(chunk_stream));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk_stream = chunk_stream
|
let chunk_stream = chunk_stream
|
||||||
.map_ok(|webm_chunk| webm_chunk.into_bytes())
|
.map_ok(|webm_chunk| iter(webm_chunk).map(Result::<Bytes, WebmetroError>::Ok))
|
||||||
|
.try_flatten()
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
eprintln!("{}", &err);
|
warn!("{}", &err);
|
||||||
err
|
err
|
||||||
});
|
});
|
||||||
|
|
||||||
let request_payload = Body::wrap_stream(chunk_stream);
|
let request_payload = Body::wrap_stream(chunk_stream);
|
||||||
|
|
||||||
let request = Request::put(url_str).body(request_payload)?;
|
let request = Request::put(args.url).body(request_payload)?;
|
||||||
let client = Client::builder().build(HttpConnector::new());
|
let client = Client::builder().build(HttpConnector::new());
|
||||||
|
|
||||||
Runtime::new().unwrap().block_on(async {
|
let response = client.request(request).await?;
|
||||||
let response = client.request(request).await?;
|
let mut response_stream = response.into_body();
|
||||||
let mut response_stream = response.into_body();
|
while let Some(response_chunk) = response_stream.try_next().await? {
|
||||||
while let Some(response_chunk) = response_stream.next().await.transpose()? {
|
stdout().write_all(&response_chunk)?;
|
||||||
stdout().write_all(&response_chunk)?;
|
}
|
||||||
}
|
Ok(())
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
20
src/data/player.css
Normal file
20
src/data/player.css
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
body {
|
||||||
|
display: flex;
|
||||||
|
flex-flow: column;
|
||||||
|
align-items: center;
|
||||||
|
margin: 0;
|
||||||
|
padding: 0;
|
||||||
|
|
||||||
|
background: #111;
|
||||||
|
color: #777;
|
||||||
|
font-size: 16px;
|
||||||
|
line-height: 16px;
|
||||||
|
font-family: 'Gill Sans', 'Gill Sans MT', Calibri, 'Trebuchet MS', sans-serif;
|
||||||
|
}
|
||||||
|
section {
|
||||||
|
display: flex;
|
||||||
|
flex-flow: column;
|
||||||
|
}
|
||||||
|
video {
|
||||||
|
max-width: 100vw;
|
||||||
|
}
|
15
src/data/player.html
Normal file
15
src/data/player.html
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
<!DOCTYPE html>
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<title>Watchog Stream</title>
|
||||||
|
<meta name="viewport" content="initial-scale=1">
|
||||||
|
<link rel="stylesheet" href="../static/css" />
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<section>
|
||||||
|
<video controls autoplay src="../live/{channel}?t={cachebust}"></video>
|
||||||
|
<p>The stream should begin automatically when ready;
|
||||||
|
if the video stutters, try pausing it for a second or two to allow a small buffer.</p>
|
||||||
|
</section>
|
||||||
|
</body>
|
||||||
|
</html>
|
64
src/ebml.rs
64
src/ebml.rs
|
@ -1,6 +1,7 @@
|
||||||
use bytes::{BigEndian, ByteOrder, BufMut};
|
use byteorder::{BigEndian, ByteOrder};
|
||||||
|
use bytes::{BufMut};
|
||||||
use custom_error::custom_error;
|
use custom_error::custom_error;
|
||||||
use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom};
|
use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom};
|
||||||
|
|
||||||
pub const EBML_HEAD_ID: u64 = 0x0A45DFA3;
|
pub const EBML_HEAD_ID: u64 = 0x0A45DFA3;
|
||||||
pub const DOC_TYPE_ID: u64 = 0x0282;
|
pub const DOC_TYPE_ID: u64 = 0x0282;
|
||||||
|
@ -135,10 +136,10 @@ pub fn encode_varint<T: Write>(varint: Varint, output: &mut T) -> IoResult<()> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut buffer = Cursor::new([0; 8]);
|
let mut buffer = [0; 8];
|
||||||
buffer.put_uint_be(number, size);
|
buffer.as_mut().put_uint(number, size);
|
||||||
|
|
||||||
return output.write_all(&buffer.get_ref()[..size]);
|
return output.write_all(&buffer[..size]);
|
||||||
}
|
}
|
||||||
|
|
||||||
const FOUR_FLAG: u64 = 0x10 << (8*3);
|
const FOUR_FLAG: u64 = 0x10 << (8*3);
|
||||||
|
@ -154,10 +155,10 @@ pub fn encode_varint_4<T: Write>(varint: Varint, output: &mut T) -> IoResult<()>
|
||||||
Varint::Value(value) => FOUR_FLAG | value
|
Varint::Value(value) => FOUR_FLAG | value
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut buffer = Cursor::new([0; 4]);
|
let mut buffer = [0; 4];
|
||||||
buffer.put_u32_be(number as u32);
|
buffer.as_mut().put_u32(number as u32);
|
||||||
|
|
||||||
output.write_all(&buffer.get_ref()[..])
|
output.write_all(&buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn encode_element<T: Write + Seek, F: Fn(&mut T) -> IoResult<X>, X>(tag: u64, output: &mut T, content: F) -> IoResult<()> {
|
pub fn encode_element<T: Write + Seek, F: Fn(&mut T) -> IoResult<X>, X>(tag: u64, output: &mut T, content: F) -> IoResult<()> {
|
||||||
|
@ -190,10 +191,10 @@ pub fn encode_bytes<T: Write>(tag: u64, bytes: &[u8], output: &mut T) -> IoResul
|
||||||
pub fn encode_integer<T: Write>(tag: u64, value: u64, output: &mut T) -> IoResult<()> {
|
pub fn encode_integer<T: Write>(tag: u64, value: u64, output: &mut T) -> IoResult<()> {
|
||||||
encode_tag_header(tag, Varint::Value(8), output)?;
|
encode_tag_header(tag, Varint::Value(8), output)?;
|
||||||
|
|
||||||
let mut buffer = Cursor::new([0; 8]);
|
let mut buffer = [0; 8];
|
||||||
buffer.put_u64_be(value);
|
buffer.as_mut().put_u64(value);
|
||||||
|
|
||||||
output.write_all(&buffer.get_ref()[..])
|
output.write_all(&buffer[..])
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct EbmlLayout {
|
pub struct EbmlLayout {
|
||||||
|
@ -262,11 +263,10 @@ pub trait FromEbml<'a>: Sized {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use bytes::{BytesMut};
|
use bytes::BytesMut;
|
||||||
use crate::ebml::*;
|
use crate::ebml::*;
|
||||||
use crate::ebml::EbmlError::{CorruptVarint, UnknownElementId};
|
use crate::ebml::EbmlError::{CorruptVarint, UnknownElementId};
|
||||||
use crate::ebml::Varint::{Unknown, Value};
|
use crate::ebml::Varint::{Unknown, Value};
|
||||||
use std::io::Cursor;
|
|
||||||
use crate::tests::TEST_FILE;
|
use crate::tests::TEST_FILE;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -298,44 +298,48 @@ mod tests {
|
||||||
fn encode_varints() {
|
fn encode_varints() {
|
||||||
let mut buffer = BytesMut::with_capacity(10).writer();
|
let mut buffer = BytesMut::with_capacity(10).writer();
|
||||||
|
|
||||||
let mut no_space = Cursor::new([0; 0]).writer();
|
let mut no_space = [0; 0];
|
||||||
assert_eq!(no_space.get_ref().remaining_mut(), 0);
|
let mut no_space_writer = no_space.as_mut().writer();
|
||||||
|
assert_eq!(no_space_writer.get_mut().remaining_mut(), 0);
|
||||||
|
|
||||||
let mut six_buffer = Cursor::new([0; 6]).writer();
|
let mut six_buffer = [0; 6];
|
||||||
assert_eq!(six_buffer.get_ref().remaining_mut(), 6);
|
let mut six_buffer_writer = six_buffer.as_mut().writer();
|
||||||
|
assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 6);
|
||||||
|
|
||||||
// 1 byte
|
// 1 byte
|
||||||
encode_varint(Varint::Unknown, &mut buffer).unwrap();
|
encode_varint(Varint::Unknown, &mut buffer).unwrap();
|
||||||
assert_eq!(buffer.get_mut().split_to(1), &[0xFF].as_ref());
|
assert_eq!(buffer.get_mut().split_to(1), &[0xFF].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Unknown, &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Unknown, &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
encode_varint(Varint::Value(0), &mut buffer).unwrap();
|
encode_varint(Varint::Value(0), &mut buffer).unwrap();
|
||||||
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 0].as_ref());
|
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 0].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Value(0), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(0), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
encode_varint(Varint::Value(1), &mut buffer).unwrap();
|
encode_varint(Varint::Value(1), &mut buffer).unwrap();
|
||||||
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 1].as_ref());
|
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 1].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Value(1), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(1), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
encode_varint(Varint::Value(126), &mut buffer).unwrap();
|
encode_varint(Varint::Value(126), &mut buffer).unwrap();
|
||||||
assert_eq!(buffer.get_mut().split_to(1), &[0xF0 | 126].as_ref());
|
assert_eq!(buffer.get_mut().split_to(1), &[0xF0 | 126].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Value(126), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(126), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
// 2 bytes
|
// 2 bytes
|
||||||
encode_varint(Varint::Value(127), &mut buffer).unwrap();
|
encode_varint(Varint::Value(127), &mut buffer).unwrap();
|
||||||
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 127].as_ref());
|
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 127].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Value(127), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(127), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
encode_varint(Varint::Value(128), &mut buffer).unwrap();
|
encode_varint(Varint::Value(128), &mut buffer).unwrap();
|
||||||
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 128].as_ref());
|
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 128].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Value(128), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(128), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
// 6 bytes
|
// 6 bytes
|
||||||
assert_eq!(six_buffer.get_ref().remaining_mut(), 6);
|
assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 6);
|
||||||
encode_varint(Varint::Value(0x03FFFFFFFFFE), &mut six_buffer).unwrap();
|
encode_varint(Varint::Value(0x03FFFFFFFFFE), &mut six_buffer_writer).unwrap();
|
||||||
assert_eq!(six_buffer.get_ref().remaining_mut(), 0);
|
assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 0);
|
||||||
assert_eq!(&six_buffer.get_ref().get_ref(), &[0x07, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
|
assert_eq!(&six_buffer, &[0x07, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
|
||||||
six_buffer = Cursor::new([0; 6]).writer();
|
|
||||||
|
let mut six_buffer = [0; 6];
|
||||||
|
let mut six_buffer_writer = six_buffer.as_mut().writer();
|
||||||
|
|
||||||
// 7 bytes
|
// 7 bytes
|
||||||
encode_varint(Varint::Value(0x03FFFFFFFFFF), &mut buffer).unwrap();
|
encode_varint(Varint::Value(0x03FFFFFFFFFF), &mut buffer).unwrap();
|
||||||
|
@ -347,8 +351,8 @@ mod tests {
|
||||||
encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut buffer).unwrap();
|
encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut buffer).unwrap();
|
||||||
assert_eq!(&buffer.get_mut().split_to(7), &[0x03, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
|
assert_eq!(&buffer.get_mut().split_to(7), &[0x03, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
|
||||||
|
|
||||||
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut six_buffer).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut six_buffer_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
// 8 bytes
|
// 8 bytes
|
||||||
encode_varint(Varint::Value(0x01FFFFFFFFFFFF), &mut buffer).unwrap();
|
encode_varint(Varint::Value(0x01FFFFFFFFFFFF), &mut buffer).unwrap();
|
||||||
|
|
|
@ -6,7 +6,6 @@ custom_error!{pub WebmetroError
|
||||||
EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}",
|
EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}",
|
||||||
HttpError{source: http::Error} = "HTTP error: {source}",
|
HttpError{source: http::Error} = "HTTP error: {source}",
|
||||||
HyperError{source: hyper::Error} = "Hyper error: {source}",
|
HyperError{source: hyper::Error} = "Hyper error: {source}",
|
||||||
Hyper13Error{source: hyper13::Error} = "Hyper error: {source}",
|
|
||||||
IoError{source: std::io::Error} = "IO error: {source}",
|
IoError{source: std::io::Error} = "IO error: {source}",
|
||||||
WarpError{source: warp::Error} = "Warp error: {source}",
|
WarpError{source: warp::Error} = "Warp error: {source}",
|
||||||
ApplicationError{message: String} = "{message}"
|
ApplicationError{message: String} = "{message}"
|
||||||
|
|
105
src/fixers.rs
105
src/fixers.rs
|
@ -1,23 +1,16 @@
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{
|
use std::task::{Context, Poll};
|
||||||
Context,
|
|
||||||
Poll
|
|
||||||
};
|
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
|
|
||||||
use futures3::prelude::*;
|
use futures::prelude::*;
|
||||||
use tokio2::timer::{
|
use pin_project::pin_project;
|
||||||
delay,
|
use tokio::time::{sleep_until, Duration, Instant, Sleep};
|
||||||
Delay
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::chunk::Chunk;
|
use crate::chunk::Chunk;
|
||||||
use crate::error::WebmetroError;
|
|
||||||
|
|
||||||
pub struct ChunkTimecodeFixer {
|
pub struct ChunkTimecodeFixer {
|
||||||
current_offset: u64,
|
current_offset: u64,
|
||||||
last_observed_timecode: u64,
|
last_observed_timecode: u64,
|
||||||
assumed_duration: u64
|
assumed_duration: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ChunkTimecodeFixer {
|
impl ChunkTimecodeFixer {
|
||||||
|
@ -25,12 +18,12 @@ impl ChunkTimecodeFixer {
|
||||||
ChunkTimecodeFixer {
|
ChunkTimecodeFixer {
|
||||||
current_offset: 0,
|
current_offset: 0,
|
||||||
last_observed_timecode: 0,
|
last_observed_timecode: 0,
|
||||||
assumed_duration: 33
|
assumed_duration: 33,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn process<'a>(&mut self, mut chunk: Chunk) -> Chunk {
|
pub fn process(&mut self, mut chunk: Chunk) -> Chunk {
|
||||||
match chunk {
|
match chunk {
|
||||||
Chunk::ClusterHead(ref mut cluster_head) => {
|
Chunk::Cluster(ref mut cluster_head, _) => {
|
||||||
let start = cluster_head.start;
|
let start = cluster_head.start;
|
||||||
if start < self.last_observed_timecode {
|
if start < self.last_observed_timecode {
|
||||||
let next_timecode = self.last_observed_timecode + self.assumed_duration;
|
let next_timecode = self.last_observed_timecode + self.assumed_duration;
|
||||||
|
@ -49,35 +42,30 @@ impl ChunkTimecodeFixer {
|
||||||
pub struct StartingPointFinder<S> {
|
pub struct StartingPointFinder<S> {
|
||||||
stream: S,
|
stream: S,
|
||||||
seen_header: bool,
|
seen_header: bool,
|
||||||
seen_keyframe: bool
|
seen_keyframe: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S>
|
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S> {
|
||||||
{
|
|
||||||
type Item = Result<Chunk, S::Error>;
|
type Item = Result<Chunk, S::Error>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> {
|
fn poll_next(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context,
|
||||||
|
) -> Poll<Option<Result<Chunk, S::Error>>> {
|
||||||
loop {
|
loop {
|
||||||
return match self.stream.try_poll_next_unpin(cx) {
|
return match self.stream.try_poll_next_unpin(cx) {
|
||||||
Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head)))) => {
|
Poll::Ready(Some(Ok(Chunk::Cluster(cluster_head, cluster_body)))) => {
|
||||||
if cluster_head.keyframe {
|
if cluster_head.keyframe {
|
||||||
self.seen_keyframe = true;
|
self.seen_keyframe = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.seen_keyframe {
|
if self.seen_keyframe {
|
||||||
Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head))))
|
Poll::Ready(Some(Ok(Chunk::Cluster(cluster_head, cluster_body))))
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
chunk @ Poll::Ready(Some(Ok(Chunk::ClusterBody {..}))) => {
|
chunk @ Poll::Ready(Some(Ok(Chunk::Headers { .. }))) => {
|
||||||
if self.seen_keyframe {
|
|
||||||
chunk
|
|
||||||
} else {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
chunk @ Poll::Ready(Some(Ok(Chunk::Headers {..}))) => {
|
|
||||||
if self.seen_header {
|
if self.seen_header {
|
||||||
// new stream starting, we don't need a new header but should wait for a safe spot to resume
|
// new stream starting, we don't need a new header but should wait for a safe spot to resume
|
||||||
self.seen_keyframe = false;
|
self.seen_keyframe = false;
|
||||||
|
@ -86,17 +74,20 @@ impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S>
|
||||||
self.seen_header = true;
|
self.seen_header = true;
|
||||||
chunk
|
chunk
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
chunk => chunk
|
chunk => chunk,
|
||||||
}
|
};
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[pin_project]
|
||||||
pub struct Throttle<S> {
|
pub struct Throttle<S> {
|
||||||
|
#[pin]
|
||||||
stream: S,
|
stream: S,
|
||||||
start_time: Instant,
|
start_time: Option<Instant>,
|
||||||
sleep: Delay
|
#[pin]
|
||||||
|
sleep: Sleep,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> Throttle<S> {
|
impl<S> Throttle<S> {
|
||||||
|
@ -104,34 +95,46 @@ impl<S> Throttle<S> {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
Throttle {
|
Throttle {
|
||||||
stream: wrap,
|
stream: wrap,
|
||||||
start_time: now,
|
start_time: None,
|
||||||
sleep: delay(now)
|
sleep: sleep_until(now),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: TryStream<Ok = Chunk, Error = WebmetroError> + Unpin> Stream for Throttle<S>
|
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for Throttle<S> {
|
||||||
{
|
type Item = Result<Chunk, S::Error>;
|
||||||
type Item = Result<Chunk, WebmetroError>;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
|
fn poll_next(
|
||||||
match self.sleep.poll_unpin(cx) {
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context,
|
||||||
|
) -> Poll<Option<Result<Chunk, S::Error>>> {
|
||||||
|
let mut this = self.project();
|
||||||
|
|
||||||
|
match this.sleep.as_mut().poll(cx) {
|
||||||
Poll::Pending => return Poll::Pending,
|
Poll::Pending => return Poll::Pending,
|
||||||
Poll::Ready(()) => { /* can continue */ },
|
Poll::Ready(()) => { /* can continue */ }
|
||||||
}
|
}
|
||||||
|
|
||||||
let next_chunk = self.stream.try_poll_next_unpin(cx);
|
let next_chunk = this.stream.try_poll_next_unpin(cx);
|
||||||
if let Poll::Ready(Some(Ok(Chunk::ClusterHead(ref cluster_head)))) = next_chunk {
|
if let Poll::Ready(Some(Ok(Chunk::Cluster(ref cluster_head, _)))) = next_chunk {
|
||||||
// snooze until real time has "caught up" to the stream
|
|
||||||
let offset = Duration::from_millis(cluster_head.end);
|
let offset = Duration::from_millis(cluster_head.end);
|
||||||
let sleep_until = self.start_time + offset;
|
// we have actual data, so start the clock if we haven't yet;
|
||||||
self.sleep.reset(sleep_until);
|
// if we're starting the clock now, though, don't insert delays if the first chunk happens to start after zero
|
||||||
|
let start_time = this
|
||||||
|
.start_time
|
||||||
|
.get_or_insert_with(|| Instant::now() - offset);
|
||||||
|
// snooze until real time has "caught up" to the stream
|
||||||
|
let sleep_until = *start_time + offset;
|
||||||
|
this.sleep.reset(sleep_until);
|
||||||
}
|
}
|
||||||
next_chunk
|
next_chunk
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ChunkStream where Self : Sized + TryStream<Ok = Chunk> {
|
pub trait ChunkStream
|
||||||
|
where
|
||||||
|
Self: Sized + TryStream<Ok = Chunk>,
|
||||||
|
{
|
||||||
/*fn fix_timecodes(self) -> Map<_> {
|
/*fn fix_timecodes(self) -> Map<_> {
|
||||||
let fixer = ;
|
let fixer = ;
|
||||||
self.map(move |chunk| {
|
self.map(move |chunk| {
|
||||||
|
@ -144,7 +147,7 @@ pub trait ChunkStream where Self : Sized + TryStream<Ok = Chunk> {
|
||||||
StartingPointFinder {
|
StartingPointFinder {
|
||||||
stream: self,
|
stream: self,
|
||||||
seen_header: false,
|
seen_header: false,
|
||||||
seen_keyframe: false
|
seen_keyframe: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
#[macro_use] extern crate log;
|
||||||
|
|
||||||
pub mod ebml;
|
pub mod ebml;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
|
|
56
src/main.rs
56
src/main.rs
|
@ -1,41 +1,37 @@
|
||||||
|
#[macro_use]
|
||||||
|
extern crate log;
|
||||||
|
|
||||||
mod commands;
|
mod commands;
|
||||||
|
|
||||||
use clap::{App, AppSettings, crate_version};
|
use clap::{Parser, Subcommand};
|
||||||
|
|
||||||
use crate::commands::{
|
/// Utilities for broadcasting & relaying live WebM video/audio streams
|
||||||
relay,
|
#[derive(Parser, Debug)]
|
||||||
filter,
|
#[clap(version)]
|
||||||
send,
|
struct Args {
|
||||||
dump
|
#[clap(subcommand)]
|
||||||
};
|
command: Command,
|
||||||
|
}
|
||||||
|
|
||||||
fn options() -> App<'static, 'static> {
|
#[derive(Subcommand, Debug)]
|
||||||
App::new("webmetro")
|
enum Command {
|
||||||
.version(crate_version!())
|
Dump(commands::dump::DumpArgs),
|
||||||
.about("Utilities for broadcasting & relaying live WebM video/audio streams")
|
Filter(commands::filter::FilterArgs),
|
||||||
.setting(AppSettings::DisableHelpSubcommand)
|
Relay(commands::relay::RelayArgs),
|
||||||
.setting(AppSettings::VersionlessSubcommands)
|
Send(commands::send::SendArgs),
|
||||||
.subcommand(relay::options())
|
|
||||||
.subcommand(filter::options())
|
|
||||||
.subcommand(send::options())
|
|
||||||
.subcommand(dump::options())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let args = options().get_matches();
|
env_logger::init();
|
||||||
|
let args = Args::parse();
|
||||||
|
|
||||||
match args.subcommand() {
|
match args.command {
|
||||||
("filter", Some(sub_args)) => filter::run(sub_args),
|
Command::Dump(args) => commands::dump::run(args),
|
||||||
("relay", Some(sub_args)) => relay::run(sub_args),
|
Command::Filter(args) => commands::filter::run(args),
|
||||||
("send", Some(sub_args)) => send::run(sub_args),
|
Command::Relay(args) => commands::relay::run(args),
|
||||||
("dump", Some(sub_args)) => dump::run(sub_args),
|
Command::Send(args) => commands::send::run(args),
|
||||||
_ => {
|
}
|
||||||
options().print_help().unwrap();
|
.unwrap_or_else(|err| {
|
||||||
println!("");
|
error!("{}", err);
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}.unwrap_or_else(|err| {
|
|
||||||
eprintln!("Error: {}", err);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||||
use futures3::stream::{Stream, StreamExt, TryStream};
|
use futures::{
|
||||||
|
stream::{Stream, StreamExt},
|
||||||
|
TryStreamExt,
|
||||||
|
};
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use crate::ebml::FromEbml;
|
use crate::ebml::FromEbml;
|
||||||
|
@ -22,11 +25,7 @@ impl<S> EbmlStreamingParser<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait StreamEbml: Sized + TryStream + Unpin
|
pub trait StreamEbml: Sized {
|
||||||
where
|
|
||||||
Self: Sized + TryStream + Unpin,
|
|
||||||
Self::Ok: Buf,
|
|
||||||
{
|
|
||||||
fn parse_ebml(self) -> EbmlStreamingParser<Self> {
|
fn parse_ebml(self) -> EbmlStreamingParser<Self> {
|
||||||
EbmlStreamingParser {
|
EbmlStreamingParser {
|
||||||
stream: self,
|
stream: self,
|
||||||
|
@ -37,29 +36,28 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> StreamEbml for S {}
|
impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> StreamEbml for S where WebmetroError: From<E>
|
||||||
|
{}
|
||||||
|
|
||||||
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
|
impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> EbmlStreamingParser<S>
|
||||||
|
where
|
||||||
|
WebmetroError: From<E>,
|
||||||
|
{
|
||||||
pub fn poll_event<'a, T: FromEbml<'a>>(
|
pub fn poll_event<'a, T: FromEbml<'a>>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
cx: &mut Context,
|
cx: &mut Context,
|
||||||
) -> Poll<Option<Result<T, WebmetroError>>> {
|
) -> Poll<Option<Result<T, WebmetroError>>> {
|
||||||
loop {
|
loop {
|
||||||
match T::check_space(&self.buffer)? {
|
if let Some(info) = T::check_space(&self.buffer)? {
|
||||||
None => {
|
self.borrowed = self.buffer.split_to(info.element_len).freeze();
|
||||||
// need to refill buffer, below
|
self.borrowed.advance(info.body_offset);
|
||||||
}
|
return Poll::Ready(Some(
|
||||||
Some(info) => {
|
T::decode(info.element_id, &self.borrowed).map_err(Into::into),
|
||||||
let mut bytes = self.buffer.split_to(info.element_len).freeze();
|
));
|
||||||
bytes.advance(info.body_offset);
|
|
||||||
self.borrowed = bytes;
|
|
||||||
return Poll::Ready(Some(T::decode(
|
|
||||||
info.element_id,
|
|
||||||
&self.borrowed,
|
|
||||||
).map_err(Into::into)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// need to refill buffer
|
||||||
|
|
||||||
if let Some(limit) = self.buffer_size_limit {
|
if let Some(limit) = self.buffer_size_limit {
|
||||||
if limit <= self.buffer.len() {
|
if limit <= self.buffer.len() {
|
||||||
return Poll::Ready(Some(Err(WebmetroError::ResourcesExceeded)));
|
return Poll::Ready(Some(Err(WebmetroError::ResourcesExceeded)));
|
||||||
|
@ -77,15 +75,12 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
|
|
||||||
pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> {
|
pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> {
|
||||||
loop {
|
loop {
|
||||||
if let Some(info) = T::check_space(&self.buffer)? {
|
if let Some(info) = T::check_space(&self.buffer)? {
|
||||||
let mut bytes = self.buffer.split_to(info.element_len).freeze();
|
self.borrowed = self.buffer.split_to(info.element_len).freeze();
|
||||||
bytes.advance(info.body_offset);
|
self.borrowed.advance(info.body_offset);
|
||||||
self.borrowed = bytes;
|
|
||||||
return Ok(Some(T::decode(info.element_id, &self.borrowed)?));
|
return Ok(Some(T::decode(info.element_id, &self.borrowed)?));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +91,7 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.stream.next().await.transpose()? {
|
match self.stream.try_next().await? {
|
||||||
Some(refill) => {
|
Some(refill) => {
|
||||||
self.buffer.reserve(refill.remaining());
|
self.buffer.reserve(refill.remaining());
|
||||||
self.buffer.put(refill);
|
self.buffer.put(refill);
|
||||||
|
@ -112,8 +107,7 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use bytes::IntoBuf;
|
use futures::{future::poll_fn, stream::StreamExt, FutureExt};
|
||||||
use futures3::{future::poll_fn, stream::StreamExt, FutureExt};
|
|
||||||
use matches::assert_matches;
|
use matches::assert_matches;
|
||||||
use std::task::Poll::*;
|
use std::task::Poll::*;
|
||||||
|
|
||||||
|
@ -130,8 +124,8 @@ mod tests {
|
||||||
&ENCODE_WEBM_TEST_FILE[40..],
|
&ENCODE_WEBM_TEST_FILE[40..],
|
||||||
];
|
];
|
||||||
|
|
||||||
let mut stream_parser = futures3::stream::iter(pieces.iter())
|
let mut stream_parser = futures::stream::iter(pieces.iter())
|
||||||
.map(|bytes| Ok(bytes.into_buf()))
|
.map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..]))
|
||||||
.parse_ebml();
|
.parse_ebml();
|
||||||
|
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
|
@ -182,8 +176,8 @@ mod tests {
|
||||||
];
|
];
|
||||||
|
|
||||||
async {
|
async {
|
||||||
let mut parser = futures3::stream::iter(pieces.iter())
|
let mut parser = futures::stream::iter(pieces.iter())
|
||||||
.map(|bytes| Ok(bytes.into_buf()))
|
.map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..]))
|
||||||
.parse_ebml();
|
.parse_ebml();
|
||||||
|
|
||||||
assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead));
|
assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead));
|
||||||
|
|
15
src/webm.rs
15
src/webm.rs
|
@ -1,5 +1,6 @@
|
||||||
use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek};
|
use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write, Seek};
|
||||||
use bytes::{BigEndian, BufMut, ByteOrder};
|
use byteorder::{BigEndian, ByteOrder};
|
||||||
|
use bytes::BufMut;
|
||||||
use crate::ebml::*;
|
use crate::ebml::*;
|
||||||
use crate::iterator::ebml_iter;
|
use crate::iterator::ebml_iter;
|
||||||
use crate::iterator::EbmlIterator;
|
use crate::iterator::EbmlIterator;
|
||||||
|
@ -103,11 +104,12 @@ pub fn encode_simple_block<T: Write>(block: SimpleBlock, output: &mut T) -> IoRe
|
||||||
|
|
||||||
encode_varint(Varint::Value(track), output)?;
|
encode_varint(Varint::Value(track), output)?;
|
||||||
|
|
||||||
let mut buffer = Cursor::new([0; 3]);
|
let mut buffer = [0; 3];
|
||||||
buffer.put_i16_be(timecode);
|
let mut cursor = buffer.as_mut();
|
||||||
buffer.put_u8(flags);
|
cursor.put_i16(timecode);
|
||||||
|
cursor.put_u8(flags);
|
||||||
|
|
||||||
output.write_all(&buffer.get_ref()[..])?;
|
output.write_all(&buffer)?;
|
||||||
output.write_all(data)
|
output.write_all(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,6 +133,7 @@ pub fn encode_webm_element<T: Write + Seek>(element: WebmElement, output: &mut T
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::io::Cursor;
|
||||||
use crate::tests::{
|
use crate::tests::{
|
||||||
TEST_FILE,
|
TEST_FILE,
|
||||||
ENCODE_WEBM_TEST_FILE
|
ENCODE_WEBM_TEST_FILE
|
||||||
|
|
Loading…
Reference in a new issue