Compare commits

...

44 commits

Author SHA1 Message Date
1ce026eb2e Include a simple built-in HTML player 2023-04-28 18:06:28 -04:00
3bb6641a53 Add escaping dependencies 2023-04-28 18:04:26 -04:00
fba1829970 Remove unused header 2023-04-28 16:15:28 -04:00
fc3724683a Create Buster build for older libc compatibility 2023-04-08 21:55:02 -04:00
4db64df278 Use another if let in stream parser 2023-02-03 23:10:50 -05:00
884aa37888 convert Chunk to external iteration 2022-06-10 17:15:03 -04:00
2b88d09d0f Teach filter subcommand to recognize --skip and --take options 2022-05-22 20:50:47 -04:00
bfe7d4b1d7 Update argument parsing to clap v3 2022-05-22 20:37:03 -04:00
15520f26bd Update Tokio/Hyper/Warp-related dependencies 2022-05-22 00:56:53 -04:00
fc4e0577c8 Trivial dependency updates 2022-05-22 00:06:13 -04:00
8cecb2166f remove dependency on odds now that Vec::retain_mut is stable 2022-05-21 22:40:07 -04:00
3bc46210e4 Teach send subcommand to recognize --skip and --take options 2020-09-11 22:57:23 -04:00
18fb8390a0 ensure Throttle never waits before the first chunk of data, even if it's timestamped after zero 2020-09-11 22:56:24 -04:00
43acf93fa9 Fuse ClusterHead & ClusterBody chunks into one chunk type
(there is still a technical body chunk variant for iteration purposes, but it won't be produced by the parser)
2020-09-11 20:34:40 -04:00
9cc7d8064d Represent Chunks as iterables of Bytes 2020-09-11 19:34:27 -04:00
2fb8408ebc Start throttle timing on first data instead of throttle creation (improves cases where the source is slow to start) 2020-09-10 18:11:48 -04:00
c8124ed388 Add INFO logging for channel creation/garbage-collection 2020-09-10 01:22:36 -04:00
4dc8ec1bbd when disconnecting, clean up the header chunk so subsequent clients don't get a potentially incorrect initialization segment 2020-08-10 21:40:53 -04:00
d585ad7b31 use ergonomic try_next() combinator instead of transpose() 2020-05-09 00:17:22 -04:00
5a6d1e764d simplify channel streams 2020-05-09 00:06:29 -04:00
4b923ebed5 Update CHANGELOG.md 2020-05-08 22:34:35 -04:00
17bf7f0eef Merge branch 'warp02' 2020-05-08 22:21:00 -04:00
bb013761b1 remove old tokio helper crates that aren't used anymore, simplify Cargo.toml versions 2020-05-08 22:20:35 -04:00
1a6764b778 bump odds version 2020-05-08 22:07:53 -04:00
c422a1c3f3 Simplify/sanify some types 2020-05-08 22:02:49 -04:00
00ec517e78 Big-bang update tokio, warp, futures, & hyper to new versions. 2020-05-07 23:14:43 -04:00
1273b4adff WIP: update lib to bytes 0.5 2020-05-07 00:34:23 -04:00
f4f752548e Merge branch 'chunk_refactor' 2020-05-06 21:50:45 -04:00
9274fabeea impl Iterator for Chunk 2020-05-06 21:50:03 -04:00
dbcbf2831e Remove unused (and soon-to-be meaningless) AsRef impl for ClusterHead 2019-11-28 11:09:02 -05:00
2400720d03 Use a Bytes[Mut] for all Chunk data 2019-11-28 00:17:52 -05:00
5e2e1bcf83 replace a number of intermediate states with a "pending" Option 2019-11-26 21:26:08 -05:00
3da59e2d96 start next development cycle 2019-11-20 00:33:13 -05:00
Tangent Wantwight
3ed514c99c Release v0.2.2
Add a project changelog
2019-11-20 00:24:57 -05:00
Tangent Wantwight
0c48d59e3a Merge branch 'multibind' 2019-11-20 00:24:01 -05:00
Tangent Wantwight
3f7adb3bd6 Bind to multiple addresses if given a DNS name 2019-11-20 00:15:00 -05:00
Tangent Wantwight
ca1ade1341 Revert "Removed old Tokio version from Cargo.toml"
This reverts commit 7e85a8750b.
2019-11-20 00:13:51 -05:00
Tangent Wantwight
c6a1e3daf3 Merge branch 'log' 2019-11-19 20:06:47 -05:00
Tangent Wantwight
8fa16152f8 Add & init env_logger 2019-11-19 20:06:28 -05:00
Tangent Wantwight
ba1d921a7e Use log crate & macros instead of prints. 2019-11-19 20:06:28 -05:00
Tangent Wantwight
2f129d6986 permit some dependencies to increase version now 2019-11-19 20:06:28 -05:00
Tangent Wantwight
22c6b0cfc8 document 0.2.2 WIP 2019-11-19 19:45:59 -05:00
Tangent Wantwight
188bf23803 bump patch version for dependency/architecture changes 2019-11-17 22:45:34 -05:00
Tangent Wantwight
77401e9f51 Merge branch 'nightly' - async stable now 2019-11-17 22:44:13 -05:00
21 changed files with 1334 additions and 1914 deletions

23
CHANGELOG.md Normal file
View file

@ -0,0 +1,23 @@
## v0.3.1-dev
- Teach filter subcommand to recognize --skip and --take options
- MSRV now rustc 1.61
- forget a channel's initialization segment when no transmitter is active. This improves behavior when a channel is occasionally used for streams with different codecs.
- Add INFO logging for channel creation/garbage-collection
- Start throttle timing on first data instead of throttle creation (improves cases where the source is slow to start)
- Teach send subcommand to recognize --skip and --take options
## v0.3.0
- update internals to v0.2 of `warp` and `tokio`; no remaining code relies on `futures` 0.1
## v0.2.2
- use the `log` and `env_logger` crates for logging; the `RUST_LOG` environment variable configures the logging level.
- see [the env_logger documentation](https://docs.rs/env_logger/*/env_logger/) for more information
- support listening on multiple addresses if given a DNS name instead of an IP address. All bindings reference the same namespace for channels, but this allows, e.g., binding to both IPv4 and IPv6 `localhost`.
- released November 20, 2019
## v0.2.1
- update most internals to use `std::future`
## v0.2.0
- support proxying an arbitrary number of streams at `/live/$NAME`
- released October 27, 2018

2100
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,20 +1,24 @@
[package] [package]
name = "webmetro" name = "webmetro"
version = "0.2.0" version = "0.3.1-dev"
authors = ["Tangent 128 <Tangent128@gmail.com>"] authors = ["Tangent 128 <Tangent128@gmail.com>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
bytes = "0.4.12" byteorder = "1"
clap = "2.33.0" bytes = "1"
custom_error = "1.7" clap = { version="^3.1.18", features=["cargo", "derive"] }
futures = "0.1.29" custom_error = "^1.7"
futures3 = { package = "futures-preview", version="0.3.0-alpha", features = ["compat"] } env_logger = "^0.9"
http = "0.1.18" futures = "^0.3"
hyper = "0.12.35" http = "^0.2"
hyper13 = { package = "hyper", version="0.13.0-alpha.4", features = ["unstable-stream"] } html-escape = "0.2.13"
matches = "0.1.8" hyper = "^0.14"
odds = { version = "0.3.1", features = ["std-vec"] } log = "^0.4.8"
tokio2 = { package = "tokio", version="0.2.0-alpha.6" } matches = "^0.1"
warp = "0.1.20" pin-project = "1"
weak-table = "0.2.3" tokio = { version="^1.18", features = ["io-std", "macros", "net", "rt", "rt-multi-thread", "time"] }
tokio-util = { version="^0.7", features=["codec"] }
urlencoding = "2.1.2"
warp = "^0.3"
weak-table = "^0.3"

8
build-buster.sh Executable file
View file

@ -0,0 +1,8 @@
#!/bin/sh
# this will probably work with docker just as well as podman
podman run --rm \
-v ..:/usr/src/build/ \
-v ./buster-target:/usr/src/build/webmetro/target \
-w /usr/src/build/webmetro \
rust:1-buster cargo build --release

1
buster-target/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
*

View file

@ -1,24 +1,11 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{ use std::sync::{Arc, Mutex};
Context, use std::task::{Context, Poll};
Poll
};
use std::sync::{
Arc,
Mutex
};
use futures3::{ use futures::{
channel::mpsc::{ channel::mpsc::{channel as mpsc_channel, Receiver, Sender},
channel as mpsc_channel,
Sender,
Receiver
},
Sink,
Stream, Stream,
Never
}; };
use odds::vec::VecExt;
use crate::chunk::Chunk; use crate::chunk::Chunk;
@ -30,77 +17,66 @@ use crate::chunk::Chunk;
pub struct Channel { pub struct Channel {
pub name: String, pub name: String,
header_chunk: Option<Chunk>, header_chunk: Option<Chunk>,
listeners: Vec<Sender<Chunk>> listeners: Vec<Sender<Chunk>>,
} }
pub type Handle = Arc<Mutex<Channel>>; pub type Handle = Arc<Mutex<Channel>>;
impl Channel { impl Channel {
pub fn new(name: String) -> Handle { pub fn new(name: String) -> Handle {
info!("Opening Channel {}", name);
Arc::new(Mutex::new(Channel { Arc::new(Mutex::new(Channel {
name, name,
header_chunk: None, header_chunk: None,
listeners: Vec::new() listeners: Vec::new(),
})) }))
} }
} }
impl Drop for Channel {
fn drop(&mut self) {
info!("Closing Channel {}", self.name);
}
}
pub struct Transmitter { pub struct Transmitter {
channel: Handle channel: Handle,
} }
impl Transmitter { impl Transmitter {
pub fn new(channel_arc: Handle) -> Self { pub fn new(channel_arc: Handle) -> Self {
Transmitter { Transmitter {
channel: channel_arc channel: channel_arc,
} }
} }
}
impl Sink<Chunk> for Transmitter { pub fn send(&self, chunk: Chunk) {
type Error = Never; // never errors, slow clients are simply dropped
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Never>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, chunk: Chunk) -> Result<(), Never> {
let mut channel = self.channel.lock().expect("Locking channel"); let mut channel = self.channel.lock().expect("Locking channel");
if let Chunk::Headers { .. } = chunk { if let Chunk::Headers { .. } = chunk {
channel.header_chunk = Some(chunk.clone()); channel.header_chunk = Some(chunk.clone());
} }
channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok()); channel
.listeners
Ok(()) .retain_mut(|listener| listener.start_send(chunk.clone()).is_ok());
} }
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> { impl Drop for Transmitter {
let mut channel = self.channel.lock().expect("Locking channel"); fn drop(&mut self) {
let mut result = Poll::Ready(Ok(())); if let Ok(mut channel) = self.channel.lock() {
// when disconnecting, clean up the header chunk so subsequent
// just disconnect any erroring listeners // clients don't get a potentially incorrect initialization segment
channel.listeners.retain_mut(|listener| match Pin::new(listener).poll_flush(cx) { channel.header_chunk = None;
Poll::Pending => {result = Poll::Pending; true}, }
Poll::Ready(Ok(())) => true,
Poll::Ready(Err(_)) => false,
});
result
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Never>> {
// don't actually disconnect listeners, since other sources may want to transmit to this channel;
// just ensure we've sent everything we can out
self.poll_flush(cx)
} }
} }
pub struct Listener { pub struct Listener {
/// not used in operation, but its refcount keeps the channel alive when there's no Transmitter /// not used in operation, but its refcount keeps the channel alive when there's no Transmitter
_channel: Handle, _channel: Handle,
receiver: Receiver<Chunk> receiver: Receiver<Chunk>,
} }
impl Listener { impl Listener {
@ -111,7 +87,9 @@ impl Listener {
let mut channel = channel_arc.lock().expect("Locking channel"); let mut channel = channel_arc.lock().expect("Locking channel");
if let Some(ref chunk) = channel.header_chunk { if let Some(ref chunk) = channel.header_chunk {
sender.start_send(chunk.clone()).expect("Queuing existing header chunk"); sender
.start_send(chunk.clone())
.expect("Queuing existing header chunk");
} }
channel.listeners.push(sender); channel.listeners.push(sender);
@ -119,7 +97,7 @@ impl Listener {
Listener { Listener {
_channel: channel_arc, _channel: channel_arc,
receiver: receiver receiver: receiver,
} }
} }
} }

View file

@ -1,24 +1,23 @@
use bytes::{Buf, Bytes}; use crate::error::WebmetroError;
use futures3::prelude::*; use crate::stream_parser::EbmlStreamingParser;
use crate::webm::*;
use bytes::{Buf, Bytes, BytesMut};
use futures::prelude::*;
use std::{ use std::{
io::Cursor, io::Cursor,
mem, mem,
pin::Pin, pin::Pin,
task::{Context, Poll, Poll::*}, task::{Context, Poll, Poll::*},
}; };
use crate::stream_parser::EbmlStreamingParser;
use crate::error::WebmetroError;
use crate::webm::*;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ClusterHead { pub struct ClusterHead {
pub keyframe: bool, pub keyframe: bool,
pub start: u64, pub start: u64,
pub end: u64, pub end: u64,
/// space for a Cluster tag and a Timecode tag /// a Cluster tag and a Timecode tag together take at most 15 bytes;
/// TODO: consider using a BytesMut here for simplicity /// fortuitously, 15 bytes can be inlined in a Bytes handle even on 32-bit systems
bytes: [u8;16], bytes: BytesMut,
bytes_used: u8
} }
impl ClusterHead { impl ClusterHead {
@ -27,8 +26,7 @@ impl ClusterHead {
keyframe: false, keyframe: false,
start: 0, start: 0,
end: 0, end: 0,
bytes: [0;16], bytes: BytesMut::with_capacity(15),
bytes_used: 0
}; };
cluster_head.update_timecode(timecode); cluster_head.update_timecode(timecode);
cluster_head cluster_head
@ -37,11 +35,14 @@ impl ClusterHead {
let delta = self.end - self.start; let delta = self.end - self.start;
self.start = timecode; self.start = timecode;
self.end = self.start + delta; self.end = self.start + delta;
let mut cursor = Cursor::new(self.bytes.as_mut()); let mut buffer = [0; 15];
let mut cursor = Cursor::new(buffer.as_mut());
// buffer is sized so these should never fail // buffer is sized so these should never fail
encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap(); encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap();
encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap(); encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap();
self.bytes_used = cursor.position() as u8; self.bytes.clear();
let len = cursor.position() as usize;
self.bytes.extend_from_slice(&buffer[..len]);
} }
pub fn observe_simpleblock_timecode(&mut self, timecode: i16) { pub fn observe_simpleblock_timecode(&mut self, timecode: i16) {
let absolute_timecode = self.start + (timecode as u64); let absolute_timecode = self.start + (timecode as u64);
@ -51,41 +52,52 @@ impl ClusterHead {
} }
} }
impl AsRef<[u8]> for ClusterHead {
fn as_ref(&self) -> &[u8] {
self.bytes[..self.bytes_used as usize].as_ref()
}
}
/// A chunk of WebM data /// A chunk of WebM data
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Chunk { pub enum Chunk {
Headers { Headers { bytes: Bytes },
bytes: Bytes Cluster(ClusterHead, Bytes),
},
ClusterHead(ClusterHead),
ClusterBody {
bytes: Bytes
}
} }
impl Chunk { impl Chunk {
/// converts this chunk of data into a Bytes object, perhaps to send over the network pub fn overlaps(&self, start: u128, stop: u128) -> bool {
pub fn into_bytes(self) -> Bytes {
match self { match self {
Chunk::Headers {bytes, ..} => bytes, Chunk::Cluster(head, _) => head.start as u128 <= stop && head.end as u128 >= start,
Chunk::ClusterHead(cluster_head) => Bytes::from(cluster_head.as_ref()), _ => true,
Chunk::ClusterBody {bytes, ..} => bytes
} }
} }
} }
impl AsRef<[u8]> for Chunk { impl IntoIterator for Chunk {
fn as_ref(&self) -> &[u8] { type Item = Bytes;
type IntoIter = Iter;
fn into_iter(self) -> Self::IntoIter {
match self { match self {
&Chunk::Headers {ref bytes, ..} => bytes.as_ref(), Chunk::Headers { bytes } => Iter::Buffer(bytes),
&Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(), Chunk::Cluster(head, bytes) => Iter::Cluster(head, bytes),
&Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref() }
}
}
pub enum Iter {
Cluster(ClusterHead, Bytes),
Buffer(Bytes),
Empty,
}
impl Iterator for Iter {
type Item = Bytes;
fn next(&mut self) -> Option<Bytes> {
let iter = mem::replace(self, Iter::Empty);
match iter {
Iter::Cluster(ClusterHead { bytes: head, .. }, body) => {
*self = Iter::Buffer(body);
Some(head.freeze())
}
Iter::Buffer(bytes) => Some(bytes),
Iter::Empty => None,
} }
} }
} }
@ -95,19 +107,13 @@ enum ChunkerState {
BuildingHeader(Cursor<Vec<u8>>), BuildingHeader(Cursor<Vec<u8>>),
// ClusterHead & body buffer // ClusterHead & body buffer
BuildingCluster(ClusterHead, Cursor<Vec<u8>>), BuildingCluster(ClusterHead, Cursor<Vec<u8>>),
EmittingClusterBody(Vec<u8>), End,
EmittingClusterBodyBeforeNewHeader {
body: Vec<u8>,
new_header: Cursor<Vec<u8>>
},
EmittingFinalClusterBody(Vec<u8>),
End
} }
pub struct WebmChunker<S> { pub struct WebmChunker<S> {
source: EbmlStreamingParser<S>, source: EbmlStreamingParser<S>,
buffer_size_limit: Option<usize>, buffer_size_limit: Option<usize>,
state: ChunkerState state: ChunkerState,
} }
impl<S> WebmChunker<S> { impl<S> WebmChunker<S> {
@ -120,7 +126,11 @@ impl<S> WebmChunker<S> {
} }
} }
fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usize>) -> Result<(), WebmetroError> { fn encode(
element: WebmElement,
buffer: &mut Cursor<Vec<u8>>,
limit: Option<usize>,
) -> Result<(), WebmetroError> {
if let Some(limit) = limit { if let Some(limit) = limit {
if limit <= buffer.get_ref().len() { if limit <= buffer.get_ref().len() {
return Err(WebmetroError::ResourcesExceeded); return Err(WebmetroError::ResourcesExceeded);
@ -130,11 +140,16 @@ fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usiz
encode_webm_element(element, buffer).map_err(|err| err.into()) encode_webm_element(element, buffer).map_err(|err| err.into())
} }
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for WebmChunker<S> impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> Stream for WebmChunker<S>
where
WebmetroError: From<E>,
{ {
type Item = Result<Chunk, WebmetroError>; type Item = Result<Chunk, WebmetroError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Chunk, WebmetroError>>> {
let mut chunker = self.get_mut(); let mut chunker = self.get_mut();
loop { loop {
match chunker.state { match chunker.state {
@ -145,116 +160,117 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
Ready(None) => return Ready(None), Ready(None) => return Ready(None),
Ready(Some(Ok(element))) => match element { Ready(Some(Ok(element))) => match element {
WebmElement::Cluster => { WebmElement::Cluster => {
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer =
let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())}; mem::replace(buffer, Cursor::new(Vec::new()));
let header_chunk = Chunk::Headers {
bytes: Bytes::from(liberated_buffer.into_inner()),
};
chunker.state = ChunkerState::BuildingCluster( chunker.state = ChunkerState::BuildingCluster(
ClusterHead::new(0), ClusterHead::new(0),
Cursor::new(Vec::new()) Cursor::new(Vec::new()),
); );
return Ready(Some(Ok(header_chunk))); return Ready(Some(Ok(header_chunk)));
}, }
WebmElement::Info => {}, WebmElement::Info => {}
WebmElement::Void => {}, WebmElement::Void => {}
WebmElement::Unknown(_) => {}, WebmElement::Unknown(_) => {}
element => { element => {
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) { if let Err(err) = encode(element, buffer, chunker.buffer_size_limit)
{
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
} }
} },
} }
}, }
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
match chunker.source.poll_event(cx) { match chunker.source.poll_event(cx) {
Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))), Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))),
Pending => return Pending, Pending => return Pending,
Ready(Some(Ok(element))) => match element { Ready(Some(Ok(element))) => match element {
WebmElement::EbmlHead | WebmElement::Segment => { WebmElement::EbmlHead | WebmElement::Segment => {
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head =
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer =
mem::replace(buffer, Cursor::new(Vec::new()));
let mut new_header_cursor = Cursor::new(Vec::new()); let mut new_header_cursor = Cursor::new(Vec::new());
match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) { match encode(
element,
&mut new_header_cursor,
chunker.buffer_size_limit,
) {
Ok(_) => { Ok(_) => {
chunker.state = ChunkerState::EmittingClusterBodyBeforeNewHeader{ chunker.state =
body: liberated_buffer.into_inner(), ChunkerState::BuildingHeader(new_header_cursor);
new_header: new_header_cursor return Ready(Some(Ok(Chunk::Cluster(
}; liberated_cluster_head,
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); Bytes::from(liberated_buffer.into_inner()),
}, ))));
}
Err(err) => { Err(err) => {
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
} }
}, }
WebmElement::Cluster => { WebmElement::Cluster => {
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head =
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer =
mem::replace(buffer, Cursor::new(Vec::new()));
chunker.state = ChunkerState::EmittingClusterBody(liberated_buffer.into_inner()); return Ready(Some(Ok(Chunk::Cluster(
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); liberated_cluster_head,
}, Bytes::from(liberated_buffer.into_inner()),
))));
}
WebmElement::Timecode(timecode) => { WebmElement::Timecode(timecode) => {
cluster_head.update_timecode(timecode); cluster_head.update_timecode(timecode);
}, }
WebmElement::SimpleBlock(ref block) => { WebmElement::SimpleBlock(ref block) => {
if (block.flags & 0b10000000) != 0 { if (block.flags & 0b10000000) != 0 {
// TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster // TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster
cluster_head.keyframe = true; cluster_head.keyframe = true;
} }
cluster_head.observe_simpleblock_timecode(block.timecode); cluster_head.observe_simpleblock_timecode(block.timecode);
if let Err(err) = encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit) { if let Err(err) = encode(
WebmElement::SimpleBlock(*block),
buffer,
chunker.buffer_size_limit,
) {
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
}, }
WebmElement::Info => {}, WebmElement::Info => {}
WebmElement::Void => {}, WebmElement::Void => {}
WebmElement::Unknown(_) => {}, WebmElement::Unknown(_) => {}
element => { element => {
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) { if let Err(err) = encode(element, buffer, chunker.buffer_size_limit)
{
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
}, }
}, },
Ready(None) => { Ready(None) => {
// flush final Cluster on end of stream // flush final Cluster on end of stream
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head =
mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
chunker.state = ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner()); chunker.state = ChunkerState::End;
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); return Ready(Some(Ok(Chunk::Cluster(
liberated_cluster_head,
Bytes::from(liberated_buffer.into_inner()),
))));
} }
} }
}, }
ChunkerState::EmittingClusterBody(ref mut buffer) => { ChunkerState::End => return Ready(None),
let liberated_buffer = mem::replace(buffer, Vec::new());
chunker.state = ChunkerState::BuildingCluster(
ClusterHead::new(0),
Cursor::new(Vec::new())
);
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
},
ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => {
let liberated_body = mem::replace(body, Vec::new());
let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new()));
chunker.state = ChunkerState::BuildingHeader(liberated_header_cursor);
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)})));
},
ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
// flush final Cluster on end of stream
let liberated_buffer = mem::replace(buffer, Vec::new());
chunker.state = ChunkerState::End;
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
},
ChunkerState::End => return Ready(None)
}; };
} }
} }
@ -271,7 +287,7 @@ impl<S: Stream> WebmStream for EbmlStreamingParser<S> {
WebmChunker { WebmChunker {
source: self, source: self,
buffer_size_limit: None, buffer_size_limit: None,
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())),
} }
} }
} }

