convert Chunk to external iteration

This commit is contained in:
Tangent Wantwight 2022-06-10 17:15:03 -04:00
parent 2b88d09d0f
commit 884aa37888
2 changed files with 105 additions and 71 deletions

View file

@ -1,3 +1,6 @@
use crate::error::WebmetroError;
use crate::stream_parser::EbmlStreamingParser;
use crate::webm::*;
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use futures::prelude::*; use futures::prelude::*;
use std::{ use std::{
@ -6,9 +9,6 @@ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll, Poll::*}, task::{Context, Poll, Poll::*},
}; };
use crate::stream_parser::EbmlStreamingParser;
use crate::error::WebmetroError;
use crate::webm::*;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ClusterHead { pub struct ClusterHead {
@ -55,16 +55,8 @@ impl ClusterHead {
/// A chunk of WebM data /// A chunk of WebM data
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Chunk { pub enum Chunk {
Headers { Headers { bytes: Bytes },
bytes: Bytes
},
Cluster(ClusterHead, Bytes), Cluster(ClusterHead, Bytes),
// for iteration only
#[doc(hidden)]
RemainingBody(Bytes),
// for iteration only
#[doc(hidden)]
Empty
} }
impl Chunk { impl Chunk {
@ -76,29 +68,36 @@ impl Chunk {
} }
} }
// TODO: make an external iterator type so we can remove Chunk::RemainingBody & Chunk::Empty impl IntoIterator for Chunk {
impl Iterator for Chunk { type Item = Bytes;
type IntoIter = Iter;
fn into_iter(self) -> Self::IntoIter {
match self {
Chunk::Headers { bytes } => Iter::Buffer(bytes),
Chunk::Cluster(head, bytes) => Iter::Cluster(head, bytes),
}
}
}
pub enum Iter {
Cluster(ClusterHead, Bytes),
Buffer(Bytes),
Empty,
}
impl Iterator for Iter {
type Item = Bytes; type Item = Bytes;
fn next(&mut self) -> Option<Bytes> { fn next(&mut self) -> Option<Bytes> {
match self { let iter = mem::replace(self, Iter::Empty);
Chunk::Headers {ref mut bytes, ..} => { match iter {
let bytes = mem::replace(bytes, Bytes::new()); Iter::Cluster(ClusterHead { bytes: head, .. }, body) => {
*self = Chunk::Empty; *self = Iter::Buffer(body);
Some(bytes) Some(head.freeze())
}, }
Chunk::Cluster(ClusterHead {bytes, ..}, body) => { Iter::Buffer(bytes) => Some(bytes),
let bytes = mem::replace(bytes, BytesMut::new()); Iter::Empty => None,
let body = mem::replace(body, Bytes::new());
*self = Chunk::RemainingBody(body);
Some(bytes.freeze())
},
Chunk::RemainingBody(bytes) => {
let bytes = mem::replace(bytes, Bytes::new());
*self = Chunk::Empty;
Some(bytes)
},
Chunk::Empty => None
} }
} }
} }
@ -108,7 +107,7 @@ enum ChunkerState {
BuildingHeader(Cursor<Vec<u8>>), BuildingHeader(Cursor<Vec<u8>>),
// ClusterHead & body buffer // ClusterHead & body buffer
BuildingCluster(ClusterHead, Cursor<Vec<u8>>), BuildingCluster(ClusterHead, Cursor<Vec<u8>>),
End End,
} }
pub struct WebmChunker<S> { pub struct WebmChunker<S> {
@ -127,7 +126,11 @@ impl<S> WebmChunker<S> {
} }
} }
fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usize>) -> Result<(), WebmetroError> { fn encode(
element: WebmElement,
buffer: &mut Cursor<Vec<u8>>,
limit: Option<usize>,
) -> Result<(), WebmetroError> {
if let Some(limit) = limit { if let Some(limit) = limit {
if limit <= buffer.get_ref().len() { if limit <= buffer.get_ref().len() {
return Err(WebmetroError::ResourcesExceeded); return Err(WebmetroError::ResourcesExceeded);
@ -143,7 +146,10 @@ where
{ {
type Item = Result<Chunk, WebmetroError>; type Item = Result<Chunk, WebmetroError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Chunk, WebmetroError>>> {
let mut chunker = self.get_mut(); let mut chunker = self.get_mut();
loop { loop {
match chunker.state { match chunker.state {
@ -154,89 +160,117 @@ where
Ready(None) => return Ready(None), Ready(None) => return Ready(None),
Ready(Some(Ok(element))) => match element { Ready(Some(Ok(element))) => match element {
WebmElement::Cluster => { WebmElement::Cluster => {
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer =
let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())}; mem::replace(buffer, Cursor::new(Vec::new()));
let header_chunk = Chunk::Headers {
bytes: Bytes::from(liberated_buffer.into_inner()),
};
chunker.state = ChunkerState::BuildingCluster( chunker.state = ChunkerState::BuildingCluster(
ClusterHead::new(0), ClusterHead::new(0),
Cursor::new(Vec::new()) Cursor::new(Vec::new()),
); );
return Ready(Some(Ok(header_chunk))); return Ready(Some(Ok(header_chunk)));
}, }
WebmElement::Info => {}, WebmElement::Info => {}
WebmElement::Void => {}, WebmElement::Void => {}
WebmElement::Unknown(_) => {}, WebmElement::Unknown(_) => {}
element => { element => {
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) { if let Err(err) = encode(element, buffer, chunker.buffer_size_limit)
{
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
} }
}
}
}, },
}
}
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
match chunker.source.poll_event(cx) { match chunker.source.poll_event(cx) {
Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))), Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))),
Pending => return Pending, Pending => return Pending,
Ready(Some(Ok(element))) => match element { Ready(Some(Ok(element))) => match element {
WebmElement::EbmlHead | WebmElement::Segment => { WebmElement::EbmlHead | WebmElement::Segment => {
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head =
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer =
mem::replace(buffer, Cursor::new(Vec::new()));
let mut new_header_cursor = Cursor::new(Vec::new()); let mut new_header_cursor = Cursor::new(Vec::new());
match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) { match encode(
element,
&mut new_header_cursor,
chunker.buffer_size_limit,
) {
Ok(_) => { Ok(_) => {
chunker.state = ChunkerState::BuildingHeader(new_header_cursor); chunker.state =
return Ready(Some(Ok(Chunk::Cluster(liberated_cluster_head, Bytes::from(liberated_buffer.into_inner()))))); ChunkerState::BuildingHeader(new_header_cursor);
}, return Ready(Some(Ok(Chunk::Cluster(
liberated_cluster_head,
Bytes::from(liberated_buffer.into_inner()),
))));
}
Err(err) => { Err(err) => {
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
} }
}, }
WebmElement::Cluster => { WebmElement::Cluster => {
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head =
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer =
mem::replace(buffer, Cursor::new(Vec::new()));
return Ready(Some(Ok(Chunk::Cluster(liberated_cluster_head, Bytes::from(liberated_buffer.into_inner()))))); return Ready(Some(Ok(Chunk::Cluster(
}, liberated_cluster_head,
Bytes::from(liberated_buffer.into_inner()),
))));
}
WebmElement::Timecode(timecode) => { WebmElement::Timecode(timecode) => {
cluster_head.update_timecode(timecode); cluster_head.update_timecode(timecode);
}, }
WebmElement::SimpleBlock(ref block) => { WebmElement::SimpleBlock(ref block) => {
if (block.flags & 0b10000000) != 0 { if (block.flags & 0b10000000) != 0 {
// TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster // TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster
cluster_head.keyframe = true; cluster_head.keyframe = true;
} }
cluster_head.observe_simpleblock_timecode(block.timecode); cluster_head.observe_simpleblock_timecode(block.timecode);
if let Err(err) = encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit) { if let Err(err) = encode(
WebmElement::SimpleBlock(*block),
buffer,
chunker.buffer_size_limit,
) {
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
}, }
WebmElement::Info => {}, WebmElement::Info => {}
WebmElement::Void => {}, WebmElement::Void => {}
WebmElement::Unknown(_) => {}, WebmElement::Unknown(_) => {}
element => { element => {
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) { if let Err(err) = encode(element, buffer, chunker.buffer_size_limit)
{
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
}, }
}, },
Ready(None) => { Ready(None) => {
// flush final Cluster on end of stream // flush final Cluster on end of stream
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head =
mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
chunker.state = ChunkerState::End; chunker.state = ChunkerState::End;
return Ready(Some(Ok(Chunk::Cluster(liberated_cluster_head, Bytes::from(liberated_buffer.into_inner()))))); return Ready(Some(Ok(Chunk::Cluster(
liberated_cluster_head,
Bytes::from(liberated_buffer.into_inner()),
))));
} }
} }
}, }
ChunkerState::End => return Ready(None) ChunkerState::End => return Ready(None),
}; };
} }
} }
@ -253,7 +287,7 @@ impl<S: Stream> WebmStream for EbmlStreamingParser<S> {
WebmChunker { WebmChunker {
source: self, source: self,
buffer_size_limit: None, buffer_size_limit: None,
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())),
} }
} }
} }

View file

@ -47,7 +47,7 @@ pub async fn run(args: FilterArgs) -> Result<(), WebmetroError> {
} }
while let Some(chunk) = chunk_stream.next().await { while let Some(chunk) = chunk_stream.next().await {
chunk?.try_for_each(|buffer| io::stdout().write_all(&buffer))?; chunk?.into_iter().try_for_each(|buffer| io::stdout().write_all(&buffer))?;
} }
Ok(()) Ok(())
} }