Stub out a one-channel video relay server

This commit is contained in:
Tangent 128 2018-04-09 21:00:51 -04:00
parent 96f6ec8115
commit bf0f727b03
2 changed files with 123 additions and 4 deletions

View file

@ -17,16 +17,16 @@ use hyper::server::{Http, Request, Response, Service};
const SRC_FILE: &'static [u8] = include_bytes!("../data/test1.webm"); const SRC_FILE: &'static [u8] = include_bytes!("../data/test1.webm");
#[derive(Clone)] struct LoopServer;
struct WebmServer;
type BodyStream = Box<Stream<Item = Chunk, Error = hyper::Error>>; type BodyStream = Box<Stream<Item = Chunk, Error = hyper::Error>>;
impl Service for WebmServer { impl Service for LoopServer {
type Request = Request; type Request = Request;
type Response = Response<BodyStream>; type Response = Response<BodyStream>;
type Error = hyper::Error; type Error = hyper::Error;
type Future = FutureResult<Self::Response, hyper::Error>; type Future = FutureResult<Self::Response, hyper::Error>;
fn call(&self, req: Request) -> Self::Future { fn call(&self, req: Request) -> Self::Future {
let response = match (req.method(), req.path()) { let response = match (req.method(), req.path()) {
(&Get, "/loop") => { (&Get, "/loop") => {
@ -53,5 +53,5 @@ impl Service for WebmServer {
pub fn main() { pub fn main() {
let addr = args().nth(1).expect("Need binding address+port").to_socket_addrs().unwrap().next().unwrap(); let addr = args().nth(1).expect("Need binding address+port").to_socket_addrs().unwrap().next().unwrap();
Http::new().bind(&addr, move || Ok(WebmServer)).unwrap().run().unwrap(); Http::new().bind(&addr, move || Ok(LoopServer)).unwrap().run().unwrap();
} }

119
src/bin/relay_server.rs Normal file
View file

@ -0,0 +1,119 @@
extern crate futures;
extern crate hyper;
extern crate lab_ebml;
use std::env::args;
use std::io::ErrorKind;
use std::net::ToSocketAddrs;
use std::sync::{
Arc,
Mutex
};
use futures::{
Future,
Stream,
Sink,
future::{
FutureResult,
ok
},
stream::empty
};
use lab_ebml::{
channel::{
Channel,
Listener,
Transmitter
},
chunk::{Chunk, WebmStream, ChunkingError},
fixers::ChunkStream,
stream_parser::StreamEbml
};
use hyper::{
Error as HyperError,
Get,
Head,
Post,
StatusCode,
header::ContentType,
server::{Http, Request, Response, Service}
};
type BodyStream = Box<Stream<Item = Chunk, Error = HyperError>>;
struct RelayServer(Arc<Mutex<Channel>>);
impl RelayServer {
fn get_channel(&self) -> Arc<Mutex<Channel>> {
self.0.clone()
}
fn get_stream(&self) -> BodyStream {
Box::new(
Listener::new(self.get_channel())
.fix_timecodes()
.find_starting_point()
.map_err(|err| match err {})
)
}
fn post_stream<I: AsRef<[u8]>, S: Stream<Item = I> + 'static>(&self, stream: S) -> BodyStream {
let source = stream.parse_ebml().chunk_webm();
let sink = Transmitter::new(self.get_channel());
Box::new(
source.forward(sink.sink_map_err(|err| match err {}))
.into_stream()
.map(|_| empty())
.map_err(|err| {
let io_err = match err {
ChunkingError::IoError(io_err) => io_err,
ChunkingError::OtherError(_) => ErrorKind::InvalidData.into()
};
println!("Post failed: {}", &io_err);
io_err
})
.flatten()
)
}
}
impl Service for RelayServer {
type Request = Request;
type Response = Response<BodyStream>;
type Error = HyperError;
type Future = FutureResult<Self::Response, HyperError>;
fn call(&self, request: Request) -> Self::Future {
let (method, uri, _http_version, _headers, request_body) = request.deconstruct();
eprintln!("New {} Request: {}", method, uri.path());
ok(match (method, uri.path()) {
(Head, "/live") => {
Response::new()
.with_header(ContentType("video/webm".parse().unwrap()))
},
(Get, "/live") => {
Response::new()
.with_header(ContentType("video/webm".parse().unwrap()))
.with_body(self.get_stream())
},
(Post, "/live") => {
Response::new()
.with_body(self.post_stream(request_body))
},
_ => {
Response::new()
.with_status(StatusCode::NotFound)
}
})
}
}
pub fn main() {
let single_channel = Channel::new();
let addr = args().nth(1).expect("Need binding address+port").to_socket_addrs().unwrap().next().unwrap();
Http::new().bind(&addr, move || Ok(RelayServer(single_channel.clone()))).unwrap().run().unwrap();
}