mirror of https://github.com/apibillme/broker
make library
This commit is contained in:
parent
5fcf132739
commit
8075c26060
|
@ -1249,6 +1249,7 @@ version = "0.0.1"
|
|||
dependencies = [
|
||||
"actix-cors 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-server 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"envy 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
|
|
@ -19,3 +19,4 @@ json = "0.12.0"
|
|||
sled = "0.30.3"
|
||||
envy = "0.4.0"
|
||||
env_logger = "0.6"
|
||||
actix-server = "1.0.1"
|
||||
|
|
19
README.md
19
README.md
|
@ -1,8 +1,23 @@
|
|||
# SSE server using Actix Web
|
||||
|
||||
## Build
|
||||
## Use
|
||||
|
||||
- ``` make ```
|
||||
```rust
|
||||
use sse_server::{sse_start};
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> std::result::Result<(), std::io::Error> {
|
||||
sse_start("*".to_owned()).await
|
||||
}
|
||||
```
|
||||
|
||||
- the only param is the origin you want to allow - wildcard for all
|
||||
- the PORT needs to passed in as an environment variable
|
||||
|
||||
|
||||
## Example
|
||||
|
||||
- run ``` make ``` and ``` cd example && npm i && npm start ``` to run example
|
||||
|
||||
## Example
|
||||
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
use actix_web::{http::header, middleware, web, HttpServer, HttpResponse, App, Error, Responder};
|
||||
use sse_actix_web::{Broadcaster, broadcast};
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use std::sync::Mutex;
|
||||
use sled;
|
||||
use actix_cors::Cors;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct Config {
|
||||
port: String
|
||||
}
|
||||
|
||||
struct MyData {
|
||||
db: sled::Db
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct JSON {
|
||||
event: String,
|
||||
data: serde_json::Value,
|
||||
}
|
||||
|
||||
async fn new_client(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>) -> impl Responder {
|
||||
|
||||
// get iter to loop through all keys in db
|
||||
let iter = data.db.iter();
|
||||
|
||||
// turn iVec(s) to String(s) and make HashMap to pass to sse
|
||||
let vals: HashMap<String, String> = iter.into_iter().map(|x| {
|
||||
let p = x.unwrap();
|
||||
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
|
||||
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
|
||||
(k, v)
|
||||
}).collect();
|
||||
let rx = broad.lock().unwrap().new_client(vals);
|
||||
|
||||
// create sse endpoint
|
||||
HttpResponse::Ok()
|
||||
.header("content-type", "text/event-stream")
|
||||
.no_chunking()
|
||||
.streaming(rx)
|
||||
}
|
||||
|
||||
async fn insert(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>, json: web::Json<JSON>) -> Result<HttpResponse, Error> {
|
||||
|
||||
// write blank value if no value for key exists so that get later doesn't crash
|
||||
let data_cloned = data.clone();
|
||||
let _ = data_cloned.db.compare_and_swap(json.0.event.clone(), None as Option<&[u8]>, Some(b""));
|
||||
let _ = web::block(move || data_cloned.db.flush()).await;
|
||||
|
||||
// get old value from db and new value from json
|
||||
let new_value_string = serde_json::to_string(&json.0.data).unwrap();
|
||||
let new_value = new_value_string.as_bytes();
|
||||
let old_value_buffer = data.db.get(json.0.event.clone()).unwrap().unwrap();
|
||||
let old_value_string = std::str::from_utf8(&old_value_buffer).unwrap();
|
||||
let old_value = old_value_string.clone().as_bytes();
|
||||
|
||||
// write old and new values to db
|
||||
let _ = data.db.compare_and_swap(json.0.event.clone(), Some(old_value.clone()), Some(new_value.clone()));
|
||||
let _ = web::block(move || data.db.flush()).await;
|
||||
|
||||
// write new event and data to sse
|
||||
broadcast(json.0.event.clone(), serde_json::to_string(&json.0.data).unwrap(), broad.clone()).await;
|
||||
|
||||
// return data to json response as 200
|
||||
Ok(HttpResponse::Ok().json(json.0.data))
|
||||
}
|
||||
|
||||
pub async fn sse_start(origin: String) -> std::result::Result<(), std::io::Error> {
|
||||
// set actix web env vars
|
||||
std::env::set_var("RUST_LOG", "actix_web=debug,actix_server=info");
|
||||
env_logger::init();
|
||||
|
||||
// get port env var
|
||||
let config = envy::from_env::<Config>().unwrap();
|
||||
let ip = format!("0.0.0.0:{}", config.port);
|
||||
|
||||
// setup db and sse
|
||||
let tree = sled::open("./tmp/data").unwrap();
|
||||
let data = Broadcaster::create();
|
||||
|
||||
// create actix web server with CORS, data, and routes - handle wildcard origins
|
||||
if origin == "*" {
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(middleware::Logger::default())
|
||||
.wrap(
|
||||
Cors::new()
|
||||
.send_wildcard()
|
||||
.allowed_methods(vec!["GET", "POST"])
|
||||
.allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT, header::CONTENT_TYPE])
|
||||
.max_age(3600)
|
||||
.finish()
|
||||
)
|
||||
.app_data(data.clone())
|
||||
.app_data(web::JsonConfig::default())
|
||||
.data(MyData{ db: tree.clone() })
|
||||
.route("/insert", web::post().to(insert))
|
||||
.route("/events", web::get().to(new_client))
|
||||
})
|
||||
.bind(ip).unwrap()
|
||||
.run()
|
||||
.await
|
||||
} else {
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(middleware::Logger::default())
|
||||
.wrap(
|
||||
Cors::new()
|
||||
.allowed_origin(&origin)
|
||||
.allowed_methods(vec!["GET", "POST"])
|
||||
.allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT, header::CONTENT_TYPE])
|
||||
.max_age(3600)
|
||||
.finish()
|
||||
)
|
||||
.app_data(data.clone())
|
||||
.app_data(web::JsonConfig::default())
|
||||
.data(MyData{ db: tree.clone() })
|
||||
.route("/insert", web::post().to(insert))
|
||||
.route("/events", web::get().to(new_client))
|
||||
})
|
||||
.bind(ip).unwrap()
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
}
|
103
src/main.rs
103
src/main.rs
|
@ -1,102 +1,7 @@
|
|||
use actix_web::{http::header, middleware, web, HttpServer, HttpResponse, App, Error, Responder};
|
||||
use sse_actix_web::{Broadcaster, broadcast};
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use std::sync::Mutex;
|
||||
use sled;
|
||||
use actix_cors::Cors;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct Config {
|
||||
port: String
|
||||
}
|
||||
|
||||
pub struct MyData {
|
||||
db: sled::Db
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct JSON {
|
||||
event: String,
|
||||
data: serde_json::Value,
|
||||
}
|
||||
|
||||
async fn new_client(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>) -> impl Responder {
|
||||
|
||||
let iter = data.db.iter();
|
||||
|
||||
let vals: HashMap<String, String> = iter.into_iter().map(|x| {
|
||||
let p = x.unwrap();
|
||||
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
|
||||
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
|
||||
(k, v)
|
||||
}).collect();
|
||||
|
||||
let rx = broad.lock().unwrap().new_client(vals);
|
||||
|
||||
HttpResponse::Ok()
|
||||
.header("content-type", "text/event-stream")
|
||||
.no_chunking()
|
||||
.streaming(rx)
|
||||
}
|
||||
|
||||
async fn insert(data: web::Data<MyData>, broad: web::Data<Mutex<Broadcaster>>, json: web::Json<JSON>) -> Result<HttpResponse, Error> {
|
||||
|
||||
let data_cloned = data.clone();
|
||||
|
||||
let _ = data_cloned.db.compare_and_swap(json.0.event.clone(), None as Option<&[u8]>, Some(b""));
|
||||
|
||||
let _ = web::block(move || data_cloned.db.flush()).await;
|
||||
|
||||
let user_string = serde_json::to_string(&json.0.data).unwrap();
|
||||
|
||||
let user_buffer = data.db.get(json.0.event.clone()).unwrap().unwrap();
|
||||
|
||||
let user = std::str::from_utf8(&user_buffer).unwrap();
|
||||
|
||||
let new_user = user_string.as_bytes();
|
||||
|
||||
let old_user = user.clone().as_bytes();
|
||||
|
||||
let _ = data.db.compare_and_swap(json.0.event.clone(), Some(old_user.clone()), Some(new_user.clone()));
|
||||
|
||||
let _ = web::block(move || data.db.flush()).await;
|
||||
|
||||
broadcast(json.0.event.clone(), serde_json::to_string(&json.0.data).unwrap(), broad.clone()).await;
|
||||
|
||||
Ok(HttpResponse::Ok().json(json.0.data))
|
||||
}
|
||||
mod lib;
|
||||
use lib::{sse_start};
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
std::env::set_var("RUST_LOG", "actix_web=debug,actix_server=info");
|
||||
env_logger::init();
|
||||
let config = envy::from_env::<Config>().unwrap();
|
||||
let ip = format!("0.0.0.0:{}", config.port);
|
||||
|
||||
let tree = sled::open("./tmp/data").unwrap();
|
||||
let tree_clone = tree.clone();
|
||||
|
||||
let data = Broadcaster::create();
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(middleware::Logger::default())
|
||||
.wrap(
|
||||
Cors::new()
|
||||
.send_wildcard()
|
||||
.allowed_methods(vec!["GET", "POST"])
|
||||
.allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT, header::CONTENT_TYPE])
|
||||
.max_age(3600)
|
||||
.finish(),
|
||||
)
|
||||
.app_data(data.clone())
|
||||
.data(MyData{ db: tree_clone.clone()})
|
||||
.app_data(web::JsonConfig::default())
|
||||
.route("/insert", web::post().to(insert))
|
||||
.route("/events", web::get().to(new_client))
|
||||
})
|
||||
.bind(ip)?
|
||||
.run()
|
||||
.await
|
||||
async fn main() -> std::result::Result<(), std::io::Error> {
|
||||
sse_start("*".to_owned()).await
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue