Compare commits

...

11 Commits

Author SHA1 Message Date
Bevan Hunt 287e1f02c6 bump ver 2021-03-25 17:51:58 -07:00
Bevan Hunt 3570138bb8 fix key for event 2021-03-25 17:48:49 -07:00
Bevan Hunt c18bce2263 update readme 2021-03-25 17:06:28 -07:00
Bevan Hunt 0c43a51f29 remove tenant_name from event 2021-03-25 17:01:23 -07:00
Bevan Hunt 31bbed2019 add http basic auth 2021-03-25 13:19:27 -07:00
Bevan Hunt 0ab4c72281 update to 8.0.0 2021-03-23 15:11:41 -07:00
Bevan Hunt f0e3c4bb38 add tide-acme 2021-03-22 15:15:22 -07:00
Bevan Hunt 7cb0941b6c add service and update readme 2021-03-21 22:12:39 -07:00
Bevan Hunt d4d42a1b37 update nippy ver 2021-03-21 22:07:26 -07:00
Bevan Hunt d3fd496363 replace broker-ntp with nippy 2021-03-21 21:25:58 -07:00
Bevan Hunt 8283767ef0 add changelog 2021-03-21 19:20:09 -07:00
6 changed files with 475 additions and 162 deletions

51
CHANGELOG.md Normal file
View File

@ -0,0 +1,51 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [9.0.2] - 2021-04-25
### Fixed
- keys on event
## [9.0.0] - 2021-04-25
### Removed
- removed tenant_name from event - uses users event name
## [8.1.x] - 2021-04-25
### Added
- Adds http basic auth
## [8.0.x] - 2021-03-23
### Added
- Adds admin_token to create user and command args
- Adds multi-tenancy
## [7.0.x] - 2021-03-22
### Added
- Adds tide-acme
## Changed
- Changes command args
## [6.1.x] - 2021-03-21
### Changed
- Replaced broker-ntp with nippy
## [6.0.x] - 2021-03-20
### Changed
- Replaced Warp with Tide
- Replaced Sled with RocksDB
### Removed
- Rmmoved GET JSON API endpoints
- Removed multi-tenancy
- Removed timestamps, event dispatcher, and cancellation
- Removed broker-grid support (current version)

317
Cargo.lock generated
View File

