Take advantage of NLL to be able to state-transition & return directly in chunker

This commit is contained in:
Tangent Wantwight 2019-11-16 15:19:14 -05:00
parent 7485119028
commit d3b147f8ea

View file

@ -138,9 +138,6 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
let mut chunker = self.get_mut(); let mut chunker = self.get_mut();
loop { loop {
let mut return_value = None;
let mut new_state = None;
match chunker.state { match chunker.state {
ChunkerState::BuildingHeader(ref mut buffer) => { ChunkerState::BuildingHeader(ref mut buffer) => {
match chunker.source.poll_event(cx) { match chunker.source.poll_event(cx) {
@ -151,20 +148,20 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())}; let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())};
return_value = Some(Ok(Async::Ready(Some(header_chunk)))); chunker.state = ChunkerState::BuildingCluster(
new_state = Some(ChunkerState::BuildingCluster(
ClusterHead::new(0), ClusterHead::new(0),
Cursor::new(Vec::new()) Cursor::new(Vec::new())
)); );
return Ready(Some(Ok(header_chunk)));
}, },
Ok(Async::Ready(Some(WebmElement::Info))) => {}, Ok(Async::Ready(Some(WebmElement::Info))) => {},
Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Void))) => {},
Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
Ok(Async::Ready(Some(element))) => { Ok(Async::Ready(Some(element))) => {
encode(element, buffer, chunker.buffer_size_limit).unwrap_or_else(|err| { if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) {
return_value = Some(Err(err)); chunker.state = ChunkerState::End;
new_state = Some(ChunkerState::End); return Ready(Some(Err(err)));
}); }
} }
} }
}, },
@ -180,15 +177,15 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
let mut new_header_cursor = Cursor::new(Vec::new()); let mut new_header_cursor = Cursor::new(Vec::new());
match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) { match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) {
Ok(_) => { Ok(_) => {
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); chunker.state = ChunkerState::EmittingClusterBodyBeforeNewHeader{
new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{
body: liberated_buffer.into_inner(), body: liberated_buffer.into_inner(),
new_header: new_header_cursor new_header: new_header_cursor
}); };
return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
}, },
Err(err) => { Err(err) => {
return_value = Some(Err(err)); chunker.state = ChunkerState::End;
new_state = Some(ChunkerState::End); return Ready(Some(Err(err)));
} }
} }
} }
@ -196,8 +193,8 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); chunker.state = ChunkerState::EmittingClusterBody(liberated_buffer.into_inner());
new_state = Some(ChunkerState::EmittingClusterBody(liberated_buffer.into_inner())); return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
}, },
Ok(Async::Ready(Some(WebmElement::Timecode(timecode)))) => { Ok(Async::Ready(Some(WebmElement::Timecode(timecode)))) => {
cluster_head.update_timecode(timecode); cluster_head.update_timecode(timecode);
@ -208,67 +205,55 @@ impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for Webm
cluster_head.keyframe = true; cluster_head.keyframe = true;
} }
cluster_head.observe_simpleblock_timecode(block.timecode); cluster_head.observe_simpleblock_timecode(block.timecode);
encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit).unwrap_or_else(|err| { if let Err(err) = encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit) {
return_value = Some(Err(err)); chunker.state = ChunkerState::End;
new_state = Some(ChunkerState::End); return Ready(Some(Err(err)));
}); }
}, },
Ok(Async::Ready(Some(WebmElement::Info))) => {}, Ok(Async::Ready(Some(WebmElement::Info))) => {},
Ok(Async::Ready(Some(WebmElement::Void))) => {}, Ok(Async::Ready(Some(WebmElement::Void))) => {},
Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
Ok(Async::Ready(Some(element))) => { Ok(Async::Ready(Some(element))) => {
encode(element, buffer, chunker.buffer_size_limit).unwrap_or_else(|err| { if let Err(err) = encode(element, buffer, chunker.buffer_size_limit) {
return_value = Some(Err(err)); chunker.state = ChunkerState::End;
new_state = Some(ChunkerState::End); return Ready(Some(Err(err)));
}); }
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
// flush final Cluster on end of stream // flush final Cluster on end of stream
let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); chunker.state = ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner());
new_state = Some(ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner())); return Ready(Some(Ok(Chunk::ClusterHead(liberated_cluster_head))));
} }
} }
}, },
ChunkerState::EmittingClusterBody(ref mut buffer) => { ChunkerState::EmittingClusterBody(ref mut buffer) => {
let liberated_buffer = mem::replace(buffer, Vec::new()); let liberated_buffer = mem::replace(buffer, Vec::new());
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})))); chunker.state = ChunkerState::BuildingCluster(
new_state = Some(ChunkerState::BuildingCluster(
ClusterHead::new(0), ClusterHead::new(0),
Cursor::new(Vec::new()) Cursor::new(Vec::new())
)); );
return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
}, },
ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => { ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => {
let liberated_body = mem::replace(body, Vec::new()); let liberated_body = mem::replace(body, Vec::new());
let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new())); let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new()));
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)})))); chunker.state = ChunkerState::BuildingHeader(liberated_header_cursor);
new_state = Some(ChunkerState::BuildingHeader(liberated_header_cursor)); return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)})));
}, },
ChunkerState::EmittingFinalClusterBody(ref mut buffer) => { ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
// flush final Cluster on end of stream // flush final Cluster on end of stream
let liberated_buffer = mem::replace(buffer, Vec::new()); let liberated_buffer = mem::replace(buffer, Vec::new());
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})))); chunker.state = ChunkerState::End;
new_state = Some(ChunkerState::End); return Ready(Some(Ok(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})));
}, },
ChunkerState::End => return Ready(None) ChunkerState::End => return Ready(None)
}; };
if let Some(new_state) = new_state {
chunker.state = new_state;
}
if let Some(return_value) = return_value {
return match return_value {
Ok(Async::Ready(Some(chunk))) => Ready(Some(Ok(chunk))),
Ok(Async::Ready(None)) => Ready(None),
Ok(Async::NotReady) => Pending,
Err(err) => Ready(Some(Err(err))),
};
}
} }
} }
} }