update to 10.0.0

This commit is contained in:
Bevan Hunt 2021-04-05 12:11:07 -07:00
parent 155b14d023
commit f8ed649000
5 changed files with 298 additions and 62 deletions

View File

@ -4,25 +4,44 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [10.0.0] - 2021-04-05
## Fixed
- Fixed infinite SSE event sending
### Changed
- Changed RocksDB keys for user and event to be tenanted
- Changed default RocksDB path from tmp to db
- Changed create user endpoint URL
### Added
- Added list users endpoint
- Added revoke user endpoint
- Added get user endpoint
- Added unrevoke user endpoint
### Updated
- Updated README
## [9.1.0] - 2021-04-02
### Added
- verify optional endpoint
- Added verify endpoint
## [9.0.2] - 2021-03-25
### Fixed
- keys on event
- Fixed keys on event
## [9.0.0] - 2021-03-25
### Removed
- removed tenant_name from event - uses users event name
- Removed tenant_name from event - uses users event name
## [8.1.x] - 2021-03-25
### Added
- Adds http basic auth
- Added http basic auth
## [8.0.x] - 2021-03-23
@ -33,10 +52,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [7.0.x] - 2021-03-22
### Added
- Adds tide-acme
- Added tide-acme
## Changed
- Changes command args
- Changed command args
## [6.1.x] - 2021-03-21

2
Cargo.lock generated
View File