@ -167,7 +167,7 @@ dependencies = [
"http-types",
"httparse",
"lazy_static",
"log 0.4.14",
"log",
"pin-project",
]
@ -181,7 +181,7 @@ dependencies = [
"fastrand",
"futures-lite",
"libc",
"log 0.4.14",
"log",
"nb-connect",
"once_cell",
"parking",
@ -225,6 +225,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "async-rustls"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c86f33abd5a4f3e2d6d9251a9e0c6a7e52eb1113caf893dae8429bf4a53f378"
dependencies = [
"futures-lite",
"rustls",
"webpki",
]
[[package]]
name = "async-session"
version = "2.0.1"
@ -255,7 +266,7 @@ dependencies = [
"async-channel",
"async-std",
"http-types",
"log 0.4.14",
"log",
"memchr",
"pin-project-lite 0.1.12",
]
@ -279,7 +290,7 @@ dependencies = [
"futures-lite",
"gloo-timers",
"kv-log-macro",
"log 0.4.14",
"log",
"memchr",
"num_cpus",
"once_cell",
@ -295,19 +306,6 @@ version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "async-tls"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f23d769dbf1838d5df5156e7b1ad404f4c463d1ac2c6aeb6cd943630f8a8400"
dependencies = [
"futures-core",
"futures-io",
"rustls",
"webpki",
"webpki-roots",
]
[[package]]
name = "async-trait"
version = "0.1.48"
@ -362,9 +360,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bincode"
version = "1.3.2"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d175dfa69e619905c4c3cdb7c3c203fa3bdd5d51184e3afdb2742c0280493772"
checksum = "f30d3a39baa26f9651f17b375061f3233dde33424a8b72b0dbe93a68a0bc896d"
dependencies = [
"byteorder",
"serde",
@ -384,7 +382,7 @@ dependencies = [
"env_logger",
"lazy_static",
"lazycell",
"log 0.4.14",
"log",
"peeking_take_while",
"proc-macro2",
"quote",
@ -400,6 +398,18 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bitvec"
version = "0.19.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321"
dependencies = [
"funty",
"radium",
"tap",
"wyz",
]
[[package]]
name = "blake2b_simd"
version = "0.5.11"
@ -451,11 +461,11 @@ dependencies = [
[[package]]
name = "broker"
version = "6.0.8"
version = "9.0.2"
dependencies = [
"anyhow",
"async-std",
"broker-ntp",
"base64 0.13.0",
"driftwood",
"futures",
"go-flag",
@ -463,6 +473,7 @@ dependencies = [
"json",
"jsonwebtoken",
"lazy_static",
"nippy",
"rmp-serde",
"rocksdb",
"rust-argon2",
@ -470,22 +481,11 @@ dependencies = [
"serde_derive",
"serde_json",
"tide",
"tide-acme",
"tide-rustls",
"uuid",
]
[[package]]
name = "broker-ntp"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "037d0f8fa2a4e20313da3d34d3d40606e6366fdc1458f664fc03edc89aa34292"
dependencies = [
"byteorder",
"conv",
"custom_derive",
"log 0.3.9",
]
[[package]]
name = "bumpalo"
version = "3.6.1"
@ -504,9 +504,9 @@ dependencies = [
[[package]]
name = "byteorder"
version = "1.3.4"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "cache-padded"
@ -529,7 +529,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4aedb84272dbe89af497cf81375129abda4fc0a9e7c5d317498c15cc30c0d27"
dependencies = [
"nom",
"nom 5.1.2",
]
[[package]]
@ -757,6 +757,32 @@ version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"
[[package]]
name = "der-oid-macro"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd17d13ecf875e704369fdbde242483ac769fc18f6af21e43d5a692a079732fc"
dependencies = [
"nom 6.1.2",
"num-bigint 0.3.2",
"num-traits",
"proc-macro-hack",
]
[[package]]
name = "der-parser"
version = "5.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13e6cad1223a7b98b59275a56516ed8c40508d21284a32e404ed3fe2ae9a809a"
dependencies = [
"der-oid-macro",
"nom 6.1.2",
"num-bigint 0.3.2",
"num-traits",
"proc-macro-hack",
"rusticata-macros",
]
[[package]]
name = "digest"
version = "0.9.0"
@ -792,7 +818,7 @@ checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
dependencies = [
"atty",
"humantime",
"log 0.4.14",
"log",
"regex",
"termcolor",
]
@ -820,7 +846,7 @@ checksum = "2af1a24f391a5a94d756db5092c6576aad494b88a71a5a36b20c67b63e0df034"
dependencies = [
"cfg-if 0.1.10",
"js-sys",
"log 0.4.14",
"log",
"serde",
"serde_derive",
"serde_json",
@ -838,6 +864,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "funty"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]]
name = "futures"
version = "0.3.13"
@ -1064,7 +1096,7 @@ dependencies = [
"cfg-if 1.0.0",
"dashmap",
"http-types",
"log 0.4.14",
"log",
]
[[package]]
@ -1180,7 +1212,7 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
dependencies = [
"log 0.4.14",
"log",
]
[[package]]
@ -1195,6 +1227,19 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lexical-core"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21f866863575d0e1d654fbeeabdc927292fdf862873dc3c96c6f753357e13374"
dependencies = [
"arrayvec",
"bitflags",
"cfg-if 1.0.0",
"ryu",
"static_assertions",
]
[[package]]
name = "libc"
version = "0.2.90"
@ -1223,15 +1268,6 @@ dependencies = [
"libc",
]
[[package]]
name = "log"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
dependencies = [
"log 0.4.14",
]
[[package]]
name = "log"
version = "0.4.14"
@ -1270,6 +1306,20 @@ dependencies = [
"socket2",
]
[[package]]
name = "nippy"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "966078feae73ebace00794f728f01c3851ca8c80a582b4f4b4309a35b4acd18c"
dependencies = [
"anyhow",
"async-std",
"byteorder",
"conv",
"custom_derive",
"log",
]
[[package]]
name = "nom"
version = "5.1.2"
@ -1280,6 +1330,19 @@ dependencies = [
"version_check",
]
[[package]]
name = "nom"
version = "6.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
dependencies = [
"bitvec",
"funty",
"lexical-core",
"memchr",
"version_check",
]
[[package]]
name = "num-bigint"
version = "0.2.6"
@ -1291,6 +1354,17 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-bigint"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d0a3d5e207573f948a9e5376662aa743a2ea13f7c50a554d7af443a73fbfeba"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@ -1320,6 +1394,15 @@ dependencies = [
"libc",
]
[[package]]
name = "oid-registry"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2508c8f170e55be68508b1113956a760a82684f42022f8834fb16ca198621211"
dependencies = [
"der-parser",
]
[[package]]
name = "once_cell"
version = "1.7.2"
@ -1407,7 +1490,7 @@ checksum = "a2a7bc6b2a29e632e45451c941832803a18cce6781db04de8a04696cdca8bde4"
dependencies = [
"cfg-if 0.1.10",
"libc",
"log 0.4.14",
"log",
"wepoll-sys",
"winapi",
]
@ -1465,6 +1548,12 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "radium"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
[[package]]
name = "rand"
version = "0.7.3"
@ -1546,6 +1635,18 @@ dependencies = [
"rand_core 0.6.2",
]
[[package]]
name = "rcgen"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cb7a2dc0e5307189b6933a61290ff06b65b35bdcaae2b2c50a0c3e355cb118e"
dependencies = [
"chrono",
"pem",
"ring",
"yasna",
]
[[package]]
name = "regex"
version = "1.4.5"
@ -1642,6 +1743,15 @@ dependencies = [
"semver",
]
[[package]]
name = "rusticata-macros"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7390af60e66c44130b4c5ea85f2555b7ace835d73b4b889c704dc3cb4c0468c8"
dependencies = [
"nom 6.1.2",
]
[[package]]
name = "rustls"
version = "0.19.0"
@ -1649,12 +1759,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "064fd21ff87c6e87ed4506e68beb42459caa4a0e2eb144932e6776768556980b"
dependencies = [
"base64 0.13.0",
"log 0.4.14",
"log",
"ring",
"sct",
"webpki",
]
[[package]]
name = "rustls-acme"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa2fc1ea3fc15c70f7cb9b72d21b6a6057ebff6a251a86aa678c1a6ee8ef0c85"
dependencies = [
"async-h1",
"async-rustls",
"async-std",
"base64 0.13.0",
"chrono",
"futures",
"http-types",
"log",
"pem",
"rcgen",
"ring",
"serde",
"serde_json",
"thiserror",
"webpki-roots",
"x509-parser",
]
[[package]]
name = "rustversion"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd"
[[package]]
name = "ryu"
version = "1.0.5"
@ -1801,7 +1941,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692ca13de57ce0613a363c8c2f1de925adebc81b04c923ac60c5488bb44abe4b"
dependencies = [
"chrono",
"num-bigint",
"num-bigint 0.2.6",
"num-traits",
]
@ -1852,6 +1992,12 @@ dependencies = [
"version_check",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stdweb"
version = "0.4.20"
@ -1930,6 +2076,12 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "tap"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "termcolor"
version = "1.1.2"
@ -1984,7 +2136,7 @@ dependencies = [
"http-client",
"http-types",
"kv-log-macro",
"log 0.4.14",
"log",
"pin-project-lite 0.2.6",
"route-recognizer",
"serde",
@ -1992,15 +2144,27 @@ dependencies = [
]
[[package]]
name = "tide-rustls"
version = "0.2.0"
name = "tide-acme"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf8521961215f0ebbb27e10c9d610ff2a64f853dc3707c03748350c6dbe022"
checksum = "7fccc497c6090bbcb2790c37b9641744e66946358a16e03b934f04fda50d7215"
dependencies = [
"async-std",
"async-trait",
"rustls-acme",
"tide-rustls",
]
[[package]]
name = "tide-rustls"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a85b568b611840ba794ae749d4fa8b345b9f71a9c02b82cf0c28ff076fde6b7"
dependencies = [
"async-dup",
"async-h1",
"async-rustls",
"async-std",
"async-tls",
"rustls",
"tide",
]
@ -2210,7 +2374,7 @@ checksum = "046ceba58ff062da072c7cb4ba5b22a37f00a302483f7e2a6cdc18fedbdc1fd3"
dependencies = [
"bumpalo",
"lazy_static",
"log 0.4.14",
"log",
"proc-macro2",
"quote",
"syn",
@ -2335,3 +2499,38 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
[[package]]
name = "x509-parser"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db7999ae290e75ec1d4dc8e9ff9870e48e3542a8f2e9c1e2e07d7ca02b459e10"
dependencies = [
"base64 0.13.0",
"chrono",
"data-encoding",
"der-oid-macro",
"der-parser",
"lazy_static",
"nom 6.1.2",
"num-bigint 0.3.2",
"oid-registry",
"rusticata-macros",
"rustversion",
"thiserror",
]
[[package]]
name = "yasna"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de7bff972b4f2a06c85f6d8454b09df153af7e3a4ec2aac81db1b105b684ddb"
dependencies = [
"chrono",
]

View File

@ -1,6 +1,6 @@
[package]
name = "broker"
version = "6.0.8"
version = "9.0.2"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018"
license = "MIT"
@ -22,11 +22,13 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
jsonwebtoken = "7.0.1"
go-flag = "0.1"
lazy_static = "1.4"
broker-ntp = "0.0.1"
nippy = "2"
rust-argon2 = "0.8"
anyhow = "1"
rmp-serde = "0.15"
driftwood = "0.0.6"
http-types = "2"
tide-rustls = "0.2"
tide-rustls = "0.3"
futures = "0.3"
tide-acme = "0.1.0"
base64 = "0.13"

View File

@ -18,11 +18,15 @@ Broker follows an insert-only/publish/subscribe paradigm rather than a REST CRUD
* Under 500 lines of code
* Secure Real-time Event Stream via SSE - requires the use of [broker-client](https://www.npmjs.com/package/broker-client)
* Supports CORS
* Add users with admin token permission
* Multi-tenant
* Supports SSL - full end-to-end encryption
* Provides user authentication with JWTs with stored Argon2 passwords
* Provides user authentication with JWTs or HTTP Basic
* Secure passwords with Argon2 encoding
* Uses Global NTP servers and doesn't rely on your local server time
* Insert event via JSON POST request
* Sync latest events on SSE client connection
* Auto-provision and renews SSL cert via LetsEncrypt
### How it works
@ -57,9 +61,10 @@ POST /users
- public endpoint
- POST JSON to create a user
```json
{"username":{...}, "password":{...}}
{"username":{...}, "password":{...}, "admin_token":{...}, "tenant_name":{...}}
```
- where {...} is for username is a string and password is a string
- where {...} is for username is a string, password is a string, admin_token is a string, and tenant_name is a string
- admin_token is required and can be set in the command args - it is for not allowing everyone to add a user - the default is `letmein`
will return `200` or `500` or `400`
@ -86,7 +91,7 @@ will return
```html
GET /sse
```
- authenticated endpoint (Authorization: Bearer {jwt})
- authenticated endpoint (Authorization: Bearer {jwt}) or (Authorization: Basic {username:password})
- connect your sse-client to this endpoint using [broker-client](https://www.npmjs.com/package/broker-client)
- `note`: broker-client uses fetch as eventsource doesn't support headers
@ -95,7 +100,7 @@ GET /sse
```html
POST /insert
```
- authenticated endpoint (Authorization: Bearer {jwt})
- authenticated endpoint (Authorization: Bearer {jwt}) or (Authorization: Basic {username:password})
- POST JSON to insert an event
```json
{"event":{...}, "data":{...}}
@ -108,16 +113,20 @@ will return: `200` or `500` or `400` or `401`
``` cargo install broker ```
- the origin can be passed in as a flag - default *
- the port can be passed in as a flag - default 8080
- the jwt_expiry (for jwts) can be passed in as a flag - default 86400
- the jwt_secret (for jwts) should be passed in as a flag - default secret
- the secure can be passed in as a flag (true or false) - default false
- the key_path can be passed in as a flag if connection https - default ./broker.rsa
- the cert_path can be passed in as a flag if connection https - default ./broker.pem
- the db can be passed in as a flag where the embedded database will be saved - default tmp
- example: `./broker --db="tmp" --port="443" --secure="true" --origin="*" --jwt_expiry="86400" --jwt_secret="secret" --key_path="broker.rsa" --cert_path="broker.pem"`
- the origin can be passed in as a flag - default `*`
- the port can be passed in as a flag - default `8080` - can only be set for unsecure connections
- the jwt_expiry (for jwts) can be passed in as a flag - default `86400`
- the jwt_secret (for jwts) should be passed in as a flag - default `secret`
- the secure flag (https) and can be true or false - default `false`
- the certs flag is the storage path of LetsEncrypt certs - default `certs`
- the db flag is the path where the embedded database will be saved - default `tmp`
- the domain flag is the domain name (e.g. api.broker.com) of the domain you want to register with LetsEncrypt - must be fully resolvable
- the admin_token flag is the password for the admin to add users - default `letmein`
- production example: `./broker --secure="true" --admin_token"23ce4234@123$" --jwt_secret="xTJEX234$##$" --domain="api.broker.com"`
### Service
There is an example `systemctl` service for Ubuntu called `broker.service` in the code
### TechStack
@ -133,11 +142,3 @@ will return: `200` or `500` or `400` or `401`
* [Event Sourcing](https://microservices.io/patterns/data/event-sourcing.html)
* [Best in Place](https://github.com/bernat/best_in_place)
* [Brock Whitten](https://www.youtube.com/watch?v=qljYMEfVukU)
### Migrations
- from 5.0 to 6.0: is a full rewrite - there is no upgrade path from 5.0 to 6.0
- from 4.0 to 5.0: multi-tenancy has been added and sled has been upgraded - there is no upgrade path from 4.0 to 5.0
- from 3.0 to 4.0: the sse endpoint now returns all events with all collections with the latest collection event rather than just the latest event data for all event types
- from 2.0 to 3.0: the sse endpoint is now secure and requires the use of the [broker-client](https://www.npmjs.com/package/broker-client) library
- from 1.0 to 2.0: the optional API endpoints URLs have been changed but have the same functionality

13
broker.service Normal file
View File

@ -0,0 +1,13 @@
[Unit]
Description=Broker
After=network.target
StartLimitIntervalSec=0
[Service]
Type=simple
Restart=always
RestartSec=1
User=root
ExecStart=/root/broker
[Install]
WantedBy=multi-user.target

View File

@ -10,10 +10,10 @@ use std::sync::Arc;
use tide::Request;
use http_types::headers::HeaderValue;
use tide::security::{CorsMiddleware, Origin};
use tide_rustls::TlsListener;
use async_std::stream;
use std::time::Duration;
use futures::StreamExt;
use tide_acme::{AcmeConfig, TideRustlsExt};
lazy_static! {
static ref DB : Arc<rocksdb::DB> = {
@ -38,8 +38,9 @@ pub struct EnvVarConfig {
pub jwt_secret: String,
pub db: String,
pub secure: bool,
pub cert_path: String,
pub key_path: String,
pub certs: String,
pub domain: String,
pub admin_token: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -47,12 +48,15 @@ pub struct User {
id: uuid::Uuid,
username: String,
password: String,
tenant_name: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct UserForm {
username: String,
password: String,
tenant_name: String,
admin_token: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -76,12 +80,13 @@ pub struct Event {
pub event: String,
pub timestamp: i64,
pub data: serde_json::Value,
pub tenant_name: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct EventForm {
event: String,
data: serde_json::Value,
data: serde_json::Value
}
fn replace(key: String, value: Vec<u8>) -> Result<()> {
@ -132,40 +137,40 @@ fn get_events() -> Result<Vec<Event>> {
}
fn puts_event(event: Event) -> Result<()> {
let key = format!("events_{}", event.event);
let key = format!("events_{}", event.id);
let value = rmp_serde::to_vec_named(&event)?;
replace(key, value)?;
Ok(())
}
pub fn get_ntp_time() -> i64 {
let pool_ntp = "pool.ntp.org:123";
let response = broker_ntp::request(pool_ntp).unwrap();
let timestamp = response.transmit_timestamp;
broker_ntp::unix_time::Instant::from(timestamp).secs()
}
fn user_create(user_form: UserForm) -> Result<Option<String>> {
if !is_user_unique(user_form.clone().username)? {
let j = json!({"error": "username already taken"}).to_string();
return Ok(Some(j));
let configure = env_var_config();
if configure.admin_token == user_form.clone().admin_token {
if !is_user_unique(user_form.clone().username)? {
let j = json!({"error": "username already taken"}).to_string();
return Ok(Some(j));
} else {
// set as future value
let uuid = Uuid::new_v4();
let config = Argon2Config::default();
let uuid_string = Uuid::new_v4().to_string();
let salt = uuid_string.as_bytes();
let password = user_form.password.as_bytes();
let hashed = argon2::hash_encoded(password, salt, &config).unwrap();
let new_user = User{id: uuid, username: user_form.clone().username, password: hashed, tenant_name: user_form.clone().tenant_name };
puts_user(new_user).unwrap();
return Ok(None);
}
} else {
// set as future value
let uuid = Uuid::new_v4();
let config = Argon2Config::default();
let uuid_string = Uuid::new_v4().to_string();
let salt = uuid_string.as_bytes();
let password = user_form.password.as_bytes();
let hashed = argon2::hash_encoded(password, salt, &config).unwrap();
let new_user = User{id: uuid, username: user_form.clone().username, password: hashed };
puts_user(new_user).unwrap();
return Ok(None);
let j = json!({"error": "admin_token is incorrect"}).to_string();
return Ok(Some(j));
}
}
fn create_jwt(login: LoginForm) -> Result<Option<String>> {
async fn create_jwt(login: LoginForm) -> Result<Option<String>> {
let user_value = get_user_by_username(login.username)?;
match user_value {
@ -173,7 +178,7 @@ fn create_jwt(login: LoginForm) -> Result<Option<String>> {
let verified = argon2::verify_encoded(&user.password, login.password.as_bytes())?;
if verified {
let app = env_var_config();
let iat = get_ntp_time();
let iat = nippy::get_unix_ntp_time().await?;
let exp = iat + app.jwt_expiry;
let iss = "Dispatcher".to_string();
let my_claims = Claims{sub: user.clone().username, exp, iat, iss};
@ -194,24 +199,26 @@ fn env_var_config() -> EnvVarConfig {
let mut secure = false;
let mut origin = "*".to_string();
let mut jwt_secret = "secret".to_string();
let mut key_path = "./broker.rsa".to_string();
let mut cert_path = "./broker.pem".to_string();
let mut db: String = "tmp".to_string();
let mut certs = "certs".to_string();
let mut domain = "localhost".to_string();
let mut admin_token = "letmein".to_string();
let _ : Vec<String> = go_flag::parse(|flags| {
flags.add_flag("port", &mut port);
flags.add_flag("origin", &mut origin);
flags.add_flag("jwt_expiry", &mut jwt_expiry);
flags.add_flag("jwt_secret", &mut jwt_secret);
flags.add_flag("secure", &mut secure);
flags.add_flag("key_path", &mut key_path);
flags.add_flag("cert_path", &mut cert_path);
flags.add_flag("db", &mut db);
flags.add_flag("domain", &mut domain);
flags.add_flag("certs", &mut certs);
flags.add_flag("admin_token", &mut admin_token);
});
EnvVarConfig{port, origin, jwt_expiry, jwt_secret, secure, key_path, cert_path, db}
EnvVarConfig{port, origin, jwt_expiry, jwt_secret, secure, domain, certs, db, admin_token}
}
fn jwt_verify(token: String) -> Result<Option<TokenData<Claims>>> {
async fn jwt_verify(token: String) -> Result<Option<TokenData<Claims>>> {
let configure = env_var_config();
@ -220,16 +227,45 @@ fn jwt_verify(token: String) -> Result<Option<TokenData<Claims>>> {
if auth_type == "Bearer" {
let token = parts.next().unwrap();
let _ = match decode::<Claims>(&token, &DecodingKey::from_secret(configure.jwt_secret.as_ref()), &Validation::default()) {
Ok(c) => {
return Ok(Some(c));
},
Err(_) => {
return Ok(None);
}
Ok(c) => { return Ok(Some(c)); },
Err(_) => { return Ok(None); }
};
} else if auth_type == "Basic" {
let basic_encoded = parts.next().unwrap();
let _ = match base64::decode(basic_encoded) {
Ok(c) => {
let _ = match std::str::from_utf8(&c) {
Ok(basic) => {
let mut basic_parts = basic.split(":");
let user_name = basic_parts.next().unwrap();
let password = basic_parts.next().unwrap();
let user_value = get_user_by_username(user_name.to_string())?;
match user_value {
Some(user) => {
if argon2::verify_encoded(&user.password, password.as_ref())? {
let app = env_var_config();
let iat = nippy::get_unix_ntp_time().await?;
let exp = iat + app.jwt_expiry;
let iss = "Dispatcher".to_string();
let my_claims = Claims{sub: user.clone().username, exp, iat, iss};
let my_token = TokenData{
header: Header::default(),
claims: my_claims,
};
return Ok(Some(my_token));
}
},
None => { return Ok(None); }
}
},
Err(_) => { return Ok(None); }
};
},
Err(_) => { return Ok(None); }
};
} else {
Ok(None)
}
Ok(None)
}
// insert an event
@ -239,15 +275,16 @@ async fn insert(user_username: String, event_form: EventForm) -> Result<bool> {
match user_value {
Some(user) => {
let timestamp = get_ntp_time();
let timestamp = nippy::get_unix_ntp_time().await?;
let id = uuid::Uuid::new_v4();
let event = Event{
id,
data: event_form.data,
event: event_form.event,
user_id: user.id,
timestamp,
tenant_name: user.tenant_name,
};
puts_event(event.clone())?;
@ -260,8 +297,8 @@ async fn insert(user_username: String, event_form: EventForm) -> Result<bool> {
}
async fn create_user(mut req: Request<()>) -> tide::Result {
let r = req.body_string().await.unwrap();
let user_form : UserForm = serde_json::from_str(&r).unwrap();
let r = req.body_string().await?;
let user_form : UserForm = serde_json::from_str(&r)?;
match user_create(user_form)? {
Some(err) => {
let err = format!("error: {}", err);
@ -276,7 +313,7 @@ async fn create_user(mut req: Request<()>) -> tide::Result {
async fn login_user(mut req: Request<()>) -> tide::Result {
let r = req.body_string().await?;
let login_form : LoginForm = serde_json::from_str(&r)?;
match create_jwt(login_form)? {
match create_jwt(login_form).await? {
Some(jwt) => {
let msg = format!("jwt: {}", jwt);
Ok(tide::Response::builder(200).body(msg).header("content-type", "application/json").build())
@ -292,12 +329,12 @@ async fn insert_event(mut req: Request<()>) -> tide::Result {
match token_value {
Some(token_header) => {
let token = token_header.last().to_string();
let jwt_value = jwt_verify(token).unwrap();
let jwt_value = jwt_verify(token).await?;
match jwt_value {
Some(jwt) => {
let r = req.body_string().await.unwrap();
let event_form : EventForm = serde_json::from_str(&r).unwrap();
insert(jwt.claims.sub, event_form).await.unwrap();
let r = req.body_string().await?;
let event_form : EventForm = serde_json::from_str(&r)?;
insert(jwt.claims.sub, event_form).await?;
Ok(tide::Response::builder(200).header("content-type", "application/json").build())
},
None => { Ok(tide::Response::builder(401).header("content-type", "application/json").build()) }
@ -331,36 +368,44 @@ async fn main() -> tide::Result<()> {
match token_value {
Some(token_header) => {
let token = token_header.last().to_string();
let jwt_value = jwt_verify(token).unwrap();
let jwt_value = jwt_verify(token).await?;
match jwt_value {
Some(_) => {
Some(jwt) => {
let user_value = get_user_by_username(jwt.claims.sub)?;
let mut cache: HashMap<String, Event> = HashMap::new();
match user_value {
Some(user) => {
let mut cache: HashMap<String, Event> = HashMap::new();
let mut interval = stream::interval(Duration::from_millis(100));
while let Some(_) = interval.next().await {
let events = get_events()?;
let mut interval = stream::interval(Duration::from_millis(100));
while let Some(_) = interval.next().await {
let events = get_events()?;
for evt in events {
if !cache.contains_key(&evt.event) {
let id = uuid::Uuid::new_v4();
sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?;
cache.insert(evt.event.clone(), evt.clone());
}
else {
let value_maybe = cache.get_key_value(&evt.event);
match value_maybe {
Some((_, v)) => {
if &evt != v {
for evt in events {
if evt.tenant_name == user.tenant_name {
if !cache.contains_key(&evt.event) {
let id = uuid::Uuid::new_v4();
sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?;
cache.insert(evt.event.clone(), evt.clone());
cache.insert(evt.event.clone(), evt.clone());
} else {
let value_maybe = cache.get_key_value(&evt.event);
match value_maybe {
Some((_, v)) => {
if &evt != v {
let id = uuid::Uuid::new_v4();
sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?;
cache.insert(evt.event.clone(), evt.clone());
}
},
None => { println!("helo"); return Ok(()); }
}
}
},
None => {}
}
}
}
}
},
None => { return Ok(()); }
}
Ok(())
},
@ -375,12 +420,14 @@ async fn main() -> tide::Result<()> {
if configure.secure {
app.listen(
TlsListener::build()
.addrs(ip)
.cert(configure.cert_path)
.key(configure.key_path)
)
.await?;
tide_rustls::TlsListener::build().addrs("0.0.0.0:443").acme(
AcmeConfig::new()
.domains(vec![configure.domain])
.cache_dir(configure.certs)
.production(),
),
)
.await?;
} else {
app.listen(ip).await?;
}