Make ClusterHead a first-class struct, simplifying a lot of enum wrangling

This commit is contained in:
Tangent 128 2018-03-28 00:31:58 -04:00
parent cdcff869aa
commit f8db95e61e
2 changed files with 53 additions and 55 deletions

View File

@ -5,60 +5,59 @@ use std::sync::Arc;
use webm::*; use webm::*;
#[derive(Clone)] #[derive(Clone)]
pub enum Chunk<B: AsRef<[u8]> = Vec<u8>> { pub struct ClusterHead {
Headers { pub keyframe: bool,
bytes: Arc<B> pub start: u64,
}, pub end: u64,
ClusterHead { // space for a Cluster tag and a Timecode tag
keyframe: bool, bytes: [u8;16],
start: u64, bytes_used: u8
end: u64,
// space for a Cluster tag and a Timecode tag
bytes: [u8;16],
bytes_used: u8
},
ClusterBody {
bytes: Arc<B>
}
} }
impl<B: AsRef<[u8]>> Chunk<B> { impl ClusterHead {
pub fn new_cluster_head(timecode: u64) -> Chunk { pub fn new(timecode: u64) -> ClusterHead {
let mut chunk = Chunk::ClusterHead { let mut cluster_head = ClusterHead {
keyframe: false, keyframe: false,
start: 0, start: 0,
end: 0, end: 0,
bytes: [0;16], bytes: [0;16],
bytes_used: 0 bytes_used: 0
}; };
chunk.update_timecode(timecode); cluster_head.update_timecode(timecode);
chunk cluster_head
} }
pub fn update_timecode(&mut self, timecode: u64) { pub fn update_timecode(&mut self, timecode: u64) {
if let &mut Chunk::ClusterHead {ref mut start, ref mut end, ref mut bytes, ref mut bytes_used, ..} = self { let delta = self.end - self.start;
let delta = *end - *start; self.start = timecode;
*start = timecode; self.end = self.start + delta;
*end = *start + delta; let mut cursor = Cursor::new(self.bytes.as_mut());
let mut cursor = Cursor::new(bytes as &mut [u8]); // buffer is sized so these should never fail
// buffer is sized so these should never fail encode_webm_element(&WebmElement::Cluster, &mut cursor).unwrap();
encode_webm_element(&WebmElement::Cluster, &mut cursor).unwrap(); encode_webm_element(&WebmElement::Timecode(timecode), &mut cursor).unwrap();
encode_webm_element(&WebmElement::Timecode(timecode), &mut cursor).unwrap(); self.bytes_used = cursor.position() as u8;
*bytes_used = cursor.position() as u8;
}
} }
pub fn observe_simpleblock_timecode(&mut self, timecode: i16) { pub fn observe_simpleblock_timecode(&mut self, timecode: i16) {
if let &mut Chunk::ClusterHead {start, ref mut end, ..} = self { let absolute_timecode = self.start + (timecode as u64);
let absolute_timecode = start + (timecode as u64); if absolute_timecode > self.start {
if absolute_timecode > start { self.end = absolute_timecode;
*end = absolute_timecode;
}
} }
} }
pub fn mark_keyframe(&mut self, new_keyframe: bool) { }
if let &mut Chunk::ClusterHead {ref mut keyframe, ..} = self {
*keyframe = new_keyframe; impl AsRef<[u8]> for ClusterHead {
} fn as_ref(&self) -> &[u8] {
self.bytes[..self.bytes_used as usize].as_ref()
}
}
#[derive(Clone)]
pub enum Chunk<B: AsRef<[u8]> = Vec<u8>> {
Headers {
bytes: Arc<B>
},
ClusterHead(ClusterHead),
ClusterBody {
bytes: Arc<B>
} }
} }
@ -66,7 +65,7 @@ impl<B: AsRef<[u8]>> AsRef<[u8]> for Chunk<B> {
fn as_ref(&self) -> &[u8] { fn as_ref(&self) -> &[u8] {
match self { match self {
&Chunk::Headers {ref bytes, ..} => bytes.as_ref().as_ref(), &Chunk::Headers {ref bytes, ..} => bytes.as_ref().as_ref(),
&Chunk::ClusterHead {ref bytes, bytes_used, ..} => bytes[..bytes_used as usize].as_ref(), &Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(),
&Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref().as_ref() &Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref().as_ref()
} }
} }
@ -75,7 +74,7 @@ impl<B: AsRef<[u8]>> AsRef<[u8]> for Chunk<B> {
enum ChunkerState { enum ChunkerState {
BuildingHeader(Cursor<Vec<u8>>), BuildingHeader(Cursor<Vec<u8>>),
// WIP ClusterHead & body buffer // WIP ClusterHead & body buffer
BuildingCluster(Chunk, Cursor<Vec<u8>>), BuildingCluster(ClusterHead, Cursor<Vec<u8>>),
EmittingClusterBody(Vec<u8>), EmittingClusterBody(Vec<u8>),
EmittingFinalClusterBody(Vec<u8>), EmittingFinalClusterBody(Vec<u8>),
End End
@ -110,7 +109,7 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
( (
Ok(Async::Ready(Some(header_chunk))), Ok(Async::Ready(Some(header_chunk))),
ChunkerState::BuildingCluster( ChunkerState::BuildingCluster(
Chunk::<Vec<u8>>::new_cluster_head(0), ClusterHead::new(0),
Cursor::new(Vec::new()) Cursor::new(Vec::new())
) )
) )
@ -131,10 +130,10 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
Err(passthru) => return Err(ChunkingError::OtherError(passthru)), Err(passthru) => return Err(ChunkingError::OtherError(passthru)),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(WebmElement::Cluster))) => { Ok(Async::Ready(Some(WebmElement::Cluster))) => {
let cluster_head_chunk = mem::replace(cluster_head, Chunk::<Vec<u8>>::new_cluster_head(0)); let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
( (
Ok(Async::Ready(Some(cluster_head_chunk))), Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))),
ChunkerState::EmittingClusterBody(liberated_buffer.into_inner()) ChunkerState::EmittingClusterBody(liberated_buffer.into_inner())
) )
}, },
@ -145,7 +144,7 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
Ok(Async::Ready(Some(WebmElement::SimpleBlock(ref block)))) => { Ok(Async::Ready(Some(WebmElement::SimpleBlock(ref block)))) => {
if (block.flags & 0b10000000) != 0 { if (block.flags & 0b10000000) != 0 {
// TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster // TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster
cluster_head.mark_keyframe(true); cluster_head.keyframe = true;
} }
cluster_head.observe_simpleblock_timecode(block.timecode); cluster_head.observe_simpleblock_timecode(block.timecode);
match encode_webm_element(&WebmElement::SimpleBlock(*block), buffer) { match encode_webm_element(&WebmElement::SimpleBlock(*block), buffer) {
@ -170,10 +169,10 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
// flush final Cluster on end of stream // flush final Cluster on end of stream
let cluster_head_chunk = mem::replace(cluster_head, Chunk::<Vec<u8>>::new_cluster_head(0)); let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
( (
Ok(Async::Ready(Some(cluster_head_chunk))), Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))),
ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner()) ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner())
) )
} }
@ -184,7 +183,7 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
( (
Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)}))), Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)}))),
ChunkerState::BuildingCluster( ChunkerState::BuildingCluster(
Chunk::<Vec<u8>>::new_cluster_head(0), ClusterHead::new(0),
Cursor::new(Vec::new()) Cursor::new(Vec::new())
) )
) )
@ -226,6 +225,6 @@ mod tests {
#[test] #[test]
fn enough_space_for_header() { fn enough_space_for_header() {
Chunk::<Vec<u8>>::new_cluster_head(u64::max_value()); ClusterHead::new(u64::max_value());
} }
} }

