diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..eb5a316 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..2d36767 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,719 @@ +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "arrayvec" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "atty" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", + "termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "base64" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "bitflags" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "byteorder" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "bytes" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "cfg-if" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "clap" +version = "2.31.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "atty 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "textwrap 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-width 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "vec_map 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crossbeam-deque" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-epoch 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crossbeam-utils" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crossbeam-utils" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "futures" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "futures-cpupool" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "httparse" +version = "1.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "hyper" +version = "0.11.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "base64 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "relay 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "iovec" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "language-tags" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "lazy_static" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "lazycell" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "libc" +version = "0.2.40" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "log" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "log" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "memoffset" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "mime" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "mio" +version = "0.6.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "miow" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "net2" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "nodrop" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "num_cpus" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "odds" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rawpointer 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rawslice 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "unchecked-index 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "percent-encoding" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "rand" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rand" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rawpointer" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "rawslice" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rawpointer 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "redox_syscall" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "redox_termios" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "relay" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "safemem" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "scoped-tls" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "scopeguard" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "slab" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "slab" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "smallvec" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "strsim" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "take" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "termion" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "textwrap" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicode-width 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "time" +version = "0.1.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tcp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-threadpool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-core" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", + "scoped-tls 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-executor" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-io" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-proto" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-reactor" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-service" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-tcp" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-threadpool" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-deque 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-timer" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-udp" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "unchecked-index" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "unicase" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "version_check 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "unicode-width" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "vec_map" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "version_check" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "webmetro" +version = "0.1.0" +dependencies = [ + "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.31.2 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)", + "odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[metadata] +"checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +"checksum arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "a1e964f9e24d588183fcb43503abda40d288c8657dfc27311516ce2f05675aef" +"checksum atty 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "af80143d6f7608d746df1520709e5d141c96f240b0e62b0aa41bdfb53374d9d4" +"checksum base64 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "229d032f1a99302697f10b27167ae6d03d49d032e6a8e2550e8d3fc13356d2b4" +"checksum bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b3c30d3802dfb7281680d6285f2ccdaa8c2d8fee41f93805dba5c4cf50dc23cf" +"checksum byteorder 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "73b5bdfe7ee3ad0b99c9801d58807a9dbc9e09196365b0203853b99889ab3c87" +"checksum bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "1b7db437d718977f6dc9b2e3fd6fc343c02ac6b899b73fdd2179163447bd9ce9" +"checksum cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d4c819a1287eb618df47cc647173c5c4c66ba19d888a6e50d605672aed3140de" +"checksum clap 2.31.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f0f16b89cbb9ee36d87483dc939fe9f1e13c05898d56d7b230a0d4dff033a536" +"checksum crossbeam-deque 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c1bdc73742c36f7f35ebcda81dbb33a7e0d33757d03a06d9ddca762712ec5ea2" +"checksum crossbeam-epoch 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9b4e2817eb773f770dcb294127c011e22771899c21d18fce7dd739c0b9832e81" +"checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9" +"checksum crossbeam-utils 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d636a8b3bcc1b409d7ffd3facef8f21dcb4009626adbd0c5e6c4305c07253c7b" +"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +"checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c" +"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" +"checksum httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2f407128745b78abc95c0ffbe4e5d37427fdc0d45470710cfef8c44522a2e37" +"checksum hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)" = "549dbb86397490ce69d908425b9beebc85bbaad25157d67479d4995bb56fdf9a" +"checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" +"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +"checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" +"checksum lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c8f31047daa365f19be14b47c29df4f7c3b581832407daabe6ae77397619237d" +"checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef" +"checksum libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)" = "6fd41f331ac7c5b8ac259b8bf82c75c0fb2e469bbf37d2becbba9a6a2221965b" +"checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" +"checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2" +"checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3" +"checksum mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e2e00e17be181010a91dbfefb01660b17311059dc8c7f48b9017677721e732bd" +"checksum mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)" = "6d771e3ef92d58a8da8df7d6976bfca9371ed1de6619d9d5a5ce5b1f29b85bfe" +"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +"checksum net2 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)" = "9044faf1413a1057267be51b5afba8eb1090bd2231c693664aa1db716fe1eae0" +"checksum nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a2228dca57108069a5262f2ed8bd2e82496d2e074a06d1ccc7ce1687b6ae0a2" +"checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30" +"checksum odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a9a18d7081eb052145753e982d7b8de495f15f74636d0d963f09116581eab665" +"checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" +"checksum rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "15a732abf9d20f0ad8eeb6f909bf6868722d9a06e1e50802b6a70351f40b4eb1" +"checksum rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "eba5f8cb59cc50ed56be8880a5c7b496bfd9bd26394e176bc67884094145c2c5" +"checksum rawpointer 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ebac11a9d2e11f2af219b8b8d833b76b1ea0e054aa0e8d8e9e4cbde353bdf019" +"checksum rawslice 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "22b23b9f57ea250c6db4b21e2897b43ff08209217ca8260469fae6c0f9ad7e25" +"checksum redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)" = "0d92eecebad22b767915e4d529f89f28ee96dbbf5a4810d2b844373f136417fd" +"checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" +"checksum relay 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1576e382688d7e9deecea24417e350d3062d97e32e45d70b1cde65994ff1489a" +"checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" +"checksum scoped-tls 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8674d439c964889e2476f474a3bf198cc9e199e77499960893bac5de7e9218a4" +"checksum scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27" +"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" +"checksum slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fdeff4cd9ecff59ec7e3744cbca73dfe5ac35c2aedb2cfba8a1c715a18912e9d" +"checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" +"checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550" +"checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" +"checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" +"checksum textwrap 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c0b59b6b4b44d867f1370ef1bd91bfb262bf07bf0ae65c202ea2fbc16153b693" +"checksum time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)" = "a15375f1df02096fb3317256ce2cee6a1f42fc84ea5ad5fc8c421cfe40c73098" +"checksum tokio 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "be15ef40f675c9fe66e354d74c73f3ed012ca1aa14d65846a33ee48f1ae8d922" +"checksum tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "aeeffbbb94209023feaef3c196a41cbcdafa06b4a6f893f68779bb5e53796f71" +"checksum tokio-executor 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8cac2a7883ff3567e9d66bb09100d09b33d90311feca0206c7ca034bc0c55113" +"checksum tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "6af9eb326f64b2d6b68438e1953341e00ab3cf54de7e35d92bfc73af8555313a" +"checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389" +"checksum tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b3cedc8e5af5131dc3423ffa4f877cce78ad25259a9a62de0613735a13ebc64b" +"checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" +"checksum tokio-tcp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ec9b094851aadd2caf83ba3ad8e8c4ce65a42104f7b94d9e6550023f0407853f" +"checksum tokio-threadpool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bf3d05cdd6a78005e535d2b27c21521bdf91fbb321027a62d8e178929d18966d" +"checksum tokio-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "29a89e4ad0c8f1e4c9860e605c38c69bfdad3cccd4ea446e58ff588c1c07a397" +"checksum tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "137bda266504893ac4774e0ec4c2108f7ccdbcb7ac8dced6305fe9e4e0b5041a" +"checksum unchecked-index 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "eeba86d422ce181a719445e51872fa30f1f7413b62becb52e95ec91aa262d85c" +"checksum unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "284b6d3db520d67fbe88fd778c21510d1b0ba4a551e5d0fbb023d33405f6de8a" +"checksum unicode-width 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bf3a113775714a22dcb774d8ea3655c53a32debae63a063acc00a91cc586245f" +"checksum vec_map 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "887b5b631c2ad01628bbbaa7dd4c869f80d3186688f8d0b6f58774fbe324988c" +"checksum version_check 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6b772017e347561807c1aa192438c5fd74242a670a6cffacc40f2defd1dc069d" +"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" +"checksum winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "04e3bd221fcbe8a271359c04f21a76db7d0c6028862d1bb5512d85e1e2eb5bb3" +"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" +"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +"checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +"checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..7a72521 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "webmetro" +version = "0.1.0" +authors = ["Tangent 128 "] + +[dependencies] +bytes = "0.4" +clap = "2.31.2" +futures = "0.1.20" +hyper = "0.11.25" +odds = { version = "0.3.1", features = ["std-vec"] } +tokio = "0.1.5" +tokio-core = "0.1.17" +tokio-io = "0.1.6" diff --git a/README.md b/README.md new file mode 100644 index 0000000..c5ca841 --- /dev/null +++ b/README.md @@ -0,0 +1,46 @@ +# webmetro + +`webmetro` is a simple relay server for broadcasting a WebM stream from one uploader to many downloaders, via HTTP. + +The initialization segment is remembered, so that viewers can join mid-stream. + +Cluster timestamps are rewritten to be monotonic, so multiple (compatibly-encoded) webm files can be chained together without clients needing to reconnect. + +## Usage + +Launch a relay server with the `relay` subcommand: + +`webmetro relay localhost:8080` + +At this point you can open http://localhost:8080/live in a web browser. + +Next, a source client will need to `POST` or `PUT` a stream to that URL; a static file can be uploaded with the `send` subcommand: + +`webmetro send --throttle http://localhost:8080/live < file.webm` + +You can even glue together multiple files, provided they share the same codecs and track order: + +`cat 1.webm 2.webm 3.webm | webmetro send --throttle http://localhost:8080/live` + +You can use ffmpeg to transcode a non-WebM file or access a media device: + +`ffmpeg -i file.mp4 -deadline realtime -threads 4 -vb 700k -vcodec libvpx -f webm -live 1 - | webmetro send --throttle http://localhost:8080/live` + +(if the source is itself a live stream, you can leave off the `--throttle` flag) + +## Limitations + +* HTTPS is not supported yet. It really should be. +* There aren't any access controls on either the source or viewer roles yet. +* Currently the server only recognizes a single stream, at `/live`. +* The server tries to start a viewer at a cluster containing a keyframe; it is not yet smart enough to ensure that the keyframe belongs to the *video* stream. +* The server doesn't parse any metadata, such as tags; the Info segment is stripped out, everything else is blindly passed along. +* The server drops any source that it feels uses too much buffer space. This is not yet configurable, though sane files probably won't hit the limit. (Essentially, clusters & the initialization segment can't individually be more than 2M) + +## See Also + +* the [Icecast](http://www.icecast.org/) streaming server likewise relays media streams over HTTP, and supports additional non-WebM formats such as Ogg. It does not support clients connecting to a stream before the source, however. + +## License + +`webmetro` is licensed under the MIT license; see the LICENSE file. diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..80d7725 --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,112 @@ +use std::sync::{ + Arc, + Mutex +}; + +use futures::{ + Async, + AsyncSink, + Sink, + Stream, + sync::mpsc::{ + channel as mpsc_channel, + Sender, + Receiver + } +}; +use odds::vec::VecExt; + +use chunk::Chunk; + +pub enum Never {} + +/// A collection of listeners to a stream of WebM chunks. +/// Sending a chunk may fail due to a client being disconnected, +/// or simply failing to keep up with the stream buffer. In either +/// case, there's nothing practical the server can do to recover, +/// so the failing client is just dropped from the listener list. +pub struct Channel { + header_chunk: Option, + listeners: Vec> +} + +impl Channel { + pub fn new() -> Arc> { + Arc::new(Mutex::new(Channel { + header_chunk: None, + listeners: Vec::new() + })) + } +} + +pub struct Transmitter { + channel: Arc> +} + +impl Transmitter { + pub fn new(channel_arc: Arc>) -> Self { + Transmitter { + channel: channel_arc + } + } +} + +impl Sink for Transmitter { + type SinkItem = Chunk; + type SinkError = Never; // never errors, slow clients are simply dropped + + fn start_send(&mut self, chunk: Chunk) -> Result, Never> { + let mut channel = self.channel.lock().expect("Locking channel"); + + if let Chunk::Headers { .. } = chunk { + channel.header_chunk = Some(chunk.clone()); + } + + channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok()); + + Ok(AsyncSink::Ready) + } + fn poll_complete(&mut self) -> Result, Never> { + let mut channel = self.channel.lock().expect("Locking channel"); + + channel.listeners.retain_mut(|listener| listener.poll_complete().is_ok()); + + Ok(Async::Ready(())) + } +} + +pub struct Listener { + /// not used in operation, but its refcount keeps the channel alive when there's no Transmitter + _channel: Arc>, + receiver: Receiver +} + +impl Listener { + pub fn new(channel_arc: Arc>) -> Self { + let (mut sender, receiver) = mpsc_channel(5); + + { + let mut channel = channel_arc.lock().expect("Locking channel"); + + if let Some(ref chunk) = channel.header_chunk { + sender.start_send(chunk.clone()).expect("Queuing existing header chunk"); + } + + channel.listeners.push(sender); + } + + Listener { + _channel: channel_arc, + receiver: receiver + } + } +} + +impl Stream for Listener { + type Item = Chunk; + type Error = Never; // no transmitter errors are exposed to the listeners + + fn poll(&mut self) -> Result>, Never> { + Ok(self.receiver.poll().expect("Channel receiving can't error")) + } +} diff --git a/src/chunk.rs b/src/chunk.rs new file mode 100644 index 0000000..b81d16f --- /dev/null +++ b/src/chunk.rs @@ -0,0 +1,277 @@ +use futures::{Async, Stream}; +use std::{ + io::Cursor, + mem, + sync::Arc +}; +use ebml::EbmlEventSource; +use error::WebmetroError; +use webm::*; + +#[derive(Clone, Debug)] +pub struct ClusterHead { + pub keyframe: bool, + pub start: u64, + pub end: u64, + // space for a Cluster tag and a Timecode tag + bytes: [u8;16], + bytes_used: u8 +} + +impl ClusterHead { + pub fn new(timecode: u64) -> ClusterHead { + let mut cluster_head = ClusterHead { + keyframe: false, + start: 0, + end: 0, + bytes: [0;16], + bytes_used: 0 + }; + cluster_head.update_timecode(timecode); + cluster_head + } + pub fn update_timecode(&mut self, timecode: u64) { + let delta = self.end - self.start; + self.start = timecode; + self.end = self.start + delta; + let mut cursor = Cursor::new(self.bytes.as_mut()); + // buffer is sized so these should never fail + encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap(); + encode_webm_element(WebmElement::Timecode(timecode), &mut cursor).unwrap(); + self.bytes_used = cursor.position() as u8; + } + pub fn observe_simpleblock_timecode(&mut self, timecode: i16) { + let absolute_timecode = self.start + (timecode as u64); + if absolute_timecode > self.start { + self.end = absolute_timecode; + } + } +} + +impl AsRef<[u8]> for ClusterHead { + fn as_ref(&self) -> &[u8] { + self.bytes[..self.bytes_used as usize].as_ref() + } +} + +#[derive(Clone, Debug)] +pub enum Chunk { + Headers { + bytes: Arc> + }, + ClusterHead(ClusterHead), + ClusterBody { + bytes: Arc> + } +} + +impl AsRef<[u8]> for Chunk { + fn as_ref(&self) -> &[u8] { + match self { + &Chunk::Headers {ref bytes, ..} => bytes.as_ref().as_ref(), + &Chunk::ClusterHead(ref cluster_head) => cluster_head.as_ref(), + &Chunk::ClusterBody {ref bytes, ..} => bytes.as_ref().as_ref() + } + } +} + +#[derive(Debug)] +enum ChunkerState { + BuildingHeader(Cursor>), + // ClusterHead & body buffer + BuildingCluster(ClusterHead, Cursor>), + EmittingClusterBody(Vec), + EmittingClusterBodyBeforeNewHeader { + body: Vec, + new_header: Cursor> + }, + EmittingFinalClusterBody(Vec), + End +} + +pub struct WebmChunker { + source: S, + buffer_size_limit: Option, + state: ChunkerState +} + +impl WebmChunker { + /// add a "soft" buffer size limit; if a chunk buffer exceeds this size, + /// error the stream instead of resuming. It's still possible for a buffer + /// to exceed this size *after* a write, so ensure input sizes are reasonable. + pub fn with_soft_limit(mut self, limit: usize) -> Self { + self.buffer_size_limit = Some(limit); + self + } +} + +fn encode(element: WebmElement, buffer: &mut Cursor>, limit: Option) -> Result<(), WebmetroError> { + if let Some(limit) = limit { + if limit <= buffer.get_ref().len() { + return Err(WebmetroError::ResourcesExceeded); + } + } + + encode_webm_element(element, buffer).map_err(|err| err.into()) +} + +impl Stream for WebmChunker +where S::Error: Into +{ + type Item = Chunk; + type Error = WebmetroError; + + fn poll(&mut self) -> Result>, WebmetroError> { + loop { + let mut return_value = None; + let mut new_state = None; + + match self.state { + ChunkerState::BuildingHeader(ref mut buffer) => { + match self.source.poll_event() { + Err(passthru) => return Err(passthru.into()), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + Ok(Async::Ready(Some(WebmElement::Cluster))) => { + let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); + let header_chunk = Chunk::Headers {bytes: Arc::new(liberated_buffer.into_inner())}; + + return_value = Some(Ok(Async::Ready(Some(header_chunk)))); + new_state = Some(ChunkerState::BuildingCluster( + ClusterHead::new(0), + Cursor::new(Vec::new()) + )); + }, + Ok(Async::Ready(Some(WebmElement::Info))) => {}, + Ok(Async::Ready(Some(WebmElement::Void))) => {}, + Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, + Ok(Async::Ready(Some(element))) => { + encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| { + return_value = Some(Err(err)); + new_state = Some(ChunkerState::End); + }); + } + } + }, + ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => { + match self.source.poll_event() { + Err(passthru) => return Err(passthru.into()), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(Some(element @ WebmElement::EbmlHead))) + | Ok(Async::Ready(Some(element @ WebmElement::Segment))) => { + let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); + let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); + + let mut new_header_cursor = Cursor::new(Vec::new()); + match encode(element, &mut new_header_cursor, self.buffer_size_limit) { + Ok(_) => { + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); + new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{ + body: liberated_buffer.into_inner(), + new_header: new_header_cursor + }); + }, + Err(err) => { + return_value = Some(Err(err)); + new_state = Some(ChunkerState::End); + } + } + } + Ok(Async::Ready(Some(WebmElement::Cluster))) => { + let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); + let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); + + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); + new_state = Some(ChunkerState::EmittingClusterBody(liberated_buffer.into_inner())); + }, + Ok(Async::Ready(Some(WebmElement::Timecode(timecode)))) => { + cluster_head.update_timecode(timecode); + }, + Ok(Async::Ready(Some(WebmElement::SimpleBlock(ref block)))) => { + if (block.flags & 0b10000000) != 0 { + // TODO: this is incorrect, condition needs to also affirm we're the first video block of the cluster + cluster_head.keyframe = true; + } + cluster_head.observe_simpleblock_timecode(block.timecode); + encode(WebmElement::SimpleBlock(*block), buffer, self.buffer_size_limit).unwrap_or_else(|err| { + return_value = Some(Err(err)); + new_state = Some(ChunkerState::End); + }); + }, + Ok(Async::Ready(Some(WebmElement::Info))) => {}, + Ok(Async::Ready(Some(WebmElement::Void))) => {}, + Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {}, + Ok(Async::Ready(Some(element))) => { + encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| { + return_value = Some(Err(err)); + new_state = Some(ChunkerState::End); + }); + }, + Ok(Async::Ready(None)) => { + // flush final Cluster on end of stream + let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0)); + let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new())); + + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head))))); + new_state = Some(ChunkerState::EmittingFinalClusterBody(liberated_buffer.into_inner())); + } + } + }, + ChunkerState::EmittingClusterBody(ref mut buffer) => { + let liberated_buffer = mem::replace(buffer, Vec::new()); + + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)})))); + new_state = Some(ChunkerState::BuildingCluster( + ClusterHead::new(0), + Cursor::new(Vec::new()) + )); + }, + ChunkerState::EmittingClusterBodyBeforeNewHeader { ref mut body, ref mut new_header } => { + let liberated_body = mem::replace(body, Vec::new()); + let liberated_header_cursor = mem::replace(new_header, Cursor::new(Vec::new())); + + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_body)})))); + new_state = Some(ChunkerState::BuildingHeader(liberated_header_cursor)); + }, + ChunkerState::EmittingFinalClusterBody(ref mut buffer) => { + // flush final Cluster on end of stream + let liberated_buffer = mem::replace(buffer, Vec::new()); + + return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Arc::new(liberated_buffer)})))); + new_state = Some(ChunkerState::End); + }, + ChunkerState::End => return Ok(Async::Ready(None)) + }; + + if let Some(new_state) = new_state { + self.state = new_state; + } + if let Some(return_value) = return_value { + return return_value; + } + } + } +} + +pub trait WebmStream where Self: Sized + EbmlEventSource { + fn chunk_webm(self) -> WebmChunker { + WebmChunker { + source: self, + buffer_size_limit: None, + state: ChunkerState::BuildingHeader(Cursor::new(Vec::new())) + } + } +} + +impl WebmStream for T {} + +#[cfg(test)] +mod tests { + + use chunk::*; + + #[test] + fn enough_space_for_header() { + ClusterHead::new(u64::max_value()); + } +} diff --git a/src/commands/dump.rs b/src/commands/dump.rs new file mode 100644 index 0000000..8b691e6 --- /dev/null +++ b/src/commands/dump.rs @@ -0,0 +1,35 @@ +use clap::{App, AppSettings, ArgMatches, SubCommand}; +use futures::prelude::*; + +use super::stdin_stream; +use webmetro::{ + error::WebmetroError, + stream_parser::StreamEbml, + webm::{ + SimpleBlock, + WebmElement::* + } +}; + +pub fn options() -> App<'static, 'static> { + SubCommand::with_name("dump") + .setting(AppSettings::Hidden) + .about("Dumps WebM parsing events from parsing stdin") +} + +pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> { + + let mut events = stdin_stream().parse_ebml(); + + // stdin is sync so Async::NotReady will never happen + while let Ok(Async::Ready(Some(element))) = events.poll_event() { + match element { + // suppress printing byte arrays + Tracks(slice) => println!("Tracks[{}]", slice.len()), + SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode), + other => println!("{:?}", other) + } + } + + Ok(()) +} diff --git a/src/commands/filter.rs b/src/commands/filter.rs new file mode 100644 index 0000000..1cdecde --- /dev/null +++ b/src/commands/filter.rs @@ -0,0 +1,43 @@ +use std::{ + io, + io::prelude::* +}; + +use clap::{App, Arg, ArgMatches, SubCommand}; +use futures::prelude::*; + +use super::stdin_stream; +use webmetro::{ + chunk::{ + Chunk, + WebmStream + }, + error::WebmetroError, + fixers::ChunkStream, + stream_parser::StreamEbml +}; + +pub fn options() -> App<'static, 'static> { + SubCommand::with_name("filter") + .about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.") + .arg(Arg::with_name("throttle") + .long("throttle") + .help("Slow down output to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) +} + +pub fn run(args: &ArgMatches) -> Box + Send> { + let mut chunk_stream: Box + Send> = Box::new( + stdin_stream() + .parse_ebml() + .chunk_webm() + .fix_timecodes() + ); + + if args.is_present("throttle") { + chunk_stream = Box::new(chunk_stream.throttle()); + } + + Box::new(chunk_stream.for_each(|chunk| { + io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::IoError) + })) +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs new file mode 100644 index 0000000..9ebb8b4 --- /dev/null +++ b/src/commands/mod.rs @@ -0,0 +1,41 @@ +use std::error::Error; +use std::io::{ + Error as IoError, + ErrorKind, + stdin, + Stdin +}; + +use futures::{ + prelude::*, + stream::MapErr +}; +use hyper::Error as HyperError; +use tokio_io::{ + io::AllowStdIo, + codec::{ + BytesCodec, + FramedRead + } +}; +use webmetro::error::WebmetroError; + +pub mod dump; +pub mod filter; +pub mod relay; +pub mod send; + +/// An adapter that makes chunks of bytes from stdin available as a Stream; +/// is NOT actually async, and just uses blocking read. Don't use more than +/// one at once, who knows who gets which bytes. +pub fn stdin_stream() -> MapErr, BytesCodec>, fn(IoError) -> WebmetroError> { + FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new()) + .map_err(WebmetroError::IoError) +} + +pub fn to_hyper_error(err: WebmetroError) -> HyperError { + match err { + WebmetroError::IoError(io_err) => io_err.into(), + err => IoError::new(ErrorKind::InvalidData, err.description()).into() + } +} diff --git a/src/commands/relay.rs b/src/commands/relay.rs new file mode 100644 index 0000000..17bc018 --- /dev/null +++ b/src/commands/relay.rs @@ -0,0 +1,140 @@ +use std::error::Error; +use std::net::ToSocketAddrs; +use std::sync::{ + Arc, + Mutex +}; + +use clap::{App, Arg, ArgMatches, SubCommand}; +use futures::{ + Future, + Stream, + Sink, + future::{ + FutureResult, + ok + }, + stream::empty +}; +use hyper::{ + Error as HyperError, + Get, + Head, + Post, + Put, + StatusCode, + header::ContentType, + server::{Http, Request, Response, Service} +}; +use webmetro::{ + channel::{ + Channel, + Listener, + Transmitter + }, + chunk::{Chunk, WebmStream}, + error::WebmetroError, + fixers::ChunkStream, + stream_parser::StreamEbml +}; + +use super::to_hyper_error; + +const BUFFER_LIMIT: usize = 2 * 1024 * 1024; + +type BodyStream = Box>; + +struct RelayServer(Arc>); + +impl RelayServer { + fn get_channel(&self) -> Arc> { + self.0.clone() + } + + fn get_stream(&self) -> BodyStream { + Box::new( + Listener::new(self.get_channel()) + .fix_timecodes() + .find_starting_point() + .map_err(|err| match err {}) + ) + } + + fn post_stream, S: Stream + 'static>(&self, stream: S) -> BodyStream + where S::Error: Error + Send { + let source = stream + .map_err(WebmetroError::from_err) + .parse_ebml().with_soft_limit(BUFFER_LIMIT) + .chunk_webm().with_soft_limit(BUFFER_LIMIT); + let sink = Transmitter::new(self.get_channel()); + + Box::new( + source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}})) + .into_stream() + .map(|_| empty()) + .map_err(|err| { + //TODO: log something somewhere + to_hyper_error(err) + }) + .flatten() + ) + } +} + +impl Service for RelayServer { + type Request = Request; + type Response = Response; + type Error = HyperError; + type Future = FutureResult; + + fn call(&self, request: Request) -> Self::Future { + let (method, uri, _http_version, _headers, request_body) = request.deconstruct(); + + //TODO: log equiv to: eprintln!("New {} Request: {}", method, uri.path()); + + ok(match (method, uri.path()) { + (Head, "/live") => { + Response::new() + .with_header(ContentType("video/webm".parse().unwrap())) + }, + (Get, "/live") => { + Response::new() + .with_header(ContentType("video/webm".parse().unwrap())) + .with_body(self.get_stream()) + }, + (Post, "/live") | (Put, "/live") => { + Response::new() + .with_body(self.post_stream(request_body)) + }, + _ => { + Response::new() + .with_status(StatusCode::NotFound) + } + }) + } +} + +pub fn options() -> App<'static, 'static> { + SubCommand::with_name("relay") + .about("Hosts an HTTP-based relay server") + .arg(Arg::with_name("listen") + .help("The address:port to listen to") + .required(true)) +} + +pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> { + let single_channel = Channel::new(); + + let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?; + let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?; + + Http::new() + .bind(&addr, move || { + Ok(RelayServer(single_channel.clone())) + }) + .map_err(|err| WebmetroError::Unknown(Box::new(err)))? + .run() + .map_err(|err| WebmetroError::Unknown(Box::new(err)))?; + + Ok(()) +} diff --git a/src/commands/send.rs b/src/commands/send.rs new file mode 100644 index 0000000..6987f01 --- /dev/null +++ b/src/commands/send.rs @@ -0,0 +1,86 @@ +use clap::{App, Arg, ArgMatches, SubCommand}; +use futures::{ + future, + prelude::* +}; +use hyper::{ + Error as HyperError, + Method, + client::{ + Config, + Request + } +}; +use tokio_core::reactor::{ + Handle +}; + +use super::{ + stdin_stream, + to_hyper_error +}; +use webmetro::{ + chunk::{ + Chunk, + WebmStream + }, + error::WebmetroError, + fixers::ChunkStream, + stream_parser::StreamEbml +}; + +pub fn options() -> App<'static, 'static> { + SubCommand::with_name("send") + .about("PUTs WebM from stdin to a relay server.") + .arg(Arg::with_name("url") + .help("The location to upload to") + .required(true)) + .arg(Arg::with_name("throttle") + .long("throttle") + .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) +} + +type BoxedChunkStream = Box>; +type BoxedHyperStream = Box>; + +pub fn run(handle: Handle, args: &ArgMatches) -> Box> { + let mut chunk_stream: BoxedChunkStream = Box::new( + stdin_stream() + .parse_ebml() + .chunk_webm() + .fix_timecodes() + ); + + let url_str = match args.value_of("url") { + Some(url) => String::from(url), + _ => return Box::new(Err(WebmetroError::from_str("Listen address wasn't provided")).into_future()) + }; + + if args.is_present("throttle") { + chunk_stream = Box::new(chunk_stream.throttle()); + } + + let request_body_stream = Box::new(chunk_stream.map_err(|err| { + eprintln!("{}", &err); + to_hyper_error(err) + })) as BoxedHyperStream; + + Box::new(future::lazy(move || { + url_str.parse().map_err(WebmetroError::from_err) + }).and_then(move |uri| { + let client = Config::default() + .body::() + .build(&handle); + + let mut request: Request = Request::new(Method::Put, uri); + request.set_body(request_body_stream); + + client.request(request) + .and_then(|response| { + response.body().for_each(|_chunk| { + Ok(()) + }) + }) + .map_err(WebmetroError::from_err) + })) +} diff --git a/src/data/encode_webm_test.webm b/src/data/encode_webm_test.webm new file mode 100644 index 0000000..badfef2 Binary files /dev/null and b/src/data/encode_webm_test.webm differ diff --git a/src/data/test1.webm b/src/data/test1.webm new file mode 100644 index 0000000..63d7ba2 Binary files /dev/null and b/src/data/test1.webm differ diff --git a/src/ebml.rs b/src/ebml.rs new file mode 100644 index 0000000..cfb9876 --- /dev/null +++ b/src/ebml.rs @@ -0,0 +1,471 @@ +use bytes::{BigEndian, ByteOrder, BufMut}; +use std::error::Error as ErrorTrait; +use std::fmt::{Display, Formatter, Result as FmtResult}; +use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek, SeekFrom}; +use futures::Async; + +pub const EBML_HEAD_ID: u64 = 0x0A45DFA3; +pub const DOC_TYPE_ID: u64 = 0x0282; +pub const VOID_ID: u64 = 0x6C; + +#[derive(Debug, PartialEq)] +pub enum EbmlError { + CorruptVarint, + UnknownElementId, + UnknownElementLength, + CorruptPayload, +} +impl Display for EbmlError { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "{}", self.description()) + } +} +impl ErrorTrait for EbmlError { + fn description(&self) -> &str { + match self { + &EbmlError::CorruptVarint => "EBML Varint could not be parsed", + &EbmlError::UnknownElementId => "EBML element ID was \"unknown\"", + &EbmlError::UnknownElementLength => "EBML element length was \"unknown\" for an element not allowing that", + &EbmlError::CorruptPayload => "EBML element payload could not be parsed", + } + } +} + +#[derive(Debug, PartialEq)] +pub enum WriteError { + OutOfRange +} +impl Display for WriteError { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + match self { + &WriteError::OutOfRange => write!(f, "EBML Varint out of range") + } + } +} +impl ErrorTrait for WriteError { + fn description(&self) -> &str { + match self { + &WriteError::OutOfRange => "EBML Varint out of range" + } + } +} + +#[derive(Debug, PartialEq)] +pub enum Varint { + /// a numeric value + Value(u64), + /// the reserved "unknown" value + Unknown +} + +/// Try to parse an EBML varint at the start of the given slice. +/// Returns an Err() if the format is corrupt. +/// Returns Ok(None) if more bytes are needed to get a result. +/// Returns Ok(Some((varint, size))) to return a varint value and +/// the size of the parsed varint. +pub fn decode_varint(bytes: &[u8]) -> Result, EbmlError> { + let mut value: u64 = 0; + let mut value_length = 1; + let mut mask: u8 = 0x80; + let mut unknown_marker: u64 = !0; + + if bytes.len() == 0 { + return Ok(None) + } + + // get length marker bit from first byte & parse first byte + while mask > 0 { + if (mask & bytes[0]) != 0 { + value = (bytes[0] & !mask) as u64; + unknown_marker = (mask - 1) as u64; + break + } + value_length += 1; + mask = mask >> 1; + } + + if mask == 0 { + return Err(EbmlError::CorruptVarint) + } + + // check we have enough data to parse + if value_length > bytes.len() { + return Ok(None) + } + + // decode remaining bytes + for i in 1..value_length { + value = (value << 8) + (bytes[i] as u64); + unknown_marker = (unknown_marker << 8) + 0xFF; + } + + // determine result + if value == unknown_marker { + Ok(Some((Varint::Unknown, value_length))) + } else { + Ok(Some((Varint::Value(value), value_length))) + } +} + +/// Try to parse an EBML element header at the start of the given slice. +/// Returns an Err() if the format is corrupt. +/// Returns Ok(None) if more bytes are needed to get a result. +/// Returns Ok(Some((id, varint, size))) to return the element id, +/// the size of the payload, and the size of the parsed header. +pub fn decode_tag(bytes: &[u8]) -> Result, EbmlError> { + // parse element ID + match decode_varint(bytes) { + Ok(None) => Ok(None), + Err(err) => Err(err), + Ok(Some((Varint::Unknown, _))) => Err(EbmlError::UnknownElementId), + Ok(Some((Varint::Value(element_id), id_size))) => { + // parse payload size + match decode_varint(&bytes[id_size..]) { + Ok(None) => Ok(None), + Err(err) => Err(err), + Ok(Some((element_length, length_size))) => + Ok(Some(( + element_id, + element_length, + id_size + length_size + ))) + } + } + } +} + +pub fn decode_uint(bytes: &[u8]) -> Result { + if bytes.len() < 1 || bytes.len() > 8 { + return Err(EbmlError::CorruptPayload); + } + + Ok(BigEndian::read_uint(bytes, bytes.len())) +} + +const SMALL_FLAG: u64 = 0x80; +const EIGHT_FLAG: u64 = 0x01 << (8*7); +const EIGHT_MAX: u64 = EIGHT_FLAG - 2; + +/// Tries to write an EBML varint using minimal space +pub fn encode_varint(varint: Varint, output: &mut T) -> IoResult<()> { + let (size, number) = match varint { + Varint::Unknown => (1, 0xFF), + Varint::Value(too_big) if too_big > EIGHT_MAX => { + return Err(IoError::new(ErrorKind::InvalidInput, WriteError::OutOfRange)) + }, + Varint::Value(value) => { + let mut flag = SMALL_FLAG; + let mut size = 1; + // flag bit - 1 = UNKNOWN representation once OR'd with the flag; + // if we're less than that, we can OR with the flag bit to get a valid Varint + while value >= (flag - 1) { + // right shift length bit by 1 to indicate adding a new byte; + // left shift by 8 because there's a new byte at the end + flag = flag << (8 - 1); + size += 1; + }; + (size, flag | value) + } + }; + + let mut buffer = Cursor::new([0; 8]); + buffer.put_uint::(number, size); + + return output.write_all(&buffer.get_ref()[..size]); +} + +const FOUR_FLAG: u64 = 0x10 << (8*3); +const FOUR_MAX: u64 = FOUR_FLAG - 2; + +// tries to write a varint with a fixed 4-byte representation +pub fn encode_varint_4(varint: Varint, output: &mut T) -> IoResult<()> { + let number = match varint { + Varint::Unknown => FOUR_FLAG | (FOUR_FLAG - 1), + Varint::Value(too_big) if too_big > FOUR_MAX => { + return Err(IoError::new(ErrorKind::InvalidInput, WriteError::OutOfRange)) + }, + Varint::Value(value) => FOUR_FLAG | value + }; + + let mut buffer = Cursor::new([0; 4]); + buffer.put_u32::(number as u32); + + output.write_all(&buffer.get_ref()[..]) +} + +pub fn encode_element IoResult, X>(tag: u64, output: &mut T, content: F) -> IoResult<()> { + encode_varint(Varint::Value(tag), output)?; + encode_varint_4(Varint::Unknown, output)?; + + let start = output.seek(SeekFrom::Current(0))?; + content(output)?; + let end = output.seek(SeekFrom::Current(0))?; + + output.seek(SeekFrom::Start(start - 4))?; + encode_varint_4(Varint::Value(end - start), output)?; + output.seek(SeekFrom::Start(end))?; + + Ok(()) +} + +pub fn encode_tag_header(tag: u64, size: Varint, output: &mut T) -> IoResult<()> { + encode_varint(Varint::Value(tag), output)?; + encode_varint(size, output) +} + +/// Tries to write a simple EBML tag with a string or binary value +pub fn encode_bytes(tag: u64, bytes: &[u8], output: &mut T) -> IoResult<()> { + encode_tag_header(tag, Varint::Value(bytes.len() as u64), output)?; + output.write_all(bytes) +} + +/// Tries to write a simple EBML tag with an integer value +pub fn encode_integer(tag: u64, value: u64, output: &mut T) -> IoResult<()> { + encode_tag_header(tag, Varint::Value(8), output)?; + + let mut buffer = Cursor::new([0; 8]); + buffer.put_u64::(value); + + output.write_all(&buffer.get_ref()[..]) +} + +pub trait FromEbml<'a>: Sized { + /// Indicates if this tag's contents should be treated as a blob, + /// or if the tag header should be reported as an event and with further + /// parsing descending into its content. + /// + /// Unknown-size tags can *only* be parsed if unwrapped, and will error otherwise. + fn should_unwrap(element_id: u64) -> bool; + + /// Given an element's ID and its binary payload, if any, construct a suitable + /// instance of this type to represent the event. The instance may contain + /// references into the given buffer. + fn decode(element_id: u64, bytes: &'a[u8]) -> Result; + + /// Check if enough space exists in the given buffer for decode_element() to + /// be successful; parsing errors will be returned eagerly. + fn check_space(bytes: &[u8]) -> Result, EbmlError> { + match decode_tag(bytes) { + Ok(None) => Ok(None), + Err(err) => Err(err), + Ok(Some((element_id, payload_size_tag, tag_size))) => { + let should_unwrap = Self::should_unwrap(element_id); + + let payload_size = match (should_unwrap, payload_size_tag) { + (true, _) => 0, + (false, Varint::Unknown) => return Err(EbmlError::UnknownElementLength), + (false, Varint::Value(size)) => size as usize + }; + + let element_size = tag_size + payload_size; + if element_size > bytes.len() { + // need to read more still + Ok(None) + } else { + Ok(Some(element_size)) + } + } + } + } + + /// Attempt to construct an instance of this type from the given byte slice + fn decode_element(bytes: &'a[u8]) -> Result, EbmlError> { + match decode_tag(bytes) { + Ok(None) => Ok(None), + Err(err) => Err(err), + Ok(Some((element_id, payload_size_tag, tag_size))) => { + let should_unwrap = Self::should_unwrap(element_id); + + let payload_size = match (should_unwrap, payload_size_tag) { + (true, _) => 0, + (false, Varint::Unknown) => return Err(EbmlError::UnknownElementLength), + (false, Varint::Value(size)) => size as usize + }; + + let element_size = tag_size + payload_size; + if element_size > bytes.len() { + // need to read more still + return Ok(None); + } + + match Self::decode(element_id, &bytes[tag_size..element_size]) { + Ok(element) => Ok(Some((element, element_size))), + Err(error) => Err(error) + } + } + } + } +} + +pub trait EbmlEventSource { + type Error; + fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result>, Self::Error>; +} + +#[cfg(test)] +mod tests { + use bytes::{BytesMut}; + use ebml::*; + use ebml::EbmlError::{CorruptVarint, UnknownElementId}; + use ebml::Varint::{Unknown, Value}; + use std::io::Cursor; + use tests::TEST_FILE; + + #[test] + fn fail_corrupted_varints() { + assert_eq!(decode_varint(&[0]), Err(CorruptVarint)); + assert_eq!(decode_varint(&[0, 0, 0]), Err(CorruptVarint)); + } + + #[test] + fn incomplete_varints() { + assert_eq!(decode_varint(&[]), Ok(None)); + assert_eq!(decode_varint(&[0x40]), Ok(None)); + assert_eq!(decode_varint(&[0x01, 0, 0]), Ok(None)); + } + + #[test] + fn parse_varints() { + assert_eq!(decode_varint(&[0xFF]), Ok(Some((Unknown, 1)))); + assert_eq!(decode_varint(&[0x7F, 0xFF]), Ok(Some((Unknown, 2)))); + assert_eq!(decode_varint(&[0x80]), Ok(Some((Value(0), 1)))); + assert_eq!(decode_varint(&[0x81]), Ok(Some((Value(1), 1)))); + assert_eq!(decode_varint(&[0x40, 52]), Ok(Some((Value(52), 2)))); + + // test extra data in buffer + assert_eq!(decode_varint(&[0x83, 0x11]), Ok(Some((Value(3), 1)))); + } + + #[test] + fn encode_varints() { + let mut buffer = BytesMut::with_capacity(10).writer(); + + let mut no_space = Cursor::new([0; 0]).writer(); + assert_eq!(no_space.get_ref().remaining_mut(), 0); + + let mut six_buffer = Cursor::new([0; 6]).writer(); + assert_eq!(six_buffer.get_ref().remaining_mut(), 6); + + // 1 byte + encode_varint(Varint::Unknown, &mut buffer).unwrap(); + assert_eq!(buffer.get_mut().split_to(1), &[0xFF].as_ref()); + assert_eq!(encode_varint(Varint::Unknown, &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); + + encode_varint(Varint::Value(0), &mut buffer).unwrap(); + assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 0].as_ref()); + assert_eq!(encode_varint(Varint::Value(0), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); + + encode_varint(Varint::Value(1), &mut buffer).unwrap(); + assert_eq!(buffer.get_mut().split_to(1), &[0x80 | 1].as_ref()); + assert_eq!(encode_varint(Varint::Value(1), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); + + encode_varint(Varint::Value(126), &mut buffer).unwrap(); + assert_eq!(buffer.get_mut().split_to(1), &[0xF0 | 126].as_ref()); + assert_eq!(encode_varint(Varint::Value(126), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); + + // 2 bytes + encode_varint(Varint::Value(127), &mut buffer).unwrap(); + assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 127].as_ref()); + assert_eq!(encode_varint(Varint::Value(127), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); + + encode_varint(Varint::Value(128), &mut buffer).unwrap(); + assert_eq!(&buffer.get_mut().split_to(2), &[0x40, 128].as_ref()); + assert_eq!(encode_varint(Varint::Value(128), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); + + // 6 bytes + assert_eq!(six_buffer.get_ref().remaining_mut(), 6); + encode_varint(Varint::Value(0x03FFFFFFFFFE), &mut six_buffer).unwrap(); + assert_eq!(six_buffer.get_ref().remaining_mut(), 0); + assert_eq!(&six_buffer.get_ref().get_ref(), &[0x07, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref()); + six_buffer = Cursor::new([0; 6]).writer(); + + // 7 bytes + encode_varint(Varint::Value(0x03FFFFFFFFFF), &mut buffer).unwrap(); + assert_eq!(&buffer.get_mut().split_to(7), &[0x02, 0x03, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF].as_ref()); + + encode_varint(Varint::Value(0x01000000000000), &mut buffer).unwrap(); + assert_eq!(&buffer.get_mut().split_to(7), &[0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00].as_ref()); + + encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut buffer).unwrap(); + assert_eq!(&buffer.get_mut().split_to(7), &[0x03, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref()); + + assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut no_space).unwrap_err().kind(), ErrorKind::WriteZero); + assert_eq!(encode_varint(Varint::Value(0x01FFFFFFFFFFFE), &mut six_buffer).unwrap_err().kind(), ErrorKind::WriteZero); + + // 8 bytes + encode_varint(Varint::Value(0x01FFFFFFFFFFFF), &mut buffer).unwrap(); + assert_eq!(&buffer.get_mut().split_to(8), &[0x01, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF].as_ref()); + + encode_varint(Varint::Value(0xFFFFFFFFFFFFFE), &mut buffer).unwrap(); + assert_eq!(&buffer.get_mut().split_to(8), &[0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE].as_ref()); + + assert_eq!(encode_varint(Varint::Value(0xFFFFFFFFFFFFFF), &mut buffer).unwrap_err().kind(), ErrorKind::InvalidInput); + assert_eq!(encode_varint(Varint::Value(u64::max_value()), &mut buffer).unwrap_err().kind(), ErrorKind::InvalidInput); + } + + #[test] + fn fail_corrupted_tags() { + assert_eq!(decode_tag(&[0]), Err(CorruptVarint)); + assert_eq!(decode_tag(&[0x80, 0]), Err(CorruptVarint)); + assert_eq!(decode_tag(&[0xFF, 0x80]), Err(UnknownElementId)); + assert_eq!(decode_tag(&[0x7F, 0xFF, 0x40, 0]), Err(UnknownElementId)); + } + + #[test] + fn incomplete_tags() { + assert_eq!(decode_tag(&[]), Ok(None)); + assert_eq!(decode_tag(&[0x80]), Ok(None)); + assert_eq!(decode_tag(&[0x40, 0, 0x40]), Ok(None)); + } + + #[test] + fn parse_tags() { + assert_eq!(decode_tag(&[0x80, 0x80]), Ok(Some((0, Value(0), 2)))); + assert_eq!(decode_tag(&[0x81, 0x85]), Ok(Some((1, Value(5), 2)))); + assert_eq!(decode_tag(&[0x80, 0xFF]), Ok(Some((0, Unknown, 2)))); + assert_eq!(decode_tag(&[0x80, 0x7F, 0xFF]), Ok(Some((0, Unknown, 3)))); + assert_eq!(decode_tag(&[0x85, 0x40, 52]), Ok(Some((5, Value(52), 3)))); + } + + #[test] + fn bad_uints() { + assert_eq!(decode_uint(&[]), Err(EbmlError::CorruptPayload)); + assert_eq!(decode_uint(&[0; 9]), Err(EbmlError::CorruptPayload)); + } + + #[test] + fn parse_uints() { + assert_eq!(decode_uint(&[0]), Ok(0)); + assert_eq!(decode_uint(&[0; 8]), Ok(0)); + assert_eq!(decode_uint(&[1]), Ok(1)); + assert_eq!(decode_uint(&[0,0,0,0,0,0,0,1]), Ok(1)); + assert_eq!(decode_uint(&[38]), Ok(38)); + assert_eq!(decode_uint(&[0,0,0,0,0,0,0,38]), Ok(38)); + assert_eq!(decode_uint(&[0x7F,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF]), Ok(9223372036854775807)); + assert_eq!(decode_uint(&[0x80,0,0,0,0,0,0,0]), Ok(9223372036854775808)); + assert_eq!(decode_uint(&[0x80,0,0,0,0,0,0,1]), Ok(9223372036854775809)); + } + + #[derive(Debug, PartialEq)] + struct GenericElement(u64, usize); + + impl<'a> FromEbml<'a> for GenericElement { + fn should_unwrap(element_id: u64) -> bool { + match element_id { + _ => false + } + } + + fn decode(element_id: u64, bytes: &'a[u8]) -> Result { + match element_id { + _ => Ok(GenericElement(element_id, bytes.len())) + } + } + } + + #[test] + fn decode_sanity_test() { + let decoded = GenericElement::decode_element(TEST_FILE); + assert_eq!(decoded, Ok(Some((GenericElement(0x0A45DFA3, 31), 43)))); + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..40f2007 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,75 @@ +use std::{ + error::Error, + fmt::{ + Display, + Formatter, + Result as FmtResult + }, + io::Error as IoError +}; + +use ebml::EbmlError; + +#[derive(Debug)] +pub enum WebmetroError { + ResourcesExceeded, + EbmlError(EbmlError), + IoError(IoError), + Unknown(Box) +} + +impl WebmetroError { + pub fn from_str(string: &str) -> WebmetroError { + string.into() + } + + pub fn from_err(err: E) -> WebmetroError { + WebmetroError::Unknown(Box::new(err)) + } +} + +impl Display for WebmetroError { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + match self { + &WebmetroError::ResourcesExceeded => write!(f, "resources exceeded"), + &WebmetroError::EbmlError(ref err) => err.fmt(f), + &WebmetroError::IoError(ref err) => err.fmt(f), + &WebmetroError::Unknown(ref err) => err.fmt(f), + } + } +} +impl Error for WebmetroError { + fn description(&self) -> &str { + match self { + &WebmetroError::ResourcesExceeded => "resources exceeded", + &WebmetroError::EbmlError(ref err) => err.description(), + &WebmetroError::IoError(ref err) => err.description(), + &WebmetroError::Unknown(ref err) => err.description(), + } + } +} + +impl From for WebmetroError { + fn from(err: EbmlError) -> WebmetroError { + WebmetroError::EbmlError(err) + } +} + +impl From for WebmetroError { + fn from(err: IoError) -> WebmetroError { + WebmetroError::IoError(err) + } +} + +impl From> for WebmetroError { + fn from(err: Box) -> WebmetroError { + WebmetroError::Unknown(err) + } +} + +impl<'a> From<&'a str> for WebmetroError { + fn from(err: &'a str) -> WebmetroError { + let error: Box = err.into(); + WebmetroError::Unknown(error) + } +} diff --git a/src/fixers.rs b/src/fixers.rs new file mode 100644 index 0000000..dc6fd24 --- /dev/null +++ b/src/fixers.rs @@ -0,0 +1,144 @@ +use std::time::{Duration, Instant}; + +use futures::prelude::*; +use tokio::timer::Delay; + +use chunk::Chunk; +use error::WebmetroError; + +pub struct ChunkTimecodeFixer { + stream: S, + current_offset: u64, + last_observed_timecode: u64, + assumed_duration: u64 +} + +impl> Stream for ChunkTimecodeFixer +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Result>, Self::Error> { + let mut poll_chunk = self.stream.poll(); + match poll_chunk { + Ok(Async::Ready(Some(Chunk::ClusterHead(ref mut cluster_head)))) => { + let start = cluster_head.start; + if start < self.last_observed_timecode { + let next_timecode = self.last_observed_timecode + self.assumed_duration; + self.current_offset = next_timecode - start; + } + + cluster_head.update_timecode(start + self.current_offset); + self.last_observed_timecode = cluster_head.end; + }, + _ => {} + }; + poll_chunk + } +} + +pub struct StartingPointFinder { + stream: S, + seen_header: bool, + seen_keyframe: bool +} + +impl> Stream for StartingPointFinder +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Result>, Self::Error> { + loop { + return match self.stream.poll() { + Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head)))) => { + if cluster_head.keyframe { + self.seen_keyframe = true; + } + + if self.seen_keyframe { + Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head)))) + } else { + continue; + } + }, + chunk @ Ok(Async::Ready(Some(Chunk::ClusterBody {..}))) => { + if self.seen_keyframe { + chunk + } else { + continue; + } + }, + chunk @ Ok(Async::Ready(Some(Chunk::Headers {..}))) => { + if self.seen_header { + // new stream starting, we don't need a new header but should wait for a safe spot to resume + self.seen_keyframe = false; + continue; + } else { + self.seen_header = true; + chunk + } + }, + chunk => chunk + } + }; + } +} + +pub struct Throttle { + stream: S, + start_time: Instant, + sleep: Delay +} + +impl> Stream for Throttle +{ + type Item = S::Item; + type Error = WebmetroError; + + fn poll(&mut self) -> Result>, WebmetroError> { + match self.sleep.poll() { + Err(err) => return Err(WebmetroError::Unknown(Box::new(err))), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => { /* can continue */ } + } + + let next_chunk = self.stream.poll(); + if let Ok(Async::Ready(Some(Chunk::ClusterHead(ref cluster_head)))) = next_chunk { + // snooze until real time has "caught up" to the stream + let offset = Duration::from_millis(cluster_head.end); + self.sleep.reset(self.start_time + offset); + } + next_chunk + } +} + +pub trait ChunkStream where Self : Sized + Stream { + fn fix_timecodes(self) -> ChunkTimecodeFixer { + ChunkTimecodeFixer { + stream: self, + current_offset: 0, + last_observed_timecode: 0, + assumed_duration: 33 + } + } + + fn find_starting_point(self) -> StartingPointFinder { + StartingPointFinder { + stream: self, + seen_header: false, + seen_keyframe: false + } + } + + fn throttle(self) -> Throttle { + let now = Instant::now(); + Throttle { + stream: self, + start_time: now, + sleep: Delay::new(now) + } + } +} + +impl> ChunkStream for T {} diff --git a/src/iterator.rs b/src/iterator.rs new file mode 100644 index 0000000..ed5c2fd --- /dev/null +++ b/src/iterator.rs @@ -0,0 +1,20 @@ +use std::marker::PhantomData; + +use ebml::FromEbml; + +pub struct EbmlIterator<'a, T: FromEbml<'a>>(&'a [u8], PhantomData T>); + +pub fn ebml_iter<'a, T: FromEbml<'a>>(source: &'a [u8])-> EbmlIterator<'a, T> { + EbmlIterator(source, PhantomData) +} + +impl<'a, T: FromEbml<'a>> Iterator for EbmlIterator<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + T::decode_element(self.0).unwrap_or(None).and_then(|(element, element_size)| { + self.0 = &self.0[element_size..]; + Some(element) + }) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..b15d184 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,37 @@ + +extern crate bytes; +extern crate futures; +extern crate odds; +extern crate tokio; + +pub mod ebml; +pub mod error; +pub mod iterator; +pub mod slice; +pub mod stream_parser; + +pub mod chunk; +pub mod fixers; +pub mod webm; + +pub mod channel; + +pub use ebml::{EbmlError, FromEbml}; + +#[cfg(test)] +mod tests { + use futures::future::{ok, Future}; + + pub const TEST_FILE: &'static [u8] = include_bytes!("data/test1.webm"); + pub const ENCODE_WEBM_TEST_FILE: &'static [u8] = include_bytes!("data/encode_webm_test.webm"); + + #[test] + fn hello_futures() { + let my_future = ok::("Hello".into()) + .map(|hello| hello + ", Futures!"); + + let string_result = my_future.wait().unwrap(); + + assert_eq!(string_result, "Hello, Futures!"); + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..a1232ef --- /dev/null +++ b/src/main.rs @@ -0,0 +1,65 @@ +#[macro_use] extern crate clap; +extern crate futures; +extern crate hyper; +extern crate tokio; +extern crate tokio_core; +extern crate tokio_io; +extern crate webmetro; + +mod commands; + +use clap::{App, AppSettings}; +use futures::prelude::*; +use tokio_core::reactor::Core; +use webmetro::error::WebmetroError; + +use commands::{ + relay, + filter, + send, + dump +}; + +fn options() -> App<'static, 'static> { + App::new("webmetro") + .version(crate_version!()) + .about("Utilities for broadcasting & relaying live WebM video/audio streams") + .setting(AppSettings::DisableHelpSubcommand) + .setting(AppSettings::VersionlessSubcommands) + .subcommand(relay::options()) + .subcommand(filter::options()) + .subcommand(send::options()) + .subcommand(dump::options()) +} + +fn main() { + let args = options().get_matches(); + + let core = Core::new().unwrap(); + let handle = core.handle(); + + tokio_run(core, match args.subcommand() { + ("filter", Some(sub_args)) => box_up(filter::run(sub_args)), + ("relay", Some(sub_args)) => box_up(relay::run(sub_args)), + ("send", Some(sub_args)) => box_up(send::run(handle, sub_args)), + ("dump", Some(sub_args)) => box_up(dump::run(sub_args)), + _ => box_up(futures::lazy(|| { + options().print_help().unwrap(); + println!(""); + Ok(()) + })) + }); +} + +fn tokio_run(mut core: Core, task: Box>) { + core.run(task.into_future()).unwrap_or_else(|err| { + eprintln!("Error: {}", err); + ::std::process::exit(1); + }); +} + +fn box_up>(task: F) -> Box> +where F::Future: 'static +{ + Box::new(task.into_future()) +} diff --git a/src/slice.rs b/src/slice.rs new file mode 100644 index 0000000..9b1b547 --- /dev/null +++ b/src/slice.rs @@ -0,0 +1,18 @@ +use futures::Async; + +use ebml::EbmlError; +use ebml::EbmlEventSource; +use ebml::FromEbml; + +pub struct EbmlSlice<'a>(pub &'a [u8]); + +impl<'b> EbmlEventSource for EbmlSlice<'b> { + type Error = EbmlError; + + fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result>, EbmlError> { + T::decode_element(self.0).map(|option| option.map(|(element, element_size)| { + self.0 = &self.0[element_size..]; + element + })).map(Async::Ready) + } +} diff --git a/src/stream_parser.rs b/src/stream_parser.rs new file mode 100644 index 0000000..0d3c9cb --- /dev/null +++ b/src/stream_parser.rs @@ -0,0 +1,93 @@ +use bytes::BytesMut; +use bytes::BufMut; +use futures::Async; +use futures::stream::Stream; + +use ebml::EbmlEventSource; +use ebml::FromEbml; +use error::WebmetroError; + +pub struct EbmlStreamingParser { + stream: S, + buffer: BytesMut, + buffer_size_limit: Option, + last_read: usize +} + +impl EbmlStreamingParser { + /// add a "soft" buffer size limit; if the input buffer exceeds this size, + /// error the stream instead of resuming. It's still possible for the buffer + /// to exceed this size *after* a fill, so ensure input sizes are reasonable. + pub fn with_soft_limit(mut self, limit: usize) -> Self { + self.buffer_size_limit = Some(limit); + self + } +} + +pub trait StreamEbml where Self: Sized + Stream, Self::Item: AsRef<[u8]> { + fn parse_ebml(self) -> EbmlStreamingParser { + EbmlStreamingParser { + stream: self, + buffer: BytesMut::new(), + buffer_size_limit: None, + last_read: 0 + } + } +} + +impl, S: Stream> StreamEbml for S {} + +impl, S: Stream> EbmlStreamingParser { + pub fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result>, WebmetroError> { + // release buffer from previous event + self.buffer.advance(self.last_read); + self.last_read = 0; + + loop { + match T::check_space(&self.buffer) { + Ok(None) => { + // need to refill buffer, below + }, + other => return other.map_err(WebmetroError::EbmlError).and_then(move |_| { + match T::decode_element(&self.buffer) { + Err(err) => Err(WebmetroError::EbmlError(err)), + Ok(None) => panic!("Buffer was supposed to have enough data to parse element, somehow did not."), + Ok(Some((element, element_size))) => { + self.last_read = element_size; + Ok(Async::Ready(Some(element))) + } + } + }) + } + + if let Some(limit) = self.buffer_size_limit { + if limit <= self.buffer.len() { + return Err(WebmetroError::ResourcesExceeded); + } + } + + match self.stream.poll() { + Ok(Async::Ready(Some(chunk))) => { + self.buffer.reserve(chunk.as_ref().len()); + self.buffer.put_slice(chunk.as_ref()); + // ok can retry decoding now + }, + other => return other.map(|async| async.map(|_| None)) + } + } + } +} + +impl, S: Stream> EbmlEventSource for EbmlStreamingParser { + type Error = WebmetroError; + + fn poll_event<'a, T: FromEbml<'a>>(&'a mut self) -> Result>, WebmetroError> { + return EbmlStreamingParser::poll_event(self); + } +} + +#[cfg(test)] +mod tests { + //#[test] + +} diff --git a/src/webm.rs b/src/webm.rs new file mode 100644 index 0000000..6a48b60 --- /dev/null +++ b/src/webm.rs @@ -0,0 +1,220 @@ +use std::io::{Cursor, Error as IoError, ErrorKind, Result as IoResult, Write, Seek}; +use bytes::{BigEndian, BufMut, ByteOrder}; +use ebml::*; +use iterator::ebml_iter; +use iterator::EbmlIterator; + +const SEGMENT_ID: u64 = 0x08538067; +const SEEK_HEAD_ID: u64 = 0x014D9B74; +const SEGMENT_INFO_ID: u64 = 0x0549A966; +const CUES_ID: u64 = 0x0C53BB6B; +const TRACKS_ID: u64 = 0x0654AE6B; +const CLUSTER_ID: u64 = 0x0F43B675; +const TIMECODE_ID: u64 = 0x67; +const SIMPLE_BLOCK_ID: u64 = 0x23; + +pub fn parse_webm<'a, T: AsRef<[u8]> + ?Sized>(source: &'a T) -> EbmlIterator<'a, WebmElement> { + ebml_iter(source.as_ref()) +} + +#[derive(Debug, PartialEq, Copy, Clone)] +pub struct SimpleBlock<'b> { + pub track: u64, + pub timecode: i16, + pub flags: u8, + pub data: &'b[u8] +} + +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum WebmElement<'b> { + EbmlHead, + Void, + Segment, + SeekHead, + Info, + Cues, + Tracks(&'b[u8]), + Cluster, + Timecode(u64), + SimpleBlock(SimpleBlock<'b>), + Unknown(u64) +} + +impl<'b> FromEbml<'b> for WebmElement<'b> { + fn should_unwrap(element_id: u64) -> bool { + match element_id { + // Segment + SEGMENT_ID => true, + CLUSTER_ID => true, + _ => false + } + } + + fn decode(element_id: u64, bytes: &'b[u8]) -> Result, EbmlError> { + match element_id { + EBML_HEAD_ID => Ok(WebmElement::EbmlHead), + VOID_ID => Ok(WebmElement::Void), + SEGMENT_ID => Ok(WebmElement::Segment), + SEEK_HEAD_ID => Ok(WebmElement::SeekHead), + SEGMENT_INFO_ID => Ok(WebmElement::Info), + CUES_ID => Ok(WebmElement::Cues), + TRACKS_ID => Ok(WebmElement::Tracks(bytes)), + CLUSTER_ID => Ok(WebmElement::Cluster), + TIMECODE_ID => decode_uint(bytes).map(WebmElement::Timecode), + SIMPLE_BLOCK_ID => decode_simple_block(bytes), + _ => Ok(WebmElement::Unknown(element_id)) + } + } +} + +fn decode_simple_block(bytes: &[u8]) -> Result { + if let Ok(Some((Varint::Value(track), track_field_len))) = decode_varint(bytes) { + let header_len = track_field_len + 2 + 1; + if bytes.len() < header_len { + return Err(EbmlError::CorruptPayload); + } + let timecode = BigEndian::read_i16(&bytes[track_field_len..]); + let flags = bytes[track_field_len + 2]; + return Ok(WebmElement::SimpleBlock(SimpleBlock { + track: track, + timecode: timecode, + flags: flags, + data: &bytes[header_len..], + })) + } else { + return Err(EbmlError::CorruptPayload); + } +} + +pub fn encode_simple_block(block: SimpleBlock, output: &mut T) -> IoResult<()> { + let SimpleBlock { + track, + timecode, + flags, + data + } = block; + + // limiting number of tracks for now + if track > 31 { + return Err(IoError::new(ErrorKind::InvalidInput, WriteError::OutOfRange)); + } + let header_len = 1 + 2 + 1; + encode_tag_header(SIMPLE_BLOCK_ID, Varint::Value((header_len + data.len()) as u64), output)?; + + encode_varint(Varint::Value(track), output)?; + + let mut buffer = Cursor::new([0; 3]); + buffer.put_i16::(timecode); + buffer.put_u8(flags); + + output.write_all(&buffer.get_ref()[..])?; + output.write_all(data) +} + +pub fn encode_webm_element(element: WebmElement, output: &mut T) -> IoResult<()> { + match element { + WebmElement::EbmlHead => encode_element(EBML_HEAD_ID, output, |output| { + encode_bytes(DOC_TYPE_ID, "webm".as_bytes(), output) + }), + WebmElement::Segment => encode_tag_header(SEGMENT_ID, Varint::Unknown, output), + WebmElement::SeekHead => Ok(()), + WebmElement::Cues => Ok(()), + WebmElement::Tracks(data) => encode_bytes(TRACKS_ID, data, output), + WebmElement::Cluster => encode_tag_header(CLUSTER_ID, Varint::Unknown, output), + WebmElement::Timecode(time) => encode_integer(TIMECODE_ID, time, output), + WebmElement::SimpleBlock(block) => encode_simple_block(block, output), + WebmElement::Void => Err(IoError::new(ErrorKind::InvalidInput, WriteError::OutOfRange)), + WebmElement::Info => Err(IoError::new(ErrorKind::InvalidInput, WriteError::OutOfRange)), + WebmElement::Unknown(_) => Err(IoError::new(ErrorKind::InvalidInput, WriteError::OutOfRange)) + } +} + +#[cfg(test)] +mod tests { + use tests::{ + TEST_FILE, + ENCODE_WEBM_TEST_FILE + }; + use webm::*; + + #[test] + fn decode_webm_test1() { + let mut iter = parse_webm(TEST_FILE); + + // test that we match the structure of the test file + assert_eq!(iter.next(), Some(WebmElement::EbmlHead)); + assert_eq!(iter.next(), Some(WebmElement::Segment)); + assert_eq!(iter.next(), Some(WebmElement::SeekHead)); + assert_eq!(iter.next(), Some(WebmElement::Void)); + assert_eq!(iter.next(), Some(WebmElement::Info)); + assert_eq!(iter.next(), Some(WebmElement::Tracks(&TEST_FILE[358..421]))); + + assert_eq!(iter.next(), Some(WebmElement::Cluster)); + assert_eq!(iter.next(), Some(WebmElement::Timecode(0))); + assert_eq!(iter.next(), Some(WebmElement::SimpleBlock(SimpleBlock { + track: 1, + timecode: 0, + flags: 0b10000000, + data: &TEST_FILE[443..3683] + }))); + assert_eq!(iter.next(), Some(WebmElement::SimpleBlock(SimpleBlock { + track: 1, + timecode: 33, + flags: 0b00000000, + data: &TEST_FILE[3690..4735] + }))); + assert_eq!(iter.next(), Some(WebmElement::SimpleBlock(SimpleBlock { + track: 1, + timecode: 67, + flags: 0b00000000, + data: &TEST_FILE[4741..4801] + }))); + for _ in 3..30 { + // skip remaining contents for brevity + iter.next(); + } + + assert_eq!(iter.next(), Some(WebmElement::Cluster)); + assert_eq!(iter.next(), Some(WebmElement::Timecode(1000))); + for _ in 0..30 { + // skip contents for brevity + iter.next(); + } + + assert_eq!(iter.next(), Some(WebmElement::Cluster)); + assert_eq!(iter.next(), Some(WebmElement::Timecode(2000))); + for _ in 0..30 { + // skip contents for brevity + iter.next(); + } + + assert_eq!(iter.next(), Some(WebmElement::Cues)); + assert_eq!(iter.next(), None); + } + + #[test] + fn encode_webm_test() { + let mut cursor = Cursor::new(Vec::new()); + + encode_webm_element(WebmElement::EbmlHead, &mut cursor).unwrap(); + encode_webm_element(WebmElement::Segment, &mut cursor).unwrap(); + + encode_webm_element(WebmElement::Tracks(&[]), &mut cursor).unwrap(); + + encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap(); + encode_webm_element(WebmElement::Timecode(0), &mut cursor).unwrap(); + + encode_webm_element(WebmElement::SimpleBlock(SimpleBlock { + track: 3, + flags: 0x0, + timecode: 123, + data: "Hello, World".as_bytes() + }), &mut cursor).unwrap(); + + encode_webm_element(WebmElement::Cluster, &mut cursor).unwrap(); + encode_webm_element(WebmElement::Timecode(1000), &mut cursor).unwrap(); + + assert_eq!(cursor.get_ref(), &ENCODE_WEBM_TEST_FILE); + } + +}