Secure SSE channel (#8)

This commit is contained in:
Bevan Hunt 2020-01-26 02:28:55 -08:00 committed by GitHub
parent 5cdcc64dcf
commit 066140d9cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 92 additions and 72 deletions

16
Cargo.lock generated
View File

@ -99,7 +99,7 @@ dependencies = [
[[package]]
name = "broker"
version = "2.0.4"
version = "3.0.0"
dependencies = [
"bcrypt 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
@ -552,7 +552,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -562,7 +562,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -600,7 +600,7 @@ dependencies = [
"http 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
@ -658,7 +658,7 @@ dependencies = [
[[package]]
name = "itoa"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -1431,7 +1431,7 @@ name = "serde_json"
version = "1.0.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"ryu 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -1442,7 +1442,7 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -2029,7 +2029,7 @@ dependencies = [
"checksum indexmap 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b54058f0a6ff80b6803da8faf8997cde53872b38f4023728f6830b06cd3c0dc"
"checksum input_buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf"
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
"checksum itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f"
"checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e"
"checksum js-sys 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "7889c7c36282151f6bf465be4700359318aef36baa951462382eae49e9577cf9"
"checksum json 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9a38661a28126f8621fb246611288ae28935ddf180f5e21f2d0fbfe5e4131dbe"
"checksum jsonwebtoken 7.0.0-beta.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e8c24a5d8627e3dd2982521e858f74b79f48c2f4981fed860cc765230045302a"

View File

@ -1,6 +1,6 @@
[package]
name = "broker"
version = "2.0.4"
version = "3.0.0"
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
edition = "2018"
license = "MIT"

View File

@ -18,7 +18,7 @@ Broker follows an insert-only/publish/subscribe paradigm rather than a REST CRUD
* Very performant with a low memory footprint that uses about 20MB and 1 CPU thread
* Under 500 lines of code
* Ships as a [Linux Snap](https://snapcraft.io/broker) or [Rust Crate](https://crates.io/crates/broker)
* Real-time Event Stream via SSE
* Secure Real-time Event Stream via SSE - requires the use of [broker-client](https://www.npmjs.com/package/broker-client)
* Has CORS support
* Provides user authentication with JWTs and Bcrypt(ed) passwords
* Handles future events via Epoch UNIX timestamp
@ -38,8 +38,8 @@ The side-effect of this system is that the latest event is the schema. Old event
### Recommeded Services/Libraries to use with Broker
* [broker-client](https://www.npmjs.com/package/broker-client) - the official front-end client for broker
* [Integromat](https://www.integromat.com/) - No-code Event Scheduler that supports many apps like GitHub, Meetup, and etc.
* [React Hooks SSE](https://www.npmjs.com/package/react-hooks-sse) - SSE hook for React
* [React Hook Form](https://react-hook-form.com/) - Best form library for React
* [React Debounce Input](https://www.npmjs.com/package/react-debounce-input) - React input for Real-time Submission (Edit in Place forms)
@ -90,13 +90,16 @@ will return
```
- where {...} is a JWT (string)
#### Step 3 - insert an event
#### Step 3 - connect to SSE
```html
GET /events
```
- public endpoint
- connect your sse-client to this endpoint
- authenticated endpoint (Authorization: Bearer {jwt})
- connect your sse-client to this endpoint using [broker-client](https://www.npmjs.com/package/broker-client)
- note: broker-client uses fetch as eventsource doesn't support headers
#### Step 4 - insert an event
```html
POST /insert
@ -179,7 +182,7 @@ pub async fn main() {
- the save_path where the embedded database will save needs to be passed in as an environment variable
- example: SAVE_PATH=./tmp/broker_data broker -port 8080 -origin http://localhost:3000 -expiry 3600 -secret secret
## Install Linux Snap
### Install Linux Snap
``` sudo snap install broker ```
- note: does not run as a daemon as requires flags
@ -213,6 +216,7 @@ pub async fn main() {
* [Best in Place](https://github.com/bernat/best_in_place)
* [Brock Whitten](https://www.youtube.com/watch?v=qljYMEfVukU)
### Migration from 1.0 to 2.0
### Migrations
- the optional API endpoints URLs have been changed but have the same functionality
- from 2.0 to 3.0: the sse endpoint is now secure and requires to use the broker-client library
- from 1.0 to 2.0: the optional API endpoints URLs have been changed but have the same functionality

View File

@ -2764,6 +2764,11 @@
}
}
},
"broker-client": {
"version": "0.0.4",
"resolved": "https://registry.npmjs.org/broker-client/-/broker-client-0.0.4.tgz",
"integrity": "sha512-+4rKekbpHK9YuQG2k2z4fO/6OiIMVYKKpDaDLIwjnFndcX8y3S3BC/hsms7SsEKu7uJabdLqKk6SMY7a5VtzRA=="
},
"brorand": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/brorand/-/brorand-1.1.0.tgz",

View File

@ -9,6 +9,7 @@
"@testing-library/jest-dom": "^4.2.4",
"@testing-library/react": "^9.4.0",
"@testing-library/user-event": "^7.2.1",
"broker-client": "0.0.4",
"react": "^16.12.0",
"react-debounce-input": "^3.2.2",
"react-dom": "^16.12.0",

View File

@ -6,6 +6,7 @@ import CardContent from '@material-ui/core/CardContent';
import Typography from '@material-ui/core/Typography';
import { useForm } from "react-hook-form";
import {DebounceInput} from 'react-debounce-input';
import BrokerClient from 'broker-client';
const useStyles = makeStyles({
card: {
@ -47,17 +48,13 @@ function App() {
const onSubmit = values => {
const ts = Math.round((new Date()).getTime() / 1000);
const v = `{"event": "user", "published": false, "timestamp": ${ts}, "data": {"user": "${values.user}"}}`;
fetch(apiEndpoint, {
method: 'post',
mode: 'no-cors',
headers: {
'Content-Type': 'application/json'
},
body: v
}).then(response => {
return response.json();
}, err => {
console.log(err);
const sse = new BrokerClient('http://localhost:8080/events', {
headers: new Headers({
authorization: 'Bearer 123',
})
});
sse.addEventListener('user', (messageEvent) => {
console.log(messageEvent);
});
};

View File

@ -1,6 +1,6 @@
name: broker # you probably want to 'snapcraft register <name>'
base: core18 # the base snap is the execution environment for this snap
version: '2.0.4' # just for humans, typically '1.2+git' or '1.3.2'
version: '3.0.0' # just for humans, typically '1.2+git' or '1.3.2'
summary: Real-time Zero-Code API Server # 79 char long summary
description: |
The purpose of this library is to be your real-time zero-code API server.

View File

@ -371,22 +371,33 @@ fn insert(tree: sled::Db, user_id_str: String, evt: EventForm) -> String {
json!({"event": j}).to_string()
}
fn event_stream() -> Result<impl ServerSentEvent, Infallible> {
fn event_stream(allowed: bool) -> Result<impl ServerSentEvent, Infallible> {
let (_, rx) = CHANNEL.get(&"chan".to_owned()).unwrap();
let sse = match rx.try_recv() {
Ok(sse) => sse,
Err(_) => {
let guid = Uuid::new_v4().to_string();
SSE{id: guid, event: "internal_status".to_owned(), data: "polling".to_owned(), retry: Duration::from_millis(5000)}
}
};
Ok((
warp::sse::id(sse.id),
warp::sse::data(sse.data),
warp::sse::event(sse.event),
warp::sse::retry(sse.retry),
))
if allowed {
let (_, rx) = CHANNEL.get(&"chan".to_owned()).unwrap();
let sse = match rx.try_recv() {
Ok(sse) => sse,
Err(_) => {
let guid = Uuid::new_v4().to_string();
SSE{id: guid, event: "internal_status".to_owned(), data: "polling".to_owned(), retry: Duration::from_millis(5000)}
}
};
Ok((
warp::sse::id(sse.id),
warp::sse::data(sse.data),
warp::sse::event(sse.event),
warp::sse::retry(sse.retry),
))
} else {
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: "internal_status".to_owned(), data: "denied".to_owned(), retry: Duration::from_millis(5000)};
Ok((
warp::sse::id(sse.id),
warp::sse::data(sse.data),
warp::sse::event(sse.event),
warp::sse::retry(sse.retry),
))
}
}
pub async fn broker() {
@ -495,36 +506,38 @@ pub async fn broker() {
}
});
let sse_route = warp::path("events").and(warp::get()).map(move || {
let tree = TREE.get(&"tree".to_owned()).unwrap();
let vals: HashMap<String, String> = tree.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
if !k.contains("_v_") && !k.contains("_u_") {
return true
} else {
return false
let sse_route = warp::path("events")
.and(auth_check)
.and(warp::get()).map(move |jwt: JWT| {
let tree = TREE.get(&"tree".to_owned()).unwrap();
let vals: HashMap<String, String> = tree.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
if !k.contains("_v_") && !k.contains("_u_") {
return true
} else {
return false
}
}).map(|x| {
let p = x.as_ref().unwrap();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let json : Event = serde_json::from_str(&v).unwrap();
let data : String = serde_json::to_string(&json.data).unwrap();
(json.event, data)
}).collect();
for (k, v) in vals {
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: k, data: v, retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse);
}
}).map(|x| {
let p = x.as_ref().unwrap();
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
let json : Event = serde_json::from_str(&v).unwrap();
let data : String = serde_json::to_string(&json.data).unwrap();
(json.event, data)
}).collect();
for (k, v) in vals {
let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: k, data: v, retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse);
}
let event_stream = interval(Duration::from_millis(100)).map(move |_| {
event_stream()
});
warp::sse::reply(event_stream)
let event_stream = interval(Duration::from_millis(100)).map(move |_| {
event_stream(jwt.check)
});
warp::sse::reply(event_stream)
});
let cancel_route = warp::get()