Start throttle timing on first data instead of throttle creation (improves cases where the source is slow to start)

This commit is contained in:
Tangent Wantwight 2020-09-10 18:03:21 -04:00
parent c8124ed388
commit 2fb8408ebc
2 changed files with 6 additions and 3 deletions

View File

@ -1,6 +1,7 @@
## v0.3.1-dev
- forget a channel's initialization segment when no transmitter is active. This improves behavior when a channel is occasionally used for streams with different codecs.
- Add INFO logging for channel creation/garbage-collection
- Start throttle timing on first data instead of throttle creation (improves cases where the source is slow to start)
## v0.3.0
- update internals to v0.2 of `warp` and `tokio`; no remaining code relies on `futures` 0.1

View File

@ -95,7 +95,7 @@ impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S>
pub struct Throttle<S> {
stream: S,
start_time: Instant,
start_time: Option<Instant>,
sleep: Delay
}
@ -104,7 +104,7 @@ impl<S> Throttle<S> {
let now = Instant::now();
Throttle {
stream: wrap,
start_time: now,
start_time: None,
sleep: delay_until(now)
}
}
@ -122,9 +122,11 @@ impl<S: TryStream<Ok = Chunk> + Unpin> Stream for Throttle<S>
let next_chunk = self.stream.try_poll_next_unpin(cx);
if let Poll::Ready(Some(Ok(Chunk::ClusterHead(ref cluster_head)))) = next_chunk {
// we have actual data, so start the clock if we haven't yet
let start_time = self.start_time.get_or_insert_with(Instant::now);
// snooze until real time has "caught up" to the stream
let offset = Duration::from_millis(cluster_head.end);
let sleep_until = self.start_time + offset;
let sleep_until = *start_time + offset;
self.sleep.reset(sleep_until);
}
next_chunk