View File

@ -60,16 +60,15 @@ impl<S: Stream<Item = Chunk>> Stream for ChunkTimecodeFixer<S>
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
let mut poll_chunk = self.stream.poll(); let mut poll_chunk = self.stream.poll();
match poll_chunk { match poll_chunk {
Ok(Async::Ready(Some(Chunk::ClusterHead {start, end, ..}))) => { Ok(Async::Ready(Some(Chunk::ClusterHead(ref mut cluster_head)))) => {
let start = cluster_head.start;
if start < self.last_observed_timecode { if start < self.last_observed_timecode {
let next_timecode = self.last_observed_timecode + self.assumed_duration; let next_timecode = self.last_observed_timecode + self.assumed_duration;
self.current_offset = next_timecode - start; self.current_offset = next_timecode - start;
} }
if let Ok(Async::Ready(Some(ref mut cluster_head))) = poll_chunk { cluster_head.update_timecode(start + self.current_offset);
cluster_head.update_timecode(start + self.current_offset); self.last_observed_timecode = cluster_head.end + self.current_offset;
}
self.last_observed_timecode = end + self.current_offset;
}, },
Ok(Async::Ready(Some(Chunk::Headers {..}))) => { Ok(Async::Ready(Some(Chunk::Headers {..}))) => {
if self.seen_header { if self.seen_header {