broker/src/main.rs

384 lines
12 KiB
Rust

use std::{collections::HashMap, iter::Iterator};
use serde_derive::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;
use argon2::{self, Config as Argon2Config};
use anyhow::Result;
use jsonwebtoken::{encode, TokenData, decode, Header, Validation, EncodingKey, DecodingKey};
use lazy_static::lazy_static;
use std::sync::Arc;
use tide::Request;
use http_types::headers::HeaderValue;
use tide::security::{CorsMiddleware, Origin};
use tide_rustls::TlsListener;
use async_std::stream;
use std::time::Duration;
use futures::StreamExt;
lazy_static! {
    /// Process-wide RocksDB handle, opened on first access.
    ///
    /// The database path is the `db` flag returned by `env_var_config`; the
    /// database is created when it does not exist yet. A fixed 3-byte prefix
    /// extractor is installed for the `prefix_iterator` calls made by
    /// `get_users` and `get_events`.
    ///
    /// # Panics
    /// Panics on first access if the database cannot be opened.
    static ref DB: Arc<rocksdb::DB> = {
        let mut options = rocksdb::Options::default();
        options.create_if_missing(true);
        options.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(3));
        let path = env_var_config().db;
        Arc::new(rocksdb::DB::open(&options, path).unwrap())
    };
}
/// Runtime settings for the broker, filled in from command-line flags by
/// `env_var_config`.
#[derive(Deserialize, Debug, Clone)]
pub struct EnvVarConfig {
/// Port the listener binds to on `0.0.0.0`.
pub port: u16,
/// Amount added to a token's `iat` claim to obtain its `exp` claim
/// (presumably seconds; confirm against the unit of `nippy::get_unix_ntp_time`).
pub jwt_expiry: i64,
/// Origin handed to the CORS middleware's `allow_origin`.
pub origin: String,
/// Shared secret used to sign and verify JWTs.
pub jwt_secret: String,
/// Path passed to `rocksdb::DB::open`.
pub db: String,
/// When `true`, `main` serves through a `TlsListener` instead of plain TCP.
pub secure: bool,
/// Certificate path given to the TLS listener (only used when `secure` is set).
pub cert_path: String,
/// Private-key path given to the TLS listener (only used when `secure` is set).
pub key_path: String,
}
/// A registered account as persisted in the database under the key `users_<id>`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct User {
// Random v4 UUID assigned in `user_create`.
id: uuid::Uuid,
// Login name; `user_create` rejects a name that is already stored.
username: String,
// Argon2 encoded hash produced by `argon2::hash_encoded`, never the plaintext.
password: String,
}
/// JSON body accepted by `create_user` (`POST /users`).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct UserForm {
// Requested login name.
username: String,
// Plaintext password; hashed in `user_create` before it is stored.
password: String,
}
/// JSON body accepted by `login_user` (`POST /login`).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LoginForm {
// Login name to look up.
username: String,
// Plaintext password, verified against the stored Argon2 hash in `create_jwt`.
password: String,
}
/// Claim set carried by the JWTs that `create_jwt` issues and `jwt_verify` decodes.
#[derive(Debug, Serialize, Deserialize)]
pub struct Claims {
/// Expiry: `iat` plus the configured `jwt_expiry`.
pub exp: i64,
/// Issue time, taken from `nippy::get_unix_ntp_time` at login.
pub iat: i64,
/// Issuer; `create_jwt` always sets this to `"Dispatcher"`.
pub iss: String,
/// Subject: the username of the authenticated user.
pub sub: String,
}
/// A stored event, persisted under the key `events_<event>` and streamed to
/// SSE clients by the `/sse` handler.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Event {
/// Random v4 UUID assigned in `insert`.
pub id: uuid::Uuid,
/// Id of the user that submitted the event.
pub user_id: uuid::Uuid,
/// Event name; also the storage-key suffix and the SSE event name, so a
/// newer event with the same name overwrites the stored one.
pub event: String,
/// Time of insertion as returned by `nippy::get_unix_ntp_time`.
pub timestamp: i64,
/// Arbitrary JSON payload supplied by the client.
pub data: serde_json::Value,
}
/// JSON body accepted by `insert_event` (`POST /insert`).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct EventForm {
// Event name; becomes `Event::event`.
event: String,
// Arbitrary JSON payload; becomes `Event::data`.
data: serde_json::Value,
}
/// Stores `value` under `key` in the shared database.
///
/// # Errors
/// Returns the RocksDB error if the write fails.
fn replace(key: String, value: Vec<u8>) -> Result<()> {
    // Both arguments are owned and unused afterwards, so they are handed to
    // RocksDB directly instead of being cloned first.
    DB.put(key, value)?;
    Ok(())
}
/// Looks up a user by login name.
///
/// Scans every stored user and returns the last one whose `username` equals
/// `user_username`, or `None` when there is no match.
///
/// # Errors
/// Propagates any failure from `get_users`.
fn get_user_by_username(user_username: String) -> Result<Option<User>> {
    let mut found = None;
    for candidate in get_users()? {
        if candidate.username == user_username {
            // Keep overwriting so the final match wins.
            found = Some(candidate);
        }
    }
    Ok(found)
}
/// Loads every user stored under the `users` key prefix.
///
/// # Errors
/// Returns the decode error if a stored record is not a valid MessagePack
/// encoded `User`. Previously this case panicked.
fn get_users() -> Result<Vec<User>> {
    let mut users = Vec::new();
    for (_, value) in DB.prefix_iterator("users".as_bytes()) {
        // A corrupt record is reported to the caller rather than aborting the
        // whole process.
        let user: User = rmp_serde::from_read_ref(&value)?;
        users.push(user);
    }
    Ok(users)
}
/// Persists `user` as named-field MessagePack under the key `users_<id>`.
///
/// # Errors
/// Returns an error if serialization or the database write fails.
fn puts_user(user: User) -> Result<()> {
    let encoded = rmp_serde::to_vec_named(&user)?;
    replace(format!("users_{}", user.id), encoded)
}
/// Reports whether no stored user already has the login name `user_username`.
///
/// # Errors
/// Propagates any failure from `get_users`.
fn is_user_unique(user_username: String) -> Result<bool> {
    let taken = get_users()?
        .iter()
        .any(|existing| existing.username == user_username);
    Ok(!taken)
}
/// Loads every event stored under the `events` key prefix.
///
/// # Errors
/// Returns the decode error if a stored record is not a valid MessagePack
/// encoded `Event`. Previously this case panicked.
fn get_events() -> Result<Vec<Event>> {
    let mut events = Vec::new();
    for (_, value) in DB.prefix_iterator("events".as_bytes()) {
        // A corrupt record is reported to the caller rather than aborting the
        // whole process.
        let event: Event = rmp_serde::from_read_ref(&value)?;
        events.push(event);
    }
    Ok(events)
}
/// Persists `event` as named-field MessagePack under the key `events_<name>`.
///
/// The key is derived from the event name, so storing an event replaces any
/// previously stored event with the same name.
///
/// # Errors
/// Returns an error if serialization or the database write fails.
fn puts_event(event: Event) -> Result<()> {
    let encoded = rmp_serde::to_vec_named(&event)?;
    replace(format!("events_{}", event.event), encoded)
}
fn user_create(user_form: UserForm) -> Result<Option<String>> {
if !is_user_unique(user_form.clone().username)? {
let j = json!({"error": "username already taken"}).to_string();
return Ok(Some(j));
} else {
// set as future value
let uuid = Uuid::new_v4();
let config = Argon2Config::default();
let uuid_string = Uuid::new_v4().to_string();
let salt = uuid_string.as_bytes();
let password = user_form.password.as_bytes();
let hashed = argon2::hash_encoded(password, salt, &config).unwrap();
let new_user = User{id: uuid, username: user_form.clone().username, password: hashed };
puts_user(new_user).unwrap();
return Ok(None);
}
}
/// Authenticates `login` and issues a signed JWT.
///
/// Returns `Ok(None)` when the username is unknown or the password does not
/// match the stored Argon2 hash; otherwise returns the encoded token whose
/// subject is the username and whose issuer is `"Dispatcher"`.
///
/// # Errors
/// Returns an error if the user lookup, hash verification, NTP time query or
/// token encoding fails.
async fn create_jwt(login: LoginForm) -> Result<Option<String>> {
    let user = match get_user_by_username(login.username)? {
        Some(found) => found,
        None => return Ok(None),
    };
    if !argon2::verify_encoded(&user.password, login.password.as_bytes())? {
        return Ok(None);
    }

    let settings = env_var_config();
    let issued_at = nippy::get_unix_ntp_time().await?;
    let claims = Claims {
        exp: issued_at + settings.jwt_expiry,
        iat: issued_at,
        iss: "Dispatcher".to_string(),
        sub: user.username,
    };
    let signing_key = EncodingKey::from_secret(settings.jwt_secret.as_ref());
    let token = encode(&Header::default(), &claims, &signing_key)?;
    Ok(Some(token))
}
/// Builds the runtime configuration from command-line flags.
///
/// Each field starts at its default and is overwritten by the flag of the
/// same name when that flag is given. Positional arguments returned by the
/// flag parser are discarded. Flags are re-parsed on every call.
fn env_var_config() -> EnvVarConfig {
    // Defaults used when a flag is absent.
    let mut settings = EnvVarConfig {
        port: 8080,
        jwt_expiry: 86400,
        origin: "*".to_string(),
        jwt_secret: "secret".to_string(),
        db: "tmp".to_string(),
        secure: false,
        cert_path: "./broker.pem".to_string(),
        key_path: "./broker.rsa".to_string(),
    };
    let _: Vec<String> = go_flag::parse(|flags| {
        flags.add_flag("port", &mut settings.port);
        flags.add_flag("origin", &mut settings.origin);
        flags.add_flag("jwt_expiry", &mut settings.jwt_expiry);
        flags.add_flag("jwt_secret", &mut settings.jwt_secret);
        flags.add_flag("secure", &mut settings.secure);
        flags.add_flag("key_path", &mut settings.key_path);
        flags.add_flag("cert_path", &mut settings.cert_path);
        flags.add_flag("db", &mut settings.db);
    });
    settings
}
/// Validates the value of an `Authorization` header.
///
/// `token` must have the form `Bearer <jwt>`. Returns the decoded token data
/// when the JWT verifies against the configured secret with default
/// validation, and `Ok(None)` for any other scheme, a missing token part, or
/// a token that fails to decode.
///
/// A header consisting of only `Bearer` used to panic; it now yields
/// `Ok(None)` like every other malformed header.
fn jwt_verify(token: String) -> Result<Option<TokenData<Claims>>> {
    let mut parts = token.split(' ');
    let raw_token = match (parts.next(), parts.next()) {
        (Some("Bearer"), Some(raw_token)) => raw_token,
        _ => return Ok(None),
    };

    let configure = env_var_config();
    let key = DecodingKey::from_secret(configure.jwt_secret.as_ref());
    // Decode failures (bad signature, expired, malformed) all map to "no token".
    Ok(decode::<Claims>(raw_token, &key, &Validation::default()).ok())
}
// insert an event
async fn insert(user_username: String, event_form: EventForm) -> Result<bool> {
let user_value = get_user_by_username(user_username)?;
match user_value {
Some(user) => {
let timestamp = nippy::get_unix_ntp_time().await?;
let id = uuid::Uuid::new_v4();
let event = Event{
id,
data: event_form.data,
event: event_form.event,
user_id: user.id,
timestamp,
};
puts_event(event.clone())?;
Ok(true)
},
None => {
Ok(false)
}
}
}
async fn create_user(mut req: Request<()>) -> tide::Result {
let r = req.body_string().await.unwrap();
let user_form : UserForm = serde_json::from_str(&r).unwrap();
match user_create(user_form)? {
Some(err) => {
let err = format!("error: {}", err);
Ok(tide::Response::builder(400).body(err).header("content-type", "application/json").build())
},
None => {
Ok(tide::Response::builder(200).body("").header("content-type", "application/json").build())
}
}
}
/// `POST /login` handler: exchanges the credentials in the JSON body for a JWT.
///
/// Responds 200 with the body `jwt: <token>` when the credentials are
/// accepted and 401 with no body otherwise.
///
/// # Errors
/// An unreadable body, a body that is not a valid `LoginForm`, or a failure
/// inside `create_jwt` is returned as an error.
async fn login_user(mut req: Request<()>) -> tide::Result {
    let raw = req.body_string().await?;
    let credentials: LoginForm = serde_json::from_str(&raw)?;
    let builder = match create_jwt(credentials).await? {
        Some(jwt) => tide::Response::builder(200).body(format!("jwt: {}", jwt)),
        None => tide::Response::builder(401),
    };
    Ok(builder.header("content-type", "application/json").build())
}
/// `POST /insert` handler: stores the event in the JSON body for the
/// authenticated user.
///
/// Responds 401 when the `Authorization` header is missing, is not a valid
/// `Bearer` JWT, or names a user that no longer exists; responds 200 once the
/// event has been stored. The unknown-user case previously returned 200
/// without storing anything.
///
/// # Errors
/// An unreadable body, a body that is not a valid `EventForm`, or a storage
/// or time-lookup failure is returned as an error. These cases previously
/// panicked.
async fn insert_event(mut req: Request<()>) -> tide::Result {
    // Empty 401 response shared by every rejection path.
    fn unauthorized() -> tide::Response {
        tide::Response::builder(401)
            .header("content-type", "application/json")
            .build()
    }

    // Copy the header value out so the borrow of `req` ends before the body
    // is read.
    let token = match req.header("authorization") {
        Some(values) => values.last().to_string(),
        None => return Ok(unauthorized()),
    };
    let jwt = match jwt_verify(token)? {
        Some(jwt) => jwt,
        None => return Ok(unauthorized()),
    };

    let body = req.body_string().await?;
    let event_form: EventForm = serde_json::from_str(&body)?;
    if !insert(jwt.claims.sub, event_form).await? {
        // The token is valid but its subject is not a stored user.
        return Ok(unauthorized());
    }
    Ok(tide::Response::builder(200)
        .header("content-type", "application/json")
        .build())
}
/// Entry point: configures CORS and logging, mounts the routes and serves
/// them on `0.0.0.0:<port>`, over TLS when the `secure` flag is set.
///
/// Routes:
/// - `POST /insert`: store an event (`insert_event`)
/// - `POST /users`: register a user (`create_user`)
/// - `POST /login`: obtain a JWT (`login_user`)
/// - `GET /sse`: server-sent event stream of stored events
#[async_std::main]
async fn main() -> tide::Result<()> {
    let configure = env_var_config();
    let cors = CorsMiddleware::new()
        .allow_methods("GET, POST, OPTIONS".parse::<HeaderValue>().unwrap())
        .allow_headers("authorization".parse::<HeaderValue>().unwrap())
        .allow_origin(Origin::from(configure.origin))
        .allow_credentials(false);

    let mut app = tide::new();
    app.with(driftwood::DevLogger);
    app.with(cors);
    app.at("/insert").post(insert_event);
    app.at("/users").post(create_user);
    app.at("/login").post(login_user);
    app.at("/sse").get(tide::sse::endpoint(|req: Request<()>, sender| async move {
        // The stream ends immediately, without sending anything, when the
        // request carries no valid bearer token.
        let token = match req.header("authorization") {
            Some(values) => values.last().to_string(),
            None => return Ok(()),
        };
        if jwt_verify(token)?.is_none() {
            return Ok(());
        }

        // Last event sent to this client, per event name. An event is
        // re-sent only when the stored value differs from the cached one.
        let mut cache: HashMap<String, Event> = HashMap::new();
        // Every connected client polls the database on this interval.
        let mut interval = stream::interval(Duration::from_millis(100));
        while interval.next().await.is_some() {
            for evt in get_events()? {
                if cache.get(&evt.event) == Some(&evt) {
                    continue;
                }
                let id = uuid::Uuid::new_v4();
                sender
                    .send(&evt.event, evt.data.to_string(), Some(&id.to_string()))
                    .await?;
                cache.insert(evt.event.clone(), evt);
            }
        }
        Ok(())
    }));

    let ip = format!("0.0.0.0:{}", configure.port);
    if configure.secure {
        app.listen(
            TlsListener::build()
                .addrs(ip)
                .cert(configure.cert_path)
                .key(configure.key_path),
        )
        .await?;
    } else {
        app.listen(ip).await?;
    }
    Ok(())
}