From bf0f727b03051552cc805f1f4957f312d68b0aed Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Mon, 9 Apr 2018 21:00:51 -0400 Subject: [PATCH] Stub out a one-channel video relay server --- src/bin/loop_server.rs | 8 +-- src/bin/relay_server.rs | 119 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 src/bin/relay_server.rs diff --git a/src/bin/loop_server.rs b/src/bin/loop_server.rs index 81c4948..a01a589 100644 --- a/src/bin/loop_server.rs +++ b/src/bin/loop_server.rs @@ -17,16 +17,16 @@ use hyper::server::{Http, Request, Response, Service}; const SRC_FILE: &'static [u8] = include_bytes!("../data/test1.webm"); -#[derive(Clone)] -struct WebmServer; +struct LoopServer; type BodyStream = Box>; -impl Service for WebmServer { +impl Service for LoopServer { type Request = Request; type Response = Response; type Error = hyper::Error; type Future = FutureResult; + fn call(&self, req: Request) -> Self::Future { let response = match (req.method(), req.path()) { (&Get, "/loop") => { @@ -53,5 +53,5 @@ impl Service for WebmServer { pub fn main() { let addr = args().nth(1).expect("Need binding address+port").to_socket_addrs().unwrap().next().unwrap(); - Http::new().bind(&addr, move || Ok(WebmServer)).unwrap().run().unwrap(); + Http::new().bind(&addr, move || Ok(LoopServer)).unwrap().run().unwrap(); } diff --git a/src/bin/relay_server.rs b/src/bin/relay_server.rs new file mode 100644 index 0000000..6f80d01 --- /dev/null +++ b/src/bin/relay_server.rs @@ -0,0 +1,119 @@ +extern crate futures; +extern crate hyper; +extern crate lab_ebml; + +use std::env::args; +use std::io::ErrorKind; +use std::net::ToSocketAddrs; +use std::sync::{ + Arc, + Mutex +}; + +use futures::{ + Future, + Stream, + Sink, + future::{ + FutureResult, + ok + }, + stream::empty +}; +use lab_ebml::{ + channel::{ + Channel, + Listener, + Transmitter + }, + chunk::{Chunk, WebmStream, ChunkingError}, + fixers::ChunkStream, + stream_parser::StreamEbml +}; +use hyper::{ + Error as HyperError, + Get, + Head, + Post, + StatusCode, + header::ContentType, + server::{Http, Request, Response, Service} +}; + +type BodyStream = Box>; + +struct RelayServer(Arc>); + +impl RelayServer { + fn get_channel(&self) -> Arc> { + self.0.clone() + } + + fn get_stream(&self) -> BodyStream { + Box::new( + Listener::new(self.get_channel()) + .fix_timecodes() + .find_starting_point() + .map_err(|err| match err {}) + ) + } + + fn post_stream, S: Stream + 'static>(&self, stream: S) -> BodyStream { + let source = stream.parse_ebml().chunk_webm(); + let sink = Transmitter::new(self.get_channel()); + + Box::new( + source.forward(sink.sink_map_err(|err| match err {})) + .into_stream() + .map(|_| empty()) + .map_err(|err| { + let io_err = match err { + ChunkingError::IoError(io_err) => io_err, + ChunkingError::OtherError(_) => ErrorKind::InvalidData.into() + }; + println!("Post failed: {}", &io_err); + io_err + }) + .flatten() + ) + } +} + +impl Service for RelayServer { + type Request = Request; + type Response = Response; + type Error = HyperError; + type Future = FutureResult; + + fn call(&self, request: Request) -> Self::Future { + let (method, uri, _http_version, _headers, request_body) = request.deconstruct(); + + eprintln!("New {} Request: {}", method, uri.path()); + + ok(match (method, uri.path()) { + (Head, "/live") => { + Response::new() + .with_header(ContentType("video/webm".parse().unwrap())) + }, + (Get, "/live") => { + Response::new() + .with_header(ContentType("video/webm".parse().unwrap())) + .with_body(self.get_stream()) + }, + (Post, "/live") => { + Response::new() + .with_body(self.post_stream(request_body)) + }, + _ => { + Response::new() + .with_status(StatusCode::NotFound) + } + }) + } +} + +pub fn main() { + let single_channel = Channel::new(); + let addr = args().nth(1).expect("Need binding address+port").to_socket_addrs().unwrap().next().unwrap(); + Http::new().bind(&addr, move || Ok(RelayServer(single_channel.clone()))).unwrap().run().unwrap(); +}