Merge branch 'chunk_refactor'

This commit is contained in:
Tangent Wantwight 2020-05-06 21:50:45 -04:00
commit f4f752548e
4 changed files with 61 additions and 68 deletions

2
Cargo.lock generated
View file

@ -1768,7 +1768,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "webmetro" name = "webmetro"
version = "0.2.3-dev" version = "0.3.0-dev"
dependencies = [ dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "webmetro" name = "webmetro"
version = "0.2.3-dev" version = "0.3.0-dev"
authors = ["Tangent 128 <Tangent128@gmail.com>"] authors = ["Tangent 128 <Tangent128@gmail.com>"]
edition = "2018" edition = "2018"

View file

@ -1,4 +1,4 @@
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes, BytesMut};
use futures3::prelude::*; use futures3::prelude::*;
use std::{ use std::{
io::Cursor, io::Cursor,
@ -15,10 +15,9 @@ pub struct ClusterHead {
pub keyframe: bool, pub keyframe: bool,
pub start: u64, pub start: u64,
pub end: u64, pub end: u64,
/// space for a Cluster tag and a Timecode tag /// a Cluster tag and a Timecode tag together take at most 15 bytes;
/// TODO: consider using a BytesMut here for simplicity /// fortuitously, 15 bytes can be inlined in a Bytes handle even on 32-bit systems
bytes: [u8;16], bytes: BytesMut,
bytes_used: u8
} }
impl ClusterHead { impl ClusterHead {
@ -27,8 +26,7 @@ impl ClusterHead {
keyframe: false, keyframe: false,
start: 0, start: 0,
end: 0, end: 0,
bytes: [0;16], bytes: BytesMut::with_capacity(15),
bytes_used: 0
}; };
cluster_head.update_timecode(timecode); cluster_head.update_timecode(timecode);
cluster_head cluster_head
@ -37,11 +35,14 @@ impl ClusterHead {
let delta = self.end - self.start; let delta = self.end - self.start;
self.start = timecode; self.start = timecode;
self.end = self.start + delta; self.end = self.start + delta;
let mut cursor = Cursor::new(self.bytes.as_mut()); let mut buffer = [0;15];
let mut cursor = Cursor::new(buffer.as_mut());
// buffer is sized so these should never fail // buffer is sized so these should never fail
encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap(); encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap();
encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap(); encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap();
self.bytes_used = cursor.position() as u8; self.bytes.clear();
let len = cursor.position() as usize;
self.bytes.extend_from_slice(&buffer[..len]);
} }
pub fn observe_simpleblock_timecode(&mut self, timecode: i16) { pub fn observe_simpleblock_timecode(&mut self, timecode: i16) {
let absolute_timecode = self.start + (timecode as u64); let absolute_timecode = self.start + (timecode as u64);
@ -51,12 +52,6 @@ impl ClusterHead {
} }
} }
impl AsRef<[u8]> for ClusterHead {
fn as_ref(&self) -> &[u8] {
self.bytes[..self.bytes_used as usize].as_ref()
}
}
/// A chunk of WebM data /// A chunk of WebM data
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Chunk { pub enum Chunk {
@ -66,26 +61,47 @@ pub enum Chunk {
ClusterHead(ClusterHead), ClusterHead(ClusterHead),
ClusterBody { ClusterBody {
bytes: Bytes bytes: Bytes
},
Empty
}
pub struct Iter(Chunk);
impl Iterator for Chunk {
type Item = Bytes;
fn next(&mut self) -> Option<Bytes> {
match self {
Chunk::Headers {ref mut bytes, ..} => {
let bytes = mem::replace(bytes, Bytes::new());
*self = Chunk::Empty;
Some(bytes)
},
Chunk::ClusterHead(ClusterHead {bytes, ..}) => {
let bytes = mem::replace(bytes, BytesMut::new());
*self = Chunk::Empty;
Some(bytes.freeze())
},
Chunk::ClusterBody {bytes, ..} => {
let bytes = mem::replace(bytes, Bytes::new());
*self = Chunk::Empty;
Some(bytes)
},
Chunk::Empty => None
} }
} }
}
// impl Buf???
impl Chunk { impl Chunk {
/// converts this chunk of data into a Bytes object, perhaps to send over the network /// converts this chunk of data into a Bytes object, perhaps to send over the network
pub fn into_bytes(self) -> Bytes { pub fn into_bytes(self) -> Bytes {
match self { match self {
Chunk::Headers {bytes, ..} => bytes, Chunk::Headers {bytes, ..} => bytes,
Chunk::ClusterHead(cluster_head) => Bytes::from(cluster_head.as_ref()), Chunk::ClusterHead(cluster_head) => cluster_head.bytes.freeze(),
Chunk::ClusterBody {bytes, ..} => bytes Chunk::ClusterBody {bytes, ..} => bytes,
} Chunk::Empty => Bytes::new(),
}
}
impl AsRef<[u8]> for Chunk {
fn as_ref(&self) -> &[u8] {
match self {
&Chunk::Headers {ref bytes, ..} => bytes.as_ref(),
&Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(),
&Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref()
} }
} }
} }
@ -95,19 +111,14 @@ enum ChunkerState {
BuildingHeader(Cursor<Vec<u8>>), BuildingHeader(Cursor<Vec<u8>>),
// ClusterHead & body buffer // ClusterHead & body buffer
BuildingCluster(ClusterHead, Cursor<Vec<u8>>), BuildingCluster(ClusterHead, Cursor<Vec<u8>>),
EmittingClusterBody(Vec<u8>),
EmittingClusterBodyBeforeNewHeader {
body: Vec<u8>,
new_header: Cursor<Vec<u8>>
},
EmittingFinalClusterBody(Vec<u8>),
End End
} }
pub struct WebmChunker<S> { pub struct WebmChunker<S> {
source: EbmlStreamingParser<S>, source: EbmlStreamingParser<S>,
buffer_size_limit: Option<usize>, buffer_size_limit: Option<usize>,
state: ChunkerState state: ChunkerState,
pending_chunk: Option<Chunk>,
} }
impl<S> WebmChunker<S> { impl<S> WebmChunker<S> {
@ -136,6 +147,9 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
let mut chunker = self.get_mut(); let mut chunker = self.get_mut();
if chunker.pending_chunk.is_some() {
return Ready(chunker.pending_chunk.take().map(Ok));
}
loop { loop {
match chunker.state { match chunker.state {
ChunkerState::BuildingHeader(ref mut buffer) => { ChunkerState::BuildingHeader(ref mut buffer) => {
@ -178,10 +192,8 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
let mut new_header_cursor = Cursor::new(Vec::new()); let mut new_header_cursor = Cursor::new(Vec::new());
match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) { match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) {
Ok(_) => { Ok(_) => {
chunker.state = ChunkerState::EmittingClusterBodyBeforeNewHeader{ chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())});
body: liberated_buffer.into_inner(), chunker.state = ChunkerState::BuildingHeader(new_header_cursor);
new_header: new_header_cursor
};
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
}, },
Err(err) => { Err(err) => {
@ -194,7 +206,7 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
chunker.state = ChunkerState::EmittingClusterBody(liberated_buffer.into_inner()); chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())});
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
}, },
WebmElement::Timecode(timecode) => { WebmElement::Timecode(timecode) => {
@ -226,34 +238,12 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
chunker.state = ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner()); chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())});
chunker.state = ChunkerState::End;
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
} }
} }
}, },
ChunkerState::EmittingClusterBody(ref mut buffer) => {
let liberated_buffer = mem::replace(buffer, Vec::new());
chunker.state = ChunkerState::BuildingCluster(
ClusterHead::new(0),
Cursor::new(Vec::new())
);
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
},
ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => {
let liberated_body = mem::replace(body, Vec::new());
let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new()));
chunker.state = ChunkerState::BuildingHeader(liberated_header_cursor);
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)})));
},
ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
// flush final Cluster on end of stream
let liberated_buffer = mem::replace(buffer, Vec::new());
chunker.state = ChunkerState::End;
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
},
ChunkerState::End => return Ready(None) ChunkerState::End => return Ready(None)
}; };
} }
@ -271,7 +261,8 @@ impl<S: Stream> WebmStream for EbmlStreamingParser<S> {
WebmChunker { WebmChunker {
source: self, source: self,
buffer_size_limit: None, buffer_size_limit: None,
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())),
pending_chunk: None
} }
} }
} }

View file

@ -43,7 +43,9 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
chunk_stream = Box::new(Throttle::new(chunk_stream)); chunk_stream = Box::new(Throttle::new(chunk_stream));
} }
Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| { Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|mut chunk| {
ready(io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from)) ready(chunk.try_for_each(|buffer|
io::stdout().write_all(&buffer).map_err(WebmetroError::from)
))
})) }))
} }