2019-10-11 04:28:08 +00:00
|
|
|
use std::pin::Pin;
|
|
|
|
use std::task::{
|
|
|
|
Context,
|
|
|
|
Poll
|
|
|
|
};
|
2018-04-13 03:29:12 +00:00
|
|
|
|
2020-05-08 03:14:43 +00:00
|
|
|
use futures::prelude::*;
|
|
|
|
use tokio::time::{
|
|
|
|
delay_until,
|
|
|
|
Delay,
|
|
|
|
Duration,
|
|
|
|
Instant,
|
2019-10-11 04:28:08 +00:00
|
|
|
};
|
2017-09-06 06:02:50 +00:00
|
|
|
|
2018-12-22 20:03:19 +00:00
|
|
|
use crate::chunk::Chunk;
|
2017-09-30 06:33:36 +00:00
|
|
|
|
2019-10-11 04:28:08 +00:00
|
|
|
/// Running state used by [`ChunkTimecodeFixer::process`] to shift the
/// timecodes of successive cluster heads.
///
/// NOTE(review): values are presumably milliseconds, since `Throttle` in this
/// file feeds a cluster head's `end` to `Duration::from_millis` — confirm.
pub struct ChunkTimecodeFixer {
    /// Amount added to a cluster head's `start` before it is handed to
    /// `update_timecode`.
    current_offset: u64,
    /// The `end` of the most recently processed cluster head, read after its
    /// timecode was updated.
    last_observed_timecode: u64,
    /// Gap placed after `last_observed_timecode` when an incoming cluster
    /// starts earlier than that timecode.
    assumed_duration: u64,
}
|
|
|
|
|
2019-10-11 04:28:08 +00:00
|
|
|
impl ChunkTimecodeFixer {
|
|
|
|
pub fn new() -> ChunkTimecodeFixer {
|
|
|
|
ChunkTimecodeFixer {
|
|
|
|
current_offset: 0,
|
|
|
|
last_observed_timecode: 0,
|
|
|
|
assumed_duration: 33
|
|
|
|
}
|
|
|
|
}
|
2020-05-09 01:15:18 +00:00
|
|
|
pub fn process(&mut self, mut chunk: Chunk) -> Chunk {
|
2019-10-11 04:28:08 +00:00
|
|
|
match chunk {
|
|
|
|
Chunk::ClusterHead(ref mut cluster_head) => {
|
2018-03-28 04:31:58 +00:00
|
|
|
let start = cluster_head.start;
|
2017-09-30 21:36:09 +00:00
|
|
|
if start < self.last_observed_timecode {
|
|
|
|
let next_timecode = self.last_observed_timecode + self.assumed_duration;
|
|
|
|
self.current_offset = next_timecode - start;
|
|
|
|
}
|
|
|
|
|
2018-03-28 04:31:58 +00:00
|
|
|
cluster_head.update_timecode(start + self.current_offset);
|
2018-03-30 06:44:42 +00:00
|
|
|
self.last_observed_timecode = cluster_head.end;
|
2019-10-11 04:28:08 +00:00
|
|
|
}
|
2017-09-30 21:36:09 +00:00
|
|
|
_ => {}
|
2019-10-11 04:28:08 +00:00
|
|
|
}
|
|
|
|
chunk
|
2017-09-30 06:33:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-04-08 06:31:00 +00:00
|
|
|
/// Stream adaptor that withholds cluster chunks until a keyframe cluster head
/// has been seen, and forwards only the first `Headers` chunk.
pub struct StartingPointFinder<S> {
    /// The wrapped chunk stream.
    stream: S,
    /// Set once the first `Headers` chunk has been forwarded.
    seen_header: bool,
    /// Set when a cluster head flagged `keyframe` arrives; cleared again when
    /// a further `Headers` chunk shows up after the first.
    seen_keyframe: bool,
}
|
|
|
|
|
2019-10-11 04:28:08 +00:00
|
|
|
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S>
|
2018-04-08 06:31:00 +00:00
|
|
|
{
|
2019-10-11 04:28:08 +00:00
|
|
|
type Item = Result<Chunk, S::Error>;
|
2018-04-08 06:31:00 +00:00
|
|
|
|
2019-10-11 04:28:08 +00:00
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> {
|
2018-04-08 06:31:00 +00:00
|
|
|
loop {
|
2019-10-11 04:28:08 +00:00
|
|
|
return match self.stream.try_poll_next_unpin(cx) {
|
|
|
|
Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head)))) => {
|
2018-04-08 06:31:00 +00:00
|
|
|
if cluster_head.keyframe {
|
|
|
|
self.seen_keyframe = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
if self.seen_keyframe {
|
2019-10-11 04:28:08 +00:00
|
|
|
Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head))))
|
2018-04-08 06:31:00 +00:00
|
|
|
} else {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
},
|
2019-10-11 04:28:08 +00:00
|
|
|
chunk @ Poll::Ready(Some(Ok(Chunk::ClusterBody {..}))) => {
|
2018-04-08 06:31:00 +00:00
|
|
|
if self.seen_keyframe {
|
|
|
|
chunk
|
|
|
|
} else {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
},
|
2019-10-11 04:28:08 +00:00
|
|
|
chunk @ Poll::Ready(Some(Ok(Chunk::Headers {..}))) => {
|
2018-04-08 06:31:00 +00:00
|
|
|
if self.seen_header {
|
|
|
|
// new stream starting, we don't need a new header but should wait for a safe spot to resume
|
|
|
|
self.seen_keyframe = false;
|
|
|
|
continue;
|
|
|
|
} else {
|
|
|
|
self.seen_header = true;
|
|
|
|
chunk
|
|
|
|
}
|
|
|
|
},
|
|
|
|
chunk => chunk
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-04-13 03:29:12 +00:00
|
|
|
pub struct Throttle<S> {
|
|
|
|
stream: S,
|
2018-04-14 08:45:35 +00:00
|
|
|
start_time: Instant,
|
|
|
|
sleep: Delay
|
2018-04-13 03:29:12 +00:00
|
|
|
}
|
|
|
|
|
2019-10-16 04:16:47 +00:00
|
|
|
impl<S> Throttle<S> {
|
|
|
|
pub fn new(wrap: S) -> Throttle<S> {
|
|
|
|
let now = Instant::now();
|
|
|
|
Throttle {
|
|
|
|
stream: wrap,
|
|
|
|
start_time: now,
|
2020-05-08 03:14:43 +00:00
|
|
|
sleep: delay_until(now)
|
2019-10-16 04:16:47 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-05-09 01:15:18 +00:00
|
|
|
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for Throttle<S>
|
2018-04-13 03:29:12 +00:00
|
|
|
{
|
2020-05-09 01:15:18 +00:00
|
|
|
type Item = Result<Chunk, S::Error>;
|
2019-10-11 04:28:08 +00:00
|
|
|
|
2020-05-09 01:15:18 +00:00
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> {
|
2019-10-11 04:28:08 +00:00
|
|
|
match self.sleep.poll_unpin(cx) {
|
|
|
|
Poll::Pending => return Poll::Pending,
|
|
|
|
Poll::Ready(()) => { /* can continue */ },
|
2018-04-14 08:45:35 +00:00
|
|
|
}
|
|
|
|
|
2019-10-11 04:28:08 +00:00
|
|
|
let next_chunk = self.stream.try_poll_next_unpin(cx);
|
|
|
|
if let Poll::Ready(Some(Ok(Chunk::ClusterHead(ref cluster_head)))) = next_chunk {
|
2018-04-14 08:45:35 +00:00
|
|
|
// snooze until real time has "caught up" to the stream
|
|
|
|
let offset = Duration::from_millis(cluster_head.end);
|
2019-10-11 04:28:08 +00:00
|
|
|
let sleep_until = self.start_time + offset;
|
|
|
|
self.sleep.reset(sleep_until);
|
2018-04-14 08:45:35 +00:00
|
|
|
}
|
|
|
|
next_chunk
|
2018-04-13 03:29:12 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-11 04:28:08 +00:00
|
|
|
pub trait ChunkStream where Self : Sized + TryStream<Ok = Chunk> {
|
|
|
|
/*fn fix_timecodes(self) -> Map<_> {
|
|
|
|
let fixer = ;
|
|
|
|
self.map(move |chunk| {
|
|
|
|
fixer.process(chunk);
|
|
|
|
chunk
|
|
|
|
})
|
|
|
|
}*/
|
2018-04-08 06:31:00 +00:00
|
|
|
|
|
|
|
fn find_starting_point(self) -> StartingPointFinder<Self> {
|
|
|
|
StartingPointFinder {
|
|
|
|
stream: self,
|
|
|
|
seen_header: false,
|
|
|
|
seen_keyframe: false
|
2017-09-30 06:33:36 +00:00
|
|
|
}
|
|
|
|
}
|
2018-04-13 03:29:12 +00:00
|
|
|
|
|
|
|
fn throttle(self) -> Throttle<Self> {
|
2019-10-16 04:16:47 +00:00
|
|
|
Throttle::new(self)
|
2018-04-13 03:29:12 +00:00
|
|
|
}
|
2017-09-30 06:33:36 +00:00
|
|
|
}
|
2018-04-07 05:09:17 +00:00
|
|
|
|
2019-10-11 04:28:08 +00:00
|
|
|
/// Blanket implementation: every `TryStream` yielding `Chunk`s gets the
/// `ChunkStream` adaptor methods.
impl<T> ChunkStream for T where T: TryStream<Ok = Chunk> {}
|