diff --git a/src/stream_parser.rs b/src/stream_parser.rs index de13163..ee8b120 100644 --- a/src/stream_parser.rs +++ b/src/stream_parser.rs @@ -40,40 +40,29 @@ impl, S: Stream> EbmlStreamingParser { loop { match T::check_space(&self.buffer) { - Err(err) => { - return Err(ParsingError::EbmlError(err)) - }, Ok(None) => { // need to refill buffer, below }, - Ok(Some(_)) => { - return match T::decode_element(&self.buffer) { - Err(err) => { - Err(ParsingError::EbmlError(err)) - }, - Ok(None) => { - // buffer should have the data already - panic!("Buffer was supposed to have enough data to parse element, somehow did not.") - }, + other => return other.map_err(ParsingError::EbmlError).and_then(move |_| { + match T::decode_element(&self.buffer) { + Err(err) => Err(ParsingError::EbmlError(err)), + Ok(None) => panic!("Buffer was supposed to have enough data to parse element, somehow did not."), Ok(Some((element, element_size))) => { self.last_read = element_size; - return Ok(Async::Ready(Some(element))) + Ok(Async::Ready(Some(element))) } } - } + }) } - match self.stream.poll() { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + match self.stream.poll().map_err(ParsingError::OtherError) { Ok(Async::Ready(Some(chunk))) => { self.buffer.reserve(chunk.as_ref().len()); self.buffer.put_slice(chunk.as_ref()); - //println!("Read {} into Buffer", chunk.as_ref().len()); // ok can retry decoding now - } - Err(err) => return Err(ParsingError::OtherError(err)) - }; + }, + other => return other.map(|async| async.map(|_| None)) + } } } }