From d9e9b9e49cb1c949cbf5f5a9774eef4f7011b73f Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Mon, 21 Oct 2019 04:03:52 -0400 Subject: [PATCH] Give stream_parser an async next() method, making async_parser no longer interesting. --- src/async_parser.rs | 108 ------------------------------------------- src/stream_parser.rs | 68 ++++++++++++++++++++++++--- 2 files changed, 62 insertions(+), 114 deletions(-) delete mode 100644 src/async_parser.rs diff --git a/src/async_parser.rs b/src/async_parser.rs deleted file mode 100644 index e650ddb..0000000 --- a/src/async_parser.rs +++ /dev/null @@ -1,108 +0,0 @@ -use bytes::{Bytes, BytesMut}; -use std::future::Future; - -use crate::ebml::FromEbml; -use crate::error::WebmetroError; - -#[derive(Default)] -pub struct EbmlParser { - buffer: BytesMut, - buffer_size_limit: Option, - borrowed: Bytes -} - -impl EbmlParser { - /// add a "soft" buffer size limit; if the input buffer exceeds this size, - /// error the stream instead of resuming. It's still possible for the buffer - /// to exceed this size *after* a fill, so ensure input sizes are reasonable. - pub fn with_soft_limit(mut self, limit: usize) -> Self { - self.buffer_size_limit = Some(limit); - self - } - - pub fn feed(&mut self, bytes: impl AsRef<[u8]>) { - self.buffer.extend_from_slice(bytes.as_ref()) - } - - pub fn next_element<'a, T: FromEbml<'a>>(&'a mut self) -> Result, WebmetroError> { - Ok(match T::check_space(&self.buffer)? { - None => None, - Some(info) => { - let mut bytes = self.buffer.split_to(info.element_len).freeze(); - bytes.advance(info.body_offset); - self.borrowed = bytes; - Some(T::decode(info.element_id, &self.borrowed)?) - } - }) - } - - pub async fn next_element_with_feeder< - 'a, - T: FromEbml<'a>, - F: FnMut() -> Fut, - Fut: Future>, - >( - &'a mut self, - mut feeder: F, - ) -> Result, WebmetroError> { - loop { - if let Some(_) = T::check_space(&self.buffer)? { - return self.next_element(); - } - - if let Some(limit) = self.buffer_size_limit { - if limit <= self.buffer.len() { - // hit our buffer limit and still nothing parsed - return Err(WebmetroError::ResourcesExceeded); - } - } - - self.buffer.extend(feeder().await?); - } - } -} - -#[cfg(test)] -mod tests { - use matches::assert_matches; - - use crate::async_parser::*; - use crate::tests::ENCODE_WEBM_TEST_FILE; - use crate::webm::*; - - #[test] - fn async_webm_test() { - let pieces = vec![ - &ENCODE_WEBM_TEST_FILE[0..20], - &ENCODE_WEBM_TEST_FILE[20..40], - &ENCODE_WEBM_TEST_FILE[40..], - ]; - - let mut piece_iter = pieces.iter(); - - let result: Result<_, WebmetroError> = futures3::executor::block_on(async { - let mut next = || { - let result = if let Some(bytes) = piece_iter.next() { - Ok(Bytes::from(*bytes)) - } else { - Err("End of input".into()) - }; - async { result } - }; - - let mut parser = EbmlParser::default(); - - assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::EbmlHead)); - assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Segment)); - assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Tracks(_))); - assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Cluster)); - assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Timecode(0))); - assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::SimpleBlock(_))); - assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Cluster)); - assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Timecode(1000))); - - Ok(()) - }); - result.unwrap(); - } -} diff --git a/src/stream_parser.rs b/src/stream_parser.rs index ed4a0a4..b9313ac 100644 --- a/src/stream_parser.rs +++ b/src/stream_parser.rs @@ -80,15 +80,42 @@ impl> + Unpin> EbmlStreamingPa } } +impl> + Unpin> EbmlStreamingParser { + pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result, WebmetroError> { + loop { + if let Some(info) = T::check_space(&self.buffer)? { + let mut bytes = self.buffer.split_to(info.element_len).freeze(); + bytes.advance(info.body_offset); + self.borrowed = bytes; + return Ok(Some(T::decode(info.element_id, &self.borrowed)?)); + } + + if let Some(limit) = self.buffer_size_limit { + if limit <= self.buffer.len() { + // hit our buffer limit and still nothing parsed + return Err(WebmetroError::ResourcesExceeded); + } + } + + match self.stream.next().await.transpose()? { + Some(refill) => { + self.buffer.reserve(refill.remaining()); + self.buffer.put(refill); + } + None => { + // Nothing left, we're done + return Ok(None); + } + } + } + } +} + #[cfg(test)] mod tests { use bytes::IntoBuf; use futures::Async::*; - use futures3::{ - future::poll_fn, - stream::StreamExt, - FutureExt, - }; + use futures3::{future::poll_fn, stream::StreamExt, FutureExt}; use matches::assert_matches; use crate::stream_parser::*; @@ -144,6 +171,35 @@ mod tests { std::task::Poll::Ready(()) }) .now_or_never() - .expect("Test succeeded without blocking"); + .expect("Test tried to block on I/O"); + } + + #[test] + fn async_webm_test() { + let pieces = vec![ + &ENCODE_WEBM_TEST_FILE[0..20], + &ENCODE_WEBM_TEST_FILE[20..40], + &ENCODE_WEBM_TEST_FILE[40..], + ]; + + async { + let mut parser = futures3::stream::iter(pieces.iter()) + .map(|bytes| Ok(bytes.into_buf())) + .parse_ebml(); + + assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead)); + assert_matches!(parser.next().await?, Some(WebmElement::Segment)); + assert_matches!(parser.next().await?, Some(WebmElement::Tracks(_))); + assert_matches!(parser.next().await?, Some(WebmElement::Cluster)); + assert_matches!(parser.next().await?, Some(WebmElement::Timecode(0))); + assert_matches!(parser.next().await?, Some(WebmElement::SimpleBlock(_))); + assert_matches!(parser.next().await?, Some(WebmElement::Cluster)); + assert_matches!(parser.next().await?, Some(WebmElement::Timecode(1000))); + + Result::<(), WebmetroError>::Ok(()) + } + .now_or_never() + .expect("Test tried to block on I/O") + .expect("Parse failed"); } }