From b73686dfcbcb6ec2ab72463548bd6558ab8fcf8e Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 20 Jul 2020 14:51:05 -0700 Subject: [PATCH] Properly use the configured schema when routing messages This removes the hard-coded hack that I had added previously for demo purposes, and now enables slipstream to use schemas in the directory or the message. NOTE: nested schemas in the schemas.d are still not yet supported properly. --- schemas.d/hello.yml | 6 +-- schemas.d/other.yml | 39 +++++++++++++++ slipstream.yml | 5 ++ src/main.rs | 117 +++++++++++++++++++++++++++++++++++++++----- 4 files changed, 151 insertions(+), 16 deletions(-) create mode 100644 schemas.d/other.yml diff --git a/schemas.d/hello.yml b/schemas.d/hello.yml index 2c0d85b..0a8a93b 100644 --- a/schemas.d/hello.yml +++ b/schemas.d/hello.yml @@ -2,7 +2,7 @@ # # It is only ever expected to validate messages like: # { -# "$schema" : "schema.yml", +# "$id" : "schema.yml", # "hello" : "world", # "metadata" : { # } @@ -15,10 +15,10 @@ description: | type: object required: - - $schema + - $id - hello properties: - $schema: + $id: type: string description: | The $schema field contains a relative path to the schema that describes diff --git a/schemas.d/other.yml b/schemas.d/other.yml new file mode 100644 index 0000000..0a8a93b --- /dev/null +++ b/schemas.d/other.yml @@ -0,0 +1,39 @@ +# This file describes a JSON Schema for testing messages +# +# It is only ever expected to validate messages like: +# { +# "$id" : "schema.yml", +# "hello" : "world", +# "metadata" : { +# } +# } +--- +title: A slipstream test schema +description: | + This schema will validate only simple hello world messages for testing + the slipstream validation engine + +type: object +required: + - $id + - hello +properties: + $id: + type: string + description: | + The $schema field contains a relative path to the schema that describes + this message. In this test it is intended to be located within the configured + schemas directory + + hello: + type: string + description: | + A name of to whom this message should say hello + + metadata: + type: object + description: | + User-defined additional metadata. + + Note that the contents of this object are not subject to the schema + validation diff --git a/slipstream.yml b/slipstream.yml index 2ac9b63..328de5f 100644 --- a/slipstream.yml +++ b/slipstream.yml @@ -57,3 +57,8 @@ topics: # result in messages not being routed under certain conditions invalid: 'invalid-pings' + - name: 'other' + schema: + path: 'other.yml' + routing: + valid: '$name.valid' diff --git a/src/main.rs b/src/main.rs index 827b507..6dd4953 100644 --- a/src/main.rs +++ b/src/main.rs @@ -48,6 +48,31 @@ struct TopicContext { schemas: Arc>, } +impl TopicContext { + /** + * Return the schema to use for validating this topic + */ + pub fn schema_to_use(&self, message: &serde_json::Value) -> Option<&serde_json::Value> { + match &self.topic.schema { + settings::Schema::KeyType { key } => { + // Fish out the right sub-value for the key + if let Some(schema) = message.get(key) { + // Use the string value assuming we can get it + if let Some(schema) = schema.as_str() { + return self.schemas.get(schema); + } + } + }, + settings::Schema::PathType { path } => { + if let Some(path_str) = path.as_path().to_str() { + return self.schemas.get(path_str); + } + }, + } + None + } +} + #[async_std::main] async fn main() { @@ -109,7 +134,7 @@ async fn main() { * and then send messages along to the sender channel */ async fn consume_topic(consumer: StreamConsumer, ctx: TopicContext, mut sender: Sender) -> Result<(), std::io::Error> { - let topic = ctx.topic; + let topic = &ctx.topic; consumer.subscribe(&[&topic.name]) .expect("Could not subscribe consumer"); @@ -140,15 +165,15 @@ async fn consume_topic(consumer: StreamConsumer, ctx: TopicContext, mut sender: let value: serde_json::Value = serde_json::from_str(payload) .expect("Failed to deserialize message payload"); - // TODO: properly handle the different types of schema definitions - // from the configuration file - let schema = value.get("$schema") - .expect("Message had no $schema"); - // TODO: make this safer - let schema = ctx.schemas.get(schema.as_str().unwrap()) - .expect("Unknown schema defined"); - trace!("Compiling schema: {}", schema); + let schema = ctx.schema_to_use(&value); + if schema.is_none() { + error!("Could not load a schema, skipping message on {}", topic.name); + continue; + } + let schema = schema.unwrap(); + + trace!("Compiling schema: {}", schema); // TODO: Make this compilation checking and whatnot happen outside // of the message loop let compiled = JSONSchema::compile(&schema, Some(Draft::Draft7)) @@ -194,6 +219,11 @@ async fn consume_topic(consumer: StreamConsumer, ctx: TopicContext, mut sender: Ok(()) } +/** + * Load all the .yml files which appear to be schemas in the given directory + * + * NOTE: This does not yet recurse through the directories + */ fn load_schemas_from(directory: std::path::PathBuf) -> Result { let mut schemas = HashMap::::new(); let schemas_d = fs::read_dir(&directory) @@ -229,6 +259,16 @@ fn load_schemas_from(directory: std::path::PathBuf) -> Result #[cfg(test)] mod tests { use super::*; + use serde_json::json; + + fn schemas_for_test() -> NamedSchemas { + load_schemas_from(std::path::PathBuf::from("./schemas.d")) + .expect("Failed to load schemas for test") + } + + fn settings_for_test() -> settings::Settings { + settings::load_settings() + } /** * This test is pretty primitive, and is coupled to slipstream.yml in the @@ -236,9 +276,60 @@ mod tests { */ #[test] fn test_load_schemas_from() { - let schemas = load_schemas_from(std::path::PathBuf::from("./schemas.d")); - assert!(schemas.is_ok()); - let schemas = schemas.unwrap(); - assert_eq!(schemas.keys().len(), 1); + let schemas = schemas_for_test(); + assert_eq!(schemas.keys().len(), 2); + } + + /** + * Validating that we can get the right schema to validat the topic + * by querying the topic itself + * + * This is complex enough that it's bordering on an integration test, eep! + */ + #[test] + fn test_topics_schema() { + let settings = Arc::new(settings_for_test()); + let schemas = Arc::new(schemas_for_test()); + + let ctx = TopicContext { + schemas: schemas.clone(), + topic: settings.topics[0].clone(), + settings, + }; + + let message = json!({"$id" : "hello.yml"}); + let expected_schema = schemas.get("hello.yml") + .expect("Failed to load hello.yml named schema"); + + let schema = ctx.schema_to_use(&message); + + assert!(schema.is_some()); + assert_eq!(expected_schema, schema.unwrap()); + } + + /** + * Validate that a path-based schema reference for a topic can be found + */ + #[test] + fn test_topics_schema_with_path() { + let settings = Arc::new(settings_for_test()); + let schemas = Arc::new(schemas_for_test()); + + let topics: Vec = settings.topics.clone().into_iter().filter(|t| t.name == "other").collect(); + + let ctx = TopicContext { + schemas: schemas.clone(), + topic: topics[0].clone(), + settings, + }; + + let message = json!({}); + let expected_schema = schemas.get("other.yml") + .expect("Failed to load other.yml named schema"); + + let schema = ctx.schema_to_use(&message); + + assert!(schema.is_some()); + assert_eq!(expected_schema, schema.unwrap()); } }