Compare commits

...

11 Commits

Author SHA1 Message Date
Bevan Hunt 287e1f02c6 bump ver 2021-03-25 17:51:58 -07:00
Bevan Hunt 3570138bb8 fix key for event 2021-03-25 17:48:49 -07:00
Bevan Hunt c18bce2263 update readme 2021-03-25 17:06:28 -07:00
Bevan Hunt 0c43a51f29 remove tenant_name from event 2021-03-25 17:01:23 -07:00
Bevan Hunt 31bbed2019 add http basic auth 2021-03-25 13:19:27 -07:00
Bevan Hunt 0ab4c72281 update to 8.0.0 2021-03-23 15:11:41 -07:00
Bevan Hunt f0e3c4bb38 add tide-acme 2021-03-22 15:15:22 -07:00
Bevan Hunt 7cb0941b6c add service and update readme 2021-03-21 22:12:39 -07:00
Bevan Hunt d4d42a1b37 update nippy ver 2021-03-21 22:07:26 -07:00
Bevan Hunt d3fd496363 replace broker-ntp with nippy 2021-03-21 21:25:58 -07:00
Bevan Hunt 8283767ef0 add changelog 2021-03-21 19:20:09 -07:00
6 changed files with 475 additions and 162 deletions

51
CHANGELOG.md Normal file
View File

@ -0,0 +1,51 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [9.0.2] - 2021-04-25
### Fixed
- keys on event
## [9.0.0] - 2021-04-25
### Removed
- removed tenant_name from event - uses users event name
## [8.1.x] - 2021-04-25
### Added
- Adds http basic auth
## [8.0.x] - 2021-03-23
### Added
- Adds admin_token to create user and command args
- Adds multi-tenancy
## [7.0.x] - 2021-03-22
### Added
- Adds tide-acme
## Changed
- Changes command args
## [6.1.x] - 2021-03-21
### Changed
- Replaced broker-ntp with nippy
## [6.0.x] - 2021-03-20
### Changed
- Replaced Warp with Tide
- Replaced Sled with RocksDB
### Removed
- Rmmoved GET JSON API endpoints
- Removed multi-tenancy
- Removed timestamps, event dispatcher, and cancellation
- Removed broker-grid support (current version)

317
Cargo.lock generated
View File