View file

@ -1,35 +1,27 @@
use clap::{App, AppSettings, ArgMatches, SubCommand}; use clap::Args;
use tokio2::runtime::Runtime;
use super::stdin_stream; use super::stdin_stream;
use webmetro::{ use webmetro::{
error::WebmetroError, error::WebmetroError,
stream_parser::StreamEbml, stream_parser::StreamEbml,
webm::{ webm::{SimpleBlock, WebmElement::*},
SimpleBlock,
WebmElement::*
}
}; };
pub fn options() -> App<'static, 'static> { /// Dumps WebM parsing events from parsing stdin
SubCommand::with_name("dump") #[derive(Args, Debug)]
.setting(AppSettings::Hidden) pub struct DumpArgs;
.about("Dumps WebM parsing events from parsing stdin")
}
pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> {
#[tokio::main]
pub async fn run(_args: DumpArgs) -> Result<(), WebmetroError> {
let mut events = stdin_stream().parse_ebml(); let mut events = stdin_stream().parse_ebml();
Runtime::new().unwrap().block_on(async { while let Some(element) = events.next().await? {
while let Some(element) = events.next().await? { match element {
match element { // suppress printing byte arrays
// suppress printing byte arrays Tracks(slice) => println!("Tracks[{}]", slice.len()),
Tracks(slice) => println!("Tracks[{}]", slice.len()), SimpleBlock(SimpleBlock { timecode, .. }) => println!("SimpleBlock@{}", timecode),
SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode), other => println!("{:?}", other),
other => println!("{:?}", other)
}
} }
Ok(()) }
}) Ok(())
} }

