From 32cf6dd2efe57ee1199bcb4142410eb58e5d0d18 Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Mon, 16 Apr 2018 01:16:03 -0400 Subject: [PATCH] Add buffer size limit to chunker --- src/chunk.rs | 56 +++++++++++++++++++++++++------------------ src/commands/relay.rs | 3 ++- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/src/chunk.rs b/src/chunk.rs index 0c514c6..3f6df32 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -91,9 +91,27 @@ enum ChunkerState { pub struct WebmChunker { source: S, + buffer_size_limit: Option, state: ChunkerState } +impl WebmChunker { + pub fn with_buffer_limit(mut self, limit: usize) -> Self { + self.buffer_size_limit = Some(limit); + self + } +} + +fn encode(element: WebmElement, buffer: &mut Cursor>, limit: Option) -> Result<(), WebmetroError> { + if let Some(limit) = limit { + if limit <= buffer.get_ref().len() { + return Err(WebmetroError::ResourcesExceeded); + } + } + + encode_webm_element(element, buffer).map_err(|err| err.into()) +} + impl Stream for WebmChunker where S::Error: Into { @@ -125,13 +143,10 @@ where S::Error: Into Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(element))) => { - match encode_webm_element(element, buffer) { - Ok(_) => {}, - Err(err) => { - return_value = Some(Err(err.into())); - new_state = Some(ChunkerState::End); - } - } + encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| { + return_value = Some(Err(err)); + new_state = Some(ChunkerState::End); + }); } } }, @@ -145,7 +160,7 @@ where S::Error: Into let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let mut new_header_cursor = Cursor::new(Vec::new()); - match encode_webm_element(element, &mut new_header_cursor) { + match encode(element, &mut new_header_cursor, self.buffer_size_limit) { Ok(_) => { return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{ @@ -154,7 +169,7 @@ where S::Error: Into }); }, Err(err) => { - return_value = Some(Err(err.into())); + return_value = Some(Err(err)); new_state = Some(ChunkerState::End); } } @@ -175,25 +190,19 @@ where S::Error: Into cluster_head.keyframe = true; } cluster_head.observe_simpleblock_timecode(block.timecode); - match encode_webm_element(WebmElement::SimpleBlock(*block), buffer) { - Ok(_) => {}, - Err(err) => { - return_value = Some(Err(err.into())); - new_state = Some(ChunkerState::End); - } - } + encode(WebmElement::SimpleBlock(*block), buffer, self.buffer_size_limit).unwrap_or_else(|err| { + return_value = Some(Err(err)); + new_state = Some(ChunkerState::End); + }); }, Ok(Async::Ready(Some(WebmElement::Info))) => {}, Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(element))) => { - match encode_webm_element(element, buffer) { - Ok(_) => {}, - Err(err) => { - return_value = Some(Err(err.into())); - new_state = Some(ChunkerState::End); - } - } + encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| { + return_value = Some(Err(err)); + new_state = Some(ChunkerState::End); + }); }, Ok(Async::Ready(None)) => { // flush final Cluster on end of stream @@ -245,6 +254,7 @@ pub trait WebmStream where Self: Sized + EbmlEventSource { fn chunk_webm(self) -> WebmChunker { WebmChunker { source: self, + buffer_size_limit: None, state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) } } diff --git a/src/commands/relay.rs b/src/commands/relay.rs index be7e50d..d213601 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -64,7 +64,8 @@ impl RelayServer { where S::Error: Error + Send { let source = stream .map_err(WebmetroError::from_err) - .parse_ebml().with_buffer_limit(BUFFER_LIMIT).chunk_webm(); + .parse_ebml().with_buffer_limit(BUFFER_LIMIT) + .chunk_webm().with_buffer_limit(BUFFER_LIMIT); let sink = Transmitter::new(self.get_channel()); Box::new(