cargo fmt

R Tyler Croy 2020-07-20 14:56:06 -07:00
parent b73686dfcb
commit 60b11c5846
2 changed files with 57 additions and 44 deletions
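
The diff below appears to be the mechanical output of rustfmt, with no hand edits mixed in. For reference, the usual invocations (assuming a standard Cargo project; nothing here is specific to this repository) are:

    cargo fmt             # rewrite every source file in the crate in place
    cargo fmt -- --check  # exit non-zero if formatting would change anything

The second form is the one to wire into CI so that formatting-only commits like this one stay rare.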

src/main.rs

@@ -4,9 +4,9 @@ extern crate serde_derive;
 use async_std::sync::Arc;
 use async_std::task;
 use futures::channel::mpsc::{channel, Sender};
-use futures::StreamExt;
 use futures::sink::SinkExt;
-use jsonschema::{JSONSchema, Draft};
+use futures::StreamExt;
+use jsonschema::{Draft, JSONSchema};
 use log::*;
 use rdkafka::config::ClientConfig;
 use rdkafka::consumer::stream_consumer::StreamConsumer;
@@ -62,26 +62,24 @@ impl TopicContext {
                         return self.schemas.get(schema);
                     }
                 }
-            },
+            }
             settings::Schema::PathType { path } => {
                 if let Some(path_str) = path.as_path().to_str() {
                     return self.schemas.get(path_str);
                 }
-            },
+            }
         }
         None
     }
 }

-
 #[async_std::main]
 async fn main() {
     pretty_env_logger::init();
     let settings = Arc::new(settings::load_settings());

     // Load schemas from directory
-    let schemas = load_schemas_from(settings.schemas.clone())
-        .expect("Failed to load schemas.d");
+    let schemas = load_schemas_from(settings.schemas.clone()).expect("Failed to load schemas.d");

     // XXX: fix this magic number and make the channel size configurable
     let (sender, mut receiver) = channel::<DispatchMessage>(1024);
@@ -100,7 +98,8 @@ async fn main() {
          * like it
          */
-        let consumer: StreamConsumer = kafka_config.clone()
+        let consumer: StreamConsumer = kafka_config
+            .clone()
             .create()
             .expect("Consumer creation failed");

@@ -116,13 +115,12 @@ async fn main() {

     // Need to block the main task here with something so the topic consumers don't die
     task::block_on(async move {
-        let producer: FutureProducer = kafka_config.create()
-            .expect("Producer creation failed");
+        let producer: FutureProducer = kafka_config.create().expect("Producer creation failed");

         while let Some(message) = receiver.next().await {
             info!("Need to dispatch to Kafka: {:?}", message);
-            let record: FutureRecord<String, String> = FutureRecord::to(&message.destination)
-                .payload(&message.payload);
+            let record: FutureRecord<String, String> =
+                FutureRecord::to(&message.destination).payload(&message.payload);
             // TODO: Make this more robust and not sequential
             producer.send(record, -1 as i64).await;
         }
@@ -133,10 +131,15 @@ async fn main() {
 /**
  * This function starts a runloop which will consume from the given topic
  * and then send messages along to the sender channel
  */
-async fn consume_topic(consumer: StreamConsumer, ctx: TopicContext, mut sender: Sender<DispatchMessage>) -> Result<(), std::io::Error> {
+async fn consume_topic(
+    consumer: StreamConsumer,
+    ctx: TopicContext,
+    mut sender: Sender<DispatchMessage>,
+) -> Result<(), std::io::Error> {
     let topic = &ctx.topic;
-    consumer.subscribe(&[&topic.name])
+    consumer
+        .subscribe(&[&topic.name])
         .expect("Could not subscribe consumer");
     let mut stream = consumer.start();
     debug!("Consuming from {}", topic.name);
@@ -145,7 +148,7 @@ async fn consume_topic(consumer: StreamConsumer, ctx: TopicContext, mut sender:
         match message {
             Err(e) => {
                 error!("Failed to consume: {}", e);
-            },
+            }
             Ok(message) => {
                 // TODO: might as well turn payload into an OwnedMessage since we need to copy it
                 // around a couple times anyways
@@ -155,20 +158,23 @@ async fn consume_topic(consumer: StreamConsumer, ctx: TopicContext, mut sender:
                     Some(Err(e)) => {
                         error!("Could not parse message: {:?}", e);
                         ""
-                    },
+                    }
                 };

                 debug!("Message consumed: {:?}", payload);

                 // Do the schema validation
                 // TODO: better error handling for non-JSON
-                let value: serde_json::Value = serde_json::from_str(payload)
-                    .expect("Failed to deserialize message payload");
+                let value: serde_json::Value =
+                    serde_json::from_str(payload).expect("Failed to deserialize message payload");

                 let schema = ctx.schema_to_use(&value);

                 if schema.is_none() {
-                    error!("Could not load a schema, skipping message on {}", topic.name);
+                    error!(
+                        "Could not load a schema, skipping message on {}",
+                        topic.name
+                    );
                     continue;
                 }
                 let schema = schema.unwrap();
@@ -195,7 +201,7 @@ async fn consume_topic(consumer: StreamConsumer, ctx: TopicContext, mut sender:
                             sender.send(message).await;
                         }
-                    },
+                    }
                     Ok(_) => {
                         if let Some(valid_topic) = &topic.routing.valid {
                             // TODO: this is ugly
@@ -206,13 +212,12 @@ async fn consume_topic(consumer: StreamConsumer, ctx: TopicContext, mut sender:
                             };
                             sender.send(message).await;
-                        }
-                        else {
+                        } else {
                             debug!("No valid topic defined for {}", topic.name);
                         }
-                    },
+                    }
                 };
-            },
+            }
         }
     }

@@ -226,8 +231,8 @@ async fn consume_topic(consumer: StreamConsumer, ctx: TopicContext, mut sender:
  */
 fn load_schemas_from(directory: std::path::PathBuf) -> Result<NamedSchemas, ()> {
     let mut schemas = HashMap::<String, serde_json::Value>::new();
-    let schemas_d = fs::read_dir(&directory)
-        .expect(&format!("Failed to read directory: {:?}", directory));
+    let schemas_d =
+        fs::read_dir(&directory).expect(&format!("Failed to read directory: {:?}", directory));

     for file in schemas_d {
         if let Ok(file) = file {
@@ -236,8 +241,8 @@ fn load_schemas_from(directory: std::path::PathBuf) -> Result<NamedSchemas, ()>
                 Some(ext) => {
                     if ext == "yml" {
                         info!("Loading schema: {:?}", file);
-                        let buf = fs::read_to_string(&file)
-                            .expect(&format!("Failed to read {:?}", file));
+                        let buf =
+                            fs::read_to_string(&file).expect(&format!("Failed to read {:?}", file));

                         let file_name = file.file_name().expect("Failed to unpack file_name()");
                         let value: serde_json::Value = serde_yaml::from_str(&buf)
@@ -248,8 +253,8 @@ fn load_schemas_from(directory: std::path::PathBuf) -> Result<NamedSchemas, ()>
                         // This is gross, gotta be a better way to structure this
                         schemas.insert(key, value);
                     }
-                },
-                _ => {},
+                }
+                _ => {}
             }
         }
     }
@@ -298,7 +303,8 @@ mod tests {
         };
         let message = json!({"$id" : "hello.yml"});

-        let expected_schema = schemas.get("hello.yml")
+        let expected_schema = schemas
+            .get("hello.yml")
             .expect("Failed to load hello.yml named schema");

         let schema = ctx.schema_to_use(&message);
@@ -315,7 +321,12 @@ mod tests {

         let settings = Arc::new(settings_for_test());
         let schemas = Arc::new(schemas_for_test());
-        let topics: Vec<settings::Topic> = settings.topics.clone().into_iter().filter(|t| t.name == "other").collect();
+        let topics: Vec<settings::Topic> = settings
+            .topics
+            .clone()
+            .into_iter()
+            .filter(|t| t.name == "other")
+            .collect();

         let ctx = TopicContext {
             schemas: schemas.clone(),
@@ -324,7 +335,8 @@ mod tests {
         };
         let message = json!({});

-        let expected_schema = schemas.get("other.yml")
+        let expected_schema = schemas
+            .get("other.yml")
             .expect("Failed to load other.yml named schema");

         let schema = ctx.schema_to_use(&message);

src/settings.rs

@@ -1,4 +1,3 @@
 use std::collections::HashMap;
 use std::path::PathBuf;

-
@@ -7,10 +6,12 @@ use std::path::PathBuf;
  */
 pub fn load_settings() -> Settings {
     let mut settings = config::Config::default();
-    settings.merge(config::File::with_name("slipstream.yml"))
+    settings
+        .merge(config::File::with_name("slipstream.yml"))
         .expect("Failed to load configuration from `slipstream.yml`");

-    settings.try_into()
+    settings
+        .try_into()
         .expect("Failed to coerce configuration into our internal structures")
 }

@@ -41,16 +42,12 @@ pub enum Schema {
      * A Key-based schema relies on the Kafka message to have a build in JSON key
      * at the _root level_ which defines the path to the JSON Schema
      */
-    KeyType {
-        key: String,
-    },
+    KeyType { key: String },
     /**
      * A Path-based schema defines a schema that should be applied from outside the
      * message content itself, i.e. the message needn't be self-describing.
      */
-    PathType {
-        path: PathBuf,
-    },
+    PathType { path: PathBuf },
 }

 #[derive(Clone, Debug, Deserialize)]
@@ -68,10 +65,14 @@ mod tests {
     #[test]
     fn test_load_settings() {
         let settings = load_settings();
-        let brokers = settings.kafka.get("bootstrap.servers")
+        let brokers = settings
+            .kafka
+            .get("bootstrap.servers")
             .expect("Failed to look up the bootstrap.servers");
         assert_eq!(brokers, "localhost:9092");
-        let group = settings.kafka.get("group.id")
+        let group = settings
+            .kafka
+            .get("group.id")
             .expect("Failed to look up the group.id");
         assert_eq!(group, "slipstream");
     }
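
Every change above appears to be stock rustfmt behavior: the import reshuffling in the first hunk, the dropped commas after block-bodied match arms, and the single-line `KeyType { key: String }` variants. No rustfmt.toml is needed to reproduce it. As a sketch only (these are real rustfmt options shown at their default values; this file is not part of the commit, and some options are only settable on nightly toolchains):

    # rustfmt.toml -- defaults made explicit, purely for illustration
    max_width = 100                     # line budget behind the splits and joins above
    reorder_imports = true              # sorts use lines, as in the first hunk
    match_block_trailing_comma = false  # drops the `},` after block match arms
    use_small_heuristics = "Default"    # permits `KeyType { key: String }` on one line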