Merge pull request #12 from bastion-rs/async-cluster-construct

Async cluster construction
This commit is contained in:
Mahmut Bulut 2020-02-29 19:27:48 +01:00 committed by GitHub
commit fe12258863
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 330 additions and 199 deletions

View File

@ -18,9 +18,11 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = { version = "0.4", features = ["serde"] }
rand = "0.7.3"
mio = { version = "0.7.0-alpha.1", features = ["os-poll", "udp"] }
futures = "0.3"
libp2p = "0.16.0"
bastion-executor = "0.3.4"
lightproc = "0.3.4"
crossbeam-channel = "0.4.2"
[dev-dependencies]
bincode = "1.2.1"

View File

@ -3,39 +3,28 @@ extern crate pretty_env_logger;
#[macro_use]
extern crate log;
use clap::*;
use std::convert::TryInto;
use std::fs::File;
use std::io::{Read, Write};
use std::net::ToSocketAddrs;
use std::path::Path;
use uuid::Uuid;
use artillery_core::epidemic::prelude::*;
use artillery_core::service_discovery::mdns::prelude::*;
use once_cell::sync::OnceCell;
use serde::*;
use artillery_core::cluster::ap::*;
use futures::future;
use std::thread;
use std::time::Duration;
use artillery_core::cluster::ap_cluster::*;
use bastion_executor::prelude::*;
use bastion_executor::blocking::spawn_blocking;
use lightproc::proc_handle::ProcHandle;
use lightproc::proc_stack::ProcStack;
#[derive(Serialize, Deserialize, Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct ExampleSDReply {
ip: String,
port: u16,
}
use std::sync::Arc;
fn main() {
pretty_env_logger::init();
// Let's find a broadcast port
let port = get_port();
// Initialize our cluster configuration
let ap_cluster_config = ArtilleryAPClusterConfig {
app_name: String::from("artillery-ap"),
node_id: Uuid::new_v4(),
@ -58,8 +47,45 @@ fn main() {
},
};
let ap_cluster = ArtilleryAPCluster::new(ap_cluster_config).unwrap();
spawn_blocking(async { ap_cluster.launch().await }, ProcStack::default());
// Configure our cluster node
let ap_cluster = Arc::new(ArtilleryAPCluster::new(ap_cluster_config).unwrap());
// Launch the cluster node
run(
async {
let cluster_stack = ProcStack::default().with_pid(2);
let events_stack = ProcStack::default().with_pid(3);
let ap_events = ap_cluster.clone();
// Detach cluster launch
let cluster_handle =
spawn_blocking(async move { ap_cluster.launch().await }, cluster_stack);
// Detach event consumption
let events_handle = spawn_blocking(
async move {
warn!("STARTED: Event Poller");
for (members, event) in ap_events.cluster().clone().events.iter() {
warn!("");
warn!(" CLUSTER EVENT ");
warn!("===============");
warn!("{:?}", event);
warn!("");
for member in members {
info!("MEMBER {:?}", member);
}
}
warn!("STOPPED: Event Poller");
},
events_stack,
);
future::join(events_handle, cluster_handle).await
},
ProcStack::default().with_pid(1),
);
}
fn get_port() -> u16 {

View File

@ -71,7 +71,7 @@ fn main() {
.expect("cannot start cluster-event-poller");
thread::sleep(Duration::from_secs(1));
for discovery in sd.events {
for discovery in sd.events().iter() {
if discovery.get().port() != this_node_cluster_port {
cluster.add_seed_node(discovery.get());
}

View File

@ -1,17 +1,13 @@
use crate::epidemic::prelude::*;
use crate::service_discovery::mdns::prelude::*;
use bastion_executor::blocking::spawn_blocking;
use lightproc::proc_handle::ProcHandle;
use lightproc::proc_stack::ProcStack;
use uuid::Uuid;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::Receiver;
use lightproc::prelude::*;
use crate::errors::*;
use std::rc::Rc;
use std::future::Future;
use std::error::Error;
use crate::service_discovery::mdns::prelude::*;
use lightproc::prelude::*;
use std::future::Future;
use std::sync::Arc;
use uuid::Uuid;
#[derive(Default, Clone)]
pub struct ArtilleryAPClusterConfig {
@ -24,7 +20,7 @@ pub struct ArtilleryAPClusterConfig {
pub struct ArtilleryAPCluster {
config: ArtilleryAPClusterConfig,
cluster: Arc<Cluster>,
sd: Arc<MDNSServiceDiscovery>
sd: Arc<MDNSServiceDiscovery>,
}
unsafe impl Send for ArtilleryAPCluster {}
@ -34,20 +30,14 @@ pub type DiscoveryLaunch = RecoverableHandle<()>;
impl ArtilleryAPCluster {
pub fn new(config: ArtilleryAPClusterConfig) -> Result<Self> {
let sd =
MDNSServiceDiscovery::new_service_discovery(
config.sd_config.clone())?;
let sd = MDNSServiceDiscovery::new_service_discovery(config.sd_config.clone())?;
let cluster =
Cluster::new_cluster(
config.node_id,
config.cluster_config.clone()
)?;
let cluster = Cluster::new_cluster(config.node_id, config.cluster_config.clone())?;
Ok(Self {
config,
cluster: Arc::new(cluster),
sd: Arc::new(sd)
sd: Arc::new(sd),
})
}
@ -59,7 +49,7 @@ impl ArtilleryAPCluster {
self.sd.clone()
}
pub fn launch(&self) -> impl Future<Output=()> + '_ {
pub fn launch(&self) -> impl Future<Output = ()> + '_ {
let config = self.config.clone();
let events = self.service_discovery().events();
let cluster = self.cluster.clone();
@ -71,7 +61,9 @@ impl ArtilleryAPCluster {
events_inner
.iter()
.filter(|discovery| discovery.get().port() != config_inner.sd_config.local_service_addr.port())
.filter(|discovery| {
discovery.get().port() != config_inner.sd_config.local_service_addr.port()
})
.for_each(|discovery| cluster_inner.add_seed_node(discovery.get()))
}
}

View File

@ -1 +1 @@
pub mod ap_cluster;
pub mod ap;

View File

@ -1,10 +1,17 @@
use super::state::ArtilleryState;
use super::state::ArtilleryEpidemic;
use crate::epidemic::cluster_config::ClusterConfig;
use crate::epidemic::state::{ArtilleryClusterEvent, ArtilleryClusterRequest};
use crate::errors::*;
use bastion_executor::blocking::spawn_blocking;
use lightproc::proc_stack::ProcStack;
use std::convert::AsRef;
use std::net::SocketAddr;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::{
future::Future,
pin::Pin,
sync::mpsc::{channel, Receiver, Sender},
task::{Context, Poll},
};
use uuid::Uuid;
pub struct Cluster {
@ -17,16 +24,17 @@ impl Cluster {
let (event_tx, event_rx) = channel::<ArtilleryClusterEvent>();
let (internal_tx, mut internal_rx) = channel::<ArtilleryClusterRequest>();
let (poll, state) = ArtilleryState::new(host_key, config, event_tx, internal_tx.clone())?;
let (poll, state) =
ArtilleryEpidemic::new(host_key, config, event_tx, internal_tx.clone())?;
debug!("Starting Artillery Cluster");
std::thread::Builder::new()
.name("artillery-epidemic-cluster-state".to_string())
.spawn(move || {
ArtilleryState::event_loop(&mut internal_rx, poll, state)
let _cluster_handle = spawn_blocking(
async move {
ArtilleryEpidemic::event_loop(&mut internal_rx, poll, state)
.expect("Failed to create event loop");
})
.expect("cannot start epidemic cluster state management thread");
},
ProcStack::default(),
);
Ok(Self {
events: event_rx,
@ -59,12 +67,10 @@ impl Cluster {
impl Future for Cluster {
type Output = ArtilleryClusterEvent;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
return match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending
}
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending,
}
}
}

View File

@ -176,6 +176,10 @@ pub fn most_uptodate_member_data<'a>(
lhs: &'a ArtilleryMember,
rhs: &'a ArtilleryMember,
) -> &'a ArtilleryMember {
// Don't apply clippy here.
// It's important bit otherwise we won't understand.
#![allow(clippy::match_same_arms)]
let lhs_overrides = match (
lhs.member_state,
lhs.incarnation_number,

View File

@ -38,7 +38,7 @@ impl ArtilleryMemberList {
}
fn mut_myself(&mut self) -> &mut ArtilleryMember {
for member in self.members.iter_mut() {
for member in &mut self.members {
if member.is_current() {
return member;
}
@ -79,25 +79,32 @@ impl ArtilleryMemberList {
pub fn time_out_nodes(
&mut self,
expired_hosts: HashSet<SocketAddr>,
expired_hosts: &HashSet<SocketAddr>,
) -> (Vec<ArtilleryMember>, Vec<ArtilleryMember>) {
let mut suspect_members = Vec::new();
let mut down_members = Vec::new();
for member in self.members.iter_mut() {
for member in &mut self.members {
if let Some(remote_host) = member.remote_host() {
if !expired_hosts.contains(&remote_host) {
continue;
}
if member.state() == ArtilleryMemberState::Alive {
member.set_state(ArtilleryMemberState::Suspect);
suspect_members.push(member.clone());
} else if member.state() == ArtilleryMemberState::Suspect
&& member.state_change_older_than(Duration::seconds(3))
{
member.set_state(ArtilleryMemberState::Down);
down_members.push(member.clone());
match member.state() {
ArtilleryMemberState::Alive => {
member.set_state(ArtilleryMemberState::Suspect);
suspect_members.push(member.clone());
}
// TODO: Config suspect timeout
ArtilleryMemberState::Suspect
if member.state_change_older_than(Duration::seconds(3)) =>
{
member.set_state(ArtilleryMemberState::Down);
down_members.push(member.clone());
}
ArtilleryMemberState::Suspect
| ArtilleryMemberState::Down
| ArtilleryMemberState::Left => {}
}
}
}
@ -106,7 +113,7 @@ impl ArtilleryMemberList {
}
pub fn mark_node_alive(&mut self, src_addr: &SocketAddr) -> Option<ArtilleryMember> {
for member in self.members.iter_mut() {
for member in &mut self.members {
if member.remote_host() == Some(*src_addr)
&& member.state() != ArtilleryMemberState::Alive
{
@ -144,8 +151,7 @@ impl ArtilleryMemberList {
match old_member_data {
Entry::Occupied(mut entry) => {
let new_member =
member::most_uptodate_member_data(&new_member_data, entry.get())
.clone();
member::most_uptodate_member_data(new_member_data, entry.get()).clone();
let new_host = new_member
.remote_host()
.or_else(|| entry.get().remote_host())
@ -173,6 +179,9 @@ impl ArtilleryMemberList {
(new_nodes, changed_nodes)
}
///
///
/// Random ping enqueuing
pub fn hosts_for_indirect_ping(
&self,
host_count: usize,
@ -181,12 +190,16 @@ impl ArtilleryMemberList {
let mut possible_members: Vec<_> = self
.members
.iter()
.filter(|m| {
m.state() == ArtilleryMemberState::Alive
.filter_map(|m| {
if m.state() == ArtilleryMemberState::Alive
&& m.is_remote()
&& m.remote_host() != Some(*target)
{
m.remote_host()
} else {
None
}
})
.map(|m| m.remote_host().unwrap())
.collect();
math::shuffle_linear(&mut possible_members);
@ -197,14 +210,15 @@ impl ArtilleryMemberList {
pub fn has_member(&self, remote_host: &SocketAddr) -> bool {
self.members
.iter()
.any(|ref m| m.remote_host() == Some(*remote_host))
.any(|m| m.remote_host() == Some(*remote_host))
}
pub fn add_member(&mut self, member: ArtilleryMember) {
self.members.push(member)
}
/// get_member will return artillery member if the given uuid is matches with any of the
///
/// `get_member` will return artillery member if the given uuid is matches with any of the
/// member in the cluster.
pub fn get_member(&self, id: &Uuid) -> Option<ArtilleryMember> {
let member: Vec<_> = self

View File

@ -9,6 +9,7 @@ use mio::{Events, Interest, Poll, Token};
use serde::*;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::io;
use std::net::SocketAddr;
use std::sync::mpsc::{Receiver, Sender};
@ -26,12 +27,12 @@ pub type WaitList = HashMap<SocketAddr, Vec<SocketAddr>>;
#[derive(Debug)]
pub enum ArtilleryMemberEvent {
MemberJoined(ArtilleryMember),
MemberWentUp(ArtilleryMember),
MemberSuspectedDown(ArtilleryMember),
MemberWentDown(ArtilleryMember),
MemberLeft(ArtilleryMember),
MemberPayload(ArtilleryMember, String),
Joined(ArtilleryMember),
WentUp(ArtilleryMember),
SuspectedDown(ArtilleryMember),
WentDown(ArtilleryMember),
Left(ArtilleryMember),
Payload(ArtilleryMember, String),
}
#[derive(Serialize, Deserialize, Debug, Clone)]
@ -47,9 +48,9 @@ struct EncSocketAddr(SocketAddr);
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
enum Request {
Ping,
Heartbeat,
Ack,
PingRequest(EncSocketAddr),
Ping(EncSocketAddr),
AckHost(ArtilleryMember),
Payload(Uuid, String),
}
@ -72,7 +73,7 @@ pub enum ArtilleryClusterRequest {
const UDP_SERVER: Token = Token(0);
pub struct ArtilleryState {
pub struct ArtilleryEpidemic {
host_key: Uuid,
config: ClusterConfig,
members: ArtilleryMemberList,
@ -86,9 +87,9 @@ pub struct ArtilleryState {
running: AtomicBool,
}
pub type ClusterReactor = (Poll, ArtilleryState);
pub type ClusterReactor = (Poll, ArtilleryEpidemic);
impl ArtilleryState {
impl ArtilleryEpidemic {
pub fn new(
host_key: Uuid,
config: ClusterConfig,
@ -104,7 +105,7 @@ impl ArtilleryState {
let me = ArtilleryMember::current(host_key);
let state = ArtilleryState {
let state = ArtilleryEpidemic {
host_key,
config,
members: ArtilleryMemberList::new(me.clone()),
@ -124,13 +125,15 @@ impl ArtilleryState {
pub(crate) fn event_loop(
receiver: &mut Receiver<ArtilleryClusterRequest>,
mut poll: Poll,
mut state: ArtilleryState,
mut state: ArtilleryEpidemic,
) -> Result<()> {
let mut events = Events::with_capacity(1);
let mut buf = [0_u8; CONST_PACKET_SIZE];
let mut start = Instant::now();
let timeout = Duration::from_millis(state.config.ping_interval.num_milliseconds() as u64);
let timeout = Duration::from_millis(u64::try_from(
state.config.ping_interval.num_milliseconds(),
)?);
debug!("Starting Event Loop");
// Our event loop.
@ -166,8 +169,8 @@ impl ArtilleryState {
// Process inbound events
for event in events.iter() {
match event.token() {
UDP_SERVER => loop {
if let UDP_SERVER = event.token() {
loop {
match state.server_socket.recv_from(&mut buf) {
Ok((packet_size, source_address)) => {
let message = serde_json::from_slice(&buf[..packet_size])?;
@ -186,7 +189,7 @@ impl ArtilleryState {
// If it was any other kind of error, something went
// wrong and we terminate with an error.
bail!(
ArtilleryError::UnexpectedError,
ArtilleryError::Unexpected,
format!(
"Unexpected error occured in event loop: {}",
e.to_string()
@ -194,10 +197,9 @@ impl ArtilleryState {
)
}
}
},
_ => {
warn!("Got event for unexpected token: {:?}", event);
}
} else {
warn!("Got event for unexpected token: {:?}", event);
}
}
}
@ -206,16 +208,17 @@ impl ArtilleryState {
Ok(())
}
fn process_request(&mut self, request: TargetedRequest) {
fn process_request(&mut self, request: &TargetedRequest) {
use Request::*;
let timeout = Utc::now() + self.config.ping_timeout;
let should_add_pending = request.request == Ping;
// It was Ping before
let should_add_pending = request.request == Heartbeat;
let message = build_message(
&self.host_key,
&self.config.cluster_key,
request.request,
self.state_changes.clone(),
&request.request,
&self.state_changes,
self.config.network_mtu,
);
@ -229,14 +232,14 @@ impl ArtilleryState {
assert!(encoded.len() < self.config.network_mtu);
let buf = encoded.as_bytes();
self.server_socket.send_to(&buf, request.target).unwrap();
self.server_socket.send_to(buf, request.target).unwrap();
}
fn enqueue_seed_nodes(&self) {
for seed_node in &self.seed_queue {
self.request_tx
.send(ArtilleryClusterRequest::React(TargetedRequest {
request: Request::Ping,
request: Request::Heartbeat,
target: *seed_node,
}))
.unwrap();
@ -247,7 +250,7 @@ impl ArtilleryState {
if let Some(member) = self.members.next_random_member() {
self.request_tx
.send(ArtilleryClusterRequest::React(TargetedRequest {
request: Request::Ping,
request: Request::Heartbeat,
target: member.remote_host().unwrap(),
}))
.unwrap();
@ -267,18 +270,18 @@ impl ArtilleryState {
self.pending_responses = remaining;
let (suspect, down) = self.members.time_out_nodes(expired_hosts);
let (suspect, down) = self.members.time_out_nodes(&expired_hosts);
enqueue_state_change(&mut self.state_changes, &down);
enqueue_state_change(&mut self.state_changes, &suspect);
for member in suspect {
self.send_ping_requests(&member);
self.send_member_event(ArtilleryMemberEvent::MemberSuspectedDown(member.clone()));
self.send_member_event(ArtilleryMemberEvent::SuspectedDown(member.clone()));
}
for member in down {
self.send_member_event(ArtilleryMemberEvent::MemberWentDown(member.clone()));
self.send_member_event(ArtilleryMemberEvent::WentDown(member.clone()));
}
}
@ -290,7 +293,7 @@ impl ArtilleryState {
{
self.request_tx
.send(ArtilleryClusterRequest::React(TargetedRequest {
request: Request::PingRequest(EncSocketAddr::from_addr(&target_host)),
request: Request::Ping(EncSocketAddr::from_addr(&target_host)),
target: relay,
}))
.unwrap();
@ -306,7 +309,7 @@ impl ArtilleryState {
Respond(src_addr, message) => self.respond_to_message(src_addr, message),
React(request) => {
self.prune_timed_out_responses();
self.process_request(request);
self.process_request(&request);
}
LeaveCluster => {
let myself = self.members.leave();
@ -319,7 +322,7 @@ impl ArtilleryState {
return None;
}
self.process_request(TargetedRequest {
self.process_request(&TargetedRequest {
request: Request::Payload(id, msg),
target: target_peer
.remote_host()
@ -341,16 +344,14 @@ impl ArtilleryState {
fn respond_to_message(&mut self, src_addr: SocketAddr, message: ArtilleryMessage) {
use Request::*;
if message.cluster_key != self.config.cluster_key {
error!("Mismatching cluster keys, ignoring message");
} else {
if message.cluster_key == self.config.cluster_key {
self.apply_state_changes(message.state_changes, src_addr);
remove_potential_seed(&mut self.seed_queue, src_addr);
self.ensure_node_is_member(src_addr, message.sender);
let response = match message.request {
Ping => Some(TargetedRequest {
Heartbeat => Some(TargetedRequest {
request: Ack,
target: src_addr,
}),
@ -359,11 +360,11 @@ impl ArtilleryState {
self.mark_node_alive(src_addr);
None
}
PingRequest(dest_addr) => {
Ping(dest_addr) => {
let EncSocketAddr(dest_addr) = dest_addr;
add_to_wait_list(&mut self.wait_list, &dest_addr, &src_addr);
Some(TargetedRequest {
request: Ping,
request: Heartbeat,
target: dest_addr,
})
}
@ -374,7 +375,7 @@ impl ArtilleryState {
}
Payload(peer_id, msg) => {
if let Some(member) = self.members.get_member(&peer_id) {
self.send_member_event(ArtilleryMemberEvent::MemberPayload(member, msg));
self.send_member_event(ArtilleryMemberEvent::Payload(member, msg));
} else {
warn!("Got payload request from an unknown peer {}", peer_id);
}
@ -387,13 +388,15 @@ impl ArtilleryState {
.send(ArtilleryClusterRequest::React(response))
.unwrap()
}
} else {
error!("Mismatching cluster keys, ignoring message");
}
}
fn ack_response(&mut self, src_addr: SocketAddr) {
let mut to_remove = Vec::new();
for &(ref t, ref addr, ref state_changes) in self.pending_responses.iter() {
for &(ref t, ref addr, ref state_changes) in &self.pending_responses {
if src_addr != *addr {
continue;
}
@ -420,19 +423,18 @@ impl ArtilleryState {
self.members.add_member(new_member.clone());
enqueue_state_change(&mut self.state_changes, &[new_member.clone()]);
self.send_member_event(ArtilleryMemberEvent::MemberJoined(new_member));
self.send_member_event(ArtilleryMemberEvent::Joined(new_member));
}
fn send_member_event(&self, event: ArtilleryMemberEvent) {
use ArtilleryMemberEvent::*;
match event {
MemberJoined(_) => {}
MemberWentUp(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Alive),
MemberWentDown(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Down),
MemberSuspectedDown(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Suspect),
MemberLeft(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Left),
_ => {}
Joined(_) | Payload(..) => {}
WentUp(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Alive),
WentDown(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Down),
SuspectedDown(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Suspect),
Left(ref m) => assert_eq!(m.state(), ArtilleryMemberState::Left),
};
self.event_tx
@ -447,7 +449,7 @@ impl ArtilleryState {
enqueue_state_change(&mut self.state_changes, &changed);
for member in new {
self.send_member_event(ArtilleryMemberEvent::MemberJoined(member));
self.send_member_event(ArtilleryMemberEvent::Joined(member));
}
for member in changed {
@ -471,7 +473,7 @@ impl ArtilleryState {
}
enqueue_state_change(&mut self.state_changes, &[member.clone()]);
self.send_member_event(ArtilleryMemberEvent::MemberWentUp(member));
self.send_member_event(ArtilleryMemberEvent::WentUp(member));
}
}
}
@ -479,8 +481,8 @@ impl ArtilleryState {
fn build_message(
sender: &Uuid,
cluster_key: &[u8],
request: Request,
state_changes: Vec<ArtilleryStateChange>,
request: &Request,
state_changes: &[ArtilleryStateChange],
network_mtu: usize,
) -> ArtilleryMessage {
let mut message = ArtilleryMessage {
@ -523,14 +525,11 @@ fn remove_potential_seed(seed_queue: &mut Vec<SocketAddr>, src_addr: SocketAddr)
}
fn determine_member_event(member: ArtilleryMember) -> ArtilleryMemberEvent {
use ArtilleryMemberEvent::*;
use ArtilleryMemberState::*;
match member.state() {
Alive => MemberWentUp(member),
Suspect => MemberSuspectedDown(member),
Down => MemberWentDown(member),
Left => MemberLeft(member),
ArtilleryMemberState::Alive => ArtilleryMemberEvent::WentUp(member),
ArtilleryMemberState::Suspect => ArtilleryMemberEvent::SuspectedDown(member),
ArtilleryMemberState::Down => ArtilleryMemberEvent::WentDown(member),
ArtilleryMemberState::Left => ArtilleryMemberEvent::Left(member),
}
}

View File

@ -11,48 +11,56 @@ pub type Result<T> = result::Result<T, ArtilleryError>;
pub enum ArtilleryError {
// General Error Types
#[fail(display = "Artillery :: Orphan Node Error: {}", _0)]
OrphanNodeError(String),
OrphanNode(String),
#[fail(display = "Artillery :: I/O error occurred: {}", _0)]
IoError(io::Error),
Io(io::Error),
#[fail(display = "Artillery :: Cluster Message Decode Error: {}", _0)]
ClusterMessageDecodeError(String),
ClusterMessageDecode(String),
#[fail(display = "Artillery :: Message Send Error: {}", _0)]
SendError(String),
Send(String),
#[fail(display = "Artillery :: Message Receive Error: {}", _0)]
ReceiveError(String),
Receive(String),
#[fail(display = "Artillery :: Unexpected Error: {}", _0)]
UnexpectedError(String),
Unexpected(String),
#[fail(display = "Artillery :: Decoding Error: {}", _0)]
DecodingError(String),
Decoding(String),
#[fail(display = "Artillery :: Numeric Cast Error: {}", _0)]
NumericCast(String),
}
impl From<io::Error> for ArtilleryError {
fn from(e: io::Error) -> Self {
ArtilleryError::IoError(e)
ArtilleryError::Io(e)
}
}
impl From<serde_json::error::Error> for ArtilleryError {
fn from(e: serde_json::error::Error) -> Self {
ArtilleryError::ClusterMessageDecodeError(e.to_string())
ArtilleryError::ClusterMessageDecode(e.to_string())
}
}
impl<T> From<std::sync::mpsc::SendError<T>> for ArtilleryError {
fn from(e: SendError<T>) -> Self {
ArtilleryError::SendError(e.to_string())
ArtilleryError::Send(e.to_string())
}
}
impl From<std::sync::mpsc::RecvError> for ArtilleryError {
fn from(e: RecvError) -> Self {
ArtilleryError::ReceiveError(e.to_string())
ArtilleryError::Receive(e.to_string())
}
}
impl From<std::str::Utf8Error> for ArtilleryError {
fn from(e: std::str::Utf8Error) -> Self {
ArtilleryError::DecodingError(e.to_string())
ArtilleryError::Decoding(e.to_string())
}
}
impl From<std::num::TryFromIntError> for ArtilleryError {
fn from(e: std::num::TryFromIntError) -> Self {
ArtilleryError::NumericCast(e.to_string())
}
}

View File

@ -1,3 +1,66 @@
#![deny(
clippy::unimplemented,
clippy::wildcard_enum_match_arm,
clippy::else_if_without_else,
clippy::float_arithmetic,
// clippy::indexing_slicing, // XXX: Enable for failpoint discovery
clippy::cast_lossless,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::checked_conversions,
clippy::decimal_literal_representation,
clippy::doc_markdown,
clippy::empty_enum,
clippy::expl_impl_clone_on_copy,
clippy::explicit_into_iter_loop,
clippy::explicit_iter_loop,
clippy::fallible_impl_from,
clippy::filter_map,
clippy::filter_map_next,
clippy::find_map,
clippy::get_unwrap,
clippy::if_not_else,
clippy::inline_always,
clippy::invalid_upcast_comparisons,
clippy::items_after_statements,
clippy::map_flatten,
clippy::match_same_arms,
clippy::maybe_infinite_iter,
clippy::mem_forget,
clippy::multiple_inherent_impl,
clippy::mut_mut,
clippy::needless_borrow,
clippy::needless_continue,
clippy::needless_pass_by_value,
clippy::non_ascii_literal,
clippy::option_map_unwrap_or,
clippy::option_map_unwrap_or_else,
clippy::path_buf_push_overwrite,
clippy::print_stdout,
clippy::pub_enum_variant_names,
clippy::redundant_closure_for_method_calls,
clippy::replace_consts,
clippy::result_map_unwrap_or_else,
clippy::shadow_reuse,
clippy::shadow_same,
clippy::shadow_unrelated,
clippy::single_match_else,
clippy::string_add,
clippy::string_add_assign,
clippy::type_repetition_in_bounds,
clippy::unicode_not_nfc,
clippy::unseparated_literal_suffix,
clippy::used_underscore_binding,
clippy::wildcard_dependencies,
clippy::wrong_pub_self_convention,
// TODO: Write docs and constantization. Nice word, constantization. I found it. Thanks.
// clippy::missing_const_for_fn,
// clippy::missing_docs_in_private_items,
)]
#[macro_use]
extern crate log;

View File

@ -8,12 +8,11 @@ use libp2p::multiaddr::Protocol;
use libp2p::{identity, Multiaddr, PeerId};
use lightproc::proc_stack::ProcStack;
use std::sync::mpsc::{channel, Receiver};
use crossbeam_channel::{unbounded, Receiver};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::sync::Arc;
use std::task::{Context, Poll};
pub struct MDNSServiceDiscovery {
events: Arc<Receiver<MDNSServiceDiscoveryEvent>>,
@ -24,7 +23,7 @@ unsafe impl Sync for MDNSServiceDiscovery {}
impl MDNSServiceDiscovery {
pub fn new_service_discovery(config: MDNSServiceDiscoveryConfig) -> Result<Self> {
let (event_tx, event_rx) = channel::<MDNSServiceDiscoveryEvent>();
let (event_tx, event_rx) = unbounded::<MDNSServiceDiscoveryEvent>();
let peer_id = PeerId::from(identity::Keypair::generate_ed25519().public());
@ -97,7 +96,9 @@ impl MDNSServiceDiscovery {
ProcStack::default(),
);
Ok(Self { events: Arc::new(event_rx) })
Ok(Self {
events: Arc::new(event_rx),
})
}
pub fn events(&self) -> Arc<Receiver<MDNSServiceDiscoveryEvent>> {
@ -108,12 +109,10 @@ impl MDNSServiceDiscovery {
impl Future for MDNSServiceDiscovery {
type Output = MDNSServiceDiscoveryEvent;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
return match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending
}
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending,
}
}
}

View File

@ -4,7 +4,9 @@ use crate::service_discovery::udp_anycast::state::MulticastServiceDiscoveryState
use crate::service_discovery::udp_anycast::state::{
ServiceDiscoveryReply, ServiceDiscoveryRequest,
};
use bastion_executor::blocking::spawn_blocking;
use cuneiform_fields::arch::ArchPadding;
use lightproc::proc_stack::ProcStack;
use std::sync::mpsc;
use std::sync::mpsc::{channel, Sender};
@ -21,13 +23,13 @@ impl MulticastServiceDiscovery {
let (poll, state) = MulticastServiceDiscoveryState::new(config, discovery_reply)?;
debug!("Starting Artillery Multicast SD");
std::thread::Builder::new()
.name("artillery-mcast-service-discovery-state".to_string())
.spawn(move || {
let _multicast_sd_handle = spawn_blocking(
async move {
MulticastServiceDiscoveryState::event_loop(&mut internal_rx, poll, state)
.expect("Failed to create event loop");
})
.expect("cannot start udp_anycast service discovery state thread");
},
ProcStack::default(),
);
Ok(Self {
comm: ArchPadding::new(internal_tx),
@ -51,14 +53,15 @@ impl MulticastServiceDiscovery {
.send(ServiceDiscoveryRequest::SetBroadcastListen(listen))?)
}
/// Explore the network to find nodes using udp_anycast SD.
/// Explore the network to find nodes using `udp_anycast` SD.
pub fn seek_peers(&self) -> Result<()> {
Ok(self.comm.send(ServiceDiscoveryRequest::SeekPeers)?)
}
/// Shutdown Service Discovery
pub fn shutdown(&mut self) -> Result<()> {
Ok(self.discovery_exit())
self.discovery_exit();
Ok(())
}
fn discovery_exit(&mut self) {

View File

@ -1,6 +1,7 @@
use crate::constants::*;
use crate::errors::*;
use crate::service_discovery::udp_anycast::discovery_config::MulticastServiceDiscoveryConfig;
use std::convert::TryFrom;
use cuneiform_fields::arch::ArchPadding;
use mio::net::UdpSocket;
@ -70,8 +71,6 @@ impl MulticastServiceDiscoveryState {
) -> Result<ServiceDiscoveryReactor> {
let poll: Poll = Poll::new()?;
// let interests = get_interests();
// let interests = get_interests();
let mut server_socket = UdpSocket::bind(config.discovery_addr)?;
server_socket.set_broadcast(true)?;
@ -141,48 +140,52 @@ impl MulticastServiceDiscoveryState {
}
fn writable(&mut self, poll: &mut Poll, token: Token) -> Result<()> {
if token == ON_DISCOVERY {
let reply = ServiceDiscoveryMessage::Response {
uid: self.uid,
content: self.default_reply.clone(),
};
let discovery_reply = serde_json::to_vec(&reply)?;
match token {
ON_DISCOVERY => {
let reply = ServiceDiscoveryMessage::Response {
uid: self.uid,
content: self.default_reply.clone(),
};
let discovery_reply = serde_json::to_vec(&reply)?;
while let Some(peer_addr) = self.seeker_replies.pop_front() {
while let Some(peer_addr) = self.seeker_replies.pop_front() {
let mut sent_bytes = 0;
while sent_bytes != discovery_reply.len() {
if let Ok(bytes_tx) = self
.server_socket
.send_to(&discovery_reply[sent_bytes..], peer_addr)
{
sent_bytes += bytes_tx;
} else {
poll.registry().reregister(
&mut self.server_socket,
ON_DISCOVERY,
Interest::WRITABLE,
)?;
return Ok(());
}
}
}
}
SEEK_NODES => {
let mut sent_bytes = 0;
while sent_bytes != discovery_reply.len() {
while sent_bytes != self.seek_request.len() {
if let Ok(bytes_tx) = self
.server_socket
.send_to(&discovery_reply[sent_bytes..], peer_addr)
.send_to(&self.seek_request[sent_bytes..], self.config.seeking_addr)
{
sent_bytes += bytes_tx;
} else {
poll.registry().reregister(
&mut self.server_socket,
ON_DISCOVERY,
SEEK_NODES,
Interest::WRITABLE,
)?;
return Ok(());
}
}
}
} else if token == SEEK_NODES {
let mut sent_bytes = 0;
while sent_bytes != self.seek_request.len() {
if let Ok(bytes_tx) = self
.server_socket
.send_to(&self.seek_request[sent_bytes..], self.config.seeking_addr)
{
sent_bytes += bytes_tx;
} else {
poll.registry().reregister(
&mut self.server_socket,
SEEK_NODES,
Interest::WRITABLE,
)?;
return Ok(());
}
}
_ => (),
}
Ok(poll
@ -199,7 +202,9 @@ impl MulticastServiceDiscoveryState {
let mut buf = [0_u8; CONST_PACKET_SIZE];
let mut start = Instant::now();
let timeout = Duration::from_millis(state.config.timeout_delta.num_milliseconds() as u64);
let timeout = Duration::from_millis(u64::try_from(
state.config.timeout_delta.num_milliseconds(),
)?);
// Our event loop.
loop {

8
checks.sh Executable file
View File

@ -0,0 +1,8 @@
#!/usr/bin/env sh
set -em
cargo fmt
cargo fix --allow-dirty --allow-staged
cargo clippy
cargo fmt

View File

@ -7,6 +7,8 @@
<h1 align="center">Artillery: Cluster management & Distributed data protocol</h1>
# Module Structure
It contains the modules below:
* `artillery-ddata`: Used for distributed data replication
* `artillery-core`: Contains: