add collection_id and fix login

This commit is contained in:
Bevan Hunt 2020-01-07 11:58:46 -08:00
parent 637b921676
commit fde8ee56fd
4 changed files with 31 additions and 24 deletions

8
Cargo.lock generated
View File

@ -386,7 +386,7 @@ dependencies = [
[[package]]
name = "broker"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"actix-cors 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -402,7 +402,7 @@ dependencies = [
"serde_derive 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
"sled 0.30.3 (registry+https://github.com/rust-lang/crates.io-index)",
"sse-actix-web 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sse-actix-web 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -1341,7 +1341,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "sse-actix-web"
version = "0.8.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1756,7 +1756,7 @@ dependencies = [
"checksum smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44e59e0c9fa00817912ae6e4e6e3c4fe04455e75699d06eedc7d85917ed8e8f4"
"checksum socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85"
"checksum spin 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
"checksum sse-actix-web 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "edcc6049640022e5d1fa9faf28390e32ad9ac0490d9f2d5a9f287004812e47c3"
"checksum sse-actix-web 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f74d7d85159b1b22274e7c757839f9439b5cb91cf9725342a0514ee12f88ca82"
"checksum syn 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)" = "dff0acdb207ae2fe6d5976617f887eb1e35a2ba52c13c7234c790960cdad9238"
"checksum synstructure 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545"
"checksum termcolor 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "96d6098003bde162e4277c70665bd87c326f5a0c3f3fbfb285787fa482d54e6e"

View File

@ -1,6 +1,6 @@
[package]
name = "broker"
version = "0.6.0"
version = "0.7.0"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018"
license = "MIT"
@ -15,7 +15,7 @@ actix-web = "2.0.0"
actix-cors = "0.2.0"
futures = "0.3.1"
tokio = "0.2.4"
sse-actix-web = "0.8.0"
sse-actix-web = "0.8.1"
serde = "1.0.103"
serde_json = "1.0.44"
serde_derive = "1.0.103"

View File

@ -15,7 +15,7 @@ Broker follows an insert-only/publish/subscribe paradigm rather than a REST CRUD
### How it works
In Broker you create a user, login, then insert an event and its data with a timestamp. Broker publishes the event when the timestamp is reached to the event stream via SSE. Broker keeps all event versions in its database that can be viewed. Broker can also cancel future events.
In Broker you create a user, login, then insert an event with its data, a collection_id, and a timestamp. Broker publishes the event when the timestamp is reached to the event stream via SSE. Broker keeps all events its database that can be viewed in collections (by collection_id). Broker can also cancel future events.
When the client first subscribes to the SSE connection all the latest events and data is sent to the client. Combined with sending the latest event via SSE when subscribed negates the necessity to do any GET API requests in the lifecycle of an event.
@ -24,6 +24,8 @@ The side-effect of this system is that the latest event is the schema. Old event
#### API
##### Step 1 - create a user
```html
/users
```
@ -40,6 +42,8 @@ will return
```
- where {...} is the uuid (string) of the user
##### Step 2 - login with the user
```html
/login
```
@ -56,6 +60,8 @@ will return
```
- where {...} is a JWT (string)
##### Step 3 - insert an event
```html
/events
```
@ -68,9 +74,9 @@ will return
- authenticated endpoint
- POST JSON to insert an event
```json
{"event":{...}, "id":{...}, "timestamp":{...}, "data":{...}}
{"event":{...}, "collection_id":{...}, "timestamp":{...}, "data":{...}}
```
- where {...} is for the event a string, id is an assigned uuid v4 for the event, timestamp is the epoch unix timestamp when you want the event to become the current event, and data is any JSON you want
- where {...} is for the event a string, collection_id is an assigned uuid v4 for the event collection, timestamp is the epoch unix timestamp when you want the event to become the current event, and data is any JSON you want
will return
```json
@ -78,11 +84,13 @@ will return
```
- where {...} is the uuid (string) of the event
##### Optional Endpoints
```html
/events/{id}
/events/collections/{collection_id}
```
- authenticated endpoint
- do a GET request where {id} is the uuid of the event you want the events queue (sorted by ascending timestamp)
- do a GET request where {collection_id} is the uuid of the collection you want (sorted by ascending timestamp)
```html
/events/{id}/cancel

View File

@ -53,7 +53,7 @@ struct UserForm {
struct Event {
id: uuid::Uuid,
user_id: uuid::Uuid,
evt_id: uuid::Uuid,
collection_id: uuid::Uuid,
event: String,
timestamp: i64,
published: bool,
@ -64,7 +64,7 @@ struct Event {
#[derive(Debug, Serialize, Deserialize, Clone)]
struct EventForm {
id: uuid::Uuid,
collection_id: uuid::Uuid,
event: String,
timestamp: i64,
data: serde_json::Value,
@ -124,7 +124,7 @@ async fn collection(data: web::Data<MyData>, path: web::Path<Path>, req: HttpReq
if k.contains(&"_v_") {
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let j : Event = serde_json::from_str(&v).unwrap();
if j.evt_id.to_string() == path.id {
if j.collection_id.to_string() == path.id {
return true
} else {
return false
@ -152,7 +152,7 @@ async fn new_client(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>
let origin = config.origin;
// turn iVec(s) to String(s) and make HashMap
let mut vals: HashMap<String, String> = data.db.iter().into_iter().filter(|x| {
let vals: HashMap<String, String> = data.db.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
if !k.contains("_v_") && !k.contains("_u_") {
@ -168,11 +168,6 @@ async fn new_client(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>
(json.event, data)
}).collect();
if vals.is_empty() {
vals = HashMap::new();
vals.insert("connection".to_owned(), "connected".to_owned());
}
// create new client for sse with hashmap of initial values
let rx = broad.lock().unwrap().new_client(vals);
@ -210,7 +205,7 @@ async fn insert(data: web::Data<MyData>, json: web::Json<EventForm>, req: HttpRe
let user_id = uuid::Uuid::parse_str(&user_id_str).unwrap();
let id = Uuid::new_v4();
let j = Event{id: id, published: false, cancelled: false, data: json_clone.data, event: json_clone.event, timestamp: json.timestamp, user_id: user_id, evt_id: json.id};
let j = Event{id: id, published: false, cancelled: false, data: json_clone.data, event: json_clone.event, timestamp: json.timestamp, user_id: user_id, collection_id: json.collection_id};
let new_value_string = serde_json::to_string(&j).unwrap();
let new_value = new_value_string.as_bytes();
let versioned = format!("_v_{}", id.to_string());
@ -337,7 +332,9 @@ async fn login(data: web::Data<MyData>, json: web::Json<Login>) -> Result<HttpRe
let records : HashMap<String, String> = data.db.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
if k.contains(&"_u_") {
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let user : User = serde_json::from_str(&v).unwrap();
if k.contains(&"_u_") && user.username == json.username {
return true;
} else {
return false;
@ -351,7 +348,9 @@ async fn login(data: web::Data<MyData>, json: web::Json<Login>) -> Result<HttpRe
for (_k, v) in records {
let user : User = serde_json::from_str(&v).unwrap();
if user.username == json.username && verify(json.clone().password, &user.password).unwrap() {
let json_cloned = json.clone();
let verified = verify(json_cloned.password, &user.password).unwrap();
if verified {
let my_claims = Claims{company: "".to_owned(), sub: user.id.to_string(), exp: expiry};
let token = encode(&Header::default(), &my_claims, secret.as_ref()).unwrap();
return Ok(HttpResponse::Ok().json(Token{jwt: token}))
@ -439,7 +438,7 @@ pub async fn broker_run(origin: String) -> std::result::Result<(), std::io::Erro
.data(MyData{ db: tree_actix.clone() })
.route("/insert", web::post().to(insert))
.route("/events", web::get().to(new_client))
.route("/events/{id}", web::get().to(collection))
.route("/events/collections/{id}", web::get().to(collection))
.route("/events/{id}/cancel", web::get().to(cancel))
.route("/users", web::post().to(user_create))
.route("/login", web::post().to(login))