Merge pull request #24 from bastion-rs/ddata-craq

DData – CRAQ implementation
This commit is contained in:
Mahmut Bulut 2020-04-20 23:35:03 +02:00 committed by GitHub
commit a297b82749
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 2856 additions and 13 deletions

View File

@ -8,4 +8,8 @@ members = [
[profile.release]
lto = "fat"
codegen-units = 1
codegen-units = 1
# [profile.bench]
# debug = true

12
Makefile.toml Normal file
View File

@ -0,0 +1,12 @@
[config]
default_to_workspace = false
[env]
CRAQ_DIR = "artillery-ddata/src/craq"
[tasks.compile-craq]
script = [
'''
thrift -out $CRAQ_DIR --gen rs $CRAQ_DIR/protocol/proto.thrift
'''
]

View File

@ -2,13 +2,14 @@
name = "artillery-core"
version = "0.1.0"
authors = ["Mahmut Bulut <vertexclique@gmail.com>"]
description = "Fire-forged cluster management & Distributed data protocol"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log = "0.4.8"
failure = "0.1.6"
log = "0.4"
failure = "0.1.7"
failure_derive = "0.1.6"
bastion-utils = "0.3.2"
cuneiform-fields = "0.1"
@ -20,7 +21,7 @@ rand = "0.7.3"
mio = { version = "0.7.0-alpha.1", features = ["os-poll", "udp"] }
futures = "0.3"
pin-utils = "0.1.0-alpha.4"
libp2p = "0.17.0"
libp2p = { version = "0.18", features = ["mdns"] }
bastion-executor = "0.3.4"
lightproc = "0.3.4"
crossbeam-channel = "0.4.2"

View File

@ -4,6 +4,25 @@ version = "0.1.0"
authors = ["Mahmut Bulut <vertexclique@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log = "0.4"
failure = "0.1.7"
thrift = "0.13.0"
t1ha = "0.1"
crossbeam-channel = "0.4"
[dev-dependencies]
clap = "2.33.0"
pretty_env_logger = "0.4.0"
bastion = "0.3"
bastion-executor = "0.3"
lightproc = "0.3"
rand = "0.7"
criterion = "0.3"
futures = "0.3"
[[bench]]
name = "craq_bencher"
harness = false

View File

@ -0,0 +1,194 @@
use criterion::{criterion_group, criterion_main, Criterion};
use artillery_ddata::craq::prelude::*;
use futures::stream::StreamExt;
use rand::distributions::Alphanumeric;
use rand::prelude::*;
/// Prepares the clients used by the read benchmarks.
///
/// `sip` is the `host:port` of the first server. `args` is laid out as
/// `[client count, payload size in bytes, trials, extra "host:port" servers...]`.
/// Clients are assigned to the known servers round-robin. After connecting,
/// the first client writes one random payload and performs one strong read;
/// a failure of either is only reported on stdout.
///
/// # Panics
///
/// Panics when an argument is missing or malformed, when a connection cannot
/// be established, or when the requested client count is zero.
pub fn entry_bench_read(sip: String, args: Vec<&str>) -> Vec<DDataCraqClient> {
    let client_count = args[0].parse::<usize>().unwrap();
    let payload_len = args[1].parse::<usize>().unwrap();
    // Parsed only to validate the argument; the value itself is unused.
    let _trials = args[2].parse::<usize>().unwrap();

    // The entry server always comes first, followed by every extra server.
    let entry: Vec<&str> = sip.split(':').collect();
    let mut hosts = vec![(entry[0], entry[1].parse::<u16>().unwrap())];
    for &raw in &args[3..] {
        let parts: Vec<&str> = raw.split(':').collect();
        let host = parts[0];
        let port = parts[1].parse::<u16>().unwrap();
        hosts.push((host, port));
    }

    let mut clients: Vec<DDataCraqClient> = Vec::with_capacity(client_count);
    for idx in 0..client_count {
        let (host, port) = hosts[idx % hosts.len()];
        clients.push(DDataCraqClient::connect_host_port(host, port).unwrap());
    }

    let seeded = clients[0].write(gen_random_str(payload_len));
    if seeded.is_err() {
        println!("bench_write: Couldn't write new revision.");
    }

    // Check if any object is written...
    let probed = clients[0].read(CraqConsistencyModel::Strong, 0);
    if probed.is_err() {
        println!("bench_read: Could not read object.");
    }

    clients
}
/// Builds a random string of `slen` characters sampled from `Alphanumeric`
/// using the thread-local random number generator.
pub fn gen_random_str(slen: usize) -> String {
    let mut out = String::with_capacity(slen);
    out.extend(thread_rng().sample_iter(&Alphanumeric).take(slen));
    out
}
/// Issues one eventual-consistency read on every client, ignoring the outcome.
pub fn stub_read(clients: &mut Vec<DDataCraqClient>) {
    for client in clients.iter_mut() {
        let _ = client.read(CraqConsistencyModel::Eventual, 0);
    }
}
/// Registers one read benchmark per client count.
///
/// Every benchmark connects the requested number of clients to the three
/// local servers (`127.0.0.1:30001` .. `30003`), seeds a 1000 byte object and
/// then measures one eventual read per client per iteration.
fn client_benchmarks(c: &mut Criterion) {
    /// Server the first client connects to.
    const ENTRY_NODE: &str = "127.0.0.1:30001";
    /// Client counts to benchmark. The 500 and 1000 client runs were disabled
    /// in the original suite; append "500" / "1000" here to re-enable them.
    const CLIENT_COUNTS: [&str; 11] = [
        "1", "2", "3", "4", "5", "10", "20", "30", "40", "50", "100",
    ];

    for &count in CLIENT_COUNTS.iter() {
        // Clients are rebuilt for each benchmark and dropped when it ends.
        let mut clients = entry_bench_read(
            ENTRY_NODE.to_string(),
            vec![count, "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"],
        );

        // Keep the historical benchmark ids (singular form for one client).
        let bench_name = if count == "1" {
            "benchmark_read_1_client".to_string()
        } else {
            format!("benchmark_read_{}_clients", count)
        };

        c.bench_function(&bench_name, |b| b.iter(|| stub_read(&mut clients)));
    }
}
criterion_group!(benches, client_benchmarks);
criterion_main!(benches);

View File

@ -0,0 +1,98 @@
extern crate pretty_env_logger;
#[macro_use]
extern crate log;
use artillery_ddata::craq::prelude::*;
use clap::*;
fn main() {
pretty_env_logger::init();
let matches = App::new("Artillery CRAQ")
.author("Mahmut Bulut, vertexclique [ta] gmail [tod] com")
.version(crate_version!())
.about("Artillery Distributed Data Protocol Tester")
.subcommand(
SubCommand::with_name("server")
.about("Runs a CRAQ server")
.arg(
Arg::with_name("cr_mode")
.required(true)
.help("CR mode that server would use: 0 for CRAQ, 1 for CR")
.index(1),
)
.arg(
Arg::with_name("node_index")
.required(true)
.help("Node index this server would use")
.index(2),
)
.arg(
Arg::with_name("chain_servers")
.required(true)
.multiple(true),
),
)
.subcommand(
SubCommand::with_name("client")
.about("Runs a CRAQ client")
.arg(
Arg::with_name("server_ip_port")
.required(true)
.help("Server ip and port to connect")
.index(1),
)
.arg(
Arg::with_name("test_method")
.required(true)
.help("Test method of client to test against the server")
.index(2),
)
.arg(
Arg::with_name("extra_args")
.required(true)
.multiple(true)
.min_values(3),
),
)
.after_help("Enables Artillery CRAQ protocol to be tested in the server/client fashion")
.get_matches();
match matches.subcommand() {
("server", Some(server_matches)) => {
let cr_mode = match server_matches.value_of("cr_mode") {
Some("0") => CRMode::Craq,
Some("1") => CRMode::Cr,
_ => panic!("CR mode not as expected"),
};
if let Some(node_index) = server_matches.value_of("node_index") {
let node_index = node_index.parse::<usize>().unwrap();
let varargs: Vec<&str> =
server_matches.values_of("chain_servers").unwrap().collect();
let nodes: Vec<ChainNode> = varargs.iter().flat_map(ChainNode::new).collect();
assert_eq!(nodes.len(), varargs.len(), "Node address parsing failed");
let chain = CraqChain::new(&nodes, node_index).unwrap();
CraqNode::start(cr_mode, chain, CraqConfig::default());
}
}
("client", Some(client_matches)) => {
let _sip = client_matches
.value_of("server_ip_port")
.unwrap()
.to_string();
match client_matches.value_of("test_method") {
Some("bench_read") => todo!(),
_ => unreachable!(),
}
}
_ => {
error!("Couldn't find any known subcommands");
}
}
}

View File

@ -0,0 +1,88 @@
use super::chain_node::ChainNode;
use super::errors::*;
use std::fmt;
use std::fmt::Display;
///
/// A closed-loop CRAQ chain as seen from one of its member nodes.
#[derive(Default, Debug)]
pub struct CraqChain {
    /// All nodes of the chain in chain order: head first, tail last.
    nodes: Vec<ChainNode>,
    /// Position of the local node inside `nodes`.
    node_idx: usize,
}

impl CraqChain {
    ///
    /// Builds a chain from `nodes` with the local node located at `node_idx`.
    ///
    /// Returns an assertion error when `node_idx` does not address an element
    /// of `nodes` (which includes the case of an empty node list).
    pub fn new(nodes: &[ChainNode], node_idx: usize) -> Result<Self> {
        ensure!(
            node_idx < nodes.len(),
            "Node index can't be greater than chain length."
        );

        let nodes = nodes.to_vec();
        Ok(Self { nodes, node_idx })
    }

    ///
    /// Returns whether the local node is the head (first node) of the chain.
    pub fn is_head(&self) -> bool {
        0 == self.node_idx
    }

    ///
    /// Returns whether the local node is the tail (last node) of the chain.
    pub fn is_tail(&self) -> bool {
        let last_idx = self.nodes.len().saturating_sub(1);
        last_idx == self.node_idx
    }

    ///
    /// Returns the node following the local one, or `None` for the tail.
    pub fn get_successor(&self) -> Option<&ChainNode> {
        if self.is_tail() {
            return None;
        }
        self.nodes.get(self.node_idx.saturating_add(1))
    }

    ///
    /// Returns the last node of the chain, or `None` for an empty chain.
    pub fn get_tail(&self) -> Option<&ChainNode> {
        self.nodes.iter().next_back()
    }

    ///
    /// Returns the chain entry describing the local node.
    pub fn get_node(&self) -> Option<&ChainNode> {
        self.nodes.get(self.node_idx)
    }

    ///
    /// Returns the index of the local node inside the chain.
    pub fn get_index(&self) -> usize {
        self.node_idx
    }

    ///
    /// Returns the number of nodes in the chain.
    pub fn chain_size(&self) -> usize {
        self.nodes.len()
    }
}

///
/// Human-readable rendering: the local index followed by the pretty-printed
/// node list.
impl Display for CraqChain {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!(
            "CR: Index [{}] in chain: {:#?}",
            self.node_idx, self.nodes
        ))
    }
}

