webmetro/src/chunk.rs

305 lines
11 KiB
Rust
Raw Permalink Normal View History

2022-06-10 21:15:03 +00:00
use crate::error::WebmetroError;
use crate::stream_parser::EbmlStreamingParser;
use crate::webm::*;
2019-11-28 03:25:50 +00:00
use bytes::{Buf, Bytes, BytesMut};
use futures::prelude::*;
2018-04-12 05:59:28 +00:00
use std::{
io::Cursor,
mem,
pin::Pin,
task::{Context, Poll, Poll::*},
2018-04-12 05:59:28 +00:00
};
2018-03-30 06:26:35 +00:00
#[derive(Clone, Debug)]
pub struct ClusterHead {
pub keyframe: bool,
pub start: u64,
pub end: u64,
2019-11-28 03:25:50 +00:00
/// a Cluster tag and a Timecode tag together take at most 15 bytes;
/// fortuitously, 15 bytes can be inlined in a Bytes handle even on 32-bit systems
bytes: BytesMut,
}
impl ClusterHead {
pub fn new(timecode: u64) -> ClusterHead {
let mut cluster_head = ClusterHead {
2017-09-29 04:12:49 +00:00
keyframe: false,
start: 0,
end: 0,
2019-11-28 03:25:50 +00:00
bytes: BytesMut::with_capacity(15),
2017-09-29 04:12:49 +00:00
};
cluster_head.update_timecode(timecode);
cluster_head
2017-09-29 04:12:49 +00:00
}
pub fn update_timecode(&mut self, timecode: u64) {
let delta = self.end - self.start;
self.start = timecode;
self.end = self.start + delta;
2022-06-10 21:15:03 +00:00
let mut buffer = [0; 15];
2019-11-28 03:25:50 +00:00
let mut cursor = Cursor::new(buffer.as_mut());
// buffer is sized so these should never fail
2018-04-04 23:55:20 +00:00
encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap();
encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap();
2019-11-28 03:25:50 +00:00
self.bytes.clear();
let len = cursor.position() as usize;
self.bytes.extend_from_slice(&buffer[..len]);
}
pub fn observe_simpleblock_timecode(&mut self, timecode: i16) {
let absolute_timecode = self.start + (timecode as u64);
if absolute_timecode > self.start {
self.end = absolute_timecode;
2017-09-29 04:12:49 +00:00
}
}
}
2018-09-18 06:15:02 +00:00
/// A chunk of WebM data
2018-04-03 00:11:54 +00:00
#[derive(Clone, Debug)]
pub enum Chunk {
2022-06-10 21:15:03 +00:00
Headers { bytes: Bytes },
Cluster(ClusterHead, Bytes),
2020-05-07 01:50:03 +00:00
}
impl Chunk {
pub fn overlaps(&self, start: u128, stop: u128) -> bool {
match self {
Chunk::Cluster(head, _) => head.start as u128 <= stop && head.end as u128 >= start,
_ => true,
}
}
}
2022-06-10 21:15:03 +00:00
impl IntoIterator for Chunk {
2020-05-07 01:50:03 +00:00
type Item = Bytes;
2022-06-10 21:15:03 +00:00
type IntoIter = Iter;
2020-05-07 01:50:03 +00:00
2022-06-10 21:15:03 +00:00
fn into_iter(self) -> Self::IntoIter {
2020-05-07 01:50:03 +00:00
match self {
2022-06-10 21:15:03 +00:00
Chunk::Headers { bytes } => Iter::Buffer(bytes),
Chunk::Cluster(head, bytes) => Iter::Cluster(head, bytes),
}
}
}
/// Iterator over the byte buffers that make up a [`Chunk`], in stream order.
///
/// Each step advances `Cluster` -> `Buffer` -> `Empty`; once `Empty`, it stays there.
#[derive(Debug)]
pub enum Iter {
    /// A cluster whose encoded head has not been yielded yet, plus its body.
    Cluster(ClusterHead, Bytes),
    /// A single buffer left to yield.
    Buffer(Bytes),
    /// Exhausted.
    Empty,
}
impl Iterator for Iter {
    type Item = Bytes;
    fn next(&mut self) -> Option<Bytes> {
        // Take ownership of the current state so its buffers can be moved out.
        match mem::replace(self, Iter::Empty) {
            Iter::Cluster(ClusterHead { bytes: head, .. }, body) => {
                *self = Iter::Buffer(body);
                Some(head.freeze())
            }
            Iter::Buffer(bytes) => Some(bytes),
            Iter::Empty => None,
        }
    }
}
// `next` leaves `self` as `Empty` after the last buffer, so it keeps returning `None`.
impl std::iter::FusedIterator for Iter {}
2018-03-30 06:26:35 +00:00
#[derive(Debug)]
enum ChunkerState {
BuildingHeader(Cursor<Vec<u8>>),
// ClusterHead & body buffer
BuildingCluster(ClusterHead, Cursor<Vec<u8>>),
2022-06-10 21:15:03 +00:00
End,
}
pub struct WebmChunker<S> {
source: EbmlStreamingParser<S>,
2018-04-16 05:16:03 +00:00
buffer_size_limit: Option<usize>,
state: ChunkerState,
2017-10-01 04:21:33 +00:00
}
2018-04-16 05:16:03 +00:00
impl<S> WebmChunker<S> {
/// add a "soft" buffer size limit; if a chunk buffer exceeds this size,
/// error the stream instead of resuming. It's still possible for a buffer
/// to exceed this size *after* a write, so ensure input sizes are reasonable.
pub fn with_soft_limit(mut self, limit: usize) -> Self {
2018-04-16 05:16:03 +00:00
self.buffer_size_limit = Some(limit);
self
}
}
2022-06-10 21:15:03 +00:00
fn encode(
element: WebmElement,
buffer: &mut Cursor<Vec<u8>>,
limit: Option<usize>,
) -> Result<(), WebmetroError> {
2018-04-16 05:16:03 +00:00
if let Some(limit) = limit {
if limit <= buffer.get_ref().len() {
return Err(WebmetroError::ResourcesExceeded);
}
}
encode_webm_element(element, buffer).map_err(|err| err.into())
}
2020-05-09 01:15:18 +00:00
impl<I: Buf, E, S: Stream<Item = Result<I, E>> + Unpin> Stream for WebmChunker<S>
where
WebmetroError: From<E>,
2017-10-01 04:21:33 +00:00
{
type Item = Result<Chunk, WebmetroError>;
2017-10-01 04:21:33 +00:00
2022-06-10 21:15:03 +00:00
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Chunk, WebmetroError>>> {
let mut chunker = self.get_mut();
loop {
match chunker.state {
ChunkerState::BuildingHeader(ref mut buffer) => {
match chunker.source.poll_event(cx) {
Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))),
Pending => return Pending,
Ready(None) => return Ready(None),
Ready(Some(Ok(element))) => match element {
WebmElement::Cluster => {
2022-06-10 21:15:03 +00:00
let liberated_buffer =
mem::replace(buffer, Cursor::new(Vec::new()));
let header_chunk = Chunk::Headers {
bytes: Bytes::from(liberated_buffer.into_inner()),
};
chunker.state = ChunkerState::BuildingCluster(
ClusterHead::new(0),
2022-06-10 21:15:03 +00:00
Cursor::new(Vec::new()),
);
return Ready(Some(Ok(header_chunk)));
2022-06-10 21:15:03 +00:00
}
WebmElement::Info => {}
WebmElement::Void => {}
WebmElement::Unknown(_) => {}
element => {
2022-06-10 21:15:03 +00:00
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit)
{
chunker.state = ChunkerState::End;
return Ready(Some(Err(err)));
}
}
2022-06-10 21:15:03 +00:00
},
}
2022-06-10 21:15:03 +00:00
}
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
match chunker.source.poll_event(cx) {
Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))),
Pending => return Pending,
Ready(Some(Ok(element))) => match element {
WebmElement::EbmlHead | WebmElement::Segment => {
2022-06-10 21:15:03 +00:00
let liberated_cluster_head =
mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer =
mem::replace(buffer, Cursor::new(Vec::new()));
let mut new_header_cursor = Cursor::new(Vec::new());
2022-06-10 21:15:03 +00:00
match encode(
element,
&mut new_header_cursor,
chunker.buffer_size_limit,
) {
Ok(_) => {
2022-06-10 21:15:03 +00:00
chunker.state =
ChunkerState::BuildingHeader(new_header_cursor);
return Ready(Some(Ok(Chunk::Cluster(
liberated_cluster_head,
Bytes::from(liberated_buffer.into_inner()),
))));
}
Err(err) => {
chunker.state = ChunkerState::End;
return Ready(Some(Err(err)));
}
}
2022-06-10 21:15:03 +00:00
}
WebmElement::Cluster => {
2022-06-10 21:15:03 +00:00
let liberated_cluster_head =
mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer =
mem::replace(buffer, Cursor::new(Vec::new()));
2022-06-10 21:15:03 +00:00
return Ready(Some(Ok(Chunk::Cluster(
liberated_cluster_head,
Bytes::from(liberated_buffer.into_inner()),
))));
}
WebmElement::Timecode(timecode) => {
cluster_head.update_timecode(timecode);
2022-06-10 21:15:03 +00:00
}
WebmElement::SimpleBlock(ref block) => {
if (block.flags & 0b10000000) != 0 {
// TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster
cluster_head.keyframe = true;
}
cluster_head.observe_simpleblock_timecode(block.timecode);
2022-06-10 21:15:03 +00:00
if let Err(err) = encode(
WebmElement::SimpleBlock(*block),
buffer,
chunker.buffer_size_limit,
) {
chunker.state = ChunkerState::End;
return Ready(Some(Err(err)));
}
2022-06-10 21:15:03 +00:00
}
WebmElement::Info => {}
WebmElement::Void => {}
WebmElement::Unknown(_) => {}
element => {
2022-06-10 21:15:03 +00:00
if let Err(err) = encode(element, buffer, chunker.buffer_size_limit)
{
chunker.state = ChunkerState::End;
return Ready(Some(Err(err)));
}
2022-06-10 21:15:03 +00:00
}
},
Ready(None) => {
// flush final Cluster on end of stream
2022-06-10 21:15:03 +00:00
let liberated_cluster_head =
mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
chunker.state = ChunkerState::End;
2022-06-10 21:15:03 +00:00
return Ready(Some(Ok(Chunk::Cluster(
liberated_cluster_head,
Bytes::from(liberated_buffer.into_inner()),
))));
}
}
2022-06-10 21:15:03 +00:00
}
ChunkerState::End => return Ready(None),
};
}
2017-10-01 04:21:33 +00:00
}
}
pub trait WebmStream {
type Stream;
fn chunk_webm(self) -> WebmChunker<Self::Stream>;
}
impl<S: Stream> WebmStream for EbmlStreamingParser<S> {
type Stream = S;
fn chunk_webm(self) -> WebmChunker<S> {
2017-10-01 04:21:33 +00:00
WebmChunker {
source: self,
2018-04-16 05:16:03 +00:00
buffer_size_limit: None,
2022-06-10 21:15:03 +00:00
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())),
2017-10-01 04:21:33 +00:00
}
}
}
#[cfg(test)]
mod tests {
    use crate::chunk::*;

    /// The largest possible timecode must still fit the fixed 15-byte scratch
    /// buffer that `ClusterHead::update_timecode` encodes into (and unwraps).
    #[test]
    fn enough_space_for_header() {
        let head = ClusterHead::new(u64::MAX);
        assert!(head.bytes.len() <= 15);
        assert_eq!(head.start, u64::MAX);
        assert_eq!(head.end, u64::MAX);
    }
}