Implement header chunking for chunk_webm() operator

This commit is contained in:
Tangent 128 2017-10-04 01:05:23 -04:00
parent 52c1843311
commit 0aa2f1cbdd

View file

@ -1,5 +1,6 @@
use futures::{Async, Stream}; use futures::{Async, Stream};
use std::io::Cursor; use std::io::Cursor;
use std::mem;
use std::sync::Arc; use std::sync::Arc;
use webm::*; use webm::*;
@ -70,8 +71,16 @@ impl<B: AsRef<[u8]>> AsRef<[u8]> for Chunk<B> {
} }
} }
enum ChunkerState {
BuildingHeader(Cursor<Vec<u8>>),
// WIP ClusterHead & body buffer
BuildingCluster(Chunk, Cursor<Vec<u8>>),
EmittingClusterBody(Chunk)
}
pub struct WebmChunker<S> { pub struct WebmChunker<S> {
stream: S stream: S,
state: ChunkerState
} }
impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S> impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
@ -80,7 +89,41 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
type Error = S::Error; type Error = S::Error;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
Ok(Async::NotReady) loop {
let (return_value, next_state) = match self.state {
ChunkerState::BuildingHeader(ref mut buffer) => {
match self.stream.poll() {
Err(passthru) => return Err(passthru),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(WebmElement::Cluster))) => {
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
let chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())};
(
Ok(Async::Ready(Some(chunk))),
ChunkerState::BuildingCluster(
Chunk::<Vec<u8>>::new_cluster_head(0),
Cursor::new(Vec::new())
)
)
},
Ok(Async::Ready(Some(element @ _))) => {
encode_webm_element(&element, buffer);
continue;
}
}
},
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
return Ok(Async::Ready(None));
},
ChunkerState::EmittingClusterBody(ref mut buffer) => {
return Ok(Async::Ready(None));
}
};
self.state = next_state;
return return_value;
}
} }
} }
@ -91,7 +134,8 @@ pub trait WebmStream<T> {
impl<'a, T: Stream<Item = WebmElement<'a>>> WebmStream<T> for T { impl<'a, T: Stream<Item = WebmElement<'a>>> WebmStream<T> for T {
fn chunk_webm(self) -> WebmChunker<T> { fn chunk_webm(self) -> WebmChunker<T> {
WebmChunker { WebmChunker {
stream: self stream: self,
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new()))
} }
} }
} }