Chaos tests

This commit is contained in:
Mahmut Bulut 2020-03-07 18:38:58 +01:00
parent b22b2752fe
commit 5c8f51468f
13 changed files with 266 additions and 373 deletions

View File

@ -45,6 +45,6 @@ jobs:
toolchain: ${{ matrix.version }}-${{ matrix.toolchain }}
default: true
- name: Failpoint Test (Core)
run: cargo test --features fail/failpoints --verbose
- name: Chaos Tests (Core)
run: cargo test --verbose
working-directory: ./artillery-core

View File

@ -19,6 +19,7 @@ chrono = { version = "0.4", features = ["serde"] }
rand = "0.7.3"
mio = { version = "0.7.0-alpha.1", features = ["os-poll", "udp"] }
futures = "0.3"
pin-utils = "0.1.0-alpha.4"
libp2p = "0.16.0"
bastion-executor = "0.3.4"
lightproc = "0.3.4"
@ -31,16 +32,10 @@ clap = "2.33.0"
pretty_env_logger = "0.4.0"
once_cell = "1.3.1"
criterion = "0.3.1"
trybuild = "1.0"
kaos = "0.1"
bastion = "0.3"
[[test]]
name = "epidemic-state-change-tail-follow"
path = "tests/failpoints/epidemic-state-change-tail-follow.rs"
required-features = ["fail/failpoints"]
[[test]]
name = "epidemic-periodic-index"
path = "tests/failpoints/epidemic-periodic-index.rs"
required-features = ["fail/failpoints"]
name = "kaos"
path = "kaos-tests/launcher.rs"

View File

@ -48,8 +48,7 @@ fn main() {
};
// Configure our cluster node
let (cluster, _cluster_listener) = ArtilleryAPCluster::new(ap_cluster_config).unwrap();
let ap_cluster = Arc::new(cluster);
let ap_cluster = Arc::new(ArtilleryAPCluster::new(ap_cluster_config).unwrap());
// Launch the cluster node
run(
@ -60,7 +59,7 @@ fn main() {
let ap_events = ap_cluster.clone();
// Detach cluster launch
let cluster_handle = spawn(async move { ap_cluster.launch().await }, cluster_stack);
let cluster_handle = spawn_blocking(async move { ap_cluster.launch().await }, cluster_stack);
// Detach event consumption
let events_handle = spawn_blocking(

View File

@ -1,21 +0,0 @@
/// Feeds every entry of the `fptests` directory, except the launcher itself,
/// to `trybuild` as a case that is expected to pass.
///
/// An entry is skipped when its path contains the substring `launcher`.
#[test]
fn test_launcher() {
    use std::fs;

    let t = trybuild::TestCases::new();
    for entry in fs::read_dir("fptests").expect("cannot read the `fptests` directory") {
        let path = entry
            .expect("cannot read an entry of the `fptests` directory")
            .path();
        dbg!(&path);

        // A lossy conversion is enough for a substring check and, unlike
        // `into_string().unwrap()`, does not panic on a non-UTF-8 file name.
        let is_launcher = path.to_string_lossy().contains("launcher");
        if !is_launcher {
            t.pass(path);
        }
    }
}

View File

@ -0,0 +1,157 @@
/// Expands, at the call site, to the imports, helper functions and logger
/// guard that the chaos scenarios share.
///
/// After expansion the following items are available in the calling scope:
/// - `node_setup(port)`: builds one cluster node bound to `127.0.0.1:port`,
///   starts its `launch()` task and an event-logging task, and returns
///   `(cluster, events_handle, cluster_handle)`;
/// - `get_port()`: draws a random port strictly between 1025 and 65535;
/// - `LOGGER_INIT`: a `Once` used to initialise the logger a single time.
#[macro_export]
macro_rules! cluster_init {
    () => {
        use bastion::prelude::*;
        use fail::FailScenario;
        use std::sync::Once;
        //
        use std::net::ToSocketAddrs;
        use uuid::Uuid;
        use artillery_core::epidemic::prelude::*;
        use artillery_core::service_discovery::mdns::prelude::*;
        use artillery_core::cluster::ap::*;
        use futures::future;
        use bastion_executor::prelude::*;
        use lightproc::prelude::*;
        use lightproc::proc_stack::ProcStack;
        use std::sync::Arc;
        use std::time::{Duration, Instant};

        fn node_setup(
            port: u16,
        ) -> (
            Arc<ArtilleryAPCluster>,
            RecoverableHandle<()>,
            RecoverableHandle<()>,
        ) {
            // Service discovery advertises the same port the node listens on.
            let mut sd_config = MDNSServiceDiscoveryConfig::default();
            sd_config.local_service_addr.set_port(port);

            // The node listens on the loopback interface only.
            let listen_addr = format!("127.0.0.1:{}", port)
                .to_socket_addrs()
                .unwrap()
                .next()
                .unwrap();

            let ap_cluster_config = ArtilleryAPClusterConfig {
                app_name: String::from("artillery-ap"),
                node_id: Uuid::new_v4(),
                sd_config,
                cluster_config: ClusterConfig {
                    listen_addr,
                    ..Default::default()
                },
            };

            // One shared cluster instance: a clone for each task, one for the caller.
            let ap_cluster = Arc::new(ArtilleryAPCluster::new(ap_cluster_config).unwrap());
            let ap_events = Arc::clone(&ap_cluster);
            let ap_ref = Arc::clone(&ap_cluster);

            // Task driving the cluster itself.
            let cluster_handle = spawn_blocking(
                async move { ap_cluster.launch().await },
                ProcStack::default().with_pid(2),
            );

            // Task logging every cluster event together with the member list.
            let events_handle = spawn_blocking(
                async move {
                    warn!("STARTED: Event Poller");
                    for (members, event) in ap_events.cluster().events.iter() {
                        warn!("");
                        warn!(" CLUSTER EVENT ");
                        warn!("===============");
                        warn!("{:?}", event);
                        warn!("");

                        for member in members {
                            info!("MEMBER {:?}", member);
                        }
                    }
                    warn!("STOPPED: Event Poller");
                },
                ProcStack::default().with_pid(3),
            );

            (ap_ref, events_handle, cluster_handle)
        }

        fn get_port() -> u16 {
            use rand::{thread_rng, Rng};

            let mut rng = thread_rng();
            // Keep drawing until the value lies strictly between 1025 and 65535.
            loop {
                let port: u16 = rng.gen();
                if (1026..65535).contains(&port) {
                    break port;
                }
            }
        }

        static LOGGER_INIT: Once = Once::new();
    };
}
/// Spawns one cluster node on a random port and binds the handle of the
/// spawned task to `$node_handle`.
///
/// The task calls `node_setup(get_port())` and waits for the node's event
/// poller handle. When that handle resolves to `Some(_)` the node is shut
/// down; when it resolves to `None` the task panics.
///
/// `node_setup`, `get_port`, `spawn_blocking`, `ProcStack` and `warn!` must be
/// in scope at the call site.
#[macro_export]
macro_rules! node_spawn {
    ($node_handle:ident) => {
        let $node_handle = spawn_blocking(
            async {
                // The launch handle is not awaited here; it stays bound under a
                // name so it is dropped at the end of this block, as before.
                let (c, events, _cluster_handle) = node_setup(get_port());
                match events.await {
                    Some(_) => {
                        // Test passed.
                        warn!("This node is leaving.");
                        c.shutdown();
                        warn!("Stopping the setup");
                    }
                    None => panic!("event poller handle resolved to None"),
                }
            },
            ProcStack::default(),
        );
    };
}
/// Runs one chaos scenario: configures the failpoint `$fp_name` to panic,
/// spawns three nodes through `node_spawn!` and blocks until all three node
/// tasks have finished.
///
/// The items provided by `cluster_init!` must already be in scope.
#[macro_export]
macro_rules! chaos_unleash {
    ($fp_name:expr) => {
        LOGGER_INIT.call_once(pretty_env_logger::init);

        let scenario = FailScenario::setup();
        fail::cfg($fp_name, "panic").unwrap();

        // Let's see how reliable you are.
        node_spawn!(node1);
        node_spawn!(node2);
        node_spawn!(node3);
        let nodes = vec![node1, node2, node3];

        run(
            async move { future::join_all(nodes).await },
            ProcStack::default(),
        );

        scenario.teardown();
    };
}

View File

@ -0,0 +1,14 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
#[macro_use]
mod chaos;
use chaos::*;
/// Entry point of the chaos scenario for the `epidemic-periodic-index-fp`
/// failpoint.
fn main() {
    // Brings the shared imports, `node_setup`, `get_port` and `LOGGER_INIT` into scope.
    cluster_init!();
    // Configures the failpoint to panic, spawns three nodes and waits for them.
    chaos_unleash!("epidemic-periodic-index-fp");
}

View File

@ -0,0 +1,14 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
#[macro_use]
mod chaos;
use chaos::*;
/// Entry point of the chaos scenario for the
/// `epidemic-state-change-tail-follow-fp` failpoint.
fn main() {
    // Brings the shared imports, `node_setup`, `get_port` and `LOGGER_INIT` into scope.
    cluster_init!();
    // Configures the failpoint to panic, spawns three nodes and waits for them.
    chaos_unleash!("epidemic-state-change-tail-follow-fp");
}

View File

@ -0,0 +1,37 @@
/// Registers every scenario found in the `kaos-tests` directory with the
/// `kaos` runner.
///
/// Entries whose path contains `launcher`, `mod` or `chaos` are support files
/// rather than scenarios and are skipped.
#[test]
fn kaos() {
    use std::fs;
    use std::time::Duration;

    // Path fragments that mark an entry as a support file.
    const SKIPPED_FRAGMENTS: [&str; 3] = ["launcher", "mod", "chaos"];

    let k = kaos::Runs::new();
    for entry in fs::read_dir("kaos-tests").expect("cannot read the `kaos-tests` directory") {
        let path = entry
            .expect("cannot read an entry of the `kaos-tests` directory")
            .path();
        dbg!(&path);

        // One lossy conversion replaces three cloning conversions that each
        // panicked on a non-UTF-8 file name. The borrow ends with this block,
        // so `path` can be moved into the runner below.
        let is_support_file = {
            let shown = path.to_string_lossy();
            SKIPPED_FRAGMENTS
                .iter()
                .any(|fragment| shown.contains(*fragment))
        };

        if !is_support_file {
            // Every service run should be available at least 2 seconds
            k.available(path, Duration::from_secs(2));
        }
    }
}

View File

@ -0,0 +1,6 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
mod chaos;

View File

@ -4,8 +4,11 @@ use crate::service_discovery::mdns::prelude::*;
use lightproc::prelude::*;
use std::sync::Arc;
use std::{cell::Cell, sync::Arc};
use uuid::Uuid;
use futures::{FutureExt, select};
use pin_utils::pin_mut;
#[derive(Default, Clone)]
pub struct ArtilleryAPClusterConfig {
@ -19,6 +22,7 @@ pub struct ArtilleryAPCluster {
config: ArtilleryAPClusterConfig,
cluster: Arc<Cluster>,
sd: Arc<MDNSServiceDiscovery>,
cluster_ev_loop_handle: Cell<RecoverableHandle<()>>,
}
unsafe impl Send for ArtilleryAPCluster {}
@ -27,20 +31,20 @@ unsafe impl Sync for ArtilleryAPCluster {}
pub type DiscoveryLaunch = RecoverableHandle<()>;
impl ArtilleryAPCluster {
pub fn new(config: ArtilleryAPClusterConfig) -> Result<(Self, RecoverableHandle<()>)> {
pub fn new(config: ArtilleryAPClusterConfig) -> Result<Self> {
let sd = MDNSServiceDiscovery::new_service_discovery(config.sd_config.clone())?;
let (cluster, cluster_listener) =
Cluster::new_cluster(config.node_id, config.cluster_config.clone())?;
Ok((
Ok(
Self {
config,
cluster: Arc::new(cluster),
sd: Arc::new(sd),
},
cluster_listener,
))
cluster_ev_loop_handle: Cell::new(cluster_listener)
}
)
}
pub fn cluster(&self) -> Arc<Cluster> {
@ -56,6 +60,23 @@ impl ArtilleryAPCluster {
}
/// Drives the cluster: waits on the cluster event-loop handle and on node
/// discovery at the same time.
///
/// # Panics
///
/// Panics when the event-loop handle resolves to `None`, and when node
/// discovery returns, since discovery is not expected to finish.
pub async fn launch(&self) {
    // The stored handle can only be moved out of the `Cell` by swapping in a
    // replacement, so a handle for an empty future takes its place.
    // NOTE(review): a second call to `launch` would therefore await this
    // placeholder instead of the real event loop; confirm that `launch` is
    // meant to be called once.
    let (_, eh) = LightProc::recoverable(async {}, |_| (), ProcStack::default());
    let ev_loop_handle = self.cluster_ev_loop_handle.replace(eh).fuse();
    let discover_nodes_handle = self.discover_nodes().fuse();

    // `select!` requires fused and pinned futures.
    pin_mut!(ev_loop_handle);
    pin_mut!(discover_nodes_handle);

    select! {
        ev_loop_res = ev_loop_handle => {
            ev_loop_res.expect("Cluster event loop handle resolved to None.")
        },
        _ = discover_nodes_handle => panic!("Node discovery unexpectedly shutdown."),
    };
}
async fn discover_nodes(&self) {
self.service_discovery()
.events()
.iter()

View File

@ -31,7 +31,7 @@ impl Cluster {
ArtilleryEpidemic::new(host_key, config, event_tx, internal_tx.clone())?;
debug!("Starting Artillery Cluster");
let cluster_handle = spawn(
let cluster_handle = spawn_blocking(
async move {
ArtilleryEpidemic::event_loop(&mut internal_rx, poll, state)
.expect("Failed to create event loop");
@ -62,9 +62,8 @@ impl Cluster {
}
pub fn leave_cluster(&self) {
self.comm
.send(ArtilleryClusterRequest::LeaveCluster)
.unwrap();
let _ = self.comm
.send(ArtilleryClusterRequest::LeaveCluster);
}
}

View File

@ -1,164 +0,0 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
use bastion::prelude::*;
use fail::FailScenario;
use std::sync::Once;
//
use std::net::ToSocketAddrs;
use uuid::Uuid;
use artillery_core::epidemic::prelude::*;
use artillery_core::service_discovery::mdns::prelude::*;
use artillery_core::cluster::ap::*;
use futures::future;
use bastion_executor::prelude::*;
use lightproc::prelude::*;
use lightproc::proc_stack::ProcStack;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Builds one cluster node bound to `127.0.0.1:port` and starts its tasks.
///
/// Returns, in order: a shared reference to the cluster, the handle of the
/// event-logging task, the handle of the task running `launch()`, and the
/// listener handle returned by `ArtilleryAPCluster::new`.
fn test_epidemic_periodic_index_fp(
    port: u16,
) -> (
    Arc<ArtilleryAPCluster>,
    RecoverableHandle<()>,
    RecoverableHandle<()>,
    RecoverableHandle<()>,
) {
    // Service discovery advertises the same port the node listens on.
    let mut sd_config = MDNSServiceDiscoveryConfig::default();
    sd_config.local_service_addr.set_port(port);

    // The node listens on the loopback interface only.
    let listen_addr = format!("127.0.0.1:{}", port)
        .to_socket_addrs()
        .unwrap()
        .next()
        .unwrap();

    let ap_cluster_config = ArtilleryAPClusterConfig {
        app_name: String::from("artillery-ap"),
        node_id: Uuid::new_v4(),
        sd_config,
        cluster_config: ClusterConfig {
            listen_addr,
            ..Default::default()
        },
    };

    // One shared cluster instance: a clone for each task, one for the caller.
    let (cluster, cluster_listener) = ArtilleryAPCluster::new(ap_cluster_config).unwrap();
    let ap_cluster = Arc::new(cluster);
    let ap_events = Arc::clone(&ap_cluster);
    let ap_ref = Arc::clone(&ap_cluster);

    // Task driving the cluster itself.
    let cluster_handle = spawn_blocking(
        async move { ap_cluster.launch().await },
        ProcStack::default().with_pid(2),
    );

    // Task logging every cluster event together with the member list.
    let events_handle = spawn_blocking(
        async move {
            warn!("STARTED: Event Poller");
            for (members, event) in ap_events.cluster().events.iter() {
                warn!("");
                warn!(" CLUSTER EVENT ");
                warn!("===============");
                warn!("{:?}", event);
                warn!("");

                for member in members {
                    info!("MEMBER {:?}", member);
                }
            }
            warn!("STOPPED: Event Poller");
        },
        ProcStack::default().with_pid(3),
    );

    (ap_ref, events_handle, cluster_handle, cluster_listener)
}
/// Draws a random port strictly between 1025 and 65535, retrying until a
/// drawn value falls inside that range.
fn get_port() -> u16 {
    use rand::{thread_rng, Rng};

    let mut rng = thread_rng();
    loop {
        let port: u16 = rng.gen();
        if (1026..65535).contains(&port) {
            break port;
        }
    }
}
static LOGGER_INIT: Once = Once::new();
/// Runs a two-node fault-recovery scenario with the failpoint `$fp_name`
/// configured to panic.
///
/// Each node task waits for its cluster listener handle. The scenario passes
/// when that handle resolves to `None`, after which the node is shut down;
/// a `Some(_)` result makes the task panic.
macro_rules! cluster_fault_recovery_test {
    ($fp_name:expr) => {
        LOGGER_INIT.call_once(|| pretty_env_logger::init());
        let scenario = FailScenario::setup();
        fail::cfg($fp_name, "panic").unwrap();

        // Let's see how reliable you are.
        // Both nodes run the same body, so it is written once and spawned twice.
        let spawn_node = || {
            spawn_blocking(
                async {
                    // The two unused handles stay bound under a name so they are
                    // dropped at the end of this block, as before.
                    let (c, _events, _cluster_handle, cluster_listener) =
                        test_epidemic_periodic_index_fp(get_port());
                    match cluster_listener.await {
                        Some(_) => panic!("cluster listener resolved to Some(_); expected None"),
                        None => {
                            // Test passed.
                            warn!("This node is leaving.");
                            c.shutdown();
                            warn!("Stopping the setup");
                        }
                    }
                },
                ProcStack::default(),
            )
        };
        let node1 = spawn_node();
        let node2 = spawn_node();

        run(
            async { future::join(node1, node2).await },
            ProcStack::default(),
        );

        scenario.teardown();
    };
}
/// Runs the two-node fault-recovery scenario with the
/// `epidemic-periodic-index-fp` failpoint configured to panic.
#[test]
fn epidemic_periodic_index_fp() {
cluster_fault_recovery_test!("epidemic-periodic-index-fp");
}

View File

@ -1,164 +0,0 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
use bastion::prelude::*;
use fail::FailScenario;
use std::sync::Once;
//
use std::net::ToSocketAddrs;
use uuid::Uuid;
use artillery_core::epidemic::prelude::*;
use artillery_core::service_discovery::mdns::prelude::*;
use artillery_core::cluster::ap::*;
use futures::future;
use bastion_executor::prelude::*;
use lightproc::prelude::*;
use lightproc::proc_stack::ProcStack;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Builds one cluster node bound to `127.0.0.1:port` and starts its tasks.
///
/// Returns, in order: a shared reference to the cluster, the handle of the
/// event-logging task, the handle of the task running `launch()`, and the
/// listener handle returned by `ArtilleryAPCluster::new`.
fn test_epidemic_periodic_index_fp(
    port: u16,
) -> (
    Arc<ArtilleryAPCluster>,
    RecoverableHandle<()>,
    RecoverableHandle<()>,
    RecoverableHandle<()>,
) {
    // Service discovery advertises the same port the node listens on.
    let mut sd_config = MDNSServiceDiscoveryConfig::default();
    sd_config.local_service_addr.set_port(port);

    // The node listens on the loopback interface only.
    let listen_addr = format!("127.0.0.1:{}", port)
        .to_socket_addrs()
        .unwrap()
        .next()
        .unwrap();

    let ap_cluster_config = ArtilleryAPClusterConfig {
        app_name: String::from("artillery-ap"),
        node_id: Uuid::new_v4(),
        sd_config,
        cluster_config: ClusterConfig {
            listen_addr,
            ..Default::default()
        },
    };

    // One shared cluster instance: a clone for each task, one for the caller.
    let (cluster, cluster_listener) = ArtilleryAPCluster::new(ap_cluster_config).unwrap();
    let ap_cluster = Arc::new(cluster);
    let ap_events = Arc::clone(&ap_cluster);
    let ap_ref = Arc::clone(&ap_cluster);

    // Task driving the cluster itself.
    let cluster_handle = spawn_blocking(
        async move { ap_cluster.launch().await },
        ProcStack::default().with_pid(2),
    );

    // Task logging every cluster event together with the member list.
    let events_handle = spawn_blocking(
        async move {
            warn!("STARTED: Event Poller");
            for (members, event) in ap_events.cluster().events.iter() {
                warn!("");
                warn!(" CLUSTER EVENT ");
                warn!("===============");
                warn!("{:?}", event);
                warn!("");

                for member in members {
                    info!("MEMBER {:?}", member);
                }
            }
            warn!("STOPPED: Event Poller");
        },
        ProcStack::default().with_pid(3),
    );

    (ap_ref, events_handle, cluster_handle, cluster_listener)
}
/// Draws a random port strictly between 1025 and 65535, retrying until a
/// drawn value falls inside that range.
fn get_port() -> u16 {
    use rand::{thread_rng, Rng};

    let mut rng = thread_rng();
    loop {
        let port: u16 = rng.gen();
        if (1026..65535).contains(&port) {
            break port;
        }
    }
}
static LOGGER_INIT: Once = Once::new();
/// Runs a two-node fault-recovery scenario with the failpoint `$fp_name`
/// configured to panic.
///
/// Each node task waits for its cluster listener handle. The scenario passes
/// when that handle resolves to `None`, after which the node is shut down;
/// a `Some(_)` result makes the task panic.
macro_rules! cluster_fault_recovery_test {
    ($fp_name:expr) => {
        LOGGER_INIT.call_once(|| pretty_env_logger::init());
        let scenario = FailScenario::setup();
        fail::cfg($fp_name, "panic").unwrap();

        // Let's see how reliable you are.
        // Both nodes run the same body, so it is written once and spawned twice.
        let spawn_node = || {
            spawn_blocking(
                async {
                    // The two unused handles stay bound under a name so they are
                    // dropped at the end of this block, as before.
                    let (c, _events, _cluster_handle, cluster_listener) =
                        test_epidemic_periodic_index_fp(get_port());
                    match cluster_listener.await {
                        Some(_) => panic!("cluster listener resolved to Some(_); expected None"),
                        None => {
                            // Test passed.
                            warn!("This node is leaving.");
                            c.shutdown();
                            warn!("Stopping the setup");
                        }
                    }
                },
                ProcStack::default(),
            )
        };
        let node1 = spawn_node();
        let node2 = spawn_node();

        run(
            async { future::join(node1, node2).await },
            ProcStack::default(),
        );

        scenario.teardown();
    };
}
/// Runs the two-node fault-recovery scenario with the
/// `epidemic-state-change-tail-follow-fp` failpoint configured to panic.
#[test]
fn epidemic_state_change_tail_follow() {
cluster_fault_recovery_test!("epidemic-state-change-tail-follow-fp");
}