View file

@ -1,49 +1,53 @@
use std::{ use std::{io, io::prelude::*, pin::Pin, time::Duration};
io,
io::prelude::*
};
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::Args;
use futures3::prelude::*; use futures::prelude::*;
use futures3::future::ready;
use tokio2::runtime::Runtime;
use super::stdin_stream; use super::{parse_time, stdin_stream};
use webmetro::{ use webmetro::{
chunk::{ chunk::{Chunk, WebmStream},
Chunk,
WebmStream
},
error::WebmetroError, error::WebmetroError,
fixers::{ fixers::{ChunkTimecodeFixer, Throttle},
ChunkTimecodeFixer, stream_parser::StreamEbml,
Throttle,
},
stream_parser::StreamEbml
}; };
pub fn options() -> App<'static, 'static> { /// Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.
SubCommand::with_name("filter") #[derive(Args, Debug)]
.about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.") pub struct FilterArgs {
.arg(Arg::with_name("throttle") /// Slow down output to "real time" speed as determined by the timestamps (useful for streaming static files)
.long("throttle") #[clap(long)]
.help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) throttle: bool,
/// Skip approximately n seconds of content before uploading or throttling
#[clap(long, short, parse(try_from_str = parse_time))]
skip: Option<Duration>,
/// Stop uploading after approximately n seconds of content
#[clap(long, short, parse(try_from_str = parse_time))]
take: Option<Duration>,
} }
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { #[tokio::main]
let mut timecode_fixer = ChunkTimecodeFixer::new(); pub async fn run(args: FilterArgs) -> Result<(), WebmetroError> {
let mut chunk_stream: Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin> = Box::new( let start_time = args.skip.map_or(0, |s| s.as_millis());
stdin_stream() let stop_time = args
.parse_ebml() .take
.chunk_webm() .map_or(std::u128::MAX, |t| t.as_millis() + start_time);
.map_ok(move |chunk| timecode_fixer.process(chunk))
);
if args.is_present("throttle") { let mut timecode_fixer = ChunkTimecodeFixer::new();
chunk_stream = Box::new(Throttle::new(chunk_stream)); let mut chunk_stream: Pin<Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send>> =
Box::pin(
stdin_stream()
.parse_ebml()
.chunk_webm()
.map_ok(move |chunk| timecode_fixer.process(chunk))
.try_filter(move |chunk| future::ready(chunk.overlaps(start_time, stop_time))),
);
if args.throttle {
chunk_stream = Box::pin(Throttle::new(chunk_stream));
} }
Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| { while let Some(chunk) = chunk_stream.next().await {
ready(io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from)) chunk?.into_iter().try_for_each(|buffer| io::stdout().write_all(&buffer))?;
})) }
Ok(())
} }

