mirror of https://github.com/apibillme/broker
update
This commit is contained in:
commit
c219004fac
|
@ -0,0 +1,2 @@
|
|||
target
|
||||
tmp
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,17 @@
|
|||
[package]
|
||||
name = "download-counter-example"
|
||||
version = "0.0.1"
|
||||
authors = ["Bevan Hunt <bevan@bevanhunt.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
description = "Download Counter Example"
|
||||
repository = "https://github.com/apibillme/download-counter-example"
|
||||
|
||||
[dependencies]
|
||||
actix-rt = "1.0.0"
|
||||
actix-web = "2.0.0-alpha.6"
|
||||
actix-files = "0.2.0-alpha.3"
|
||||
futures = "0.3.1"
|
||||
tokio = "0.2.4"
|
||||
sse-actix-web="0.0.1"
|
||||
sled = "0.30.0"
|
|
@ -0,0 +1,21 @@
|
|||
MIT License
|
||||
|
||||
Copyright (c) [2019] [Bevan Hunt]
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
|
@ -0,0 +1,27 @@
|
|||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<meta http-equiv="X-UA-Compatible" content="ie=edge">
|
||||
<title>Server-sent events</title>
|
||||
<style>
|
||||
p {
|
||||
margin-top: 0.5em;
|
||||
margin-bottom: 0.5em;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div id="root"></div>
|
||||
<script>
|
||||
let root = document.getElementById("root");
|
||||
let events = new EventSource("/events");
|
||||
events.onmessage = (event) => {
|
||||
let data = document.createElement("p");
|
||||
let time = new Date().toLocaleTimeString();
|
||||
data.innerText = time + ": " + event.data;
|
||||
root.appendChild(data);
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
|
@ -0,0 +1,106 @@
|
|||
use std::pin::Pin;
|
||||
use std::sync::Mutex;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_web::web::{Bytes, Data};
|
||||
use actix_web::{Error, HttpResponse, Responder};
|
||||
use futures::{Stream, StreamExt};
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::time::{interval_at, Instant};
|
||||
|
||||
pub async fn new_client(broadcaster: Data<Mutex<Broadcaster>>) -> impl Responder {
|
||||
let rx = broadcaster.lock().unwrap().new_client();
|
||||
|
||||
HttpResponse::Ok()
|
||||
.header("content-type", "text/event-stream")
|
||||
.no_chunking()
|
||||
.streaming(rx)
|
||||
}
|
||||
|
||||
pub async fn broadcast(
|
||||
msg: &str,
|
||||
broadcaster: Data<Mutex<Broadcaster>>,
|
||||
) -> () {
|
||||
broadcaster.lock().unwrap().send(&msg);
|
||||
}
|
||||
|
||||
pub struct Broadcaster {
|
||||
clients: Vec<Sender<Bytes>>,
|
||||
}
|
||||
|
||||
impl Broadcaster {
|
||||
pub fn create() -> Data<Mutex<Self>> {
|
||||
// Data ≃ Arc
|
||||
let me = Data::new(Mutex::new(Broadcaster::new()));
|
||||
|
||||
// ping clients every 10 seconds to see if they are alive
|
||||
Broadcaster::spawn_ping(me.clone());
|
||||
|
||||
me
|
||||
}
|
||||
|
||||
pub fn new() -> Self {
|
||||
Broadcaster {
|
||||
clients: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_ping(me: Data<Mutex<Self>>) {
|
||||
actix_rt::spawn(async move {
|
||||
let mut task = interval_at(Instant::now(), Duration::from_secs(10));
|
||||
while let Some(_) = task.next().await {
|
||||
me.lock().unwrap().remove_stale_clients();
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn remove_stale_clients(&mut self) {
|
||||
let mut ok_clients = Vec::new();
|
||||
for client in self.clients.iter() {
|
||||
let result = client.clone().try_send(Bytes::from("data: ping\n\n"));
|
||||
|
||||
if let Ok(()) = result {
|
||||
ok_clients.push(client.clone());
|
||||
}
|
||||
}
|
||||
self.clients = ok_clients;
|
||||
}
|
||||
|
||||
pub fn new_client(&mut self) -> Client {
|
||||
let (tx, rx) = channel(100);
|
||||
|
||||
tx.clone()
|
||||
.try_send(Bytes::from("data: connected\n\n"))
|
||||
.unwrap();
|
||||
|
||||
self.clients.push(tx);
|
||||
Client(rx)
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: &str) {
|
||||
let msg = Bytes::from(["data: ", msg, "\n\n"].concat());
|
||||
|
||||
for client in self.clients.iter() {
|
||||
client.clone().try_send(msg.clone()).unwrap_or(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// wrap Receiver in own type, with correct error type
|
||||
pub struct Client(Receiver<Bytes>);
|
||||
|
||||
impl Stream for Client {
|
||||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
match Pin::new(&mut self.0).poll_next(cx) {
|
||||
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
use std::io::{self};
|
||||
use actix_web::{web, HttpServer, HttpResponse, App, Responder};
|
||||
mod lib;
|
||||
use lib::{new_client, Broadcaster, broadcast};
|
||||
use sled::Db;
|
||||
use actix_files::NamedFile;
|
||||
|
||||
struct MyData {
|
||||
db: sled::Db,
|
||||
}
|
||||
|
||||
async fn download(data: web::Data<MyData>, broad: web::Data<std::sync::Mutex<Broadcaster>>) -> io::Result<NamedFile> {
|
||||
let counter_buffer = data.db.get(b"counter").unwrap().unwrap();
|
||||
|
||||
let counter = std::str::from_utf8(&counter_buffer).unwrap();
|
||||
let counter_int = counter.clone().parse::<i32>().unwrap();
|
||||
let new_counter_int = counter_int + 1;
|
||||
let new_counter_string = new_counter_int.clone().to_string();
|
||||
let new_counter = new_counter_string.as_bytes();
|
||||
let old_counter = counter.clone().as_bytes();
|
||||
|
||||
let _ = data.db.compare_and_swap(b"counter", Some(old_counter.clone()), Some(new_counter.clone()));
|
||||
|
||||
let _ = web::block(move || data.db.flush()).await;
|
||||
|
||||
broadcast(counter, broad).await;
|
||||
|
||||
let f = web::block(|| std::fs::File::create("test.jpg")).await.unwrap();
|
||||
|
||||
// BUG: this should return content-disposition attachment not inline
|
||||
NamedFile::from_file(f, "test.jpg")
|
||||
}
|
||||
|
||||
async fn index() -> impl Responder {
|
||||
let content = include_str!("index.html");
|
||||
|
||||
HttpResponse::Ok()
|
||||
.header("content-type", "text/html")
|
||||
.body(content)
|
||||
}
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let data = Broadcaster::create();
|
||||
let tree = Db::open("./tmp/data").unwrap();
|
||||
tree.insert(b"counter", b"0").unwrap();
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.register_data(data.clone())
|
||||
.data(MyData{ db: tree.clone() })
|
||||
.route("/", web::get().to(index))
|
||||
.route("/events", web::get().to(new_client))
|
||||
.route("/download", web::get().to(download))
|
||||
})
|
||||
.bind("0.0.0.0:3000")?
|
||||
.start()
|
||||
.await
|
||||
}
|
Loading…
Reference in New Issue