Merge branch 'warp02'
This commit is contained in:
commit
17bf7f0eef
14 changed files with 575 additions and 1211 deletions
1443
Cargo.lock
generated
1443
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
25
Cargo.toml
25
Cargo.toml
|
@ -5,21 +5,18 @@ authors = ["Tangent 128 <Tangent128@gmail.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "^0.4.12"
|
byteorder = "1"
|
||||||
clap = "^2.33.0"
|
bytes = "^0.5"
|
||||||
|
clap = "^2.33"
|
||||||
custom_error = "^1.7"
|
custom_error = "^1.7"
|
||||||
env_logger = "^0.7"
|
env_logger = "^0.7"
|
||||||
futures = "0.1.29"
|
futures = "^0.3"
|
||||||
futures3 = { package = "futures-preview", version="0.3.0-alpha", features = ["compat"] }
|
http = "^0.2"
|
||||||
http = "^0.1.18"
|
hyper = "^0.13"
|
||||||
hyper = "^0.12.35"
|
|
||||||
hyper13 = { package = "hyper", version="0.13.0-alpha.4", features = ["unstable-stream"] }
|
|
||||||
log = "^0.4.8"
|
log = "^0.4.8"
|
||||||
matches = "^0.1.8"
|
matches = "^0.1"
|
||||||
odds = { version = "0.3.1", features = ["std-vec"] }
|
odds = { version = "^0.4", features = ["std-vec"] }
|
||||||
tokio = "0.1.22"
|
tokio = { version="^0.2", features = ["io-std", "tcp", "macros", "rt-threaded", "time"] }
|
||||||
tokio2 = { package = "tokio", version="0.2.0-alpha.6" }
|
tokio-util = "^0.3"
|
||||||
tokio-codec = "0.1.1"
|
warp = "^0.2"
|
||||||
tokio-io = "0.1.12"
|
|
||||||
warp = "0.1.20"
|
|
||||||
weak-table = "^0.2.3"
|
weak-table = "^0.2.3"
|
||||||
|
|
|
@ -8,7 +8,7 @@ use std::sync::{
|
||||||
Mutex
|
Mutex
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures3::{
|
use futures::{
|
||||||
channel::mpsc::{
|
channel::mpsc::{
|
||||||
channel as mpsc_channel,
|
channel as mpsc_channel,
|
||||||
Sender,
|
Sender,
|
||||||
|
@ -16,7 +16,7 @@ use futures3::{
|
||||||
},
|
},
|
||||||
Sink,
|
Sink,
|
||||||
Stream,
|
Stream,
|
||||||
Never
|
never::Never,
|
||||||
};
|
};
|
||||||
use odds::vec::VecExt;
|
use odds::vec::VecExt;
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use bytes::{Buf, Bytes, BytesMut};
|
use bytes::{Buf, Bytes, BytesMut};
|
||||||
use futures3::prelude::*;
|
use futures::prelude::*;
|
||||||
use std::{
|
use std::{
|
||||||
io::Cursor,
|
io::Cursor,
|
||||||
mem,
|
mem,
|
||||||
|
@ -141,7 +141,9 @@ fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usiz
|
||||||
encode_webm_element(element, buffer).map_err(|err| err.into())
|
encode_webm_element(element, buffer).map_err(|err| err.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for WebmChunker<S>
|
impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> Stream for WebmChunker<S>
|
||||||
|
where
|
||||||
|
WebmetroError: From<E>,
|
||||||
{
|
{
|
||||||
type Item = Result<Chunk, WebmetroError>;
|
type Item = Result<Chunk, WebmetroError>;
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use clap::{App, AppSettings, ArgMatches, SubCommand};
|
use clap::{App, AppSettings, ArgMatches, SubCommand};
|
||||||
use tokio2::runtime::Runtime;
|
|
||||||
|
|
||||||
use super::stdin_stream;
|
use super::stdin_stream;
|
||||||
use webmetro::{
|
use webmetro::{
|
||||||
|
@ -17,19 +16,18 @@ pub fn options() -> App<'static, 'static> {
|
||||||
.about("Dumps WebM parsing events from parsing stdin")
|
.about("Dumps WebM parsing events from parsing stdin")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> {
|
#[tokio::main]
|
||||||
|
pub async fn run(_args: &ArgMatches) -> Result<(), WebmetroError> {
|
||||||
|
|
||||||
let mut events = stdin_stream().parse_ebml();
|
let mut events = stdin_stream().parse_ebml();
|
||||||
|
|
||||||
Runtime::new().unwrap().block_on(async {
|
while let Some(element) = events.next().await? {
|
||||||
while let Some(element) = events.next().await? {
|
match element {
|
||||||
match element {
|
// suppress printing byte arrays
|
||||||
// suppress printing byte arrays
|
Tracks(slice) => println!("Tracks[{}]", slice.len()),
|
||||||
Tracks(slice) => println!("Tracks[{}]", slice.len()),
|
SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode),
|
||||||
SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode),
|
other => println!("{:?}", other)
|
||||||
other => println!("{:?}", other)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(())
|
}
|
||||||
})
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,25 +1,14 @@
|
||||||
use std::{
|
use std::{io, io::prelude::*};
|
||||||
io,
|
|
||||||
io::prelude::*
|
|
||||||
};
|
|
||||||
|
|
||||||
use clap::{App, Arg, ArgMatches, SubCommand};
|
use clap::{App, Arg, ArgMatches, SubCommand};
|
||||||
use futures3::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures3::future::ready;
|
|
||||||
use tokio2::runtime::Runtime;
|
|
||||||
|
|
||||||
use super::stdin_stream;
|
use super::stdin_stream;
|
||||||
use webmetro::{
|
use webmetro::{
|
||||||
chunk::{
|
chunk::{Chunk, WebmStream},
|
||||||
Chunk,
|
|
||||||
WebmStream
|
|
||||||
},
|
|
||||||
error::WebmetroError,
|
error::WebmetroError,
|
||||||
fixers::{
|
fixers::{ChunkTimecodeFixer, Throttle},
|
||||||
ChunkTimecodeFixer,
|
stream_parser::StreamEbml,
|
||||||
Throttle,
|
|
||||||
},
|
|
||||||
stream_parser::StreamEbml
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn options() -> App<'static, 'static> {
|
pub fn options() -> App<'static, 'static> {
|
||||||
|
@ -30,22 +19,23 @@ pub fn options() -> App<'static, 'static> {
|
||||||
.help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
|
.help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
#[tokio::main]
|
||||||
|
pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
||||||
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
||||||
let mut chunk_stream: Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin> = Box::new(
|
let mut chunk_stream: Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send + Unpin> =
|
||||||
stdin_stream()
|
Box::new(
|
||||||
.parse_ebml()
|
stdin_stream()
|
||||||
.chunk_webm()
|
.parse_ebml()
|
||||||
.map_ok(move |chunk| timecode_fixer.process(chunk))
|
.chunk_webm()
|
||||||
);
|
.map_ok(move |chunk| timecode_fixer.process(chunk)),
|
||||||
|
);
|
||||||
|
|
||||||
if args.is_present("throttle") {
|
if args.is_present("throttle") {
|
||||||
chunk_stream = Box::new(Throttle::new(chunk_stream));
|
chunk_stream = Box::new(Throttle::new(chunk_stream));
|
||||||
}
|
}
|
||||||
|
|
||||||
Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|mut chunk| {
|
while let Some(chunk) = chunk_stream.next().await {
|
||||||
ready(chunk.try_for_each(|buffer|
|
chunk?.try_for_each(|buffer| io::stdout().write_all(&buffer))?;
|
||||||
io::stdout().write_all(&buffer).map_err(WebmetroError::from)
|
}
|
||||||
))
|
Ok(())
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
use std::io::Cursor;
|
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures3::TryStreamExt;
|
use futures::{Stream, TryStreamExt};
|
||||||
use webmetro::error::WebmetroError;
|
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||||
|
|
||||||
pub mod dump;
|
pub mod dump;
|
||||||
pub mod filter;
|
pub mod filter;
|
||||||
|
@ -12,13 +10,7 @@ pub mod send;
|
||||||
/// An adapter that makes chunks of bytes from stdin available as a Stream;
|
/// An adapter that makes chunks of bytes from stdin available as a Stream;
|
||||||
/// is NOT actually async, and just uses blocking read. Don't use more than
|
/// is NOT actually async, and just uses blocking read. Don't use more than
|
||||||
/// one at once, who knows who gets which bytes.
|
/// one at once, who knows who gets which bytes.
|
||||||
pub fn stdin_stream() -> impl futures3::TryStream<
|
pub fn stdin_stream() -> impl Stream<Item = Result<Bytes, std::io::Error>> + Sized + Unpin {
|
||||||
Item = Result<Cursor<Bytes>, WebmetroError>,
|
FramedRead::new(tokio::io::stdin(), BytesCodec::new())
|
||||||
Ok = Cursor<Bytes>,
|
.map_ok(|bytes| bytes.freeze())
|
||||||
Error = WebmetroError,
|
|
||||||
> + Sized
|
|
||||||
+ Unpin {
|
|
||||||
tokio2::codec::FramedRead::new(tokio2::io::stdin(), tokio2::codec::BytesCodec::new())
|
|
||||||
.map_ok(|bytes| Cursor::new(bytes.freeze()))
|
|
||||||
.map_err(WebmetroError::from)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,19 +8,10 @@ use std::sync::{
|
||||||
use bytes::{Bytes, Buf};
|
use bytes::{Bytes, Buf};
|
||||||
use clap::{App, Arg, ArgMatches, SubCommand};
|
use clap::{App, Arg, ArgMatches, SubCommand};
|
||||||
use futures::{
|
use futures::{
|
||||||
Future,
|
never::Never,
|
||||||
Stream,
|
|
||||||
Sink,
|
|
||||||
stream::empty
|
|
||||||
};
|
|
||||||
use futures3::{
|
|
||||||
compat::{
|
|
||||||
Compat,
|
|
||||||
CompatSink,
|
|
||||||
Compat01As03,
|
|
||||||
},
|
|
||||||
Never,
|
|
||||||
prelude::*,
|
prelude::*,
|
||||||
|
Stream,
|
||||||
|
stream::FuturesUnordered,
|
||||||
};
|
};
|
||||||
use hyper::{
|
use hyper::{
|
||||||
Body,
|
Body,
|
||||||
|
@ -56,30 +47,29 @@ use webmetro::{
|
||||||
|
|
||||||
const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
|
const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
|
||||||
|
|
||||||
fn get_stream(channel: Handle) -> impl Stream<Item = Bytes, Error = WebmetroError> {
|
fn get_stream(channel: Handle) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
|
||||||
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
||||||
Compat::new(Listener::new(channel).map(|c| Ok(c))
|
Listener::new(channel).map(|c| Ok(c))
|
||||||
.map_ok(move |chunk| timecode_fixer.process(chunk))
|
.map_ok(move |chunk| timecode_fixer.process(chunk))
|
||||||
.find_starting_point()
|
.find_starting_point()
|
||||||
.map_ok(|webm_chunk| webm_chunk.into_bytes())
|
.map_ok(|webm_chunk| webm_chunk.into_bytes())
|
||||||
.map_err(|err: Never| match err {}))
|
.map_err(|err: Never| match err {})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = warp::Error>) -> impl Stream<Item = Bytes, Error = WebmetroError> {
|
fn post_stream(channel: Handle, stream: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin) -> impl Stream<Item = Result<Bytes, WebmetroError>> {
|
||||||
let source = Compat01As03::new(stream
|
let source = stream
|
||||||
.map_err(WebmetroError::from))
|
.map_err(WebmetroError::from)
|
||||||
.parse_ebml().with_soft_limit(BUFFER_LIMIT)
|
.parse_ebml().with_soft_limit(BUFFER_LIMIT)
|
||||||
.chunk_webm().with_soft_limit(BUFFER_LIMIT);
|
.chunk_webm().with_soft_limit(BUFFER_LIMIT);
|
||||||
let sink = CompatSink::new(Transmitter::new(channel));
|
let sink = Transmitter::new(channel);
|
||||||
|
|
||||||
Compat::new(source).forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
|
source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
|
||||||
.into_stream()
|
.into_stream()
|
||||||
.map(|_| empty())
|
.map_ok(|_| Bytes::new())
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
warn!("{}", err);
|
warn!("{}", err);
|
||||||
err
|
err
|
||||||
})
|
})
|
||||||
.flatten()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn media_response(body: Body) -> Response<Body> {
|
fn media_response(body: Body) -> Response<Body> {
|
||||||
|
@ -99,7 +89,8 @@ pub fn options() -> App<'static, 'static> {
|
||||||
.required(true))
|
.required(true))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
#[tokio::main]
|
||||||
|
pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
||||||
let channel_map = Arc::new(Mutex::new(WeakValueHashMap::<String, Weak<Mutex<Channel>>>::new()));
|
let channel_map = Arc::new(Mutex::new(WeakValueHashMap::<String, Weak<Mutex<Channel>>>::new()));
|
||||||
let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?;
|
let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?;
|
||||||
|
|
||||||
|
@ -122,13 +113,13 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
||||||
media_response(Body::empty())
|
media_response(Body::empty())
|
||||||
});
|
});
|
||||||
|
|
||||||
let get = channel.clone().and(warp::get2())
|
let get = channel.clone().and(warp::get())
|
||||||
.map(|(channel, name)| {
|
.map(|(channel, name)| {
|
||||||
info!("Listener Connected On Channel {}", name);
|
info!("Listener Connected On Channel {}", name);
|
||||||
media_response(Body::wrap_stream(get_stream(channel)))
|
media_response(Body::wrap_stream(get_stream(channel)))
|
||||||
});
|
});
|
||||||
|
|
||||||
let post_put = channel.clone().and(warp::post2().or(warp::put2()).unify())
|
let post_put = channel.clone().and(warp::post().or(warp::put()).unify())
|
||||||
.and(warp::body::stream()).map(|(channel, name), stream| {
|
.and(warp::body::stream()).map(|(channel, name), stream| {
|
||||||
info!("Source Connected On Channel {}", name);
|
info!("Source Connected On Channel {}", name);
|
||||||
Response::new(Body::wrap_stream(post_stream(channel, stream)))
|
Response::new(Body::wrap_stream(post_stream(channel, stream)))
|
||||||
|
@ -138,11 +129,9 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
||||||
.or(get)
|
.or(get)
|
||||||
.or(post_put);
|
.or(post_put);
|
||||||
|
|
||||||
let mut rt = tokio::runtime::Runtime::new()?;
|
let mut server_futures: FuturesUnordered<_> = addrs.map(|addr| warp::serve(routes.clone()).try_bind(addr)).collect();
|
||||||
|
|
||||||
for do_serve in addrs.map(|addr| warp::serve(routes.clone()).try_bind(addr)) {
|
while let Some(_) = server_futures.next().await {};
|
||||||
rt.spawn(do_serve);
|
|
||||||
}
|
|
||||||
|
|
||||||
rt.shutdown_on_idle().wait().map_err(|_| "Shutdown error.".into())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,7 @@
|
||||||
use clap::{App, Arg, ArgMatches, SubCommand};
|
use clap::{App, Arg, ArgMatches, SubCommand};
|
||||||
use futures3::prelude::*;
|
use futures::prelude::*;
|
||||||
use hyper13::{client::HttpConnector, Body, Client, Request};
|
use hyper::{client::HttpConnector, Body, Client, Request};
|
||||||
use std::io::{stdout, Write};
|
use std::io::{stdout, Write};
|
||||||
use tokio2::runtime::Runtime;
|
|
||||||
|
|
||||||
use super::stdin_stream;
|
use super::stdin_stream;
|
||||||
use webmetro::{
|
use webmetro::{
|
||||||
|
@ -23,14 +22,10 @@ pub fn options() -> App<'static, 'static> {
|
||||||
.help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
|
.help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
|
||||||
}
|
}
|
||||||
|
|
||||||
type BoxedChunkStream = Box<
|
type BoxedChunkStream = Box<dyn Stream<Item = Result<Chunk, WebmetroError>> + Send + Sync + Unpin>;
|
||||||
dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError>
|
|
||||||
+ Send
|
|
||||||
+ Sync
|
|
||||||
+ Unpin,
|
|
||||||
>;
|
|
||||||
|
|
||||||
pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
#[tokio::main]
|
||||||
|
pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
||||||
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
let mut timecode_fixer = ChunkTimecodeFixer::new();
|
||||||
let mut chunk_stream: BoxedChunkStream = Box::new(
|
let mut chunk_stream: BoxedChunkStream = Box::new(
|
||||||
stdin_stream()
|
stdin_stream()
|
||||||
|
@ -60,12 +55,10 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
||||||
let request = Request::put(url_str).body(request_payload)?;
|
let request = Request::put(url_str).body(request_payload)?;
|
||||||
let client = Client::builder().build(HttpConnector::new());
|
let client = Client::builder().build(HttpConnector::new());
|
||||||
|
|
||||||
Runtime::new().unwrap().block_on(async {
|
let response = client.request(request).await?;
|
||||||
let response = client.request(request).await?;
|
let mut response_stream = response.into_body();
|
||||||
let mut response_stream = response.into_body();
|
while let Some(response_chunk) = response_stream.next().await.transpose()? {
|
||||||
while let Some(response_chunk) = response_stream.next().await.transpose()? {
|
stdout().write_all(&response_chunk)?;
|
||||||
stdout().write_all(&response_chunk)?;
|
}
|
||||||
}
|
Ok(())
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
64
src/ebml.rs
64
src/ebml.rs
|
@ -1,6 +1,7 @@
|
||||||
use bytes::{BigEndian, ByteOrder, BufMut};
|
use byteorder::{BigEndian, ByteOrder};
|
||||||
|
use bytes::{BufMut};
|
||||||
use custom_error::custom_error;
|
use custom_error::custom_error;
|
||||||
use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom};
|
use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom};
|
||||||
|
|
||||||
pub const EBML_HEAD_ID: u64 = 0x0A45DFA3;
|
pub const EBML_HEAD_ID: u64 = 0x0A45DFA3;
|
||||||
pub const DOC_TYPE_ID: u64 = 0x0282;
|
pub const DOC_TYPE_ID: u64 = 0x0282;
|
||||||
|
@ -135,10 +136,10 @@ pub fn encode_varint<T: Write>(varint: Varint, output: &mut T) -> IoResult<()> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut buffer = Cursor::new([0; 8]);
|
let mut buffer = [0; 8];
|
||||||
buffer.put_uint_be(number, size);
|
buffer.as_mut().put_uint(number, size);
|
||||||
|
|
||||||
return output.write_all(&buffer.get_ref()[..size]);
|
return output.write_all(&buffer[..size]);
|
||||||
}
|
}
|
||||||
|
|
||||||
const FOUR_FLAG: u64 = 0x10 << (8*3);
|
const FOUR_FLAG: u64 = 0x10 << (8*3);
|
||||||
|
@ -154,10 +155,10 @@ pub fn encode_varint_4<T: Write>(varint: Varint, output: &mut T) -> IoResult<()>
|
||||||
Varint::Value(value) => FOUR_FLAG | value
|
Varint::Value(value) => FOUR_FLAG | value
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut buffer = Cursor::new([0; 4]);
|
let mut buffer = [0; 4];
|
||||||
buffer.put_u32_be(number as u32);
|
buffer.as_mut().put_u32(number as u32);
|
||||||
|
|
||||||
output.write_all(&buffer.get_ref()[..])
|
output.write_all(&buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn encode_element<T: Write + Seek, F: Fn(&mut T) -> IoResult<X>, X>(tag: u64, output: &mut T, content: F) -> IoResult<()> {
|
pub fn encode_element<T: Write + Seek, F: Fn(&mut T) -> IoResult<X>, X>(tag: u64, output: &mut T, content: F) -> IoResult<()> {
|
||||||
|
@ -190,10 +191,10 @@ pub fn encode_bytes<T: Write>(tag: u64, bytes: &[u8], output: &mut T) -> IoResul
|
||||||
pub fn encode_integer<T: Write>(tag: u64, value: u64, output: &mut T) -> IoResult<()> {
|
pub fn encode_integer<T: Write>(tag: u64, value: u64, output: &mut T) -> IoResult<()> {
|
||||||
encode_tag_header(tag, Varint::Value(8), output)?;
|
encode_tag_header(tag, Varint::Value(8), output)?;
|
||||||
|
|
||||||
let mut buffer = Cursor::new([0; 8]);
|
let mut buffer = [0; 8];
|
||||||
buffer.put_u64_be(value);
|
buffer.as_mut().put_u64(value);
|
||||||
|
|
||||||
output.write_all(&buffer.get_ref()[..])
|
output.write_all(&buffer[..])
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct EbmlLayout {
|
pub struct EbmlLayout {
|
||||||
|
@ -262,11 +263,10 @@ pub trait FromEbml<'a>: Sized {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use bytes::{BytesMut};
|
use bytes::{BytesMut, buf::ext::BufMutExt};
|
||||||
use crate::ebml::*;
|
use crate::ebml::*;
|
||||||
use crate::ebml::EbmlError::{CorruptVarint, UnknownElementId};
|
use crate::ebml::EbmlError::{CorruptVarint, UnknownElementId};
|
||||||
use crate::ebml::Varint::{Unknown, Value};
|
use crate::ebml::Varint::{Unknown, Value};
|
||||||
use std::io::Cursor;
|
|
||||||
use crate::tests::TEST_FILE;
|
use crate::tests::TEST_FILE;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -298,44 +298,48 @@ mod tests {
|
||||||
fn encode_varints() {
|
fn encode_varints() {
|
||||||
let mut buffer = BytesMut::with_capacity(10).writer();
|
let mut buffer = BytesMut::with_capacity(10).writer();
|
||||||
|
|
||||||
let mut no_space = Cursor::new([0; 0]).writer();
|
let mut no_space = [0; 0];
|
||||||
assert_eq!(no_space.get_ref().remaining_mut(), 0);
|
let mut no_space_writer = no_space.as_mut().writer();
|
||||||
|
assert_eq!(no_space_writer.get_mut().remaining_mut(), 0);
|
||||||
|
|
||||||
let mut six_buffer = Cursor::new([0; 6]).writer();
|
let mut six_buffer = [0; 6];
|
||||||
assert_eq!(six_buffer.get_ref().remaining_mut(), 6);
|
let mut six_buffer_writer = six_buffer.as_mut().writer();
|
||||||
|
assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 6);
|
||||||
|
|
||||||
// 1 byte
|
// 1 byte
|
||||||
encode_varint(Varint::Unknown, &mut buffer).unwrap();
|
encode_varint(Varint::Unknown, &mut buffer).unwrap();
|
||||||
assert_eq!(buffer.get_mut().split_to(1), &[0xFF].as_ref());
|
assert_eq!(buffer.get_mut().split_to(1), &[0xFF].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Unknown, &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Unknown, &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
encode_varint(Varint::Value(0), &mut buffer).unwrap();
|
encode_varint(Varint::Value(0), &mut buffer).unwrap();
|
||||||
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 0].as_ref());
|
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 0].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Value(0), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(0), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
encode_varint(Varint::Value(1), &mut buffer).unwrap();
|
encode_varint(Varint::Value(1), &mut buffer).unwrap();
|
||||||
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 1].as_ref());
|
assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 1].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Value(1), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(1), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
encode_varint(Varint::Value(126), &mut buffer).unwrap();
|
encode_varint(Varint::Value(126), &mut buffer).unwrap();
|
||||||
assert_eq!(buffer.get_mut().split_to(1), &[0xF0 | 126].as_ref());
|
assert_eq!(buffer.get_mut().split_to(1), &[0xF0 | 126].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Value(126), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(126), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
// 2 bytes
|
// 2 bytes
|
||||||
encode_varint(Varint::Value(127), &mut buffer).unwrap();
|
encode_varint(Varint::Value(127), &mut buffer).unwrap();
|
||||||
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 127].as_ref());
|
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 127].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Value(127), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(127), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
encode_varint(Varint::Value(128), &mut buffer).unwrap();
|
encode_varint(Varint::Value(128), &mut buffer).unwrap();
|
||||||
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 128].as_ref());
|
assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 128].as_ref());
|
||||||
assert_eq!(encode_varint(Varint::Value(128), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(128), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
// 6 bytes
|
// 6 bytes
|
||||||
assert_eq!(six_buffer.get_ref().remaining_mut(), 6);
|
assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 6);
|
||||||
encode_varint(Varint::Value(0x03FFFFFFFFFE), &mut six_buffer).unwrap();
|
encode_varint(Varint::Value(0x03FFFFFFFFFE), &mut six_buffer_writer).unwrap();
|
||||||
assert_eq!(six_buffer.get_ref().remaining_mut(), 0);
|
assert_eq!(six_buffer_writer.get_mut().remaining_mut(), 0);
|
||||||
assert_eq!(&six_buffer.get_ref().get_ref(), &[0x07, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
|
assert_eq!(&six_buffer, &[0x07, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
|
||||||
six_buffer = Cursor::new([0; 6]).writer();
|
|
||||||
|
let mut six_buffer = [0; 6];
|
||||||
|
let mut six_buffer_writer = six_buffer.as_mut().writer();
|
||||||
|
|
||||||
// 7 bytes
|
// 7 bytes
|
||||||
encode_varint(Varint::Value(0x03FFFFFFFFFF), &mut buffer).unwrap();
|
encode_varint(Varint::Value(0x03FFFFFFFFFF), &mut buffer).unwrap();
|
||||||
|
@ -347,8 +351,8 @@ mod tests {
|
||||||
encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut buffer).unwrap();
|
encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut buffer).unwrap();
|
||||||
assert_eq!(&buffer.get_mut().split_to(7), &[0x03, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
|
assert_eq!(&buffer.get_mut().split_to(7), &[0x03, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref());
|
||||||
|
|
||||||
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut no_space_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut six_buffer).unwrap_err().kind(), ErrorKind::WriteZero);
|
assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut six_buffer_writer).unwrap_err().kind(), ErrorKind::WriteZero);
|
||||||
|
|
||||||
// 8 bytes
|
// 8 bytes
|
||||||
encode_varint(Varint::Value(0x01FFFFFFFFFFFF), &mut buffer).unwrap();
|
encode_varint(Varint::Value(0x01FFFFFFFFFFFF), &mut buffer).unwrap();
|
||||||
|
|
|
@ -6,7 +6,6 @@ custom_error!{pub WebmetroError
|
||||||
EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}",
|
EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}",
|
||||||
HttpError{source: http::Error} = "HTTP error: {source}",
|
HttpError{source: http::Error} = "HTTP error: {source}",
|
||||||
HyperError{source: hyper::Error} = "Hyper error: {source}",
|
HyperError{source: hyper::Error} = "Hyper error: {source}",
|
||||||
Hyper13Error{source: hyper13::Error} = "Hyper error: {source}",
|
|
||||||
IoError{source: std::io::Error} = "IO error: {source}",
|
IoError{source: std::io::Error} = "IO error: {source}",
|
||||||
WarpError{source: warp::Error} = "Warp error: {source}",
|
WarpError{source: warp::Error} = "Warp error: {source}",
|
||||||
ApplicationError{message: String} = "{message}"
|
ApplicationError{message: String} = "{message}"
|
||||||
|
|
|
@ -3,16 +3,16 @@ use std::task::{
|
||||||
Context,
|
Context,
|
||||||
Poll
|
Poll
|
||||||
};
|
};
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
|
|
||||||
use futures3::prelude::*;
|
use futures::prelude::*;
|
||||||
use tokio2::timer::{
|
use tokio::time::{
|
||||||
delay,
|
delay_until,
|
||||||
Delay
|
Delay,
|
||||||
|
Duration,
|
||||||
|
Instant,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::chunk::Chunk;
|
use crate::chunk::Chunk;
|
||||||
use crate::error::WebmetroError;
|
|
||||||
|
|
||||||
pub struct ChunkTimecodeFixer {
|
pub struct ChunkTimecodeFixer {
|
||||||
current_offset: u64,
|
current_offset: u64,
|
||||||
|
@ -28,7 +28,7 @@ impl ChunkTimecodeFixer {
|
||||||
assumed_duration: 33
|
assumed_duration: 33
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn process<'a>(&mut self, mut chunk: Chunk) -> Chunk {
|
pub fn process(&mut self, mut chunk: Chunk) -> Chunk {
|
||||||
match chunk {
|
match chunk {
|
||||||
Chunk::ClusterHead(ref mut cluster_head) => {
|
Chunk::ClusterHead(ref mut cluster_head) => {
|
||||||
let start = cluster_head.start;
|
let start = cluster_head.start;
|
||||||
|
@ -105,16 +105,16 @@ impl<S> Throttle<S> {
|
||||||
Throttle {
|
Throttle {
|
||||||
stream: wrap,
|
stream: wrap,
|
||||||
start_time: now,
|
start_time: now,
|
||||||
sleep: delay(now)
|
sleep: delay_until(now)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: TryStream<Ok = Chunk, Error = WebmetroError> + Unpin> Stream for Throttle<S>
|
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for Throttle<S>
|
||||||
{
|
{
|
||||||
type Item = Result<Chunk, WebmetroError>;
|
type Item = Result<Chunk, S::Error>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> {
|
||||||
match self.sleep.poll_unpin(cx) {
|
match self.sleep.poll_unpin(cx) {
|
||||||
Poll::Pending => return Poll::Pending,
|
Poll::Pending => return Poll::Pending,
|
||||||
Poll::Ready(()) => { /* can continue */ },
|
Poll::Ready(()) => { /* can continue */ },
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||||
use futures3::stream::{Stream, StreamExt, TryStream};
|
use futures::stream::{Stream, StreamExt};
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use crate::ebml::FromEbml;
|
use crate::ebml::FromEbml;
|
||||||
|
@ -22,11 +22,7 @@ impl<S> EbmlStreamingParser<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait StreamEbml: Sized + TryStream + Unpin
|
pub trait StreamEbml: Sized {
|
||||||
where
|
|
||||||
Self: Sized + TryStream + Unpin,
|
|
||||||
Self::Ok: Buf,
|
|
||||||
{
|
|
||||||
fn parse_ebml(self) -> EbmlStreamingParser<Self> {
|
fn parse_ebml(self) -> EbmlStreamingParser<Self> {
|
||||||
EbmlStreamingParser {
|
EbmlStreamingParser {
|
||||||
stream: self,
|
stream: self,
|
||||||
|
@ -37,9 +33,13 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> StreamEbml for S {}
|
impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> StreamEbml for S where WebmetroError: From<E>
|
||||||
|
{}
|
||||||
|
|
||||||
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
|
impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> EbmlStreamingParser<S>
|
||||||
|
where
|
||||||
|
WebmetroError: From<E>,
|
||||||
|
{
|
||||||
pub fn poll_event<'a, T: FromEbml<'a>>(
|
pub fn poll_event<'a, T: FromEbml<'a>>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
cx: &mut Context,
|
cx: &mut Context,
|
||||||
|
@ -53,10 +53,9 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
|
||||||
let mut bytes = self.buffer.split_to(info.element_len).freeze();
|
let mut bytes = self.buffer.split_to(info.element_len).freeze();
|
||||||
bytes.advance(info.body_offset);
|
bytes.advance(info.body_offset);
|
||||||
self.borrowed = bytes;
|
self.borrowed = bytes;
|
||||||
return Poll::Ready(Some(T::decode(
|
return Poll::Ready(Some(
|
||||||
info.element_id,
|
T::decode(info.element_id, &self.borrowed).map_err(Into::into),
|
||||||
&self.borrowed,
|
));
|
||||||
).map_err(Into::into)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,9 +76,7 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
|
|
||||||
pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> {
|
pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> {
|
||||||
loop {
|
loop {
|
||||||
if let Some(info) = T::check_space(&self.buffer)? {
|
if let Some(info) = T::check_space(&self.buffer)? {
|
||||||
|
@ -112,8 +109,7 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingPa
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use bytes::IntoBuf;
|
use futures::{future::poll_fn, stream::StreamExt, FutureExt};
|
||||||
use futures3::{future::poll_fn, stream::StreamExt, FutureExt};
|
|
||||||
use matches::assert_matches;
|
use matches::assert_matches;
|
||||||
use std::task::Poll::*;
|
use std::task::Poll::*;
|
||||||
|
|
||||||
|
@ -130,8 +126,8 @@ mod tests {
|
||||||
&ENCODE_WEBM_TEST_FILE[40..],
|
&ENCODE_WEBM_TEST_FILE[40..],
|
||||||
];
|
];
|
||||||
|
|
||||||
let mut stream_parser = futures3::stream::iter(pieces.iter())
|
let mut stream_parser = futures::stream::iter(pieces.iter())
|
||||||
.map(|bytes| Ok(bytes.into_buf()))
|
.map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..]))
|
||||||
.parse_ebml();
|
.parse_ebml();
|
||||||
|
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
|
@ -182,8 +178,8 @@ mod tests {
|
||||||
];
|
];
|
||||||
|
|
||||||
async {
|
async {
|
||||||
let mut parser = futures3::stream::iter(pieces.iter())
|
let mut parser = futures::stream::iter(pieces.iter())
|
||||||
.map(|bytes| Ok(bytes.into_buf()))
|
.map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..]))
|
||||||
.parse_ebml();
|
.parse_ebml();
|
||||||
|
|
||||||
assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead));
|
assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead));
|
||||||
|
@ -197,8 +193,8 @@ mod tests {
|
||||||
|
|
||||||
Result::<(), WebmetroError>::Ok(())
|
Result::<(), WebmetroError>::Ok(())
|
||||||
}
|
}
|
||||||
.now_or_never()
|
.now_or_never()
|
||||||
.expect("Test tried to block on I/O")
|
.expect("Test tried to block on I/O")
|
||||||
.expect("Parse failed");
|
.expect("Parse failed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
15
src/webm.rs
15
src/webm.rs
|
@ -1,5 +1,6 @@
|
||||||
use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek};
|
use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write, Seek};
|
||||||
use bytes::{BigEndian, BufMut, ByteOrder};
|
use byteorder::{BigEndian, ByteOrder};
|
||||||
|
use bytes::BufMut;
|
||||||
use crate::ebml::*;
|
use crate::ebml::*;
|
||||||
use crate::iterator::ebml_iter;
|
use crate::iterator::ebml_iter;
|
||||||
use crate::iterator::EbmlIterator;
|
use crate::iterator::EbmlIterator;
|
||||||
|
@ -103,11 +104,12 @@ pub fn encode_simple_block<T: Write>(block: SimpleBlock, output: &mut T) -> IoRe
|
||||||
|
|
||||||
encode_varint(Varint::Value(track), output)?;
|
encode_varint(Varint::Value(track), output)?;
|
||||||
|
|
||||||
let mut buffer = Cursor::new([0; 3]);
|
let mut buffer = [0; 3];
|
||||||
buffer.put_i16_be(timecode);
|
let mut cursor = buffer.as_mut();
|
||||||
buffer.put_u8(flags);
|
cursor.put_i16(timecode);
|
||||||
|
cursor.put_u8(flags);
|
||||||
|
|
||||||
output.write_all(&buffer.get_ref()[..])?;
|
output.write_all(&buffer)?;
|
||||||
output.write_all(data)
|
output.write_all(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,6 +133,7 @@ pub fn encode_webm_element<T: Write + Seek>(element: WebmElement, output: &mut T
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::io::Cursor;
|
||||||
use crate::tests::{
|
use crate::tests::{
|
||||||
TEST_FILE,
|
TEST_FILE,
|
||||||
ENCODE_WEBM_TEST_FILE
|
ENCODE_WEBM_TEST_FILE
|
||||||
|
|
Loading…
Reference in a new issue