Create EbmlEventSource trait since Iterators/Streams can't return borrows

This commit is contained in:
Tangent 128 2018-03-30 01:33:13 -04:00
parent f8db95e61e
commit 417cbf49c7
4 changed files with 45 additions and 18 deletions

View file

@ -3,7 +3,7 @@ extern crate hyper;
extern crate lab_ebml; extern crate lab_ebml;
use futures::future::FutureResult; use futures::future::FutureResult;
use futures::stream::{iter, Stream}; use futures::stream::Stream;
use lab_ebml::chunk::{Chunk, WebmStream, ChunkingError}; use lab_ebml::chunk::{Chunk, WebmStream, ChunkingError};
use lab_ebml::timecode_fixer::ChunkStream; use lab_ebml::timecode_fixer::ChunkStream;
use lab_ebml::webm::*; use lab_ebml::webm::*;
@ -28,13 +28,12 @@ impl Service for WebmServer {
fn call(&self, req: Request) -> Self::Future { fn call(&self, req: Request) -> Self::Future {
let response = match (req.method(), req.path()) { let response = match (req.method(), req.path()) {
(&Get, "/loop") => { (&Get, "/loop") => {
let stream: BodyStream<Vec<u8>> = iter(parse_webm(SRC_FILE).into_iter().map(|x| Ok(x))) let stream: BodyStream<Vec<u8>> = parse_webm(SRC_FILE).into_iter().chunk_webm()
.chunk_webm() .chain(parse_webm(SRC_FILE).into_iter().chunk_webm())
.chain(iter(parse_webm(SRC_FILE).into_iter().map(|x| Ok(x))).chunk_webm())
.fix_timecodes() .fix_timecodes()
.map_err(|err| match err { .map_err(|err| match err {
ChunkingError::IoError(io_err) => hyper::Error::Io(io_err), ChunkingError::IoError(io_err) => hyper::Error::Io(io_err),
ChunkingError::OtherError(otx_err) => otx_err ChunkingError::OtherError(_) => hyper::Error::Incomplete
}) })
.boxed(); .boxed();
Response::new() Response::new()
@ -51,6 +50,6 @@ impl Service for WebmServer {
} }
pub fn main() { pub fn main() {
let addr = args().nth(1).unwrap().to_socket_addrs().unwrap().next().unwrap(); let addr = args().nth(1).expect("Need binding address+port").to_socket_addrs().unwrap().next().unwrap();
Http::new().bind(&addr, move || Ok(WebmServer)).unwrap().run().unwrap(); Http::new().bind(&addr, move || Ok(WebmServer)).unwrap().run().unwrap();
} }

View file

@ -1,7 +1,9 @@
use futures::{Async, Stream}; use futures::{Async, Stream};
use std::io::Cursor; use std::io::Cursor;
use std::marker::PhantomData;
use std::mem; use std::mem;
use std::sync::Arc; use std::sync::Arc;
use ebml::EbmlEventSource;
use webm::*; use webm::*;
#[derive(Clone)] #[derive(Clone)]
@ -85,12 +87,13 @@ pub enum ChunkingError<E> {
OtherError(E) OtherError(E)
} }
pub struct WebmChunker<S> { pub struct WebmChunker<'a, S: EbmlEventSource<'a>> {
stream: S, source: S,
state: ChunkerState state: ChunkerState,
_marker: PhantomData<&'a [u8]>
} }
impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S> impl<'a, S: EbmlEventSource<'a, Event = WebmElement<'a>>> Stream for WebmChunker<'a, S>
{ {
type Item = Chunk; type Item = Chunk;
type Error = ChunkingError<S::Error>; type Error = ChunkingError<S::Error>;
@ -99,7 +102,7 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
loop { loop {
let (return_value, next_state) = match self.state { let (return_value, next_state) = match self.state {
ChunkerState::BuildingHeader(ref mut buffer) => { ChunkerState::BuildingHeader(ref mut buffer) => {
match self.stream.poll() { match self.source.poll_event() {
Err(passthru) => return Err(ChunkingError::OtherError(passthru)), Err(passthru) => return Err(ChunkingError::OtherError(passthru)),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
@ -126,7 +129,7 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
} }
}, },
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
match self.stream.poll() { match self.source.poll_event() {
Err(passthru) => return Err(ChunkingError::OtherError(passthru)), Err(passthru) => return Err(ChunkingError::OtherError(passthru)),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(WebmElement::Cluster))) => { Ok(Async::Ready(Some(WebmElement::Cluster))) => {
@ -205,15 +208,16 @@ impl<'a, S: Stream<Item = WebmElement<'a>>> Stream for WebmChunker<S>
} }
} }
pub trait WebmStream<T> { pub trait WebmStream<'a, T: EbmlEventSource<'a, Event = WebmElement<'a>>> {
fn chunk_webm(self) -> WebmChunker<T>; fn chunk_webm(self) -> WebmChunker<'a, T>;
} }
impl<'a, T: Stream<Item = WebmElement<'a>>> WebmStream<T> for T { impl<'a, T: EbmlEventSource<'a, Event = WebmElement<'a>>> WebmStream<'a, T> for T {
fn chunk_webm(self) -> WebmChunker<T> { fn chunk_webm(self) -> WebmChunker<'a, T> {
WebmChunker { WebmChunker {
stream: self, source: self,
state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())),
_marker: PhantomData
} }
} }
} }

View file

@ -1,4 +1,5 @@
use bytes::{BigEndian, ByteOrder, BufMut}; use bytes::{BigEndian, ByteOrder, BufMut};
use futures::Async;
use std::error::Error as ErrorTrait; use std::error::Error as ErrorTrait;
use std::fmt::{Display, Formatter, Result as FmtResult}; use std::fmt::{Display, Formatter, Result as FmtResult};
use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom}; use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom};
@ -259,6 +260,12 @@ pub trait FromEbml<'b>: Sized {
} }
} }
pub trait EbmlEventSource<'a> {
type Event: FromEbml<'a>;
type Error;
fn poll_event(&mut self) -> Result<Async<Option<Self::Event>>, Self::Error>;
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use bytes::{BytesMut}; use bytes::{BytesMut};

View file

@ -1,4 +1,5 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use futures::Async;
use ebml::*; use ebml::*;
pub struct EbmlIterator<'b, T: FromEbml<'b>> { pub struct EbmlIterator<'b, T: FromEbml<'b>> {
@ -35,3 +36,19 @@ impl<'b, T: FromEbml<'b>> Iterator for EbmlIterator<'b, T> {
} }
} }
} }
impl<'a, T: FromEbml<'a>> EbmlEventSource<'a> for EbmlIterator<'a, T> {
type Event = T;
type Error = Error;
fn poll_event(&mut self) -> Result<Async<Option<T>>, Error> {
match Self::Event::decode_element(&self.slice[self.position..]) {
Err(err) => Err(err),
Ok(None) => Ok(Async::Ready(None)),
Ok(Some((element, element_size))) => {
self.position += element_size;
Ok(Async::Ready(Some(element)))
}
}
}
}