Merge pull request #19 from bastion-rs/failpoint-tests

Chaos tests
Mahmut Bulut, 2020-03-08 17:46:00 +01:00 (committed by GitHub)
commit aeead41c4c
21 changed files with 455 additions and 41 deletions

View File

@ -4,4 +4,8 @@ members = [
"artillery-ddata",
"artillery-core",
"artillery-hierman",
]
]
[profile.release]
lto = "fat"
codegen-units = 1

View File

@ -19,14 +19,20 @@ chrono = { version = "0.4", features = ["serde"] }
rand = "0.7.3"
mio = { version = "0.7.0-alpha.1", features = ["os-poll", "udp"] }
futures = "0.3"
pin-utils = "0.1.0-alpha.4"
libp2p = "0.16.0"
bastion-executor = "0.3.4"
lightproc = "0.3.4"
crossbeam-channel = "0.4.2"
kaos = "0.1.1-alpha.2"
[dev-dependencies]
bincode = "1.2.1"
clap = "2.33.0"
pretty_env_logger = "0.4.0"
once_cell = "1.3.1"
criterion = "0.3.1"
criterion = "0.3.1"
[[test]]
name = "chaos_tests"
path = "kaos-tests/launcher.rs"

View File

@ -83,7 +83,7 @@ fn main() {
..Default::default()
};
let cluster = Cluster::new_cluster(host_key, config).unwrap();
let (cluster, _cluster_handle) = Cluster::new_cluster(host_key, config).unwrap();
if let Some(seed_node) = seed_node {
cluster.add_seed_node(FromStr::from_str(&seed_node).unwrap());

View File

@ -138,6 +138,7 @@ fn get_cluster(listen_addr: &str, host_key: Uuid) -> &'static Cluster {
..Default::default()
};
Cluster::new_cluster(host_key, config).unwrap()
let (cluster, _) = Cluster::new_cluster(host_key, config).unwrap();
cluster
})
}

View File

@ -177,6 +177,7 @@ fn get_cluster(listen_addr: &str, host_key: Uuid) -> &'static Cluster {
..Default::default()
};
Cluster::new_cluster(host_key, config).unwrap()
let (cluster, _) = Cluster::new_cluster(host_key, config).unwrap();
cluster
})
}

View File

@ -0,0 +1,155 @@
#[macro_export]
macro_rules! cluster_init {
() => {
use std::sync::Once;
//
use kaos::*;
use std::net::ToSocketAddrs;
use uuid::Uuid;
use artillery_core::epidemic::prelude::*;
use artillery_core::service_discovery::mdns::prelude::*;
use artillery_core::cluster::ap::*;
use futures::future;
use bastion_executor::prelude::*;
use lightproc::prelude::*;
use lightproc::proc_stack::ProcStack;
use std::sync::Arc;
use std::time::{Duration, Instant};
fn node_setup(
port: u16,
) -> (
Arc<ArtilleryAPCluster>,
RecoverableHandle<()>,
RecoverableHandle<()>,
) {
// Initialize our cluster configuration
let ap_cluster_config = ArtilleryAPClusterConfig {
app_name: String::from("artillery-ap"),
node_id: Uuid::new_v4(),
sd_config: {
let mut config = MDNSServiceDiscoveryConfig::default();
config.local_service_addr.set_port(port);
config
},
cluster_config: {
let listen_addr = format!("127.0.0.1:{}", port);
ClusterConfig {
listen_addr: (&listen_addr as &str)
.to_socket_addrs()
.unwrap()
.next()
.unwrap(),
..Default::default()
}
},
};
// Configure our cluster node
let cluster = ArtilleryAPCluster::new(ap_cluster_config).unwrap();
let ap_cluster = Arc::new(cluster);
// Launch the cluster node
let cluster_stack = ProcStack::default().with_pid(2);
let events_stack = ProcStack::default().with_pid(3);
let ap_events = ap_cluster.clone();
let ap_ref = ap_cluster.clone();
// Detach cluster launch
let cluster_handle = spawn_blocking(async move { ap_cluster.launch().await }, cluster_stack);
// Detach event consumption
let events_handle = spawn_blocking(
async move {
warn!("STARTED: Event Poller");
for (members, event) in ap_events.cluster().events.iter() {
warn!("");
warn!(" CLUSTER EVENT ");
warn!("===============");
warn!("{:?}", event);
warn!("");
for member in members {
info!("MEMBER {:?}", member);
}
}
warn!("STOPPED: Event Poller");
},
events_stack,
);
(ap_ref, events_handle, cluster_handle)
}
fn get_port() -> u16 {
use rand::{thread_rng, Rng};
let mut rng = thread_rng();
let port: u16 = rng.gen();
if port > 1025 && port < 65535 {
port
} else {
get_port()
}
}
static LOGGER_INIT: Once = Once::new();
LOGGER_INIT.call_once(|| pretty_env_logger::init());
};
}
#[macro_export]
macro_rules! ap_events_check_node_spawn {
($node_handle:ident) => {
let $node_handle = spawn_blocking(
async {
let (c, events, cluster_handle) = node_setup(get_port());
match events.await {
Some(a) => {
// Test passed.
warn!("This node is leaving.");
c.shutdown();
warn!("Stopping the setup");
},
_ => {
assert!(false);
}
}
},
ProcStack::default(),
);
}
}
#[macro_export]
macro_rules! ap_sd_check_node_spawn {
($node_handle:ident) => {
let $node_handle = spawn_blocking(
async {
let (c, events, cluster_handle) = node_setup(get_port());
match cluster_handle.await {
Some(a) => {
// Test passed.
warn!("This node is leaving.");
c.shutdown();
warn!("Stopping the setup");
},
_ => {
assert!(false);
}
}
},
ProcStack::default(),
);
}
}
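
Note: get_port above rejection-samples a random u16 and recurses until it lands in the unprivileged range. A lighter alternative for tests (not what this commit does) is to bind to port 0 and let the OS hand out a free ephemeral port; a hypothetical helper:

use std::net::UdpSocket;

// Hypothetical replacement for get_port(): bind to port 0 so the OS picks a free
// UDP port, read it back, then drop the socket. Slightly racy (another process
// could grab the port before the node binds it), but usually fine for local tests.
fn free_udp_port() -> u16 {
    let socket = UdpSocket::bind("127.0.0.1:0").expect("bind to an ephemeral port");
    socket.local_addr().expect("local_addr").port()
}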

View File

@ -0,0 +1,39 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
mod base;
use base::*;
fn main() {
cluster_init!();
kaostest!("epidemic-periodic-index-fp",
{
// Bastion would normally drive restarts like this asynchronously; here we loop manually.
let mut restarts = 0;
loop {
restarts += 1;
ap_events_check_node_spawn!(node1);
ap_events_check_node_spawn!(node2);
ap_events_check_node_spawn!(node3);
run(
async {
future::join_all(
vec![node1, node2, node3]
).await
},
ProcStack::default(),
);
if restarts == 10 {
break;
}
}
}
);
}

View File

@ -0,0 +1,29 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
mod base;
use base::*;
fn main() {
cluster_init!();
kaostest!("epidemic-periodic-index-fp",
{
ap_events_check_node_spawn!(node1);
ap_events_check_node_spawn!(node2);
ap_events_check_node_spawn!(node3);
run(
async {
future::join_all(
vec![node1, node2, node3]
).await
},
ProcStack::default(),
);
}
);
}

View File

@ -0,0 +1,28 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
mod base;
use base::*;
fn main() {
cluster_init!();
kaostest!("epidemic-state-change-tail-follow-fp",
{
ap_events_check_node_spawn!(node1);
ap_events_check_node_spawn!(node2);
ap_events_check_node_spawn!(node3);
run(
async {
future::join_all(
vec![node1, node2, node3]
).await
},
ProcStack::default(),
);
}
);
}

View File

@ -0,0 +1,56 @@
#[test]
fn chaos_tests() {
use std::fs;
use std::time::Duration;
let k = kaos::Runs::new();
for entry in fs::read_dir("kaos-tests").unwrap() {
let entry = entry.unwrap();
let path = entry.path();
dbg!(path.clone());
if !path
.clone()
.into_os_string()
.into_string()
.unwrap()
.contains("launcher") // Filter out itself
&& !path
.clone()
.into_os_string()
.into_string()
.unwrap()
.contains("mod") // Filter out module hierarchy
&& !path
.clone()
.into_os_string()
.into_string()
.unwrap()
.contains("base")
// Filter out the shared test scaffolding (base), which is not a test by itself
{
if path
.clone()
.into_os_string()
.into_string()
.unwrap()
.contains("chaotic")
// Chaotic test rather than availability
{
// Let's have 5 varying runs.
let run_count = 5;
// Maximum allowed surge between service runs, in milliseconds
// (i.e. the minimum availability we expect across the runs).
// Let's make it 10 seconds.
let max_surge = 10 * 1000;
// Run chaotic test.
k.chaotic(path, run_count, max_surge);
} else {
// Every service run should stay available for at least 2 seconds
k.available(path, Duration::from_secs(2));
}
}
}
}
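
Note: the launcher treats any file whose name contains "chaotic" as a chaotic run (5 runs with at most a 10-second surge between them) and everything else as an availability run (each run must stay up for at least 2 seconds), so new tests dropped into kaos-tests/ are picked up automatically. A hypothetical chaotic test would only need "chaotic" in its file name; a sketch that reuses the macros and an existing fail-point name from this diff:

// kaos-tests/chaotic_example.rs (hypothetical file): "chaotic" in the name makes
// the launcher schedule it with k.chaotic(...) instead of k.available(...).
extern crate pretty_env_logger;
#[macro_use]
extern crate log;

mod base;
use base::*;

fn main() {
    cluster_init!();
    kaostest!("epidemic-periodic-index-fp", {
        ap_events_check_node_spawn!(node1);
        run(
            async { future::join_all(vec![node1]).await },
            ProcStack::default(),
        );
    });
}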

View File

@ -0,0 +1,28 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
mod base;
use base::*;
fn main() {
cluster_init!();
kaostest!("mdns-protocol-fp",
{
ap_sd_check_node_spawn!(node1);
ap_sd_check_node_spawn!(node2);
ap_sd_check_node_spawn!(node3);
run(
async {
future::join_all(
vec![node1, node2, node3]
).await
},
ProcStack::default(),
);
}
);
}

View File

@ -0,0 +1,6 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
mod base;

View File

@ -0,0 +1,16 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
mod base;
use base::*;
fn main() {
// cluster_init!();
// "udp-anycast-dgram-oop-fp"
// TODO: This will obviously pass because AP cluster doesn't use UDP anycast by default.
// Fix it after having different prepared cluster.
std::thread::sleep(std::time::Duration::from_secs(3));
}

View File

@ -0,0 +1,16 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
mod base;
use base::*;
fn main() {
// cluster_init!();
// "udp-anycast-reply-dgram-oop-fp"
// TODO: This will obviously pass because AP cluster doesn't use UDP anycast by default.
// Fix it after having different prepared cluster.
std::thread::sleep(std::time::Duration::from_secs(3));
}
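
Note: both UDP-anycast placeholders above only sleep for 3 seconds so that the launcher's 2-second availability check passes while the TODO stands (the default AP cluster never reaches the anycast fail points). Once a cluster setup that actually sends anycast datagrams exists, the bodies would presumably take the same shape as the other tests; a sketch using the fail-point name from the service-discovery hunk below:

fn main() {
    cluster_init!();
    kaostest!("udp-anycast-reply-dgram-oob-fp", {
        ap_sd_check_node_spawn!(node1);
        ap_sd_check_node_spawn!(node2);
        run(
            async { future::join_all(vec![node1, node2]).await },
            ProcStack::default(),
        );
    });
}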

View File

@ -4,9 +4,9 @@ use crate::service_discovery::mdns::prelude::*;
use lightproc::prelude::*;
use std::future::Future;
use std::sync::Arc;
use futures::{select, FutureExt};
use pin_utils::pin_mut;
use std::{cell::Cell, sync::Arc};
use uuid::Uuid;
#[derive(Default, Clone)]
@ -21,6 +21,7 @@ pub struct ArtilleryAPCluster {
config: ArtilleryAPClusterConfig,
cluster: Arc<Cluster>,
sd: Arc<MDNSServiceDiscovery>,
cluster_ev_loop_handle: Cell<RecoverableHandle<()>>,
}
unsafe impl Send for ArtilleryAPCluster {}
@ -32,12 +33,14 @@ impl ArtilleryAPCluster {
pub fn new(config: ArtilleryAPClusterConfig) -> Result<Self> {
let sd = MDNSServiceDiscovery::new_service_discovery(config.sd_config.clone())?;
let cluster = Cluster::new_cluster(config.node_id, config.cluster_config.clone())?;
let (cluster, cluster_listener) =
Cluster::new_cluster(config.node_id, config.cluster_config.clone())?;
Ok(Self {
config,
cluster: Arc::new(cluster),
sd: Arc::new(sd),
cluster_ev_loop_handle: Cell::new(cluster_listener),
})
}
@ -49,22 +52,34 @@ impl ArtilleryAPCluster {
self.sd.clone()
}
pub fn launch(&self) -> impl Future<Output = ()> + '_ {
let config = self.config.clone();
let events = self.service_discovery().events();
let cluster = self.cluster.clone();
pub fn shutdown(&self) {
self.cluster().leave_cluster();
}
async {
let config_inner = config;
let events_inner = events;
let cluster_inner = cluster;
pub async fn launch(&self) {
let (_, eh) = LightProc::recoverable(async {}, |_| (), ProcStack::default());
let ev_loop_handle = self.cluster_ev_loop_handle.replace(eh);
events_inner
.iter()
.filter(|discovery| {
discovery.get().port() != config_inner.sd_config.local_service_addr.port()
})
.for_each(|discovery| cluster_inner.add_seed_node(discovery.get()))
}
// Fuse both futures so that select! can poll them.
let ev_loop_handle = ev_loop_handle.fuse();
let discover_nodes_handle = self.discover_nodes().fuse();
pin_mut!(ev_loop_handle);
pin_mut!(discover_nodes_handle);
select! {
ev_loop_res = ev_loop_handle => { dbg!(ev_loop_res); ev_loop_res.unwrap() },
_ = discover_nodes_handle => panic!("Node discovery unexpectedly shutdown.")
};
}
async fn discover_nodes(&self) {
self.service_discovery()
.events()
.iter()
.filter(|discovery| {
discovery.get().port() != self.config.sd_config.local_service_addr.port()
})
.for_each(|discovery| self.cluster.add_seed_node(discovery.get()))
}
}
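
Note: launch() needs to move the stored event-loop handle out of the struct while only holding &self, so it swaps a freshly built dummy handle (LightProc::recoverable over an empty future) into the Cell and takes the real one out, then select!s between that handle and the node-discovery loop. The same take-by-replace pattern with a plain std type, as a small self-contained illustration (Holder and the String stand-in are made up):

use std::cell::Cell;

// Illustration of the take-by-replace pattern used in launch(): move a value out
// of a Cell behind &self by swapping in a cheap placeholder.
struct Holder {
    slot: Cell<String>, // stands in for Cell<RecoverableHandle<()>>
}

impl Holder {
    fn take_slot(&self) -> String {
        self.slot.replace(String::new())
    }
}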

View File

@ -2,8 +2,8 @@ use super::state::ArtilleryEpidemic;
use crate::epidemic::cluster_config::ClusterConfig;
use crate::epidemic::state::{ArtilleryClusterEvent, ArtilleryClusterRequest};
use crate::errors::*;
use bastion_executor::blocking::spawn_blocking;
use lightproc::proc_stack::ProcStack;
use bastion_executor::prelude::*;
use lightproc::{proc_stack::ProcStack, recoverable_handle::RecoverableHandle};
use std::convert::AsRef;
use std::net::SocketAddr;
use std::{
@ -20,7 +20,10 @@ pub struct Cluster {
}
impl Cluster {
pub fn new_cluster(host_key: Uuid, config: ClusterConfig) -> Result<Self> {
pub fn new_cluster(
host_key: Uuid,
config: ClusterConfig,
) -> Result<(Self, RecoverableHandle<()>)> {
let (event_tx, event_rx) = channel::<ArtilleryClusterEvent>();
let (internal_tx, mut internal_rx) = channel::<ArtilleryClusterRequest>();
@ -28,7 +31,7 @@ impl Cluster {
ArtilleryEpidemic::new(host_key, config, event_tx, internal_tx.clone())?;
debug!("Starting Artillery Cluster");
let _cluster_handle = spawn_blocking(
let cluster_handle = spawn_blocking(
async move {
ArtilleryEpidemic::event_loop(&mut internal_rx, poll, state)
.expect("Failed to create event loop");
@ -36,16 +39,17 @@ impl Cluster {
ProcStack::default(),
);
Ok(Self {
events: event_rx,
comm: internal_tx,
})
Ok((
Self {
events: event_rx,
comm: internal_tx,
},
cluster_handle,
))
}
pub fn add_seed_node(&self, addr: SocketAddr) {
self.comm
.send(ArtilleryClusterRequest::AddSeed(addr))
.unwrap();
let _ = self.comm.send(ArtilleryClusterRequest::AddSeed(addr));
}
pub fn send_payload<T: AsRef<str>>(&self, id: Uuid, msg: T) {
@ -58,9 +62,7 @@ impl Cluster {
}
pub fn leave_cluster(&self) {
self.comm
.send(ArtilleryClusterRequest::LeaveCluster)
.unwrap();
let _ = self.comm.send(ArtilleryClusterRequest::LeaveCluster);
}
}
@ -82,7 +84,7 @@ impl Drop for Cluster {
fn drop(&mut self) {
let (tx, rx) = channel();
self.comm.send(ArtilleryClusterRequest::Exit(tx)).unwrap();
let _ = self.comm.send(ArtilleryClusterRequest::Exit(tx));
rx.recv().unwrap();
}
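
Note: new_cluster now returns the cluster together with the RecoverableHandle of the spawned epidemic event loop, and the send helpers ignore send errors (the receiver is only gone once that loop has exited). A hedged sketch of a call site that keeps the handle instead of binding it to _; the wrapper function and address are made up, while Cluster::new_cluster, ClusterConfig and the handle type come from this diff:

use artillery_core::epidemic::prelude::*;
use lightproc::recoverable_handle::RecoverableHandle;
use uuid::Uuid;

// Hypothetical wrapper over the new signature: keep the event-loop handle so a
// caller can later await it (as kaos-tests/base.rs does) and notice the loop
// finishing or panicking, instead of discarding it with `_`.
fn start_cluster(listen_addr: &str) -> (Cluster, RecoverableHandle<()>) {
    let config = ClusterConfig {
        listen_addr: listen_addr.parse().unwrap(),
        ..Default::default()
    };
    Cluster::new_cluster(Uuid::new_v4(), config).unwrap()
}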

View File

@ -9,6 +9,8 @@ use super::member::{ArtilleryMember, ArtilleryMemberState, ArtilleryStateChange}
use crate::epidemic::member;
use bastion_utils::math;
use kaos::flunk;
pub struct ArtilleryMemberList {
members: Vec<ArtilleryMember>,
periodic_index: usize,
@ -72,6 +74,7 @@ impl ArtilleryMemberList {
if other_members.is_empty() {
None
} else {
flunk!("epidemic-periodic-index-fp");
self.periodic_index = (self.periodic_index + 1) % other_members.len();
Some(other_members[self.periodic_index].clone())
}

View File

@ -16,10 +16,12 @@ use std::sync::mpsc::{Receiver, Sender};
use std::time::Duration;
use uuid::Uuid;
use failure::_core::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Instant;
use kaos::flunk;
use crate::constants::*;
pub type ArtilleryClusterEvent = (Vec<ArtilleryMember>, ArtilleryMemberEvent);
@ -493,6 +495,7 @@ fn build_message(
};
for i in 0..=state_changes.len() {
flunk!("epidemic-state-change-tail-follow-fp");
message = ArtilleryMessage {
sender: *sender,
cluster_key: cluster_key.into(),

View File

@ -30,7 +30,7 @@
clippy::maybe_infinite_iter,
clippy::mem_forget,
clippy::multiple_inherent_impl,
clippy::mut_mut,
// clippy::mut_mut, // TODO: because select macro does this. Not us. Sigh.
clippy::needless_borrow,
clippy::needless_continue,
clippy::needless_pass_by_value,

View File

@ -9,6 +9,7 @@ use libp2p::{identity, Multiaddr, PeerId};
use lightproc::proc_stack::ProcStack;
use crossbeam_channel::{unbounded, Receiver};
use kaos::flunk;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
@ -62,6 +63,7 @@ impl MDNSServiceDiscovery {
for addr in peer.addresses() {
debug!(" Address = {:?}", addr);
let components = addr.iter().collect::<Vec<_>>();
flunk!("mdns-protocol-fp");
if let Protocol::Ip4(discovered_ip) = components[0] {
if let Protocol::Udp(discovered_port) = components[1] {
let discovered =

View File

@ -15,6 +15,8 @@ use std::net::SocketAddr;
use std::sync::mpsc::{Receiver, Sender};
use std::time::{Duration, Instant};
use kaos::flunk;
#[derive(Serialize, Deserialize, Debug, Clone, PartialOrd, PartialEq, Ord, Eq)]
/// Default acknowledgement reply for the Discovery.
pub struct ServiceDiscoveryReply {
@ -151,6 +153,7 @@ impl MulticastServiceDiscoveryState {
while let Some(peer_addr) = self.seeker_replies.pop_front() {
let mut sent_bytes = 0;
while sent_bytes != discovery_reply.len() {
flunk!("udp-anycast-reply-dgram-oob-fp");
if let Ok(bytes_tx) = self
.server_socket
.send_to(&discovery_reply[sent_bytes..], peer_addr)
@ -170,6 +173,7 @@ impl MulticastServiceDiscoveryState {
SEEK_NODES => {
let mut sent_bytes = 0;
while sent_bytes != self.seek_request.len() {
flunk!("udp-anycast-dgram-oob-fp");
if let Ok(bytes_tx) = self
.server_socket
.send_to(&self.seek_request[sent_bytes..], self.config.seeking_addr)