Break out "find starting point" fixer operator

This commit is contained in:
Tangent 128 2018-04-08 02:31:00 -04:00
parent b7ee425905
commit ff8d4e9126
2 changed files with 59 additions and 12 deletions

View file

@ -32,7 +32,7 @@ impl Service for WebmServer {
(&Get, "/loop") => { (&Get, "/loop") => {
let stream: BodyStream<Vec<u8>> = Box::new( let stream: BodyStream<Vec<u8>> = Box::new(
repeat::<&[u8], ()>(SRC_FILE).take(5) repeat::<&[u8], ()>(SRC_FILE).take(5)
.parse_ebml().chunk_webm().fix_timecodes() .parse_ebml().chunk_webm().fix_timecodes().find_starting_point()
.map_err(|err| match err { .map_err(|err| match err {
ChunkingError::IoError(io_err) => hyper::Error::Io(io_err), ChunkingError::IoError(io_err) => hyper::Error::Io(io_err),
ChunkingError::OtherError(_) => hyper::Error::Incomplete ChunkingError::OtherError(_) => hyper::Error::Incomplete

View file

@ -7,8 +7,7 @@ pub struct ChunkTimecodeFixer<S> {
stream: S, stream: S,
current_offset: u64, current_offset: u64,
last_observed_timecode: u64, last_observed_timecode: u64,
assumed_duration: u64, assumed_duration: u64
seen_header: bool
} }
impl<S: Stream<Item = Chunk>> Stream for ChunkTimecodeFixer<S> impl<S: Stream<Item = Chunk>> Stream for ChunkTimecodeFixer<S>
@ -29,27 +28,75 @@ impl<S: Stream<Item = Chunk>> Stream for ChunkTimecodeFixer<S>
cluster_head.update_timecode(start + self.current_offset); cluster_head.update_timecode(start + self.current_offset);
self.last_observed_timecode = cluster_head.end; self.last_observed_timecode = cluster_head.end;
}, },
Ok(Async::Ready(Some(Chunk::Headers {..}))) => {
if self.seen_header {
return self.poll();
} else {
self.seen_header = true;
}
},
_ => {} _ => {}
}; };
poll_chunk poll_chunk
} }
} }
pub struct StartingPointFinder<S> {
stream: S,
seen_header: bool,
seen_keyframe: bool
}
impl<S: Stream<Item = Chunk>> Stream for StartingPointFinder<S>
{
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
loop {
return match self.stream.poll() {
Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head)))) => {
if cluster_head.keyframe {
self.seen_keyframe = true;
}
if self.seen_keyframe {
Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head))))
} else {
continue;
}
},
chunk @ Ok(Async::Ready(Some(Chunk::ClusterBody {..}))) => {
if self.seen_keyframe {
chunk
} else {
continue;
}
},
chunk @ Ok(Async::Ready(Some(Chunk::Headers {..}))) => {
if self.seen_header {
// new stream starting, we don't need a new header but should wait for a safe spot to resume
self.seen_keyframe = false;
continue;
} else {
self.seen_header = true;
chunk
}
},
chunk => chunk
}
};
}
}
pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> { pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
fn fix_timecodes(self) -> ChunkTimecodeFixer<Self> { fn fix_timecodes(self) -> ChunkTimecodeFixer<Self> {
ChunkTimecodeFixer { ChunkTimecodeFixer {
stream: self, stream: self,
current_offset: 0, current_offset: 0,
last_observed_timecode: 0, last_observed_timecode: 0,
assumed_duration: 33, assumed_duration: 33
seen_header: false }
}
fn find_starting_point(self) -> StartingPointFinder<Self> {
StartingPointFinder {
stream: self,
seen_header: false,
seen_keyframe: false
} }
} }
} }