get v5 working (#15)

This commit is contained in:
Bevan Hunt 2020-05-12 01:39:45 -07:00 committed by GitHub
parent 8ebe1bdc57
commit d0485b4c7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 37664 additions and 872 deletions

8
Cargo.lock generated
View File

@ -121,7 +121,7 @@ dependencies = [
[[package]]
name = "broker"
version = "4.4.0"
version = "5.0.0"
dependencies = [
"Inflector 0.11.4 (registry+https://github.com/rust-lang/crates.io-index)",
"base64 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -141,7 +141,7 @@ dependencies = [
"serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)",
"sled 0.30.3 (registry+https://github.com/rust-lang/crates.io-index)",
"sled 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"warp 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1520,7 +1520,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "sled"
version = "0.30.3"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2163,7 +2163,7 @@ dependencies = [
"checksum simple_asn1 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2b25ecba7165254f0c97d6c22a64b1122a03634b18d20a34daf21e18f892e618"
"checksum siphasher 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
"checksum sled 0.30.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb8c32cb0e34e67ad74fae1a77f4635d0cc7ffc873088a0136f3c4849336d71"
"checksum sled 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8fb6824dde66ad33bf20c6e8476f5b82b871bc8bc3c129a10ea2f7dae5060fa3"
"checksum smallvec 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5c2fb2ec9bcd216a5b0d0ccf31ab17b5ed1d627960edff65bbe95d3ce221cefc"
"checksum socket2 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)" = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918"
"checksum spin 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"

View File

@ -1,6 +1,6 @@
[package]
name = "broker"
version = "4.4.0"
version = "5.0.0"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018"
license = "MIT"
@ -18,7 +18,7 @@ serde = "1"
serde_json = "1"
serde_derive = "1"
json = "0.12"
sled = "0.30"
sled = "0.31"
pretty_env_logger = "0.3"
uuid = { version = "0.8", features = ["serde", "v4"] }
bcrypt = "0.6"

View File

@ -1,5 +1,5 @@
run:
SAVE_PATH=./tmp/broker_data cargo run
SAVE_PATH=./tmp/broker_data ORIGIN=* cargo run
release:
SAVE_PATH=./tmp/broker_data cargo run --release
build:

View File

@ -17,6 +17,7 @@ Broker follows an insert-only/publish/subscribe paradigm rather than a REST CRUD
* Very performant with a low memory footprint that uses about 20MB and 2 CPU threads
* Under 1,000 lines of code
* Secure Real-time Event Stream via SSE - requires the use of [broker-client](https://www.npmjs.com/package/broker-client)
* Multi-tenanted
* Supports CORS
* Supports SSL - full end-to-end encryption
* Provides user authentication with JWTs or HTTP Basic with stored Bcrypt(ed) passwords
@ -63,9 +64,9 @@ POST /users
- public endpoint
- POST JSON to create a user
```json
{"username":{...}, "password":{...}, "collection_id":{...}}
{"username":{...}, "password":{...}, "collection_id":{...}, "tenant_id":{...}}
```
- where {...} is for username and string, password a string, and collection_id is the uuid of the event collection for user info
- where {...} is for username and string, password a string, collection_id is the uuid of the event collection for user info, tenant_id is the uuid of the tenant
will return
```json
@ -94,8 +95,9 @@ will return
#### Step 3 - connect to SSE
```html
GET /events
GET /events/{id}
```
- where {id} is the tenant_id
- authenticated endpoint (Authorization: Bearer {jwt}) or (Authorization: Basic {base64})
- connect your sse-client to this endpoint using [broker-client](https://www.npmjs.com/package/broker-client)
- note: broker-client uses fetch as eventsource doesn't support headers
@ -108,9 +110,9 @@ POST /insert
- authenticated endpoint (Authorization: Bearer {jwt}) or (Authorization: Basic {base64})
- POST JSON to insert an event
```json
{"event":{...}, "collection_id":{...}, "timestamp":{...}, "data":{...}}
{"event":{...}, "tenant_id":{...}, "collection_id":{...}, "timestamp":{...}, "data":{...}}
```
- where {...} is for the event a string, collection_id is an assigned uuid v4 for the event collection, timestamp is the epoch unix timestamp when you want the event to become the current event, and data is any JSON you want
- where {...} is for the event a string, tenant_id is an assigned uuid v4 for the tenant, collection_id is an assigned uuid v4 for the event collection, timestamp is the epoch unix timestamp when you want the event to become the current event, and data is any JSON you want
will return
```json
@ -124,7 +126,7 @@ will return
GET /collections/{collection_id}
```
- authenticated endpoint (Authorization: Bearer {jwt}) or (Authorization: Basic {base64})
- do a GET request where {collection_id} is the uuid of the collection you want (sorted by ascending timestamp)
- do a GET request where {collection_id} is the uuid of the collection you want (sorted by ascending timestamp) for the user's tenant
will return
```json
@ -148,7 +150,7 @@ will return
GET /cancel/{id}
```
- authenticated endpoint (Authorization: Bearer {jwt}) or (Authorization: Basic {base64})
- do a GET request where id is the uuid of the event to cancel a future event
- do a GET request where id is the uuid of the event to cancel a future event for the user's tenant
will return
```json
@ -169,7 +171,7 @@ pub async fn main() {
OR
``` cargo install broker ```
- the origin needs to be passed in as a flag - wildcard is not supported - default http://localhost:3000
- the origin needs to be passed in as a flag - wildcard is supported - default http://localhost:3000
- the port needs to be passed in as a flag - default 8080
- the expiry (for jwts) needs to be passed in as a flag - default 3600
- the secret (for jwts) needs to be passed in as a flag - default secret
@ -196,6 +198,7 @@ OR
### Migrations
- from 4.0 to 5.0: multi-tenancy has been added and sled has been upgraded - there is no upgrade path from 4.0 to 5.0
- from 3.0 to 4.0: the sse endpoint now returns all events with all collections with the latest collection event rather than just the latest event data for all event types
- from 2.0 to 3.0: the sse endpoint is now secure and requires the use of the [broker-client](https://www.npmjs.com/package/broker-client) library
- from 1.0 to 2.0: the optional API endpoints URLs have been changed but have the same functionality

View File

@ -1,3 +1,4 @@
REACT_APP_API=http://localhost:8080
REACT_APP_PASSWORD=password
REACT_APP_USERNAME=user
REACT_APP_TENANT=112718d1-a0be-4468-b902-0749c3d964ae

View File

@ -1,3 +1,4 @@
REACT_APP_API=https://demo-api.apibill.me
REACT_APP_PASSWORD=password
REACT_APP_USERNAME=user
REACT_APP_TENANT=112718d1-a0be-4468-b902-0749c3d964ae

1163
example/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -4,17 +4,17 @@
"private": true,
"dependencies": {
"@testing-library/jest-dom": "^4.2.4",
"@testing-library/react": "^9.4.0",
"@testing-library/react": "^9.5.0",
"@testing-library/user-event": "^7.2.1",
"autoprefixer": "^9.7.4",
"broker-grid": "0.1.0",
"postcss-cli": "^7.1.0",
"react": "^16.12.0",
"react-dom": "^16.12.0",
"react-hook-form": "^4.8.0",
"react-router-dom": "^5.1.2",
"autoprefixer": "^9.7.6",
"broker-grid": "1.0.5",
"postcss-cli": "^7.1.1",
"react": "^16.13.1",
"react-dom": "^16.13.1",
"react-hook-form": "^4.10.2",
"react-router-dom": "^5.2.0",
"react-scripts": "3.3.0",
"tailwindcss": "^1.1.4",
"tailwindcss": "^1.4.6",
"tailwindcss-spinner": "^1.0.0",
"uuid": "^3.4.0"
},

View File

@ -18,7 +18,7 @@ function Insert(props) {
const onSubmit = values => {
const apiEndpoint = process.env.REACT_APP_API + '/insert';
const vals = JSON.stringify(values);
const v = `{"collection_id": "${id}", "event": "covid", "timestamp": ${stamp}, "data": ${vals} }`;
const v = `{"collection_id": "${id}", "tenant_id":"112718d1-a0be-4468-b902-0749c3d964ae", "event": "covid", "timestamp": ${stamp}, "data": ${vals} }`;
fetch(apiEndpoint, {
method: 'post',
mode: 'cors',
@ -80,7 +80,8 @@ function Insert(props) {
function Load(props) {
const { handleSubmit, register, errors } = useForm();
const sseURL = `${process.env.REACT_APP_API}/events/${process.env.REACT_APP_TENANT}`;
const insertURL = `${process.env.REACT_APP_API}/insert`;
return (
<Router>
<div>
@ -93,7 +94,7 @@ function Load(props) {
}
{props.jwt.length > 0 &&
<div class="mt-20 mx-20">
<Grid endpoint={process.env.REACT_APP_API} eventListen={'covid'} title={'Fight Covid'} token={props.jwt} />
<Grid sseEndpoint={sseURL} tenantID={process.env.REACT_APP_TENANT} insertEndpoint={insertURL} eventListen={'covid'} title={'Fight Covid'} token={props.jwt} />
</div>
}
</Route>

File diff suppressed because it is too large Load Diff

View File

@ -57,6 +57,7 @@ pub struct SSE {
pub data: String,
pub id: String,
pub retry: Duration,
pub tenant_id: uuid::Uuid,
}
#[derive(Deserialize, Debug, Clone)]
@ -83,6 +84,7 @@ pub struct User {
username: String,
password: String,
collection_id: uuid::Uuid,
tenant_id: uuid::Uuid,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -90,6 +92,7 @@ pub struct UserForm {
username: String,
password: String,
collection_id: uuid::Uuid,
tenant_id: uuid::Uuid,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -115,6 +118,7 @@ pub struct Event {
pub id: uuid::Uuid,
pub user_id: uuid::Uuid,
pub collection_id: uuid::Uuid,
pub tenant_id: uuid::Uuid,
pub event: String,
pub timestamp: i64,
pub published: bool,
@ -125,13 +129,14 @@ pub struct Event {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct EventForm {
collection_id: uuid::Uuid,
tenant_id: uuid::Uuid,
event: String,
timestamp: i64,
data: serde_json::Value,
}
// helper function to create sse events
fn get_events() -> Vec<SSE> {
fn get_events(tenant_id: uuid::Uuid) -> Vec<SSE> {
let tree = TREE.get(&"tree".to_owned()).unwrap();
let mut vals : Vec<Event> = tree.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
@ -139,7 +144,7 @@ fn get_events() -> Vec<SSE> {
if k.contains("_v_") {
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let evt : Event = serde_json::from_str(&v).unwrap();
if !evt.cancelled {
if !evt.cancelled && evt.tenant_id == tenant_id {
return true
} else {
return false
@ -209,7 +214,7 @@ fn get_events() -> Vec<SSE> {
let guid = Uuid::new_v4().to_string();
let events_json = json!({"events": evts, "columns": colz, "rows": rows});
sse_events.push(SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000)});
sse_events.push(SSE{id: guid, event: evt, data: serde_json::to_string(&events_json).unwrap(), retry: Duration::from_millis(5000), tenant_id: tenant_id});
}
sse_events
}
@ -227,16 +232,23 @@ pub fn get_ntp_time() -> i64 {
}
// cancel future event
fn cancel(tree: sled::Db, id: String) -> String {
fn cancel(tree: sled::Db, event_id: String, user_id: String) -> String {
let versioned = format!("_v_{}", id);
let versioned = format!("_u_{}", user_id);
let g = tree.get(&versioned.as_bytes()).unwrap().unwrap();
let v = std::str::from_utf8(&g).unwrap().to_owned();
let user : User = serde_json::from_str(&v).unwrap();
let versioned = format!("_v_{}", event_id);
let g = tree.get(&versioned.as_bytes()).unwrap().unwrap();
let v = std::str::from_utf8(&g).unwrap().to_owned();
let mut json : Event = serde_json::from_str(&v).unwrap();
let j = json.clone();
json.cancelled = true;
let _ = tree.compare_and_swap(versioned.as_bytes(), Some(serde_json::to_string(&j).unwrap().as_bytes()), Some(serde_json::to_string(&json).unwrap().as_bytes()));
let _ = tree.flush();
if json.tenant_id == user.tenant_id {
json.cancelled = true;
let _ = tree.compare_and_swap(versioned.as_bytes(), Some(serde_json::to_string(&j).unwrap().as_bytes()), Some(serde_json::to_string(&json).unwrap().as_bytes()));
let _ = tree.flush();
}
json!({"event": json}).to_string()
}
@ -302,15 +314,20 @@ fn user_collection(tree: sled::Db, id: String) -> String {
}
// display collection of events based on collection_id
fn collection(tree: sled::Db, id: String) -> String {
fn collection(tree: sled::Db, collection_id: String, user_id: String) -> String {
let versioned = format!("_u_{}", user_id);
let g = tree.get(&versioned.as_bytes()).unwrap().unwrap();
let v = std::str::from_utf8(&g).unwrap().to_owned();
let user : User = serde_json::from_str(&v).unwrap();
let mut records: Vec<Event> = tree.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
if k.contains(&"_v_") {
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let j : Event = serde_json::from_str(&v).unwrap();
if j.collection_id.to_string() == id {
if j.collection_id.to_string() == collection_id && j.tenant_id == user.tenant_id {
return true
} else {
return false
@ -363,7 +380,7 @@ fn user_create(tree: sled::Db, user_form: UserForm) -> (bool, String) {
let uuid = Uuid::new_v4();
let versioned = format!("_u_{}", uuid.to_string());
let hashed = hash(user_form.clone().password, DEFAULT_COST).unwrap();
let new_user = User{id: uuid, username: user_form.clone().username, password: hashed, collection_id: user_form.collection_id };
let new_user = User{id: uuid, username: user_form.clone().username, password: hashed, collection_id: user_form.clone().collection_id, tenant_id: user_form.clone().tenant_id };
let _ = tree.compare_and_swap(versioned.as_bytes(), None as Option<&[u8]>, Some(serde_json::to_string(&new_user).unwrap().as_bytes()));
let _ = tree.flush();
@ -514,32 +531,42 @@ fn jwt_verify(config: Config, token: String) -> JWT {
}
// insert an event
fn insert(tree: sled::Db, user_id_str: String, evt: EventForm) -> String {
fn insert(tree: sled::Db, user_id: String, evt: EventForm) -> String {
let user_id = uuid::Uuid::parse_str(&user_id_str).unwrap();
// get user
let versioned = format!("_u_{}", user_id);
let g = tree.get(&versioned.as_bytes()).unwrap().unwrap();
let v = std::str::from_utf8(&g).unwrap().to_owned();
let user : User = serde_json::from_str(&v).unwrap();
// build event object
let id = Uuid::new_v4();
let j = Event{id: id, published: false, cancelled: false, data: evt.data, event: evt.event, timestamp: evt.timestamp, user_id: user_id, collection_id: evt.collection_id};
let j = Event{id: id, published: false, cancelled: false, data: evt.data, event: evt.event, timestamp: evt.timestamp, user_id: user.id, collection_id: evt.collection_id, tenant_id: evt.tenant_id};
let new_value_string = serde_json::to_string(&j).unwrap();
let new_value = new_value_string.as_bytes();
let versioned = format!("_v_{}", id.to_string());
let _ = tree.compare_and_swap(versioned, None as Option<&[u8]>, Some(new_value.clone()));
let _ = tree.flush();
// only write if form tenant_id and user tenant_id
if user.tenant_id == evt.tenant_id {
let _ = tree.compare_and_swap(versioned, None as Option<&[u8]>, Some(new_value.clone()));
let _ = tree.flush();
return json!({"event": j}).to_string()
}
json!({"event": j}).to_string()
json!({"error": "trying to write to wrong tenant"}).to_string()
}
// create a sse event
fn event_stream(rx: crossbeam::channel::Receiver<SSE>, allowed: bool) -> Result<impl ServerSentEvent, Infallible> {
if allowed {
let sse = match rx.try_recv() {
Ok(sse) => sse,
Err(_) => {
let guid = Uuid::new_v4().to_string();
let id = Uuid::new_v4();
let guid = id.to_string();
let polling = json!({"status": "polling"});
SSE{id: guid, event: "internal_status".to_owned(), data: polling.to_string(), retry: Duration::from_millis(5000)}
SSE{id: guid, event: "internal_status".to_owned(), data: polling.to_string(), retry: Duration::from_millis(5000), tenant_id: id}
}
};
Ok((
@ -549,9 +576,10 @@ fn event_stream(rx: crossbeam::channel::Receiver<SSE>, allowed: bool) -> Result<
warp::sse::retry(sse.retry),
))
} else {
let guid = Uuid::new_v4().to_string();
let id = Uuid::new_v4();
let guid = id.to_string();
let denied = json!({"error": "denied"});
let sse = SSE{id: guid, event: "internal_status".to_owned(), data: denied.to_string(), retry: Duration::from_millis(5000)};
let sse = SSE{id: guid, event: "internal_status".to_owned(), data: denied.to_string(), retry: Duration::from_millis(5000), tenant_id: id};
Ok((
warp::sse::id(sse.id),
warp::sse::data(sse.data),
@ -668,19 +696,15 @@ pub async fn broker() {
let mut new_json = v.clone();
new_json.published = true;
let newest_json = new_json.clone();
let newer_json = newest_json.clone();
let tree_cloned = tree.clone();
let _ = tokio::spawn(async move {
let _ = tree_cloned.compare_and_swap(k, Some(serde_json::to_string(&old_json_clone).unwrap().as_bytes()), Some(serde_json::to_string(&newest_json).unwrap().as_bytes()));
let _ = tree_cloned.flush();
}).await;
// only publish if events match - need to be published
for event in get_events() {
if event.event == new_json.event {
tx2.lock().unwrap().broadcast(event);
}
}
tx2.lock().unwrap().broadcast(newer_json);
}
}
});
@ -692,7 +716,8 @@ pub async fn broker() {
let sse_route = warp::path("events")
.and(auth_check)
.and(with_sender)
.and(warp::get()).map(move |jwt: JWT, tx_main: Arc<Mutex<bus::Bus<SSE>>>| {
.and(warp::path::param::<uuid::Uuid>())
.and(warp::get()).map(move |jwt: JWT, tx_main: Arc<Mutex<bus::Bus<Event>>>, tenant_id: uuid::Uuid| {
// create recv for bus (each sse instance must have its own)
let mut rx_main = tx_main.lock().unwrap().add_rx();
@ -701,21 +726,29 @@ pub async fn broker() {
let (tx, rx) = unbounded();
// loop through sse events to send on load of sse route
for event in get_events() {
for event in get_events(tenant_id) {
let _ = tx.send(event);
}
// every 100ms check the bus and if any messages send to local channel also check local channel and publish to stream (sse route)
let event_stream = interval(Duration::from_millis(100)).map(move |_| {
let sse = match rx_main.try_recv() {
Ok(sse) => sse,
let evt = match rx_main.try_recv() {
Ok(evt) => {
if tenant_id == evt.tenant_id {
evt
} else {
let id = Uuid::new_v4();
Event{id: id, published: false, cancelled: false, data: json!({"test": "test"}), event: "fake".to_owned(), timestamp: 123, user_id: id, collection_id: id, tenant_id: id}
}
},
Err(_) => {
let guid = Uuid::new_v4().to_string();
let polling = json!({"status": "polling"});
SSE{id: guid, event: "internal_status".to_owned(), data: polling.to_string(), retry: Duration::from_millis(5000)}
let id = Uuid::new_v4();
Event{id: id, published: false, cancelled: false, data: json!({"test": "test"}), event: "fake".to_owned(), timestamp: 123, user_id: id, collection_id: id, tenant_id: id}
}
};
let _ = tx.send(sse);
for event in get_events(evt.tenant_id) {
let _ = tx.send(event);
}
event_stream(rx.clone(), jwt.check)
});
warp::sse::reply(event_stream)
@ -726,10 +759,10 @@ pub async fn broker() {
.and(warp::path("cancel"))
.and(auth_check)
.and(warp::path::param::<String>())
.map(move |jwt: JWT, id: String| {
.map(move |jwt: JWT, event_id: String| {
if jwt.check {
let tree = TREE.get(&"tree".to_owned()).unwrap();
let record = cancel(tree.clone(), id);
let record = cancel(tree.clone(), event_id, jwt.claims.sub);
let reply = warp::reply::with_status(record, StatusCode::OK);
warp::reply::with_header(reply, "Content-Type", "application/json")
} else {
@ -743,10 +776,10 @@ pub async fn broker() {
.and(warp::path("collections"))
.and(auth_check)
.and(warp::path::param::<String>())
.map(move |jwt: JWT, id: String| {
.map(move |jwt: JWT, collection_id: String| {
if jwt.check {
let tree = TREE.get(&"tree".to_owned()).unwrap();
let record = collection(tree.clone(), id);
let record = collection(tree.clone(), collection_id, jwt.claims.sub);
let reply = warp::reply::with_status(record, StatusCode::OK);
warp::reply::with_header(reply, "Content-Type", "application/json")
} else {
@ -773,7 +806,12 @@ pub async fn broker() {
// create cors wrapper
let configure = config();
let cors = warp::cors().allow_origin(&*configure.origin).allow_methods(vec!["GET", "POST"]).allow_headers(vec![warp::http::header::AUTHORIZATION, warp::http::header::CONTENT_TYPE]);
let mut cors = warp::cors().allow_origin(&*configure.origin).allow_methods(vec!["GET", "POST"]).allow_headers(vec![warp::http::header::AUTHORIZATION, warp::http::header::CONTENT_TYPE]);
// handle allow any origin case
if configure.origin == "*" {
cors = warp::cors().allow_any_origin().allow_methods(vec!["GET", "POST"]).allow_headers(vec![warp::http::header::AUTHORIZATION, warp::http::header::CONTENT_TYPE]);
}
// create routes
let routes = warp::any().and(login_route).or(user_create_route).or(insert_route).or(sse_route).or(cancel_route).or(collections_route).or(user_collection_route).with(cors);

View File

@ -5,13 +5,13 @@ use base64::encode;
#[tokio::test]
async fn test1() {
let user1 = json!({"username": "rust22", "password": "rust", "collection_id":"3ca76743-8d99-4d3f-b85c-633ea456f90c"});
let user2 = json!({"username": "rust23", "password": "rust", "collection_id":"3ca76743-8d99-4d3f-b85c-633ea456f90d"});
let user1 = json!({"username": "rust22", "password": "rust", "collection_id":"3ca76743-8d99-4d3f-b85c-633ea456f90c", "tenant_id": "e69d88c2-135e-4280-9cd8-d4a5edd8642a"});
let user2 = json!({"username": "rust23", "password": "rust", "collection_id":"3ca76743-8d99-4d3f-b85c-633ea456f90d", "tenant_id": "e69d88c2-135e-4280-9cd8-d4a5edd8642a"});
let user1_login = json!({"username": "rust22", "password": "rust"});
let event1 = json!({"event": "test", "collection_id": "3ca76743-8d99-4d3f-b85c-633ea456f90c", "timestamp": 1578667309, "data": "{}"});
let event1 = json!({"event": "test", "tenant_id": "e69d88c2-135e-4280-9cd8-d4a5edd8642a", "collection_id": "3ca76743-8d99-4d3f-b85c-633ea456f90c", "timestamp": 1578667309, "data": "{}"});
let now = broker::get_ntp_time();
let x = now + 1000;
let event2 = json!({"event": "user", "collection_id": "3ca76743-8d99-4d3f-b85c-633ea456f90d", "timestamp": x, "data": "{}"});
let event2 = json!({"event": "user", "tenant_id": "e69d88c2-135e-4280-9cd8-d4a5edd8642a", "collection_id": "3ca76743-8d99-4d3f-b85c-633ea456f90d", "timestamp": x, "data": "{}"});
let client = reqwest::Client::new();