Missed these in the commit, oops
This commit is contained in:
parent
e79a6505c9
commit
993a6515d2
|
@ -88,14 +88,21 @@ async fn subscribe_client(mut req: meows::Request<Arc<MemoryBus>, ()>) -> Option
|
|||
info!("Subscribe received: {:?}", subscribe);
|
||||
|
||||
// If the client sends an invalid header, bail out early
|
||||
if ! req.state.validate_client(&subscribe.header.uuid, &subscribe.header.token) {
|
||||
if !req
|
||||
.state
|
||||
.validate_client(&subscribe.header.uuid, &subscribe.header.token)
|
||||
{
|
||||
warn!("Client could not be validated, perhaps the wrong token?");
|
||||
// TODO: make a useful error
|
||||
return None;
|
||||
}
|
||||
|
||||
info!("Subscribing {} to {}", subscribe.header.uuid, subscribe.channel);
|
||||
req.state.subscribe_client(&subscribe.header.uuid, subscribe.channel);
|
||||
info!(
|
||||
"Subscribing {} to {}",
|
||||
subscribe.header.uuid, subscribe.channel
|
||||
);
|
||||
req.state
|
||||
.subscribe_client(&subscribe.header.uuid, subscribe.channel);
|
||||
|
||||
// TODO: What is the right protocol response for a subscribe?
|
||||
//Some(meows::Message::text("ack"))
|
||||
|
@ -108,14 +115,17 @@ async fn subscribe_client(mut req: meows::Request<Arc<MemoryBus>, ()>) -> Option
|
|||
async fn publish_message(mut req: meows::Request<Arc<MemoryBus>, ()>) -> Option<meows::Message> {
|
||||
if let Some(publish) = req.from_value::<message::Publish>() {
|
||||
info!("Publish received: {:?}", publish);
|
||||
if ! req.state.validate_client(&publish.header.uuid, &publish.header.token) {
|
||||
if !req
|
||||
.state
|
||||
.validate_client(&publish.header.uuid, &publish.header.token)
|
||||
{
|
||||
warn!("Client could not be validated, perhaps the wrong token?");
|
||||
// TODO: make a useful error
|
||||
return None;
|
||||
}
|
||||
|
||||
req.state.publish_message(publish.channel, publish.value);
|
||||
return Some(meows::Message::text("ack"))
|
||||
return Some(meows::Message::text("ack"));
|
||||
}
|
||||
None
|
||||
}
|
||||
|
@ -195,7 +205,7 @@ impl MemoryBus {
|
|||
* its internal map
|
||||
*/
|
||||
pub fn validate_client(&self, id: &ClientId, token: &Uuid) -> bool {
|
||||
if ! self.clients.contains_key(id) {
|
||||
if !self.clients.contains_key(id) {
|
||||
warn!("Could not validate client {}, they don't exist!", id);
|
||||
return false;
|
||||
}
|
||||
|
@ -216,8 +226,7 @@ impl MemoryBus {
|
|||
let sink = client.sink.clone();
|
||||
if let Some(mut sinks) = self.channels.get_mut(&channel) {
|
||||
sinks.push(sink);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
let mut sinks = vec![];
|
||||
sinks.push(sink);
|
||||
self.channels.insert(channel, sinks);
|
||||
|
@ -522,6 +531,5 @@ mod tests {
|
|||
|
||||
assert!(bus.validate_client(&id, &token));
|
||||
assert!(!bus.validate_client(&id, &Uuid::new_v4()));
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,8 +39,8 @@ async fn register_and_subscribe() -> std::io::Result<()> {
|
|||
.write_message(Message::text(buffer))
|
||||
.expect("Failed to send message to test server");
|
||||
|
||||
let value = serde_json::from_str(r#"{"hello":"world"}"#)
|
||||
.expect("Failed to generate test value");
|
||||
let value =
|
||||
serde_json::from_str(r#"{"hello":"world"}"#).expect("Failed to generate test value");
|
||||
let publish = Publish {
|
||||
header: header.clone(),
|
||||
channel: channel.clone(),
|
||||
|
|
Loading…
Reference in New Issue