back to tokio

This commit is contained in:
Bevan Hunt 2020-01-23 10:37:13 -08:00
parent 4bc57312a2
commit ca3e4cc477
3 changed files with 53 additions and 69 deletions

69
Cargo.lock generated
View File

@ -110,8 +110,8 @@ name = "broker"
version = "1.4.6" version = "1.4.6"
dependencies = [ dependencies = [
"bcrypt 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "bcrypt 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"broker-tokio 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"envy 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "envy 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"go-flag 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "go-flag 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -129,6 +129,29 @@ dependencies = [
"uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "broker-tokio"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)",
"mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "buf_redux" name = "buf_redux"
version = "0.8.4" version = "0.8.4"
@ -226,36 +249,6 @@ dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "crossbeam"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "crossbeam-epoch" name = "crossbeam-epoch"
version = "0.8.0" version = "0.8.0"
@ -269,15 +262,6 @@ dependencies = [
"scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "crossbeam-queue"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.7.0" version = "0.7.0"
@ -1946,6 +1930,7 @@ dependencies = [
"checksum block-cipher-trait 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1c924d49bd09e7c06003acda26cd9742e796e34282ec6c1189404dee0c1f4774" "checksum block-cipher-trait 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1c924d49bd09e7c06003acda26cd9742e796e34282ec6c1189404dee0c1f4774"
"checksum block-padding 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5" "checksum block-padding 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5"
"checksum blowfish 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6aeb80d00f2688459b8542068abd974cfb101e7a82182414a99b5026c0d85cc3" "checksum blowfish 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6aeb80d00f2688459b8542068abd974cfb101e7a82182414a99b5026c0d85cc3"
"checksum broker-tokio 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)" = "22b2532e9c77989f70f68c3177a5467b7f75e9de805b5b8778e3f2cd60d23233"
"checksum buf_redux 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f" "checksum buf_redux 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f"
"checksum bumpalo 3.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5fb8038c1ddc0a5f73787b130f4cc75151e96ed33e417fde765eb5a81e3532f4" "checksum bumpalo 3.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5fb8038c1ddc0a5f73787b130f4cc75151e96ed33e417fde765eb5a81e3532f4"
"checksum byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" "checksum byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
@ -1960,11 +1945,7 @@ dependencies = [
"checksum core-foundation 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "25b9e03f145fd4f2bf705e07b900cd41fc636598fe5dc452fd0db1441c3f496d" "checksum core-foundation 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "25b9e03f145fd4f2bf705e07b900cd41fc636598fe5dc452fd0db1441c3f496d"
"checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b" "checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" "checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e"
"checksum crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "acec9a3b0b3559f15aee4f90746c4e5e293b701c0f7d3925d24e01645267b68c"
"checksum crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca"
"checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac" "checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac"
"checksum crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db"
"checksum crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4" "checksum crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4"
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" "checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
"checksum dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e" "checksum dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e"

View File

@ -13,6 +13,7 @@ readme = "README.md"
[dependencies] [dependencies]
portal = "0.2" portal = "0.2"
futures = "0.3" futures = "0.3"
broker-tokio = { version = "0.2", features = ["full"] }
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.2", features = ["full"] }
serde = "1" serde = "1"
serde_json = "1" serde_json = "1"
@ -27,5 +28,4 @@ jsonwebtoken = "6"
reqwest = { version = "0.10", features = ["json", "blocking"] } reqwest = { version = "0.10", features = ["json", "blocking"] }
go-flag = "0.1" go-flag = "0.1"
envy = "0.4" envy = "0.4"
crossbeam = "0.7.3"
lazy_static = "1.4" lazy_static = "1.4"

View File

@ -1,3 +1,5 @@
use broker_tokio::stream::StreamExt;
use std::iter::Iterator;
use std::collections::HashMap; use std::collections::HashMap;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
@ -6,15 +8,14 @@ use bcrypt::{DEFAULT_COST, hash, verify};
use chrono::prelude::*; use chrono::prelude::*;
use portal::{Filter, http::StatusCode, sse::ServerSentEvent}; use portal::{Filter, http::StatusCode, sse::ServerSentEvent};
use jsonwebtoken::{encode, decode, Header, Validation}; use jsonwebtoken::{encode, decode, Header, Validation};
use crossbeam::channel::unbounded; use broker_tokio::sync::{mpsc};
use std::convert::Infallible; use std::convert::Infallible;
use std::time::Duration; use std::time::Duration;
use std::sync::{Mutex, Arc};
use lazy_static::lazy_static; use lazy_static::lazy_static;
lazy_static! { lazy_static! {
static ref CHANNEL: HashMap<String, (crossbeam::channel::Sender<SSE>, crossbeam::channel::Receiver<SSE>)> = { static ref CHANNEL: HashMap<String, (mpsc::UnboundedSender<SSE>, mpsc::UnboundedReceiver<SSE>)> = {
let (tx, rx) = unbounded(); let (tx, rx) = mpsc::unbounded_channel();
let guid = Uuid::new_v4().to_string(); let guid = Uuid::new_v4().to_string();
let sse = SSE{id: guid, event: "internal_status".to_owned(), data: "connected".to_owned(), retry: Duration::from_millis(5000)}; let sse = SSE{id: guid, event: "internal_status".to_owned(), data: "connected".to_owned(), retry: Duration::from_millis(5000)};
let _ = tx.send(sse); let _ = tx.send(sse);
@ -22,8 +23,19 @@ lazy_static! {
m.insert("chan".to_owned(), (tx, rx)); m.insert("chan".to_owned(), (tx, rx));
m m
}; };
static ref TREE: HashMap<String, sled::Db> = {
let configure = config();
let tree = sled::open(configure.save_path).unwrap();
let mut m = HashMap::new();
m.insert("tree".to_owned(), tree);
m
};
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SSE { pub struct SSE {
pub event: String, pub event: String,
@ -244,16 +256,11 @@ fn event_stream(sse: SSE) -> Result<impl ServerSentEvent, Infallible> {
async fn main() { async fn main() {
pretty_env_logger::init(); pretty_env_logger::init();
let configure = config();
let tree = sled::open(configure.save_path).unwrap();
let tree_clone1 = tree.clone();
let tree_clone2 = tree.clone();
let tree_clone3 = Arc::new(Mutex::new(tree.clone()));
let user_create_route = portal::post() let user_create_route = portal::post()
.and(portal::path("users")) .and(portal::path("users"))
.and(portal::body::json()) .and(portal::body::json())
.map(move |user: UserForm| { .map(move |user: UserForm| {
let tree = TREE.get(&"tree".to_owned()).unwrap();
let (check, value) = user_create(tree.clone(), user.clone()); let (check, value) = user_create(tree.clone(), user.clone());
if check { if check {
let reply = portal::reply::with_status(value, StatusCode::OK); let reply = portal::reply::with_status(value, StatusCode::OK);
@ -274,7 +281,8 @@ async fn main() {
.and(portal::body::json()) .and(portal::body::json())
.map(move |login_form: Login| { .map(move |login_form: Login| {
let configure = config(); let configure = config();
let (check, value) = login(tree_clone1.clone(), login_form.clone(), configure.clone()); let tree = TREE.get(&"tree".to_owned()).unwrap();
let (check, value) = login(tree.clone(), login_form.clone(), configure.clone());
if check { if check {
let reply = portal::reply::with_status(value, StatusCode::OK); let reply = portal::reply::with_status(value, StatusCode::OK);
portal::reply::with_header(reply, "Content-Type", "application/json") portal::reply::with_header(reply, "Content-Type", "application/json")
@ -289,7 +297,8 @@ async fn main() {
.and(auth_check) .and(auth_check)
.and(portal::body::json()) .and(portal::body::json())
.map(move |jwt: JWT, event_form: EventForm| { .map(move |jwt: JWT, event_form: EventForm| {
let record = insert(tree_clone2.clone(), jwt.claims.sub, event_form); let tree = TREE.get(&"tree".to_owned()).unwrap();
let record = insert(tree.clone(), jwt.claims.sub, event_form);
if jwt.check { if jwt.check {
let reply = portal::reply::with_status(record, StatusCode::OK); let reply = portal::reply::with_status(record, StatusCode::OK);
portal::reply::with_header(reply, "Content-Type", "application/json") portal::reply::with_header(reply, "Content-Type", "application/json")
@ -299,11 +308,10 @@ async fn main() {
} }
}); });
let tree_clone = tree_clone3.lock().unwrap().clone();
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
loop { loop {
let vals : HashMap<String, Event> = tree_clone.iter().into_iter().filter(|x| { let tree = TREE.get(&"tree".to_owned()).unwrap();
let vals : HashMap<String, Event> = tree.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap(); let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned(); let k = std::str::from_utf8(&p.0).unwrap().to_owned();
let v = std::str::from_utf8(&p.1).unwrap().to_owned(); let v = std::str::from_utf8(&p.1).unwrap().to_owned();
@ -338,7 +346,7 @@ async fn main() {
let sse = SSE{id: guid, event: new_json.event, data: serde_json::to_string(&new_json.data).unwrap(), retry: Duration::from_millis(5000)}; let sse = SSE{id: guid, event: new_json.event, data: serde_json::to_string(&new_json.data).unwrap(), retry: Duration::from_millis(5000)};
let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap(); let (tx, _) = CHANNEL.get(&"chan".to_owned()).unwrap();
let _ = tx.send(sse).unwrap(); let _ = tx.send(sse).unwrap();
let tree_cloned = tree_clone.clone(); let tree_cloned = tree.clone();
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
let _ = tree_cloned.compare_and_swap(old_json.event.as_bytes(), None as Option<&[u8]>, Some(b"")); let _ = tree_cloned.compare_and_swap(old_json.event.as_bytes(), None as Option<&[u8]>, Some(b""));
let old_json_og = tree_cloned.get(old_json.event).unwrap().unwrap(); let old_json_og = tree_cloned.get(old_json.event).unwrap().unwrap();
@ -352,11 +360,8 @@ async fn main() {
}); });
let sse = portal::path("events").and(portal::get()).map(move || { let sse = portal::path("events").and(portal::get()).map(move || {
let tree = TREE.get(&"tree".to_owned()).unwrap();
let tree_clone = tree_clone3.lock().unwrap().clone(); let vals: HashMap<String, String> = tree.iter().into_iter().filter(|x| {
let tree_clone2 = tree_clone.clone();
let vals: HashMap<String, String> = tree_clone2.iter().into_iter().filter(|x| {
let p = x.as_ref().unwrap(); let p = x.as_ref().unwrap();
let k = std::str::from_utf8(&p.0).unwrap().to_owned(); let k = std::str::from_utf8(&p.0).unwrap().to_owned();
if !k.contains("_v_") && !k.contains("_u_") { if !k.contains("_v_") && !k.contains("_u_") {
@ -380,12 +385,10 @@ async fn main() {
} }
let (_, rx) = CHANNEL.get(&"chan".to_owned()).unwrap(); let (_, rx) = CHANNEL.get(&"chan".to_owned()).unwrap();
let messages = rx.iter().map(|sse| { let events = rx.clone().map(|sse| {
event_stream(sse) event_stream(sse)
}); });
let events = futures::stream::iter(messages);
portal::sse::reply(portal::sse::keep_alive().interval(Duration::from_secs(5)).text("ping".to_string()).stream(events)) portal::sse::reply(portal::sse::keep_alive().interval(Duration::from_secs(5)).text("ping".to_string()).stream(events))
}); });