View file

@ -1,7 +1,8 @@
use std::io::Cursor; use std::time::Duration;
use bytes::Bytes; use bytes::Bytes;
use futures3::TryStreamExt; use futures::{Stream, TryStreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
use webmetro::error::WebmetroError; use webmetro::error::WebmetroError;
pub mod dump; pub mod dump;
@ -12,13 +13,15 @@ pub mod send;
/// An adapter that makes chunks of bytes from stdin available as a Stream; /// An adapter that makes chunks of bytes from stdin available as a Stream;
/// is NOT actually async, and just uses blocking read. Don't use more than /// is NOT actually async, and just uses blocking read. Don't use more than
/// one at once, who knows who gets which bytes. /// one at once, who knows who gets which bytes.
pub fn stdin_stream() -> impl futures3::TryStream< pub fn stdin_stream() -> impl Stream<Item = Result<Bytes, std::io::Error>> + Sized + Unpin {
Item = Result<Cursor<Bytes>, WebmetroError>, FramedRead::new(tokio::io::stdin(), BytesCodec::new()).map_ok(|bytes| bytes.freeze())
Ok = Cursor<Bytes>, }
Error = WebmetroError,
> + Sized pub fn parse_time(arg: &str) -> Result<Duration, WebmetroError> {
+ Unpin { match arg.parse() {
tokio2::codec::FramedRead::new(tokio2::io::stdin(), tokio2::codec::BytesCodec::new()) Ok(secs) => Ok(Duration::from_secs(secs)),
.map_ok(|bytes| Cursor::new(bytes.freeze())) Err(err) => Err(WebmetroError::ApplicationError {
.map_err(WebmetroError::from) message: err.to_string(),
}),
}
} }

View file

@ -1,85 +1,56 @@
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::sync::{ use std::sync::{Arc, Mutex, Weak};
Arc, use std::time::{SystemTime, UNIX_EPOCH, Duration};
Mutex,
Weak
};
use bytes::{Bytes, Buf}; use bytes::{Buf, Bytes};
use clap::{App, Arg, ArgMatches, SubCommand}; use clap::Args;
use futures::{ use futures::{prelude::*, stream::FuturesUnordered, Stream};
Future, use html_escape::encode_double_quoted_attribute;
Stream,
Sink,
stream::empty
};
use futures3::{
compat::{
Compat,
CompatSink,
Compat01As03,
},
Never,
prelude::*,
};
use hyper::{ use hyper::{
Body, header::{CACHE_CONTROL, CONTENT_TYPE},
Response, Body, Response,
header::{
CACHE_CONTROL,
CONTENT_TYPE
}
};
use warp::{
self,
Filter,
path
};
use weak_table::{
WeakValueHashMap
}; };
use stream::iter;
use warp::reply::{html, with_header};
use warp::{self, path, Filter, Reply};
use weak_table::WeakValueHashMap;
use webmetro::{ use webmetro::{
channel::{ channel::{Channel, Handle, Listener, Transmitter},
Channel, chunk::Chunk,
Handle,
Listener,
Transmitter
},
chunk::WebmStream, chunk::WebmStream,
error::WebmetroError, error::WebmetroError,
fixers::{ fixers::{ChunkStream, ChunkTimecodeFixer},
ChunkStream, stream_parser::StreamEbml,
ChunkTimecodeFixer,
},
stream_parser::StreamEbml
}; };
const BUFFER_LIMIT: usize = 2 * 1024 * 1024; const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
fn get_stream(channel: Handle) -> impl Stream<Item = Bytes, Error = WebmetroError> { fn get_stream(channel: Handle) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut timecode_fixer = ChunkTimecodeFixer::new();
Compat::new(Listener::new(channel).map(|c| Ok(c)) Listener::new(channel)
.map_ok(move |chunk| timecode_fixer.process(chunk)) .map(|c| Result::<Chunk, WebmetroError>::Ok(c))
.find_starting_point() .map_ok(move |chunk| timecode_fixer.process(chunk))
.map_ok(|webm_chunk| webm_chunk.into_bytes()) .find_starting_point()
.map_err(|err: Never| match err {})) .map_ok(|webm_chunk| iter(webm_chunk).map(Result::<Bytes, WebmetroError>::Ok))
.try_flatten()
} }
fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = warp::Error>) -> impl Stream<Item = Bytes, Error = WebmetroError> { fn post_stream(
let source = Compat01As03::new(stream channel: Handle,
.map_err(WebmetroError::from)) stream: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin,
.parse_ebml().with_soft_limit(BUFFER_LIMIT) ) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
.chunk_webm().with_soft_limit(BUFFER_LIMIT); let channel = Transmitter::new(channel);
let sink = CompatSink::new(Transmitter::new(channel)); stream
.map_err(WebmetroError::from)
Compat::new(source).forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) .parse_ebml()
.into_stream() .with_soft_limit(BUFFER_LIMIT)
.map(|_| empty()) .chunk_webm()
.map_err(|err| { .with_soft_limit(BUFFER_LIMIT)
println!("[Warning] {}", err); .map_ok(move |chunk| {
err channel.send(chunk);
}) Bytes::new()
.flatten() })
.inspect_err(|err| warn!("{}", err))
} }
fn media_response(body: Body) -> Response<Body> { fn media_response(body: Body) -> Response<Body> {
@ -91,48 +62,80 @@ fn media_response(body: Body) -> Response<Body> {
.unwrap() .unwrap()
} }
pub fn options() -> App<'static, 'static> { fn player_css() -> impl Reply {
SubCommand::with_name("relay") let css = include_str!("../data/player.css");
.about("Hosts an HTTP-based relay server") with_header(css, CONTENT_TYPE, "text/css")
.arg(Arg::with_name("listen")
.help("The address:port to listen to")
.required(true))
} }
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { fn player_html(channel: impl AsRef<str>) -> impl Reply {
let channel_map = Arc::new(Mutex::new(WeakValueHashMap::<String, Weak<Mutex<Channel>>>::new())); let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO).as_nanos();
let player = format!(
include_str!("../data/player.html"),
channel = encode_double_quoted_attribute(channel.as_ref()),
cachebust = timestamp
);
html(player)
}
let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?; /// Hosts an HTTP-based relay server
let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?; #[derive(Args, Debug)]
pub struct RelayArgs {
/// The address:port to listen to
listen: String,
}
#[tokio::main]
pub async fn run(args: RelayArgs) -> Result<(), WebmetroError> {
let channel_map = Arc::new(Mutex::new(
WeakValueHashMap::<String, Weak<Mutex<Channel>>>::new(),
));
let addr_str = args.listen;
let addrs = addr_str.to_socket_addrs()?;
info!("Binding to {:?}", addrs);
if addrs.len() == 0 {
return Err("Listen address didn't resolve".into());
}
let channel = path!("live" / String).map(move |name: String| { let channel = path!("live" / String).map(move |name: String| {
let channel = channel_map.lock().unwrap() let channel = channel_map
.lock()
.unwrap()
.entry(name.clone()) .entry(name.clone())
.or_insert_with(|| Channel::new(name.clone())); .or_insert_with(|| Channel::new(name.clone()));
(channel, name) (channel, name)
}); });
let head = channel.clone().and(warp::head()) let head = channel.clone().and(warp::head()).map(|(_, name)| {
.map(|(_, name)| { info!("HEAD Request For Channel {}", name);
println!("[Info] HEAD Request For Channel {}", name); media_response(Body::empty())
media_response(Body::empty()) });
});
let get = channel.clone().and(warp::get2()) let get = channel.clone().and(warp::get()).map(|(channel, name)| {
.map(|(channel, name)| { info!("Listener Connected On Channel {}", name);
println!("[Info] Listener Connected On Channel {}", name); media_response(Body::wrap_stream(get_stream(channel)))
media_response(Body::wrap_stream(get_stream(channel))) });
});
let post_put = channel.clone().and(warp::post2().or(warp::put2()).unify()) let post_put = channel
.and(warp::body::stream()).map(|(channel, name), stream| { .clone()
println!("[Info] Source Connected On Channel {}", name); .and(warp::post().or(warp::put()).unify())
.and(warp::body::stream())
.map(|(channel, name), stream| {
info!("Source Connected On Channel {}", name);
Response::new(Body::wrap_stream(post_stream(channel, stream))) Response::new(Body::wrap_stream(post_stream(channel, stream)))
}); });
let routes = head let live = head.or(get).or(post_put);
.or(get) let watch = path!("watch" / String).map(player_html);
.or(post_put); let css = path!("static" / "css").map(player_css);
Ok(warp::serve(routes).run(addr)) let routes = live.or(watch).or(css);
let mut server_futures: FuturesUnordered<_> = addrs
.map(|addr| warp::serve(routes.clone()).try_bind(addr))
.collect();
while let Some(_) = server_futures.next().await {}
Ok(())
} }

View file

@ -1,10 +1,15 @@
use clap::{App, Arg, ArgMatches, SubCommand}; use bytes::Bytes;
use futures3::prelude::*; use clap::Args;
use hyper13::{client::HttpConnector, Body, Client, Request}; use futures::prelude::*;
use std::io::{stdout, Write}; use hyper::{client::HttpConnector, Body, Client, Request};
use tokio2::runtime::Runtime; use std::{
io::{stdout, Write},
pin::Pin,
time::Duration,
};
use stream::iter;
use super::stdin_stream; use super::{parse_time, stdin_stream};
use webmetro::{ use webmetro::{
chunk::{Chunk, WebmStream}, chunk::{Chunk, WebmStream},
error::WebmetroError, error::WebmetroError,
@ -12,60 +17,62 @@ use webmetro::{
stream_parser::StreamEbml, stream_parser::StreamEbml,
}; };
pub fn options() -> App<'static, 'static> { type BoxedChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send + Sync>>;
SubCommand::with_name("send")
.about("PUTs WebM from stdin to a relay server.") /// PUTs WebM from stdin to a relay server.
.arg(Arg::with_name("url") #[derive(Args, Debug)]
.help("The location to upload to") pub struct SendArgs {
.required(true)) /// The location to upload to
.arg(Arg::with_name("throttle") url: String,
.long("throttle") /// Slow down upload to "real time" speed as determined by the timestamps (useful for streaming static files)
.help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) #[clap(long)]
throttle: bool,
/// Skip approximately n seconds of content before uploading or throttling
#[clap(long, short, parse(try_from_str = parse_time))]
skip: Option<Duration>,
/// Stop uploading after approximately n seconds of content
#[clap(long, short, parse(try_from_str = parse_time))]
take: Option<Duration>,
} }
type BoxedChunkStream = Box< #[tokio::main]
dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> pub async fn run(args: SendArgs) -> Result<(), WebmetroError> {
+ Send let start_time = args.skip.map_or(0, |s| s.as_millis());
+ Sync let stop_time = args
+ Unpin, .take
>; .map_or(std::u128::MAX, |t| t.as_millis() + start_time);
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { // build pipeline
let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut timecode_fixer = ChunkTimecodeFixer::new();
let mut chunk_stream: BoxedChunkStream = Box::new( let mut chunk_stream: BoxedChunkStream = Box::pin(
stdin_stream() stdin_stream()
.parse_ebml() .parse_ebml()
.chunk_webm() .chunk_webm()
.map_ok(move |chunk| timecode_fixer.process(chunk)), .map_ok(move |chunk| timecode_fixer.process(chunk))
.try_filter(move |chunk| future::ready(chunk.overlaps(start_time, stop_time))),
); );
let url_str = match args.value_of("url") { if args.throttle {
Some(url) => String::from(url), chunk_stream = Box::pin(Throttle::new(chunk_stream));
_ => return Err("Listen address wasn't provided".into()),
};
if args.is_present("throttle") {
chunk_stream = Box::new(Throttle::new(chunk_stream));
} }
let chunk_stream = chunk_stream let chunk_stream = chunk_stream
.map_ok(|webm_chunk| webm_chunk.into_bytes()) .map_ok(|webm_chunk| iter(webm_chunk).map(Result::<Bytes, WebmetroError>::Ok))
.try_flatten()
.map_err(|err| { .map_err(|err| {
eprintln!("{}", &err); warn!("{}", &err);
err err
}); });
let request_payload = Body::wrap_stream(chunk_stream); let request_payload = Body::wrap_stream(chunk_stream);
let request = Request::put(url_str).body(request_payload)?; let request = Request::put(args.url).body(request_payload)?;
let client = Client::builder().build(HttpConnector::new()); let client = Client::builder().build(HttpConnector::new());
Runtime::new().unwrap().block_on(async { let response = client.request(request).await?;
let response = client.request(request).await?; let mut response_stream = response.into_body();
let mut response_stream = response.into_body(); while let Some(response_chunk) = response_stream.try_next().await? {
while let Some(response_chunk) = response_stream.next().await.transpose()? { stdout().write_all(&response_chunk)?;
stdout().write_all(&response_chunk)?; }
} Ok(())
Ok(())
})
} }

20
src/data/player.css Normal file
View file

@ -0,0 +1,20 @@
body {
display: flex;
flex-flow: column;
align-items: center;
margin: 0;
padding: 0;
background: #111;
color: #777;
font-size: 16px;
line-height: 16px;
font-family: 'Gill Sans', 'Gill Sans MT', Calibri, 'Trebuchet MS', sans-serif;
}
section {
display: flex;
flex-flow: column;
}
video {
max-width: 100vw;
}

15
src/data/player.html Normal file
View file

@ -0,0 +1,15 @@
<!DOCTYPE html>
<html>
<head>
<title>Watchog Stream</title>
<meta name="viewport" content="initial-scale=1">
<link rel="stylesheet" href="../static/css" />
</head>
<body>
<section>
<video controls autoplay src="../live/{channel}?t={cachebust}"></video>
<p>The stream should begin automatically when ready;
if the video stutters, try pausing it for a second or two to allow a small buffer.</p>
</section>
</body>
</html>

View file

@ -1,6 +1,7 @@
use bytes::{BigEndian, ByteOrder, BufMut}; use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut};
use custom_error::custom_error; use custom_error::custom_error;
use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom}; use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom};
pub const EBML_HEAD_ID: u64 = 0x0A45DFA3; pub const EBML_HEAD_ID: u64 = 0x0A45DFA3;
pub const DOC_TYPE_ID: u64 = 0x0282; pub const DOC_TYPE_ID: u64 = 0x0282;
@ -135,10 +136,10 @@ pub fn encode_varint<T: Write>(varint: Varint, output: &mut T) -> IoResult<()> {
} }
}; };
let mut buffer = Cursor::new([0; 8]); let mut buffer = [0; 8];
buffer.put_uint_be(number, size); buffer.as_mut().put_uint(number, size);
return output.write_all(&buffer.get_ref()[..size]); return output.write_all(&buffer[..size]);
} }
const FOUR_FLAG: u64 = 0x10 << (8*3); const FOUR_FLAG: u64 = 0x10 << (8*3);
@ -154,10 +155,10 @@ pub fn encode_varint_4<T: Write>(varint: Varint, output: &mut T) -> IoResult<()>
Varint::Value(value) => FOUR_FLAG | value Varint::Value(value) => FOUR_FLAG | value
}; };
let mut buffer = Cursor::new([0; 4]); let mut buffer = [0; 4];
buffer.put_u32_be(number as u32); buffer.as_mut().put_u32(number as u32);
output.write_all(&buffer.get_ref()[..]) output.write_all(&buffer)
} }
pub fn encode_element<T: Write + Seek, F: Fn(&mut T) -> IoResult<X>, X>(tag: u64, output: &mut T, content: F) -> IoResult<()> { pub fn encode_element<T: Write + Seek, F: Fn(&mut T) -> IoResult<X>, X>(tag: u64, output: &mut T, content: F) -> IoResult<()> {
@ -190,10 +191,10 @@ pub fn encode_bytes<T: Write>(tag: u64, bytes: &[u8], output: &mut T) -> IoResul
pub fn encode_integer<T: Write>(tag: u64, value: u64, output: &mut T) -> IoResult<()> { pub fn encode_integer<T: Write>(tag: u64, value: u64, output: &mut T) -> IoResult<()> {
encode_tag_header(tag, Varint::Value(8), output)?; encode_tag_header(tag, Varint::Value(8), output)?;
let mut buffer = Cursor::new([0; 8]); let mut buffer = [0; 8];
buffer.put_u64_be(value); buffer.as_mut().put_u64(value);
output.write_all(&buffer.get_ref()[..]) output.write_all(&buffer[..])
} }
pub struct EbmlLayout { pub struct EbmlLayout {
@ -262,11 +263,10 @@ pub trait FromEbml<'a>: Sized {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bytes::{BytesMut}; use bytes::BytesMut;
use crate::ebml::*; use crate::ebml::*;
use crate::ebml::EbmlError::{CorruptVarint, UnknownElementId}; use crate::ebml::EbmlError::{CorruptVarint, UnknownElementId};
use crate::ebml::Varint::{Unknown, Value}; use crate::ebml::Varint::{Unknown, Value};
use std::io::Cursor;
use crate::tests::TEST_FILE; use crate::tests::TEST_FILE;
#[test] #[test]
@ -298,44 +298,48 @@ mod tests {
fn encode_varints() { fn encode_varints() {
let mut buffer = BytesMut::with_capacity(10).writer(); let mut buffer = BytesMut::with_capacity(10).writer();
let mut no_space = Cursor::new([0; 0]).writer(); let mut no_space = [0; 0];
assert_eq!(no_space.get_ref().remaining_mut(), 0); let mut no_space_writer = no_space.as_mut().writer();
assert_eq!(no_space_writer.get_mut().remaining_mut(), 0);
let mut six_buffer = Cursor::new([0; 6]).writer(); let mut six_buffer = [0; 6];
assert_eq!(six_buffer.get_ref().remaining_mut(), 6); let mut six_buffer_writer = six_buffer.as_mut().writer();
assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 6);
// 1 byte // 1 byte
encode_varint(Varint::Unknown, &mut buffer).unwrap(); encode_varint(Varint::Unknown, &mut buffer).unwrap();
assert_eq!(buffer.get_mut().split_to(1), &[0xFF].as_ref()); assert_eq!(buffer.get_mut().split_to(1), &[0xFF].as_ref());
assert_eq!(encode_varint(Varint::Unknown, &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Unknown, &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
encode_varint(Varint::Value(0), &mut buffer).unwrap(); encode_varint(Varint::Value(0), &mut buffer).unwrap();
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 0].as_ref()); assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 0].as_ref());
assert_eq!(encode_varint(Varint::Value(0), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(0), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
encode_varint(Varint::Value(1), &mut buffer).unwrap(); encode_varint(Varint::Value(1), &mut buffer).unwrap();
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 1].as_ref()); assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 1].as_ref());
assert_eq!(encode_varint(Varint::Value(1), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(1), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
encode_varint(Varint::Value(126), &mut buffer).unwrap(); encode_varint(Varint::Value(126), &mut buffer).unwrap();
assert_eq!(buffer.get_mut().split_to(1), &[0xF0 | 126].as_ref()); assert_eq!(buffer.get_mut().split_to(1), &[0xF0 | 126].as_ref());
assert_eq!(encode_varint(Varint::Value(126), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(126), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
// 2 bytes // 2 bytes
encode_varint(Varint::Value(127), &mut buffer).unwrap(); encode_varint(Varint::Value(127), &mut buffer).unwrap();
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 127].as_ref()); assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 127].as_ref());
assert_eq!(encode_varint(Varint::Value(127), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(127), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
encode_varint(Varint::Value(128), &mut buffer).unwrap(); encode_varint(Varint::Value(128), &mut buffer).unwrap();
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 128].as_ref()); assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 128].as_ref());
assert_eq!(encode_varint(Varint::Value(128), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(128), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
// 6 bytes // 6 bytes
assert_eq!(six_buffer.get_ref().remaining_mut(), 6); assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 6);
encode_varint(Varint::Value(0x03FFFFFFFFFE), &mut six_buffer).unwrap(); encode_varint(Varint::Value(0x03FFFFFFFFFE), &mut six_buffer_writer).unwrap();
assert_eq!(six_buffer.get_ref().remaining_mut(), 0); assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 0);
assert_eq!(&six_buffer.get_ref().get_ref(), &[0x07, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref()); assert_eq!(&six_buffer, &[0x07, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
six_buffer = Cursor::new([0; 6]).writer();
let mut six_buffer = [0; 6];
let mut six_buffer_writer = six_buffer.as_mut().writer();
// 7 bytes // 7 bytes
encode_varint(Varint::Value(0x03FFFFFFFFFF), &mut buffer).unwrap(); encode_varint(Varint::Value(0x03FFFFFFFFFF), &mut buffer).unwrap();
@ -347,8 +351,8 @@ mod tests {
encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut buffer).unwrap(); encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut buffer).unwrap();
assert_eq!(&buffer.get_mut().split_to(7), &[0x03, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref()); assert_eq!(&buffer.get_mut().split_to(7), &[0x03, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut six_buffer).unwrap_err().kind(), ErrorKind::WriteZero); assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut six_buffer_writer).unwrap_err().kind(), ErrorKind::WriteZero);
// 8 bytes // 8 bytes
encode_varint(Varint::Value(0x01FFFFFFFFFFFF), &mut buffer).unwrap(); encode_varint(Varint::Value(0x01FFFFFFFFFFFF), &mut buffer).unwrap();

View file

@ -6,7 +6,6 @@ custom_error!{pub WebmetroError
EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}", EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}",
HttpError{source: http::Error} = "HTTP error: {source}", HttpError{source: http::Error} = "HTTP error: {source}",
HyperError{source: hyper::Error} = "Hyper error: {source}", HyperError{source: hyper::Error} = "Hyper error: {source}",
Hyper13Error{source: hyper13::Error} = "Hyper error: {source}",
IoError{source: std::io::Error} = "IO error: {source}", IoError{source: std::io::Error} = "IO error: {source}",
WarpError{source: warp::Error} = "Warp error: {source}", WarpError{source: warp::Error} = "Warp error: {source}",
ApplicationError{message: String} = "{message}" ApplicationError{message: String} = "{message}"

View file

@ -1,23 +1,16 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{ use std::task::{Context, Poll};
Context,
Poll
};
use std::time::{Duration, Instant};
use futures3::prelude::*; use futures::prelude::*;
use tokio2::timer::{ use pin_project::pin_project;
delay, use tokio::time::{sleep_until, Duration, Instant, Sleep};
Delay
};
use crate::chunk::Chunk; use crate::chunk::Chunk;
use crate::error::WebmetroError;
pub struct ChunkTimecodeFixer { pub struct ChunkTimecodeFixer {
current_offset: u64, current_offset: u64,
last_observed_timecode: u64, last_observed_timecode: u64,
assumed_duration: u64 assumed_duration: u64,
} }
impl ChunkTimecodeFixer { impl ChunkTimecodeFixer {
@ -25,12 +18,12 @@ impl ChunkTimecodeFixer {
ChunkTimecodeFixer { ChunkTimecodeFixer {
current_offset: 0, current_offset: 0,
last_observed_timecode: 0, last_observed_timecode: 0,
assumed_duration: 33 assumed_duration: 33,
} }
} }
pub fn process<'a>(&mut self, mut chunk: Chunk) -> Chunk { pub fn process(&mut self, mut chunk: Chunk) -> Chunk {
match chunk { match chunk {
Chunk::ClusterHead(ref mut cluster_head) => { Chunk::Cluster(ref mut cluster_head, _) => {
let start = cluster_head.start; let start = cluster_head.start;
if start < self.last_observed_timecode { if start < self.last_observed_timecode {
let next_timecode = self.last_observed_timecode + self.assumed_duration; let next_timecode = self.last_observed_timecode + self.assumed_duration;
@ -49,35 +42,30 @@ impl ChunkTimecodeFixer {
pub struct StartingPointFinder<S> { pub struct StartingPointFinder<S> {
stream: S, stream: S,
seen_header: bool, seen_header: bool,
seen_keyframe: bool seen_keyframe: bool,
} }
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S> impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S> {
{
type Item = Result<Chunk, S::Error>; type Item = Result<Chunk, S::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> { fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Chunk, S::Error>>> {
loop { loop {
return match self.stream.try_poll_next_unpin(cx) { return match self.stream.try_poll_next_unpin(cx) {
Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head)))) => { Poll::Ready(Some(Ok(Chunk::Cluster(cluster_head, cluster_body)))) => {
if cluster_head.keyframe { if cluster_head.keyframe {
self.seen_keyframe = true; self.seen_keyframe = true;
} }
if self.seen_keyframe { if self.seen_keyframe {
Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head)))) Poll::Ready(Some(Ok(Chunk::Cluster(cluster_head, cluster_body))))
} else { } else {
continue; continue;
} }
}, }
chunk @ Poll::Ready(Some(Ok(Chunk::ClusterBody {..}))) => { chunk @ Poll::Ready(Some(Ok(Chunk::Headers { .. }))) => {
if self.seen_keyframe {
chunk
} else {
continue;
}
},
chunk @ Poll::Ready(Some(Ok(Chunk::Headers {..}))) => {
if self.seen_header { if self.seen_header {
// new stream starting, we don't need a new header but should wait for a safe spot to resume // new stream starting, we don't need a new header but should wait for a safe spot to resume
self.seen_keyframe = false; self.seen_keyframe = false;
@ -86,17 +74,20 @@ impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S>
self.seen_header = true; self.seen_header = true;
chunk chunk
} }
}, }
chunk => chunk chunk => chunk,
} };
}; }
} }
} }
#[pin_project]
pub struct Throttle<S> { pub struct Throttle<S> {
#[pin]
stream: S, stream: S,
start_time: Instant, start_time: Option<Instant>,
sleep: Delay #[pin]
sleep: Sleep,
} }
impl<S> Throttle<S> { impl<S> Throttle<S> {
@ -104,34 +95,46 @@ impl<S> Throttle<S> {
let now = Instant::now(); let now = Instant::now();
Throttle { Throttle {
stream: wrap, stream: wrap,
start_time: now, start_time: None,
sleep: delay(now) sleep: sleep_until(now),
} }
} }
} }
impl<S: TryStream<Ok = Chunk, Error = WebmetroError> + Unpin> Stream for Throttle<S> impl<S: TryStream<Ok = Chunk> + Unpin> Stream for Throttle<S> {
{ type Item = Result<Chunk, S::Error>;
type Item = Result<Chunk, WebmetroError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> { fn poll_next(
match self.sleep.poll_unpin(cx) { self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Chunk, S::Error>>> {
let mut this = self.project();
match this.sleep.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(()) => { /* can continue */ }, Poll::Ready(()) => { /* can continue */ }
} }
let next_chunk = self.stream.try_poll_next_unpin(cx); let next_chunk = this.stream.try_poll_next_unpin(cx);
if let Poll::Ready(Some(Ok(Chunk::ClusterHead(ref cluster_head)))) = next_chunk { if let Poll::Ready(Some(Ok(Chunk::Cluster(ref cluster_head, _)))) = next_chunk {
// snooze until real time has "caught up" to the stream
let offset = Duration::from_millis(cluster_head.end); let offset = Duration::from_millis(cluster_head.end);
let sleep_until = self.start_time + offset; // we have actual data, so start the clock if we haven't yet;
self.sleep.reset(sleep_until); // if we're starting the clock now, though, don't insert delays if the first chunk happens to start after zero
let start_time = this
.start_time
.get_or_insert_with(|| Instant::now() - offset);
// snooze until real time has "caught up" to the stream
let sleep_until = *start_time + offset;
this.sleep.reset(sleep_until);
} }
next_chunk next_chunk
} }
} }
pub trait ChunkStream where Self : Sized + TryStream<Ok = Chunk> { pub trait ChunkStream
where
Self: Sized + TryStream<Ok = Chunk>,
{
/*fn fix_timecodes(self) -> Map<_> { /*fn fix_timecodes(self) -> Map<_> {
let fixer = ; let fixer = ;
self.map(move |chunk| { self.map(move |chunk| {
@ -144,7 +147,7 @@ pub trait ChunkStream where Self : Sized + TryStream<Ok = Chunk> {
StartingPointFinder { StartingPointFinder {
stream: self, stream: self,
seen_header: false, seen_header: false,
seen_keyframe: false seen_keyframe: false,
} }
} }

View file

@ -1,3 +1,4 @@
#[macro_use] extern crate log;
pub mod ebml; pub mod ebml;
pub mod error; pub mod error;

View file

@ -1,41 +1,37 @@
#[macro_use]
extern crate log;
mod commands; mod commands;
use clap::{App, AppSettings, crate_version}; use clap::{Parser, Subcommand};
use crate::commands::{ /// Utilities for broadcasting & relaying live WebM video/audio streams
relay, #[derive(Parser, Debug)]
filter, #[clap(version)]
send, struct Args {
dump #[clap(subcommand)]
}; command: Command,
}
fn options() -> App<'static, 'static> { #[derive(Subcommand, Debug)]
App::new("webmetro") enum Command {
.version(crate_version!()) Dump(commands::dump::DumpArgs),
.about("Utilities for broadcasting & relaying live WebM video/audio streams") Filter(commands::filter::FilterArgs),
.setting(AppSettings::DisableHelpSubcommand) Relay(commands::relay::RelayArgs),
.setting(AppSettings::VersionlessSubcommands) Send(commands::send::SendArgs),
.subcommand(relay::options())
.subcommand(filter::options())
.subcommand(send::options())
.subcommand(dump::options())
} }
fn main() { fn main() {
let args = options().get_matches(); env_logger::init();
let args = Args::parse();
match args.subcommand() { match args.command {
("filter", Some(sub_args)) => filter::run(sub_args), Command::Dump(args) => commands::dump::run(args),
("relay", Some(sub_args)) => relay::run(sub_args), Command::Filter(args) => commands::filter::run(args),
("send", Some(sub_args)) => send::run(sub_args), Command::Relay(args) => commands::relay::run(args),
("dump", Some(sub_args)) => dump::run(sub_args), Command::Send(args) => commands::send::run(args),
_ => { }
options().print_help().unwrap(); .unwrap_or_else(|err| {
println!(""); error!("{}", err);
Ok(())
}
}.unwrap_or_else(|err| {
eprintln!("Error: {}", err);
}); });
} }

View file

@ -1,5 +1,8 @@
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures3::stream::{Stream, StreamExt, TryStream}; use futures::{
stream::{Stream, StreamExt},
TryStreamExt,
};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::ebml::FromEbml; use crate::ebml::FromEbml;
@ -22,11 +25,7 @@ impl<S> EbmlStreamingParser<S> {
} }
} }
pub trait StreamEbml: Sized + TryStream + Unpin pub trait StreamEbml: Sized {
where
Self: Sized + TryStream + Unpin,
Self::Ok: Buf,
{
fn parse_ebml(self) -> EbmlStreamingParser<Self> { fn parse_ebml(self) -> EbmlStreamingParser<Self> {
EbmlStreamingParser { EbmlStreamingParser {
stream: self, stream: self,
@ -37,29 +36,28 @@ where
} }
} }
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> StreamEbml for S {} impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> StreamEbml for S where WebmetroError: From<E>
{}
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> { impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> EbmlStreamingParser<S>
where
WebmetroError: From<E>,
{
pub fn poll_event<'a, T: FromEbml<'a>>( pub fn poll_event<'a, T: FromEbml<'a>>(
&'a mut self, &'a mut self,
cx: &mut Context, cx: &mut Context,
) -> Poll<Option<Result<T, WebmetroError>>> { ) -> Poll<Option<Result<T, WebmetroError>>> {
loop { loop {
match T::check_space(&self.buffer)? { if let Some(info) = T::check_space(&self.buffer)? {
None => { self.borrowed = self.buffer.split_to(info.element_len).freeze();
// need to refill buffer, below self.borrowed.advance(info.body_offset);
} return Poll::Ready(Some(
Some(info) => { T::decode(info.element_id, &self.borrowed).map_err(Into::into),
let mut bytes = self.buffer.split_to(info.element_len).freeze(); ));
bytes.advance(info.body_offset);
self.borrowed = bytes;
return Poll::Ready(Some(T::decode(
info.element_id,
&self.borrowed,
).map_err(Into::into)));
}
} }
// need to refill buffer
if let Some(limit) = self.buffer_size_limit { if let Some(limit) = self.buffer_size_limit {
if limit <= self.buffer.len() { if limit <= self.buffer.len() {
return Poll::Ready(Some(Err(WebmetroError::ResourcesExceeded))); return Poll::Ready(Some(Err(WebmetroError::ResourcesExceeded)));
@ -77,15 +75,12 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
} }
} }
} }
}
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> { pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> {
loop { loop {
if let Some(info) = T::check_space(&self.buffer)? { if let Some(info) = T::check_space(&self.buffer)? {
let mut bytes = self.buffer.split_to(info.element_len).freeze(); self.borrowed = self.buffer.split_to(info.element_len).freeze();
bytes.advance(info.body_offset); self.borrowed.advance(info.body_offset);
self.borrowed = bytes;
return Ok(Some(T::decode(info.element_id, &self.borrowed)?)); return Ok(Some(T::decode(info.element_id, &self.borrowed)?));
} }
@ -96,7 +91,7 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
} }
} }
match self.stream.next().await.transpose()? { match self.stream.try_next().await? {
Some(refill) => { Some(refill) => {
self.buffer.reserve(refill.remaining()); self.buffer.reserve(refill.remaining());
self.buffer.put(refill); self.buffer.put(refill);
@ -112,8 +107,7 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bytes::IntoBuf; use futures::{future::poll_fn, stream::StreamExt, FutureExt};
use futures3::{future::poll_fn, stream::StreamExt, FutureExt};
use matches::assert_matches; use matches::assert_matches;
use std::task::Poll::*; use std::task::Poll::*;
@ -130,8 +124,8 @@ mod tests {
&ENCODE_WEBM_TEST_FILE[40..], &ENCODE_WEBM_TEST_FILE[40..],
]; ];
let mut stream_parser = futures3::stream::iter(pieces.iter()) let mut stream_parser = futures::stream::iter(pieces.iter())
.map(|bytes| Ok(bytes.into_buf())) .map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..]))
.parse_ebml(); .parse_ebml();
assert_matches!( assert_matches!(
@ -182,8 +176,8 @@ mod tests {
]; ];
async { async {
let mut parser = futures3::stream::iter(pieces.iter()) let mut parser = futures::stream::iter(pieces.iter())
.map(|bytes| Ok(bytes.into_buf())) .map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..]))
.parse_ebml(); .parse_ebml();
assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead)); assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead));

View file

@ -1,5 +1,6 @@
use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek}; use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write, Seek};
use bytes::{BigEndian, BufMut, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::BufMut;
use crate::ebml::*; use crate::ebml::*;
use crate::iterator::ebml_iter; use crate::iterator::ebml_iter;
use crate::iterator::EbmlIterator; use crate::iterator::EbmlIterator;
@ -103,11 +104,12 @@ pub fn encode_simple_block<T: Write>(block: SimpleBlock, output: &mut T) -> IoRe
encode_varint(Varint::Value(track), output)?; encode_varint(Varint::Value(track), output)?;
let mut buffer = Cursor::new([0; 3]); let mut buffer = [0; 3];
buffer.put_i16_be(timecode); let mut cursor = buffer.as_mut();
buffer.put_u8(flags); cursor.put_i16(timecode);
cursor.put_u8(flags);
output.write_all(&buffer.get_ref()[..])?; output.write_all(&buffer)?;
output.write_all(data) output.write_all(data)
} }
@ -131,6 +133,7 @@ pub fn encode_webm_element<T: Write + Seek>(element: WebmElement, output: &mut T
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::io::Cursor;
use crate::tests::{ use crate::tests::{
TEST_FILE, TEST_FILE,
ENCODE_WEBM_TEST_FILE ENCODE_WEBM_TEST_FILE