View File

@ -0,0 +1,26 @@
use super::errors::*;
use std::net::{SocketAddr, ToSocketAddrs};
///
/// A single member of a CRAQ chain, identified by its socket address.
#[derive(Debug, Clone)]
pub struct ChainNode {
    /// Resolved address of the node.
    host: SocketAddr,
}

impl ChainNode {
    /// Creates a node from anything resolvable to a socket address.
    ///
    /// Only the first resolved address is kept. Fails with the resolver's
    /// I/O error, or with `CraqError::SocketAddrError` when resolution yields
    /// no address at all.
    pub fn new<A>(addr: A) -> Result<Self>
    where
        A: ToSocketAddrs,
    {
        match addr.to_socket_addrs()?.next() {
            Some(host) => Ok(Self { host }),
            None => Err(CraqError::SocketAddrError(
                "No node address given or parsed.".into(),
            )),
        }
    }

    /// Returns the socket address of this node.
    pub fn get_addr(&self) -> &SocketAddr {
        &self.host
    }
}

View File

@ -0,0 +1,117 @@
use std::fmt;
use super::errors::*;
use super::{
node::CraqClient,
proto::{CraqConsistencyModel, CraqObject, TCraqServiceSyncClient},
};
use std::net::{SocketAddr, ToSocketAddrs};
use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol};
use thrift::transport::{
TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport,
TFramedWriteTransportFactory, TIoChannel, TTcpChannel as BiTcp,
};
/// Result of a successful read: the object's bytes plus its dirty flag.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadObject {
    ///
    /// Object's value.
    value: Vec<u8>,
    ///
    /// Whether the read was dirty (true) or clean (false).
    dirty: bool,
}

impl ReadObject {
    ///
    /// Creates a new wrapper Read Object
    pub fn new(value: Vec<u8>, dirty: bool) -> Self {
        Self { value, dirty }
    }

    ///
    /// Returns the bytes of the object that was read.
    pub fn value(&self) -> &[u8] {
        &self.value
    }

    ///
    /// Returns `true` when the read was dirty and `false` when it was clean.
    pub fn is_dirty(&self) -> bool {
        self.dirty
    }

    ///
    /// Consumes the wrapper and returns the owned bytes of the object.
    pub fn into_value(self) -> Vec<u8> {
        self.value
    }
}

/// The display form is intentionally the same as the debug form.
impl fmt::Display for ReadObject {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}
/// Synchronous client talking to a single CRAQ node.
pub struct DDataCraqClient {
    /// Resolved address of the node this client is connected to.
    host: SocketAddr,
    /// Underlying Thrift service client.
    cc: CraqClient,
}

impl DDataCraqClient {
    /// Connects to the node at `host` and `port`.
    ///
    /// Host and port are resolved as a pair rather than as a formatted
    /// `"host:port"` string, so IPv6 literals such as `::1` work as well.
    pub fn connect_host_port<T>(host: T, port: u16) -> Result<Self>
    where
        T: AsRef<str>,
    {
        Self::connect((host.as_ref(), port))
    }

    /// Connects to the first address `addr` resolves to.
    ///
    /// The connection uses the binary protocol over framed transports.
    /// Fails when the address cannot be resolved or the TCP channel cannot be
    /// opened or split.
    pub fn connect<A>(addr: A) -> Result<Self>
    where
        A: ToSocketAddrs,
    {
        let host: SocketAddr = addr
            .to_socket_addrs()?
            .next()
            .ok_or_else(|| CraqError::SocketAddrError("No node address given or parsed.".into()))?;

        debug!("Client is initiating connection to: {}", host);

        let mut c = BiTcp::new();
        c.open(&host.to_string())?;

        // One TCP channel, split into independent read and write halves.
        let (i_chan, o_chan) = c.split()?;
        let (i_tran, o_tran) = (
            TFramedReadTransport::new(i_chan),
            TFramedWriteTransport::new(o_chan),
        );
        let (i_prot, o_prot) = (
            TBinaryInputProtocol::new(i_tran, true),
            TBinaryOutputProtocol::new(o_tran, true),
        );

        debug!("Created client: {}", host);

        let cc = CraqClient::new(i_prot, o_prot);
        Ok(Self { host, cc })
    }

    ///
    /// Writes an object to the cluster, returning the new object version or -1 upon failure.
    ///
    /// Transport and protocol failures are returned as `CraqError::ProtocolError`.
    pub fn write(&mut self, value: String) -> Result<i64> {
        let mut obj = CraqObject::default();
        obj.value = Some(value.into_bytes());
        Ok(self.cc.write(obj)?)
    }

    ///
    /// Reads an object with given bound version.
    ///
    /// Fails with `CraqError::ReadError` when the reply carries no value or no
    /// dirty flag.
    pub fn read(&mut self, model: CraqConsistencyModel, version_bound: i64) -> Result<ReadObject> {
        let obj = self.cc.read(model, version_bound)?;
        match (obj.value, obj.dirty) {
            (Some(v), Some(d)) => Ok(ReadObject::new(v, d)),
            _ => bail!(CraqError::ReadError, "Read request failed"),
        }
    }

    ///
    /// Performs a test-and-set operation, returning the new object version or -1 upon failure.
    pub fn test_and_set(&mut self, value: String, expected_version: i64) -> Result<i64> {
        let mut obj = CraqObject::default();
        obj.value = Some(value.into_bytes());
        Ok(self.cc.test_and_set(obj, expected_version)?)
    }
}

View File

@ -0,0 +1,22 @@
use super::node::CRMode;
/// Tunables of a CRAQ node.
#[derive(Debug, Clone)]
pub struct CraqConfig {
    /// Port the protocol server listens on when the chain has no entry for
    /// the local node.
    pub fallback_replication_port: u16,
    /// Replication mode stored with the configuration.
    pub operation_mode: CRMode,
    /// Pause between two connection attempts to a peer, in milliseconds.
    pub connection_sleep_time: u64,
    /// Requested size of the connection pools towards peers.
    pub connection_pool_size: usize,
    /// Worker count handed to the protocol server.
    pub protocol_worker_size: usize,
}

impl Default for CraqConfig {
    /// CRAQ mode, port 22991, one second between connection attempts,
    /// 50 pooled connections and 100 protocol workers.
    fn default() -> Self {
        Self {
            fallback_replication_port: 22991,
            operation_mode: CRMode::Craq,
            connection_sleep_time: 1000,
            connection_pool_size: 50,
            protocol_worker_size: 100,
        }
    }
}

View File

@ -0,0 +1,56 @@
use failure::Fail;
use std::io;
use std::result;
/// Result type for operations that could result in a `CraqError`
pub type Result<T> = result::Result<T, CraqError>;
/// Errors produced by the CRAQ client, chain and node code.
///
/// The display text of each variant is given by its `#[fail(display = ...)]`
/// attribute.
#[derive(Fail, Debug)]
pub enum CraqError {
/// An I/O operation failed; built by the `From<io::Error>` conversion.
#[fail(display = "Artillery :: CRAQ :: I/O error occurred: {}", _0)]
IOError(io::Error),
/// A node address could not be resolved to a socket address.
#[fail(display = "Artillery :: CRAQ :: Socket addr: {}", _0)]
SocketAddrError(String),
/// A condition checked by the `ensure!` macro did not hold; carries the
/// message and the backtrace captured where the check failed.
#[fail(display = "Artillery :: CRAQ :: Assertion failed: {}", _0)]
AssertionError(String, failure::Backtrace),
/// A Thrift error; built by the `From<thrift::Error>` conversion.
#[fail(display = "Artillery :: CRAQ :: Protocol error: {}", _0)]
ProtocolError(thrift::Error),
/// A read request did not produce a usable object.
#[fail(display = "Artillery :: CRAQ :: Read error: {}", _0)]
ReadError(String),
}
impl From<io::Error> for CraqError {
fn from(e: io::Error) -> Self {
CraqError::IOError(e)
}
}
impl From<thrift::Error> for CraqError {
fn from(e: thrift::Error) -> Self {
CraqError::ProtocolError(e)
}
}
/// Returns early from the enclosing function with `Err($kind(message))`.
///
/// `$kind` is any callable taking a `String` (typically an error variant).
/// The message is either a single expression converted with `to_owned()`, or
/// a format string followed by its arguments.
///
/// The expansion is a bare `return` expression without a trailing semicolon,
/// so the macro can be used both as a statement and in expression position
/// (for example as a `match` arm).
#[macro_export]
macro_rules! bail {
    ($kind:expr, $e:expr) => {
        return Err($kind($e.to_owned()))
    };
    ($kind:expr, $fmt:expr, $($arg:tt)+) => {
        // `format!` already yields an owned `String`; no extra copy needed.
        return Err($kind(format!($fmt, $($arg)+)))
    };
}
/// Returns early with `CraqError::AssertionError` when `$cond` is false.
///
/// The message is either a single expression converted with `to_string()`,
/// or a format string followed by its arguments. A backtrace is captured at
/// the point of failure.
macro_rules! ensure {
    ($cond:expr, $e:expr) => {
        if !($cond) {
            return Err(CraqError::AssertionError($e.to_string(), failure::Backtrace::new()));
        }
    };
    ($cond:expr, $fmt:expr, $($arg:tt)+) => {
        if !($cond) {
            // `format!` already yields an owned `String`; no extra copy needed.
            return Err(CraqError::AssertionError(format!($fmt, $($arg)+), failure::Backtrace::new()));
        }
    };
}

View File

@ -0,0 +1,41 @@
use core::sync::atomic::spin_loop_hint;
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
///
///
/// `RwLock` wrapper whose `read` and `write` busy-wait (spin) until the lock
/// is acquired and hand back the guard directly.
///
/// Poisoning is ignored: when a previous holder panicked, the guard is still
/// returned. Retrying on a poisoned lock could never succeed and would spin
/// forever.
pub struct ERwLock<T: ?Sized>(RwLock<T>);

impl<T> ERwLock<T> {
    /// Wraps `t` in a new, unlocked lock.
    pub fn new(t: T) -> ERwLock<T> {
        ERwLock(RwLock::new(t))
    }
}

impl<T: ?Sized> ERwLock<T> {
    /// Spins until shared read access is acquired.
    #[inline]
    pub fn read(&self) -> RwLockReadGuard<T> {
        loop {
            match self.0.try_read() {
                Ok(guard) => break guard,
                // The lock was acquired, it is merely flagged as poisoned.
                Err(std::sync::TryLockError::Poisoned(poisoned)) => break poisoned.into_inner(),
                Err(std::sync::TryLockError::WouldBlock) => spin_loop_hint(),
            }
        }
    }

    /// Spins until exclusive write access is acquired.
    #[inline]
    pub fn write(&self) -> RwLockWriteGuard<T> {
        loop {
            match self.0.try_write() {
                Ok(guard) => break guard,
                // The lock was acquired, it is merely flagged as poisoned.
                Err(std::sync::TryLockError::Poisoned(poisoned)) => break poisoned.into_inner(),
                Err(std::sync::TryLockError::WouldBlock) => spin_loop_hint(),
            }
        }
    }

    /// Returns the wrapped standard library lock.
    #[allow(dead_code)]
    #[inline]
    pub fn inner(&self) -> &RwLock<T> {
        &self.0
    }
}

View File

@ -0,0 +1,28 @@
/// Error API for CRAQ distributed store
// `#[macro_use]` makes the macros defined in `errors` (such as `ensure!`)
// available to the modules declared after it.
#[macro_use]
pub mod errors;
// Internal building blocks; their public items are re-exported via `prelude`
// where needed.
mod chain;
mod chain_node;
mod erwlock;
// NOTE(review): `proto` is presumably the output of the Thrift compiler (see the
// `compile-craq` task in Makefile.toml), which would explain why lints are
// silenced for it wholesale — confirm.
#[allow(clippy::all)]
#[allow(deprecated)]
#[allow(unknown_lints)]
mod proto;
mod server;
pub mod client;
pub mod craq_config;
pub mod node;
/// Prelude for CRAQ distributed store
pub mod prelude {
pub use super::chain::*;
pub use super::chain_node::*;
pub use super::client::*;
pub use super::craq_config::*;
pub use super::errors::*;
pub use super::node::*;
pub use super::proto::*;
}

View File

@ -0,0 +1,262 @@
use super::errors::*;
use super::chain::CraqChain;
use super::{craq_config::CraqConfig, erwlock::ERwLock, proto::*, server::CraqProtoServer};
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs},
sync::Arc,
};
use thrift::protocol::{
TBinaryInputProtocol, TBinaryInputProtocolFactory, TBinaryOutputProtocol,
TBinaryOutputProtocolFactory,
};
use thrift::transport::{
ReadHalf, TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport,
TFramedWriteTransportFactory, TIoChannel, TTcpChannel as BiTcp, WriteHalf,
};
use thrift::server::TServer;
use crossbeam_channel::{unbounded, Receiver, Sender};
///
/// Replication mode a node operates in.
#[derive(Debug, Clone, PartialEq)]
pub enum CRMode {
    /// Standard CR mode.
    Cr,
    /// CRAQ mode.
    Craq,
}

