mirror of https://github.com/apibillme/broker
test
This commit is contained in:
parent
bae1f90d6f
commit
f16d750fb9
92
src/main.rs
92
src/main.rs
|
@ -321,54 +321,54 @@ async fn main() {
|
|||
}
|
||||
});
|
||||
|
||||
let _ = tokio::spawn(async move {
|
||||
loop {
|
||||
let vals : HashMap<String, Event> = tree_clone.iter().into_iter().filter(|x| {
|
||||
let p = x.as_ref().unwrap();
|
||||
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
|
||||
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
|
||||
if k.contains("_v_") {
|
||||
let json : Event = serde_json::from_str(&v).unwrap();
|
||||
let now = Utc::now().timestamp();
|
||||
if json.timestamp <= now && !json.published && !json.cancelled {
|
||||
return true
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
}).map(|x| {
|
||||
let p = x.as_ref().unwrap();
|
||||
let k = std::str::from_utf8(&p.0).unwrap().to_owned();
|
||||
let v = std::str::from_utf8(&p.1).unwrap().to_owned();
|
||||
let json : Event = serde_json::from_str(&v).unwrap();
|
||||
let json_cloned = json.clone();
|
||||
(k, json_cloned)
|
||||
}).collect();
|
||||
// let _ = tokio::spawn(async move {
|
||||
// loop {
|
||||
// let vals : HashMap<String, Event> = tree_clone.iter().into_iter().filter(|x| {
|
||||
// let p = x.as_ref().unwrap();
|
||||
// let k = std::str::from_utf8(&p.0).unwrap().to_owned();
|
||||
// let v = std::str::from_utf8(&p.1).unwrap().to_owned();
|
||||
// if k.contains("_v_") {
|
||||
// let json : Event = serde_json::from_str(&v).unwrap();
|
||||
// let now = Utc::now().timestamp();
|
||||
// if json.timestamp <= now && !json.published && !json.cancelled {
|
||||
// return true
|
||||
// } else {
|
||||
// return false
|
||||
// }
|
||||
// } else {
|
||||
// return false
|
||||
// }
|
||||
// }).map(|x| {
|
||||
// let p = x.as_ref().unwrap();
|
||||
// let k = std::str::from_utf8(&p.0).unwrap().to_owned();
|
||||
// let v = std::str::from_utf8(&p.1).unwrap().to_owned();
|
||||
// let json : Event = serde_json::from_str(&v).unwrap();
|
||||
// let json_cloned = json.clone();
|
||||
// (k, json_cloned)
|
||||
// }).collect();
|
||||
|
||||
for (k, v) in vals {
|
||||
let old_json = v.clone();
|
||||
let old_json_clone = old_json.clone();
|
||||
let mut new_json = v.clone();
|
||||
new_json.published = true;
|
||||
let newer_json = new_json.clone();
|
||||
// for (k, v) in vals {
|
||||
// let old_json = v.clone();
|
||||
// let old_json_clone = old_json.clone();
|
||||
// let mut new_json = v.clone();
|
||||
// new_json.published = true;
|
||||
// let newer_json = new_json.clone();
|
||||
|
||||
let guid = Uuid::new_v4().to_string();
|
||||
let sse = SSE{id: guid, event: new_json.event, data: serde_json::to_string(&new_json.data).unwrap(), retry: Duration::from_millis(5000)};
|
||||
let _ = tx.send(sse).await;
|
||||
let tree_cloned = tree_clone.clone();
|
||||
let _ = tokio::spawn(async move {
|
||||
let _ = tree_cloned.compare_and_swap(old_json.event.as_bytes(), None as Option<&[u8]>, Some(b""));
|
||||
let old_json_og = tree_cloned.get(old_json.event).unwrap().unwrap();
|
||||
let old_value = std::str::from_utf8(&old_json_og).unwrap().to_owned();
|
||||
let _ = tree_cloned.compare_and_swap(old_json_clone.event.as_bytes(), Some(old_value.as_bytes()), Some(serde_json::to_string(&newer_json).unwrap().as_bytes()));
|
||||
let _ = tree_cloned.compare_and_swap(k, Some(serde_json::to_string(&old_json_clone).unwrap().as_bytes()), Some(serde_json::to_string(&newer_json).unwrap().as_bytes()));
|
||||
let _ = tree_cloned.flush();
|
||||
}).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
// let guid = Uuid::new_v4().to_string();
|
||||
// let sse = SSE{id: guid, event: new_json.event, data: serde_json::to_string(&new_json.data).unwrap(), retry: Duration::from_millis(5000)};
|
||||
// let _ = tx.send(sse).await;
|
||||
// let tree_cloned = tree_clone.clone();
|
||||
// let _ = tokio::spawn(async move {
|
||||
// let _ = tree_cloned.compare_and_swap(old_json.event.as_bytes(), None as Option<&[u8]>, Some(b""));
|
||||
// let old_json_og = tree_cloned.get(old_json.event).unwrap().unwrap();
|
||||
// let old_value = std::str::from_utf8(&old_json_og).unwrap().to_owned();
|
||||
// let _ = tree_cloned.compare_and_swap(old_json_clone.event.as_bytes(), Some(old_value.as_bytes()), Some(serde_json::to_string(&newer_json).unwrap().as_bytes()));
|
||||
// let _ = tree_cloned.compare_and_swap(k, Some(serde_json::to_string(&old_json_clone).unwrap().as_bytes()), Some(serde_json::to_string(&newer_json).unwrap().as_bytes()));
|
||||
// let _ = tree_cloned.flush();
|
||||
// }).await;
|
||||
// }
|
||||
// }
|
||||
// });
|
||||
|
||||
|
||||
let events = rx.map(|sse| {
|
||||
|
|
Loading…
Reference in New Issue