Rework the code

This commit is contained in:
Mahmut Bulut 2020-03-06 00:28:23 +01:00
parent 25eed79ae5
commit 6c08b7bb5a
7 changed files with 134 additions and 106 deletions

View File

@ -48,7 +48,7 @@ fn main() {
};
// Configure our cluster node
let (cluster, cluster_listener) = ArtilleryAPCluster::new(ap_cluster_config).unwrap();
let (cluster, _cluster_listener) = ArtilleryAPCluster::new(ap_cluster_config).unwrap();
let ap_cluster = Arc::new(cluster);
// Launch the cluster node
@ -60,8 +60,7 @@ fn main() {
let ap_events = ap_cluster.clone();
// Detach cluster launch
let cluster_handle =
spawn(async move { ap_cluster.launch().await }, cluster_stack);
let cluster_handle = spawn(async move { ap_cluster.launch().await }, cluster_stack);
// Detach event consumption
let events_handle = spawn_blocking(

View File

@ -4,11 +4,8 @@ use crate::service_discovery::mdns::prelude::*;
use lightproc::prelude::*;
use std::future::Future;
use std::sync::Arc;
use uuid::Uuid;
use futures::{join, future};
#[derive(Default, Clone)]
pub struct ArtilleryAPClusterConfig {
@ -21,7 +18,7 @@ pub struct ArtilleryAPClusterConfig {
pub struct ArtilleryAPCluster {
config: ArtilleryAPClusterConfig,
cluster: Arc<Cluster>,
sd: Arc<MDNSServiceDiscovery>
sd: Arc<MDNSServiceDiscovery>,
}
unsafe impl Send for ArtilleryAPCluster {}
@ -33,13 +30,17 @@ impl ArtilleryAPCluster {
pub fn new(config: ArtilleryAPClusterConfig) -> Result<(Self, RecoverableHandle<()>)> {
let sd = MDNSServiceDiscovery::new_service_discovery(config.sd_config.clone())?;
let (cluster, cluster_listener) = Cluster::new_cluster(config.node_id, config.cluster_config.clone())?;
let (cluster, cluster_listener) =
Cluster::new_cluster(config.node_id, config.cluster_config.clone())?;
Ok((Self {
config,
cluster: Arc::new(cluster),
sd: Arc::new(sd)
}, cluster_listener))
Ok((
Self {
config,
cluster: Arc::new(cluster),
sd: Arc::new(sd),
},
cluster_listener,
))
}
pub fn cluster(&self) -> Arc<Cluster> {
@ -55,8 +56,7 @@ impl ArtilleryAPCluster {
}
pub async fn launch(&self) {
self
.service_discovery()
self.service_discovery()
.events()
.iter()
.filter(|discovery| {

View File

@ -3,24 +3,27 @@ use crate::epidemic::cluster_config::ClusterConfig;
use crate::epidemic::state::{ArtilleryClusterEvent, ArtilleryClusterRequest};
use crate::errors::*;
use bastion_executor::prelude::*;
use lightproc::{recoverable_handle::RecoverableHandle, proc_stack::ProcStack};
use lightproc::{proc_stack::ProcStack, recoverable_handle::RecoverableHandle};
use std::convert::AsRef;
use std::net::SocketAddr;
use std::{
future::Future,
pin::Pin,
sync::{Arc, mpsc::{channel, Receiver, Sender}},
sync::mpsc::{channel, Receiver, Sender},
task::{Context, Poll},
};
use uuid::Uuid;
pub struct Cluster {
pub events: Receiver<ArtilleryClusterEvent>,
comm: Sender<ArtilleryClusterRequest>
comm: Sender<ArtilleryClusterRequest>,
}
impl Cluster {
pub fn new_cluster(host_key: Uuid, config: ClusterConfig) -> Result<(Self, RecoverableHandle<()>)> {
pub fn new_cluster(
host_key: Uuid,
config: ClusterConfig,
) -> Result<(Self, RecoverableHandle<()>)> {
let (event_tx, event_rx) = channel::<ArtilleryClusterEvent>();
let (internal_tx, mut internal_rx) = channel::<ArtilleryClusterRequest>();
@ -36,15 +39,17 @@ impl Cluster {
ProcStack::default(),
);
Ok((Self {
events: event_rx,
comm: internal_tx
}, cluster_handle))
Ok((
Self {
events: event_rx,
comm: internal_tx,
},
cluster_handle,
))
}
pub fn add_seed_node(&self, addr: SocketAddr) {
let _ = self.comm
.send(ArtilleryClusterRequest::AddSeed(addr));
let _ = self.comm.send(ArtilleryClusterRequest::AddSeed(addr));
}
pub fn send_payload<T: AsRef<str>>(&self, id: Uuid, msg: T) {

View File

@ -10,7 +10,6 @@ use crate::epidemic::member;
use bastion_utils::math;
use fail::fail_point;
pub struct ArtilleryMemberList {
members: Vec<ArtilleryMember>,
periodic_index: usize,

View File

@ -9,11 +9,11 @@ use libp2p::{identity, Multiaddr, PeerId};
use lightproc::proc_stack::ProcStack;
use crossbeam_channel::{unbounded, Receiver};
use fail::fail_point;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use fail::fail_point;
pub struct MDNSServiceDiscovery {
events: Arc<Receiver<MDNSServiceDiscoveryEvent>>,

View File

@ -3,10 +3,9 @@ extern crate pretty_env_logger;
#[macro_use]
extern crate log;
use std::sync::Once;
use bastion::prelude::*;
use fail::FailScenario;
use std::sync::Once;
//
@ -27,9 +26,14 @@ use lightproc::proc_stack::ProcStack;
use std::sync::Arc;
use std::time::{Duration, Instant};
fn test_epidemic_periodic_index_fp(port: u16) ->
(Arc<ArtilleryAPCluster>, RecoverableHandle<()>, RecoverableHandle<()>, RecoverableHandle<()>) {
fn test_epidemic_periodic_index_fp(
port: u16,
) -> (
Arc<ArtilleryAPCluster>,
RecoverableHandle<()>,
RecoverableHandle<()>,
RecoverableHandle<()>,
) {
// Initialize our cluster configuration
let ap_cluster_config = ArtilleryAPClusterConfig {
app_name: String::from("artillery-ap"),
@ -65,10 +69,9 @@ fn test_epidemic_periodic_index_fp(port: u16) ->
let ap_ref = ap_cluster.clone();
// Detach cluster launch
let cluster_handle = spawn_blocking(
async move { ap_cluster.launch().await }, cluster_stack);
let cluster_handle = spawn_blocking(async move { ap_cluster.launch().await }, cluster_stack);
// Detach event consumption
// Detach event consumption
let events_handle = spawn_blocking(
async move {
warn!("STARTED: Event Poller");
@ -84,7 +87,9 @@ fn test_epidemic_periodic_index_fp(port: u16) ->
}
}
warn!("STOPPED: Event Poller");
}, events_stack);
},
events_stack,
);
(ap_ref, events_handle, cluster_handle, cluster_listener)
}
@ -104,49 +109,56 @@ fn get_port() -> u16 {
static LOGGER_INIT: Once = Once::new();
macro_rules! cluster_fault_recovery_test {
($fp_name:expr) => {
($fp_name:expr) => {
LOGGER_INIT.call_once(|| pretty_env_logger::init());
let scenario = FailScenario::setup();
fail::cfg($fp_name, "panic").unwrap();
// Let's see how reliable you are.
let node1 = spawn_blocking(async {
let (c, events, cluster_handle, cluster_listener) = test_epidemic_periodic_index_fp(get_port());
match cluster_listener.await {
Some(_) => assert!(false),
_ => {
// Test passed.
warn!("This node is leaving.");
c.shutdown();
warn!("Stopping the setup");
},
}
}, ProcStack::default());
let node1 = spawn_blocking(
async {
let (c, events, cluster_handle, cluster_listener) =
test_epidemic_periodic_index_fp(get_port());
match cluster_listener.await {
Some(_) => assert!(false),
_ => {
// Test passed.
warn!("This node is leaving.");
c.shutdown();
warn!("Stopping the setup");
}
}
},
ProcStack::default(),
);
let node2 = spawn_blocking(async {
let (c, events, cluster_handle, cluster_listener) = test_epidemic_periodic_index_fp(get_port());
match cluster_listener.await {
Some(_) => assert!(false),
_ => {
// Test passed.
warn!("This node is leaving.");
c.shutdown();
warn!("Stopping the setup");
},
}
}, ProcStack::default());
let node2 = spawn_blocking(
async {
let (c, events, cluster_handle, cluster_listener) =
test_epidemic_periodic_index_fp(get_port());
match cluster_listener.await {
Some(_) => assert!(false),
_ => {
// Test passed.
warn!("This node is leaving.");
c.shutdown();
warn!("Stopping the setup");
}
}
},
ProcStack::default(),
);
run(async { future::join(node1, node2).await }, ProcStack::default());
run(
async { future::join(node1, node2).await },
ProcStack::default(),
);
scenario.teardown();
}
};
}
#[test]
fn epidemic_periodic_index_fp() {
cluster_fault_recovery_test!("epidemic-periodic-index-fp");
}

View File

@ -3,10 +3,9 @@ extern crate pretty_env_logger;
#[macro_use]
extern crate log;
use std::sync::Once;
use bastion::prelude::*;
use fail::FailScenario;
use std::sync::Once;
//
@ -27,9 +26,14 @@ use lightproc::proc_stack::ProcStack;
use std::sync::Arc;
use std::time::{Duration, Instant};
fn test_epidemic_periodic_index_fp(port: u16) ->
(Arc<ArtilleryAPCluster>, RecoverableHandle<()>, RecoverableHandle<()>, RecoverableHandle<()>) {
fn test_epidemic_periodic_index_fp(
port: u16,
) -> (
Arc<ArtilleryAPCluster>,
RecoverableHandle<()>,
RecoverableHandle<()>,
RecoverableHandle<()>,
) {
// Initialize our cluster configuration
let ap_cluster_config = ArtilleryAPClusterConfig {
app_name: String::from("artillery-ap"),
@ -65,10 +69,9 @@ fn test_epidemic_periodic_index_fp(port: u16) ->
let ap_ref = ap_cluster.clone();
// Detach cluster launch
let cluster_handle = spawn_blocking(
async move { ap_cluster.launch().await }, cluster_stack);
let cluster_handle = spawn_blocking(async move { ap_cluster.launch().await }, cluster_stack);
// Detach event consumption
// Detach event consumption
let events_handle = spawn_blocking(
async move {
warn!("STARTED: Event Poller");
@ -84,7 +87,9 @@ fn test_epidemic_periodic_index_fp(port: u16) ->
}
}
warn!("STOPPED: Event Poller");
}, events_stack);
},
events_stack,
);
(ap_ref, events_handle, cluster_handle, cluster_listener)
}
@ -104,47 +109,55 @@ fn get_port() -> u16 {
static LOGGER_INIT: Once = Once::new();
macro_rules! cluster_fault_recovery_test {
($fp_name:expr) => {
($fp_name:expr) => {
LOGGER_INIT.call_once(|| pretty_env_logger::init());
let scenario = FailScenario::setup();
fail::cfg($fp_name, "panic").unwrap();
// Let's see how reliable you are.
let node1 = spawn_blocking(async {
let (c, events, cluster_handle, cluster_listener) = test_epidemic_periodic_index_fp(get_port());
match cluster_listener.await {
Some(_) => assert!(false),
_ => {
// Test passed.
warn!("This node is leaving.");
c.shutdown();
warn!("Stopping the setup");
},
}
}, ProcStack::default());
let node1 = spawn_blocking(
async {
let (c, events, cluster_handle, cluster_listener) =
test_epidemic_periodic_index_fp(get_port());
match cluster_listener.await {
Some(_) => assert!(false),
_ => {
// Test passed.
warn!("This node is leaving.");
c.shutdown();
warn!("Stopping the setup");
}
}
},
ProcStack::default(),
);
let node2 = spawn_blocking(async {
let (c, events, cluster_handle, cluster_listener) = test_epidemic_periodic_index_fp(get_port());
match cluster_listener.await {
Some(_) => assert!(false),
_ => {
// Test passed.
warn!("This node is leaving.");
c.shutdown();
warn!("Stopping the setup");
},
}
}, ProcStack::default());
let node2 = spawn_blocking(
async {
let (c, events, cluster_handle, cluster_listener) =
test_epidemic_periodic_index_fp(get_port());
match cluster_listener.await {
Some(_) => assert!(false),
_ => {
// Test passed.
warn!("This node is leaving.");
c.shutdown();
warn!("Stopping the setup");
}
}
},
ProcStack::default(),
);
run(async { future::join(node1, node2).await }, ProcStack::default());
run(
async { future::join(node1, node2).await },
ProcStack::default(),
);
scenario.teardown();
}
};
}
#[test]
fn epidemic_state_change_tail_follow() {
cluster_fault_recovery_test!("epidemic-state-change-tail-follow-fp");