Apply checks for the code

This commit is contained in:
Mahmut Bulut 2020-02-29 15:26:32 +01:00
parent 72cf188169
commit 90f8e5ec4e
5 changed files with 74 additions and 82 deletions

View File

@ -3,32 +3,21 @@ extern crate pretty_env_logger;
#[macro_use]
extern crate log;
use clap::*;
use std::convert::TryInto;
use std::fs::File;
use std::io::{Read, Write};
use std::net::ToSocketAddrs;
use std::path::Path;
use uuid::Uuid;
use artillery_core::epidemic::prelude::*;
use artillery_core::service_discovery::mdns::prelude::*;
use once_cell::sync::OnceCell;
use serde::*;
use std::thread;
use futures::future;
use std::time::Duration;
use artillery_core::cluster::ap_cluster::*;
use futures::future;
use bastion_executor::prelude::*;
use lightproc::proc_handle::ProcHandle;
use lightproc::proc_stack::ProcStack;
use std::sync::Arc;
fn main() {
pretty_env_logger::init();
@ -62,39 +51,41 @@ fn main() {
let ap_cluster = Arc::new(ArtilleryAPCluster::new(ap_cluster_config).unwrap());
// Launch the cluster node
run( async {
let cluster_stack = ProcStack::default()
.with_pid(2);
let events_stack = ProcStack::default()
.with_pid(3);
run(
async {
let cluster_stack = ProcStack::default().with_pid(2);
let events_stack = ProcStack::default().with_pid(3);
let ap_events = ap_cluster.clone();
let ap_events = ap_cluster.clone();
// Detach cluster launch
let cluster_handle =
spawn_blocking(async move {
ap_cluster.launch().await }, cluster_stack);
// Detach cluster launch
let cluster_handle =
spawn_blocking(async move { ap_cluster.launch().await }, cluster_stack);
// Detach event consumption
let events_handle =
spawn_blocking(async move {
warn!("STARTED: Event Poller");
for (members, event) in ap_events.cluster().clone().events.iter() {
warn!("");
warn!(" CLUSTER EVENT ");
warn!("===============");
warn!("{:?}", event);
warn!("");
// Detach event consumption
let events_handle = spawn_blocking(
async move {
warn!("STARTED: Event Poller");
for (members, event) in ap_events.cluster().clone().events.iter() {
warn!("");
warn!(" CLUSTER EVENT ");
warn!("===============");
warn!("{:?}", event);
warn!("");
for member in members {
info!("MEMBER {:?}", member);
for member in members {
info!("MEMBER {:?}", member);
}
}
}
warn!("STOPPED: Event Poller");
}, events_stack);
warn!("STOPPED: Event Poller");
},
events_stack,
);
future::join(events_handle, cluster_handle).await
}, ProcStack::default().with_pid(1));
future::join(events_handle, cluster_handle).await
},
ProcStack::default().with_pid(1),
);
}
fn get_port() -> u16 {

View File

@ -1,17 +1,13 @@
use crate::epidemic::prelude::*;
use crate::service_discovery::mdns::prelude::*;
use bastion_executor::blocking::spawn_blocking;
use lightproc::proc_handle::ProcHandle;
use lightproc::proc_stack::ProcStack;
use uuid::Uuid;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::Receiver;
use lightproc::prelude::*;
use crate::errors::*;
use std::rc::Rc;
use std::future::Future;
use std::error::Error;
use crate::service_discovery::mdns::prelude::*;
use lightproc::prelude::*;
use std::future::Future;
use std::sync::Arc;
use uuid::Uuid;
#[derive(Default, Clone)]
pub struct ArtilleryAPClusterConfig {
@ -24,7 +20,7 @@ pub struct ArtilleryAPClusterConfig {
pub struct ArtilleryAPCluster {
config: ArtilleryAPClusterConfig,
cluster: Arc<Cluster>,
sd: Arc<MDNSServiceDiscovery>
sd: Arc<MDNSServiceDiscovery>,
}
unsafe impl Send for ArtilleryAPCluster {}
@ -34,20 +30,14 @@ pub type DiscoveryLaunch = RecoverableHandle<()>;
impl ArtilleryAPCluster {
pub fn new(config: ArtilleryAPClusterConfig) -> Result<Self> {
let sd =
MDNSServiceDiscovery::new_service_discovery(
config.sd_config.clone())?;
let sd = MDNSServiceDiscovery::new_service_discovery(config.sd_config.clone())?;
let cluster =
Cluster::new_cluster(
config.node_id,
config.cluster_config.clone()
)?;
let cluster = Cluster::new_cluster(config.node_id, config.cluster_config.clone())?;
Ok(Self {
config,
cluster: Arc::new(cluster),
sd: Arc::new(sd)
sd: Arc::new(sd),
})
}
@ -59,7 +49,7 @@ impl ArtilleryAPCluster {
self.sd.clone()
}
pub fn launch(&self) -> impl Future<Output=()> + '_ {
pub fn launch(&self) -> impl Future<Output = ()> + '_ {
let config = self.config.clone();
let events = self.service_discovery().events();
let cluster = self.cluster.clone();
@ -71,7 +61,9 @@ impl ArtilleryAPCluster {
events_inner
.iter()
.filter(|discovery| discovery.get().port() != config_inner.sd_config.local_service_addr.port())
.filter(|discovery| {
discovery.get().port() != config_inner.sd_config.local_service_addr.port()
})
.for_each(|discovery| cluster_inner.add_seed_node(discovery.get()))
}
}

View File

@ -4,7 +4,12 @@ use crate::epidemic::state::{ArtilleryClusterEvent, ArtilleryClusterRequest};
use crate::errors::*;
use std::convert::AsRef;
use std::net::SocketAddr;
use std::{task::{Context, Poll}, sync::mpsc::{channel, Receiver, Sender}, pin::Pin, future::Future};
use std::{
future::Future,
pin::Pin,
sync::mpsc::{channel, Receiver, Sender},
task::{Context, Poll},
};
use uuid::Uuid;
pub struct Cluster {
@ -59,12 +64,10 @@ impl Cluster {
impl Future for Cluster {
type Output = ArtilleryClusterEvent;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
return match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending
}
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending,
}
}
}

View File

@ -8,13 +8,11 @@ use libp2p::multiaddr::Protocol;
use libp2p::{identity, Multiaddr, PeerId};
use lightproc::proc_stack::ProcStack;
use crossbeam_channel::{unbounded, Sender, Receiver};
use crossbeam_channel::{unbounded, Receiver};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::sync::Arc;
use std::task::{Context, Poll};
pub struct MDNSServiceDiscovery {
events: Arc<Receiver<MDNSServiceDiscoveryEvent>>,
@ -98,7 +96,9 @@ impl MDNSServiceDiscovery {
ProcStack::default(),
);
Ok(Self { events: Arc::new(event_rx) })
Ok(Self {
events: Arc::new(event_rx),
})
}
pub fn events(&self) -> Arc<Receiver<MDNSServiceDiscoveryEvent>> {
@ -109,12 +109,10 @@ impl MDNSServiceDiscovery {
impl Future for MDNSServiceDiscovery {
type Output = MDNSServiceDiscoveryEvent;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
return match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending
}
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
match self.events.recv() {
Ok(kv) => Poll::Ready(kv),
Err(_) => Poll::Pending,
}
}
}

8
checks.sh Executable file
View File

@ -0,0 +1,8 @@
#!/usr/bin/env sh
set -em
cargo fmt
cargo fix --allow-dirty --allow-staged
cargo clippy
cargo fmt