From 5e2e1bcf83a297c1cff37abd96df302d930b078c Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Tue, 26 Nov 2019 21:26:08 -0500 Subject: [PATCH 1/4] replace a number of intermediate states with a "pending" Option --- src/chunk.rs | 49 ++++++++++++------------------------------------- 1 file changed, 12 insertions(+), 37 deletions(-) diff --git a/src/chunk.rs b/src/chunk.rs index e678868..a7be6b5 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -95,19 +95,14 @@ enum ChunkerState { BuildingHeader(Cursor>), // ClusterHead & body buffer BuildingCluster(ClusterHead, Cursor>), - EmittingClusterBody(Vec), - EmittingClusterBodyBeforeNewHeader { - body: Vec, - new_header: Cursor> - }, - EmittingFinalClusterBody(Vec), End } pub struct WebmChunker { source: EbmlStreamingParser, buffer_size_limit: Option, - state: ChunkerState + state: ChunkerState, + pending_chunk: Option, } impl WebmChunker { @@ -136,6 +131,9 @@ impl> + Unpin> Stream for Webm fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { let mut chunker = self.get_mut(); + if chunker.pending_chunk.is_some() { + return Ready(chunker.pending_chunk.take().map(Ok)); + } loop { match chunker.state { ChunkerState::BuildingHeader(ref mut buffer) => { @@ -178,10 +176,8 @@ impl> + Unpin> Stream for Webm let mut new_header_cursor = Cursor::new(Vec::new()); match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) { Ok(_) => { - chunker.state = ChunkerState::EmittingClusterBodyBeforeNewHeader{ - body: liberated_buffer.into_inner(), - new_header: new_header_cursor - }; + chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())}); + chunker.state = ChunkerState::BuildingHeader(new_header_cursor); return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); }, Err(err) => { @@ -194,7 +190,7 @@ impl> + Unpin> Stream for Webm let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - chunker.state = ChunkerState::EmittingClusterBody(liberated_buffer.into_inner()); + chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())}); return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); }, WebmElement::Timecode(timecode) => { @@ -226,34 +222,12 @@ impl> + Unpin> Stream for Webm let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - chunker.state = ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner()); + chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())}); + chunker.state = ChunkerState::End; return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); } } }, - ChunkerState::EmittingClusterBody(ref mut buffer) => { - let liberated_buffer = mem::replace(buffer, Vec::new()); - - chunker.state = ChunkerState::BuildingCluster( - ClusterHead::new(0), - Cursor::new(Vec::new()) - ); - return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))); - }, - ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => { - let liberated_body = mem::replace(body, Vec::new()); - let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new())); - - chunker.state = ChunkerState::BuildingHeader(liberated_header_cursor); - return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)}))); - }, - ChunkerState::EmittingFinalClusterBody(ref mut buffer) => { - // flush final Cluster on end of stream - let liberated_buffer = mem::replace(buffer, Vec::new()); - - chunker.state = ChunkerState::End; - return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))); - }, ChunkerState::End => return Ready(None) }; } @@ -271,7 +245,8 @@ impl WebmStream for EbmlStreamingParser { WebmChunker { source: self, buffer_size_limit: None, - state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) + state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())), + pending_chunk: None } } } From 2400720d037e78a98fae2944673af8aa4be5f2ec Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Wed, 27 Nov 2019 22:25:50 -0500 Subject: [PATCH 2/4] Use a Bytes[Mut] for all Chunk data --- src/chunk.rs | 33 ++++++++++++--------------------- src/commands/filter.rs | 2 +- 2 files changed, 13 insertions(+), 22 deletions(-) diff --git a/src/chunk.rs b/src/chunk.rs index a7be6b5..74531fd 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,4 +1,4 @@ -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; use futures3::prelude::*; use std::{ io::Cursor, @@ -15,10 +15,9 @@ pub struct ClusterHead { pub keyframe: bool, pub start: u64, pub end: u64, - /// space for a Cluster tag and a Timecode tag - /// TODO: consider using a BytesMut here for simplicity - bytes: [u8;16], - bytes_used: u8 + /// a Cluster tag and a Timecode tag together take at most 15 bytes; + /// fortuitously, 15 bytes can be inlined in a Bytes handle even on 32-bit systems + bytes: BytesMut, } impl ClusterHead { @@ -27,8 +26,7 @@ impl ClusterHead { keyframe: false, start: 0, end: 0, - bytes: [0;16], - bytes_used: 0 + bytes: BytesMut::with_capacity(15), }; cluster_head.update_timecode(timecode); cluster_head @@ -37,11 +35,14 @@ impl ClusterHead { let delta = self.end - self.start; self.start = timecode; self.end = self.start + delta; - let mut cursor = Cursor::new(self.bytes.as_mut()); + let mut buffer = [0;15]; + let mut cursor = Cursor::new(buffer.as_mut()); // buffer is sized so these should never fail encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap(); encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap(); - self.bytes_used = cursor.position() as u8; + self.bytes.clear(); + let len = cursor.position() as usize; + self.bytes.extend_from_slice(&buffer[..len]); } pub fn observe_simpleblock_timecode(&mut self, timecode: i16) { let absolute_timecode = self.start + (timecode as u64); @@ -53,7 +54,7 @@ impl ClusterHead { impl AsRef<[u8]> for ClusterHead { fn as_ref(&self) -> &[u8] { - self.bytes[..self.bytes_used as usize].as_ref() + self.bytes.as_ref() } } @@ -74,22 +75,12 @@ impl Chunk { pub fn into_bytes(self) -> Bytes { match self { Chunk::Headers {bytes, ..} => bytes, - Chunk::ClusterHead(cluster_head) => Bytes::from(cluster_head.as_ref()), + Chunk::ClusterHead(cluster_head) => cluster_head.bytes.freeze(), Chunk::ClusterBody {bytes, ..} => bytes } } } -impl AsRef<[u8]> for Chunk { - fn as_ref(&self) -> &[u8] { - match self { - &Chunk::Headers {ref bytes, ..} => bytes.as_ref(), - &Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(), - &Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref() - } - } -} - #[derive(Debug)] enum ChunkerState { BuildingHeader(Cursor>), diff --git a/src/commands/filter.rs b/src/commands/filter.rs index 40c271f..bb1947f 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -44,6 +44,6 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { } Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| { - ready(io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from)) + ready(io::stdout().write_all(&chunk.into_bytes()).map_err(WebmetroError::from)) })) } From dbcbf2831eb256292a3470cd63cd5e877e6b7c5c Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Thu, 28 Nov 2019 11:09:02 -0500 Subject: [PATCH 3/4] Remove unused (and soon-to-be meaningless) AsRef impl for ClusterHead --- src/chunk.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/chunk.rs b/src/chunk.rs index 74531fd..ab9ca7e 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -52,12 +52,6 @@ impl ClusterHead { } } -impl AsRef<[u8]> for ClusterHead { - fn as_ref(&self) -> &[u8] { - self.bytes.as_ref() - } -} - /// A chunk of WebM data #[derive(Clone, Debug)] pub enum Chunk { From 9274fabeea18c735976733edee04a0fd22dfb3cb Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Wed, 6 May 2020 21:50:03 -0400 Subject: [PATCH 4/4] impl Iterator for Chunk --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/chunk.rs | 33 ++++++++++++++++++++++++++++++++- src/commands/filter.rs | 6 ++++-- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 921612b..4437496 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1768,7 +1768,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "webmetro" -version = "0.2.3-dev" +version = "0.3.0-dev" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 23138a7..47d94c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "webmetro" -version = "0.2.3-dev" +version = "0.3.0-dev" authors = ["Tangent 128 "] edition = "2018" diff --git a/src/chunk.rs b/src/chunk.rs index ab9ca7e..bb20011 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -61,16 +61,47 @@ pub enum Chunk { ClusterHead(ClusterHead), ClusterBody { bytes: Bytes + }, + Empty +} + +pub struct Iter(Chunk); + +impl Iterator for Chunk { + type Item = Bytes; + + fn next(&mut self) -> Option { + match self { + Chunk::Headers {ref mut bytes, ..} => { + let bytes = mem::replace(bytes, Bytes::new()); + *self = Chunk::Empty; + Some(bytes) + }, + Chunk::ClusterHead(ClusterHead {bytes, ..}) => { + let bytes = mem::replace(bytes, BytesMut::new()); + *self = Chunk::Empty; + Some(bytes.freeze()) + }, + Chunk::ClusterBody {bytes, ..} => { + let bytes = mem::replace(bytes, Bytes::new()); + *self = Chunk::Empty; + Some(bytes) + }, + Chunk::Empty => None + } } } +// impl Buf??? + impl Chunk { /// converts this chunk of data into a Bytes object, perhaps to send over the network pub fn into_bytes(self) -> Bytes { match self { Chunk::Headers {bytes, ..} => bytes, Chunk::ClusterHead(cluster_head) => cluster_head.bytes.freeze(), - Chunk::ClusterBody {bytes, ..} => bytes + Chunk::ClusterBody {bytes, ..} => bytes, + Chunk::Empty => Bytes::new(), } } } diff --git a/src/commands/filter.rs b/src/commands/filter.rs index bb1947f..e203f7c 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -43,7 +43,9 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { chunk_stream = Box::new(Throttle::new(chunk_stream)); } - Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| { - ready(io::stdout().write_all(&chunk.into_bytes()).map_err(WebmetroError::from)) + Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|mut chunk| { + ready(chunk.try_for_each(|buffer| + io::stdout().write_all(&buffer).map_err(WebmetroError::from) + )) })) }