From 1adb05311872f23c0af57db345ed895048fc4d44 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Mon, 20 Apr 2020 22:43:48 +0200 Subject: [PATCH] Protocol server impl Processor connection Finalize versioned writes Ordinary write handler impl Get obj from versioned query Strong consistency read State definition Rework Remove backtrace Apply checks Config rework Fix versions Finalize --- Cargo.toml | 6 +- Makefile.toml | 12 + artillery-core/Cargo.toml | 7 +- artillery-ddata/Cargo.toml | 23 +- artillery-ddata/benches/craq_bencher.rs | 194 +++ artillery-ddata/examples/craq_node.rs | 98 ++ artillery-ddata/src/craq/chain.rs | 88 + artillery-ddata/src/craq/chain_node.rs | 26 + artillery-ddata/src/craq/client.rs | 117 ++ artillery-ddata/src/craq/craq_config.rs | 22 + artillery-ddata/src/craq/errors.rs | 56 + artillery-ddata/src/craq/erwlock.rs | 41 + artillery-ddata/src/craq/mod.rs | 28 + artillery-ddata/src/craq/node.rs | 262 +++ artillery-ddata/src/craq/proto.rs | 1422 +++++++++++++++++ .../src/craq/protocol/proto.thrift | 46 + artillery-ddata/src/craq/server.rs | 387 +++++ artillery-ddata/src/lib.rs | 12 +- ddata-tests/shutdown.sh | 9 + ddata-tests/test.sh | 13 + 20 files changed, 2856 insertions(+), 13 deletions(-) create mode 100644 Makefile.toml create mode 100644 artillery-ddata/benches/craq_bencher.rs create mode 100644 artillery-ddata/examples/craq_node.rs create mode 100644 artillery-ddata/src/craq/chain.rs create mode 100644 artillery-ddata/src/craq/chain_node.rs create mode 100644 artillery-ddata/src/craq/client.rs create mode 100644 artillery-ddata/src/craq/craq_config.rs create mode 100644 artillery-ddata/src/craq/errors.rs create mode 100644 artillery-ddata/src/craq/erwlock.rs create mode 100644 artillery-ddata/src/craq/mod.rs create mode 100644 artillery-ddata/src/craq/node.rs create mode 100644 artillery-ddata/src/craq/proto.rs create mode 100644 artillery-ddata/src/craq/protocol/proto.thrift create mode 100644 artillery-ddata/src/craq/server.rs create mode 100755 ddata-tests/shutdown.sh create mode 100755 ddata-tests/test.sh diff --git a/Cargo.toml b/Cargo.toml index a4660f2..285cf10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,4 +8,8 @@ members = [ [profile.release] lto = "fat" -codegen-units = 1 \ No newline at end of file +codegen-units = 1 + + +# [profile.bench] +# debug = true \ No newline at end of file diff --git a/Makefile.toml b/Makefile.toml new file mode 100644 index 0000000..fd2d12c --- /dev/null +++ b/Makefile.toml @@ -0,0 +1,12 @@ +[config] +default_to_workspace = false + +[env] +CRAQ_DIR = "artillery-ddata/src/craq" + +[tasks.compile-craq] +script = [ + ''' + thrift -out $CRAQ_DIR --gen rs $CRAQ_DIR/protocol/proto.thrift + ''' +] \ No newline at end of file diff --git a/artillery-core/Cargo.toml b/artillery-core/Cargo.toml index f90ea32..1136c4a 100644 --- a/artillery-core/Cargo.toml +++ b/artillery-core/Cargo.toml @@ -2,13 +2,14 @@ name = "artillery-core" version = "0.1.0" authors = ["Mahmut Bulut "] +description = "Fire-forged cluster management & Distributed data protocol" edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -log = "0.4.8" -failure = "0.1.6" +log = "0.4" +failure = "0.1.7" failure_derive = "0.1.6" bastion-utils = "0.3.2" cuneiform-fields = "0.1" @@ -20,7 +21,7 @@ rand = "0.7.3" mio = { version = "0.7.0-alpha.1", features = ["os-poll", "udp"] } futures = "0.3" pin-utils = "0.1.0-alpha.4" -libp2p = "0.16.0" +libp2p = { version = "0.18", features = ["mdns"] } bastion-executor = "0.3.4" lightproc = "0.3.4" crossbeam-channel = "0.4.2" diff --git a/artillery-ddata/Cargo.toml b/artillery-ddata/Cargo.toml index 5dbd592..59463a4 100644 --- a/artillery-ddata/Cargo.toml +++ b/artillery-ddata/Cargo.toml @@ -4,6 +4,25 @@ version = "0.1.0" authors = ["Mahmut Bulut "] edition = "2018" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] +log = "0.4" +failure = "0.1.7" +thrift = "0.13.0" +t1ha = "0.1" +crossbeam-channel = "0.4" + + +[dev-dependencies] +clap = "2.33.0" +pretty_env_logger = "0.4.0" +bastion = "0.3" +bastion-executor = "0.3" +lightproc = "0.3" +rand = "0.7" +criterion = "0.3" +futures = "0.3" + + +[[bench]] +name = "craq_bencher" +harness = false diff --git a/artillery-ddata/benches/craq_bencher.rs b/artillery-ddata/benches/craq_bencher.rs new file mode 100644 index 0000000..04f4d16 --- /dev/null +++ b/artillery-ddata/benches/craq_bencher.rs @@ -0,0 +1,194 @@ +use criterion::{criterion_group, criterion_main, Criterion}; + +use artillery_ddata::craq::prelude::*; + +use futures::stream::StreamExt; +use rand::distributions::Alphanumeric; +use rand::prelude::*; + +pub fn entry_bench_read(sip: String, args: Vec<&str>) -> Vec { + // Connect to servers + let num_clients: usize = args[0].parse::().unwrap(); + let num_bytes: usize = args[1].parse::().unwrap(); + let _trials: usize = args[2].parse::().unwrap(); + let num_servers = args.len() - 3; + + let hpc: Vec<&str> = sip.split(":").collect(); + let mut hosts = vec![(hpc[0], hpc[1].parse::().unwrap())]; + + (0..num_servers).into_iter().for_each(|i| { + let hpc: Vec<&str> = args[i + 3].split(":").collect(); + let host = hpc[0]; + let port = hpc[1]; + let port = port.parse::().unwrap(); + + hosts.extend([(host, port)].iter()); + }); + + let mut clients: Vec = (0..num_clients) + .into_iter() + .map(|i| { + let (host, port) = hosts[i % hosts.len()]; + DDataCraqClient::connect_host_port(host, port).unwrap() + }) + .collect(); + + if clients[0].write(gen_random_str(num_bytes)).is_err() { + println!("bench_write: Couldn't write new revision."); + } + + // Check if any object is written... + if clients[0].read(CraqConsistencyModel::Strong, 0).is_err() { + println!("bench_read: Could not read object."); + } + + clients +} + +pub fn gen_random_str(slen: usize) -> String { + thread_rng().sample_iter(&Alphanumeric).take(slen).collect() +} + +pub fn stub_read(clients: &mut Vec) { + clients.iter_mut().for_each(|client| { + let _ = client.read(CraqConsistencyModel::Eventual, 0); + }); +} + +fn client_benchmarks(c: &mut Criterion) { + { + // 1 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["1", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_1_client", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 2 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["2", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_2_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 3 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["3", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_3_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 4 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["4", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_4_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 5 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["5", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_5_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 10 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["10", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_10_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 20 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["20", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_20_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 30 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["30", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_30_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 40 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["40", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_40_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 50 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["50", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_50_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 100 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["100", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_100_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + // { + // // 500 clients + // let mut clients = entry_bench_read("127.0.0.1:30001".to_string(), vec!["500", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"]); + // c.bench_function("benchmark_read_500_clients", |b| b.iter(|| stub_read(&mut clients))); + // } + + // { + // // 1000 clients + // let mut clients = entry_bench_read("127.0.0.1:30001".to_string(), vec!["1000", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"]); + // c.bench_function("benchmark_read_1000_clients", |b| b.iter(|| stub_read(&mut clients))); + // } +} + +criterion_group!(benches, client_benchmarks); +criterion_main!(benches); diff --git a/artillery-ddata/examples/craq_node.rs b/artillery-ddata/examples/craq_node.rs new file mode 100644 index 0000000..3b08118 --- /dev/null +++ b/artillery-ddata/examples/craq_node.rs @@ -0,0 +1,98 @@ +extern crate pretty_env_logger; + +#[macro_use] +extern crate log; + +use artillery_ddata::craq::prelude::*; +use clap::*; + +fn main() { + pretty_env_logger::init(); + + let matches = App::new("Artillery CRAQ") + .author("Mahmut Bulut, vertexclique [ta] gmail [tod] com") + .version(crate_version!()) + .about("Artillery Distributed Data Protocol Tester") + .subcommand( + SubCommand::with_name("server") + .about("Runs a CRAQ server") + .arg( + Arg::with_name("cr_mode") + .required(true) + .help("CR mode that server would use: 0 for CRAQ, 1 for CR") + .index(1), + ) + .arg( + Arg::with_name("node_index") + .required(true) + .help("Node index this server would use") + .index(2), + ) + .arg( + Arg::with_name("chain_servers") + .required(true) + .multiple(true), + ), + ) + .subcommand( + SubCommand::with_name("client") + .about("Runs a CRAQ client") + .arg( + Arg::with_name("server_ip_port") + .required(true) + .help("Server ip and port to connect") + .index(1), + ) + .arg( + Arg::with_name("test_method") + .required(true) + .help("Test method of client to test against the server") + .index(2), + ) + .arg( + Arg::with_name("extra_args") + .required(true) + .multiple(true) + .min_values(3), + ), + ) + .after_help("Enables Artillery CRAQ protocol to be tested in the server/client fashion") + .get_matches(); + + match matches.subcommand() { + ("server", Some(server_matches)) => { + let cr_mode = match server_matches.value_of("cr_mode") { + Some("0") => CRMode::Craq, + Some("1") => CRMode::Cr, + _ => panic!("CR mode not as expected"), + }; + + if let Some(node_index) = server_matches.value_of("node_index") { + let node_index = node_index.parse::().unwrap(); + let varargs: Vec<&str> = + server_matches.values_of("chain_servers").unwrap().collect(); + + let nodes: Vec = varargs.iter().flat_map(ChainNode::new).collect(); + + assert_eq!(nodes.len(), varargs.len(), "Node address parsing failed"); + + let chain = CraqChain::new(&nodes, node_index).unwrap(); + CraqNode::start(cr_mode, chain, CraqConfig::default()); + } + } + ("client", Some(client_matches)) => { + let _sip = client_matches + .value_of("server_ip_port") + .unwrap() + .to_string(); + + match client_matches.value_of("test_method") { + Some("bench_read") => todo!(), + _ => unreachable!(), + } + } + _ => { + error!("Couldn't find any known subcommands"); + } + } +} diff --git a/artillery-ddata/src/craq/chain.rs b/artillery-ddata/src/craq/chain.rs new file mode 100644 index 0000000..6a65501 --- /dev/null +++ b/artillery-ddata/src/craq/chain.rs @@ -0,0 +1,88 @@ +use super::chain_node::ChainNode; +use super::errors::*; +use std::fmt; +use std::fmt::Display; + +/// +/// Representation of a closed-loop CRAQ chain +#[derive(Default, Debug)] +pub struct CraqChain { + /// List of nodes in this chain, in order. + nodes: Vec, + /// Index of this node. + node_idx: usize, +} + +impl CraqChain { + /// + /// Create a new chain. + pub fn new(nodes: &[ChainNode], node_idx: usize) -> Result { + ensure!( + node_idx < nodes.len(), + "Node index can't be greater than chain length." + ); + + Ok(Self { + nodes: nodes.to_vec(), + node_idx, + }) + } + + /// + /// Returns whether this node is the head of its chain. + pub fn is_head(&self) -> bool { + self.node_idx == 0 + } + + /// + /// Returns the successor node if exists + pub fn is_tail(&self) -> bool { + self.node_idx == self.nodes.len().saturating_sub(1) + } + + /// + /// Returns the successor node if exists + pub fn get_successor(&self) -> Option<&ChainNode> { + if self.is_tail() { + None + } else { + self.nodes.get(self.node_idx.saturating_add(1)) + } + } + + /// + /// Returns the tail node. + pub fn get_tail(&self) -> Option<&ChainNode> { + self.nodes.last() + } + + /// + /// Returns the chain node associated with the current node index. + pub fn get_node(&self) -> Option<&ChainNode> { + self.nodes.get(self.node_idx) + } + + /// + /// Returns the current node index. + pub fn get_index(&self) -> usize { + self.node_idx + } + + /// + /// Returns the size of this chain. + pub fn chain_size(&self) -> usize { + self.nodes.len() + } +} + +/// +/// Human-readable display impl for the Chain +impl Display for CraqChain { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "CR: Index [{}] in chain: {:#?}", + self.node_idx, self.nodes + ) + } +} diff --git a/artillery-ddata/src/craq/chain_node.rs b/artillery-ddata/src/craq/chain_node.rs new file mode 100644 index 0000000..60ab804 --- /dev/null +++ b/artillery-ddata/src/craq/chain_node.rs @@ -0,0 +1,26 @@ +use super::errors::*; +use std::net::{SocketAddr, ToSocketAddrs}; + +/// +/// Chain node representation +#[derive(Debug, Clone)] +pub struct ChainNode { + host: SocketAddr, +} + +impl ChainNode { + pub fn new(addr: A) -> Result + where + A: ToSocketAddrs, + { + let host: SocketAddr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| CraqError::SocketAddrError("No node address given or parsed.".into()))?; + Ok(Self { host }) + } + + pub fn get_addr(&self) -> &SocketAddr { + &self.host + } +} diff --git a/artillery-ddata/src/craq/client.rs b/artillery-ddata/src/craq/client.rs new file mode 100644 index 0000000..1d36025 --- /dev/null +++ b/artillery-ddata/src/craq/client.rs @@ -0,0 +1,117 @@ +use std::fmt; + +use super::errors::*; +use super::{ + node::CraqClient, + proto::{CraqConsistencyModel, CraqObject, TCraqServiceSyncClient}, +}; +use std::net::{SocketAddr, ToSocketAddrs}; +use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol}; +use thrift::transport::{ + TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport, + TFramedWriteTransportFactory, TIoChannel, TTcpChannel as BiTcp, +}; + +pub struct ReadObject { + /// + /// Object's value. + value: Vec, + /// + /// Whether the read was dirty (true) or clean (false). + dirty: bool, +} + +impl ReadObject { + /// + /// Creates a new wrapper Read Object + pub fn new(value: Vec, dirty: bool) -> Self { + Self { value, dirty } + } +} + +impl fmt::Debug for ReadObject { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReadObject") + .field("value", &self.value) + .field("dirty", &self.dirty) + .finish() + } +} + +impl fmt::Display for ReadObject { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReadObject") + .field("value", &self.value) + .field("dirty", &self.dirty) + .finish() + } +} + +pub struct DDataCraqClient { + host: SocketAddr, + cc: CraqClient, +} + +impl DDataCraqClient { + pub fn connect_host_port(host: T, port: u16) -> Result + where + T: AsRef, + { + Self::connect(format!("{}:{}", host.as_ref(), port)) + } + + pub fn connect(addr: A) -> Result + where + A: ToSocketAddrs, + { + let host: SocketAddr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| CraqError::SocketAddrError("No node address given or parsed.".into()))?; + + debug!("Client is initiating connection to: {}", host); + + let mut c = BiTcp::new(); + c.open(&host.to_string())?; + let (i_chan, o_chan) = c.split()?; + let (i_tran, o_tran) = ( + TFramedReadTransport::new(i_chan), + TFramedWriteTransport::new(o_chan), + ); + let (i_prot, o_prot) = ( + TBinaryInputProtocol::new(i_tran, true), + TBinaryOutputProtocol::new(o_tran, true), + ); + + debug!("Created client: {}", host); + let cc = CraqClient::new(i_prot, o_prot); + Ok(Self { host, cc }) + } + + /// + /// Writes an object to the cluster, returning the new object version or -1 upon failure. + pub fn write(&mut self, value: String) -> Result { + let mut obj = CraqObject::default(); + obj.value = Some(value.into_bytes()); + Ok(self.cc.write(obj)?) + } + + /// + /// Reads an object with given bound version. + pub fn read(&mut self, model: CraqConsistencyModel, version_bound: i64) -> Result { + let obj = self.cc.read(model, version_bound)?; + + match (obj.value, obj.dirty) { + (Some(v), Some(d)) => Ok(ReadObject::new(v, d)), + _ => bail!(CraqError::ReadError, "Read request failed"), + } + } + + /// + /// Performs a test-and-set operation, returning the new object version or -1 upon failure. + pub fn test_and_set(&mut self, value: String, expected_version: i64) -> Result { + let mut obj = CraqObject::default(); + obj.value = Some(value.into_bytes()); + Ok(self.cc.test_and_set(obj, expected_version)?) + } +} diff --git a/artillery-ddata/src/craq/craq_config.rs b/artillery-ddata/src/craq/craq_config.rs new file mode 100644 index 0000000..54039ca --- /dev/null +++ b/artillery-ddata/src/craq/craq_config.rs @@ -0,0 +1,22 @@ +use super::node::CRMode; + +#[derive(Debug, Clone)] +pub struct CraqConfig { + pub fallback_replication_port: u16, + pub operation_mode: CRMode, + pub connection_sleep_time: u64, + pub connection_pool_size: usize, + pub protocol_worker_size: usize, +} + +impl Default for CraqConfig { + fn default() -> Self { + CraqConfig { + fallback_replication_port: 22991_u16, + operation_mode: CRMode::Craq, + connection_sleep_time: 1000_u64, + connection_pool_size: 50_usize, + protocol_worker_size: 100_usize, + } + } +} diff --git a/artillery-ddata/src/craq/errors.rs b/artillery-ddata/src/craq/errors.rs new file mode 100644 index 0000000..4871a28 --- /dev/null +++ b/artillery-ddata/src/craq/errors.rs @@ -0,0 +1,56 @@ +use failure::Fail; +use std::io; + +use std::result; + +/// Result type for operations that could result in an `CraqError` +pub type Result = result::Result; + +#[derive(Fail, Debug)] +pub enum CraqError { + #[fail(display = "Artillery :: CRAQ :: I/O error occurred: {}", _0)] + IOError(io::Error), + #[fail(display = "Artillery :: CRAQ :: Socket addr: {}", _0)] + SocketAddrError(String), + #[fail(display = "Artillery :: CRAQ :: Assertion failed: {}", _0)] + AssertionError(String, failure::Backtrace), + #[fail(display = "Artillery :: CRAQ :: Protocol error: {}", _0)] + ProtocolError(thrift::Error), + #[fail(display = "Artillery :: CRAQ :: Read error: {}", _0)] + ReadError(String), +} + +impl From for CraqError { + fn from(e: io::Error) -> Self { + CraqError::IOError(e) + } +} + +impl From for CraqError { + fn from(e: thrift::Error) -> Self { + CraqError::ProtocolError(e) + } +} + +#[macro_export] +macro_rules! bail { + ($kind:expr, $e:expr) => { + return Err($kind($e.to_owned())); + }; + ($kind:expr, $fmt:expr, $($arg:tt)+) => { + return Err($kind(format!($fmt, $($arg)+).to_owned())); + }; +} + +macro_rules! ensure { + ($cond:expr, $e:expr) => { + if !($cond) { + return Err(CraqError::AssertionError($e.to_string(), failure::Backtrace::new())); + } + }; + ($cond:expr, $fmt:expr, $($arg:tt)+) => { + if !($cond) { + return Err(CraqError::AssertionError(format!($fmt, $($arg)+).to_string(), failure::Backtrace::new())); + } + }; +} diff --git a/artillery-ddata/src/craq/erwlock.rs b/artillery-ddata/src/craq/erwlock.rs new file mode 100644 index 0000000..ae283ec --- /dev/null +++ b/artillery-ddata/src/craq/erwlock.rs @@ -0,0 +1,41 @@ +use core::sync::atomic::spin_loop_hint; +use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; + +/// +/// +/// RwLock that blocks until yielding +pub struct ERwLock(RwLock); + +impl ERwLock { + pub fn new(t: T) -> ERwLock { + ERwLock(RwLock::new(t)) + } +} + +impl ERwLock { + #[inline] + pub fn read(&self) -> RwLockReadGuard { + loop { + match self.0.try_read() { + Ok(guard) => break guard, + _ => spin_loop_hint(), + } + } + } + + #[inline] + pub fn write(&self) -> RwLockWriteGuard { + loop { + match self.0.try_write() { + Ok(guard) => break guard, + _ => spin_loop_hint(), + } + } + } + + #[allow(dead_code)] + #[inline] + pub fn inner(&self) -> &RwLock { + &self.0 + } +} diff --git a/artillery-ddata/src/craq/mod.rs b/artillery-ddata/src/craq/mod.rs new file mode 100644 index 0000000..7cacd87 --- /dev/null +++ b/artillery-ddata/src/craq/mod.rs @@ -0,0 +1,28 @@ +/// Error API for CRAQ distributed store +#[macro_use] +pub mod errors; + +mod chain; +mod chain_node; +mod erwlock; + +#[allow(clippy::all)] +#[allow(deprecated)] +#[allow(unknown_lints)] +mod proto; +mod server; + +pub mod client; +pub mod craq_config; +pub mod node; + +/// Prelude for CRAQ distributed store +pub mod prelude { + pub use super::chain::*; + pub use super::chain_node::*; + pub use super::client::*; + pub use super::craq_config::*; + pub use super::errors::*; + pub use super::node::*; + pub use super::proto::*; +} diff --git a/artillery-ddata/src/craq/node.rs b/artillery-ddata/src/craq/node.rs new file mode 100644 index 0000000..cc81d33 --- /dev/null +++ b/artillery-ddata/src/craq/node.rs @@ -0,0 +1,262 @@ +use super::errors::*; + +use super::chain::CraqChain; +use super::{craq_config::CraqConfig, erwlock::ERwLock, proto::*, server::CraqProtoServer}; + +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs}, + sync::Arc, +}; +use thrift::protocol::{ + TBinaryInputProtocol, TBinaryInputProtocolFactory, TBinaryOutputProtocol, + TBinaryOutputProtocolFactory, +}; +use thrift::transport::{ + ReadHalf, TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport, + TFramedWriteTransportFactory, TIoChannel, TTcpChannel as BiTcp, WriteHalf, +}; + +use thrift::server::TServer; + +use crossbeam_channel::{unbounded, Receiver, Sender}; + +/// +/// CR mode that will be used. +#[derive(Debug, Clone, PartialEq)] +pub enum CRMode { + /// Standard CR mode. + Cr, + /// CRAQ mode. + Craq, +} + +impl Default for CRMode { + fn default() -> Self { + CRMode::Craq + } +} + +type CraqClientInputProtocol = TBinaryInputProtocol>>; +type CraqClientOutputProtocol = TBinaryOutputProtocol>>; +pub(crate) type CraqClient = + CraqServiceSyncClient; + +/// +/// Representation of a physical CRAQ node. +#[derive(Default)] +pub struct CraqNode { + /// Run mode which is either CR or CRAQ mode. + pub cr_mode: CRMode, + /// Whole chain. + pub chain: Arc, + /// Tail connection pool receiver. + pub tail_pool_rx: Option>>, + /// Successor connection pool receiver. + pub successor_pool_rx: Option>>, + /// Tail connection pool sender. + pub tail_pool_tx: Option>>, + /// Successor connection pool sender. + pub successor_pool_tx: Option>>, + /// Stored node configuration to be reused across iterations + pub config: CraqConfig, +} + +impl CraqNode { + /// + /// Initialize a CRAQ node with given chain + fn new_node(cr_mode: CRMode, chain: CraqChain, config: CraqConfig) -> Result { + Ok(Self { + cr_mode, + chain: Arc::new(chain), + config, + ..Default::default() + }) + } + + /// + /// Initial connection to the underlying protocol server. + fn connect_to_first(&self, server_addr: A) -> CraqClient + where + A: ToSocketAddrs, + { + self.connect_to_server(&server_addr).unwrap_or_else(|_e| { + std::thread::sleep(std::time::Duration::from_millis( + self.config.connection_sleep_time, + )); + self.connect_to_first(server_addr) + }) + } + + /// + /// Creates a connection pool to the given underlying protocol server. + fn create_conn_pool( + &self, + server_addr: A, + ) -> Result<(Sender, Receiver)> + where + A: ToSocketAddrs, + { + let (tx, rx) = unbounded(); + let client = self.connect_to_first(&server_addr); + // TODO: tryize + let _ = tx.try_send(client); + + let _ = (0..self.config.connection_pool_size) + .flat_map(|_| -> Result<_> { Ok(tx.try_send(self.connect_to_server(&server_addr)?)) }); + // while let Ok(_) = tx.try_send(self.connect_to_server(&server_addr)?) {} + + Ok((tx, rx)) + } + + /// + /// Connects to the other nodes in the given chain. + fn connect(noderef: Arc>) -> Result<()> { + let mut nodew = noderef.write(); + + debug!("Trying to connect"); + if nodew.chain.is_tail() { + return Ok(()); + } + + debug!("Checking tail connection..."); + if let Some(tail) = nodew.chain.clone().get_tail() { + let tail = tail.clone(); + let (t_tx, t_rx) = nodew.create_conn_pool(tail.get_addr())?; + nodew.tail_pool_rx = Some(Arc::new(t_rx)); + nodew.tail_pool_tx = Some(Arc::new(t_tx)); + info!( + "[CR Node {}] Connected to tail at {}", + nodew.chain.get_index(), + tail.get_addr() + ); + } else { + // NOTE: shouldn't happen + error!("Shouldn't have happened - tail follows"); + unreachable!() + } + + debug!("Checking node before the tail..."); + // Is this the node before the tail? + if nodew.chain.get_index() == nodew.chain.chain_size().saturating_sub(2) { + nodew.successor_pool_tx = nodew.tail_pool_tx.clone(); + nodew.successor_pool_rx = nodew.tail_pool_rx.clone(); + info!("[CR Node {}] Node before the tail", nodew.chain.get_index()); + return Ok(()); + } + + debug!("Checking successor..."); + if let Some(successor) = nodew.chain.get_successor() { + let successor = successor.clone(); + info!( + "[CR Node {}] Connecting to successor at {}", + nodew.chain.get_index(), + successor.get_addr() + ); + let (s_tx, s_rx) = nodew.create_conn_pool(successor.get_addr())?; + nodew.successor_pool_rx = Some(Arc::new(s_rx)); + nodew.successor_pool_tx = Some(Arc::new(s_tx)); + info!( + "[CR Node {}] Connected to successor at {}", + nodew.chain.get_index(), + successor.get_addr() + ); + } else { + // NOTE: shouldn't happen + error!("Shouldn't have happened - successor interval"); + unreachable!() + } + + debug!("All aligned..."); + + Ok(()) + } + + /// + /// Entrypoint / Start procedure of this node + pub fn start(cr_mode: CRMode, chain: CraqChain, config: CraqConfig) -> Result<()> { + let port = chain + .get_node() + .map_or(config.fallback_replication_port, |n| n.get_addr().port()); + + let node = Arc::new(ERwLock::new(CraqNode::new_node(cr_mode, chain, config)?)); + + let connector_node = node.clone(); + let handle = std::thread::spawn(move || { + Self::connect(connector_node).expect("Successor connections has failed"); + }); + + let _ = handle.join(); + + info!("Starting protocol server at port: {}", port); + Self::run_protocol_server(node, port) + } + + /// + /// Connect to given server address using CRAQ client. + fn connect_to_server(&self, addr: A) -> Result + where + A: ToSocketAddrs, + { + let host: SocketAddr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| CraqError::SocketAddrError("No node address given or parsed.".into()))?; + + debug!("Issuing connection to: {}", host); + + let mut c = BiTcp::new(); + c.open(&host.to_string())?; + let (i_chan, o_chan) = c.split()?; + let (i_tran, o_tran) = ( + TFramedReadTransport::new(i_chan), + TFramedWriteTransport::new(o_chan), + ); + let (i_prot, o_prot) = ( + TBinaryInputProtocol::new(i_tran, true), + TBinaryOutputProtocol::new(o_tran, true), + ); + + debug!("Creating client: {}", host); + Ok(CraqClient::new(i_prot, o_prot)) + } + + /// + /// Start local protocol server + fn run_protocol_server(node: Arc>, port: u16) -> Result<()> { + let node = node.read(); + debug!("Protocol medium getting set up"); + + let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); + + let (i_tran_fact, i_prot_fact) = ( + TFramedReadTransportFactory::new(), + TBinaryInputProtocolFactory::new(), + ); + let (o_tran_fact, o_prot_fact) = ( + TFramedWriteTransportFactory::new(), + TBinaryOutputProtocolFactory::new(), + ); + + let processor = CraqServiceSyncProcessor::new(CraqProtoServer::new( + node.tail_pool_rx.clone(), + node.tail_pool_tx.clone(), + node.successor_pool_rx.clone(), + node.successor_pool_tx.clone(), + node.chain.clone(), + node.cr_mode.clone(), + )); + + debug!("Server started"); + let mut server = TServer::new( + i_tran_fact, + i_prot_fact, + o_tran_fact, + o_prot_fact, + processor, + node.config.protocol_worker_size, + ); + + debug!("Started listening"); + Ok(server.listen(&server_addr.to_string())?) + } +} diff --git a/artillery-ddata/src/craq/proto.rs b/artillery-ddata/src/craq/proto.rs new file mode 100644 index 0000000..0389fc5 --- /dev/null +++ b/artillery-ddata/src/craq/proto.rs @@ -0,0 +1,1422 @@ +// Autogenerated by Thrift Compiler (0.13.0) +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +#![allow(unused_imports)] +#![allow(unused_extern_crates)] +#![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments, type_complexity))] +#![cfg_attr(rustfmt, rustfmt_skip)] + +extern crate thrift; + +use std::cell::RefCell; +use std::collections::{BTreeMap, BTreeSet}; +use std::convert::{From, TryFrom}; +use std::default::Default; +use std::error::Error; +use std::fmt; +use std::fmt::{Display, Formatter}; +use std::rc::Rc; +use thrift::OrderedFloat; + +use thrift::protocol::field_id; +use thrift::protocol::verify_expected_message_type; +use thrift::protocol::verify_expected_sequence_number; +use thrift::protocol::verify_expected_service_call; +use thrift::protocol::verify_required_field_exists; +use thrift::protocol::{ + TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier, TMessageIdentifier, + TMessageType, TOutputProtocol, TSetIdentifier, TStructIdentifier, TType, +}; +use thrift::server::TProcessor; +use thrift::{ + ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient, +}; + +/// Consistency models. +#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum CraqConsistencyModel { + Strong = 0, + Eventual = 1, + EventualMaxBounded = 2, + Debug = 3, +} + +impl CraqConsistencyModel { + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + o_prot.write_i32(*self as i32) + } + pub fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + let enum_value = i_prot.read_i32()?; + CraqConsistencyModel::try_from(enum_value) + } +} + +impl TryFrom for CraqConsistencyModel { + type Error = thrift::Error; + fn try_from(i: i32) -> Result { + match i { + 0 => Ok(CraqConsistencyModel::Strong), + 1 => Ok(CraqConsistencyModel::Eventual), + 2 => Ok(CraqConsistencyModel::EventualMaxBounded), + 3 => Ok(CraqConsistencyModel::Debug), + _ => Err(thrift::Error::Protocol(ProtocolError::new( + ProtocolErrorKind::InvalidData, + format!("cannot convert enum constant {} to CraqConsistencyModel", i), + ))), + } + } +} + +pub type Version = i64; + +// +// CraqObject +// + +/// Object envelope. +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct CraqObject { + pub value: Option>, + pub dirty: Option, +} + +impl CraqObject { + pub fn new(value: F1, dirty: F2) -> CraqObject + where + F1: Into>>, + F2: Into>, + { + CraqObject { + value: value.into(), + dirty: dirty.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option> = None; + let mut f_2: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_bytes()?; + f_1 = Some(val); + } + 2 => { + let val = i_prot.read_bool()?; + f_2 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqObject { + value: f_1, + dirty: f_2, + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqObject"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(ref fld_var) = self.value { + o_prot.write_field_begin(&TFieldIdentifier::new("value", TType::String, 1))?; + o_prot.write_bytes(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.dirty { + o_prot.write_field_begin(&TFieldIdentifier::new("dirty", TType::Bool, 2))?; + o_prot.write_bool(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +impl Default for CraqObject { + fn default() -> Self { + CraqObject { + value: Some(Vec::new()), + dirty: Some(false), + } + } +} + +// +// InvalidState +// + +/// Artillery CRAQ Invalid State Error +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct InvalidState { + pub reason: Option, +} + +impl InvalidState { + pub fn new(reason: F1) -> InvalidState + where + F1: Into>, + { + InvalidState { + reason: reason.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = Some("".to_owned()); + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_string()?; + f_1 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = InvalidState { reason: f_1 }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("InvalidState"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(ref fld_var) = self.reason { + o_prot.write_field_begin(&TFieldIdentifier::new("reason", TType::String, 1))?; + o_prot.write_string(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +impl Default for InvalidState { + fn default() -> Self { + InvalidState { + reason: Some("".to_owned()), + } + } +} + +impl Error for InvalidState { + fn description(&self) -> &str { + "remote service threw InvalidState" + } +} + +impl From for thrift::Error { + fn from(e: InvalidState) -> Self { + thrift::Error::User(Box::new(e)) + } +} + +impl Display for InvalidState { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + self.description().fmt(f) + } +} + +// +// CraqService service client +// + +/// Artillery CRAQ service. +pub trait TCraqServiceSyncClient { + /// Reads a value with the desired consistency model. + fn read( + &mut self, + model: CraqConsistencyModel, + version_bound: Version, + ) -> thrift::Result; + /// Writes a new value. + fn write(&mut self, obj: CraqObject) -> thrift::Result; + /// Performs a test-and-set operation. * + fn test_and_set( + &mut self, + obj: CraqObject, + expected_version: Version, + ) -> thrift::Result; + /// Writes a new value with the given version. + fn write_versioned(&mut self, obj: CraqObject, version: Version) -> thrift::Result<()>; + /// Returns the latest committed version. + fn version_query(&mut self) -> thrift::Result; +} + +pub trait TCraqServiceSyncClientMarker {} + +pub struct CraqServiceSyncClient +where + IP: TInputProtocol, + OP: TOutputProtocol, +{ + _i_prot: IP, + _o_prot: OP, + _sequence_number: i32, +} + +impl CraqServiceSyncClient +where + IP: TInputProtocol, + OP: TOutputProtocol, +{ + pub fn new(input_protocol: IP, output_protocol: OP) -> CraqServiceSyncClient { + CraqServiceSyncClient { + _i_prot: input_protocol, + _o_prot: output_protocol, + _sequence_number: 0, + } + } +} + +impl TThriftClient for CraqServiceSyncClient +where + IP: TInputProtocol, + OP: TOutputProtocol, +{ + fn i_prot_mut(&mut self) -> &mut dyn TInputProtocol { + &mut self._i_prot + } + fn o_prot_mut(&mut self) -> &mut dyn TOutputProtocol { + &mut self._o_prot + } + fn sequence_number(&self) -> i32 { + self._sequence_number + } + fn increment_sequence_number(&mut self) -> i32 { + self._sequence_number += 1; + self._sequence_number + } +} + +impl TCraqServiceSyncClientMarker for CraqServiceSyncClient +where + IP: TInputProtocol, + OP: TOutputProtocol, +{ +} + +impl TCraqServiceSyncClient for C { + fn read( + &mut self, + model: CraqConsistencyModel, + version_bound: Version, + ) -> thrift::Result { + ({ + self.increment_sequence_number(); + let message_ident = + TMessageIdentifier::new("read", TMessageType::Call, self.sequence_number()); + let call_args = CraqServiceReadArgs { + model: model, + version_bound: version_bound, + }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + })?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("read", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = + thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)); + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CraqServiceReadResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } + fn write(&mut self, obj: CraqObject) -> thrift::Result { + ({ + self.increment_sequence_number(); + let message_ident = + TMessageIdentifier::new("write", TMessageType::Call, self.sequence_number()); + let call_args = CraqServiceWriteArgs { obj: obj }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + })?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("write", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = + thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)); + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CraqServiceWriteResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } + fn test_and_set( + &mut self, + obj: CraqObject, + expected_version: Version, + ) -> thrift::Result { + ({ + self.increment_sequence_number(); + let message_ident = + TMessageIdentifier::new("testAndSet", TMessageType::Call, self.sequence_number()); + let call_args = CraqServiceTestAndSetArgs { + obj: obj, + expected_version: expected_version, + }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + })?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("testAndSet", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = + thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)); + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CraqServiceTestAndSetResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } + fn write_versioned(&mut self, obj: CraqObject, version: Version) -> thrift::Result<()> { + ({ + self.increment_sequence_number(); + let message_ident = TMessageIdentifier::new( + "writeVersioned", + TMessageType::Call, + self.sequence_number(), + ); + let call_args = CraqServiceWriteVersionedArgs { + obj: obj, + version: version, + }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + })?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("writeVersioned", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = + thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)); + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CraqServiceWriteVersionedResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } + fn version_query(&mut self) -> thrift::Result { + ({ + self.increment_sequence_number(); + let message_ident = + TMessageIdentifier::new("versionQuery", TMessageType::Call, self.sequence_number()); + let call_args = CraqServiceVersionQueryArgs {}; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + })?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("versionQuery", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = + thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)); + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CraqServiceVersionQueryResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } +} + +// +// CraqService service processor +// + +/// Artillery CRAQ service. +pub trait CraqServiceSyncHandler { + /// Reads a value with the desired consistency model. + fn handle_read( + &self, + model: CraqConsistencyModel, + version_bound: Version, + ) -> thrift::Result; + /// Writes a new value. + fn handle_write(&self, obj: CraqObject) -> thrift::Result; + /// Performs a test-and-set operation. * + fn handle_test_and_set( + &self, + obj: CraqObject, + expected_version: Version, + ) -> thrift::Result; + /// Writes a new value with the given version. + fn handle_write_versioned(&self, obj: CraqObject, version: Version) -> thrift::Result<()>; + /// Returns the latest committed version. + fn handle_version_query(&self) -> thrift::Result; +} + +pub struct CraqServiceSyncProcessor { + handler: H, +} + +impl CraqServiceSyncProcessor { + pub fn new(handler: H) -> CraqServiceSyncProcessor { + CraqServiceSyncProcessor { handler } + } + fn process_read( + &self, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + TCraqServiceProcessFunctions::process_read( + &self.handler, + incoming_sequence_number, + i_prot, + o_prot, + ) + } + fn process_write( + &self, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + TCraqServiceProcessFunctions::process_write( + &self.handler, + incoming_sequence_number, + i_prot, + o_prot, + ) + } + fn process_test_and_set( + &self, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + TCraqServiceProcessFunctions::process_test_and_set( + &self.handler, + incoming_sequence_number, + i_prot, + o_prot, + ) + } + fn process_write_versioned( + &self, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + TCraqServiceProcessFunctions::process_write_versioned( + &self.handler, + incoming_sequence_number, + i_prot, + o_prot, + ) + } + fn process_version_query( + &self, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + TCraqServiceProcessFunctions::process_version_query( + &self.handler, + incoming_sequence_number, + i_prot, + o_prot, + ) + } +} + +pub struct TCraqServiceProcessFunctions; + +impl TCraqServiceProcessFunctions { + pub fn process_read( + handler: &H, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let args = CraqServiceReadArgs::read_from_in_protocol(i_prot)?; + match handler.handle_read(args.model, args.version_bound) { + Ok(handler_return) => { + let message_ident = + TMessageIdentifier::new("read", TMessageType::Reply, incoming_sequence_number); + o_prot.write_message_begin(&message_ident)?; + let ret = CraqServiceReadResult { + result_value: Some(handler_return), + }; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + Err(e) => match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new( + "read", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + _ => { + let ret_err = + ApplicationError::new(ApplicationErrorKind::Unknown, e.description()); + let message_ident = TMessageIdentifier::new( + "read", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + }, + } + } + pub fn process_write( + handler: &H, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let args = CraqServiceWriteArgs::read_from_in_protocol(i_prot)?; + match handler.handle_write(args.obj) { + Ok(handler_return) => { + let message_ident = + TMessageIdentifier::new("write", TMessageType::Reply, incoming_sequence_number); + o_prot.write_message_begin(&message_ident)?; + let ret = CraqServiceWriteResult { + result_value: Some(handler_return), + }; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + Err(e) => match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new( + "write", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + _ => { + let ret_err = + ApplicationError::new(ApplicationErrorKind::Unknown, e.description()); + let message_ident = TMessageIdentifier::new( + "write", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + }, + } + } + pub fn process_test_and_set( + handler: &H, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let args = CraqServiceTestAndSetArgs::read_from_in_protocol(i_prot)?; + match handler.handle_test_and_set(args.obj, args.expected_version) { + Ok(handler_return) => { + let message_ident = TMessageIdentifier::new( + "testAndSet", + TMessageType::Reply, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + let ret = CraqServiceTestAndSetResult { + result_value: Some(handler_return), + }; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + Err(e) => match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new( + "testAndSet", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + _ => { + let ret_err = + ApplicationError::new(ApplicationErrorKind::Unknown, e.description()); + let message_ident = TMessageIdentifier::new( + "testAndSet", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + }, + } + } + pub fn process_write_versioned( + handler: &H, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let args = CraqServiceWriteVersionedArgs::read_from_in_protocol(i_prot)?; + match handler.handle_write_versioned(args.obj, args.version) { + Ok(_) => { + let message_ident = TMessageIdentifier::new( + "writeVersioned", + TMessageType::Reply, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + let ret = CraqServiceWriteVersionedResult {}; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + Err(e) => match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new( + "writeVersioned", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + _ => { + let ret_err = + ApplicationError::new(ApplicationErrorKind::Unknown, e.description()); + let message_ident = TMessageIdentifier::new( + "writeVersioned", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + }, + } + } + pub fn process_version_query( + handler: &H, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let _ = CraqServiceVersionQueryArgs::read_from_in_protocol(i_prot)?; + match handler.handle_version_query() { + Ok(handler_return) => { + let message_ident = TMessageIdentifier::new( + "versionQuery", + TMessageType::Reply, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + let ret = CraqServiceVersionQueryResult { + result_value: Some(handler_return), + }; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + Err(e) => match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new( + "versionQuery", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + _ => { + let ret_err = + ApplicationError::new(ApplicationErrorKind::Unknown, e.description()); + let message_ident = TMessageIdentifier::new( + "versionQuery", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + }, + } + } +} + +impl TProcessor for CraqServiceSyncProcessor { + fn process( + &self, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let message_ident = i_prot.read_message_begin()?; + let res = match &*message_ident.name { + "read" => self.process_read(message_ident.sequence_number, i_prot, o_prot), + "write" => self.process_write(message_ident.sequence_number, i_prot, o_prot), + "testAndSet" => { + self.process_test_and_set(message_ident.sequence_number, i_prot, o_prot) + } + "writeVersioned" => { + self.process_write_versioned(message_ident.sequence_number, i_prot, o_prot) + } + "versionQuery" => { + self.process_version_query(message_ident.sequence_number, i_prot, o_prot) + } + method => Err(thrift::Error::Application(ApplicationError::new( + ApplicationErrorKind::UnknownMethod, + format!("unknown method {}", method), + ))), + }; + thrift::server::handle_process_result(&message_ident, res, o_prot) + } +} + +// +// CraqServiceReadArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceReadArgs { + model: CraqConsistencyModel, + version_bound: Version, +} + +impl CraqServiceReadArgs { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = CraqConsistencyModel::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + } + 2 => { + let val = i_prot.read_i64()?; + f_2 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("CraqServiceReadArgs.model", &f_1)?; + verify_required_field_exists("CraqServiceReadArgs.version_bound", &f_2)?; + let ret = CraqServiceReadArgs { + model: f_1 + .expect("auto-generated code should have checked for presence of required fields"), + version_bound: f_2 + .expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("read_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("model", TType::I32, 1))?; + self.model.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("versionBound", TType::I64, 2))?; + o_prot.write_i64(self.version_bound)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CraqServiceReadResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceReadResult { + result_value: Option, +} + +impl CraqServiceReadResult { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_0: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 0 => { + let val = CraqObject::read_from_in_protocol(i_prot)?; + f_0 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceReadResult { result_value: f_0 }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqServiceReadResult"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(ref fld_var) = self.result_value { + o_prot.write_field_begin(&TFieldIdentifier::new("result_value", TType::Struct, 0))?; + fld_var.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result { + if self.result_value.is_some() { + Ok(self.result_value.unwrap()) + } else { + Err(thrift::Error::Application(ApplicationError::new( + ApplicationErrorKind::MissingResult, + "no result received for CraqServiceRead", + ))) + } + } +} + +// +// CraqServiceWriteArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceWriteArgs { + obj: CraqObject, +} + +impl CraqServiceWriteArgs { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = CraqObject::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("CraqServiceWriteArgs.obj", &f_1)?; + let ret = CraqServiceWriteArgs { + obj: f_1 + .expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("write_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("obj", TType::Struct, 1))?; + self.obj.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CraqServiceWriteResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceWriteResult { + result_value: Option, +} + +impl CraqServiceWriteResult { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_0: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 0 => { + let val = i_prot.read_i64()?; + f_0 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceWriteResult { result_value: f_0 }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqServiceWriteResult"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.result_value { + o_prot.write_field_begin(&TFieldIdentifier::new("result_value", TType::I64, 0))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result { + if self.result_value.is_some() { + Ok(self.result_value.unwrap()) + } else { + Err(thrift::Error::Application(ApplicationError::new( + ApplicationErrorKind::MissingResult, + "no result received for CraqServiceWrite", + ))) + } + } +} + +// +// CraqServiceTestAndSetArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceTestAndSetArgs { + obj: CraqObject, + expected_version: Version, +} + +impl CraqServiceTestAndSetArgs { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = CraqObject::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + } + 2 => { + let val = i_prot.read_i64()?; + f_2 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("CraqServiceTestAndSetArgs.obj", &f_1)?; + verify_required_field_exists("CraqServiceTestAndSetArgs.expected_version", &f_2)?; + let ret = CraqServiceTestAndSetArgs { + obj: f_1 + .expect("auto-generated code should have checked for presence of required fields"), + expected_version: f_2 + .expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("testAndSet_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("obj", TType::Struct, 1))?; + self.obj.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("expectedVersion", TType::I64, 2))?; + o_prot.write_i64(self.expected_version)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CraqServiceTestAndSetResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceTestAndSetResult { + result_value: Option, +} + +impl CraqServiceTestAndSetResult { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_0: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 0 => { + let val = i_prot.read_i64()?; + f_0 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceTestAndSetResult { result_value: f_0 }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqServiceTestAndSetResult"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.result_value { + o_prot.write_field_begin(&TFieldIdentifier::new("result_value", TType::I64, 0))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result { + if self.result_value.is_some() { + Ok(self.result_value.unwrap()) + } else { + Err(thrift::Error::Application(ApplicationError::new( + ApplicationErrorKind::MissingResult, + "no result received for CraqServiceTestAndSet", + ))) + } + } +} + +// +// CraqServiceWriteVersionedArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceWriteVersionedArgs { + obj: CraqObject, + version: Version, +} + +impl CraqServiceWriteVersionedArgs { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = CraqObject::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + } + 2 => { + let val = i_prot.read_i64()?; + f_2 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("CraqServiceWriteVersionedArgs.obj", &f_1)?; + verify_required_field_exists("CraqServiceWriteVersionedArgs.version", &f_2)?; + let ret = CraqServiceWriteVersionedArgs { + obj: f_1 + .expect("auto-generated code should have checked for presence of required fields"), + version: f_2 + .expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("writeVersioned_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("obj", TType::Struct, 1))?; + self.obj.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("version", TType::I64, 2))?; + o_prot.write_i64(self.version)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CraqServiceWriteVersionedResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceWriteVersionedResult {} + +impl CraqServiceWriteVersionedResult { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceWriteVersionedResult {}; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqServiceWriteVersionedResult"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result<()> { + Ok(()) + } +} + +// +// CraqServiceVersionQueryArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceVersionQueryArgs {} + +impl CraqServiceVersionQueryArgs { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceVersionQueryArgs {}; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("versionQuery_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CraqServiceVersionQueryResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceVersionQueryResult { + result_value: Option, +} + +impl CraqServiceVersionQueryResult { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_0: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 0 => { + let val = i_prot.read_i64()?; + f_0 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceVersionQueryResult { result_value: f_0 }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqServiceVersionQueryResult"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.result_value { + o_prot.write_field_begin(&TFieldIdentifier::new("result_value", TType::I64, 0))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result { + if self.result_value.is_some() { + Ok(self.result_value.unwrap()) + } else { + Err(thrift::Error::Application(ApplicationError::new( + ApplicationErrorKind::MissingResult, + "no result received for CraqServiceVersionQuery", + ))) + } + } +} diff --git a/artillery-ddata/src/craq/protocol/proto.thrift b/artillery-ddata/src/craq/protocol/proto.thrift new file mode 100644 index 0000000..2580680 --- /dev/null +++ b/artillery-ddata/src/craq/protocol/proto.thrift @@ -0,0 +1,46 @@ +/* + * CRAQ replication protocol definition + */ +namespace cpp artillery_craq +namespace java artillery.craq.thrift + +/** Version numbers. */ +typedef i64 Version + +/** Consistency models. */ +enum CraqConsistencyModel { STRONG, EVENTUAL, EVENTUAL_MAX_BOUNDED, DEBUG } + +/** Object envelope. */ +struct CraqObject { + 1: optional binary value; + 2: optional bool dirty; +} + +/** Artillery CRAQ Invalid State Error */ +exception InvalidState { + 1: string reason +} + +/** Artillery CRAQ service. */ +service CraqService { + // ------------------------------------------------------------------------- + // Client-facing methods + // ------------------------------------------------------------------------- + /** Reads a value with the desired consistency model. */ + CraqObject read(1:CraqConsistencyModel model, 2:Version versionBound), + + /** Writes a new value. */ + Version write(1:CraqObject obj), + + /** Performs a test-and-set operation. **/ + Version testAndSet(1:CraqObject obj, 2:Version expectedVersion), + + // ------------------------------------------------------------------------- + // Internal methods + // ------------------------------------------------------------------------- + /** Writes a new value with the given version. */ + void writeVersioned(1:CraqObject obj, 2:Version version), + + /** Returns the latest committed version. */ + Version versionQuery() +} diff --git a/artillery-ddata/src/craq/server.rs b/artillery-ddata/src/craq/server.rs new file mode 100644 index 0000000..4f2d673 --- /dev/null +++ b/artillery-ddata/src/craq/server.rs @@ -0,0 +1,387 @@ +use super::proto::*; +use super::{chain::CraqChain, erwlock::ERwLock, node::CRMode}; +use crate::craq::node::CraqClient; + +use core::sync::atomic::AtomicI64; +use crossbeam_channel::Receiver; +use crossbeam_channel::Sender; +use std::sync::{atomic::Ordering, Arc}; +use t1ha::T1haHashMap as HashMap; + +pub struct CraqProtoServer { + /// Tail connection pool receiver. + pub tp_rx: Option>>, + /// Successor connection pool receiver. + pub sp_rx: Option>>, + /// Tail connection pool sender. + pub tp_tx: Option>>, + /// Successor connection pool sender. + pub sp_tx: Option>>, + + /// + /// CR reference + chain_ref: Arc, + /// Known objects: version, bytes + pub objects: Arc>>, + /// Latest known version which is either clean or dirty. + /// + /// NOTE: This should start with a negative round to make + /// version aligned at the first WR/DR. + pub latest_version: AtomicI64, + + /// Latest clean version this one is always clean. + /// + /// NOTE: This should start with a negative round to make + /// version aligned at the first clean version upgrade commit. + pub latest_clean_version: AtomicI64, + + /// + /// Algorithmic mode of operation + cr_mode: CRMode, +} + +impl CraqProtoServer { + pub fn new( + tp_rx: Option>>, + tp_tx: Option>>, + sp_rx: Option>>, + sp_tx: Option>>, + chain_ref: Arc, + cr_mode: CRMode, + ) -> Self { + Self { + tp_rx, + sp_rx, + tp_tx, + sp_tx, + chain_ref, + cr_mode, + objects: Arc::new(ERwLock::new(HashMap::default())), + latest_version: AtomicI64::new(!0), + latest_clean_version: AtomicI64::new(!0), + } + } + + /// + /// Creates shallow dirty copy of given object + fn copy_object(&self, obj: &CraqObject, dirty: bool) -> CraqObject { + let mut copied = obj.clone(); + copied.dirty = Some(dirty); + copied + } + + /// + /// Handle all incoming write-mode requests like: test-and-set, write + fn write( + &self, + obj: CraqObject, + expected_version: i64, + ) -> std::result::Result { + if self.tp_tx.is_none() + || self.tp_rx.is_none() + || self.sp_tx.is_none() + || self.sp_rx.is_none() + { + return Err(state_error("Chain is not initialized!")); + } + + if !self.chain_ref.is_head() { + return Err(state_error("Cannot write to non-head!")); + } + + if expected_version != !0 { + // reject if latest version is not the expected version or there are uncommitted writes + let latest_clean_version = self.latest_clean_version.load(Ordering::SeqCst); + let latest_version = self.latest_version.load(Ordering::SeqCst); + + if latest_clean_version != expected_version || latest_version != latest_clean_version { + return Ok(!0); + } + } + + // Record new object version. Do the Harlem Shake... + let mut new_version = self.latest_version.fetch_add(1, Ordering::SeqCst); + new_version += 1; + self.objects.write().insert(new_version, obj.clone()); + + // Send down chain + let s_rx = self.sp_rx.as_ref().unwrap(); + let s_tx = self.sp_tx.as_ref().unwrap(); + // TODO: tryize + let mut successor = s_rx.try_recv().unwrap(); + successor.write_versioned(obj, new_version)?; + // TODO: tryize + let _ = s_tx.try_send(successor); + + // Update current clean version + let old_clean_ver: i64 = { + // TODO: It should be CAS. + let loaded = self.latest_clean_version.load(Ordering::SeqCst); + if loaded < new_version { + self.latest_clean_version + .store(new_version, Ordering::SeqCst); + new_version + } else { + loaded + } + }; + + if new_version > old_clean_ver { + self.remove_old_versions(self.latest_clean_version.load(Ordering::SeqCst)); + } + + Ok(new_version) + } + + /// + /// Strong consistency specific version query + /// This one makes a version request to tail to get the appropriate object + fn get_obj_from_version_query(&self) -> std::result::Result { + // Send a version query + let t_rx = self.tp_rx.as_ref().unwrap(); + let t_tx = self.tp_tx.as_ref().unwrap(); + // TODO: tryize + let mut tail = t_rx.try_recv().unwrap(); + let tail_version = tail.version_query()?; + // TODO: tryize + let _ = t_tx.try_send(tail); + + // If no clean version is around then return an empty obj + if tail_version < 0 { + return Ok(CraqObject::default()); + } + + let mut obj = self.objects.read().get(&tail_version).cloned(); + if obj.is_none() { + // newer version already committed (old one erased), return the latest clean version + obj = self + .objects + .read() + .get(&self.latest_clean_version.load(Ordering::SeqCst)) + .cloned(); + } + + obj.map_or( + Err(state_error("Returning empty object after a version query!")), + Result::Ok, + ) + } + + /// + /// Removes all object versions older than the latest clean one. + fn remove_old_versions(&self, latest_clean_ver: i64) { + let mut objects = self.objects.write(); + objects.retain(|k, _| k >= &latest_clean_ver); + } +} + +impl CraqServiceSyncHandler for CraqProtoServer { + /// + /// Handles versioned query request received from the protocol client + fn handle_version_query(&self) -> std::result::Result { + debug!( + "[Artillery CRAQ Node {}] Received version query...", + self.chain_ref.get_index() + ); + + // only tail should receive version queries + if !self.chain_ref.is_tail() { + return Err(state_error( + "Cannot make a version query to a non-tail node!", + )); + } + + Ok(self.latest_clean_version.load(Ordering::SeqCst)) + } + + /// + /// Handles versioned write request received from the protocol client + fn handle_write_versioned( + &self, + obj: CraqObject, + version: i64, + ) -> std::result::Result<(), thrift::Error> { + debug!( + "[Artillery CRAQ Node {}] Received write with version: {}", + self.chain_ref.get_index(), + version + ); + + // Head should not receive versioned writes + if self.chain_ref.is_head() { + return Err(state_error("Cannot make a versioned write to the head!")); + } + + // Write latest object version + self.objects.write().insert(version, obj.clone()); + + // Update latest version if applicable + if self.latest_version.load(Ordering::SeqCst) < version { + self.latest_version.store(version, Ordering::SeqCst); + } + + // Non-tail: send down chain + if !self.chain_ref.is_tail() { + let s_rx = self.sp_rx.as_ref().unwrap(); + let s_tx = self.sp_tx.as_ref().unwrap(); + // TODO: tryize + let mut successor = s_rx.try_recv().unwrap(); + successor.write_versioned(obj, version)?; + // TODO: tryize + let _ = s_tx.try_send(successor); + } + + // Mark this current version as CLEAN + let old_clean_ver: i64 = { + // TODO: CAS it should be. + let loaded = self.latest_clean_version.load(Ordering::SeqCst); + if loaded < version { + self.latest_clean_version.store(version, Ordering::SeqCst); + version + } else { + loaded + } + }; + + if version > old_clean_ver || self.chain_ref.is_tail() { + self.remove_old_versions(self.latest_clean_version.load(Ordering::SeqCst)); + } + + Ok(()) + } + + /// + /// Handles test-and-set request received from the protocol client + fn handle_test_and_set( + &self, + obj: CraqObject, + expected_version: i64, + ) -> std::result::Result { + debug!( + "[Artillery CRAQ Node {}] Received test-and-set request from client...", + self.chain_ref.get_index() + ); + + self.write(obj, expected_version) + } + + /// + /// Handles write request received from the protocol client + fn handle_write(&self, obj: CraqObject) -> std::result::Result { + debug!( + "[Artillery CRAQ Node {}] Received write request from client...", + self.chain_ref.get_index() + ); + + self.write(obj, !0) + } + + fn handle_read( + &self, + model: CraqConsistencyModel, + version_bound: i64, + ) -> std::result::Result { + debug!( + "[Artillery CRAQ Node {}] Received read request from client...", + self.chain_ref.get_index() + ); + + // Node hasn't initialized? + if !self.chain_ref.is_tail() + && (self.tp_rx.is_none() + || self.tp_tx.is_none() + || self.sp_rx.is_none() + || self.sp_tx.is_none()) + { + return Err(state_error("Chain is not initialized!")); + } + + // Running normal CR: fail if we're not the tail + if self.cr_mode == CRMode::Cr && !self.chain_ref.is_tail() { + return Err(state_error("Cannot read from non-tail node in CR mode!")); + } + + // No objects stored? + if self.objects.read().is_empty() { + return Ok(CraqObject::default()); + } + + // Lazy programmers do the best they say. + // Same people said there's more than one way to do it. + match model { + CraqConsistencyModel::Strong => { + if self.latest_version.load(Ordering::SeqCst) + > self.latest_clean_version.load(Ordering::SeqCst) + && !self.chain_ref.is_tail() + { + // Non-tail: latest known version isn't clean, send a version query + let obj = self.get_obj_from_version_query()?; + return Ok(self.copy_object(&obj, true)); + } + + if self.latest_clean_version.load(Ordering::SeqCst) < 0 { + return Ok(CraqObject::default()); + } + + if self.chain_ref.is_tail() { + let latest_version = self.latest_version.load(Ordering::SeqCst); + if let Some(obj) = self.objects.read().get(&latest_version) { + return Ok(self.copy_object(obj, false)); + } else { + return Err(state_error("Returning null object from the tail")); + } + } + + let latest_clean_version = self.latest_clean_version.load(Ordering::SeqCst); + if let Some(obj) = self.objects.read().get(&latest_clean_version) { + return Ok(self.copy_object(obj, false)); + } else { + return Err(state_error("Returning null object from a clean read!")); + } + } + CraqConsistencyModel::Eventual => { + // Return latest known version + let latest_version = self.latest_version.load(Ordering::SeqCst); + if let Some(obj) = self.objects.read().get(&latest_version) { + // TODO: normally None for dirty. + return Ok(self.copy_object(obj, false)); + } else { + return Err(state_error("Returning null object for an eventual read!")); + } + } + CraqConsistencyModel::EventualMaxBounded => { + // Return latest known version within the given bound + let latest_version = self.latest_version.load(Ordering::SeqCst); + let latest_clean_version = self.latest_clean_version.load(Ordering::SeqCst); + let bounded_version = latest_clean_version.saturating_add(std::cmp::min( + version_bound, + latest_version.saturating_sub(latest_clean_version), + )); + if let Some(obj) = self.objects.read().get(&bounded_version) { + // TODO: normally None for dirty. + return Ok(self.copy_object(obj, false)); + } else { + return Err(state_error( + "Returning null object for a bounded eventual read!", + )); + } + } + CraqConsistencyModel::Debug => { + // make a version query + if self.chain_ref.is_tail() { + return Ok(CraqObject::default()); + } else { + self.copy_object(&self.get_obj_from_version_query()?, true); + } + } + } + + error!("Fatal state error happened."); + Err(state_error("Fatal state error.")) + } +} + +#[inline] +fn state_error(msg: &str) -> thrift::Error { + thrift::Error::from(InvalidState::new(msg.to_owned())) +} diff --git a/artillery-ddata/src/lib.rs b/artillery-ddata/src/lib.rs index 31e1bb2..80d59da 100644 --- a/artillery-ddata/src/lib.rs +++ b/artillery-ddata/src/lib.rs @@ -1,7 +1,5 @@ -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} +#[macro_use] +extern crate log; + +#[macro_use] +pub mod craq; diff --git a/ddata-tests/shutdown.sh b/ddata-tests/shutdown.sh new file mode 100755 index 0000000..f49423e --- /dev/null +++ b/ddata-tests/shutdown.sh @@ -0,0 +1,9 @@ +for i in `seq 0 $CHAIN_LEN` +do + a=`printenv PID$i` + kill $a + echo "kill" $a + export PID$i= +done + +export CHAIN_LEN= diff --git a/ddata-tests/test.sh b/ddata-tests/test.sh new file mode 100755 index 0000000..ce0c0f2 --- /dev/null +++ b/ddata-tests/test.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +set -eo pipefail + +len=$(($#-1)) +export CHAIN_LEN=$len +for i in `seq 0 $(($#-1))` +do + echo $i + target/debug/examples/craq_node server 0 $i $* & + export PID$i=$! + echo ${PID}$i +done