Add MDNS service discovery

This commit is contained in:
Mahmut Bulut 2020-02-11 02:27:29 +01:00
parent 96b20fb47c
commit a2d8108174
21 changed files with 436 additions and 50 deletions

5
.gitignore vendored
View File

@ -21,4 +21,7 @@ bcs
.idea/
# Cluster related files to be ignored
artillery-core/statedata
artillery-core/statedata
# Test outputs
deployment-tests/node_state

View File

@ -18,6 +18,9 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = { version = "0.4", features = ["serde"] }
rand = "0.7.3"
mio = { version = "0.7.0-alpha.1", features = ["os-poll", "udp"] }
libp2p = "0.15.0"
bastion-executor = "0.3.4"
lightproc = "0.3.4"
[dev-dependencies]
bincode = "1.2.1"

View File

@ -116,8 +116,8 @@ fn read_host_key(root_folder: &Path) -> Uuid {
}
let host_key = Uuid::new_v4();
dbg!(host_key_path.clone());
let mut host_key_file = File::create(&host_key_path).unwrap();
host_key_file.write_all(host_key.as_bytes()).unwrap();
// https://open.spotify.com/track/6Btbw0SV8FqxJ0AAA26Xrd?si=4BR-pO7nQki-DlCrvCTGgg
host_key
}

View File

@ -0,0 +1,151 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
use clap::*;
use std::convert::TryInto;
use std::fs::File;
use std::io::{Read, Write};
use std::net::{ToSocketAddrs, SocketAddr};
use std::path::Path;
use uuid::Uuid;
use std::str::FromStr;
use serde::*;
use bastion_utils::math;
use once_cell::sync::{Lazy, OnceCell};
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;
use artillery_core::service_discovery::mdns::prelude::*;
use artillery_core::epidemic::prelude::*;
use artillery_core::constants::CONST_INFECTION_PORT;
#[derive(Serialize, Deserialize, Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct ExampleSDReply {
ip: String,
port: u16
}
fn main() {
pretty_env_logger::init();
let matches = App::new("Cannonball :: MDNS + Epidemic")
.author("Mahmut Bulut, vertexclique [ta] gmail [tod] com")
.version(crate_version!())
.about("Artillery Epidemic Protocol Tester")
.arg(
Arg::with_name("node-data")
.index(1)
.long("node-data")
.aliases(&["data-folder"])
.required(true)
.help("Node State Data Folder"),
)
.after_help(
"Enables Artillery MDNS Service Discovery + Epidemic Protocol to be tested \
in the cluster configuration",
)
.get_matches();
let data_folder = matches
.value_of("node-data")
.expect("Can't be None, required");
let data_folder_path = Path::new(&data_folder);
let host_key = read_host_key(&data_folder_path);
warn!("Host key: {}", host_key.to_hyphenated());
let this_node_cluster_port = get_port();
let sd_config = {
let mut config = MDNSServiceDiscoveryConfig::default();
config.local_service_addr.set_port(this_node_cluster_port);
config
};
let sd =
MDNSServiceDiscovery::new_service_discovery(sd_config).unwrap();
let this_node_cluster_listen_addr = format!("127.0.0.1:{}", this_node_cluster_port);
let cluster = get_cluster(this_node_cluster_listen_addr.as_str(), host_key);
std::thread::Builder::new()
.name("cluster-event-poller".to_string())
.spawn(move || {
poll_cluster_events(this_node_cluster_listen_addr.as_str(), host_key)
})
.expect("cannot start cluster-event-poller");
thread::sleep(Duration::from_secs(1));
for discovery in sd.events {
if discovery.get().port() != this_node_cluster_port {
cluster.add_seed_node(discovery.get());
}
}
}
fn poll_cluster_events(listen_addr: &str, host_key: Uuid) {
warn!("STARTED: Event Poller");
for (members, event) in
get_cluster(listen_addr, host_key).events.iter() {
warn!("");
warn!(" CLUSTER EVENT ");
warn!("===============");
warn!("{:?}", event);
warn!("");
for member in members {
info!("MEMBER {:?}", member);
}
}
warn!("STOPPED: Event Poller");
}
fn read_host_key(root_folder: &Path) -> Uuid {
let host_key_path = root_folder.join("host_key");
if let Ok(mut config_file) = File::open(&host_key_path) {
let mut host_key_contents = Vec::<u8>::new();
config_file.read_to_end(&mut host_key_contents).unwrap();
let u: [u8; 16] = host_key_contents.as_slice().try_into().unwrap();
return Uuid::from_bytes(u);
}
let host_key = Uuid::new_v4();
dbg!(host_key_path.clone());
let mut host_key_file = File::create(&host_key_path).unwrap();
host_key_file.write_all(host_key.as_bytes()).unwrap();
host_key
}
fn get_port() -> u16 {
use rand::{thread_rng, Rng};
let mut rng = thread_rng();
let port: u16 = rng.gen();
if port > 1025 && port < 65535 {
port
} else { get_port() }
}
#[inline]
fn get_cluster(listen_addr: &str, host_key: Uuid) -> &'static Cluster {
static CLUSTER: OnceCell<Cluster> = OnceCell::new();
CLUSTER.get_or_init(|| {
let config = ClusterConfig {
cluster_key: "artillery_local".as_bytes().to_vec(),
listen_addr: (&listen_addr as &str)
.to_socket_addrs()
.unwrap()
.next()
.unwrap(),
..Default::default()
};
Cluster::new_cluster(host_key, config).unwrap()
})
}

