diff --git a/CHANGELOG.md b/CHANGELOG.md index 2228370..a0ffb37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## v0.3.1-dev - forget a channel's initialization segment when no transmitter is active. This improves behavior when a channel is occasionally used for streams with different codecs. - Add INFO logging for channel creation/garbage-collection +- Start throttle timing on first data instead of throttle creation (improves cases where the source is slow to start) ## v0.3.0 - update internals to v0.2 of `warp` and `tokio`; no remaining code relies on `futures` 0.1 diff --git a/src/fixers.rs b/src/fixers.rs index bdb3606..0c02272 100644 --- a/src/fixers.rs +++ b/src/fixers.rs @@ -95,7 +95,7 @@ impl + Unpin> Stream for StartingPointFinder pub struct Throttle { stream: S, - start_time: Instant, + start_time: Option, sleep: Delay } @@ -104,7 +104,7 @@ impl Throttle { let now = Instant::now(); Throttle { stream: wrap, - start_time: now, + start_time: None, sleep: delay_until(now) } } @@ -122,9 +122,11 @@ impl + Unpin> Stream for Throttle let next_chunk = self.stream.try_poll_next_unpin(cx); if let Poll::Ready(Some(Ok(Chunk::ClusterHead(ref cluster_head)))) = next_chunk { + // we have actual data, so start the clock if we haven't yet + let start_time = self.start_time.get_or_insert_with(Instant::now); // snooze until real time has "caught up" to the stream let offset = Duration::from_millis(cluster_head.end); - let sleep_until = self.start_time + offset; + let sleep_until = *start_time + offset; self.sleep.reset(sleep_until); } next_chunk