From 9274fabeea18c735976733edee04a0fd22dfb3cb Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Wed, 6 May 2020 21:50:03 -0400 Subject: [PATCH] impl Iterator for Chunk --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/chunk.rs | 33 ++++++++++++++++++++++++++++++++- src/commands/filter.rs | 6 ++++-- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 921612b..4437496 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1768,7 +1768,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "webmetro" -version = "0.2.3-dev" +version = "0.3.0-dev" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 23138a7..47d94c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "webmetro" -version = "0.2.3-dev" +version = "0.3.0-dev" authors = ["Tangent 128 "] edition = "2018" diff --git a/src/chunk.rs b/src/chunk.rs index ab9ca7e..bb20011 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -61,16 +61,47 @@ pub enum Chunk { ClusterHead(ClusterHead), ClusterBody { bytes: Bytes + }, + Empty +} + +pub struct Iter(Chunk); + +impl Iterator for Chunk { + type Item = Bytes; + + fn next(&mut self) -> Option { + match self { + Chunk::Headers {ref mut bytes, ..} => { + let bytes = mem::replace(bytes, Bytes::new()); + *self = Chunk::Empty; + Some(bytes) + }, + Chunk::ClusterHead(ClusterHead {bytes, ..}) => { + let bytes = mem::replace(bytes, BytesMut::new()); + *self = Chunk::Empty; + Some(bytes.freeze()) + }, + Chunk::ClusterBody {bytes, ..} => { + let bytes = mem::replace(bytes, Bytes::new()); + *self = Chunk::Empty; + Some(bytes) + }, + Chunk::Empty => None + } } } +// impl Buf??? + impl Chunk { /// converts this chunk of data into a Bytes object, perhaps to send over the network pub fn into_bytes(self) -> Bytes { match self { Chunk::Headers {bytes, ..} => bytes, Chunk::ClusterHead(cluster_head) => cluster_head.bytes.freeze(), - Chunk::ClusterBody {bytes, ..} => bytes + Chunk::ClusterBody {bytes, ..} => bytes, + Chunk::Empty => Bytes::new(), } } } diff --git a/src/commands/filter.rs b/src/commands/filter.rs index bb1947f..e203f7c 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -43,7 +43,9 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { chunk_stream = Box::new(Throttle::new(chunk_stream)); } - Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| { - ready(io::stdout().write_all(&chunk.into_bytes()).map_err(WebmetroError::from)) + Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|mut chunk| { + ready(chunk.try_for_each(|buffer| + io::stdout().write_all(&buffer).map_err(WebmetroError::from) + )) })) }