diff --git a/src/chunk.rs b/src/chunk.rs index 60a5066..e678868 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,5 +1,4 @@ use bytes::{Buf, Bytes}; -use futures::{Async}; use futures3::prelude::*; use std::{ io::Cursor, @@ -141,10 +140,10 @@ impl> + Unpin> Stream for Webm match chunker.state { ChunkerState::BuildingHeader(ref mut buffer) => { match chunker.source.poll_event(cx) { - Err(passthru) => return Ready(Some(Err(passthru))), - Ok(Async::NotReady) => return Pending, - Ok(Async::Ready(None)) => return Ready(None), - Ok(Async::Ready(Some(element))) => match element { + Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))), + Pending => return Pending, + Ready(None) => return Ready(None), + Ready(Some(Ok(element))) => match element { WebmElement::Cluster => { let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())}; @@ -169,9 +168,9 @@ impl> + Unpin> Stream for Webm }, ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { match chunker.source.poll_event(cx) { - Err(passthru) => return Ready(Some(Err(passthru))), - Ok(Async::NotReady) => return Pending, - Ok(Async::Ready(Some(element))) => match element { + Ready(Some(Err(passthru))) => return Ready(Some(Err(passthru))), + Pending => return Pending, + Ready(Some(Ok(element))) => match element { WebmElement::EbmlHead | WebmElement::Segment => { let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); @@ -222,7 +221,7 @@ impl> + Unpin> Stream for Webm } }, }, - Ok(Async::Ready(None)) => { + Ready(None) => { // flush final Cluster on end of stream let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); diff --git a/src/stream_parser.rs b/src/stream_parser.rs index b9313ac..a2796f6 100644 --- a/src/stream_parser.rs +++ b/src/stream_parser.rs @@ -1,5 +1,4 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures::Async; use futures3::stream::{Stream, StreamExt, TryStream}; use std::task::{Context, Poll}; @@ -44,7 +43,7 @@ impl> + Unpin> EbmlStreamingPa pub fn poll_event<'a, T: FromEbml<'a>>( &'a mut self, cx: &mut Context, - ) -> Result>, WebmetroError> { + ) -> Poll>> { loop { match T::check_space(&self.buffer)? { None => { @@ -54,16 +53,16 @@ impl> + Unpin> EbmlStreamingPa let mut bytes = self.buffer.split_to(info.element_len).freeze(); bytes.advance(info.body_offset); self.borrowed = bytes; - return Ok(Async::Ready(Some(T::decode( + return Poll::Ready(Some(T::decode( info.element_id, &self.borrowed, - )?))); + ).map_err(Into::into))); } } if let Some(limit) = self.buffer_size_limit { if limit <= self.buffer.len() { - return Err(WebmetroError::ResourcesExceeded); + return Poll::Ready(Some(Err(WebmetroError::ResourcesExceeded))); } } @@ -73,8 +72,8 @@ impl> + Unpin> EbmlStreamingPa self.buffer.put(buf); // ok can retry decoding now } - Poll::Ready(None) => return Ok(Async::Ready(None)), - Poll::Pending => return Ok(Async::NotReady), + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, } } } @@ -114,9 +113,9 @@ impl> + Unpin> EbmlStreamingPa #[cfg(test)] mod tests { use bytes::IntoBuf; - use futures::Async::*; use futures3::{future::poll_fn, stream::StreamExt, FutureExt}; use matches::assert_matches; + use std::task::Poll::*; use crate::stream_parser::*; use crate::tests::ENCODE_WEBM_TEST_FILE; @@ -137,35 +136,35 @@ mod tests { assert_matches!( stream_parser.poll_event(cx), - Ok(Ready(Some(WebmElement::EbmlHead))) + Ready(Some(Ok(WebmElement::EbmlHead))) ); assert_matches!( stream_parser.poll_event(cx), - Ok(Ready(Some(WebmElement::Segment))) + Ready(Some(Ok(WebmElement::Segment))) ); assert_matches!( stream_parser.poll_event(cx), - Ok(Ready(Some(WebmElement::Tracks(_)))) + Ready(Some(Ok(WebmElement::Tracks(_)))) ); assert_matches!( stream_parser.poll_event(cx), - Ok(Ready(Some(WebmElement::Cluster))) + Ready(Some(Ok(WebmElement::Cluster))) ); assert_matches!( stream_parser.poll_event(cx), - Ok(Ready(Some(WebmElement::Timecode(0)))) + Ready(Some(Ok(WebmElement::Timecode(0)))) ); assert_matches!( stream_parser.poll_event(cx), - Ok(Ready(Some(WebmElement::SimpleBlock(_)))) + Ready(Some(Ok(WebmElement::SimpleBlock(_)))) ); assert_matches!( stream_parser.poll_event(cx), - Ok(Ready(Some(WebmElement::Cluster))) + Ready(Some(Ok(WebmElement::Cluster))) ); assert_matches!( stream_parser.poll_event(cx), - Ok(Ready(Some(WebmElement::Timecode(1000)))) + Ready(Some(Ok(WebmElement::Timecode(1000)))) ); std::task::Poll::Ready(())