diff --git a/src/fixers.rs b/src/fixers.rs index 9605d34..2b6b9ea 100644 --- a/src/fixers.rs +++ b/src/fixers.rs @@ -115,10 +115,11 @@ impl + Unpin> Stream for Throttle let next_chunk = self.stream.try_poll_next_unpin(cx); if let Poll::Ready(Some(Ok(Chunk::Cluster(ref cluster_head, _)))) = next_chunk { - // we have actual data, so start the clock if we haven't yet - let start_time = self.start_time.get_or_insert_with(Instant::now); - // snooze until real time has "caught up" to the stream let offset = Duration::from_millis(cluster_head.end); + // we have actual data, so start the clock if we haven't yet; + // if we're starting the clock now, though, don't insert delays if the first chunk happens to start after zero + let start_time = self.start_time.get_or_insert_with(|| Instant::now() - offset); + // snooze until real time has "caught up" to the stream let sleep_until = *start_time + offset; self.sleep.reset(sleep_until); }