use Bytes for shared buffers in WebM chunks

This commit is contained in:
Tangent 128 2018-10-21 01:53:16 -04:00
parent 5d25d5adb7
commit b64e3f4bac

View file

@ -1,8 +1,8 @@
use bytes::Bytes;
use futures::{Async, Stream}; use futures::{Async, Stream};
use std::{ use std::{
io::Cursor, io::Cursor,
mem, mem
sync::Arc
}; };
use ebml::EbmlEventSource; use ebml::EbmlEventSource;
use error::WebmetroError; use error::WebmetroError;
@ -13,7 +13,8 @@ pub struct ClusterHead {
pub keyframe: bool, pub keyframe: bool,
pub start: u64, pub start: u64,
pub end: u64, pub end: u64,
// space for a Cluster tag and a Timecode tag /// space for a Cluster tag and a Timecode tag
/// TODO: consider using a BytesMut here for simplicity
bytes: [u8;16], bytes: [u8;16],
bytes_used: u8 bytes_used: u8
} }
@ -58,20 +59,31 @@ impl AsRef<[u8]> for ClusterHead {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Chunk { pub enum Chunk {
Headers { Headers {
bytes: Arc<Vec<u8>> bytes: Bytes
}, },
ClusterHead(ClusterHead), ClusterHead(ClusterHead),
ClusterBody { ClusterBody {
bytes: Arc<Vec<u8>> bytes: Bytes
}
}
impl Chunk {
/// converts this chunk of data into a Bytes object, perhaps to send over the network
pub fn into_bytes(self) -> Bytes {
match self {
Chunk::Headers {bytes, ..} => bytes,
Chunk::ClusterHead(cluster_head) => Bytes::from(cluster_head.as_ref()),
Chunk::ClusterBody {bytes, ..} => bytes
}
} }
} }
impl AsRef<[u8]> for Chunk { impl AsRef<[u8]> for Chunk {
fn as_ref(&self) -> &[u8] { fn as_ref(&self) -> &[u8] {
match self { match self {
&Chunk::Headers {ref bytes, ..} => bytes.as_ref().as_ref(), &Chunk::Headers {ref bytes, ..} => bytes.as_ref(),
&Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(), &Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(),
&Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref().as_ref() &Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref()
} }
} }
} }
@ -135,7 +147,7 @@ where S::Error: Into<WebmetroError>
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(WebmElement::Cluster))) => { Ok(Async::Ready(Some(WebmElement::Cluster))) => {
let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
let header_chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())}; let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())};
return_value = Some(Ok(Async::Ready(Some(header_chunk)))); return_value = Some(Ok(Async::Ready(Some(header_chunk))));
new_state = Some(ChunkerState::BuildingCluster( new_state = Some(ChunkerState::BuildingCluster(
@ -221,7 +233,7 @@ where S::Error: Into<WebmetroError>
ChunkerState::EmittingClusterBody(ref mut buffer) => { ChunkerState::EmittingClusterBody(ref mut buffer) => {
let liberated_buffer = mem::replace(buffer, Vec::new()); let liberated_buffer = mem::replace(buffer, Vec::new());
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)})))); return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))));
new_state = Some(ChunkerState::BuildingCluster( new_state = Some(ChunkerState::BuildingCluster(
ClusterHead::new(0), ClusterHead::new(0),
Cursor::new(Vec::new()) Cursor::new(Vec::new())
@ -231,14 +243,14 @@ where S::Error: Into<WebmetroError>
let liberated_body = mem::replace(body, Vec::new()); let liberated_body = mem::replace(body, Vec::new());
let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new())); let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new()));
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_body)})))); return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)}))));
new_state = Some(ChunkerState::BuildingHeader(liberated_header_cursor)); new_state = Some(ChunkerState::BuildingHeader(liberated_header_cursor));
}, },
ChunkerState::EmittingFinalClusterBody(ref mut buffer) => { ChunkerState::EmittingFinalClusterBody(ref mut buffer) => {
// flush final Cluster on end of stream // flush final Cluster on end of stream
let liberated_buffer = mem::replace(buffer, Vec::new()); let liberated_buffer = mem::replace(buffer, Vec::new());
return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)})))); return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))));
new_state = Some(ChunkerState::End); new_state = Some(ChunkerState::End);
}, },
ChunkerState::End => return Ok(Async::Ready(None)) ChunkerState::End => return Ok(Async::Ready(None))