@ -459,7 +459,7 @@ dependencies = [
[[package]]
name = "broker"
version = "9.1.0"
version = "10.0.0"
dependencies = [
"anyhow",
"async-std",

View File

@ -1,6 +1,6 @@
[package]
name = "broker"
version = "9.1.0"
version = "10.0.0"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018"
license = "MIT"
@ -19,7 +19,7 @@ serde_derive = "1"
json = "0.12"
rocksdb = "0.15"
uuid = { version = "0.8", features = ["serde", "v4"] }
jsonwebtoken = "7.0.1"
jsonwebtoken = "7"
go-flag = "0.1"
lazy_static = "1.4"
nippy = "2"

129
README.md
View File

@ -15,7 +15,7 @@ Broker follows an insert-only/publish/subscribe paradigm rather than a REST CRUD
### Features
* Very performant with almost no CPU and memory usage
* Under 500 lines of code
* About 500 lines of code
* Secure Real-time Event Stream via SSE - requires the use of [broker-client](https://www.npmjs.com/package/broker-client)
* Supports CORS
* Add users with admin token permission
@ -28,6 +28,7 @@ Broker follows an insert-only/publish/subscribe paradigm rather than a REST CRUD
* Sync latest events on SSE client connection
* Auto-provision and renews SSL cert via LetsEncrypt
* Verify endpoint for external services using Broker user system like [portal](https://crates.io/crates/portal)
* User Management API endpoints (revoke, unrevoke, list, get)
### How it works
@ -44,9 +45,11 @@ The side-effect of this system is that the latest event is the schema. This is p
* [React Debounce Input](https://www.npmjs.com/package/react-debounce-input) - React input for Real-time Submission (Edit in Place forms)
### Broker FAQ
* Why compete against Parse Server and Firebase?
* Why compete against Parse Server, Auth0, and Firebase?
[Firebase](https://firebase.google.com/) is not open-source, is not free, and has complicated pricing. [Parse Server](https://github.com/parse-community/parse-server) doesn't have real-time features and is about 30,000 LOC of JS.
* [Firebase](https://firebase.google.com/) is not open-source, is not free, and has complicated pricing.
* [Parse Server](https://github.com/parse-community/parse-server) doesn't have real-time features and is about 30,000 LOC of JS.
* [Auth0](https://auth0.com) is not open-source, is not free, and is expensive.
* Will broker work with mobile apps?
@ -57,14 +60,17 @@ Yes with React Native. There may be native 3rd party libraries for SSE that work
#### Step 1 - create a user
```html
POST /users
POST /create_user
```
- public endpoint
- POST JSON to create a user
```json
{"username":{...}, "password":{...}, "admin_token":{...}, "tenant_name":{...}}
{
"username": "bob",
"password": "password1",
"admin_token": "letmein",
"tenant_name": "tenant_1"
}
```
- where {...} is for username is a string, password is a string, admin_token is a string, and tenant_name is a string
- admin_token is required and can be set in the command args - it is for not allowing everyone to add a user - the default is `letmein`
will return `200` or `500` or `400`
@ -75,15 +81,18 @@ will return `200` or `500` or `400`
POST /login
```
- public endpoint
- POST JSON to login
```json
{"username":{...}, "password":{...}}
{
"username": "bob",
"password": "password1"
}
```
- where {...} is for username is a string and password is a string
will return
```json
{"jwt":{...}}
{
"jwt": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTc2NzQ5MTUsImlhdCI6MTYxNzU4ODUxNSwiaXNzIjoiRGlzcGF0Y2hlciIsInN1YiI6ImZvbyJ9.OwiaZJcFUC_B0CA0ffRZVTWKRf5_vQ7vt5USNJEeKRE"
}
```
- where {...} is a JWT (string)
@ -102,11 +111,15 @@ GET /sse
POST /insert
```
- authenticated endpoint (Authorization: Bearer {jwt}) or (Authorization: Basic {username:password})
- POST JSON to insert an event
```json
{"event":{...}, "data":{...}}
{
"event": "test",
"data": {
"name": "robert",
"image": "https://img.com/bucket/123/123.jpg"
}
}
```
- where {...} is for the event a string and data is any JSON you want
will return: `200` or `500` or `400` or `401`
@ -120,6 +133,91 @@ GET /verify
will return: `200` or `500` or `401`
#### Optional - revoke user
```html
POST /revoke_user
```
- public endpoint
```json
{
"admin_token": "letmein",
"username": "bob"
}
```
will return: `200` or `500` or `400` or `401`
- note: revoked users cannot login
#### Optional - unrevoke user
```html
POST /unrevoke_user
```
- public endpoint
```json
{
"admin_token": "letmein",
"username": "bob"
}
```
will return: `200` or `500` or `400` or `401`
#### Optional - list users
```html
POST /list_users
```
- public endpoint
```json
{
"admin_token": "letmein",
}
```
- where {...} is for the event a string and data is any JSON you want
will return: `200` or `500` or `400` or `401`
200 - will return an array of objects
```json
[
{
"id": "69123c04-fa42-4193-a6c5-ab2fc27658b1",
"password": "***",
"revoked": false,
"tenant_name": "tenant_1",
"username": "bob"
}
]
```
#### Optional - get user
```html
POST /get_user
```
- public endpoint
```json
{
"admin_token": "letmein",
"username": "bob"
}
```
- where {...} is for the event a string and data is any JSON you want
will return: `200` or `500` or `400` or `401`
200 - will return an array of objects
```json
{
"id": "69123c04-fa42-4193-a6c5-ab2fc27658b1",
"password": "***",
"revoked": false,
"tenant_name": "tenant_1",
"username": "bob"
}
```
### Install
@ -131,7 +229,7 @@ will return: `200` or `500` or `401`
- the jwt_secret (for jwts) should be passed in as a flag - default `secret`
- the secure flag (https) and can be true or false - default `false`
- the certs flag is the storage path of LetsEncrypt certs - default `certs`
- the db flag is the path where the embedded database will be saved - default `tmp`
- the db flag is the path where the embedded database will be saved - default `db`
- the domain flag is the domain name (e.g. api.broker.com) of the domain you want to register with LetsEncrypt - must be fully resolvable
- the admin_token flag is the password for the admin to add users - default `letmein`
- production example: `./broker --secure="true" --admin_token"23ce4234@123$" --jwt_secret="xTJEX234$##$" --domain="api.broker.com"`
@ -147,6 +245,7 @@ There is an example `systemctl` service for Ubuntu called `broker.service` in th
### Inspiration
* [Auth0](https://auth0.com)
* [React Hooks](https://reactjs.org/docs/hooks-intro.html)
* [Meteor](https://meteor.com)
* [MongoDB](https://www.mongodb.com/)

View File

@ -45,24 +45,36 @@ pub struct EnvVarConfig {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct User {
id: uuid::Uuid,
username: String,
password: String,
tenant_name: String,
pub id: uuid::Uuid,
pub revoked: bool,
pub username: String,
pub password: String,
pub tenant_name: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct UserForm {
username: String,
password: String,
tenant_name: String,
admin_token: String,
pub username: String,
pub password: String,
pub tenant_name: String,
pub admin_token: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AdminTokenForm {
pub admin_token: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RevokeUserForm {
pub admin_token: String,
pub username: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LoginForm {
username: String,
password: String,
pub username: String,
pub password: String,
}
#[derive(Debug, Serialize, Deserialize)]
@ -85,8 +97,8 @@ pub struct Event {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct EventForm {
event: String,
data: serde_json::Value
pub event: String,
pub data: serde_json::Value,
}
fn replace(key: String, value: Vec<u8>) -> Result<()> {
@ -94,14 +106,36 @@ fn replace(key: String, value: Vec<u8>) -> Result<()> {
Ok(())
}
fn soft_delete_user(username: String) -> Result<()> {
match get_user_by_username(username)? {
Some(mut user) => {
user.revoked = true;
puts_user(user)?;
Ok(())
},
None => { Ok(()) }
}
}
fn activate_user(username: String) -> Result<()> {
match get_user_by_username(username)? {
Some(mut user) => {
user.revoked = false;
puts_user(user)?;
Ok(())
},
None => { Ok(()) }
}
}
fn get_user_by_username(user_username: String) -> Result<Option<User>> {
let users = get_users()?;
Ok(users.into_iter().filter(|user| user.username == user_username).last())
}
fn get_users() -> Result<Vec<User>> {
let prefix = "users".as_bytes();
let i = DB.prefix_iterator(prefix);
let prefix = "users".to_string();
let i = DB.prefix_iterator(prefix.as_bytes());
let res : Vec<User> = i.map(|(_, v)| {
let data: User = rmp_serde::from_read_ref(&v).unwrap();
data
@ -110,7 +144,7 @@ fn get_users() -> Result<Vec<User>> {
}
fn puts_user(user: User) -> Result<()> {
let key = format!("users_{}", user.id);
let key = format!("users_{}_{}", user.tenant_name, user.id);
let value = rmp_serde::to_vec_named(&user)?;
replace(key, value)?;
Ok(())
@ -126,9 +160,20 @@ fn is_user_unique(user_username: String) -> Result<bool> {
Ok(true)
}
fn get_events() -> Result<Vec<Event>> {
let prefix = "events".as_bytes();
let i = DB.prefix_iterator(prefix);
fn get_events(tenant_name: Option<String>) -> Result<Vec<Event>> {
let prefix: String;
match tenant_name {
Some(tn) => {
prefix = format!("events_{}", tn);
},
None => {
prefix = format!("events");
}
}
let i = DB.prefix_iterator(prefix.as_bytes());
let res : Vec<Event> = i.map(|(_, v)| {
let data: Event = rmp_serde::from_read_ref(&v).unwrap();
data
@ -137,7 +182,7 @@ fn get_events() -> Result<Vec<Event>> {
}
fn puts_event(event: Event) -> Result<()> {
let key = format!("events_{}", event.id);
let key = format!("events_{}_{}", event.tenant_name, event.event);
let value = rmp_serde::to_vec_named(&event)?;
replace(key, value)?;
Ok(())
@ -152,14 +197,19 @@ fn user_create(user_form: UserForm) -> Result<Option<String>> {
let j = json!({"error": "username already taken"}).to_string();
return Ok(Some(j));
} else {
// set as future value
let uuid = Uuid::new_v4();
let config = Argon2Config::default();
let uuid_string = Uuid::new_v4().to_string();
let salt = uuid_string.as_bytes();
let password = user_form.password.as_bytes();
let hashed = argon2::hash_encoded(password, salt, &config).unwrap();
let new_user = User{id: uuid, username: user_form.clone().username, password: hashed, tenant_name: user_form.clone().tenant_name };
let new_user = User{
id: uuid,
username: user_form.clone().username,
password: hashed,
tenant_name: user_form.clone().tenant_name,
revoked: false,
};
puts_user(new_user).unwrap();
return Ok(None);
@ -172,18 +222,21 @@ fn user_create(user_form: UserForm) -> Result<Option<String>> {
async fn create_jwt(login: LoginForm) -> Result<Option<String>> {
let user_value = get_user_by_username(login.username)?;
match user_value {
match get_user_by_username(login.username)? {
Some(user) => {
let verified = argon2::verify_encoded(&user.password, login.password.as_bytes())?;
if verified {
let app = env_var_config();
let iat = nippy::get_unix_ntp_time().await?;
let exp = iat + app.jwt_expiry;
let iss = "Dispatcher".to_string();
let my_claims = Claims{sub: user.clone().username, exp, iat, iss};
let token = encode(&Header::default(), &my_claims, &EncodingKey::from_secret(app.jwt_secret.as_ref()))?;
Ok(Some(token))
if !user.revoked {
let verified = argon2::verify_encoded(&user.password, login.password.as_bytes())?;
if verified {
let app = env_var_config();
let iat = nippy::get_unix_ntp_time().await?;
let exp = iat + app.jwt_expiry;
let iss = "Dispatcher".to_string();
let my_claims = Claims{sub: user.clone().username, exp, iat, iss};
let token = encode(&Header::default(), &my_claims, &EncodingKey::from_secret(app.jwt_secret.as_ref()))?;
Ok(Some(token))
} else {
Ok(None)
}
} else {
Ok(None)
}
@ -199,7 +252,7 @@ fn env_var_config() -> EnvVarConfig {
let mut secure = false;
let mut origin = "*".to_string();
let mut jwt_secret = "secret".to_string();
let mut db: String = "tmp".to_string();
let mut db: String = "db".to_string();
let mut certs = "certs".to_string();
let mut domain = "localhost".to_string();
let mut admin_token = "letmein".to_string();
@ -247,7 +300,7 @@ async fn jwt_verify(token: String) -> Result<Option<TokenData<Claims>>> {
let app = env_var_config();
let iat = nippy::get_unix_ntp_time().await?;
let exp = iat + app.jwt_expiry;
let iss = "Dispatcher".to_string();
let iss = "Broker".to_string();
let my_claims = Claims{sub: user.clone().username, exp, iat, iss};
let my_token = TokenData{
header: Header::default(),
@ -363,6 +416,64 @@ async fn verify_user(req: Request<()>) -> tide::Result {
}
}
async fn get_user(mut req: Request<()>) -> tide::Result {
let r = req.body_string().await?;
let admin_token_form : AdminTokenForm = serde_json::from_str(&r)?;
let configure = env_var_config();
if configure.admin_token == admin_token_form.admin_token {
let users = get_users()?;
let users: Vec<_> = users.iter().map(|user| {
let mut u = user.to_owned();
u.password = "***".to_string();
u
}).collect();
Ok(tide::Response::builder(200).body(json!(users)).header("content-type", "application/json").build())
} else {
Ok(tide::Response::builder(401).header("content-type", "application/json").build())
}
}
async fn list_users(mut req: Request<()>) -> tide::Result {
let r = req.body_string().await?;
let admin_token_form : AdminTokenForm = serde_json::from_str(&r)?;
let configure = env_var_config();
if configure.admin_token == admin_token_form.admin_token {
let users = get_users()?;
let users: Vec<_> = users.iter().map(|user| {
let mut u = user.to_owned();
u.password = "***".to_string();
u
}).collect();
Ok(tide::Response::builder(200).body(json!(users)).header("content-type", "application/json").build())
} else {
Ok(tide::Response::builder(401).header("content-type", "application/json").build())
}
}
async fn revoke_user(mut req: Request<()>) -> tide::Result {
let r = req.body_string().await?;
let revoke_user_form : RevokeUserForm = serde_json::from_str(&r)?;
let configure = env_var_config();
if configure.admin_token == revoke_user_form.admin_token {
soft_delete_user(revoke_user_form.username)?;
Ok(tide::Response::builder(200).header("content-type", "application/json").build())
} else {
Ok(tide::Response::builder(401).header("content-type", "application/json").build())
}
}
async fn unrevoke_user(mut req: Request<()>) -> tide::Result {
let r = req.body_string().await?;
let revoke_user_form : RevokeUserForm = serde_json::from_str(&r)?;
let configure = env_var_config();
if configure.admin_token == revoke_user_form.admin_token {
activate_user(revoke_user_form.username)?;
Ok(tide::Response::builder(200).header("content-type", "application/json").build())
} else {
Ok(tide::Response::builder(401).header("content-type", "application/json").build())
}
}
#[async_std::main]
async fn main() -> tide::Result<()> {
@ -378,9 +489,13 @@ async fn main() -> tide::Result<()> {
app.with(driftwood::DevLogger);
app.with(cors);
app.at("/insert").post(insert_event);
app.at("/users").post(create_user);
app.at("/create_user").post(create_user);
app.at("/login").post(login_user);
app.at("/verify").get(verify_user);
app.at("/list_users").post(list_users);
app.at("/revoke_user").post(revoke_user);
app.at("/get_user").post(get_user);
app.at("/unrevoke_user").post(unrevoke_user);
app.at("/sse").get(tide::sse::endpoint(|req: Request<()>, sender| async move {
@ -400,7 +515,7 @@ async fn main() -> tide::Result<()> {
let mut interval = stream::interval(Duration::from_millis(100));
while let Some(_) = interval.next().await {
let events = get_events()?;
let events = get_events(None)?;
for evt in events {
if evt.tenant_name == user.tenant_name {
@ -412,13 +527,16 @@ async fn main() -> tide::Result<()> {
let value_maybe = cache.get_key_value(&evt.event);
match value_maybe {
Some((_, v)) => {
if &evt != v {
let current_data = evt.data.to_string();
let stored_data = v.data.to_string();
if current_data != stored_data {
let id = uuid::Uuid::new_v4();
sender.send(&evt.event, evt.data.to_string(), Some(&id.to_string())).await?;
cache.insert(evt.event.clone(), evt.clone());
}
},
None => { println!("helo"); return Ok(()); }
None => { return Ok(()); }
}
}
}