From b64e3f4bacbabe1a12f357c48122731d5be3e731 Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Sun, 21 Oct 2018 01:53:16 -0400 Subject: [PATCH] use Bytes for shared buffers in WebM chunks --- src/chunk.rs | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/chunk.rs b/src/chunk.rs index 75581fa..1b63499 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,8 +1,8 @@ +use bytes::Bytes; use futures::{Async, Stream}; use std::{ io::Cursor, - mem, - sync::Arc + mem }; use ebml::EbmlEventSource; use error::WebmetroError; @@ -13,7 +13,8 @@ pub struct ClusterHead { pub keyframe: bool, pub start: u64, pub end: u64, - // space for a Cluster tag and a Timecode tag + /// space for a Cluster tag and a Timecode tag + /// TODO: consider using a BytesMut here for simplicity bytes: [u8;16], bytes_used: u8 } @@ -58,20 +59,31 @@ impl AsRef<[u8]> for ClusterHead { #[derive(Clone, Debug)] pub enum Chunk { Headers { - bytes: Arc> + bytes: Bytes }, ClusterHead(ClusterHead), ClusterBody { - bytes: Arc> + bytes: Bytes + } +} + +impl Chunk { + /// converts this chunk of data into a Bytes object, perhaps to send over the network + pub fn into_bytes(self) -> Bytes { + match self { + Chunk::Headers {bytes, ..} => bytes, + Chunk::ClusterHead(cluster_head) => Bytes::from(cluster_head.as_ref()), + Chunk::ClusterBody {bytes, ..} => bytes + } } } impl AsRef<[u8]> for Chunk { fn as_ref(&self) -> &[u8] { match self { - &Chunk::Headers {ref bytes, ..} => bytes.as_ref().as_ref(), + &Chunk::Headers {ref bytes, ..} => bytes.as_ref(), &Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(), - &Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref().as_ref() + &Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref() } } } @@ -135,7 +147,7 @@ where S::Error: Into Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), Ok(Async::Ready(Some(WebmElement::Cluster))) => { let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); - let header_chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())}; + let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())}; return_value = Some(Ok(Async::Ready(Some(header_chunk)))); new_state = Some(ChunkerState::BuildingCluster( @@ -221,7 +233,7 @@ where S::Error: Into ChunkerState::EmittingClusterBody(ref mut buffer) => { let liberated_buffer = mem::replace(buffer, Vec::new()); - return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)})))); + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})))); new_state = Some(ChunkerState::BuildingCluster( ClusterHead::new(0), Cursor::new(Vec::new()) @@ -231,14 +243,14 @@ where S::Error: Into let liberated_body = mem::replace(body, Vec::new()); let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new())); - return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_body)})))); + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_body)})))); new_state = Some(ChunkerState::BuildingHeader(liberated_header_cursor)); }, ChunkerState::EmittingFinalClusterBody(ref mut buffer) => { // flush final Cluster on end of stream let liberated_buffer = mem::replace(buffer, Vec::new()); - return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)})))); + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)})))); new_state = Some(ChunkerState::End); }, ChunkerState::End => return Ok(Async::Ready(None))