mirror of https://github.com/apibillme/broker
update to 8.0.0
This commit is contained in:
parent
f0e3c4bb38
commit
0ab4c72281
10
CHANGELOG.md
10
CHANGELOG.md
|
@ -4,13 +4,19 @@ All notable changes to this project will be documented in this file.
|
|||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [8.0.x] - 2021-03-23
|
||||
|
||||
### Added
|
||||
- Adds admin_token to create user and command args
|
||||
- Adds multi-tenancy
|
||||
|
||||
## [7.0.x] - 2021-03-22
|
||||
|
||||
### Added
|
||||
- tide-acme
|
||||
- Adds tide-acme
|
||||
|
||||
## Changed
|
||||
- command args
|
||||
- Changes command args
|
||||
|
||||
## [6.1.x] - 2021-03-21
|
||||
|
||||
|
|
|
@ -461,7 +461,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "broker"
|
||||
version = "7.0.0"
|
||||
version = "8.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-std",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "broker"
|
||||
version = "7.0.0"
|
||||
version = "8.0.0"
|
||||
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
|
|
28
README.md
28
README.md
|
@ -18,6 +18,8 @@ Broker follows an insert-only/publish/subscribe paradigm rather than a REST CRUD
|
|||
* Under 500 lines of code
|
||||
* Secure Real-time Event Stream via SSE - requires the use of [broker-client](https://www.npmjs.com/package/broker-client)
|
||||
* Supports CORS
|
||||
* Add users with admin token permission
|
||||
* Multi-tenant
|
||||
* Supports SSL - full end-to-end encryption
|
||||
* Provides user authentication with JWTs with stored Argon2 passwords
|
||||
* Uses Global NTP servers and doesn't rely on your local server time
|
||||
|
@ -58,9 +60,10 @@ POST /users
|
|||
- public endpoint
|
||||
- POST JSON to create a user
|
||||
```json
|
||||
{"username":{...}, "password":{...}}
|
||||
{"username":{...}, "password":{...}, "admin_token":{...}, "tenant_name":{...}}
|
||||
```
|
||||
- where {...} is for username is a string and password is a string
|
||||
- where {...} is for username is a string, password is a string, admin_token is a string, and tenant_name is a string
|
||||
- admin_token is required and can be set in the command args - it is for not allowing everyone to add a user - the default is `letmein`
|
||||
|
||||
will return `200` or `500` or `400`
|
||||
|
||||
|
@ -99,9 +102,9 @@ POST /insert
|
|||
- authenticated endpoint (Authorization: Bearer {jwt})
|
||||
- POST JSON to insert an event
|
||||
```json
|
||||
{"event":{...}, "data":{...}}
|
||||
{"event":{...}, "data":{...}, "tenant_name":{...}}
|
||||
```
|
||||
- where {...} is for the event a string and data is any JSON you want
|
||||
- where {...} is for the event a string, data is any JSON you want, and tenant_name is the name of the tenant
|
||||
|
||||
will return: `200` or `500` or `400` or `401`
|
||||
|
||||
|
@ -109,15 +112,16 @@ will return: `200` or `500` or `400` or `401`
|
|||
|
||||
``` cargo install broker ```
|
||||
|
||||
- the origin can be passed in as a flag - default *
|
||||
- the port can be passed in as a flag - default 8080 - can only be set for unsecure connections
|
||||
- the jwt_expiry (for jwts) can be passed in as a flag - default 86400
|
||||
- the jwt_secret (for jwts) should be passed in as a flag - default secret
|
||||
- the secure flag (https) and can be true or false - default false
|
||||
- the certs flag is the storage path of LetsEncrypt certs - default certs
|
||||
- the db flag is the path where the embedded database will be saved - default tmp
|
||||
- the origin can be passed in as a flag - default `*`
|
||||
- the port can be passed in as a flag - default `8080` - can only be set for unsecure connections
|
||||
- the jwt_expiry (for jwts) can be passed in as a flag - default `86400`
|
||||
- the jwt_secret (for jwts) should be passed in as a flag - default `secret`
|
||||
- the secure flag (https) and can be true or false - default `false`
|
||||
- the certs flag is the storage path of LetsEncrypt certs - default `certs`
|
||||
- the db flag is the path where the embedded database will be saved - default `tmp`
|
||||
- the domain flag is the domain name (e.g. api.broker.com) of the domain you want to register with LetsEncrypt - must be fully resolvable
|
||||
- production example: `./broker --secure="true" --jwt_secret="xTJEX234$##$" --domain="api.broker.com"`
|
||||
- the admin_token flag is the password for the admin to add users - default `letmein`
|
||||
- production example: `./broker --secure="true" --admin_token"23ce4234@123$" --jwt_secret="xTJEX234$##$" --domain="api.broker.com"`
|
||||
|
||||
### Service
|
||||
|
||||
|
|
108
src/main.rs
108
src/main.rs
|
@ -40,6 +40,7 @@ pub struct EnvVarConfig {
|
|||
pub secure: bool,
|
||||
pub certs: String,
|
||||
pub domain: String,
|
||||
pub admin_token: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
|
@ -47,12 +48,15 @@ pub struct User {
|
|||
id: uuid::Uuid,
|
||||
username: String,
|
||||
password: String,
|
||||
tenant_name: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct UserForm {
|
||||
username: String,
|
||||
password: String,
|
||||
tenant_name: String,
|
||||
admin_token: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
|
@ -76,12 +80,14 @@ pub struct Event {
|
|||
pub event: String,
|
||||
pub timestamp: i64,
|
||||
pub data: serde_json::Value,
|
||||
pub tenant_name: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct EventForm {
|
||||
event: String,
|
||||
data: serde_json::Value,
|
||||
tenant_name: String,
|
||||
}
|
||||
|
||||
fn replace(key: String, value: Vec<u8>) -> Result<()> {
|
||||
|
@ -140,21 +146,28 @@ fn puts_event(event: Event) -> Result<()> {
|
|||
|
||||
fn user_create(user_form: UserForm) -> Result<Option<String>> {
|
||||
|
||||
if !is_user_unique(user_form.clone().username)? {
|
||||
let j = json!({"error": "username already taken"}).to_string();
|
||||
return Ok(Some(j));
|
||||
let configure = env_var_config();
|
||||
|
||||
if configure.admin_token == user_form.clone().admin_token {
|
||||
if !is_user_unique(user_form.clone().username)? {
|
||||
let j = json!({"error": "username already taken"}).to_string();
|
||||
return Ok(Some(j));
|
||||
} else {
|
||||
// set as future value
|
||||
let uuid = Uuid::new_v4();
|
||||
let config = Argon2Config::default();
|
||||
let uuid_string = Uuid::new_v4().to_string();
|
||||
let salt = uuid_string.as_bytes();
|
||||
let password = user_form.password.as_bytes();
|
||||
let hashed = argon2::hash_encoded(password, salt, &config).unwrap();
|
||||
let new_user = User{id: uuid, username: user_form.clone().username, password: hashed, tenant_name: user_form.clone().tenant_name };
|
||||
|
||||
puts_user(new_user).unwrap();
|
||||
return Ok(None);
|
||||
}
|
||||
} else {
|
||||
// set as future value
|
||||
let uuid = Uuid::new_v4();
|
||||
let config = Argon2Config::default();
|
||||
let uuid_string = Uuid::new_v4().to_string();
|
||||
let salt = uuid_string.as_bytes();
|
||||
let password = user_form.password.as_bytes();
|
||||
let hashed = argon2::hash_encoded(password, salt, &config).unwrap();
|
||||
let new_user = User{id: uuid, username: user_form.clone().username, password: hashed };
|
||||
|
||||
puts_user(new_user).unwrap();
|
||||
return Ok(None);
|
||||
let j = json!({"error": "admin_token is incorrect"}).to_string();
|
||||
return Ok(Some(j));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -190,6 +203,7 @@ fn env_var_config() -> EnvVarConfig {
|
|||
let mut db: String = "tmp".to_string();
|
||||
let mut certs = "certs".to_string();
|
||||
let mut domain = "localhost".to_string();
|
||||
let mut admin_token = "letmein".to_string();
|
||||
let _ : Vec<String> = go_flag::parse(|flags| {
|
||||
flags.add_flag("port", &mut port);
|
||||
flags.add_flag("origin", &mut origin);
|
||||
|
@ -199,9 +213,10 @@ fn env_var_config() -> EnvVarConfig {
|
|||
flags.add_flag("db", &mut db);
|
||||
flags.add_flag("domain", &mut domain);
|
||||
flags.add_flag("certs", &mut certs);
|
||||
flags.add_flag("admin_token", &mut admin_token);
|
||||
});
|
||||
|
||||
EnvVarConfig{port, origin, jwt_expiry, jwt_secret, secure, domain, certs, db}
|
||||
EnvVarConfig{port, origin, jwt_expiry, jwt_secret, secure, domain, certs, db, admin_token}
|
||||
}
|
||||
|
||||
fn jwt_verify(token: String) -> Result<Option<TokenData<Claims>>> {
|
||||
|
@ -234,13 +249,14 @@ async fn insert(user_username: String, event_form: EventForm) -> Result<bool> {
|
|||
Some(user) => {
|
||||
let timestamp = nippy::get_unix_ntp_time().await?;
|
||||
let id = uuid::Uuid::new_v4();
|
||||
|
||||
|
||||
let event = Event{
|
||||
id,
|
||||
data: event_form.data,
|
||||
event: event_form.event,
|
||||
user_id: user.id,
|
||||
timestamp,
|
||||
tenant_name: event_form.tenant_name,
|
||||
};
|
||||
|
||||
puts_event(event.clone())?;
|
||||
|
@ -253,8 +269,8 @@ async fn insert(user_username: String, event_form: EventForm) -> Result<bool> {
|
|||
}
|
||||
|
||||
async fn create_user(mut req: Request<()>) -> tide::Result {
|
||||
let r = req.body_string().await.unwrap();
|
||||
let user_form : UserForm = serde_json::from_str(&r).unwrap();
|
||||
let r = req.body_string().await?;
|
||||
let user_form : UserForm = serde_json::from_str(&r)?;
|
||||
match user_create(user_form)? {
|
||||
Some(err) => {
|
||||
let err = format!("error: {}", err);
|
||||
|
@ -285,12 +301,12 @@ async fn insert_event(mut req: Request<()>) -> tide::Result {
|
|||
match token_value {
|
||||
Some(token_header) => {
|
||||
let token = token_header.last().to_string();
|
||||
let jwt_value = jwt_verify(token).unwrap();
|
||||
let jwt_value = jwt_verify(token)?;
|
||||
match jwt_value {
|
||||
Some(jwt) => {
|
||||
let r = req.body_string().await.unwrap();
|
||||
let event_form : EventForm = serde_json::from_str(&r).unwrap();
|
||||
insert(jwt.claims.sub, event_form).await.unwrap();
|
||||
let r = req.body_string().await?;
|
||||
let event_form : EventForm = serde_json::from_str(&r)?;
|
||||
insert(jwt.claims.sub, event_form).await?;
|
||||
Ok(tide::Response::builder(200).header("content-type", "application/json").build())
|
||||
},
|
||||
None => { Ok(tide::Response::builder(401).header("content-type", "application/json").build()) }
|
||||
|
@ -324,35 +340,43 @@ async fn main() -> tide::Result<()> {
|
|||
match token_value {
|
||||
Some(token_header) => {
|
||||
let token = token_header.last().to_string();
|
||||
let jwt_value = jwt_verify(token).unwrap();
|
||||
let jwt_value = jwt_verify(token)?;
|
||||
match jwt_value {
|
||||
Some(_) => {
|
||||
Some(jwt) => {
|
||||
let user_value = get_user_by_username(jwt.claims.sub)?;
|
||||
|
||||
let mut cache: HashMap<String, Event> = HashMap::new();
|
||||
match user_value {
|
||||
Some(user) => {
|
||||
let mut cache: HashMap<String, Event> = HashMap::new();
|
||||
|
||||
let mut interval = stream::interval(Duration::from_millis(100));
|
||||
while let Some(_) = interval.next().await {
|
||||
let events = get_events()?;
|
||||
let mut interval = stream::interval(Duration::from_millis(100));
|
||||
while let Some(_) = interval.next().await {
|
||||
let events = get_events()?;
|
||||
|
||||
for evt in events {
|
||||
if !cache.contains_key(&evt.event) {
|
||||
let id = uuid::Uuid::new_v4();
|
||||
sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?;
|
||||
cache.insert(evt.event.clone(), evt.clone());
|
||||
} else {
|
||||
let value_maybe = cache.get_key_value(&evt.event);
|
||||
match value_maybe {
|
||||
Some((_, v)) => {
|
||||
if &evt != v {
|
||||
for evt in events {
|
||||
if evt.tenant_name == user.tenant_name {
|
||||
if !cache.contains_key(&evt.event) {
|
||||
let id = uuid::Uuid::new_v4();
|
||||
sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?;
|
||||
cache.insert(evt.event.clone(), evt.clone());
|
||||
cache.insert(evt.event.clone(), evt.clone());
|
||||
} else {
|
||||
let value_maybe = cache.get_key_value(&evt.event);
|
||||
match value_maybe {
|
||||
Some((_, v)) => {
|
||||
if &evt != v {
|
||||
let id = uuid::Uuid::new_v4();
|
||||
sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?;
|
||||
cache.insert(evt.event.clone(), evt.clone());
|
||||
}
|
||||
},
|
||||
None => { return Ok(()); }
|
||||
}
|
||||
}
|
||||
},
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
None => { return Ok(()); }
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue