Allow the message to reference a schema defined in schemas.d/

This commit is contained in:
R Tyler Croy 2020-07-16 12:41:42 -07:00
parent 248e17e9fa
commit 0911b3e31d
5 changed files with 140 additions and 41 deletions

View File

@ -7,7 +7,7 @@ services:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "test:3:1,test.valid:1:1,test.invalid:1:1,test.error:1:1"
KAFKA_CREATE_TOPICS: "test:3:1,test-valid:1:1,test-invalid:1:1,test-error:1:1,tests-v1:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
zookeeper:

26
schemas.d/pings/v1.yml Normal file
View File

@ -0,0 +1,26 @@
#
# This file describes a simple JSON Schema for a "ping" message
#
# {
# "ping" : 1,
# "valid" : true
# }
#
---
title: A slipstream test schema for pings
description: |
This schema validates a "ping" message, and is used mostly to help validate schemas not defined in-message but rather those defined externally.
type: object
required:
- ping
properties:
ping:
type: number
minimum: 0
maximum: 10
description: A ping number between 0 and 10
valid:
type: boolean
description: Tell us if this is a valid ping

View File

@ -22,7 +22,7 @@ topics:
# This will *only* be valid for a root key. This is to avoid the
# need to do extra parsing and searching within each message.
schema:
key: '$schema'
key: '$id'
# The routing configuration can be used to route the messages after
# validation to additional Kafka topics for consumption.
#
@ -37,6 +37,16 @@ topics:
# Could not be deserialized as JSON
error: '$name.error'
# Slipstream topic mappings can be chained to allow for secondary schema
# validation, i.e. "this message wasn't valid for v2 of the spec, what about
# v1?"
#
- name: 'test.invalid'
schema:
path: 'some-legacy-schema.yml'
routing:
valid: 'tests-v1'
- name: 'pings'
# When using the `path` key for the schema, all messages in the
# topic will be validated against the specific schema definition
@ -45,5 +55,5 @@ topics:
routing:
# The keys under `routing` are optional, omitted entries will
# result in messages not being routed under certain conditions
invalid: 'valid-pings'
invalid: 'invalid-pings'

View File

@ -20,47 +20,43 @@ use std::fs;
mod settings;
#[derive(Debug)]
/**
 * A struct to carry a message over a channel to be published into Kafka
 */
#[derive(Clone, Debug)]
struct DispatchMessage {
    // Topic the message should be written to
    destination: String,
    // Message body to publish to the destination topic
    payload: String,
}
/**
 * Collection of named parsed JSON Schemas
 *
 * Note: these schemas have not yet been compiled!
 *
 * Keys are schema file names (e.g. "v1.yml") as inserted by
 * load_schemas_from() when reading the schemas directory.
 */
type NamedSchemas = HashMap<String, serde_json::Value>;
/**
 * TopicContext will carry the necessary context into a topic consumer to enable
 * it to properly parse and route messages
 */
#[derive(Clone, Debug)]
struct TopicContext {
    // The configuration for the topic this consumer handles
    topic: settings::Topic,
    // Application-wide settings, shared across consumer tasks via Arc
    settings: Arc<settings::Settings>,
    // Parsed (uncompiled) schemas keyed by file name, shared via Arc
    schemas: Arc<HashMap<String, serde_json::Value>>,
}
#[async_std::main]
async fn main() {
pretty_env_logger::init();
let settings = Arc::new(settings::load_settings());
// Load schemas from directory
let mut schemas = HashMap::<String, serde_json::Value>::new();
let schemas_d = fs::read_dir(settings.schemas.clone())
.expect(&format!("Failed to read directory: {:?}", settings.schemas));
for file in schemas_d {
if let Ok(file) = file {
let file = file.path();
match file.extension() {
Some(ext) => {
if ext == "yml" {
info!("Loading schema: {:?}", file);
let buf = fs::read_to_string(&file)
.expect(&format!("Failed to read {:?}", file));
let file_name = file.file_name().expect("Failed to unpack file_name()");
let value: serde_json::Value = serde_yaml::from_str(&buf)
.expect(&format!("Failed to parse {:?}", file));
let key = file_name.to_str().unwrap().to_string();
debug!("Inserting schema for key: {}", key);
// This is gross, gotta be a better way to structure this
schemas.insert(key, value);
}
},
_ => {},
}
}
}
let schemas = load_schemas_from(settings.schemas.clone())
.expect("Failed to load schemas.d");
// XXX: fix this magic number and make the channel size configurable
let (sender, mut receiver) = channel::<DispatchMessage>(1024);
@ -83,8 +79,14 @@ async fn main() {
.create()
.expect("Consumer creation failed");
let ctx = TopicContext {
topic: topic.clone(),
settings: settings.clone(),
schemas: schemas.clone(),
};
// Launch a consumer task for each topic
task::spawn(consume_topic(consumer, topic.clone(), schemas.clone(), sender.clone()));
task::spawn(consume_topic(consumer, ctx, sender.clone()));
}
// Need to block the main task here with something so the topic consumers don't die
@ -102,12 +104,12 @@ async fn main() {
});
}
/**
* This function starts a runloop which will consume from the given topic
* and then send messages along to the sender channel
*/
async fn consume_topic(consumer: StreamConsumer, topic: settings::Topic, schemas: Arc<HashMap<String, serde_json::Value>>, mut sender: Sender<DispatchMessage>) -> Result<(), std::io::Error> {
async fn consume_topic(consumer: StreamConsumer, ctx: TopicContext, mut sender: Sender<DispatchMessage>) -> Result<(), std::io::Error> {
let topic = ctx.topic;
consumer.subscribe(&[&topic.name])
.expect("Could not subscribe consumer");
@ -143,7 +145,7 @@ async fn consume_topic(consumer: StreamConsumer, topic: settings::Topic, schemas
let schema = value.get("$schema")
.expect("Message had no $schema");
// TODO: make this safer
let schema = schemas.get(schema.as_str().unwrap())
let schema = ctx.schemas.get(schema.as_str().unwrap())
.expect("Unknown schema defined");
trace!("Compiling schema: {}", schema);
@ -191,3 +193,52 @@ async fn consume_topic(consumer: StreamConsumer, topic: settings::Topic, schemas
Ok(())
}
fn load_schemas_from(directory: std::path::PathBuf) -> Result<NamedSchemas, ()> {
let mut schemas = HashMap::<String, serde_json::Value>::new();
let schemas_d = fs::read_dir(&directory)
.expect(&format!("Failed to read directory: {:?}", directory));
for file in schemas_d {
if let Ok(file) = file {
let file = file.path();
match file.extension() {
Some(ext) => {
if ext == "yml" {
info!("Loading schema: {:?}", file);
let buf = fs::read_to_string(&file)
.expect(&format!("Failed to read {:?}", file));
let file_name = file.file_name().expect("Failed to unpack file_name()");
let value: serde_json::Value = serde_yaml::from_str(&buf)
.expect(&format!("Failed to parse {:?}", file));
let key = file_name.to_str().unwrap().to_string();
debug!("Inserting schema for key: {}", key);
// This is gross, gotta be a better way to structure this
schemas.insert(key, value);
}
},
_ => {},
}
}
}
Ok(schemas)
}
#[cfg(test)]
mod tests {
    use super::*;

    /**
     * This test is pretty primitive, and is coupled to the contents of the
     * schemas.d/ directory in the repository
     */
    #[test]
    fn test_load_schemas_from() {
        let schemas = load_schemas_from(std::path::PathBuf::from("./schemas.d"));
        assert!(schemas.is_ok());

        let schemas = schemas.unwrap();
        // NOTE(review): this assumes exactly one `.yml` file sits *directly*
        // inside ./schemas.d -- read_dir() is not recursive, so schemas in
        // subdirectories (e.g. schemas.d/pings/v1.yml) are not counted.
        // Confirm this matches the repository layout.
        assert_eq!(schemas.keys().len(), 1);
    }
}

View File

@ -30,15 +30,27 @@ pub struct Settings {
#[serde(rename_all = "camelCase")]
pub struct Topic {
pub name: String,
pub schema: SchemaType,
pub schema: Schema,
pub routing: RoutingInfo,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SchemaType {
pub key: Option<String>,
pub path: Option<PathBuf>,
#[serde(untagged, rename_all = "camelCase")]
pub enum Schema {
    /**
     * A Key-based schema relies on the Kafka message to have a built-in JSON key
     * at the _root level_ which defines the path to the JSON Schema
     */
    KeyType {
        key: String,
    },
    /**
     * A Path-based schema defines a schema that should be applied from outside the
     * message content itself, i.e. the message needn't be self-describing.
     */
    PathType {
        path: PathBuf,
    },
}
#[derive(Clone, Debug, Deserialize)]