Checkpointing some work in progress
This commit is contained in:
parent
1c8b7fecb1
commit
d5cb28ef2e
|
@ -227,6 +227,14 @@ dependencies = [
|
|||
"yaml-rust 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cloudabi"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "1.1.1"
|
||||
|
@ -379,6 +387,7 @@ dependencies = [
|
|||
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"meows 0.3.0 (git+https://github.com/rtyler/meows)",
|
||||
"otto-eventbus 0.1.0",
|
||||
"parking_lot 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"pretty_env_logger 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde_json 1.0.56 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -581,6 +590,11 @@ dependencies = [
|
|||
"bytes 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "instant"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "0.4.6"
|
||||
|
@ -638,6 +652,14 @@ name = "linked-hash-map"
|
|||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.8"
|
||||
|
@ -669,7 +691,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
[[package]]
|
||||
name = "meows"
|
||||
version = "0.3.0"
|
||||
source = "git+https://github.com/rtyler/meows#0f28c3e4725c6945ab84187507366bde311095b9"
|
||||
source = "git+https://github.com/rtyler/meows#87cfb1d3dabeec3fc9ca3d4f1e24dfe49b3828fa"
|
||||
dependencies = [
|
||||
"async-tungstenite 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -832,6 +854,30 @@ name = "parking"
|
|||
version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"instant 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lock_api 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"parking_lot_core 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"cloudabi 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"instant 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.71 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"smallvec 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "2.1.0"
|
||||
|
@ -1080,6 +1126,11 @@ name = "scoped-tls"
|
|||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "security-framework"
|
||||
version = "0.4.4"
|
||||
|
@ -1194,6 +1245,11 @@ name = "slab"
|
|||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "smallvec"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "smol"
|
||||
version = "0.1.18"
|
||||
|
@ -1583,6 +1639,7 @@ dependencies = [
|
|||
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
|
||||
"checksum chrono 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "f0fee792e164f78f5fe0c296cc2eb3688a2ca2b70cdff33040922d298203f0c4"
|
||||
"checksum clap 2.33.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129"
|
||||
"checksum cloudabi 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467"
|
||||
"checksum concurrent-queue 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f83c06aff61f2d899eb87c379df3cbf7876f14471dcab474e0b6dc90ab96c080"
|
||||
"checksum config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "19b076e143e1d9538dde65da30f8481c2a6c44040edb8e02b9bf1351edb92ce3"
|
||||
"checksum const-random 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
|
||||
|
@ -1622,6 +1679,7 @@ dependencies = [
|
|||
"checksum humantime 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
|
||||
"checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9"
|
||||
"checksum input_buffer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754"
|
||||
"checksum instant 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "5b141fdc7836c525d4d594027d318c84161ca17aaf8113ab1f81ab93ae897485"
|
||||
"checksum itoa 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6"
|
||||
"checksum js-sys 0.3.41 (registry+https://github.com/rust-lang/crates.io-index)" = "c4b9172132a62451e56142bff9afc91c8e4a4500aa5b847da36815b63bfda916"
|
||||
"checksum kv-log-macro 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "4ff57d6d215f7ca7eb35a9a64d656ba4d9d2bef114d741dc08048e75e2f5d418"
|
||||
|
@ -1630,6 +1688,7 @@ dependencies = [
|
|||
"checksum libc 0.2.71 (registry+https://github.com/rust-lang/crates.io-index)" = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49"
|
||||
"checksum linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d262045c5b87c0861b3f004610afd0e2c851e2908d08b6c870cbb9d5f494ecd"
|
||||
"checksum linked-hash-map 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
|
||||
"checksum lock_api 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de302ce1fe7482db13738fbaf2e21cfb06a986b89c0bf38d88abf16681aada4e"
|
||||
"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
|
||||
"checksum loom 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4ecc775857611e1df29abba5c41355cdf540e7e9d4acfdf0f355eefee82330b7"
|
||||
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
|
||||
|
@ -1648,6 +1707,8 @@ dependencies = [
|
|||
"checksum openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
|
||||
"checksum openssl-sys 0.9.58 (registry+https://github.com/rust-lang/crates.io-index)" = "a842db4709b604f0fe5d1170ae3565899be2ad3d9cbc72dedc789ac0511f78de"
|
||||
"checksum parking 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c4029bc3504a62d92e42f30b9095fdef73b8a0b2a06aa41ce2935143b05a1a06"
|
||||
"checksum parking_lot 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a4893845fa2ca272e647da5d0e46660a314ead9c2fdd9a883aabc32e481a8733"
|
||||
"checksum parking_lot_core 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b"
|
||||
"checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
|
||||
"checksum pin-project 0.4.22 (registry+https://github.com/rust-lang/crates.io-index)" = "12e3a6cdbfe94a5e4572812a0201f8c0ed98c1c452c7b8563ce2276988ef9c17"
|
||||
"checksum pin-project-internal 0.4.22 (registry+https://github.com/rust-lang/crates.io-index)" = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7"
|
||||
|
@ -1679,6 +1740,7 @@ dependencies = [
|
|||
"checksum schannel 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)" = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
|
||||
"checksum scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28"
|
||||
"checksum scoped-tls 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
|
||||
"checksum scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
"checksum security-framework 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "64808902d7d99f78eaddd2b4e2509713babc3dc3c85ad6f4c447680f3c01e535"
|
||||
"checksum security-framework-sys 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "17bf11d99252f512695eb468de5516e5cf75455521e69dfe343f3b74e4748405"
|
||||
"checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
|
||||
|
@ -1692,6 +1754,7 @@ dependencies = [
|
|||
"checksum serde_yaml 0.8.13 (registry+https://github.com/rust-lang/crates.io-index)" = "ae3e2dd40a7cdc18ca80db804b7f461a39bb721160a85c9a1fa30134bf3c02a5"
|
||||
"checksum sha-1 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df"
|
||||
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
|
||||
"checksum smallvec 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4"
|
||||
"checksum smol 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "620cbb3c6e34da57d3a248cda0cd01cd5848164dc062e764e65d06fe3ea7aed5"
|
||||
"checksum socket2 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)" = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918"
|
||||
"checksum static_assertions 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
|
||||
|
|
|
@ -15,6 +15,7 @@ pretty_env_logger = "~0.4.0"
|
|||
# WebSocket message exchange support
|
||||
meows = {"git" = "https://github.com/rtyler/meows", "branch" = "master" }
|
||||
otto-eventbus = { path = "../eventbus" }
|
||||
parking_lot = "~0.11.0"
|
||||
serde = { version = "~1.0.111", features = ["rc"] }
|
||||
serde_json = "~1.0.53"
|
||||
smol = "~0.1.12"
|
||||
|
|
|
@ -9,6 +9,7 @@ use log::*;
|
|||
use meows;
|
||||
use otto_eventbus::message;
|
||||
use otto_eventbus::server::*;
|
||||
use parking_lot::Mutex;
|
||||
use serde_json;
|
||||
use smol;
|
||||
use std::future::Future;
|
||||
|
@ -18,13 +19,16 @@ use uuid::Uuid;
|
|||
|
||||
pub type EventbusServer = meows::Server<Arc<MemoryBus>, ()>;
|
||||
|
||||
/**
|
||||
* Create a valid server for use
|
||||
*/
|
||||
pub fn create_server() -> EventbusServer {
|
||||
let eventbus = MemoryBus::default();
|
||||
let mut server = meows::Server::<Arc<MemoryBus>, ()>::with_state(eventbus);
|
||||
|
||||
server.on("register", register_client);
|
||||
server.on("subscribe", subscribe_client);
|
||||
|
||||
server.on("publish", publish_message);
|
||||
server.default(default_handler);
|
||||
server
|
||||
}
|
||||
|
@ -82,13 +86,45 @@ async fn register_client(mut req: meows::Request<Arc<MemoryBus>, ()>) -> Option<
|
|||
async fn subscribe_client(mut req: meows::Request<Arc<MemoryBus>, ()>) -> Option<meows::Message> {
|
||||
if let Some(subscribe) = req.from_value::<message::Subscribe>() {
|
||||
info!("Subscribe received: {:?}", subscribe);
|
||||
|
||||
// If the client sends an invalid header, bail out early
|
||||
if ! req.state.validate_client(&subscribe.header.uuid, &subscribe.header.token) {
|
||||
warn!("Client could not be validated, perhaps the wrong token?");
|
||||
// TODO: make a useful error
|
||||
return None;
|
||||
}
|
||||
|
||||
info!("Subscribing {} to {}", subscribe.header.uuid, subscribe.channel);
|
||||
req.state.subscribe_client(&subscribe.header.uuid, subscribe.channel);
|
||||
|
||||
// TODO: What is the right protocol response for a subscribe?
|
||||
Some(meows::Message::text("ack"))
|
||||
//Some(meows::Message::text("ack"))
|
||||
None
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
async fn publish_message(mut req: meows::Request<Arc<MemoryBus>, ()>) -> Option<meows::Message> {
|
||||
if let Some(publish) = req.from_value::<message::Publish>() {
|
||||
info!("Publish received: {:?}", publish);
|
||||
if ! req.state.validate_client(&publish.header.uuid, &publish.header.token) {
|
||||
warn!("Client could not be validated, perhaps the wrong token?");
|
||||
// TODO: make a useful error
|
||||
return None;
|
||||
}
|
||||
|
||||
req.state.publish_message(publish.channel, publish.value);
|
||||
return Some(meows::Message::text("ack"))
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility type to help keep track of sinks where messages are dropped in
|
||||
*/
|
||||
type MessageSink = Sender<meows::Message>;
|
||||
|
||||
/**
|
||||
* The ClientId is the agreed upon uuid that the client(s) will use to identify
|
||||
* their registration, subscription, etc requests
|
||||
|
@ -107,7 +143,7 @@ pub struct Client {
|
|||
* The sink allows for writing Message objects back to the client's
|
||||
* connected websocket
|
||||
*/
|
||||
sink: Sender<meows::Message>,
|
||||
sink: MessageSink,
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -118,15 +154,99 @@ pub struct Client {
|
|||
* This is the most simple and primitive implementation of the Engine trait
|
||||
*/
|
||||
pub struct MemoryBus {
|
||||
/**
|
||||
* Clients is a map of identified client structs
|
||||
*
|
||||
* Each of these clients represents a currently connected client
|
||||
*/
|
||||
clients: Arc<DashMap<ClientId, Client>>,
|
||||
/**
|
||||
* Channels is a map of topic, to [MessageSink] couplings
|
||||
*
|
||||
* Channels allow for pub/sub style messaging by the client
|
||||
*/
|
||||
channels: Arc<DashMap<Topic, Vec<MessageSink>>>,
|
||||
/**
|
||||
* Topics are the commit logs which exist for each named topic
|
||||
*
|
||||
* This allows for retrieval of messages by specific offsets
|
||||
*/
|
||||
topics: Arc<DashMap<Topic, Vec<Message>>>,
|
||||
/**
|
||||
* Offsets tracks the caller's offsets for each given topic
|
||||
*
|
||||
* Offsets do not work or make sense for clients who wish to consume pub-sub
|
||||
* style at this point
|
||||
*/
|
||||
offsets: Arc<DashMap<(Topic, CallerId), Offset>>,
|
||||
}
|
||||
|
||||
impl MemoryBus {
|
||||
/**
|
||||
* Add the client to the internal tracking map, this is assuming that the
|
||||
* client's request has already been verified
|
||||
*/
|
||||
pub fn add_client(&self, id: ClientId, client: Client) {
|
||||
self.clients.insert(id, client);
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure that the provided token matches the client the MemoryBus has in
|
||||
* its internal map
|
||||
*/
|
||||
pub fn validate_client(&self, id: &ClientId, token: &Uuid) -> bool {
|
||||
if ! self.clients.contains_key(id) {
|
||||
warn!("Could not validate client {}, they don't exist!", id);
|
||||
return false;
|
||||
}
|
||||
if let Some(client) = self.clients.get(id) {
|
||||
return client.token == *token;
|
||||
}
|
||||
error!("Could not discover the client in order to validate it, this shouldn't happen");
|
||||
false
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the given client's sink to internal channels for receiving messages
|
||||
* in a pub/sub manner
|
||||
*/
|
||||
pub fn subscribe_client(&self, id: &ClientId, channel: String) -> Result<(), ()> {
|
||||
if let Some(client) = self.clients.get(id) {
|
||||
let sink = Mutex::new(client.sink.clone());
|
||||
let sink = client.sink.clone();
|
||||
if let Some(mut sinks) = self.channels.get_mut(&channel) {
|
||||
sinks.push(sink);
|
||||
}
|
||||
else {
|
||||
let mut sinks = vec![];
|
||||
sinks.push(sink);
|
||||
self.channels.insert(channel, sinks);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn publish_message(&self, channel: String, value: serde_json::Value) {
|
||||
info!("publish_message({}, {})", channel, value);
|
||||
|
||||
if let Some(mut sinks) = self.channels.get_mut(&channel) {
|
||||
// TODO: make a real error
|
||||
let serialized = value.to_string();
|
||||
for sink in sinks.iter_mut() {
|
||||
let msg = meows::Message::text(serialized.clone());
|
||||
/*
|
||||
* TODO: Need to figure out how to wrap this safely in an async
|
||||
* task or wrap self.channels' keys in Arc<Vec<MessageSink>> rather
|
||||
* than just Vec<MessageSink>
|
||||
*
|
||||
* Another stupid idea might be to put the values of the channels
|
||||
* map into their own map Either way, need to safely iterate thorugh the sinks in
|
||||
* order to send all these messages out
|
||||
*/
|
||||
sink.send(msg).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Engine for MemoryBus {
|
||||
|
@ -223,7 +343,7 @@ impl Engine for MemoryBus {
|
|||
self: Arc<Self>,
|
||||
topic: Topic,
|
||||
message: Message,
|
||||
caller: CallerId,
|
||||
_caller: CallerId,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Offset, ()>> + Send>> {
|
||||
let topics = self.topics.clone();
|
||||
|
||||
|
@ -250,6 +370,7 @@ impl Engine for MemoryBus {
|
|||
Self: Sized,
|
||||
{
|
||||
Arc::new(Self {
|
||||
channels: Arc::new(DashMap::default()),
|
||||
clients: Arc::new(DashMap::default()),
|
||||
topics: Arc::new(DashMap::default()),
|
||||
offsets: Arc::new(DashMap::default()),
|
||||
|
@ -261,6 +382,7 @@ impl Engine for MemoryBus {
|
|||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use futures::channel::mpsc::channel;
|
||||
use smol;
|
||||
|
||||
fn test_topic() -> Topic {
|
||||
|
@ -383,4 +505,23 @@ mod tests {
|
|||
let result = bus.at(test_topic(), 0, test_caller()).await;
|
||||
assert!(result.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_client() {
|
||||
let bus = MemoryBus::default();
|
||||
// create a test sink, won't ever be used
|
||||
let (sender, _) = channel(1);
|
||||
let token = Uuid::new_v4();
|
||||
let client = Client {
|
||||
token,
|
||||
sink: sender,
|
||||
};
|
||||
let id = Uuid::new_v4();
|
||||
|
||||
bus.add_client(id, client);
|
||||
|
||||
assert!(bus.validate_client(&id, &token));
|
||||
assert!(!bus.validate_client(&id, &Uuid::new_v4()));
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
use log::*;
|
||||
use otto_eventbus::message::*;
|
||||
use tungstenite::Message;
|
||||
use uuid::Uuid;
|
||||
|
||||
mod common;
|
||||
|
@ -15,7 +17,46 @@ async fn register_and_subscribe() -> std::io::Result<()> {
|
|||
if let Ok((mut socket, _response)) = ws_connect() {
|
||||
info!("Connected to the server");
|
||||
let client = Uuid::new_v4();
|
||||
let _auth = register_client(client, &mut socket);
|
||||
let auth = register_client(client, &mut socket).expect("Failed to get auth token");
|
||||
|
||||
let header = ClientHeader {
|
||||
uuid: client,
|
||||
token: auth,
|
||||
};
|
||||
let channel = "/internal/echo".to_string();
|
||||
|
||||
/*
|
||||
* Subscribing to the internal echo channel to ensure that we get whatever
|
||||
* messages we had sent back
|
||||
*/
|
||||
let subscribe = Subscribe {
|
||||
channel: channel.clone(),
|
||||
header: header.clone(),
|
||||
};
|
||||
let buffer = wrap_in_envelope("subscribe".to_string(), &subscribe);
|
||||
|
||||
socket
|
||||
.write_message(Message::text(buffer))
|
||||
.expect("Failed to send message to test server");
|
||||
|
||||
let value = serde_json::from_str(r#"{"hello":"world"}"#)
|
||||
.expect("Failed to generate test value");
|
||||
let publish = Publish {
|
||||
header: header.clone(),
|
||||
channel: channel.clone(),
|
||||
value: value,
|
||||
};
|
||||
let buffer = wrap_in_envelope("publish".to_string(), &publish);
|
||||
info!("Sending publish buffer: {}", buffer);
|
||||
|
||||
socket
|
||||
.write_message(Message::text(buffer))
|
||||
.expect("Failed to send message to test server");
|
||||
|
||||
if let Ok(m) = socket.read_message() {
|
||||
info!("Read from server: {:?}", m);
|
||||
}
|
||||
|
||||
socket
|
||||
.close(None)
|
||||
.expect("Failed to cleanly close connection");
|
||||
|
|
|
@ -29,13 +29,19 @@ pub mod message {
|
|||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct Subscribe {
|
||||
client: ClientHeader,
|
||||
channel: String,
|
||||
pub header: ClientHeader,
|
||||
pub channel: String,
|
||||
}
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct Publish {
|
||||
pub header: ClientHeader,
|
||||
pub channel: String,
|
||||
pub value: serde_json::Value,
|
||||
}
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct ClientHeader {
|
||||
uuid: String,
|
||||
token: String,
|
||||
pub uuid: uuid::Uuid,
|
||||
pub token: uuid::Uuid,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -180,6 +180,28 @@ The store operation will persist some specified pattern of files into the Otto o
|
|||
{
|
||||
"op_id" : "uuid",
|
||||
"type" : "STORE",
|
||||
"data" : {
|
||||
"pattern" : "build/*.tar.gz",
|
||||
"permanent" : true,
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
Properties:
|
||||
|
||||
* **pattern**:
|
||||
* **permanent**: (_optional_) mark the files to persist past the termination of the CI/CD
|
||||
process
|
||||
|
||||
==== `FETCH`
|
||||
|
||||
The fetch operation will retrieve persisted files from the Otto object store.
|
||||
|
||||
[source,json]
|
||||
----
|
||||
{
|
||||
"op_id" : "uuid",
|
||||
"type" : "FETCH",
|
||||
"data" : {
|
||||
"pattern" : "build/*.tar.gz",
|
||||
"permanent" : true
|
||||
|
|
|
@ -0,0 +1,245 @@
|
|||
= RFC-0010: Eventbus design and implementation
|
||||
:toc: preamble
|
||||
:toclevels: 3
|
||||
ifdef::env-github[]
|
||||
:tip-caption: :bulb:
|
||||
:note-caption: :information_source:
|
||||
:important-caption: :heavy_exclamation_mark:
|
||||
:caution-caption: :fire:
|
||||
:warning-caption: :warning:
|
||||
endif::[]
|
||||
|
||||
.**RFC Template**
|
||||
|
||||
.Metadata
|
||||
[cols="1h,1"]
|
||||
|===
|
||||
| RFC
|
||||
| 0010
|
||||
|
||||
| Title
|
||||
| Eventbus design and implementation
|
||||
|
||||
| Sponsor
|
||||
| https://github.com/rtyluer[R Tyler Croy]
|
||||
|
||||
| Status
|
||||
| Not Submitted :information_source:
|
||||
|
||||
| Type
|
||||
| Standard
|
||||
|
||||
| Created
|
||||
| 2020-06-20
|
||||
|
||||
|===
|
||||
|
||||
== Abstract
|
||||
|
||||
THe eventbus is the core of the Otto ecosystem, which provides stateless
|
||||
link:https://en.wikipedia.org/wiki/Publish-subscribe_pattern[pubsub]
|
||||
channels and stateful queues over
|
||||
link:https://en.wikipedia.org/wiki/WebSocket[WebSocket]
|
||||
connections.
|
||||
|
||||
Other services in the Otto ecosystem are expected to register for events by
|
||||
subscribing to different channels. Events from these channels will be received
|
||||
over a single WebSocket connection.
|
||||
|
||||
== Specification
|
||||
|
||||
[TIP]
|
||||
====
|
||||
Provide a detailed specification what is being proposed. Be as technical and
|
||||
detailed as needed to allow new or existing developers to reasonably understand
|
||||
the scope/impact of an implementation.
|
||||
|
||||
* Use present tense - describe what the proposal "does" (as if it were already done) not what it will do.
|
||||
* Do not discuss alternative designs that were rejected, those belong in the Reasoning section.
|
||||
* Avoid in-depth discussion or justification of design choices, that belongs in the Reasoning section.
|
||||
====
|
||||
|
||||
|
||||
=== Events
|
||||
|
||||
==== Register
|
||||
|
||||
The register message is required in order to identify the client with the eventbus for
|
||||
all future communications. The `register` event will also ensure that an "inbox" is
|
||||
created for the sending client, allowing it to receive unicasted messages.
|
||||
|
||||
[source,json]
|
||||
----
|
||||
{
|
||||
"type" : "register",
|
||||
"value" : {
|
||||
"uuid" : "some-uuid-string", // <1>
|
||||
"token" : "optional-auth-token" // <2>
|
||||
}
|
||||
}
|
||||
----
|
||||
<1> The UUID should be the client's own identifier which is guaranteed to be identical between restarts
|
||||
<2> The `token` is optional for first-time clients, but required for reconnecting clients to resume their previous identifier
|
||||
|
||||
===== Responses
|
||||
|
||||
.Success
|
||||
[source,json]
|
||||
----
|
||||
{
|
||||
"type" : "registered",
|
||||
"value" : {
|
||||
"token" : "client's auth token" // <1>
|
||||
}
|
||||
}
|
||||
----
|
||||
<1> The client's secret authentication token to use in the future
|
||||
|
||||
Errors:
|
||||
|
||||
* Invalid auth token for identifier
|
||||
* Client already registered
|
||||
|
||||
|
||||
|
||||
==== Subscribe
|
||||
|
||||
[source,json]
|
||||
----
|
||||
{
|
||||
"type" : "subscribe",
|
||||
"value" : {
|
||||
"client" : {
|
||||
"uuid" : "client-uuid",
|
||||
"token" : "auth-token"
|
||||
},
|
||||
"channel" : "/channel/identifier"
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
===== Responses
|
||||
|
||||
.Success
|
||||
[source,json]
|
||||
----
|
||||
{
|
||||
"type" : "subscribed",
|
||||
"value" : {
|
||||
"client" : "client-uuid",
|
||||
"channel": "/channel/identifier"
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
.Error
|
||||
[source,json]
|
||||
----
|
||||
{
|
||||
"type" : "error",
|
||||
"value" : {
|
||||
"code" : 10, // <1>
|
||||
"msg" : "errors.subscription_failed" // <2>
|
||||
}
|
||||
}
|
||||
----
|
||||
<1> A predefined error code matching the message
|
||||
<2> A string denoting which type of error message should be shown to the user
|
||||
|
||||
|
||||
== Motivation
|
||||
|
||||
[TIP]
|
||||
====
|
||||
Explain why the existing code base or process is inadequate to address the problem that the RFC solves.
|
||||
This section may also contain any historal context such as how things were done before this proposal.
|
||||
|
||||
* Do not discuss design choices or alternative designs that were rejected, those belong in the Reasoning section.
|
||||
====
|
||||
|
||||
== Reasoning
|
||||
|
||||
[TIP]
|
||||
====
|
||||
Explain why particular design decisions were made.
|
||||
Describe alternate designs that were considered and related work, e.g. how the feature is supported in other systems.
|
||||
Provide evidence of consensus within the community and discuss important objections or concerns raised during discussion.
|
||||
|
||||
* Use sub-headings to organize this section for ease of readability.
|
||||
* Do not talk about history or why this needs to be done, that is part of Motivation section.
|
||||
====
|
||||
|
||||
== Backwards Compatibility
|
||||
|
||||
[TIP]
|
||||
====
|
||||
Describe any incompatibilities and their severity.
|
||||
Describe how the RFC proposes to deal with these incompatibilities.
|
||||
|
||||
If there are no backwards compatibility concerns, this section may simply say:
|
||||
There are no backwards compatibility concerns related to this proposal.
|
||||
====
|
||||
|
||||
== Security
|
||||
|
||||
[TIP]
|
||||
====
|
||||
Describe the security impact of this proposal.
|
||||
Outline what was done to identify and evaluate security issues,
|
||||
discuss of potential security issues and how they are mitigated or prevented,
|
||||
and how the RFC interacts with existing permissions, authentication, authorization, etc.
|
||||
|
||||
If this proposal will have no impact on security, this section may simply say:
|
||||
There are no security risks related to this proposal.
|
||||
====
|
||||
|
||||
|
||||
== Testing
|
||||
|
||||
[TIP]
|
||||
====
|
||||
If the RFC involves any kind of behavioral change to code give a summary of how
|
||||
its correctness (and, if applicable, compatibility, security, etc.) can be
|
||||
tested.
|
||||
|
||||
In the preferred case that automated tests can be developed to cover all
|
||||
significant changes, simply give a short summary of the nature of these tests.
|
||||
|
||||
If some or all of changes will require human interaction to verify, explain why
|
||||
automated tests are considered impractical. Then summarize what kinds of test
|
||||
cases might be required: user scenarios with action steps and expected
|
||||
outcomes. Might behavior vary by platform (operating system, servlet
|
||||
container, web browser, etc.)? Are there foreseeable interactions between
|
||||
different permissible versions of components?
|
||||
Are any special tools, proprietary software, or online service accounts
|
||||
required to exercise a related code path (Active Directory server, GitHub
|
||||
login, etc.)? When will testing take place relative to merging code changes,
|
||||
and might retesting be required if other changes are made to this area in the
|
||||
future?
|
||||
|
||||
If this proposal requires no testing, this section may simply say:
|
||||
There are no testing issues related to this proposal.
|
||||
====
|
||||
|
||||
== Prototype Implementation
|
||||
|
||||
[TIP]
|
||||
====
|
||||
Link to any open source reference implementation of code changes for this proposal.
|
||||
The implementation need not be completed before the RFC is accepted
|
||||
but must be completed before the RFC is given "final" status.
|
||||
|
||||
RFCs which will not include code changes may omit this section.
|
||||
====
|
||||
|
||||
== References
|
||||
|
||||
[TIP]
|
||||
====
|
||||
Provide links to any related documents. This will include links to discussions
|
||||
on the mailing list, pull requests, and meeting notes.
|
||||
====
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue