diff --git a/Cargo.lock b/Cargo.lock index 80164a3..20eb6ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1635,6 +1635,7 @@ dependencies = [ "futures-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.12.35 (registry+https://github.com/rust-lang/crates.io-index)", + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index ccbfe32..28195e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ futures = "0.1.29" futures3 = { package = "futures-preview", version="0.3.0-alpha", features = ["compat"] } http = "0.1.18" hyper = "0.12.35" +matches = "0.1.8" odds = { version = "0.3.1", features = ["std-vec"] } tokio = "0.1.22" tokio2 = { package = "tokio", version="0.2.0-alpha.6" } diff --git a/src/async_parser.rs b/src/async_parser.rs new file mode 100644 index 0000000..e650ddb --- /dev/null +++ b/src/async_parser.rs @@ -0,0 +1,108 @@ +use bytes::{Bytes, BytesMut}; +use std::future::Future; + +use crate::ebml::FromEbml; +use crate::error::WebmetroError; + +#[derive(Default)] +pub struct EbmlParser { + buffer: BytesMut, + buffer_size_limit: Option, + borrowed: Bytes +} + +impl EbmlParser { + /// add a "soft" buffer size limit; if the input buffer exceeds this size, + /// error the stream instead of resuming. It's still possible for the buffer + /// to exceed this size *after* a fill, so ensure input sizes are reasonable. + pub fn with_soft_limit(mut self, limit: usize) -> Self { + self.buffer_size_limit = Some(limit); + self + } + + pub fn feed(&mut self, bytes: impl AsRef<[u8]>) { + self.buffer.extend_from_slice(bytes.as_ref()) + } + + pub fn next_element<'a, T: FromEbml<'a>>(&'a mut self) -> Result, WebmetroError> { + Ok(match T::check_space(&self.buffer)? { + None => None, + Some(info) => { + let mut bytes = self.buffer.split_to(info.element_len).freeze(); + bytes.advance(info.body_offset); + self.borrowed = bytes; + Some(T::decode(info.element_id, &self.borrowed)?) + } + }) + } + + pub async fn next_element_with_feeder< + 'a, + T: FromEbml<'a>, + F: FnMut() -> Fut, + Fut: Future>, + >( + &'a mut self, + mut feeder: F, + ) -> Result, WebmetroError> { + loop { + if let Some(_) = T::check_space(&self.buffer)? { + return self.next_element(); + } + + if let Some(limit) = self.buffer_size_limit { + if limit <= self.buffer.len() { + // hit our buffer limit and still nothing parsed + return Err(WebmetroError::ResourcesExceeded); + } + } + + self.buffer.extend(feeder().await?); + } + } +} + +#[cfg(test)] +mod tests { + use matches::assert_matches; + + use crate::async_parser::*; + use crate::tests::ENCODE_WEBM_TEST_FILE; + use crate::webm::*; + + #[test] + fn async_webm_test() { + let pieces = vec![ + &ENCODE_WEBM_TEST_FILE[0..20], + &ENCODE_WEBM_TEST_FILE[20..40], + &ENCODE_WEBM_TEST_FILE[40..], + ]; + + let mut piece_iter = pieces.iter(); + + let result: Result<_, WebmetroError> = futures3::executor::block_on(async { + let mut next = || { + let result = if let Some(bytes) = piece_iter.next() { + Ok(Bytes::from(*bytes)) + } else { + Err("End of input".into()) + }; + async { result } + }; + + let mut parser = EbmlParser::default(); + + assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::EbmlHead)); + assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Segment)); + assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Tracks(_))); + assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Cluster)); + assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Timecode(0))); + assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::SimpleBlock(_))); + assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Cluster)); + assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Timecode(1000))); + + Ok(()) + }); + result.unwrap(); + } +} diff --git a/src/ebml.rs b/src/ebml.rs index 5815591..8844a5f 100644 --- a/src/ebml.rs +++ b/src/ebml.rs @@ -198,9 +198,9 @@ pub fn encode_integer(tag: u64, value: u64, output: &mut T) -> IoResul } pub struct EbmlLayout { - element_id: u64, - body_offset: usize, - element_len: usize, + pub element_id: u64, + pub body_offset: usize, + pub element_len: usize, } pub trait FromEbml<'a>: Sized { diff --git a/src/lib.rs b/src/lib.rs index 64e234c..84a89d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,8 @@ pub mod ebml; pub mod error; + +pub mod async_parser; pub mod iterator; pub mod stream_parser;