View File

@ -16,10 +16,12 @@ use std::str::FromStr;
use serde::*;
use bastion_utils::math;
use once_cell::sync::{Lazy, OnceCell};
use artillery_core::prelude::discovery_config::MulticastServiceDiscoveryConfig;
use std::sync::mpsc::channel;
use std::thread;
use chrono::Duration;
use artillery_core::service_discovery::multicast::prelude::*;
use artillery_core::constants::*;
use artillery_core::epidemic::prelude::*;
#[derive(Serialize, Deserialize, Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct ExampleSDReply {
@ -29,7 +31,7 @@ struct ExampleSDReply {
fn main() {
pretty_env_logger::init();
let matches = App::new("Cannonball :: Epidemic")
let matches = App::new("Cannonball :: UDP-SD + Epidemic")
.author("Mahmut Bulut, vertexclique [ta] gmail [tod] com")
.version(crate_version!())
.about("Artillery Epidemic Protocol Tester")
@ -67,15 +69,14 @@ fn main() {
let service_discovery =
{
let sd_port = get_port();
dbg!(sd_port.clone());
if let Some(_) = seeker {
discovery_config::MulticastServiceDiscoveryConfig {
MulticastServiceDiscoveryConfig {
timeout_delta: Duration::seconds(1),
discovery_addr: SocketAddr::from(([0, 0, 0, 0], sd_port)),
seeking_addr: SocketAddr::from(([0, 0, 0, 0], CONST_SERVICE_DISCOVERY_PORT)),
}
} else {
discovery_config::MulticastServiceDiscoveryConfig {
MulticastServiceDiscoveryConfig {
timeout_delta: Duration::seconds(1),
discovery_addr: SocketAddr::from(([0, 0, 0, 0], CONST_SERVICE_DISCOVERY_PORT)),
seeking_addr: SocketAddr::from(([0, 0, 0, 0], sd_port)),
@ -89,12 +90,12 @@ fn main() {
};
let reply =
state::ServiceDiscoveryReply {
ServiceDiscoveryReply {
serialized_data: serde_json::to_string(&epidemic_sd_config).unwrap()
};
let sd =
sd::MulticastServiceDiscovery::new_service_discovery(
MulticastServiceDiscovery::new_service_discovery(
service_discovery, reply).unwrap();
let listen_addr = format!("{}:{}", "127.0.0.1", epidemic_sd_config.port);
@ -109,7 +110,6 @@ fn main() {
sd.set_listen_for_peers(true).unwrap();
}
std::thread::Builder::new()
.name("cluster-event-poller".to_string())
.spawn(move || {
@ -117,17 +117,10 @@ fn main() {
})
.expect("cannot start cluster-event-poller");
while let Ok(disco) = discoveries.try_recv() {
dbg!(disco);
}
for discovery in discoveries.iter() {
dbg!(discovery.clone());
let discovery: ExampleSDReply = serde_json::from_str(&discovery.serialized_data).unwrap();
dbg!(discovery.clone());
dbg!(epidemic_sd_config.clone());
if discovery.port != epidemic_sd_config.port {
dbg!("SEED NODE CAME");
debug!("Seed node address came");
let seed_node = format!("{}:{}", epidemic_sd_config.ip, discovery.port);
cluster
.add_seed_node(FromStr::from_str(&seed_node).unwrap());
@ -166,7 +159,6 @@ fn read_host_key(root_folder: &Path) -> Uuid {
let host_key = Uuid::new_v4();
let mut host_key_file = File::create(&host_key_path).unwrap();
host_key_file.write_all(host_key.as_bytes()).unwrap();
// https://open.spotify.com/track/6Btbw0SV8FqxJ0AAA26Xrd?si=4BR-pO7nQki-DlCrvCTGgg
host_key
}

View File

@ -0,0 +1,30 @@
use crate::service_discovery::multicast::prelude::*;
use crate::epidemic::prelude::*;
#[derive(Default)]
pub struct ArtilleryAPClusterConfig {
app_name: String,
cluster_config: ClusterConfig
}
pub struct ArtilleryAPCluster {
config: ArtilleryAPClusterConfig
}
impl ArtilleryAPCluster {
pub fn new(config: ArtilleryAPClusterConfig) -> Self {
Self {
config
}
}
pub fn new_with_defaults() -> Self {
Self {
config: ArtilleryAPClusterConfig::default()
}
}
pub fn launch(&self) -> Self {
unimplemented!()
}
}

View File

@ -0,0 +1 @@
pub mod ap_cluster;

View File

@ -4,7 +4,7 @@ pub const CONST_SERVICE_DISCOVERY_PORT: u16 = 34726;
// ARTIL = 27845
/// Default Epidemic Port
pub const CONST_DISSEMINATION_PORT: u16 = 27845;
pub const CONST_INFECTION_PORT: u16 = 27845;
// Not sure MIO handles this correctly.
// Behave like this is the size. Normally 512 is enough.

View File

@ -13,7 +13,7 @@ pub struct ClusterConfig {
impl Default for ClusterConfig {
fn default() -> Self {
let directed = SocketAddr::from(([127, 0, 0, 1], CONST_DISSEMINATION_PORT));
let directed = SocketAddr::from(([127, 0, 0, 1], CONST_INFECTION_PORT));
ClusterConfig {
cluster_key: b"default".to_vec(),

View File

@ -6,3 +6,11 @@ pub mod cluster_config;
pub mod member;
pub mod membership;
pub mod state;
pub mod prelude {
pub use super::cluster::*;
pub use super::cluster_config::*;
pub use super::membership::*;
pub use super::member::*;
pub use super::state::*;
}

View File

@ -2,19 +2,16 @@
extern crate log;
#[macro_use]
mod errors;
pub mod errors;
mod constants;
/// Constants of the Artillery
pub mod constants;
/// Infection-style clustering
mod epidemic;
pub mod epidemic;
/// Service discovery types
mod service_discovery;
/// Service discovery strategies
pub mod service_discovery;
pub mod prelude {
pub use super::constants::*;
pub use super::service_discovery::multicast::*;
pub use super::epidemic::cluster::*;
pub use super::epidemic::cluster_config::*;
}
/// Cluster types
pub mod cluster;

View File

@ -0,0 +1,20 @@
use crate::constants::*;
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
pub struct MDNSServiceDiscoveryConfig {
pub reply_ttl: Duration,
pub local_service_addr: SocketAddr
}
impl Default for MDNSServiceDiscoveryConfig {
fn default() -> Self {
let local_service_addr =
SocketAddr::from(([127, 0, 0, 1], CONST_INFECTION_PORT));
Self {
reply_ttl: Duration::from_millis(10),
local_service_addr
}
}
}

View File

@ -0,0 +1,9 @@
pub mod discovery_config;
pub mod sd;
pub mod state;
pub mod prelude {
pub use super::discovery_config::*;
pub use super::sd::*;
pub use super::state::*;
}

View File

@ -0,0 +1,92 @@
use cuneiform_fields::prelude::*;
use std::sync::mpsc::{Sender, Receiver, channel};
use crate::errors::*;
use crate::service_discovery::mdns::discovery_config::MDNSServiceDiscoveryConfig;
use libp2p::mdns::service::*;
use crate::service_discovery::mdns::state::MDNSServiceDiscoveryEvent;
use bastion_executor::blocking::spawn_blocking;
use lightproc::proc_stack::ProcStack;
use lightproc::recoverable_handle::RecoverableHandle;
use libp2p::{identity, Multiaddr, PeerId};
use libp2p::multiaddr::Protocol;
use std::net::SocketAddr;
pub struct MDNSServiceDiscovery {
pub events: Receiver<MDNSServiceDiscoveryEvent>
}
impl MDNSServiceDiscovery {
pub fn new_service_discovery(
config: MDNSServiceDiscoveryConfig,
) -> Result<Self> {
let (event_tx, event_rx) =
channel::<MDNSServiceDiscoveryEvent>();
let peer_id = PeerId::from(identity::Keypair::generate_ed25519().public());
let discovery_handle = spawn_blocking(async move {
let mut service = MdnsService::new()
.expect("Can't launch the MDNS service");
loop {
let (mut srv, packet) = service.next().await;
match packet {
MdnsPacket::Query(query) => {
debug!("Query from {:?}", query.remote_addr());
let mut address: Multiaddr = format!(
"/ip4/{}/udp/{}",
config.local_service_addr.ip().to_string(),
config.local_service_addr.port()
).parse().unwrap();
let resp = build_query_response(
query.query_id(),
peer_id.clone(),
vec![address].into_iter(),
config.reply_ttl,
).unwrap();
srv.enqueue_response(resp);
}
MdnsPacket::Response(response) => {
// We detected a libp2p mDNS response on the network. Responses are for
// everyone and not just for the requester, which makes it possible to
// passively listen.
for peer in response.discovered_peers() {
debug!("Discovered peer {:?}", peer.id());
// These are the self-reported addresses of the peer we just discovered.
for addr in peer.addresses() {
debug!(" Address = {:?}", addr);
let components = addr.iter().collect::<Vec<_>>();
if let Protocol::Ip4(discovered_ip) = components[0] {
if let Protocol::Udp(discovered_port) = components[1] {
let discovered =
format!("{}:{}", discovered_ip, discovered_port)
.parse()
.unwrap();
event_tx.send(MDNSServiceDiscoveryEvent(discovered))
.unwrap();
} else {
error!("Unexpected protocol received: {}", components[1]);
}
} else {
error!("Unexpected IP received: {}", components[0]);
}
}
}
}
MdnsPacket::ServiceDiscovery(query) => {
// The last possibility is a service detection query from DNS-SD.
// Just like `Query`, in a real application you probably want to call
// `query.respond`.
debug!("Detected service query from {:?}", query.remote_addr());
}
}
service = srv
}
}, ProcStack::default());
Ok(Self {
events: event_rx
})
}
}

View File

@ -0,0 +1,9 @@
use std::net::SocketAddr;
pub struct MDNSServiceDiscoveryEvent(pub SocketAddr);
impl MDNSServiceDiscoveryEvent {
pub fn get(&self) -> SocketAddr {
self.0
}
}

View File

@ -1 +1,2 @@
pub mod multicast;
pub mod mdns;

View File

@ -2,3 +2,9 @@ pub mod discovery_config;
pub mod state;
pub mod sd;
pub mod prelude {
pub use super::discovery_config::*;
pub use super::state::*;
pub use super::sd::*;
}

View File

@ -98,23 +98,17 @@ impl MulticastServiceDiscoveryState {
fn readable(&mut self, buf: &mut [u8], poll: &mut Poll) -> Result<()> {
if let Ok((_bytes_read, peer_addr)) = self.server_socket.recv_from(buf) {
debug!("Readable received.");
let serialized = std::str::from_utf8(buf)?.to_string().trim().to_string();
let serialized = serialized.trim_matches(char::from(0x00));
let msg: ServiceDiscoveryMessage = if let Ok(msg) = serde_json::from_str(serialized) {
debug!("Message was: {:?}", msg);
msg
} else {
debug!("Decoding failure");
return Ok(());
};
dbg!(msg.clone());
match msg {
ServiceDiscoveryMessage::Request => {
if self.listen {
dbg!("listen");
self.seeker_replies.push_back(peer_addr);
poll.registry().reregister(
&mut self.server_socket,
@ -122,7 +116,6 @@ impl MulticastServiceDiscoveryState {
Interest::WRITABLE,
)?;
} else {
dbg!("seek");
poll.registry().reregister(
&mut self.server_socket,
ON_DISCOVERY,
@ -156,7 +149,6 @@ impl MulticastServiceDiscoveryState {
let discovery_reply = serde_json::to_vec(&reply)?;
while let Some(peer_addr) = self.seeker_replies.pop_front() {
dbg!("HITZHERE_2");
let mut sent_bytes = 0;
while sent_bytes != discovery_reply.len() {
if let Ok(bytes_tx) = self
@ -175,7 +167,6 @@ impl MulticastServiceDiscoveryState {
}
}
} else if token == SEEK_NODES {
dbg!("SEEK_NODES");
let mut sent_bytes = 0;
while sent_bytes != self.seek_request.len() {
if let Ok(bytes_tx) = self
@ -215,7 +206,6 @@ impl MulticastServiceDiscoveryState {
let elapsed = start.elapsed();
if elapsed >= timeout {
// state.process_internal_request(&mut poll, ServiceDiscoveryRequest::SeekPeers);
start = Instant::now();
}
@ -244,9 +234,7 @@ impl MulticastServiceDiscoveryState {
// Process inbound events
for event in events.iter() {
// dbg!(event.clone());
if event.is_readable() && event.token() == ON_DISCOVERY {
// if event.token() == ON_DISCOVERY {
if let Err(err) = state.readable(&mut buf, &mut poll) {
error!("Service discovery error in READABLE: {:?}", err);
break;
@ -279,15 +267,11 @@ impl MulticastServiceDiscoveryState {
self.listen = bcast_listen;
}
SeekPeers => {
let s = std::str::from_utf8(&self.seek_request).unwrap().to_string();
dbg!(s);
match self
.server_socket
.send_to(&self.seek_request, self.config.seeking_addr)
{
Ok(_) => {
dbg!("SENT");
if let Err(err) = poll.registry().reregister(
&mut self.server_socket,
ON_DISCOVERY,
@ -298,7 +282,6 @@ impl MulticastServiceDiscoveryState {
}
}
Ok(x) if x == 0 => {
dbg!("NOTHING WRITTEN");
if let Err(err) = poll.registry().reregister(
&mut self.server_socket,
SEEK_NODES,
@ -309,7 +292,6 @@ impl MulticastServiceDiscoveryState {
}
}
Err(err) => {
dbg!("BROADCAST FAILED");
error!("General Error for Service Discovery Internal Request: {:?}", err);
self.running = false;
}

View File

@ -0,0 +1,21 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: artillery-ap-deployment
labels:
app: artillery-ap
spec:
replicas: 3
selector:
matchLabels:
app: artillery-ap
template:
metadata:
labels:
app: artillery-ap
spec:
containers:
- name: artillery-ap
image: artillery-ap:0.1.0
ports:
- containerPort: 27845

View File

@ -0,0 +1,35 @@
#!/usr/bin/env zsh
help()
{
echo ""
echo "Usage: $0 -s CLUSTER_SIZE"
echo -e "\t-size Launches a zeroconf AP Artillery cluster"
exit 1
}
while getopts "s:" opt
do
case "$opt" in
s ) CLUSTER_SIZE="$OPTARG" ;;
? ) help ;;
esac
done
if [ -z "$CLUSTER_SIZE" ]
then
echo "Parameter expected";
help
fi
mkdir -p deployment-tests/node_state
cd deployment-tests/node_state
for i in {1..$CLUSTER_SIZE}
do
echo "Starting Node: $i"
NODE_DATA_DIR="node$i"
mkdir -p $NODE_DATA_DIR
RUST_BACKTRACE=full RUST_LOG=debug cargo run --example cball_mdns_sd_infection $NODE_DATA_DIR &
sleep 1
done

View File

@ -0,0 +1,26 @@
#!/bin/sh
set -o errexit
# desired cluster name; default is "kind"
KIND_CLUSTER_NAME="${KIND_CLUSTER_NAME:-kind}"
# create registry container unless it already exists
reg_name='kind-registry'
reg_port='5000'
running="$(docker inspect -f '{{.State.Running}}' "${reg_name}" 2>/dev/null || true)"
if [ "${running}" != 'true' ]; then
docker run \
-d --restart=always -p "${reg_port}:5000" --name "${reg_name}" \
registry:2
fi
reg_ip="$(docker inspect -f '{{.NetworkSettings.IPAddress}}' "${reg_name}")"
# create a cluster with the local registry enabled in containerd
cat <<EOF | kind create cluster --name "${KIND_CLUSTER_NAME}" --config=-
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
containerdConfigPatches:
- |-
[plugins."io.containerd.grpc.v1.cri".registry.mirrors."localhost:${reg_port}"]
endpoint = ["http://${reg_ip}:${reg_port}"]
EOF