@ -167,7 +167,7 @@ dependencies = [
"http-types", "http-types",
"httparse", "httparse",
"lazy_static", "lazy_static",
"log 0.4.14", "log",
"pin-project", "pin-project",
] ]
@ -181,7 +181,7 @@ dependencies = [
"fastrand", "fastrand",
"futures-lite", "futures-lite",
"libc", "libc",
"log 0.4.14", "log",
"nb-connect", "nb-connect",
"once_cell", "once_cell",
"parking", "parking",
@ -225,6 +225,17 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "async-rustls"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c86f33abd5a4f3e2d6d9251a9e0c6a7e52eb1113caf893dae8429bf4a53f378"
dependencies = [
"futures-lite",
"rustls",
"webpki",
]
[[package]] [[package]]
name = "async-session" name = "async-session"
version = "2.0.1" version = "2.0.1"
@ -255,7 +266,7 @@ dependencies = [
"async-channel", "async-channel",
"async-std", "async-std",
"http-types", "http-types",
"log 0.4.14", "log",
"memchr", "memchr",
"pin-project-lite 0.1.12", "pin-project-lite 0.1.12",
] ]
@ -279,7 +290,7 @@ dependencies = [
"futures-lite", "futures-lite",
"gloo-timers", "gloo-timers",
"kv-log-macro", "kv-log-macro",
"log 0.4.14", "log",
"memchr", "memchr",
"num_cpus", "num_cpus",
"once_cell", "once_cell",
@ -295,19 +306,6 @@ version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "async-tls"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f23d769dbf1838d5df5156e7b1ad404f4c463d1ac2c6aeb6cd943630f8a8400"
dependencies = [
"futures-core",
"futures-io",
"rustls",
"webpki",
"webpki-roots",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.48" version = "0.1.48"
@ -362,9 +360,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]] [[package]]
name = "bincode" name = "bincode"
version = "1.3.2" version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d175dfa69e619905c4c3cdb7c3c203fa3bdd5d51184e3afdb2742c0280493772" checksum = "f30d3a39baa26f9651f17b375061f3233dde33424a8b72b0dbe93a68a0bc896d"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"serde", "serde",
@ -384,7 +382,7 @@ dependencies = [
"env_logger", "env_logger",
"lazy_static", "lazy_static",
"lazycell", "lazycell",
"log 0.4.14", "log",
"peeking_take_while", "peeking_take_while",
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -400,6 +398,18 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bitvec"
version = "0.19.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321"
dependencies = [
"funty",
"radium",
"tap",
"wyz",
]
[[package]] [[package]]
name = "blake2b_simd" name = "blake2b_simd"
version = "0.5.11" version = "0.5.11"
@ -451,11 +461,11 @@ dependencies = [
[[package]] [[package]]
name = "broker" name = "broker"
version = "6.0.8" version = "9.0.2"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-std", "async-std",
"broker-ntp", "base64 0.13.0",
"driftwood", "driftwood",
"futures", "futures",
"go-flag", "go-flag",
@ -463,6 +473,7 @@ dependencies = [
"json", "json",
"jsonwebtoken", "jsonwebtoken",
"lazy_static", "lazy_static",
"nippy",
"rmp-serde", "rmp-serde",
"rocksdb", "rocksdb",
"rust-argon2", "rust-argon2",
@ -470,22 +481,11 @@ dependencies = [
"serde_derive", "serde_derive",
"serde_json", "serde_json",
"tide", "tide",
"tide-acme",
"tide-rustls", "tide-rustls",
"uuid", "uuid",
] ]
[[package]]
name = "broker-ntp"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "037d0f8fa2a4e20313da3d34d3d40606e6366fdc1458f664fc03edc89aa34292"
dependencies = [
"byteorder",
"conv",
"custom_derive",
"log 0.3.9",
]
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.6.1" version = "3.6.1"
@ -504,9 +504,9 @@ dependencies = [
[[package]] [[package]]
name = "byteorder" name = "byteorder"
version = "1.3.4" version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]] [[package]]
name = "cache-padded" name = "cache-padded"
@ -529,7 +529,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4aedb84272dbe89af497cf81375129abda4fc0a9e7c5d317498c15cc30c0d27" checksum = "f4aedb84272dbe89af497cf81375129abda4fc0a9e7c5d317498c15cc30c0d27"
dependencies = [ dependencies = [
"nom", "nom 5.1.2",
] ]
[[package]] [[package]]
@ -757,6 +757,32 @@ version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"
[[package]]
name = "der-oid-macro"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd17d13ecf875e704369fdbde242483ac769fc18f6af21e43d5a692a079732fc"
dependencies = [
"nom 6.1.2",
"num-bigint 0.3.2",
"num-traits",
"proc-macro-hack",
]
[[package]]
name = "der-parser"
version = "5.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13e6cad1223a7b98b59275a56516ed8c40508d21284a32e404ed3fe2ae9a809a"
dependencies = [
"der-oid-macro",
"nom 6.1.2",
"num-bigint 0.3.2",
"num-traits",
"proc-macro-hack",
"rusticata-macros",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.9.0" version = "0.9.0"
@ -792,7 +818,7 @@ checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
dependencies = [ dependencies = [
"atty", "atty",
"humantime", "humantime",
"log 0.4.14", "log",
"regex", "regex",
"termcolor", "termcolor",
] ]
@ -820,7 +846,7 @@ checksum = "2af1a24f391a5a94d756db5092c6576aad494b88a71a5a36b20c67b63e0df034"
dependencies = [ dependencies = [
"cfg-if 0.1.10", "cfg-if 0.1.10",
"js-sys", "js-sys",
"log 0.4.14", "log",
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
@ -838,6 +864,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "funty"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.13" version = "0.3.13"
@ -1064,7 +1096,7 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"dashmap", "dashmap",
"http-types", "http-types",
"log 0.4.14", "log",
] ]
[[package]] [[package]]
@ -1180,7 +1212,7 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
dependencies = [ dependencies = [
"log 0.4.14", "log",
] ]
[[package]] [[package]]
@ -1195,6 +1227,19 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lexical-core"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21f866863575d0e1d654fbeeabdc927292fdf862873dc3c96c6f753357e13374"
dependencies = [
"arrayvec",
"bitflags",
"cfg-if 1.0.0",
"ryu",
"static_assertions",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.90" version = "0.2.90"
@ -1223,15 +1268,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "log"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
dependencies = [
"log 0.4.14",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.14" version = "0.4.14"
@ -1270,6 +1306,20 @@ dependencies = [
"socket2", "socket2",
] ]
[[package]]
name = "nippy"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "966078feae73ebace00794f728f01c3851ca8c80a582b4f4b4309a35b4acd18c"
dependencies = [
"anyhow",
"async-std",
"byteorder",
"conv",
"custom_derive",
"log",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "5.1.2" version = "5.1.2"
@ -1280,6 +1330,19 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "nom"
version = "6.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
dependencies = [
"bitvec",
"funty",
"lexical-core",
"memchr",
"version_check",
]
[[package]] [[package]]
name = "num-bigint" name = "num-bigint"
version = "0.2.6" version = "0.2.6"
@ -1291,6 +1354,17 @@ dependencies = [
"num-traits", "num-traits",
] ]
[[package]]
name = "num-bigint"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d0a3d5e207573f948a9e5376662aa743a2ea13f7c50a554d7af443a73fbfeba"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]] [[package]]
name = "num-integer" name = "num-integer"
version = "0.1.44" version = "0.1.44"
@ -1320,6 +1394,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "oid-registry"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2508c8f170e55be68508b1113956a760a82684f42022f8834fb16ca198621211"
dependencies = [
"der-parser",
]
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.7.2" version = "1.7.2"
@ -1407,7 +1490,7 @@ checksum = "a2a7bc6b2a29e632e45451c941832803a18cce6781db04de8a04696cdca8bde4"
dependencies = [ dependencies = [
"cfg-if 0.1.10", "cfg-if 0.1.10",
"libc", "libc",
"log 0.4.14", "log",
"wepoll-sys", "wepoll-sys",
"winapi", "winapi",
] ]
@ -1465,6 +1548,12 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "radium"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.7.3" version = "0.7.3"
@ -1546,6 +1635,18 @@ dependencies = [
"rand_core 0.6.2", "rand_core 0.6.2",
] ]
[[package]]
name = "rcgen"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cb7a2dc0e5307189b6933a61290ff06b65b35bdcaae2b2c50a0c3e355cb118e"
dependencies = [
"chrono",
"pem",
"ring",
"yasna",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.4.5" version = "1.4.5"
@ -1642,6 +1743,15 @@ dependencies = [
"semver", "semver",
] ]
[[package]]
name = "rusticata-macros"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7390af60e66c44130b4c5ea85f2555b7ace835d73b4b889c704dc3cb4c0468c8"
dependencies = [
"nom 6.1.2",
]
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.19.0" version = "0.19.0"
@ -1649,12 +1759,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "064fd21ff87c6e87ed4506e68beb42459caa4a0e2eb144932e6776768556980b" checksum = "064fd21ff87c6e87ed4506e68beb42459caa4a0e2eb144932e6776768556980b"
dependencies = [ dependencies = [
"base64 0.13.0", "base64 0.13.0",
"log 0.4.14", "log",
"ring", "ring",
"sct", "sct",
"webpki", "webpki",
] ]
[[package]]
name = "rustls-acme"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa2fc1ea3fc15c70f7cb9b72d21b6a6057ebff6a251a86aa678c1a6ee8ef0c85"
dependencies = [
"async-h1",
"async-rustls",
"async-std",
"base64 0.13.0",
"chrono",
"futures",
"http-types",
"log",
"pem",
"rcgen",
"ring",
"serde",
"serde_json",
"thiserror",
"webpki-roots",
"x509-parser",
]
[[package]]
name = "rustversion"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd"
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.5" version = "1.0.5"
@ -1801,7 +1941,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692ca13de57ce0613a363c8c2f1de925adebc81b04c923ac60c5488bb44abe4b" checksum = "692ca13de57ce0613a363c8c2f1de925adebc81b04c923ac60c5488bb44abe4b"
dependencies = [ dependencies = [
"chrono", "chrono",
"num-bigint", "num-bigint 0.2.6",
"num-traits", "num-traits",
] ]
@ -1852,6 +1992,12 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]] [[package]]
name = "stdweb" name = "stdweb"
version = "0.4.20" version = "0.4.20"
@ -1930,6 +2076,12 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "tap"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]] [[package]]
name = "termcolor" name = "termcolor"
version = "1.1.2" version = "1.1.2"
@ -1984,7 +2136,7 @@ dependencies = [
"http-client", "http-client",
"http-types", "http-types",
"kv-log-macro", "kv-log-macro",
"log 0.4.14", "log",
"pin-project-lite 0.2.6", "pin-project-lite 0.2.6",
"route-recognizer", "route-recognizer",
"serde", "serde",
@ -1992,15 +2144,27 @@ dependencies = [
] ]
[[package]] [[package]]
name = "tide-rustls" name = "tide-acme"
version = "0.2.0" version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf8521961215f0ebbb27e10c9d610ff2a64f853dc3707c03748350c6dbe022" checksum = "7fccc497c6090bbcb2790c37b9641744e66946358a16e03b934f04fda50d7215"
dependencies = [
"async-std",
"async-trait",
"rustls-acme",
"tide-rustls",
]
[[package]]
name = "tide-rustls"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a85b568b611840ba794ae749d4fa8b345b9f71a9c02b82cf0c28ff076fde6b7"
dependencies = [ dependencies = [
"async-dup", "async-dup",
"async-h1", "async-h1",
"async-rustls",
"async-std", "async-std",
"async-tls",
"rustls", "rustls",
"tide", "tide",
] ]
@ -2210,7 +2374,7 @@ checksum = "046ceba58ff062da072c7cb4ba5b22a37f00a302483f7e2a6cdc18fedbdc1fd3"
dependencies = [ dependencies = [
"bumpalo", "bumpalo",
"lazy_static", "lazy_static",
"log 0.4.14", "log",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn",
@ -2335,3 +2499,38 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
[[package]]
name = "x509-parser"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db7999ae290e75ec1d4dc8e9ff9870e48e3542a8f2e9c1e2e07d7ca02b459e10"
dependencies = [
"base64 0.13.0",
"chrono",
"data-encoding",
"der-oid-macro",
"der-parser",
"lazy_static",
"nom 6.1.2",
"num-bigint 0.3.2",
"oid-registry",
"rusticata-macros",
"rustversion",
"thiserror",
]
[[package]]
name = "yasna"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de7bff972b4f2a06c85f6d8454b09df153af7e3a4ec2aac81db1b105b684ddb"
dependencies = [
"chrono",
]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "broker" name = "broker"
version = "6.0.8" version = "9.0.2"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"] authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018" edition = "2018"
license = "MIT" license = "MIT"
@ -22,11 +22,13 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
jsonwebtoken = "7.0.1" jsonwebtoken = "7.0.1"
go-flag = "0.1" go-flag = "0.1"
lazy_static = "1.4" lazy_static = "1.4"
broker-ntp = "0.0.1" nippy = "2"
rust-argon2 = "0.8" rust-argon2 = "0.8"
anyhow = "1" anyhow = "1"
rmp-serde = "0.15" rmp-serde = "0.15"
driftwood = "0.0.6" driftwood = "0.0.6"
http-types = "2" http-types = "2"
tide-rustls = "0.2" tide-rustls = "0.3"
futures = "0.3" futures = "0.3"
tide-acme = "0.1.0"
base64 = "0.13"

View File

@ -18,11 +18,15 @@ Broker follows an insert-only/publish/subscribe paradigm rather than a REST CRUD
* Under 500 lines of code * Under 500 lines of code
* Secure Real-time Event Stream via SSE - requires the use of [broker-client](https://www.npmjs.com/package/broker-client) * Secure Real-time Event Stream via SSE - requires the use of [broker-client](https://www.npmjs.com/package/broker-client)
* Supports CORS * Supports CORS
* Add users with admin token permission
* Multi-tenant
* Supports SSL - full end-to-end encryption * Supports SSL - full end-to-end encryption
* Provides user authentication with JWTs with stored Argon2 passwords * Provides user authentication with JWTs or HTTP Basic
* Secure passwords with Argon2 encoding
* Uses Global NTP servers and doesn't rely on your local server time * Uses Global NTP servers and doesn't rely on your local server time
* Insert event via JSON POST request * Insert event via JSON POST request
* Sync latest events on SSE client connection * Sync latest events on SSE client connection
* Auto-provision and renews SSL cert via LetsEncrypt
### How it works ### How it works
@ -57,9 +61,10 @@ POST /users
- public endpoint - public endpoint
- POST JSON to create a user - POST JSON to create a user
```json ```json
{"username":{...}, "password":{...}} {"username":{...}, "password":{...}, "admin_token":{...}, "tenant_name":{...}}
``` ```
- where {...} is for username is a string and password is a string - where {...} is for username is a string, password is a string, admin_token is a string, and tenant_name is a string
- admin_token is required and can be set in the command args - it is for not allowing everyone to add a user - the default is `letmein`
will return `200` or `500` or `400` will return `200` or `500` or `400`
@ -86,7 +91,7 @@ will return
```html ```html
GET /sse GET /sse
``` ```
- authenticated endpoint (Authorization: Bearer {jwt}) - authenticated endpoint (Authorization: Bearer {jwt}) or (Authorization: Basic {username:password})
- connect your sse-client to this endpoint using [broker-client](https://www.npmjs.com/package/broker-client) - connect your sse-client to this endpoint using [broker-client](https://www.npmjs.com/package/broker-client)
- `note`: broker-client uses fetch as eventsource doesn't support headers - `note`: broker-client uses fetch as eventsource doesn't support headers
@ -95,7 +100,7 @@ GET /sse
```html ```html
POST /insert POST /insert
``` ```
- authenticated endpoint (Authorization: Bearer {jwt}) - authenticated endpoint (Authorization: Bearer {jwt}) or (Authorization: Basic {username:password})
- POST JSON to insert an event - POST JSON to insert an event
```json ```json
{"event":{...}, "data":{...}} {"event":{...}, "data":{...}}
@ -108,16 +113,20 @@ will return: `200` or `500` or `400` or `401`
``` cargo install broker ``` ``` cargo install broker ```
- the origin can be passed in as a flag - default * - the origin can be passed in as a flag - default `*`
- the port can be passed in as a flag - default 8080 - the port can be passed in as a flag - default `8080` - can only be set for unsecure connections
- the jwt_expiry (for jwts) can be passed in as a flag - default 86400 - the jwt_expiry (for jwts) can be passed in as a flag - default `86400`
- the jwt_secret (for jwts) should be passed in as a flag - default secret - the jwt_secret (for jwts) should be passed in as a flag - default `secret`
- the secure can be passed in as a flag (true or false) - default false - the secure flag (https) and can be true or false - default `false`
- the key_path can be passed in as a flag if connection https - default ./broker.rsa - the certs flag is the storage path of LetsEncrypt certs - default `certs`
- the cert_path can be passed in as a flag if connection https - default ./broker.pem - the db flag is the path where the embedded database will be saved - default `tmp`
- the db can be passed in as a flag where the embedded database will be saved - default tmp - the domain flag is the domain name (e.g. api.broker.com) of the domain you want to register with LetsEncrypt - must be fully resolvable
- example: `./broker --db="tmp" --port="443" --secure="true" --origin="*" --jwt_expiry="86400" --jwt_secret="secret" --key_path="broker.rsa" --cert_path="broker.pem"` - the admin_token flag is the password for the admin to add users - default `letmein`
- production example: `./broker --secure="true" --admin_token"23ce4234@123$" --jwt_secret="xTJEX234$##$" --domain="api.broker.com"`
### Service
There is an example `systemctl` service for Ubuntu called `broker.service` in the code
### TechStack ### TechStack
@ -133,11 +142,3 @@ will return: `200` or `500` or `400` or `401`
* [Event Sourcing](https://microservices.io/patterns/data/event-sourcing.html) * [Event Sourcing](https://microservices.io/patterns/data/event-sourcing.html)
* [Best in Place](https://github.com/bernat/best_in_place) * [Best in Place](https://github.com/bernat/best_in_place)
* [Brock Whitten](https://www.youtube.com/watch?v=qljYMEfVukU) * [Brock Whitten](https://www.youtube.com/watch?v=qljYMEfVukU)
### Migrations
- from 5.0 to 6.0: is a full rewrite - there is no upgrade path from 5.0 to 6.0
- from 4.0 to 5.0: multi-tenancy has been added and sled has been upgraded - there is no upgrade path from 4.0 to 5.0
- from 3.0 to 4.0: the sse endpoint now returns all events with all collections with the latest collection event rather than just the latest event data for all event types
- from 2.0 to 3.0: the sse endpoint is now secure and requires the use of the [broker-client](https://www.npmjs.com/package/broker-client) library
- from 1.0 to 2.0: the optional API endpoints URLs have been changed but have the same functionality

13
broker.service Normal file
View File

@ -0,0 +1,13 @@
[Unit]
Description=Broker
After=network.target
StartLimitIntervalSec=0
[Service]
Type=simple
Restart=always
RestartSec=1
User=root
ExecStart=/root/broker
[Install]
WantedBy=multi-user.target

View File

@ -10,10 +10,10 @@ use std::sync::Arc;
use tide::Request; use tide::Request;
use http_types::headers::HeaderValue; use http_types::headers::HeaderValue;
use tide::security::{CorsMiddleware, Origin}; use tide::security::{CorsMiddleware, Origin};
use tide_rustls::TlsListener;
use async_std::stream; use async_std::stream;
use std::time::Duration; use std::time::Duration;
use futures::StreamExt; use futures::StreamExt;
use tide_acme::{AcmeConfig, TideRustlsExt};
lazy_static! { lazy_static! {
static ref DB : Arc<rocksdb::DB> = { static ref DB : Arc<rocksdb::DB> = {
@ -38,8 +38,9 @@ pub struct EnvVarConfig {
pub jwt_secret: String, pub jwt_secret: String,
pub db: String, pub db: String,
pub secure: bool, pub secure: bool,
pub cert_path: String, pub certs: String,
pub key_path: String, pub domain: String,
pub admin_token: String,
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
@ -47,12 +48,15 @@ pub struct User {
id: uuid::Uuid, id: uuid::Uuid,
username: String, username: String,
password: String, password: String,
tenant_name: String,
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct UserForm { pub struct UserForm {
username: String, username: String,
password: String, password: String,
tenant_name: String,
admin_token: String,
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
@ -76,12 +80,13 @@ pub struct Event {
pub event: String, pub event: String,
pub timestamp: i64, pub timestamp: i64,
pub data: serde_json::Value, pub data: serde_json::Value,
pub tenant_name: String,
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct EventForm { pub struct EventForm {
event: String, event: String,
data: serde_json::Value, data: serde_json::Value
} }
fn replace(key: String, value: Vec<u8>) -> Result<()> { fn replace(key: String, value: Vec<u8>) -> Result<()> {
@ -132,40 +137,40 @@ fn get_events() -> Result<Vec<Event>> {
} }
fn puts_event(event: Event) -> Result<()> { fn puts_event(event: Event) -> Result<()> {
let key = format!("events_{}", event.event); let key = format!("events_{}", event.id);
let value = rmp_serde::to_vec_named(&event)?; let value = rmp_serde::to_vec_named(&event)?;
replace(key, value)?; replace(key, value)?;
Ok(()) Ok(())
} }
pub fn get_ntp_time() -> i64 {
let pool_ntp = "pool.ntp.org:123";
let response = broker_ntp::request(pool_ntp).unwrap();
let timestamp = response.transmit_timestamp;
broker_ntp::unix_time::Instant::from(timestamp).secs()
}
fn user_create(user_form: UserForm) -> Result<Option<String>> { fn user_create(user_form: UserForm) -> Result<Option<String>> {
if !is_user_unique(user_form.clone().username)? { let configure = env_var_config();
let j = json!({"error": "username already taken"}).to_string();
return Ok(Some(j)); if configure.admin_token == user_form.clone().admin_token {
if !is_user_unique(user_form.clone().username)? {
let j = json!({"error": "username already taken"}).to_string();
return Ok(Some(j));
} else {
// set as future value
let uuid = Uuid::new_v4();
let config = Argon2Config::default();
let uuid_string = Uuid::new_v4().to_string();
let salt = uuid_string.as_bytes();
let password = user_form.password.as_bytes();
let hashed = argon2::hash_encoded(password, salt, &config).unwrap();
let new_user = User{id: uuid, username: user_form.clone().username, password: hashed, tenant_name: user_form.clone().tenant_name };
puts_user(new_user).unwrap();
return Ok(None);
}
} else { } else {
// set as future value let j = json!({"error": "admin_token is incorrect"}).to_string();
let uuid = Uuid::new_v4(); return Ok(Some(j));
let config = Argon2Config::default();
let uuid_string = Uuid::new_v4().to_string();
let salt = uuid_string.as_bytes();
let password = user_form.password.as_bytes();
let hashed = argon2::hash_encoded(password, salt, &config).unwrap();
let new_user = User{id: uuid, username: user_form.clone().username, password: hashed };
puts_user(new_user).unwrap();
return Ok(None);
} }
} }
fn create_jwt(login: LoginForm) -> Result<Option<String>> { async fn create_jwt(login: LoginForm) -> Result<Option<String>> {
let user_value = get_user_by_username(login.username)?; let user_value = get_user_by_username(login.username)?;
match user_value { match user_value {
@ -173,7 +178,7 @@ fn create_jwt(login: LoginForm) -> Result<Option<String>> {
let verified = argon2::verify_encoded(&user.password, login.password.as_bytes())?; let verified = argon2::verify_encoded(&user.password, login.password.as_bytes())?;
if verified { if verified {
let app = env_var_config(); let app = env_var_config();
let iat = get_ntp_time(); let iat = nippy::get_unix_ntp_time().await?;
let exp = iat + app.jwt_expiry; let exp = iat + app.jwt_expiry;
let iss = "Dispatcher".to_string(); let iss = "Dispatcher".to_string();
let my_claims = Claims{sub: user.clone().username, exp, iat, iss}; let my_claims = Claims{sub: user.clone().username, exp, iat, iss};
@ -194,24 +199,26 @@ fn env_var_config() -> EnvVarConfig {
let mut secure = false; let mut secure = false;
let mut origin = "*".to_string(); let mut origin = "*".to_string();
let mut jwt_secret = "secret".to_string(); let mut jwt_secret = "secret".to_string();
let mut key_path = "./broker.rsa".to_string();
let mut cert_path = "./broker.pem".to_string();
let mut db: String = "tmp".to_string(); let mut db: String = "tmp".to_string();
let mut certs = "certs".to_string();
let mut domain = "localhost".to_string();
let mut admin_token = "letmein".to_string();
let _ : Vec<String> = go_flag::parse(|flags| { let _ : Vec<String> = go_flag::parse(|flags| {
flags.add_flag("port", &mut port); flags.add_flag("port", &mut port);
flags.add_flag("origin", &mut origin); flags.add_flag("origin", &mut origin);
flags.add_flag("jwt_expiry", &mut jwt_expiry); flags.add_flag("jwt_expiry", &mut jwt_expiry);
flags.add_flag("jwt_secret", &mut jwt_secret); flags.add_flag("jwt_secret", &mut jwt_secret);
flags.add_flag("secure", &mut secure); flags.add_flag("secure", &mut secure);
flags.add_flag("key_path", &mut key_path);
flags.add_flag("cert_path", &mut cert_path);
flags.add_flag("db", &mut db); flags.add_flag("db", &mut db);
flags.add_flag("domain", &mut domain);
flags.add_flag("certs", &mut certs);
flags.add_flag("admin_token", &mut admin_token);
}); });
EnvVarConfig{port, origin, jwt_expiry, jwt_secret, secure, key_path, cert_path, db} EnvVarConfig{port, origin, jwt_expiry, jwt_secret, secure, domain, certs, db, admin_token}
} }
fn jwt_verify(token: String) -> Result<Option<TokenData<Claims>>> { async fn jwt_verify(token: String) -> Result<Option<TokenData<Claims>>> {
let configure = env_var_config(); let configure = env_var_config();
@ -220,16 +227,45 @@ fn jwt_verify(token: String) -> Result<Option<TokenData<Claims>>> {
if auth_type == "Bearer" { if auth_type == "Bearer" {
let token = parts.next().unwrap(); let token = parts.next().unwrap();
let _ = match decode::<Claims>(&token, &DecodingKey::from_secret(configure.jwt_secret.as_ref()), &Validation::default()) { let _ = match decode::<Claims>(&token, &DecodingKey::from_secret(configure.jwt_secret.as_ref()), &Validation::default()) {
Ok(c) => { Ok(c) => { return Ok(Some(c)); },
return Ok(Some(c)); Err(_) => { return Ok(None); }
}, };
Err(_) => { } else if auth_type == "Basic" {
return Ok(None); let basic_encoded = parts.next().unwrap();
}
let _ = match base64::decode(basic_encoded) {
Ok(c) => {
let _ = match std::str::from_utf8(&c) {
Ok(basic) => {
let mut basic_parts = basic.split(":");
let user_name = basic_parts.next().unwrap();
let password = basic_parts.next().unwrap();
let user_value = get_user_by_username(user_name.to_string())?;
match user_value {
Some(user) => {
if argon2::verify_encoded(&user.password, password.as_ref())? {
let app = env_var_config();
let iat = nippy::get_unix_ntp_time().await?;
let exp = iat + app.jwt_expiry;
let iss = "Dispatcher".to_string();
let my_claims = Claims{sub: user.clone().username, exp, iat, iss};
let my_token = TokenData{
header: Header::default(),
claims: my_claims,
};
return Ok(Some(my_token));
}
},
None => { return Ok(None); }
}
},
Err(_) => { return Ok(None); }
};
},
Err(_) => { return Ok(None); }
}; };
} else {
Ok(None)
} }
Ok(None)
} }
// insert an event // insert an event
@ -239,15 +275,16 @@ async fn insert(user_username: String, event_form: EventForm) -> Result<bool> {
match user_value { match user_value {
Some(user) => { Some(user) => {
let timestamp = get_ntp_time(); let timestamp = nippy::get_unix_ntp_time().await?;
let id = uuid::Uuid::new_v4(); let id = uuid::Uuid::new_v4();
let event = Event{ let event = Event{
id, id,
data: event_form.data, data: event_form.data,
event: event_form.event, event: event_form.event,
user_id: user.id, user_id: user.id,
timestamp, timestamp,
tenant_name: user.tenant_name,
}; };
puts_event(event.clone())?; puts_event(event.clone())?;
@ -260,8 +297,8 @@ async fn insert(user_username: String, event_form: EventForm) -> Result<bool> {
} }
async fn create_user(mut req: Request<()>) -> tide::Result { async fn create_user(mut req: Request<()>) -> tide::Result {
let r = req.body_string().await.unwrap(); let r = req.body_string().await?;
let user_form : UserForm = serde_json::from_str(&r).unwrap(); let user_form : UserForm = serde_json::from_str(&r)?;
match user_create(user_form)? { match user_create(user_form)? {
Some(err) => { Some(err) => {
let err = format!("error: {}", err); let err = format!("error: {}", err);
@ -276,7 +313,7 @@ async fn create_user(mut req: Request<()>) -> tide::Result {
async fn login_user(mut req: Request<()>) -> tide::Result { async fn login_user(mut req: Request<()>) -> tide::Result {
let r = req.body_string().await?; let r = req.body_string().await?;
let login_form : LoginForm = serde_json::from_str(&r)?; let login_form : LoginForm = serde_json::from_str(&r)?;
match create_jwt(login_form)? { match create_jwt(login_form).await? {
Some(jwt) => { Some(jwt) => {
let msg = format!("jwt: {}", jwt); let msg = format!("jwt: {}", jwt);
Ok(tide::Response::builder(200).body(msg).header("content-type", "application/json").build()) Ok(tide::Response::builder(200).body(msg).header("content-type", "application/json").build())
@ -292,12 +329,12 @@ async fn insert_event(mut req: Request<()>) -> tide::Result {
match token_value { match token_value {
Some(token_header) => { Some(token_header) => {
let token = token_header.last().to_string(); let token = token_header.last().to_string();
let jwt_value = jwt_verify(token).unwrap(); let jwt_value = jwt_verify(token).await?;
match jwt_value { match jwt_value {
Some(jwt) => { Some(jwt) => {
let r = req.body_string().await.unwrap(); let r = req.body_string().await?;
let event_form : EventForm = serde_json::from_str(&r).unwrap(); let event_form : EventForm = serde_json::from_str(&r)?;
insert(jwt.claims.sub, event_form).await.unwrap(); insert(jwt.claims.sub, event_form).await?;
Ok(tide::Response::builder(200).header("content-type", "application/json").build()) Ok(tide::Response::builder(200).header("content-type", "application/json").build())
}, },
None => { Ok(tide::Response::builder(401).header("content-type", "application/json").build()) } None => { Ok(tide::Response::builder(401).header("content-type", "application/json").build()) }
@ -331,36 +368,44 @@ async fn main() -> tide::Result<()> {
match token_value { match token_value {
Some(token_header) => { Some(token_header) => {
let token = token_header.last().to_string(); let token = token_header.last().to_string();
let jwt_value = jwt_verify(token).unwrap(); let jwt_value = jwt_verify(token).await?;
match jwt_value { match jwt_value {
Some(_) => { Some(jwt) => {
let user_value = get_user_by_username(jwt.claims.sub)?;
let mut cache: HashMap<String, Event> = HashMap::new(); match user_value {
Some(user) => {
let mut cache: HashMap<String, Event> = HashMap::new();
let mut interval = stream::interval(Duration::from_millis(100)); let mut interval = stream::interval(Duration::from_millis(100));
while let Some(_) = interval.next().await { while let Some(_) = interval.next().await {
let events = get_events()?; let events = get_events()?;
for evt in events { for evt in events {
if !cache.contains_key(&evt.event) { if evt.tenant_name == user.tenant_name {
let id = uuid::Uuid::new_v4(); if !cache.contains_key(&evt.event) {
sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?;
cache.insert(evt.event.clone(), evt.clone());
}
else {
let value_maybe = cache.get_key_value(&evt.event);
match value_maybe {
Some((_, v)) => {
if &evt != v {
let id = uuid::Uuid::new_v4(); let id = uuid::Uuid::new_v4();
sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?; sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?;
cache.insert(evt.event.clone(), evt.clone()); cache.insert(evt.event.clone(), evt.clone());
} else {
let value_maybe = cache.get_key_value(&evt.event);
match value_maybe {
Some((_, v)) => {
if &evt != v {
let id = uuid::Uuid::new_v4();
sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?;
cache.insert(evt.event.clone(), evt.clone());
}
},
None => { println!("helo"); return Ok(()); }
}
} }
}, }
None => {}
} }
} }
} },
None => { return Ok(()); }
} }
Ok(()) Ok(())
}, },
@ -375,12 +420,14 @@ async fn main() -> tide::Result<()> {
if configure.secure { if configure.secure {
app.listen( app.listen(
TlsListener::build() tide_rustls::TlsListener::build().addrs("0.0.0.0:443").acme(
.addrs(ip) AcmeConfig::new()
.cert(configure.cert_path) .domains(vec![configure.domain])
.key(configure.key_path) .cache_dir(configure.certs)
) .production(),
.await?; ),
)
.await?;
} else { } else {
app.listen(ip).await?; app.listen(ip).await?;
} }