Add buffer size limit to chunker

This commit is contained in:
Tangent 128 2018-04-16 01:16:03 -04:00
parent 982c5c2dcb
commit 32cf6dd2ef
2 changed files with 35 additions and 24 deletions

View file

@ -91,9 +91,27 @@ enum ChunkerState {
pub struct WebmChunker<S> { pub struct WebmChunker<S> {
source: S, source: S,
buffer_size_limit: Option<usize>,
state: ChunkerState state: ChunkerState
} }
impl<S> WebmChunker<S> {
pub fn with_buffer_limit(mut self, limit: usize) -> Self {
self.buffer_size_limit = Some(limit);
self
}
}
fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usize>) -> Result<(), WebmetroError> {
if let Some(limit) = limit {
if limit <= buffer.get_ref().len() {
return Err(WebmetroError::ResourcesExceeded);
}
}
encode_webm_element(element, buffer).map_err(|err| err.into())
}
impl<S: EbmlEventSource> Stream for WebmChunker<S> impl<S: EbmlEventSource> Stream for WebmChunker<S>
where S::Error: Into<WebmetroError> where S::Error: Into<WebmetroError>
{ {
@ -125,13 +143,10 @@ where S::Error: Into<WebmetroError>
Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Void))) => {},
Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
Ok(Async::Ready(Some(element))) => { Ok(Async::Ready(Some(element))) => {
match encode_webm_element(element, buffer) { encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| {
Ok(_) => {}, return_value = Some(Err(err));
Err(err) => { new_state = Some(ChunkerState::End);
return_value = Some(Err(err.into())); });
new_state = Some(ChunkerState::End);
}
}
} }
} }
}, },
@ -145,7 +160,7 @@ where S::Error: Into<WebmetroError>
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
let mut new_header_cursor = Cursor::new(Vec::new()); let mut new_header_cursor = Cursor::new(Vec::new());
match encode_webm_element(element, &mut new_header_cursor) { match encode(element, &mut new_header_cursor, self.buffer_size_limit) {
Ok(_) => { Ok(_) => {
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))));
new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{ new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{
@ -154,7 +169,7 @@ where S::Error: Into<WebmetroError>
}); });
}, },
Err(err) => { Err(err) => {
return_value = Some(Err(err.into())); return_value = Some(Err(err));
new_state = Some(ChunkerState::End); new_state = Some(ChunkerState::End);
} }
} }
@ -175,25 +190,19 @@ where S::Error: Into<WebmetroError>
cluster_head.keyframe = true; cluster_head.keyframe = true;
} }
cluster_head.observe_simpleblock_timecode(block.timecode); cluster_head.observe_simpleblock_timecode(block.timecode);
match encode_webm_element(WebmElement::SimpleBlock(*block), buffer) { encode(WebmElement::SimpleBlock(*block), buffer, self.buffer_size_limit).unwrap_or_else(|err| {
Ok(_) => {}, return_value = Some(Err(err));
Err(err) => { new_state = Some(ChunkerState::End);
return_value = Some(Err(err.into())); });
new_state = Some(ChunkerState::End);
}
}
}, },
Ok(Async::Ready(Some(WebmElement::Info))) => {}, Ok(Async::Ready(Some(WebmElement::Info))) => {},
Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Void))) => {},
Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
Ok(Async::Ready(Some(element))) => { Ok(Async::Ready(Some(element))) => {
match encode_webm_element(element, buffer) { encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| {
Ok(_) => {}, return_value = Some(Err(err));
Err(err) => { new_state = Some(ChunkerState::End);
return_value = Some(Err(err.into())); });
new_state = Some(ChunkerState::End);
}
}
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
// flush final Cluster on end of stream // flush final Cluster on end of stream
@ -245,6 +254,7 @@ pub trait WebmStream where Self: Sized + EbmlEventSource {
fn chunk_webm(self) -> WebmChunker<Self> { fn chunk_webm(self) -> WebmChunker<Self> {
WebmChunker { WebmChunker {
source: self, source: self,
buffer_size_limit: None,
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) state: ChunkerState::BuildingHeader(Cursor::new(Vec::new()))
} }
} }

View file

@ -64,7 +64,8 @@ impl RelayServer {
where S::Error: Error + Send { where S::Error: Error + Send {
let source = stream let source = stream
.map_err(WebmetroError::from_err) .map_err(WebmetroError::from_err)
.parse_ebml().with_buffer_limit(BUFFER_LIMIT).chunk_webm(); .parse_ebml().with_buffer_limit(BUFFER_LIMIT)
.chunk_webm().with_buffer_limit(BUFFER_LIMIT);
let sink = Transmitter::new(self.get_channel()); let sink = Transmitter::new(self.get_channel());
Box::new( Box::new(