Make WebmChunker use WebmetroError

This commit is contained in:
Tangent 128 2018-04-13 19:16:34 -04:00
parent bf56789810
commit 2170096a21
2 changed files with 14 additions and 34 deletions

View File

@ -1,12 +1,11 @@
use futures::{Async, Stream}; use futures::{Async, Stream};
use std::{ use std::{
error::Error,
fmt::{Display, Formatter, Result as FmtResult},
io::Cursor, io::Cursor,
mem, mem,
sync::Arc sync::Arc
}; };
use ebml::EbmlEventSource; use ebml::EbmlEventSource;
use error::WebmetroError;
use webm::*; use webm::*;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -90,37 +89,18 @@ enum ChunkerState {
End End
} }
#[derive(Debug)]
pub enum ChunkingError<E> {
IoError(::std::io::Error),
OtherError(E)
}
impl<E: Display + Error> Display for ChunkingError<E> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(f, "Chunking error: {}", self.description())
}
}
impl<E: Error> Error for ChunkingError<E> {
fn description(&self) -> &str {
match self {
&ChunkingError::IoError(ref err) => err.description(),
&ChunkingError::OtherError(ref err) => err.description()
}
}
}
pub struct WebmChunker<S> { pub struct WebmChunker<S> {
source: S, source: S,
state: ChunkerState state: ChunkerState
} }
impl<S: EbmlEventSource> Stream for WebmChunker<S> impl<S: EbmlEventSource> Stream for WebmChunker<S>
where S::Error: Into<WebmetroError>
{ {
type Item = Chunk; type Item = Chunk;
type Error = ChunkingError<S::Error>; type Error = WebmetroError;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { fn poll(&mut self) -> Result<Async<Option<Self::Item>>, WebmetroError> {
loop { loop {
let mut return_value = None; let mut return_value = None;
let mut new_state = None; let mut new_state = None;
@ -128,7 +108,7 @@ impl<S: EbmlEventSource> Stream for WebmChunker<S>
match self.state { match self.state {
ChunkerState::BuildingHeader(ref mut buffer) => { ChunkerState::BuildingHeader(ref mut buffer) => {
match self.source.poll_event() { match self.source.poll_event() {
Err(passthru) => return Err(ChunkingError::OtherError(passthru)), Err(passthru) => return Err(passthru.into()),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(WebmElement::Cluster))) => { Ok(Async::Ready(Some(WebmElement::Cluster))) => {
@ -148,7 +128,7 @@ impl<S: EbmlEventSource> Stream for WebmChunker<S>
match encode_webm_element(element, buffer) { match encode_webm_element(element, buffer) {
Ok(_) => {}, Ok(_) => {},
Err(err) => { Err(err) => {
return_value = Some(Err(ChunkingError::IoError(err))); return_value = Some(Err(err.into()));
new_state = Some(ChunkerState::End); new_state = Some(ChunkerState::End);
} }
} }
@ -157,7 +137,7 @@ impl<S: EbmlEventSource> Stream for WebmChunker<S>
}, },
ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
match self.source.poll_event() { match self.source.poll_event() {
Err(passthru) => return Err(ChunkingError::OtherError(passthru)), Err(passthru) => return Err(passthru.into()),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(element @ WebmElement::EbmlHead))) Ok(Async::Ready(Some(element @ WebmElement::EbmlHead)))
| Ok(Async::Ready(Some(element @ WebmElement::Segment))) => { | Ok(Async::Ready(Some(element @ WebmElement::Segment))) => {
@ -174,7 +154,7 @@ impl<S: EbmlEventSource> Stream for WebmChunker<S>
}); });
}, },
Err(err) => { Err(err) => {
return_value = Some(Err(ChunkingError::IoError(err))); return_value = Some(Err(err.into()));
new_state = Some(ChunkerState::End); new_state = Some(ChunkerState::End);
} }
} }
@ -198,7 +178,7 @@ impl<S: EbmlEventSource> Stream for WebmChunker<S>
match encode_webm_element(WebmElement::SimpleBlock(*block), buffer) { match encode_webm_element(WebmElement::SimpleBlock(*block), buffer) {
Ok(_) => {}, Ok(_) => {},
Err(err) => { Err(err) => {
return_value = Some(Err(ChunkingError::IoError(err))); return_value = Some(Err(err.into()));
new_state = Some(ChunkerState::End); new_state = Some(ChunkerState::End);
} }
} }
@ -210,7 +190,7 @@ impl<S: EbmlEventSource> Stream for WebmChunker<S>
match encode_webm_element(element, buffer) { match encode_webm_element(element, buffer) {
Ok(_) => {}, Ok(_) => {},
Err(err) => { Err(err) => {
return_value = Some(Err(ChunkingError::IoError(err))); return_value = Some(Err(err.into()));
new_state = Some(ChunkerState::End); new_state = Some(ChunkerState::End);
} }
} }

View File

@ -33,7 +33,7 @@ use webmetro::{
Listener, Listener,
Transmitter Transmitter
}, },
chunk::{Chunk, WebmStream, ChunkingError}, chunk::{Chunk, WebmStream},
error::WebmetroError, error::WebmetroError,
fixers::ChunkStream, fixers::ChunkStream,
stream_parser::StreamEbml stream_parser::StreamEbml
@ -65,13 +65,13 @@ impl RelayServer {
let sink = Transmitter::new(self.get_channel()); let sink = Transmitter::new(self.get_channel());
Box::new( Box::new(
source.forward(sink.sink_map_err(|err| match err {})) source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
.into_stream() .into_stream()
.map(|_| empty()) .map(|_| empty())
.map_err(|err| { .map_err(|err| {
let io_err = match err { let io_err = match err {
ChunkingError::IoError(io_err) => io_err, WebmetroError::IoError(io_err) => io_err,
ChunkingError::OtherError(_) => ErrorKind::InvalidData.into() _ => ErrorKind::InvalidData.into()
}; };
println!("Post failed: {}", &io_err); println!("Post failed: {}", &io_err);
io_err io_err