broker/src/main.rs

458 lines
16 KiB
Rust

use std::{collections::HashMap, iter::Iterator};
use serde_derive::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;
use argon2::{self, Config as Argon2Config};
use anyhow::Result;
use jsonwebtoken::{encode, TokenData, decode, Header, Validation, EncodingKey, DecodingKey};
use lazy_static::lazy_static;
use std::sync::Arc;
use tide::Request;
use http_types::headers::HeaderValue;
use tide::security::{CorsMiddleware, Origin};
use async_std::stream;
use std::time::Duration;
use futures::StreamExt;
use tide_acme::{AcmeConfig, TideRustlsExt};
lazy_static! {
    /// Process-wide RocksDB handle, opened the first time it is dereferenced.
    ///
    /// The database is created when missing and uses a fixed 3-byte prefix
    /// extractor. Its location is the `db` setting returned by
    /// `env_var_config()`. Failure to open the database panics.
    static ref DB: Arc<rocksdb::DB> = {
        let mut options = rocksdb::Options::default();
        options.create_if_missing(true);
        options.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(3));
        let db_path = env_var_config().db;
        Arc::new(rocksdb::DB::open(&options, db_path).unwrap())
    };
}
/// Runtime settings for the broker, as assembled by `env_var_config()`.
#[derive(Deserialize, Debug, Clone)]
pub struct EnvVarConfig {
/// Port used for the plain listener; `main` binds `0.0.0.0:<port>` when `secure` is false.
pub port: u16,
/// Added to a token's issued-at time to obtain its `exp` claim
/// (presumably seconds, since it is added to a unix time — confirm).
pub jwt_expiry: i64,
/// Value handed to the CORS middleware's `allow_origin`.
pub origin: String,
/// Secret used both to sign and to verify JWTs.
pub jwt_secret: String,
/// Path passed to `rocksdb::DB::open`.
pub db: String,
/// When true, `main` serves through the TLS/ACME listener instead of the plain one.
pub secure: bool,
/// Directory passed to the ACME configuration's `cache_dir` when `secure` is set.
pub certs: String,
/// Domain passed to the ACME configuration when `secure` is set.
pub domain: String,
/// Value a caller must supply as `UserForm::admin_token` for user creation to proceed.
pub admin_token: String,
}
/// A stored account. Persisted by `puts_user` under the key `users_<id>`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct User {
// Random v4 UUID assigned in `user_create`; also forms the storage key.
id: uuid::Uuid,
// Login name; lookups and the uniqueness check compare against this field.
username: String,
// Argon2 encoded hash produced by `argon2::hash_encoded`, not the plain password.
password: String,
// Tenant label copied onto every event the user inserts and used to filter the SSE stream.
tenant_name: String,
}
/// Request body accepted by the user-creation endpoint.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct UserForm {
// Desired login name; rejected when another stored user already has it.
username: String,
// Plain-text password; hashed with Argon2 before being stored.
password: String,
// Tenant the new user belongs to.
tenant_name: String,
// Must equal the configured `admin_token`, otherwise creation is refused.
admin_token: String,
}
/// Request body accepted by the login endpoint.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LoginForm {
// Login name used to look up the stored user.
username: String,
// Plain-text password verified against the stored Argon2 hash.
password: String,
}
/// JWT claim set issued by `create_jwt` and decoded by `jwt_verify`.
#[derive(Debug, Serialize, Deserialize)]
pub struct Claims {
/// Expiry: issued-at time plus the configured `jwt_expiry`.
pub exp: i64,
/// Issued-at time, taken from `nippy::get_unix_ntp_time()`.
pub iat: i64,
/// Issuer; this file always sets it to "Dispatcher".
pub iss: String,
/// Subject: the username of the authenticated user.
pub sub: String,
}
/// A stored event. Persisted by `puts_event` under the key `events_<id>`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Event {
/// Random v4 UUID assigned when the event is inserted.
pub id: uuid::Uuid,
/// Id of the user who inserted the event.
pub user_id: uuid::Uuid,
/// Event name; used as the SSE event name when streamed.
pub event: String,
/// Insertion time as returned by `nippy::get_unix_ntp_time()`.
pub timestamp: i64,
/// Arbitrary JSON payload; sent as the SSE data after `to_string()`.
pub data: serde_json::Value,
/// Tenant of the inserting user; the SSE stream only forwards events of the subscriber's tenant.
pub tenant_name: String,
}
/// Request body accepted by the event-insertion endpoint.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct EventForm {
// Event name to store.
event: String,
// Arbitrary JSON payload to store with the event.
data: serde_json::Value
}
/// Stores `value` under `key` in the shared database, overwriting any
/// existing entry for that key.
///
/// # Errors
/// Returns the underlying RocksDB error when the write fails.
fn replace(key: String, value: Vec<u8>) -> Result<()> {
    // Both arguments are owned and unused afterwards, so they are handed to
    // `put` directly instead of being cloned first.
    DB.put(key, value)?;
    Ok(())
}
/// Looks up a stored user by login name.
///
/// Scans every stored user and returns the last one whose `username` equals
/// `user_username`, or `None` when no user matches.
///
/// # Errors
/// Propagates any error from `get_users`.
fn get_user_by_username(user_username: String) -> Result<Option<User>> {
    let mut found = None;
    for candidate in get_users()? {
        if candidate.username == user_username {
            // Keep overwriting so the final match wins.
            found = Some(candidate);
        }
    }
    Ok(found)
}
/// Loads every stored user.
///
/// Iterates the database from the `users` prefix and decodes each value as a
/// MessagePack-encoded `User`.
///
/// # Errors
/// Returns an error when a stored value cannot be decoded. A corrupt record
/// is reported to the caller instead of panicking the whole process.
fn get_users() -> Result<Vec<User>> {
    let prefix = "users".as_bytes();
    let mut users = Vec::new();
    for (key, value) in DB.prefix_iterator(prefix) {
        // The database is opened with a 3-byte prefix extractor, which is
        // shorter than this prefix, so the iterator may yield keys that only
        // share the first three bytes. Check the full prefix explicitly.
        if !key.starts_with(prefix) {
            continue;
        }
        let user: User = rmp_serde::from_read_ref(&value)?;
        users.push(user);
    }
    Ok(users)
}
/// Persists `user` under the key `users_<id>`, encoded as named MessagePack.
///
/// # Errors
/// Fails when encoding fails or when the database write fails.
fn puts_user(user: User) -> Result<()> {
    let encoded = rmp_serde::to_vec_named(&user)?;
    replace(format!("users_{}", user.id), encoded)
}
/// Reports whether `user_username` is still free.
///
/// Returns `Ok(false)` as soon as any stored user has the same username and
/// `Ok(true)` otherwise.
///
/// # Errors
/// Propagates any error from `get_users`.
fn is_user_unique(user_username: String) -> Result<bool> {
    let taken = get_users()?
        .iter()
        .any(|existing| existing.username == user_username);
    Ok(!taken)
}
/// Loads every stored event.
///
/// Iterates the database from the `events` prefix and decodes each value as
/// a MessagePack-encoded `Event`.
///
/// # Errors
/// Returns an error when a stored value cannot be decoded. A corrupt record
/// is reported to the caller instead of panicking the whole process.
fn get_events() -> Result<Vec<Event>> {
    let prefix = "events".as_bytes();
    let mut events = Vec::new();
    for (key, value) in DB.prefix_iterator(prefix) {
        // The database is opened with a 3-byte prefix extractor, which is
        // shorter than this prefix, so the iterator may yield keys that only
        // share the first three bytes. Check the full prefix explicitly.
        if !key.starts_with(prefix) {
            continue;
        }
        let event: Event = rmp_serde::from_read_ref(&value)?;
        events.push(event);
    }
    Ok(events)
}
/// Persists `event` under the key `events_<id>`, encoded as named MessagePack.
///
/// # Errors
/// Fails when encoding fails or when the database write fails.
fn puts_event(event: Event) -> Result<()> {
    let encoded = rmp_serde::to_vec_named(&event)?;
    replace(format!("events_{}", event.id), encoded)
}
/// Creates a new user from `user_form`.
///
/// Returns `Ok(None)` on success. Returns `Ok(Some(json))` with a JSON error
/// document when the request is refused: either the supplied `admin_token`
/// does not match the configured one, or the username is already in use.
///
/// The password is hashed with Argon2 (default configuration) using a fresh
/// random UUID string as the salt.
///
/// # Errors
/// Hashing failures and storage failures are returned to the caller rather
/// than panicking.
///
/// NOTE(review): the uniqueness check and the write are separate steps, so
/// two concurrent requests for the same username could both succeed.
fn user_create(user_form: UserForm) -> Result<Option<String>> {
    let configure = env_var_config();
    if configure.admin_token != user_form.admin_token {
        let j = json!({"error": "admin_token is incorrect"}).to_string();
        return Ok(Some(j));
    }
    if !is_user_unique(user_form.username.clone())? {
        let j = json!({"error": "username already taken"}).to_string();
        return Ok(Some(j));
    }

    let salt = Uuid::new_v4().to_string();
    let hashed = argon2::hash_encoded(
        user_form.password.as_bytes(),
        salt.as_bytes(),
        &Argon2Config::default(),
    )?;
    // The form is owned and no longer needed, so its fields are moved into
    // the new user instead of being cloned.
    let new_user = User {
        id: Uuid::new_v4(),
        username: user_form.username,
        password: hashed,
        tenant_name: user_form.tenant_name,
    };
    puts_user(new_user)?;
    Ok(None)
}
/// Authenticates `login` and issues a signed JWT.
///
/// Returns `Ok(Some(token))` when the user exists and the password matches
/// the stored Argon2 hash, and `Ok(None)` when the user is unknown or the
/// password is wrong. The token's subject is the username, its issuer is
/// "Dispatcher", and it expires `jwt_expiry` after the NTP-derived issue time.
///
/// # Errors
/// Propagates storage, hash-verification, NTP and token-encoding errors.
async fn create_jwt(login: LoginForm) -> Result<Option<String>> {
    let user = match get_user_by_username(login.username)? {
        Some(found) => found,
        None => return Ok(None),
    };
    if !argon2::verify_encoded(&user.password, login.password.as_bytes())? {
        return Ok(None);
    }

    let settings = env_var_config();
    let issued_at = nippy::get_unix_ntp_time().await?;
    let claims = Claims {
        sub: user.username.clone(),
        exp: issued_at + settings.jwt_expiry,
        iat: issued_at,
        iss: "Dispatcher".to_string(),
    };
    let token = encode(
        &Header::default(),
        &claims,
        &EncodingKey::from_secret(settings.jwt_secret.as_ref()),
    )?;
    Ok(Some(token))
}
/// Builds the broker settings from built-in defaults overridden by flags.
///
/// Every setting starts at the default assigned below and is registered with
/// `go_flag::parse` under the flag name shown; the `Vec<String>` the parser
/// returns is discarded.
///
/// NOTE(review): presumably `go_flag::parse` reads the process command line,
/// so despite the `EnvVar` name nothing here reads environment variables — confirm.
/// NOTE(review): this is called from request-handling paths as well as at
/// startup, so the flags are parsed again on every call.
/// NOTE(review): the defaults for `jwt_secret` ("secret") and `admin_token`
/// ("letmein") are guessable placeholders; deployments should override them.
fn env_var_config() -> EnvVarConfig {
// Defaults, used when the corresponding flag is not supplied.
let mut port : u16 = 8080;
let mut jwt_expiry : i64 = 86400;
let mut secure = false;
let mut origin = "*".to_string();
let mut jwt_secret = "secret".to_string();
let mut db: String = "tmp".to_string();
let mut certs = "certs".to_string();
let mut domain = "localhost".to_string();
let mut admin_token = "letmein".to_string();
// Register each setting so the parser can overwrite it in place.
let _ : Vec<String> = go_flag::parse(|flags| {
flags.add_flag("port", &mut port);
flags.add_flag("origin", &mut origin);
flags.add_flag("jwt_expiry", &mut jwt_expiry);
flags.add_flag("jwt_secret", &mut jwt_secret);
flags.add_flag("secure", &mut secure);
flags.add_flag("db", &mut db);
flags.add_flag("domain", &mut domain);
flags.add_flag("certs", &mut certs);
flags.add_flag("admin_token", &mut admin_token);
});
EnvVarConfig{port, origin, jwt_expiry, jwt_secret, secure, domain, certs, db, admin_token}
}
/// Validates the value of an `Authorization` header.
///
/// Two schemes are understood:
///
/// * `Bearer <jwt>`: the token is decoded and validated with the configured
///   secret; its claims are returned on success.
/// * `Basic <base64(user:password)>`: the credentials are checked against
///   the stored Argon2 hash and, on success, a fresh claim set is built for
///   that user (not signed, just returned as `TokenData`).
///
/// Returns `Ok(None)` for any header that is malformed, uses another scheme,
/// or carries invalid credentials. Malformed input from the client (missing
/// token, missing `:` separator) no longer panics.
///
/// # Errors
/// Propagates storage, hash-verification and NTP errors.
async fn jwt_verify(token: String) -> Result<Option<TokenData<Claims>>> {
    let mut parts = token.split(' ');
    // `split` always yields at least one item, so the fallback is never used.
    let auth_type = parts.next().unwrap_or("");
    let credentials = match parts.next() {
        Some(value) => value,
        // A scheme with nothing after it cannot be verified.
        None => return Ok(None),
    };

    match auth_type {
        "Bearer" => {
            let configure = env_var_config();
            let decoded = decode::<Claims>(
                credentials,
                &DecodingKey::from_secret(configure.jwt_secret.as_ref()),
                &Validation::default(),
            );
            Ok(decoded.ok())
        }
        "Basic" => {
            let raw = match base64::decode(credentials) {
                Ok(bytes) => bytes,
                Err(_) => return Ok(None),
            };
            let basic = match std::str::from_utf8(&raw) {
                Ok(text) => text,
                Err(_) => return Ok(None),
            };
            // Split on the first colon only: the password itself may contain
            // colons and must not be truncated.
            let mut basic_parts = basic.splitn(2, ':');
            let user_name = basic_parts.next().unwrap_or("");
            let password = match basic_parts.next() {
                Some(value) => value,
                None => return Ok(None),
            };

            let user = match get_user_by_username(user_name.to_string())? {
                Some(found) => found,
                None => return Ok(None),
            };
            if !argon2::verify_encoded(&user.password, password.as_bytes())? {
                return Ok(None);
            }

            let app = env_var_config();
            let iat = nippy::get_unix_ntp_time().await?;
            let exp = iat + app.jwt_expiry;
            let iss = "Dispatcher".to_string();
            let my_claims = Claims { sub: user.username, exp, iat, iss };
            Ok(Some(TokenData {
                header: Header::default(),
                claims: my_claims,
            }))
        }
        _ => Ok(None),
    }
}
/// Stores a new event on behalf of `user`.
///
/// The event gets a fresh random id, the current NTP-derived timestamp, and
/// the user's id and tenant name alongside the submitted name and payload.
///
/// # Errors
/// Propagates NTP lookup, encoding and storage errors.
async fn insert(user: User, event_form: EventForm) -> Result<()> {
    let timestamp = nippy::get_unix_ntp_time().await?;
    let event = Event {
        id: uuid::Uuid::new_v4(),
        data: event_form.data,
        event: event_form.event,
        user_id: user.id,
        timestamp,
        tenant_name: user.tenant_name,
    };
    // The event is not used after being stored, so it is moved rather than cloned.
    puts_event(event)?;
    Ok(())
}
/// HTTP handler for user creation.
///
/// Parses the request body as a `UserForm` and delegates to `user_create`.
/// Responds 200 with an empty body on success, or 400 with the refusal
/// message prefixed by `error: ` when creation is refused.
///
/// NOTE(review): the 400 body is sent with a JSON content type but the
/// `error: ` prefix makes it invalid JSON; left unchanged here because
/// existing clients may rely on the current wire format.
async fn create_user(mut req: Request<()>) -> tide::Result {
    let raw_body = req.body_string().await?;
    let form: UserForm = serde_json::from_str(&raw_body)?;
    let response = if let Some(problem) = user_create(form)? {
        tide::Response::builder(400)
            .body(format!("error: {}", problem))
            .header("content-type", "application/json")
            .build()
    } else {
        tide::Response::builder(200)
            .body("")
            .header("content-type", "application/json")
            .build()
    };
    Ok(response)
}
/// HTTP handler for login.
///
/// Parses the request body as a `LoginForm` and asks `create_jwt` for a
/// token. Responds 200 with `jwt: <token>` on success and 401 with no body
/// when the credentials are rejected.
///
/// NOTE(review): the 200 body is sent with a JSON content type but
/// `jwt: <token>` is not valid JSON; left unchanged here because existing
/// clients may rely on the current wire format.
async fn login_user(mut req: Request<()>) -> tide::Result {
    let raw_body = req.body_string().await?;
    let form: LoginForm = serde_json::from_str(&raw_body)?;
    let response = if let Some(token) = create_jwt(form).await? {
        tide::Response::builder(200)
            .body(format!("jwt: {}", token))
            .header("content-type", "application/json")
            .build()
    } else {
        tide::Response::builder(401)
            .header("content-type", "application/json")
            .build()
    };
    Ok(response)
}
/// HTTP handler that stores an event for the authenticated caller.
///
/// Responds 401 when the `authorization` header is absent, fails
/// verification, or names a user that no longer exists. Otherwise the body
/// is parsed as an `EventForm`, stored via `insert`, and 200 is returned.
async fn insert_event(mut req: Request<()>) -> tide::Result {
    // All rejection paths produce the same empty 401 response.
    let unauthorized = || {
        tide::Response::builder(401)
            .header("content-type", "application/json")
            .build()
    };

    let token = match req.header("authorization") {
        Some(values) => values.last().to_string(),
        None => return Ok(unauthorized()),
    };
    let jwt = match jwt_verify(token).await? {
        Some(verified) => verified,
        None => return Ok(unauthorized()),
    };

    // The body is read and parsed before the user lookup, as the caller
    // must already hold a valid token at this point.
    let raw_body = req.body_string().await?;
    let event_form: EventForm = serde_json::from_str(&raw_body)?;

    let user = match get_user_by_username(jwt.claims.sub)? {
        Some(found) => found,
        None => return Ok(unauthorized()),
    };
    insert(user, event_form).await?;
    Ok(tide::Response::builder(200)
        .header("content-type", "application/json")
        .build())
}
/// HTTP handler that checks whether the caller's credentials are valid.
///
/// Responds 200 when the `authorization` header verifies and its subject is
/// a stored user; responds 401 in every other case. No body is sent.
async fn verify_user(req: Request<()>) -> tide::Result {
    let respond = |status: u16| {
        tide::Response::builder(status)
            .header("content-type", "application/json")
            .build()
    };

    let token = match req.header("authorization") {
        Some(values) => values.last().to_string(),
        None => return Ok(respond(401)),
    };
    let jwt = match jwt_verify(token).await? {
        Some(verified) => verified,
        None => return Ok(respond(401)),
    };
    let status = if get_user_by_username(jwt.claims.sub)?.is_some() {
        200
    } else {
        401
    };
    Ok(respond(status))
}
/// Entry point: wires up middleware and routes, then starts listening.
///
/// Routes:
/// * `POST /insert`: store an event (`insert_event`)
/// * `POST /users`: create a user (`create_user`)
/// * `POST /login`: obtain a JWT (`login_user`)
/// * `GET /verify`: check credentials (`verify_user`)
/// * `GET /sse`: server-sent event stream of the caller's tenant events
///
/// When `secure` is set the server listens on `0.0.0.0:443` with ACME-managed
/// certificates; the configured `port` is only used for the plain listener.
#[async_std::main]
async fn main() -> tide::Result<()> {
    let configure = env_var_config();
    let cors = CorsMiddleware::new()
        .allow_methods("GET, POST, OPTIONS".parse::<HeaderValue>().unwrap())
        .allow_headers("authorization".parse::<HeaderValue>().unwrap())
        .allow_origin(Origin::from(configure.origin))
        .allow_credentials(false);
    let mut app = tide::new();
    app.with(driftwood::DevLogger);
    app.with(cors);
    app.at("/insert").post(insert_event);
    app.at("/users").post(create_user);
    app.at("/login").post(login_user);
    app.at("/verify").get(verify_user);
    app.at("/sse").get(tide::sse::endpoint(|req: Request<()>, sender| async move {
        // Unauthenticated or unknown callers get a stream that ends immediately.
        let token = match req.header("authorization") {
            Some(values) => values.last().to_string(),
            None => return Ok(()),
        };
        let jwt = match jwt_verify(token).await? {
            Some(verified) => verified,
            None => return Ok(()),
        };
        let user = match get_user_by_username(jwt.claims.sub)? {
            Some(found) => found,
            None => return Ok(()),
        };

        // Ids of events already pushed on this connection. Tracking by id
        // (rather than by event name) guarantees each stored event is sent
        // exactly once: with a name-keyed cache, two stored events sharing a
        // name would keep replacing each other and be re-sent on every tick.
        let mut delivered: std::collections::HashSet<uuid::Uuid> =
            std::collections::HashSet::new();
        // NOTE(review): every tick reloads all events from the database for
        // each open connection; consider a change feed if this becomes hot.
        let mut interval = stream::interval(Duration::from_millis(100));
        while interval.next().await.is_some() {
            for evt in get_events()? {
                if evt.tenant_name != user.tenant_name || delivered.contains(&evt.id) {
                    continue;
                }
                let id = uuid::Uuid::new_v4();
                sender
                    .send(&evt.event, evt.data.to_string(), Some(&id.to_string()))
                    .await?;
                // Mark as delivered only after the send succeeded.
                delivered.insert(evt.id);
            }
        }
        Ok(())
    }));
    if configure.secure {
        app.listen(
            tide_rustls::TlsListener::build().addrs("0.0.0.0:443").acme(
                AcmeConfig::new()
                    .domains(vec![configure.domain])
                    .cache_dir(configure.certs)
                    .production(),
            ),
        )
        .await?;
    } else {
        let ip = format!("0.0.0.0:{}", configure.port);
        app.listen(ip).await?;
    }
    Ok(())
}