/// CRAQ is the mode used when none is chosen explicitly.
impl Default for CRMode {
    fn default() -> Self {
        Self::Craq
    }
}
/// Input side of a client connection: binary protocol over a framed read
/// transport on the read half of a TCP channel.
type CraqClientInputProtocol = TBinaryInputProtocol<TFramedReadTransport<ReadHalf<BiTcp>>>;
/// Output side of a client connection: binary protocol over a framed write
/// transport on the write half of a TCP channel.
type CraqClientOutputProtocol = TBinaryOutputProtocol<TFramedWriteTransport<WriteHalf<BiTcp>>>;
/// Synchronous CRAQ service client over the two protocol halves above.
pub(crate) type CraqClient =
CraqServiceSyncClient<CraqClientInputProtocol, CraqClientOutputProtocol>;
///
/// Representation of a physical CRAQ node.
///
/// All four pool fields start out as `None` and are filled in by `connect`.
/// On the tail they stay `None`, because `connect` returns early there.
#[derive(Default)]
pub struct CraqNode {
/// Run mode which is either CR or CRAQ mode.
pub cr_mode: CRMode,
/// Whole chain.
pub chain: Arc<CraqChain>,
/// Tail connection pool receiver.
pub tail_pool_rx: Option<Arc<Receiver<CraqClient>>>,
/// Successor connection pool receiver.
/// On the node right before the tail this is the same pool as `tail_pool_rx`.
pub successor_pool_rx: Option<Arc<Receiver<CraqClient>>>,
/// Tail connection pool sender.
pub tail_pool_tx: Option<Arc<Sender<CraqClient>>>,
/// Successor connection pool sender.
/// On the node right before the tail this is the same pool as `tail_pool_tx`.
pub successor_pool_tx: Option<Arc<Sender<CraqClient>>>,
/// Stored node configuration to be reused across iterations
pub config: CraqConfig,
}
impl CraqNode {
///
/// Builds a node for `chain` in the given mode; no connection pool exists yet.
fn new_node(cr_mode: CRMode, chain: CraqChain, config: CraqConfig) -> Result<Self> {
    Ok(Self {
        cr_mode,
        chain: Arc::new(chain),
        tail_pool_rx: None,
        successor_pool_rx: None,
        tail_pool_tx: None,
        successor_pool_tx: None,
        config,
    })
}
///
/// Initial connection to the underlying protocol server.
fn connect_to_first<A>(&self, server_addr: A) -> CraqClient
where
A: ToSocketAddrs,
{
self.connect_to_server(&server_addr).unwrap_or_else(|_e| {
std::thread::sleep(std::time::Duration::from_millis(
self.config.connection_sleep_time,
));
self.connect_to_first(server_addr)
})
}
///
/// Creates a connection pool to the given underlying protocol server.
///
/// The pool is an unbounded channel: connections are taken from the receiver
/// and handed back through the sender. The first connection is made with
/// `connect_to_first`, which blocks until the server is reachable.
///
/// NOTE(review): the pool currently ends up with exactly ONE connection. The
/// `flat_map` below only builds a lazy iterator that is never consumed, so its
/// closure never runs and `config.connection_pool_size` has no effect. Filling
/// the pool for real opens that many extra connections per peer; before doing
/// so, confirm the peer's `protocol_worker_size` can serve all of them.
fn create_conn_pool<A>(
&self,
server_addr: A,
) -> Result<(Sender<CraqClient>, Receiver<CraqClient>)>
where
A: ToSocketAddrs,
{
let (tx, rx) = unbounded();
let client = self.connect_to_first(&server_addr);
// TODO: tryize
let _ = tx.try_send(client);
// Never executed: the iterator is dropped without being driven (see NOTE above).
let _ = (0..self.config.connection_pool_size)
.flat_map(|_| -> Result<_> { Ok(tx.try_send(self.connect_to_server(&server_addr)?)) });
// while let Ok(_) = tx.try_send(self.connect_to_server(&server_addr)?) {}
Ok((tx, rx))
}
///
/// Connects to the other nodes in the given chain.
///
/// Holds the node's write lock for the whole call and fills in the pool
/// fields of the node:
///
/// * the tail connects to nobody and returns immediately;
/// * every other node gets a pool towards the tail;
/// * the node right before the tail reuses that pool as its successor pool;
/// * any other node additionally gets a pool towards its successor.
///
/// Errors from `create_conn_pool` are propagated.
fn connect(noderef: Arc<ERwLock<CraqNode>>) -> Result<()> {
let mut nodew = noderef.write();
debug!("Trying to connect");
// The tail has no successor and never queries itself: nothing to connect.
if nodew.chain.is_tail() {
return Ok(());
}
debug!("Checking tail connection...");
// `chain` is an `Arc`, so `clone()` only copies the handle.
// NOTE(review): presumably done so the tail lookup does not keep `nodew` borrowed — confirm.
if let Some(tail) = nodew.chain.clone().get_tail() {
let tail = tail.clone();
let (t_tx, t_rx) = nodew.create_conn_pool(tail.get_addr())?;
nodew.tail_pool_rx = Some(Arc::new(t_rx));
nodew.tail_pool_tx = Some(Arc::new(t_tx));
info!(
"[CR Node {}] Connected to tail at {}",
nodew.chain.get_index(),
tail.get_addr()
);
} else {
// NOTE: shouldn't happen
error!("Shouldn't have happened - tail follows");
unreachable!()
}
debug!("Checking node before the tail...");
// Is this the node before the tail?
// If so, successor and tail are the same node, so both share one pool.
if nodew.chain.get_index() == nodew.chain.chain_size().saturating_sub(2) {
nodew.successor_pool_tx = nodew.tail_pool_tx.clone();
nodew.successor_pool_rx = nodew.tail_pool_rx.clone();
info!("[CR Node {}] Node before the tail", nodew.chain.get_index());
return Ok(());
}
debug!("Checking successor...");
// Remaining nodes need a dedicated pool towards their direct successor.
if let Some(successor) = nodew.chain.get_successor() {
let successor = successor.clone();
info!(
"[CR Node {}] Connecting to successor at {}",
nodew.chain.get_index(),
successor.get_addr()
);
let (s_tx, s_rx) = nodew.create_conn_pool(successor.get_addr())?;
nodew.successor_pool_rx = Some(Arc::new(s_rx));
nodew.successor_pool_tx = Some(Arc::new(s_tx));
info!(
"[CR Node {}] Connected to successor at {}",
nodew.chain.get_index(),
successor.get_addr()
);
} else {
// NOTE: shouldn't happen
error!("Shouldn't have happened - successor interval");
unreachable!()
}
debug!("All aligned...");
Ok(())
}
///
/// Entrypoint / Start procedure of this node
///
/// Connects to the peers the node needs, then runs the protocol server on
/// the port of the node's own chain entry (or on
/// `config.fallback_replication_port` when the chain has no such entry).
/// Blocks while the server is listening.
///
/// # Errors
///
/// Returns the error of the connection phase or of the protocol server. A
/// failed connection phase is reported to the caller instead of being dropped.
pub fn start(cr_mode: CRMode, chain: CraqChain, config: CraqConfig) -> Result<()> {
    let port = chain
        .get_node()
        .map_or(config.fallback_replication_port, |n| n.get_addr().port());

    let node = Arc::new(ERwLock::new(CraqNode::new_node(cr_mode, chain, config)?));

    // The connection phase has to finish before the server starts, so it runs
    // inline: a helper thread that is joined right away adds nothing and
    // would hide a failure.
    Self::connect(Arc::clone(&node))?;

    info!("Starting protocol server at port: {}", port);
    Self::run_protocol_server(node, port)
}
///
/// Opens one CRAQ client connection to the first address `addr` resolves to.
///
/// The connection uses the binary protocol over framed transports. Fails when
/// the address cannot be resolved or the TCP channel cannot be opened or split.
fn connect_to_server<A>(&self, addr: A) -> Result<CraqClient>
where
    A: ToSocketAddrs,
{
    let target: SocketAddr = match addr.to_socket_addrs()?.next() {
        Some(resolved) => resolved,
        None => {
            return Err(CraqError::SocketAddrError(
                "No node address given or parsed.".into(),
            ))
        }
    };

    debug!("Issuing connection to: {}", target);

    let mut channel = BiTcp::new();
    channel.open(&target.to_string())?;

    // One TCP channel, split into independent read and write halves.
    let (read_half, write_half) = channel.split()?;
    let input = TBinaryInputProtocol::new(TFramedReadTransport::new(read_half), true);
    let output = TBinaryOutputProtocol::new(TFramedWriteTransport::new(write_half), true);

    debug!("Creating client: {}", target);

    Ok(CraqClient::new(input, output))
}
///
/// Runs the local protocol server on `127.0.0.1:<port>`.
///
/// The handler receives clones of the node's pools, chain and mode. The
/// node's read lock is held for as long as the server is listening.
fn run_protocol_server(node: Arc<ERwLock<CraqNode>>, port: u16) -> Result<()> {
    let state = node.read();

    debug!("Protocol medium getting set up");

    let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

    let input_transports = TFramedReadTransportFactory::new();
    let input_protocols = TBinaryInputProtocolFactory::new();
    let output_transports = TFramedWriteTransportFactory::new();
    let output_protocols = TBinaryOutputProtocolFactory::new();

    let handler = CraqProtoServer::new(
        state.tail_pool_rx.clone(),
        state.tail_pool_tx.clone(),
        state.successor_pool_rx.clone(),
        state.successor_pool_tx.clone(),
        state.chain.clone(),
        state.cr_mode.clone(),
    );
    let processor = CraqServiceSyncProcessor::new(handler);

    debug!("Server started");

    let mut server = TServer::new(
        input_transports,
        input_protocols,
        output_transports,
        output_protocols,
        processor,
        state.config.protocol_worker_size,
    );

    debug!("Started listening");

    server.listen(&listen_addr.to_string())?;
    Ok(())
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,46 @@
/*
* CRAQ replication protocol definition
*/
namespace cpp artillery_craq
namespace java artillery.craq.thrift
/** Version numbers. */
typedef i64 Version
/** Consistency models. */
enum CraqConsistencyModel { STRONG, EVENTUAL, EVENTUAL_MAX_BOUNDED, DEBUG }
/** Object envelope. */
struct CraqObject {
1: optional binary value;
2: optional bool dirty;
}
/** Artillery CRAQ Invalid State Error */
exception InvalidState {
1: string reason
}
/** Artillery CRAQ service. */
service CraqService {
// -------------------------------------------------------------------------
// Client-facing methods
// -------------------------------------------------------------------------
/** Reads a value with the desired consistency model. */
CraqObject read(1:CraqConsistencyModel model, 2:Version versionBound),
/**
 * Writes a new value and returns the version assigned to it.
 * The Rust server only accepts this call on the head of the chain.
 */
Version write(1:CraqObject obj),
/**
 * Performs a test-and-set operation: the value is written only when
 * expectedVersion is the latest version and no write is uncommitted.
 * The Rust server returns -1 when that condition does not hold.
 */
Version testAndSet(1:CraqObject obj, 2:Version expectedVersion),
// -------------------------------------------------------------------------
// Internal methods
// -------------------------------------------------------------------------
/**
 * Writes a new value with the given version.
 * Used between chain nodes; the Rust server rejects this call on the head.
 */
void writeVersioned(1:CraqObject obj, 2:Version version),
/**
 * Returns the latest committed version.
 * The Rust server only answers this call on the tail.
 */
Version versionQuery()
}

View File

@ -0,0 +1,387 @@
use super::proto::*;
use super::{chain::CraqChain, erwlock::ERwLock, node::CRMode};
use crate::craq::node::CraqClient;
use core::sync::atomic::AtomicI64;
use crossbeam_channel::Receiver;
use crossbeam_channel::Sender;
use std::sync::{atomic::Ordering, Arc};
use t1ha::T1haHashMap as HashMap;
/// Handler state of the CRAQ protocol server running on one chain node.
///
/// The four pool fields are `None` on a node that never connected to its
/// peers. A pooled connection is taken from the receiver side and handed
/// back through the sender side after use.
pub struct CraqProtoServer {
/// Tail connection pool receiver.
pub tp_rx: Option<Arc<Receiver<CraqClient>>>,
/// Successor connection pool receiver.
pub sp_rx: Option<Arc<Receiver<CraqClient>>>,
/// Tail connection pool sender.
pub tp_tx: Option<Arc<Sender<CraqClient>>>,
/// Successor connection pool sender.
pub sp_tx: Option<Arc<Sender<CraqClient>>>,
///
/// CR reference
/// (the chain this node belongs to, used for head/tail checks and logging).
chain_ref: Arc<CraqChain>,
/// Known objects: version, bytes
/// (a map from version number to the object stored under that version).
pub objects: Arc<ERwLock<HashMap<i64, CraqObject>>>,
/// Latest known version which is either clean or dirty.
///
/// NOTE: This should start with a negative round to make
/// version aligned at the first WR/DR.
pub latest_version: AtomicI64,
/// Latest clean version this one is always clean.
///
/// NOTE: This should start with a negative round to make
/// version aligned at the first clean version upgrade commit.
pub latest_clean_version: AtomicI64,
///
/// Algorithmic mode of operation
cr_mode: CRMode,
}
impl CraqProtoServer {
/// Creates the handler state for one node.
///
/// The object store starts empty and both version counters start at -1, so
/// the first write is assigned version 0.
pub fn new(
    tp_rx: Option<Arc<Receiver<CraqClient>>>,
    tp_tx: Option<Arc<Sender<CraqClient>>>,
    sp_rx: Option<Arc<Receiver<CraqClient>>>,
    sp_tx: Option<Arc<Sender<CraqClient>>>,
    chain_ref: Arc<CraqChain>,
    cr_mode: CRMode,
) -> Self {
    // Marker for "no version yet".
    let unversioned: i64 = -1;
    let objects = Arc::new(ERwLock::new(HashMap::default()));

    Self {
        tp_rx,
        sp_rx,
        tp_tx,
        sp_tx,
        chain_ref,
        objects,
        latest_version: AtomicI64::new(unversioned),
        latest_clean_version: AtomicI64::new(unversioned),
        cr_mode,
    }
}
///
/// Creates shallow dirty copy of given object
fn copy_object(&self, obj: &CraqObject, dirty: bool) -> CraqObject {
let mut copied = obj.clone();
copied.dirty = Some(dirty);
copied
}
///
/// Handle all incoming write-mode requests like: test-and-set, write
///
/// `expected_version` of -1 means an unconditional write. Any other value
/// makes this a test-and-set: the write only happens when that version is the
/// latest one and no write is uncommitted; otherwise `Ok(-1)` is returned.
///
/// Returns the version assigned to the new object.
///
/// # Errors
///
/// Returns a state error when the node has no connection pools, is not the
/// head, or has no successor connection available right now. Errors of the
/// downstream write are propagated.
fn write(
    &self,
    obj: CraqObject,
    expected_version: i64,
) -> std::result::Result<i64, thrift::Error> {
    let (s_rx, s_tx) = match (&self.tp_tx, &self.tp_rx, &self.sp_rx, &self.sp_tx) {
        (Some(_), Some(_), Some(s_rx), Some(s_tx)) => (s_rx, s_tx),
        _ => return Err(state_error("Chain is not initialized!")),
    };

    if !self.chain_ref.is_head() {
        return Err(state_error("Cannot write to non-head!"));
    }

    if expected_version != !0 {
        // reject if latest version is not the expected version or there are uncommitted writes
        let latest_clean_version = self.latest_clean_version.load(Ordering::SeqCst);
        let latest_version = self.latest_version.load(Ordering::SeqCst);
        if latest_clean_version != expected_version || latest_version != latest_clean_version {
            return Ok(!0);
        }
    }

    // Take the successor connection before touching any state: an exhausted
    // pool must fail the request cleanly instead of panicking the worker or
    // leaving a version behind that can never be committed.
    let mut successor = s_rx
        .try_recv()
        .map_err(|_| state_error("No successor connection is available!"))?;

    // Record new object version.
    let new_version = self.latest_version.fetch_add(1, Ordering::SeqCst) + 1;
    self.objects.write().insert(new_version, obj.clone());

    // Send down chain, then hand the connection back to the pool.
    successor.write_versioned(obj, new_version)?;
    let _ = s_tx.try_send(successor);

    // Raise the clean version to `new_version` unless it is already newer.
    // `prev_clean` ends up holding the value that was in place BEFORE this
    // write, which is what the cleanup decision below needs.
    let mut prev_clean = self.latest_clean_version.load(Ordering::SeqCst);
    while prev_clean < new_version {
        match self.latest_clean_version.compare_exchange(
            prev_clean,
            new_version,
            Ordering::SeqCst,
            Ordering::SeqCst,
        ) {
            Ok(_) => break,
            Err(current) => prev_clean = current,
        }
    }

    // This write advanced the clean version: older versions can go.
    if new_version > prev_clean {
        self.remove_old_versions(self.latest_clean_version.load(Ordering::SeqCst));
    }

    Ok(new_version)
}
///
/// Strong consistency specific version query
/// This one makes a version request to tail to get the appropriate object
///
/// Returns an empty object when the tail has no clean version yet. When the
/// version reported by the tail is no longer stored locally, the local latest
/// clean version is returned instead.
///
/// # Errors
///
/// Returns a state error when the node has no tail pool, when no tail
/// connection is available right now, or when neither version is stored.
/// Errors of the version query itself are propagated.
fn get_obj_from_version_query(&self) -> std::result::Result<CraqObject, thrift::Error> {
    let (t_rx, t_tx) = match (&self.tp_rx, &self.tp_tx) {
        (Some(t_rx), Some(t_tx)) => (t_rx, t_tx),
        _ => return Err(state_error("Chain is not initialized!")),
    };

    // Send a version query; an exhausted pool is an error, not a panic.
    let mut tail = t_rx
        .try_recv()
        .map_err(|_| state_error("No tail connection is available!"))?;
    let tail_version = tail.version_query()?;
    let _ = t_tx.try_send(tail);

    // If no clean version is around then return an empty obj
    if tail_version < 0 {
        return Ok(CraqObject::default());
    }

    let objects = self.objects.read();
    let found = objects
        .get(&tail_version)
        // newer version already committed (old one erased), return the latest clean version
        .or_else(|| objects.get(&self.latest_clean_version.load(Ordering::SeqCst)))
        .cloned();

    found.ok_or_else(|| state_error("Returning empty object after a version query!"))
}
///
/// Drops every stored version that is older than `latest_clean_ver`.
fn remove_old_versions(&self, latest_clean_ver: i64) {
    self.objects
        .write()
        .retain(|version, _| *version >= latest_clean_ver);
}
}
impl CraqServiceSyncHandler for CraqProtoServer {
///
/// Answers a version query with the latest clean version of this node.
///
/// Only the tail answers; every other node returns a state error.
fn handle_version_query(&self) -> std::result::Result<i64, thrift::Error> {
    debug!(
        "[Artillery CRAQ Node {}] Received version query...",
        self.chain_ref.get_index()
    );

    // only tail should receive version queries
    if self.chain_ref.is_tail() {
        Ok(self.latest_clean_version.load(Ordering::SeqCst))
    } else {
        Err(state_error(
            "Cannot make a version query to a non-tail node!",
        ))
    }
}
///
/// Handles versioned write request received from the protocol client
fn handle_write_versioned(
&self,
obj: CraqObject,
version: i64,
) -> std::result::Result<(), thrift::Error> {
debug!(
"[Artillery CRAQ Node {}] Received write with version: {}",
self.chain_ref.get_index(),
version
);
// Head should not receive versioned writes
if self.chain_ref.is_head() {
return Err(state_error("Cannot make a versioned write to the head!"));
}
// Write latest object version
self.objects.write().insert(version, obj.clone());
// Update latest version if applicable
if self.latest_version.load(Ordering::SeqCst) < version {
self.latest_version.store(version, Ordering::SeqCst);
}
// Non-tail: send down chain
if !self.chain_ref.is_tail() {
let s_rx = self.sp_rx.as_ref().unwrap();
let s_tx = self.sp_tx.as_ref().unwrap();
// TODO: tryize
let mut successor = s_rx.try_recv().unwrap();
successor.write_versioned(obj, version)?;
// TODO: tryize
let _ = s_tx.try_send(successor);
}
// Mark this current version as CLEAN
let old_clean_ver: i64 = {
// TODO: CAS it should be.
let loaded = self.latest_clean_version.load(Ordering::SeqCst);
if loaded < version {
self.latest_clean_version.store(version, Ordering::SeqCst);
version
} else {
loaded
}
};
if version > old_clean_ver || self.chain_ref.is_tail() {
self.remove_old_versions(self.latest_clean_version.load(Ordering::SeqCst));
}
Ok(())
}
///
/// Serves a test-and-set request coming from a protocol client.
///
/// The request is delegated to `write`; `expected_version` is passed through
/// unchanged and the result of `write` is returned as is.
fn handle_test_and_set(
    &self,
    obj: CraqObject,
    expected_version: i64,
) -> std::result::Result<i64, thrift::Error> {
    debug!(
        "[Artillery CRAQ Node {}] Received test-and-set request from client...",
        self.chain_ref.get_index()
    );

    self.write(obj, expected_version)
}
///
/// Serves a plain write request coming from a protocol client.
///
/// Delegates to `write` with an expected version of `!0` and returns its
/// result as is.
fn handle_write(&self, obj: CraqObject) -> std::result::Result<i64, thrift::Error> {
    debug!(
        "[Artillery CRAQ Node {}] Received write request from client...",
        self.chain_ref.get_index()
    );

    // Bitwise NOT of zero sets every bit, which is -1 for an `i64`.
    // NOTE(review): presumably `write` treats this value as "no version
    // check" — confirm against `write`.
    let expected_version: i64 = !0;
    self.write(obj, expected_version)
}
fn handle_read(
&self,
model: CraqConsistencyModel,
version_bound: i64,
) -> std::result::Result<CraqObject, thrift::Error> {
debug!(
"[Artillery CRAQ Node {}] Received read request from client...",
self.chain_ref.get_index()
);
// Node hasn't initialized?
if !self.chain_ref.is_tail()
&& (self.tp_rx.is_none()
|| self.tp_tx.is_none()
|| self.sp_rx.is_none()
|| self.sp_tx.is_none())
{
return Err(state_error("Chain is not initialized!"));
}
// Running normal CR: fail if we're not the tail
if self.cr_mode == CRMode::Cr && !self.chain_ref.is_tail() {
return Err(state_error("Cannot read from non-tail node in CR mode!"));
}
// No objects stored?
if self.objects.read().is_empty() {
return Ok(CraqObject::default());
}
// Lazy programmers do the best they say.
// Same people said there's more than one way to do it.
match model {
CraqConsistencyModel::Strong => {
if self.latest_version.load(Ordering::SeqCst)
> self.latest_clean_version.load(Ordering::SeqCst)
&& !self.chain_ref.is_tail()
{
// Non-tail: latest known version isn't clean, send a version query
let obj = self.get_obj_from_version_query()?;
return Ok(self.copy_object(&obj, true));
}
if self.latest_clean_version.load(Ordering::SeqCst) < 0 {
return Ok(CraqObject::default());
}
if self.chain_ref.is_tail() {
let latest_version = self.latest_version.load(Ordering::SeqCst);
if let Some(obj) = self.objects.read().get(&latest_version) {
return Ok(self.copy_object(obj, false));
} else {
return Err(state_error("Returning null object from the tail"));
}
}
let latest_clean_version = self.latest_clean_version.load(Ordering::SeqCst);
if let Some(obj) = self.objects.read().get(&latest_clean_version) {
return Ok(self.copy_object(obj, false));
} else {
return Err(state_error("Returning null object from a clean read!"));
}
}
CraqConsistencyModel::Eventual => {
// Return latest known version
let latest_version = self.latest_version.load(Ordering::SeqCst);
if let Some(obj) = self.objects.read().get(&latest_version) {
// TODO: normally None for dirty.
return Ok(self.copy_object(obj, false));
} else {
return Err(state_error("Returning null object for an eventual read!"));
}
}
CraqConsistencyModel::EventualMaxBounded => {
// Return latest known version within the given bound
let latest_version = self.latest_version.load(Ordering::SeqCst);
let latest_clean_version = self.latest_clean_version.load(Ordering::SeqCst);
let bounded_version = latest_clean_version.saturating_add(std::cmp::min(
version_bound,
latest_version.saturating_sub(latest_clean_version),
));
if let Some(obj) = self.objects.read().get(&bounded_version) {
// TODO: normally None for dirty.
return Ok(self.copy_object(obj, false));
} else {
return Err(state_error(
"Returning null object for a bounded eventual read!",
));
}
}
CraqConsistencyModel::Debug => {
// make a version query
if self.chain_ref.is_tail() {
return Ok(CraqObject::default());
} else {
self.copy_object(&self.get_obj_from_version_query()?, true);
}
}
}
error!("Fatal state error happened.");
Err(state_error("Fatal state error."))
}
}
/// Wraps `msg` in an `InvalidState` and converts it into a `thrift::Error`.
#[inline]
fn state_error(msg: &str) -> thrift::Error {
    let invalid_state = InvalidState::new(String::from(msg));
    invalid_state.into()
}

View File

@ -1,7 +1,5 @@
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}
#[macro_use]
extern crate log;
#[macro_use]
pub mod craq;

9
ddata-tests/shutdown.sh Executable file
View File

@ -0,0 +1,9 @@
# Kills the processes recorded in PID0..PID$CHAIN_LEN and clears those
# variables together with CHAIN_LEN.
# NOTE(review): presumably meant to be sourced into the shell that holds the
# variables exported by test.sh — confirm how both scripts are invoked.
for i in $(seq 0 "${CHAIN_LEN:--1}")
do
    # printenv exits non-zero for an unset variable; do not let that abort a
    # shell that runs with errexit enabled.
    pid=$(printenv "PID$i" || true)
    # Skip indexes without a recorded PID: a bare `kill` only prints a usage
    # error.
    if [ -n "$pid" ]; then
        kill "$pid"
        echo "kill" "$pid"
    fi
    export "PID$i="
done
export CHAIN_LEN=

13
ddata-tests/test.sh Executable file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
# Starts one `craq_node server` process in the background per command-line
# argument. Each process is invoked as `server 0 <index>` followed by all of
# the script's arguments. The highest index is exported as CHAIN_LEN and each
# process id as PID<index>.
# NOTE(review): the exports only reach the caller when this file is sourced —
# confirm the intended invocation.
set -eo pipefail

# Highest node index (number of arguments minus one).
len=$(($# - 1))
export CHAIN_LEN=$len
for i in $(seq 0 "$len")
do
    echo "$i"
    # "$@" forwards every argument intact, even if one contains whitespace.
    target/debug/examples/craq_node server 0 "$i" "$@" &
    export "PID$i=$!"
    # Print the process id just stored in PID$i. `${PID}$i` expanded the
    # unrelated (unset) variable PID followed by the index.
    echo "$!"
done