From 417cbf49c79fc99f2c69e2bacb7f79080295cc95 Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Fri, 30 Mar 2018 01:33:13 -0400 Subject: [PATCH] Create EbmlEventSource trait since Iterators/Streams can't return borrows --- src/bin/loop_server.rs | 11 +++++------ src/chunk.rs | 28 ++++++++++++++++------------ src/ebml.rs | 7 +++++++ src/iterator.rs | 17 +++++++++++++++++ 4 files changed, 45 insertions(+), 18 deletions(-) diff --git a/src/bin/loop_server.rs b/src/bin/loop_server.rs index d2f18e8..34031a7 100644 --- a/src/bin/loop_server.rs +++ b/src/bin/loop_server.rs @@ -3,7 +3,7 @@ extern crate hyper; extern crate lab_ebml; use futures::future::FutureResult; -use futures::stream::{iter, Stream}; +use futures::stream::Stream; use lab_ebml::chunk::{Chunk, WebmStream, ChunkingError}; use lab_ebml::timecode_fixer::ChunkStream; use lab_ebml::webm::*; @@ -28,13 +28,12 @@ impl Service for WebmServer { fn call(&self, req: Request) -> Self::Future { let response = match (req.method(), req.path()) { (&Get, "/loop") => { - let stream: BodyStream> = iter(parse_webm(SRC_FILE).into_iter().map(|x| Ok(x))) - .chunk_webm() - .chain(iter(parse_webm(SRC_FILE).into_iter().map(|x| Ok(x))).chunk_webm()) + let stream: BodyStream> = parse_webm(SRC_FILE).into_iter().chunk_webm() + .chain(parse_webm(SRC_FILE).into_iter().chunk_webm()) .fix_timecodes() .map_err(|err| match err { ChunkingError::IoError(io_err) => hyper::Error::Io(io_err), - ChunkingError::OtherError(otx_err) => otx_err + ChunkingError::OtherError(_) => hyper::Error::Incomplete }) .boxed(); Response::new() @@ -51,6 +50,6 @@ impl Service for WebmServer { } pub fn main() { - let addr = args().nth(1).unwrap().to_socket_addrs().unwrap().next().unwrap(); + let addr = args().nth(1).expect("Need binding address+port").to_socket_addrs().unwrap().next().unwrap(); Http::new().bind(&addr, move || Ok(WebmServer)).unwrap().run().unwrap(); } diff --git a/src/chunk.rs b/src/chunk.rs index 9511b88..5541c6c 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,7 +1,9 @@ use futures::{Async, Stream}; use std::io::Cursor; +use std::marker::PhantomData; use std::mem; use std::sync::Arc; +use ebml::EbmlEventSource; use webm::*; #[derive(Clone)] @@ -85,12 +87,13 @@ pub enum ChunkingError { OtherError(E) } -pub struct WebmChunker { - stream: S, - state: ChunkerState +pub struct WebmChunker<'a, S: EbmlEventSource<'a>> { + source: S, + state: ChunkerState, + _marker: PhantomData<&'a [u8]> } -impl<'a, S: Stream>> Stream for WebmChunker +impl<'a, S: EbmlEventSource<'a, Event = WebmElement<'a>>> Stream for WebmChunker<'a, S> { type Item = Chunk; type Error = ChunkingError; @@ -99,7 +102,7 @@ impl<'a, S: Stream>> Stream for WebmChunker loop { let (return_value, next_state) = match self.state { ChunkerState::BuildingHeader(ref mut buffer) => { - match self.stream.poll() { + match self.source.poll_event() { Err(passthru) => return Err(ChunkingError::OtherError(passthru)), Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), @@ -126,7 +129,7 @@ impl<'a, S: Stream>> Stream for WebmChunker } }, ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { - match self.stream.poll() { + match self.source.poll_event() { Err(passthru) => return Err(ChunkingError::OtherError(passthru)), Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(Some(WebmElement::Cluster))) => { @@ -205,15 +208,16 @@ impl<'a, S: Stream>> Stream for WebmChunker } } -pub trait WebmStream { - fn chunk_webm(self) -> WebmChunker; +pub trait WebmStream<'a, T: EbmlEventSource<'a, Event = WebmElement<'a>>> { + fn chunk_webm(self) -> WebmChunker<'a, T>; } -impl<'a, T: Stream>> WebmStream for T { - fn chunk_webm(self) -> WebmChunker { +impl<'a, T: EbmlEventSource<'a, Event = WebmElement<'a>>> WebmStream<'a, T> for T { + fn chunk_webm(self) -> WebmChunker<'a, T> { WebmChunker { - stream: self, - state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) + source: self, + state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())), + _marker: PhantomData } } } diff --git a/src/ebml.rs b/src/ebml.rs index e0930e1..a4ff6fe 100644 --- a/src/ebml.rs +++ b/src/ebml.rs @@ -1,4 +1,5 @@ use bytes::{BigEndian, ByteOrder, BufMut}; +use futures::Async; use std::error::Error as ErrorTrait; use std::fmt::{Display, Formatter, Result as FmtResult}; use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom}; @@ -259,6 +260,12 @@ pub trait FromEbml<'b>: Sized { } } +pub trait EbmlEventSource<'a> { + type Event: FromEbml<'a>; + type Error; + fn poll_event(&mut self) -> Result>, Self::Error>; +} + #[cfg(test)] mod tests { use bytes::{BytesMut}; diff --git a/src/iterator.rs b/src/iterator.rs index de7c961..7cdd00d 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -1,4 +1,5 @@ use std::marker::PhantomData; +use futures::Async; use ebml::*; pub struct EbmlIterator<'b, T: FromEbml<'b>> { @@ -35,3 +36,19 @@ impl<'b, T: FromEbml<'b>> Iterator for EbmlIterator<'b, T> { } } } + +impl<'a, T: FromEbml<'a>> EbmlEventSource<'a> for EbmlIterator<'a, T> { + type Event = T; + type Error = Error; + + fn poll_event(&mut self) -> Result>, Error> { + match Self::Event::decode_element(&self.slice[self.position..]) { + Err(err) => Err(err), + Ok(None) => Ok(Async::Ready(None)), + Ok(Some((element, element_size))) => { + self.position += element_size; + Ok(Async::Ready(Some(element))) + } + } + } +}