rename EbmlStreamingParser to reflect genericity, change construction to Stream extension

This commit is contained in:
Tangent 128 2018-04-04 01:36:15 -04:00
parent 8189794287
commit be92d04c09
3 changed files with 16 additions and 10 deletions

View file

@ -11,7 +11,7 @@ use futures::stream::repeat;
use futures::stream::Stream; use futures::stream::Stream;
use lab_ebml::chunk::{Chunk, WebmStream, ChunkingError}; use lab_ebml::chunk::{Chunk, WebmStream, ChunkingError};
use lab_ebml::timecode_fixer::ChunkStream; use lab_ebml::timecode_fixer::ChunkStream;
use lab_ebml::webm_stream::WebmBuffer; use lab_ebml::stream_parser::StreamEbml;
use hyper::{Get, StatusCode}; use hyper::{Get, StatusCode};
use hyper::header::ContentType; use hyper::header::ContentType;
use hyper::server::{Http, Request, Response, Service}; use hyper::server::{Http, Request, Response, Service};
@ -34,7 +34,7 @@ impl Service for WebmServer {
let stream: BodyStream<Vec<u8>> = Box::new( let stream: BodyStream<Vec<u8>> = Box::new(
repeat(()).take(3) repeat(()).take(3)
.map(|()| { .map(|()| {
WebmBuffer::new(once::<&[u8], ()>(Ok(SRC_FILE))).chunk_webm() once::<&[u8], ()>(Ok(SRC_FILE)).parse_ebml().chunk_webm()
}).flatten() }).flatten()
.fix_timecodes() .fix_timecodes()
.map_err(|err| match err { .map_err(|err| match err {

View file

@ -6,8 +6,8 @@ pub mod chunk;
pub mod ebml; pub mod ebml;
mod iterator; mod iterator;
pub mod slice; pub mod slice;
pub mod stream_parser;
pub mod webm_stream;
pub mod timecode_fixer; pub mod timecode_fixer;
pub mod webm; pub mod webm;

View file

@ -12,21 +12,27 @@ pub enum ParsingError<E> {
OtherError(E) OtherError(E)
} }
pub struct WebmBuffer<S> { pub struct EbmlStreamingParser<S> {
stream: S, stream: S,
buffer: BytesMut, buffer: BytesMut,
last_read: usize last_read: usize
} }
impl<I: AsRef<[u8]>, S: Stream<Item = I>> WebmBuffer<S> { pub trait StreamEbml<I: AsRef<[u8]>, S: Stream<Item = I>> {
pub fn new(stream: S) -> Self { fn parse_ebml(self) -> EbmlStreamingParser<S>;
WebmBuffer { }
stream: stream,
impl<I: AsRef<[u8]>, S: Stream<Item = I>> StreamEbml<I, S> for S {
fn parse_ebml(self) -> EbmlStreamingParser<S> {
EbmlStreamingParser {
stream: self,
buffer: BytesMut::new(), buffer: BytesMut::new(),
last_read: 0 last_read: 0
} }
} }
}
impl<I: AsRef<[u8]>, S: Stream<Item = I>> EbmlStreamingParser<S> {
pub fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, ParsingError<S::Error>> { pub fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, ParsingError<S::Error>> {
// release buffer from previous event // release buffer from previous event
self.buffer.advance(self.last_read); self.buffer.advance(self.last_read);
@ -72,11 +78,11 @@ impl<I: AsRef<[u8]>, S: Stream<Item = I>> WebmBuffer<S> {
} }
} }
impl<I: AsRef<[u8]>, S: Stream<Item = I>> EbmlEventSource for WebmBuffer<S> { impl<I: AsRef<[u8]>, S: Stream<Item = I>> EbmlEventSource for EbmlStreamingParser<S> {
type Error = ParsingError<S::Error>; type Error = ParsingError<S::Error>;
fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, Self::Error> { fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Async<Option<T>>, Self::Error> {
return WebmBuffer::poll_event(self); return EbmlStreamingParser::poll_event(self);
} }
} }