From 9cc7d8064d032ac1af9b8e44b674e663b855b7e5 Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Fri, 11 Sep 2020 19:34:27 -0400 Subject: [PATCH] Represent Chunks as iterables of Bytes --- src/chunk.rs | 14 -------------- src/commands/relay.rs | 8 +++++--- src/commands/send.rs | 6 +++++- 3 files changed, 10 insertions(+), 18 deletions(-) diff --git a/src/chunk.rs b/src/chunk.rs index 64bcc1a..b368690 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -92,20 +92,6 @@ impl Iterator for Chunk { } } -// impl Buf??? - -impl Chunk { - /// converts this chunk of data into a Bytes object, perhaps to send over the network - pub fn into_bytes(self) -> Bytes { - match self { - Chunk::Headers {bytes, ..} => bytes, - Chunk::ClusterHead(cluster_head) => cluster_head.bytes.freeze(), - Chunk::ClusterBody {bytes, ..} => bytes, - Chunk::Empty => Bytes::new(), - } - } -} - #[derive(Debug)] enum ChunkerState { BuildingHeader(Cursor>), diff --git a/src/commands/relay.rs b/src/commands/relay.rs index eacd3d1..2ff2c2b 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -20,6 +20,7 @@ use hyper::{ CONTENT_TYPE } }; +use stream::iter; use warp::{ self, Filter, @@ -42,16 +43,17 @@ use webmetro::{ ChunkTimecodeFixer, }, stream_parser::StreamEbml -}; +, chunk::Chunk}; const BUFFER_LIMIT: usize = 2 * 1024 * 1024; fn get_stream(channel: Handle) -> impl Stream> { let mut timecode_fixer = ChunkTimecodeFixer::new(); - Listener::new(channel).map(|c| Ok(c)) + Listener::new(channel).map(|c| Result::::Ok(c)) .map_ok(move |chunk| timecode_fixer.process(chunk)) .find_starting_point() - .map_ok(|webm_chunk| webm_chunk.into_bytes()) + .map_ok(|webm_chunk| iter(webm_chunk).map(Result::::Ok)) + .try_flatten() } fn post_stream(channel: Handle, stream: impl Stream> + Unpin) -> impl Stream> { diff --git a/src/commands/send.rs b/src/commands/send.rs index 41df5fe..a65395a 100644 --- a/src/commands/send.rs +++ b/src/commands/send.rs @@ -1,7 +1,9 @@ +use bytes::Bytes; use clap::{App, Arg, ArgMatches, SubCommand}; use futures::prelude::*; use hyper::{client::HttpConnector, Body, Client, Request}; use std::io::{stdout, Write}; +use stream::iter; use super::stdin_stream; use webmetro::{ @@ -26,6 +28,7 @@ type BoxedChunkStream = Box> + Se #[tokio::main] pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { + // build pipeline let mut timecode_fixer = ChunkTimecodeFixer::new(); let mut chunk_stream: BoxedChunkStream = Box::new( stdin_stream() @@ -44,7 +47,8 @@ pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { } let chunk_stream = chunk_stream - .map_ok(|webm_chunk| webm_chunk.into_bytes()) + .map_ok(|webm_chunk| iter(webm_chunk).map(Result::::Ok)) + .try_flatten() .map_err(|err| { warn!("{}", &err); err