diff --git a/Cargo.lock b/Cargo.lock index 921612b..4437496 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1768,7 +1768,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "webmetro" -version = "0.2.3-dev" +version = "0.3.0-dev" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 23138a7..47d94c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "webmetro" -version = "0.2.3-dev" +version = "0.3.0-dev" authors = ["Tangent 128 "] edition = "2018" diff --git a/src/chunk.rs b/src/chunk.rs index e678868..bb20011 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,4 +1,4 @@ -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; use futures3::prelude::*; use std::{ io::Cursor, @@ -15,10 +15,9 @@ pub struct ClusterHead { pub keyframe: bool, pub start: u64, pub end: u64, - /// space for a Cluster tag and a Timecode tag - /// TODO: consider using a BytesMut here for simplicity - bytes: [u8;16], - bytes_used: u8 + /// a Cluster tag and a Timecode tag together take at most 15 bytes; + /// fortuitously, 15 bytes can be inlined in a Bytes handle even on 32-bit systems + bytes: BytesMut, } impl ClusterHead { @@ -27,8 +26,7 @@ impl ClusterHead { keyframe: false, start: 0, end: 0, - bytes: [0;16], - bytes_used: 0 + bytes: BytesMut::with_capacity(15), }; cluster_head.update_timecode(timecode); cluster_head @@ -37,11 +35,14 @@ impl ClusterHead { let delta = self.end - self.start; self.start = timecode; self.end = self.start + delta; - let mut cursor = Cursor::new(self.bytes.as_mut()); + let mut buffer = [0;15]; + let mut cursor = Cursor::new(buffer.as_mut()); // buffer is sized so these should never fail encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap(); encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap(); - self.bytes_used = cursor.position() as u8; + self.bytes.clear(); + let len = cursor.position() as usize; + self.bytes.extend_from_slice(&buffer[..len]); } pub fn observe_simpleblock_timecode(&mut self, timecode: i16) { let absolute_timecode = self.start + (timecode as u64); @@ -51,12 +52,6 @@ impl ClusterHead { } } -impl AsRef<[u8]> for ClusterHead { - fn as_ref(&self) -> &[u8] { - self.bytes[..self.bytes_used as usize].as_ref() - } -} - /// A chunk of WebM data #[derive(Clone, Debug)] pub enum Chunk { @@ -66,26 +61,47 @@ pub enum Chunk { ClusterHead(ClusterHead), ClusterBody { bytes: Bytes + }, + Empty +} + +pub struct Iter(Chunk); + +impl Iterator for Chunk { + type Item = Bytes; + + fn next(&mut self) -> Option { + match self { + Chunk::Headers {ref mut bytes, ..} => { + let bytes = mem::replace(bytes, Bytes::new()); + *self = Chunk::Empty; + Some(bytes) + }, + Chunk::ClusterHead(ClusterHead {bytes, ..}) => { + let bytes = mem::replace(bytes, BytesMut::new()); + *self = Chunk::Empty; + Some(bytes.freeze()) + }, + Chunk::ClusterBody {bytes, ..} => { + let bytes = mem::replace(bytes, Bytes::new()); + *self = Chunk::Empty; + Some(bytes) + }, + Chunk::Empty => None + } } } +// impl Buf??? + impl Chunk { /// converts this chunk of data into a Bytes object, perhaps to send over the network pub fn into_bytes(self) -> Bytes { match self { Chunk::Headers {bytes, ..} => bytes, - Chunk::ClusterHead(cluster_head) => Bytes::from(cluster_head.as_ref()), - Chunk::ClusterBody {bytes, ..} => bytes - } - } -} - -impl AsRef<[u8]> for Chunk { - fn as_ref(&self) -> &[u8] { - match self { - &Chunk::Headers {ref bytes, ..} => bytes.as_ref(), - &Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(), - &Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref() + Chunk::ClusterHead(cluster_head) => cluster_head.bytes.freeze(), + Chunk::ClusterBody {bytes, ..} => bytes, + Chunk::Empty => Bytes::new(), } } } @@ -95,19 +111,14 @@ enum ChunkerState { BuildingHeader(Cursor>), // ClusterHead & body buffer BuildingCluster(ClusterHead, Cursor>), - EmittingClusterBody(Vec), - EmittingClusterBodyBeforeNewHeader { - body: Vec, - new_header: Cursor> - }, - EmittingFinalClusterBody(Vec), End } pub struct WebmChunker { source: EbmlStreamingParser, buffer_size_limit: Option, - state: ChunkerState + state: ChunkerState, + pending_chunk: Option, } impl WebmChunker { @@ -136,6 +147,9 @@ impl> + Unpin> Stream for Webm fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { let mut chunker = self.get_mut(); + if chunker.pending_chunk.is_some() { + return Ready(chunker.pending_chunk.take().map(Ok)); + } loop { match chunker.state { ChunkerState::BuildingHeader(ref mut buffer) => { @@ -178,10 +192,8 @@ impl> + Unpin> Stream for Webm let mut new_header_cursor = Cursor::new(Vec::new()); match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) { Ok(_) => { - chunker.state = ChunkerState::EmittingClusterBodyBeforeNewHeader{ - body: liberated_buffer.into_inner(), - new_header: new_header_cursor - }; + chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())}); + chunker.state = ChunkerState::BuildingHeader(new_header_cursor); return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); }, Err(err) => { @@ -194,7 +206,7 @@ impl> + Unpin> Stream for Webm let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - chunker.state = ChunkerState::EmittingClusterBody(liberated_buffer.into_inner()); + chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())}); return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); }, WebmElement::Timecode(timecode) => { @@ -226,34 +238,12 @@ impl> + Unpin> Stream for Webm let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - chunker.state = ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner()); + chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())}); + chunker.state = ChunkerState::End; return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head)))); } } }, - ChunkerState::EmittingClusterBody(ref mut buffer) => { - let liberated_buffer = mem::replace(buffer, Vec::new()); - - chunker.state = ChunkerState::BuildingCluster( - ClusterHead::new(0), - Cursor::new(Vec::new()) - ); - return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))); - }, - ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => { - let liberated_body = mem::replace(body, Vec::new()); - let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new())); - - chunker.state = ChunkerState::BuildingHeader(liberated_header_cursor); - return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)}))); - }, - ChunkerState::EmittingFinalClusterBody(ref mut buffer) => { - // flush final Cluster on end of stream - let liberated_buffer = mem::replace(buffer, Vec::new()); - - chunker.state = ChunkerState::End; - return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))); - }, ChunkerState::End => return Ready(None) }; } @@ -271,7 +261,8 @@ impl WebmStream for EbmlStreamingParser { WebmChunker { source: self, buffer_size_limit: None, - state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) + state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())), + pending_chunk: None } } } diff --git a/src/commands/filter.rs b/src/commands/filter.rs index 40c271f..e203f7c 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -43,7 +43,9 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { chunk_stream = Box::new(Throttle::new(chunk_stream)); } - Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| { - ready(io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from)) + Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|mut chunk| { + ready(chunk.try_for_each(|buffer| + io::stdout().write_all(&buffer).map_err(WebmetroError::from) + )) })) }