diff --git a/src/commands/relay.rs b/src/commands/relay.rs index 37d97c6..be7e50d 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -40,6 +40,8 @@ use webmetro::{ use super::to_hyper_error; +const BUFFER_LIMIT: usize = 2 * 1024 * 1024; + type BodyStream = Box>; struct RelayServer(Arc>); @@ -62,7 +64,7 @@ impl RelayServer { where S::Error: Error + Send { let source = stream .map_err(WebmetroError::from_err) - .parse_ebml().chunk_webm(); + .parse_ebml().with_buffer_limit(BUFFER_LIMIT).chunk_webm(); let sink = Transmitter::new(self.get_channel()); Box::new( diff --git a/src/error.rs b/src/error.rs index ad11148..40f2007 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,6 +12,7 @@ use ebml::EbmlError; #[derive(Debug)] pub enum WebmetroError { + ResourcesExceeded, EbmlError(EbmlError), IoError(IoError), Unknown(Box) @@ -30,6 +31,7 @@ impl WebmetroError { impl Display for WebmetroError { fn fmt(&self, f: &mut Formatter) -> FmtResult { match self { + &WebmetroError::ResourcesExceeded => write!(f, "resources exceeded"), &WebmetroError::EbmlError(ref err) => err.fmt(f), &WebmetroError::IoError(ref err) => err.fmt(f), &WebmetroError::Unknown(ref err) => err.fmt(f), @@ -39,6 +41,7 @@ impl Display for WebmetroError { impl Error for WebmetroError { fn description(&self) -> &str { match self { + &WebmetroError::ResourcesExceeded => "resources exceeded", &WebmetroError::EbmlError(ref err) => err.description(), &WebmetroError::IoError(ref err) => err.description(), &WebmetroError::Unknown(ref err) => err.description(), diff --git a/src/stream_parser.rs b/src/stream_parser.rs index c1e4665..58d215b 100644 --- a/src/stream_parser.rs +++ b/src/stream_parser.rs @@ -10,14 +10,23 @@ use error::WebmetroError; pub struct EbmlStreamingParser { stream: S, buffer: BytesMut, + buffer_size_limit: Option, last_read: usize } +impl EbmlStreamingParser { + pub fn with_buffer_limit(mut self, limit: usize) -> Self { + self.buffer_size_limit = Some(limit); + self + } +} + pub trait StreamEbml where Self: Sized + Stream, Self::Item: AsRef<[u8]> { fn parse_ebml(self) -> EbmlStreamingParser { EbmlStreamingParser { stream: self, buffer: BytesMut::new(), + buffer_size_limit: None, last_read: 0 } } @@ -48,6 +57,12 @@ impl, S: Stream> EbmlStreamingPa }) } + if let Some(limit) = self.buffer_size_limit { + if limit <= self.buffer.len() { + return Err(WebmetroError::ResourcesExceeded); + } + } + match self.stream.poll() { Ok(Async::Ready(Some(chunk))) => { self.buffer.reserve(chunk.as_ref().len());