replace a number of intermediate states with a "pending" Option
This commit is contained in:
parent
3da59e2d96
commit
5e2e1bcf83
1 changed files with 12 additions and 37 deletions
49
src/chunk.rs
49
src/chunk.rs
|
@ -95,19 +95,14 @@ enum ChunkerState {
|
||||||
BuildingHeader(Cursor<Vec<u8>>),
|
BuildingHeader(Cursor<Vec<u8>>),
|
||||||
// ClusterHead & body buffer
|
// ClusterHead & body buffer
|
||||||
BuildingCluster(ClusterHead, Cursor<Vec<u8>>),
|
BuildingCluster(ClusterHead, Cursor<Vec<u8>>),
|
||||||
EmittingClusterBody(Vec<u8>),
|
|
||||||
EmittingClusterBodyBeforeNewHeader {
|
|
||||||
body: Vec<u8>,
|
|
||||||
new_header: Cursor<Vec<u8>>
|
|
||||||
},
|
|
||||||
EmittingFinalClusterBody(Vec<u8>),
|
|
||||||
End
|
End
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WebmChunker<S> {
|
pub struct WebmChunker<S> {
|
||||||
source: EbmlStreamingParser<S>,
|
source: EbmlStreamingParser<S>,
|
||||||
buffer_size_limit: Option<usize>,
|
buffer_size_limit: Option<usize>,
|
||||||
state: ChunkerState
|
state: ChunkerState,
|
||||||
|
pending_chunk: Option<Chunk>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> WebmChunker<S> {
|
impl<S> WebmChunker<S> {
|
||||||
|
@ -136,6 +131,9 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
|
||||||
let mut chunker = self.get_mut();
|
let mut chunker = self.get_mut();
|
||||||
|
if chunker.pending_chunk.is_some() {
|
||||||
|
return Ready(chunker.pending_chunk.take().map(Ok));
|
||||||
|
}
|
||||||
loop {
|
loop {
|
||||||
match chunker.state {
|
match chunker.state {
|
||||||
ChunkerState::BuildingHeader(ref mut buffer) => {
|
ChunkerState::BuildingHeader(ref mut buffer) => {
|
||||||
|
@ -178,10 +176,8 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
|
||||||
let mut new_header_cursor = Cursor::new(Vec::new());
|
let mut new_header_cursor = Cursor::new(Vec::new());
|
||||||
match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) {
|
match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
chunker.state = ChunkerState::EmittingClusterBodyBeforeNewHeader{
|
chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())});
|
||||||
body: liberated_buffer.into_inner(),
|
chunker.state = ChunkerState::BuildingHeader(new_header_cursor);
|
||||||
new_header: new_header_cursor
|
|
||||||
};
|
|
||||||
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
|
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
|
||||||
},
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
@ -194,7 +190,7 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
|
||||||
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
||||||
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
||||||
|
|
||||||
chunker.state = ChunkerState::EmittingClusterBody(liberated_buffer.into_inner());
|
chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())});
|
||||||
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
|
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
|
||||||
},
|
},
|
||||||
WebmElement::Timecode(timecode) => {
|
WebmElement::Timecode(timecode) => {
|
||||||
|
@ -226,34 +222,12 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
|
||||||
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
|
||||||
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
|
||||||
|
|
||||||
chunker.state = ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner());
|
chunker.pending_chunk = Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer.into_inner())});
|
||||||
|
chunker.state = ChunkerState::End;
|
||||||
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
|
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
ChunkerState::EmittingClusterBody(ref mut buffer) => {
|
|
||||||
let liberated_buffer = mem::replace(buffer, Vec::new());
|
|
||||||
|
|
||||||
chunker.state = ChunkerState::BuildingCluster(
|
|
||||||
ClusterHead::new(0),
|
|
||||||
Cursor::new(Vec::new())
|
|
||||||
);
|
|
||||||
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
|
|
||||||
},
|
|
||||||
ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => {
|
|
||||||
let liberated_body = mem::replace(body, Vec::new());
|
|
||||||
let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new()));
|
|
||||||
|
|
||||||
chunker.state = ChunkerState::BuildingHeader(liberated_header_cursor);
|
|
||||||
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)})));
|
|
||||||
},
|
|
||||||
ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
|
|
||||||
// flush final Cluster on end of stream
|
|
||||||
let liberated_buffer = mem::replace(buffer, Vec::new());
|
|
||||||
|
|
||||||
chunker.state = ChunkerState::End;
|
|
||||||
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
|
|
||||||
},
|
|
||||||
ChunkerState::End => return Ready(None)
|
ChunkerState::End => return Ready(None)
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -271,7 +245,8 @@ impl<S: Stream> WebmStream for EbmlStreamingParser<S> {
|
||||||
WebmChunker {
|
WebmChunker {
|
||||||
source: self,
|
source: self,
|
||||||
buffer_size_limit: None,
|
buffer_size_limit: None,
|
||||||
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new()))
|
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())),
|
||||||
|
pending_chunk: None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue