From 7c1a2e48b0ef0e93ea7221270c7a354d4219af0b Mon Sep 17 00:00:00 2001 From: Tangent 128 Date: Sat, 27 Oct 2018 18:13:18 -0400 Subject: [PATCH] Support arbitrary number of streams; bump version to reflect URL change --- Cargo.lock | 9 ++++++++- Cargo.toml | 3 ++- src/channel.rs | 4 +++- src/commands/relay.rs | 31 +++++++++++++++++++++++++------ src/main.rs | 1 + 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aea8efb..e6fa277 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -877,9 +877,14 @@ dependencies = [ "urlencoding 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "weak-table" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "webmetro" -version = "0.1.0" +version = "0.2.0" dependencies = [ "bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -891,6 +896,7 @@ dependencies = [ "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "warp 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "weak-table 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1036,6 +1042,7 @@ dependencies = [ "checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" "checksum want 0.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "797464475f30ddb8830cc529aaaae648d581f99e2036a928877dfde027ddf6b3" "checksum warp 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f3c669972fab8d3d249cb4516512448b0ae143433e23eac1a157081e8752f5a9" +"checksum weak-table 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a5862bb244c852a56c6f3c39668ff181271bda44513ef30d2073a3eedd9898d" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "04e3bd221fcbe8a271359c04f21a76db7d0c6028862d1bb5512d85e1e2eb5bb3" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" diff --git a/Cargo.toml b/Cargo.toml index ac409c2..9bb8a15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "webmetro" -version = "0.1.0" +version = "0.2.0" authors = ["Tangent 128 "] [dependencies] @@ -14,3 +14,4 @@ tokio = "0.1.11" tokio-codec = "0.1.1" tokio-io = "0.1.10" warp = "0.1" +weak-table = "0.2" diff --git a/src/channel.rs b/src/channel.rs index aa9f10d..ced5bf8 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -26,6 +26,7 @@ pub enum Never {} /// case, there's nothing practical the server can do to recover, /// so the failing client is just dropped from the listener list. pub struct Channel { + pub name: String, header_chunk: Option, listeners: Vec> } @@ -33,8 +34,9 @@ pub struct Channel { pub type Handle = Arc>; impl Channel { - pub fn new() -> Handle { + pub fn new(name: String) -> Handle { Arc::new(Mutex::new(Channel { + name, header_chunk: None, listeners: Vec::new() })) diff --git a/src/commands/relay.rs b/src/commands/relay.rs index 26810b7..2f81bdc 100644 --- a/src/commands/relay.rs +++ b/src/commands/relay.rs @@ -1,5 +1,10 @@ use std::error::Error; use std::net::ToSocketAddrs; +use std::sync::{ + Arc, + Mutex, + Weak +}; use bytes::{Bytes, Buf}; use clap::{App, Arg, ArgMatches, SubCommand}; @@ -21,6 +26,9 @@ use warp::{ self, Filter }; +use weak_table::{ + WeakValueHashMap +}; use webmetro::{ channel::{ Channel, @@ -79,22 +87,33 @@ pub fn options() -> App<'static, 'static> { } pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { - let single_channel = Channel::new(); + let channel_map = Arc::new(Mutex::new(WeakValueHashMap::>>::new())); let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?; let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?; - let channel = path!("live").map(move || single_channel.clone()); + let channel = path!("live" / String).map(move |name: String| { + let channel = channel_map.lock().unwrap() + .entry(name.clone()) + .or_insert_with(|| Channel::new(name.clone())); + (channel, name) + }); let head = channel.clone().and(warp::head()) - .map(|_| media_response(Body::empty())); + .map(|(_, name)| { + println!("[Info] HEAD Request For Channel {}", name); + media_response(Body::empty()) + }); let get = channel.clone().and(warp::get2()) - .map(|channel| media_response(Body::wrap_stream(get_stream(channel)))); + .map(|(channel, name)| { + println!("[Info] Listener Connected On Channel {}", name); + media_response(Body::wrap_stream(get_stream(channel))) + }); let post_put = channel.clone().and(warp::post2().or(warp::put2()).unify()) - .and(warp::body::stream()).map(|channel, stream| { - println!("[Info] Source Connected"); + .and(warp::body::stream()).map(|(channel, name), stream| { + println!("[Info] Source Connected On Channel {}", name); Response::new(Body::wrap_stream(post_stream(channel, stream))) }); diff --git a/src/main.rs b/src/main.rs index 123d98b..454a058 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ extern crate tokio; extern crate tokio_codec; extern crate tokio_io; #[macro_use] extern crate warp; +extern crate weak_table; extern crate webmetro; mod commands;