diff --git a/Cargo.toml b/Cargo.toml index a4660f2..285cf10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,4 +8,8 @@ members = [ [profile.release] lto = "fat" -codegen-units = 1 \ No newline at end of file +codegen-units = 1 + + +# [profile.bench] +# debug = true \ No newline at end of file diff --git a/Makefile.toml b/Makefile.toml new file mode 100644 index 0000000..fd2d12c --- /dev/null +++ b/Makefile.toml @@ -0,0 +1,12 @@ +[config] +default_to_workspace = false + +[env] +CRAQ_DIR = "artillery-ddata/src/craq" + +[tasks.compile-craq] +script = [ + ''' + thrift -out $CRAQ_DIR --gen rs $CRAQ_DIR/protocol/proto.thrift + ''' +] \ No newline at end of file diff --git a/artillery-core/Cargo.toml b/artillery-core/Cargo.toml index f90ea32..1136c4a 100644 --- a/artillery-core/Cargo.toml +++ b/artillery-core/Cargo.toml @@ -2,13 +2,14 @@ name = "artillery-core" version = "0.1.0" authors = ["Mahmut Bulut "] +description = "Fire-forged cluster management & Distributed data protocol" edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -log = "0.4.8" -failure = "0.1.6" +log = "0.4" +failure = "0.1.7" failure_derive = "0.1.6" bastion-utils = "0.3.2" cuneiform-fields = "0.1" @@ -20,7 +21,7 @@ rand = "0.7.3" mio = { version = "0.7.0-alpha.1", features = ["os-poll", "udp"] } futures = "0.3" pin-utils = "0.1.0-alpha.4" -libp2p = "0.16.0" +libp2p = { version = "0.18", features = ["mdns"] } bastion-executor = "0.3.4" lightproc = "0.3.4" crossbeam-channel = "0.4.2" diff --git a/artillery-ddata/Cargo.toml b/artillery-ddata/Cargo.toml index 5dbd592..59463a4 100644 --- a/artillery-ddata/Cargo.toml +++ b/artillery-ddata/Cargo.toml @@ -4,6 +4,25 @@ version = "0.1.0" authors = ["Mahmut Bulut "] edition = "2018" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] +log = "0.4" +failure = "0.1.7" +thrift = "0.13.0" +t1ha = "0.1" +crossbeam-channel = "0.4" + + +[dev-dependencies] +clap = "2.33.0" +pretty_env_logger = "0.4.0" +bastion = "0.3" +bastion-executor = "0.3" +lightproc = "0.3" +rand = "0.7" +criterion = "0.3" +futures = "0.3" + + +[[bench]] +name = "craq_bencher" +harness = false diff --git a/artillery-ddata/benches/craq_bencher.rs b/artillery-ddata/benches/craq_bencher.rs new file mode 100644 index 0000000..04f4d16 --- /dev/null +++ b/artillery-ddata/benches/craq_bencher.rs @@ -0,0 +1,194 @@ +use criterion::{criterion_group, criterion_main, Criterion}; + +use artillery_ddata::craq::prelude::*; + +use futures::stream::StreamExt; +use rand::distributions::Alphanumeric; +use rand::prelude::*; + +pub fn entry_bench_read(sip: String, args: Vec<&str>) -> Vec { + // Connect to servers + let num_clients: usize = args[0].parse::().unwrap(); + let num_bytes: usize = args[1].parse::().unwrap(); + let _trials: usize = args[2].parse::().unwrap(); + let num_servers = args.len() - 3; + + let hpc: Vec<&str> = sip.split(":").collect(); + let mut hosts = vec![(hpc[0], hpc[1].parse::().unwrap())]; + + (0..num_servers).into_iter().for_each(|i| { + let hpc: Vec<&str> = args[i + 3].split(":").collect(); + let host = hpc[0]; + let port = hpc[1]; + let port = port.parse::().unwrap(); + + hosts.extend([(host, port)].iter()); + }); + + let mut clients: Vec = (0..num_clients) + .into_iter() + .map(|i| { + let (host, port) = hosts[i % hosts.len()]; + DDataCraqClient::connect_host_port(host, port).unwrap() + }) + .collect(); + + if clients[0].write(gen_random_str(num_bytes)).is_err() { + println!("bench_write: Couldn't write new revision."); + } + + // Check if any object is written... + if clients[0].read(CraqConsistencyModel::Strong, 0).is_err() { + println!("bench_read: Could not read object."); + } + + clients +} + +pub fn gen_random_str(slen: usize) -> String { + thread_rng().sample_iter(&Alphanumeric).take(slen).collect() +} + +pub fn stub_read(clients: &mut Vec) { + clients.iter_mut().for_each(|client| { + let _ = client.read(CraqConsistencyModel::Eventual, 0); + }); +} + +fn client_benchmarks(c: &mut Criterion) { + { + // 1 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["1", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_1_client", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 2 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["2", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_2_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 3 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["3", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_3_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 4 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["4", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_4_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 5 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["5", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_5_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 10 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["10", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_10_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 20 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["20", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_20_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 30 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["30", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_30_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 40 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["40", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_40_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 50 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["50", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_50_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + { + // 100 clients + let mut clients = entry_bench_read( + "127.0.0.1:30001".to_string(), + vec!["100", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"], + ); + c.bench_function("benchmark_read_100_clients", |b| { + b.iter(|| stub_read(&mut clients)) + }); + } + + // { + // // 500 clients + // let mut clients = entry_bench_read("127.0.0.1:30001".to_string(), vec!["500", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"]); + // c.bench_function("benchmark_read_500_clients", |b| b.iter(|| stub_read(&mut clients))); + // } + + // { + // // 1000 clients + // let mut clients = entry_bench_read("127.0.0.1:30001".to_string(), vec!["1000", "1000", "100", "127.0.0.1:30002", "127.0.0.1:30003"]); + // c.bench_function("benchmark_read_1000_clients", |b| b.iter(|| stub_read(&mut clients))); + // } +} + +criterion_group!(benches, client_benchmarks); +criterion_main!(benches); diff --git a/artillery-ddata/examples/craq_node.rs b/artillery-ddata/examples/craq_node.rs new file mode 100644 index 0000000..3b08118 --- /dev/null +++ b/artillery-ddata/examples/craq_node.rs @@ -0,0 +1,98 @@ +extern crate pretty_env_logger; + +#[macro_use] +extern crate log; + +use artillery_ddata::craq::prelude::*; +use clap::*; + +fn main() { + pretty_env_logger::init(); + + let matches = App::new("Artillery CRAQ") + .author("Mahmut Bulut, vertexclique [ta] gmail [tod] com") + .version(crate_version!()) + .about("Artillery Distributed Data Protocol Tester") + .subcommand( + SubCommand::with_name("server") + .about("Runs a CRAQ server") + .arg( + Arg::with_name("cr_mode") + .required(true) + .help("CR mode that server would use: 0 for CRAQ, 1 for CR") + .index(1), + ) + .arg( + Arg::with_name("node_index") + .required(true) + .help("Node index this server would use") + .index(2), + ) + .arg( + Arg::with_name("chain_servers") + .required(true) + .multiple(true), + ), + ) + .subcommand( + SubCommand::with_name("client") + .about("Runs a CRAQ client") + .arg( + Arg::with_name("server_ip_port") + .required(true) + .help("Server ip and port to connect") + .index(1), + ) + .arg( + Arg::with_name("test_method") + .required(true) + .help("Test method of client to test against the server") + .index(2), + ) + .arg( + Arg::with_name("extra_args") + .required(true) + .multiple(true) + .min_values(3), + ), + ) + .after_help("Enables Artillery CRAQ protocol to be tested in the server/client fashion") + .get_matches(); + + match matches.subcommand() { + ("server", Some(server_matches)) => { + let cr_mode = match server_matches.value_of("cr_mode") { + Some("0") => CRMode::Craq, + Some("1") => CRMode::Cr, + _ => panic!("CR mode not as expected"), + }; + + if let Some(node_index) = server_matches.value_of("node_index") { + let node_index = node_index.parse::().unwrap(); + let varargs: Vec<&str> = + server_matches.values_of("chain_servers").unwrap().collect(); + + let nodes: Vec = varargs.iter().flat_map(ChainNode::new).collect(); + + assert_eq!(nodes.len(), varargs.len(), "Node address parsing failed"); + + let chain = CraqChain::new(&nodes, node_index).unwrap(); + CraqNode::start(cr_mode, chain, CraqConfig::default()); + } + } + ("client", Some(client_matches)) => { + let _sip = client_matches + .value_of("server_ip_port") + .unwrap() + .to_string(); + + match client_matches.value_of("test_method") { + Some("bench_read") => todo!(), + _ => unreachable!(), + } + } + _ => { + error!("Couldn't find any known subcommands"); + } + } +} diff --git a/artillery-ddata/src/craq/chain.rs b/artillery-ddata/src/craq/chain.rs new file mode 100644 index 0000000..6a65501 --- /dev/null +++ b/artillery-ddata/src/craq/chain.rs @@ -0,0 +1,88 @@ +use super::chain_node::ChainNode; +use super::errors::*; +use std::fmt; +use std::fmt::Display; + +/// +/// Representation of a closed-loop CRAQ chain +#[derive(Default, Debug)] +pub struct CraqChain { + /// List of nodes in this chain, in order. + nodes: Vec, + /// Index of this node. + node_idx: usize, +} + +impl CraqChain { + /// + /// Create a new chain. + pub fn new(nodes: &[ChainNode], node_idx: usize) -> Result { + ensure!( + node_idx < nodes.len(), + "Node index can't be greater than chain length." + ); + + Ok(Self { + nodes: nodes.to_vec(), + node_idx, + }) + } + + /// + /// Returns whether this node is the head of its chain. + pub fn is_head(&self) -> bool { + self.node_idx == 0 + } + + /// + /// Returns the successor node if exists + pub fn is_tail(&self) -> bool { + self.node_idx == self.nodes.len().saturating_sub(1) + } + + /// + /// Returns the successor node if exists + pub fn get_successor(&self) -> Option<&ChainNode> { + if self.is_tail() { + None + } else { + self.nodes.get(self.node_idx.saturating_add(1)) + } + } + + /// + /// Returns the tail node. + pub fn get_tail(&self) -> Option<&ChainNode> { + self.nodes.last() + } + + /// + /// Returns the chain node associated with the current node index. + pub fn get_node(&self) -> Option<&ChainNode> { + self.nodes.get(self.node_idx) + } + + /// + /// Returns the current node index. + pub fn get_index(&self) -> usize { + self.node_idx + } + + /// + /// Returns the size of this chain. + pub fn chain_size(&self) -> usize { + self.nodes.len() + } +} + +/// +/// Human-readable display impl for the Chain +impl Display for CraqChain { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "CR: Index [{}] in chain: {:#?}", + self.node_idx, self.nodes + ) + } +} diff --git a/artillery-ddata/src/craq/chain_node.rs b/artillery-ddata/src/craq/chain_node.rs new file mode 100644 index 0000000..60ab804 --- /dev/null +++ b/artillery-ddata/src/craq/chain_node.rs @@ -0,0 +1,26 @@ +use super::errors::*; +use std::net::{SocketAddr, ToSocketAddrs}; + +/// +/// Chain node representation +#[derive(Debug, Clone)] +pub struct ChainNode { + host: SocketAddr, +} + +impl ChainNode { + pub fn new(addr: A) -> Result + where + A: ToSocketAddrs, + { + let host: SocketAddr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| CraqError::SocketAddrError("No node address given or parsed.".into()))?; + Ok(Self { host }) + } + + pub fn get_addr(&self) -> &SocketAddr { + &self.host + } +} diff --git a/artillery-ddata/src/craq/client.rs b/artillery-ddata/src/craq/client.rs new file mode 100644 index 0000000..1d36025 --- /dev/null +++ b/artillery-ddata/src/craq/client.rs @@ -0,0 +1,117 @@ +use std::fmt; + +use super::errors::*; +use super::{ + node::CraqClient, + proto::{CraqConsistencyModel, CraqObject, TCraqServiceSyncClient}, +}; +use std::net::{SocketAddr, ToSocketAddrs}; +use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol}; +use thrift::transport::{ + TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport, + TFramedWriteTransportFactory, TIoChannel, TTcpChannel as BiTcp, +}; + +pub struct ReadObject { + /// + /// Object's value. + value: Vec, + /// + /// Whether the read was dirty (true) or clean (false). + dirty: bool, +} + +impl ReadObject { + /// + /// Creates a new wrapper Read Object + pub fn new(value: Vec, dirty: bool) -> Self { + Self { value, dirty } + } +} + +impl fmt::Debug for ReadObject { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReadObject") + .field("value", &self.value) + .field("dirty", &self.dirty) + .finish() + } +} + +impl fmt::Display for ReadObject { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReadObject") + .field("value", &self.value) + .field("dirty", &self.dirty) + .finish() + } +} + +pub struct DDataCraqClient { + host: SocketAddr, + cc: CraqClient, +} + +impl DDataCraqClient { + pub fn connect_host_port(host: T, port: u16) -> Result + where + T: AsRef, + { + Self::connect(format!("{}:{}", host.as_ref(), port)) + } + + pub fn connect(addr: A) -> Result + where + A: ToSocketAddrs, + { + let host: SocketAddr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| CraqError::SocketAddrError("No node address given or parsed.".into()))?; + + debug!("Client is initiating connection to: {}", host); + + let mut c = BiTcp::new(); + c.open(&host.to_string())?; + let (i_chan, o_chan) = c.split()?; + let (i_tran, o_tran) = ( + TFramedReadTransport::new(i_chan), + TFramedWriteTransport::new(o_chan), + ); + let (i_prot, o_prot) = ( + TBinaryInputProtocol::new(i_tran, true), + TBinaryOutputProtocol::new(o_tran, true), + ); + + debug!("Created client: {}", host); + let cc = CraqClient::new(i_prot, o_prot); + Ok(Self { host, cc }) + } + + /// + /// Writes an object to the cluster, returning the new object version or -1 upon failure. + pub fn write(&mut self, value: String) -> Result { + let mut obj = CraqObject::default(); + obj.value = Some(value.into_bytes()); + Ok(self.cc.write(obj)?) + } + + /// + /// Reads an object with given bound version. + pub fn read(&mut self, model: CraqConsistencyModel, version_bound: i64) -> Result { + let obj = self.cc.read(model, version_bound)?; + + match (obj.value, obj.dirty) { + (Some(v), Some(d)) => Ok(ReadObject::new(v, d)), + _ => bail!(CraqError::ReadError, "Read request failed"), + } + } + + /// + /// Performs a test-and-set operation, returning the new object version or -1 upon failure. + pub fn test_and_set(&mut self, value: String, expected_version: i64) -> Result { + let mut obj = CraqObject::default(); + obj.value = Some(value.into_bytes()); + Ok(self.cc.test_and_set(obj, expected_version)?) + } +} diff --git a/artillery-ddata/src/craq/craq_config.rs b/artillery-ddata/src/craq/craq_config.rs new file mode 100644 index 0000000..54039ca --- /dev/null +++ b/artillery-ddata/src/craq/craq_config.rs @@ -0,0 +1,22 @@ +use super::node::CRMode; + +#[derive(Debug, Clone)] +pub struct CraqConfig { + pub fallback_replication_port: u16, + pub operation_mode: CRMode, + pub connection_sleep_time: u64, + pub connection_pool_size: usize, + pub protocol_worker_size: usize, +} + +impl Default for CraqConfig { + fn default() -> Self { + CraqConfig { + fallback_replication_port: 22991_u16, + operation_mode: CRMode::Craq, + connection_sleep_time: 1000_u64, + connection_pool_size: 50_usize, + protocol_worker_size: 100_usize, + } + } +} diff --git a/artillery-ddata/src/craq/errors.rs b/artillery-ddata/src/craq/errors.rs new file mode 100644 index 0000000..4871a28 --- /dev/null +++ b/artillery-ddata/src/craq/errors.rs @@ -0,0 +1,56 @@ +use failure::Fail; +use std::io; + +use std::result; + +/// Result type for operations that could result in an `CraqError` +pub type Result = result::Result; + +#[derive(Fail, Debug)] +pub enum CraqError { + #[fail(display = "Artillery :: CRAQ :: I/O error occurred: {}", _0)] + IOError(io::Error), + #[fail(display = "Artillery :: CRAQ :: Socket addr: {}", _0)] + SocketAddrError(String), + #[fail(display = "Artillery :: CRAQ :: Assertion failed: {}", _0)] + AssertionError(String, failure::Backtrace), + #[fail(display = "Artillery :: CRAQ :: Protocol error: {}", _0)] + ProtocolError(thrift::Error), + #[fail(display = "Artillery :: CRAQ :: Read error: {}", _0)] + ReadError(String), +} + +impl From for CraqError { + fn from(e: io::Error) -> Self { + CraqError::IOError(e) + } +} + +impl From for CraqError { + fn from(e: thrift::Error) -> Self { + CraqError::ProtocolError(e) + } +} + +#[macro_export] +macro_rules! bail { + ($kind:expr, $e:expr) => { + return Err($kind($e.to_owned())); + }; + ($kind:expr, $fmt:expr, $($arg:tt)+) => { + return Err($kind(format!($fmt, $($arg)+).to_owned())); + }; +} + +macro_rules! ensure { + ($cond:expr, $e:expr) => { + if !($cond) { + return Err(CraqError::AssertionError($e.to_string(), failure::Backtrace::new())); + } + }; + ($cond:expr, $fmt:expr, $($arg:tt)+) => { + if !($cond) { + return Err(CraqError::AssertionError(format!($fmt, $($arg)+).to_string(), failure::Backtrace::new())); + } + }; +} diff --git a/artillery-ddata/src/craq/erwlock.rs b/artillery-ddata/src/craq/erwlock.rs new file mode 100644 index 0000000..ae283ec --- /dev/null +++ b/artillery-ddata/src/craq/erwlock.rs @@ -0,0 +1,41 @@ +use core::sync::atomic::spin_loop_hint; +use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; + +/// +/// +/// RwLock that blocks until yielding +pub struct ERwLock(RwLock); + +impl ERwLock { + pub fn new(t: T) -> ERwLock { + ERwLock(RwLock::new(t)) + } +} + +impl ERwLock { + #[inline] + pub fn read(&self) -> RwLockReadGuard { + loop { + match self.0.try_read() { + Ok(guard) => break guard, + _ => spin_loop_hint(), + } + } + } + + #[inline] + pub fn write(&self) -> RwLockWriteGuard { + loop { + match self.0.try_write() { + Ok(guard) => break guard, + _ => spin_loop_hint(), + } + } + } + + #[allow(dead_code)] + #[inline] + pub fn inner(&self) -> &RwLock { + &self.0 + } +} diff --git a/artillery-ddata/src/craq/mod.rs b/artillery-ddata/src/craq/mod.rs new file mode 100644 index 0000000..7cacd87 --- /dev/null +++ b/artillery-ddata/src/craq/mod.rs @@ -0,0 +1,28 @@ +/// Error API for CRAQ distributed store +#[macro_use] +pub mod errors; + +mod chain; +mod chain_node; +mod erwlock; + +#[allow(clippy::all)] +#[allow(deprecated)] +#[allow(unknown_lints)] +mod proto; +mod server; + +pub mod client; +pub mod craq_config; +pub mod node; + +/// Prelude for CRAQ distributed store +pub mod prelude { + pub use super::chain::*; + pub use super::chain_node::*; + pub use super::client::*; + pub use super::craq_config::*; + pub use super::errors::*; + pub use super::node::*; + pub use super::proto::*; +} diff --git a/artillery-ddata/src/craq/node.rs b/artillery-ddata/src/craq/node.rs new file mode 100644 index 0000000..cc81d33 --- /dev/null +++ b/artillery-ddata/src/craq/node.rs @@ -0,0 +1,262 @@ +use super::errors::*; + +use super::chain::CraqChain; +use super::{craq_config::CraqConfig, erwlock::ERwLock, proto::*, server::CraqProtoServer}; + +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs}, + sync::Arc, +}; +use thrift::protocol::{ + TBinaryInputProtocol, TBinaryInputProtocolFactory, TBinaryOutputProtocol, + TBinaryOutputProtocolFactory, +}; +use thrift::transport::{ + ReadHalf, TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport, + TFramedWriteTransportFactory, TIoChannel, TTcpChannel as BiTcp, WriteHalf, +}; + +use thrift::server::TServer; + +use crossbeam_channel::{unbounded, Receiver, Sender}; + +/// +/// CR mode that will be used. +#[derive(Debug, Clone, PartialEq)] +pub enum CRMode { + /// Standard CR mode. + Cr, + /// CRAQ mode. + Craq, +} + +impl Default for CRMode { + fn default() -> Self { + CRMode::Craq + } +} + +type CraqClientInputProtocol = TBinaryInputProtocol>>; +type CraqClientOutputProtocol = TBinaryOutputProtocol>>; +pub(crate) type CraqClient = + CraqServiceSyncClient; + +/// +/// Representation of a physical CRAQ node. +#[derive(Default)] +pub struct CraqNode { + /// Run mode which is either CR or CRAQ mode. + pub cr_mode: CRMode, + /// Whole chain. + pub chain: Arc, + /// Tail connection pool receiver. + pub tail_pool_rx: Option>>, + /// Successor connection pool receiver. + pub successor_pool_rx: Option>>, + /// Tail connection pool sender. + pub tail_pool_tx: Option>>, + /// Successor connection pool sender. + pub successor_pool_tx: Option>>, + /// Stored node configuration to be reused across iterations + pub config: CraqConfig, +} + +impl CraqNode { + /// + /// Initialize a CRAQ node with given chain + fn new_node(cr_mode: CRMode, chain: CraqChain, config: CraqConfig) -> Result { + Ok(Self { + cr_mode, + chain: Arc::new(chain), + config, + ..Default::default() + }) + } + + /// + /// Initial connection to the underlying protocol server. + fn connect_to_first(&self, server_addr: A) -> CraqClient + where + A: ToSocketAddrs, + { + self.connect_to_server(&server_addr).unwrap_or_else(|_e| { + std::thread::sleep(std::time::Duration::from_millis( + self.config.connection_sleep_time, + )); + self.connect_to_first(server_addr) + }) + } + + /// + /// Creates a connection pool to the given underlying protocol server. + fn create_conn_pool( + &self, + server_addr: A, + ) -> Result<(Sender, Receiver)> + where + A: ToSocketAddrs, + { + let (tx, rx) = unbounded(); + let client = self.connect_to_first(&server_addr); + // TODO: tryize + let _ = tx.try_send(client); + + let _ = (0..self.config.connection_pool_size) + .flat_map(|_| -> Result<_> { Ok(tx.try_send(self.connect_to_server(&server_addr)?)) }); + // while let Ok(_) = tx.try_send(self.connect_to_server(&server_addr)?) {} + + Ok((tx, rx)) + } + + /// + /// Connects to the other nodes in the given chain. + fn connect(noderef: Arc>) -> Result<()> { + let mut nodew = noderef.write(); + + debug!("Trying to connect"); + if nodew.chain.is_tail() { + return Ok(()); + } + + debug!("Checking tail connection..."); + if let Some(tail) = nodew.chain.clone().get_tail() { + let tail = tail.clone(); + let (t_tx, t_rx) = nodew.create_conn_pool(tail.get_addr())?; + nodew.tail_pool_rx = Some(Arc::new(t_rx)); + nodew.tail_pool_tx = Some(Arc::new(t_tx)); + info!( + "[CR Node {}] Connected to tail at {}", + nodew.chain.get_index(), + tail.get_addr() + ); + } else { + // NOTE: shouldn't happen + error!("Shouldn't have happened - tail follows"); + unreachable!() + } + + debug!("Checking node before the tail..."); + // Is this the node before the tail? + if nodew.chain.get_index() == nodew.chain.chain_size().saturating_sub(2) { + nodew.successor_pool_tx = nodew.tail_pool_tx.clone(); + nodew.successor_pool_rx = nodew.tail_pool_rx.clone(); + info!("[CR Node {}] Node before the tail", nodew.chain.get_index()); + return Ok(()); + } + + debug!("Checking successor..."); + if let Some(successor) = nodew.chain.get_successor() { + let successor = successor.clone(); + info!( + "[CR Node {}] Connecting to successor at {}", + nodew.chain.get_index(), + successor.get_addr() + ); + let (s_tx, s_rx) = nodew.create_conn_pool(successor.get_addr())?; + nodew.successor_pool_rx = Some(Arc::new(s_rx)); + nodew.successor_pool_tx = Some(Arc::new(s_tx)); + info!( + "[CR Node {}] Connected to successor at {}", + nodew.chain.get_index(), + successor.get_addr() + ); + } else { + // NOTE: shouldn't happen + error!("Shouldn't have happened - successor interval"); + unreachable!() + } + + debug!("All aligned..."); + + Ok(()) + } + + /// + /// Entrypoint / Start procedure of this node + pub fn start(cr_mode: CRMode, chain: CraqChain, config: CraqConfig) -> Result<()> { + let port = chain + .get_node() + .map_or(config.fallback_replication_port, |n| n.get_addr().port()); + + let node = Arc::new(ERwLock::new(CraqNode::new_node(cr_mode, chain, config)?)); + + let connector_node = node.clone(); + let handle = std::thread::spawn(move || { + Self::connect(connector_node).expect("Successor connections has failed"); + }); + + let _ = handle.join(); + + info!("Starting protocol server at port: {}", port); + Self::run_protocol_server(node, port) + } + + /// + /// Connect to given server address using CRAQ client. + fn connect_to_server(&self, addr: A) -> Result + where + A: ToSocketAddrs, + { + let host: SocketAddr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| CraqError::SocketAddrError("No node address given or parsed.".into()))?; + + debug!("Issuing connection to: {}", host); + + let mut c = BiTcp::new(); + c.open(&host.to_string())?; + let (i_chan, o_chan) = c.split()?; + let (i_tran, o_tran) = ( + TFramedReadTransport::new(i_chan), + TFramedWriteTransport::new(o_chan), + ); + let (i_prot, o_prot) = ( + TBinaryInputProtocol::new(i_tran, true), + TBinaryOutputProtocol::new(o_tran, true), + ); + + debug!("Creating client: {}", host); + Ok(CraqClient::new(i_prot, o_prot)) + } + + /// + /// Start local protocol server + fn run_protocol_server(node: Arc>, port: u16) -> Result<()> { + let node = node.read(); + debug!("Protocol medium getting set up"); + + let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); + + let (i_tran_fact, i_prot_fact) = ( + TFramedReadTransportFactory::new(), + TBinaryInputProtocolFactory::new(), + ); + let (o_tran_fact, o_prot_fact) = ( + TFramedWriteTransportFactory::new(), + TBinaryOutputProtocolFactory::new(), + ); + + let processor = CraqServiceSyncProcessor::new(CraqProtoServer::new( + node.tail_pool_rx.clone(), + node.tail_pool_tx.clone(), + node.successor_pool_rx.clone(), + node.successor_pool_tx.clone(), + node.chain.clone(), + node.cr_mode.clone(), + )); + + debug!("Server started"); + let mut server = TServer::new( + i_tran_fact, + i_prot_fact, + o_tran_fact, + o_prot_fact, + processor, + node.config.protocol_worker_size, + ); + + debug!("Started listening"); + Ok(server.listen(&server_addr.to_string())?) + } +} diff --git a/artillery-ddata/src/craq/proto.rs b/artillery-ddata/src/craq/proto.rs new file mode 100644 index 0000000..0389fc5 --- /dev/null +++ b/artillery-ddata/src/craq/proto.rs @@ -0,0 +1,1422 @@ +// Autogenerated by Thrift Compiler (0.13.0) +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +#![allow(unused_imports)] +#![allow(unused_extern_crates)] +#![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments, type_complexity))] +#![cfg_attr(rustfmt, rustfmt_skip)] + +extern crate thrift; + +use std::cell::RefCell; +use std::collections::{BTreeMap, BTreeSet}; +use std::convert::{From, TryFrom}; +use std::default::Default; +use std::error::Error; +use std::fmt; +use std::fmt::{Display, Formatter}; +use std::rc::Rc; +use thrift::OrderedFloat; + +use thrift::protocol::field_id; +use thrift::protocol::verify_expected_message_type; +use thrift::protocol::verify_expected_sequence_number; +use thrift::protocol::verify_expected_service_call; +use thrift::protocol::verify_required_field_exists; +use thrift::protocol::{ + TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier, TMessageIdentifier, + TMessageType, TOutputProtocol, TSetIdentifier, TStructIdentifier, TType, +}; +use thrift::server::TProcessor; +use thrift::{ + ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient, +}; + +/// Consistency models. +#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum CraqConsistencyModel { + Strong = 0, + Eventual = 1, + EventualMaxBounded = 2, + Debug = 3, +} + +impl CraqConsistencyModel { + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + o_prot.write_i32(*self as i32) + } + pub fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + let enum_value = i_prot.read_i32()?; + CraqConsistencyModel::try_from(enum_value) + } +} + +impl TryFrom for CraqConsistencyModel { + type Error = thrift::Error; + fn try_from(i: i32) -> Result { + match i { + 0 => Ok(CraqConsistencyModel::Strong), + 1 => Ok(CraqConsistencyModel::Eventual), + 2 => Ok(CraqConsistencyModel::EventualMaxBounded), + 3 => Ok(CraqConsistencyModel::Debug), + _ => Err(thrift::Error::Protocol(ProtocolError::new( + ProtocolErrorKind::InvalidData, + format!("cannot convert enum constant {} to CraqConsistencyModel", i), + ))), + } + } +} + +pub type Version = i64; + +// +// CraqObject +// + +/// Object envelope. +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct CraqObject { + pub value: Option>, + pub dirty: Option, +} + +impl CraqObject { + pub fn new(value: F1, dirty: F2) -> CraqObject + where + F1: Into>>, + F2: Into>, + { + CraqObject { + value: value.into(), + dirty: dirty.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option> = None; + let mut f_2: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_bytes()?; + f_1 = Some(val); + } + 2 => { + let val = i_prot.read_bool()?; + f_2 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqObject { + value: f_1, + dirty: f_2, + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqObject"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(ref fld_var) = self.value { + o_prot.write_field_begin(&TFieldIdentifier::new("value", TType::String, 1))?; + o_prot.write_bytes(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.dirty { + o_prot.write_field_begin(&TFieldIdentifier::new("dirty", TType::Bool, 2))?; + o_prot.write_bool(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +impl Default for CraqObject { + fn default() -> Self { + CraqObject { + value: Some(Vec::new()), + dirty: Some(false), + } + } +} + +// +// InvalidState +// + +/// Artillery CRAQ Invalid State Error +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct InvalidState { + pub reason: Option, +} + +impl InvalidState { + pub fn new(reason: F1) -> InvalidState + where + F1: Into>, + { + InvalidState { + reason: reason.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = Some("".to_owned()); + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_string()?; + f_1 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = InvalidState { reason: f_1 }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("InvalidState"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(ref fld_var) = self.reason { + o_prot.write_field_begin(&TFieldIdentifier::new("reason", TType::String, 1))?; + o_prot.write_string(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +impl Default for InvalidState { + fn default() -> Self { + InvalidState { + reason: Some("".to_owned()), + } + } +} + +impl Error for InvalidState { + fn description(&self) -> &str { + "remote service threw InvalidState" + } +} + +impl From for thrift::Error { + fn from(e: InvalidState) -> Self { + thrift::Error::User(Box::new(e)) + } +} + +impl Display for InvalidState { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + self.description().fmt(f) + } +} + +// +// CraqService service client +// + +/// Artillery CRAQ service. +pub trait TCraqServiceSyncClient { + /// Reads a value with the desired consistency model. + fn read( + &mut self, + model: CraqConsistencyModel, + version_bound: Version, + ) -> thrift::Result; + /// Writes a new value. + fn write(&mut self, obj: CraqObject) -> thrift::Result; + /// Performs a test-and-set operation. * + fn test_and_set( + &mut self, + obj: CraqObject, + expected_version: Version, + ) -> thrift::Result; + /// Writes a new value with the given version. + fn write_versioned(&mut self, obj: CraqObject, version: Version) -> thrift::Result<()>; + /// Returns the latest committed version. + fn version_query(&mut self) -> thrift::Result; +} + +pub trait TCraqServiceSyncClientMarker {} + +pub struct CraqServiceSyncClient +where + IP: TInputProtocol, + OP: TOutputProtocol, +{ + _i_prot: IP, + _o_prot: OP, + _sequence_number: i32, +} + +impl CraqServiceSyncClient +where + IP: TInputProtocol, + OP: TOutputProtocol, +{ + pub fn new(input_protocol: IP, output_protocol: OP) -> CraqServiceSyncClient { + CraqServiceSyncClient { + _i_prot: input_protocol, + _o_prot: output_protocol, + _sequence_number: 0, + } + } +} + +impl TThriftClient for CraqServiceSyncClient +where + IP: TInputProtocol, + OP: TOutputProtocol, +{ + fn i_prot_mut(&mut self) -> &mut dyn TInputProtocol { + &mut self._i_prot + } + fn o_prot_mut(&mut self) -> &mut dyn TOutputProtocol { + &mut self._o_prot + } + fn sequence_number(&self) -> i32 { + self._sequence_number + } + fn increment_sequence_number(&mut self) -> i32 { + self._sequence_number += 1; + self._sequence_number + } +} + +impl TCraqServiceSyncClientMarker for CraqServiceSyncClient +where + IP: TInputProtocol, + OP: TOutputProtocol, +{ +} + +impl TCraqServiceSyncClient for C { + fn read( + &mut self, + model: CraqConsistencyModel, + version_bound: Version, + ) -> thrift::Result { + ({ + self.increment_sequence_number(); + let message_ident = + TMessageIdentifier::new("read", TMessageType::Call, self.sequence_number()); + let call_args = CraqServiceReadArgs { + model: model, + version_bound: version_bound, + }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + })?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("read", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = + thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)); + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CraqServiceReadResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } + fn write(&mut self, obj: CraqObject) -> thrift::Result { + ({ + self.increment_sequence_number(); + let message_ident = + TMessageIdentifier::new("write", TMessageType::Call, self.sequence_number()); + let call_args = CraqServiceWriteArgs { obj: obj }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + })?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("write", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = + thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)); + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CraqServiceWriteResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } + fn test_and_set( + &mut self, + obj: CraqObject, + expected_version: Version, + ) -> thrift::Result { + ({ + self.increment_sequence_number(); + let message_ident = + TMessageIdentifier::new("testAndSet", TMessageType::Call, self.sequence_number()); + let call_args = CraqServiceTestAndSetArgs { + obj: obj, + expected_version: expected_version, + }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + })?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("testAndSet", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = + thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)); + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CraqServiceTestAndSetResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } + fn write_versioned(&mut self, obj: CraqObject, version: Version) -> thrift::Result<()> { + ({ + self.increment_sequence_number(); + let message_ident = TMessageIdentifier::new( + "writeVersioned", + TMessageType::Call, + self.sequence_number(), + ); + let call_args = CraqServiceWriteVersionedArgs { + obj: obj, + version: version, + }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + })?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("writeVersioned", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = + thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)); + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CraqServiceWriteVersionedResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } + fn version_query(&mut self) -> thrift::Result { + ({ + self.increment_sequence_number(); + let message_ident = + TMessageIdentifier::new("versionQuery", TMessageType::Call, self.sequence_number()); + let call_args = CraqServiceVersionQueryArgs {}; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + })?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("versionQuery", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = + thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)); + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CraqServiceVersionQueryResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } +} + +// +// CraqService service processor +// + +/// Artillery CRAQ service. +pub trait CraqServiceSyncHandler { + /// Reads a value with the desired consistency model. + fn handle_read( + &self, + model: CraqConsistencyModel, + version_bound: Version, + ) -> thrift::Result; + /// Writes a new value. + fn handle_write(&self, obj: CraqObject) -> thrift::Result; + /// Performs a test-and-set operation. * + fn handle_test_and_set( + &self, + obj: CraqObject, + expected_version: Version, + ) -> thrift::Result; + /// Writes a new value with the given version. + fn handle_write_versioned(&self, obj: CraqObject, version: Version) -> thrift::Result<()>; + /// Returns the latest committed version. + fn handle_version_query(&self) -> thrift::Result; +} + +pub struct CraqServiceSyncProcessor { + handler: H, +} + +impl CraqServiceSyncProcessor { + pub fn new(handler: H) -> CraqServiceSyncProcessor { + CraqServiceSyncProcessor { handler } + } + fn process_read( + &self, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + TCraqServiceProcessFunctions::process_read( + &self.handler, + incoming_sequence_number, + i_prot, + o_prot, + ) + } + fn process_write( + &self, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + TCraqServiceProcessFunctions::process_write( + &self.handler, + incoming_sequence_number, + i_prot, + o_prot, + ) + } + fn process_test_and_set( + &self, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + TCraqServiceProcessFunctions::process_test_and_set( + &self.handler, + incoming_sequence_number, + i_prot, + o_prot, + ) + } + fn process_write_versioned( + &self, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + TCraqServiceProcessFunctions::process_write_versioned( + &self.handler, + incoming_sequence_number, + i_prot, + o_prot, + ) + } + fn process_version_query( + &self, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + TCraqServiceProcessFunctions::process_version_query( + &self.handler, + incoming_sequence_number, + i_prot, + o_prot, + ) + } +} + +pub struct TCraqServiceProcessFunctions; + +impl TCraqServiceProcessFunctions { + pub fn process_read( + handler: &H, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let args = CraqServiceReadArgs::read_from_in_protocol(i_prot)?; + match handler.handle_read(args.model, args.version_bound) { + Ok(handler_return) => { + let message_ident = + TMessageIdentifier::new("read", TMessageType::Reply, incoming_sequence_number); + o_prot.write_message_begin(&message_ident)?; + let ret = CraqServiceReadResult { + result_value: Some(handler_return), + }; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + Err(e) => match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new( + "read", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + _ => { + let ret_err = + ApplicationError::new(ApplicationErrorKind::Unknown, e.description()); + let message_ident = TMessageIdentifier::new( + "read", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + }, + } + } + pub fn process_write( + handler: &H, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let args = CraqServiceWriteArgs::read_from_in_protocol(i_prot)?; + match handler.handle_write(args.obj) { + Ok(handler_return) => { + let message_ident = + TMessageIdentifier::new("write", TMessageType::Reply, incoming_sequence_number); + o_prot.write_message_begin(&message_ident)?; + let ret = CraqServiceWriteResult { + result_value: Some(handler_return), + }; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + Err(e) => match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new( + "write", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + _ => { + let ret_err = + ApplicationError::new(ApplicationErrorKind::Unknown, e.description()); + let message_ident = TMessageIdentifier::new( + "write", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + }, + } + } + pub fn process_test_and_set( + handler: &H, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let args = CraqServiceTestAndSetArgs::read_from_in_protocol(i_prot)?; + match handler.handle_test_and_set(args.obj, args.expected_version) { + Ok(handler_return) => { + let message_ident = TMessageIdentifier::new( + "testAndSet", + TMessageType::Reply, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + let ret = CraqServiceTestAndSetResult { + result_value: Some(handler_return), + }; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + Err(e) => match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new( + "testAndSet", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + _ => { + let ret_err = + ApplicationError::new(ApplicationErrorKind::Unknown, e.description()); + let message_ident = TMessageIdentifier::new( + "testAndSet", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + }, + } + } + pub fn process_write_versioned( + handler: &H, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let args = CraqServiceWriteVersionedArgs::read_from_in_protocol(i_prot)?; + match handler.handle_write_versioned(args.obj, args.version) { + Ok(_) => { + let message_ident = TMessageIdentifier::new( + "writeVersioned", + TMessageType::Reply, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + let ret = CraqServiceWriteVersionedResult {}; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + Err(e) => match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new( + "writeVersioned", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + _ => { + let ret_err = + ApplicationError::new(ApplicationErrorKind::Unknown, e.description()); + let message_ident = TMessageIdentifier::new( + "writeVersioned", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + }, + } + } + pub fn process_version_query( + handler: &H, + incoming_sequence_number: i32, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let _ = CraqServiceVersionQueryArgs::read_from_in_protocol(i_prot)?; + match handler.handle_version_query() { + Ok(handler_return) => { + let message_ident = TMessageIdentifier::new( + "versionQuery", + TMessageType::Reply, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + let ret = CraqServiceVersionQueryResult { + result_value: Some(handler_return), + }; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + Err(e) => match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new( + "versionQuery", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + _ => { + let ret_err = + ApplicationError::new(ApplicationErrorKind::Unknown, e.description()); + let message_ident = TMessageIdentifier::new( + "versionQuery", + TMessageType::Exception, + incoming_sequence_number, + ); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + } + }, + } + } +} + +impl TProcessor for CraqServiceSyncProcessor { + fn process( + &self, + i_prot: &mut dyn TInputProtocol, + o_prot: &mut dyn TOutputProtocol, + ) -> thrift::Result<()> { + let message_ident = i_prot.read_message_begin()?; + let res = match &*message_ident.name { + "read" => self.process_read(message_ident.sequence_number, i_prot, o_prot), + "write" => self.process_write(message_ident.sequence_number, i_prot, o_prot), + "testAndSet" => { + self.process_test_and_set(message_ident.sequence_number, i_prot, o_prot) + } + "writeVersioned" => { + self.process_write_versioned(message_ident.sequence_number, i_prot, o_prot) + } + "versionQuery" => { + self.process_version_query(message_ident.sequence_number, i_prot, o_prot) + } + method => Err(thrift::Error::Application(ApplicationError::new( + ApplicationErrorKind::UnknownMethod, + format!("unknown method {}", method), + ))), + }; + thrift::server::handle_process_result(&message_ident, res, o_prot) + } +} + +// +// CraqServiceReadArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceReadArgs { + model: CraqConsistencyModel, + version_bound: Version, +} + +impl CraqServiceReadArgs { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = CraqConsistencyModel::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + } + 2 => { + let val = i_prot.read_i64()?; + f_2 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("CraqServiceReadArgs.model", &f_1)?; + verify_required_field_exists("CraqServiceReadArgs.version_bound", &f_2)?; + let ret = CraqServiceReadArgs { + model: f_1 + .expect("auto-generated code should have checked for presence of required fields"), + version_bound: f_2 + .expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("read_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("model", TType::I32, 1))?; + self.model.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("versionBound", TType::I64, 2))?; + o_prot.write_i64(self.version_bound)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CraqServiceReadResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceReadResult { + result_value: Option, +} + +impl CraqServiceReadResult { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_0: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 0 => { + let val = CraqObject::read_from_in_protocol(i_prot)?; + f_0 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceReadResult { result_value: f_0 }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqServiceReadResult"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(ref fld_var) = self.result_value { + o_prot.write_field_begin(&TFieldIdentifier::new("result_value", TType::Struct, 0))?; + fld_var.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result { + if self.result_value.is_some() { + Ok(self.result_value.unwrap()) + } else { + Err(thrift::Error::Application(ApplicationError::new( + ApplicationErrorKind::MissingResult, + "no result received for CraqServiceRead", + ))) + } + } +} + +// +// CraqServiceWriteArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceWriteArgs { + obj: CraqObject, +} + +impl CraqServiceWriteArgs { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = CraqObject::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("CraqServiceWriteArgs.obj", &f_1)?; + let ret = CraqServiceWriteArgs { + obj: f_1 + .expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("write_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("obj", TType::Struct, 1))?; + self.obj.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CraqServiceWriteResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceWriteResult { + result_value: Option, +} + +impl CraqServiceWriteResult { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_0: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 0 => { + let val = i_prot.read_i64()?; + f_0 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceWriteResult { result_value: f_0 }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqServiceWriteResult"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.result_value { + o_prot.write_field_begin(&TFieldIdentifier::new("result_value", TType::I64, 0))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result { + if self.result_value.is_some() { + Ok(self.result_value.unwrap()) + } else { + Err(thrift::Error::Application(ApplicationError::new( + ApplicationErrorKind::MissingResult, + "no result received for CraqServiceWrite", + ))) + } + } +} + +// +// CraqServiceTestAndSetArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceTestAndSetArgs { + obj: CraqObject, + expected_version: Version, +} + +impl CraqServiceTestAndSetArgs { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = CraqObject::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + } + 2 => { + let val = i_prot.read_i64()?; + f_2 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("CraqServiceTestAndSetArgs.obj", &f_1)?; + verify_required_field_exists("CraqServiceTestAndSetArgs.expected_version", &f_2)?; + let ret = CraqServiceTestAndSetArgs { + obj: f_1 + .expect("auto-generated code should have checked for presence of required fields"), + expected_version: f_2 + .expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("testAndSet_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("obj", TType::Struct, 1))?; + self.obj.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("expectedVersion", TType::I64, 2))?; + o_prot.write_i64(self.expected_version)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CraqServiceTestAndSetResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceTestAndSetResult { + result_value: Option, +} + +impl CraqServiceTestAndSetResult { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_0: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 0 => { + let val = i_prot.read_i64()?; + f_0 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceTestAndSetResult { result_value: f_0 }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqServiceTestAndSetResult"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.result_value { + o_prot.write_field_begin(&TFieldIdentifier::new("result_value", TType::I64, 0))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result { + if self.result_value.is_some() { + Ok(self.result_value.unwrap()) + } else { + Err(thrift::Error::Application(ApplicationError::new( + ApplicationErrorKind::MissingResult, + "no result received for CraqServiceTestAndSet", + ))) + } + } +} + +// +// CraqServiceWriteVersionedArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceWriteVersionedArgs { + obj: CraqObject, + version: Version, +} + +impl CraqServiceWriteVersionedArgs { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = CraqObject::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + } + 2 => { + let val = i_prot.read_i64()?; + f_2 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("CraqServiceWriteVersionedArgs.obj", &f_1)?; + verify_required_field_exists("CraqServiceWriteVersionedArgs.version", &f_2)?; + let ret = CraqServiceWriteVersionedArgs { + obj: f_1 + .expect("auto-generated code should have checked for presence of required fields"), + version: f_2 + .expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("writeVersioned_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("obj", TType::Struct, 1))?; + self.obj.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("version", TType::I64, 2))?; + o_prot.write_i64(self.version)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CraqServiceWriteVersionedResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceWriteVersionedResult {} + +impl CraqServiceWriteVersionedResult { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceWriteVersionedResult {}; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqServiceWriteVersionedResult"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result<()> { + Ok(()) + } +} + +// +// CraqServiceVersionQueryArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceVersionQueryArgs {} + +impl CraqServiceVersionQueryArgs { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceVersionQueryArgs {}; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("versionQuery_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CraqServiceVersionQueryResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CraqServiceVersionQueryResult { + result_value: Option, +} + +impl CraqServiceVersionQueryResult { + fn read_from_in_protocol( + i_prot: &mut dyn TInputProtocol, + ) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_0: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 0 => { + let val = i_prot.read_i64()?; + f_0 = Some(val); + } + _ => { + i_prot.skip(field_ident.field_type)?; + } + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CraqServiceVersionQueryResult { result_value: f_0 }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CraqServiceVersionQueryResult"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.result_value { + o_prot.write_field_begin(&TFieldIdentifier::new("result_value", TType::I64, 0))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result { + if self.result_value.is_some() { + Ok(self.result_value.unwrap()) + } else { + Err(thrift::Error::Application(ApplicationError::new( + ApplicationErrorKind::MissingResult, + "no result received for CraqServiceVersionQuery", + ))) + } + } +} diff --git a/artillery-ddata/src/craq/protocol/proto.thrift b/artillery-ddata/src/craq/protocol/proto.thrift new file mode 100644 index 0000000..2580680 --- /dev/null +++ b/artillery-ddata/src/craq/protocol/proto.thrift @@ -0,0 +1,46 @@ +/* + * CRAQ replication protocol definition + */ +namespace cpp artillery_craq +namespace java artillery.craq.thrift + +/** Version numbers. */ +typedef i64 Version + +/** Consistency models. */ +enum CraqConsistencyModel { STRONG, EVENTUAL, EVENTUAL_MAX_BOUNDED, DEBUG } + +/** Object envelope. */ +struct CraqObject { + 1: optional binary value; + 2: optional bool dirty; +} + +/** Artillery CRAQ Invalid State Error */ +exception InvalidState { + 1: string reason +} + +/** Artillery CRAQ service. */ +service CraqService { + // ------------------------------------------------------------------------- + // Client-facing methods + // ------------------------------------------------------------------------- + /** Reads a value with the desired consistency model. */ + CraqObject read(1:CraqConsistencyModel model, 2:Version versionBound), + + /** Writes a new value. */ + Version write(1:CraqObject obj), + + /** Performs a test-and-set operation. **/ + Version testAndSet(1:CraqObject obj, 2:Version expectedVersion), + + // ------------------------------------------------------------------------- + // Internal methods + // ------------------------------------------------------------------------- + /** Writes a new value with the given version. */ + void writeVersioned(1:CraqObject obj, 2:Version version), + + /** Returns the latest committed version. */ + Version versionQuery() +} diff --git a/artillery-ddata/src/craq/server.rs b/artillery-ddata/src/craq/server.rs new file mode 100644 index 0000000..4f2d673 --- /dev/null +++ b/artillery-ddata/src/craq/server.rs @@ -0,0 +1,387 @@ +use super::proto::*; +use super::{chain::CraqChain, erwlock::ERwLock, node::CRMode}; +use crate::craq::node::CraqClient; + +use core::sync::atomic::AtomicI64; +use crossbeam_channel::Receiver; +use crossbeam_channel::Sender; +use std::sync::{atomic::Ordering, Arc}; +use t1ha::T1haHashMap as HashMap; + +pub struct CraqProtoServer { + /// Tail connection pool receiver. + pub tp_rx: Option>>, + /// Successor connection pool receiver. + pub sp_rx: Option>>, + /// Tail connection pool sender. + pub tp_tx: Option>>, + /// Successor connection pool sender. + pub sp_tx: Option>>, + + /// + /// CR reference + chain_ref: Arc, + /// Known objects: version, bytes + pub objects: Arc>>, + /// Latest known version which is either clean or dirty. + /// + /// NOTE: This should start with a negative round to make + /// version aligned at the first WR/DR. + pub latest_version: AtomicI64, + + /// Latest clean version this one is always clean. + /// + /// NOTE: This should start with a negative round to make + /// version aligned at the first clean version upgrade commit. + pub latest_clean_version: AtomicI64, + + /// + /// Algorithmic mode of operation + cr_mode: CRMode, +} + +impl CraqProtoServer { + pub fn new( + tp_rx: Option>>, + tp_tx: Option>>, + sp_rx: Option>>, + sp_tx: Option>>, + chain_ref: Arc, + cr_mode: CRMode, + ) -> Self { + Self { + tp_rx, + sp_rx, + tp_tx, + sp_tx, + chain_ref, + cr_mode, + objects: Arc::new(ERwLock::new(HashMap::default())), + latest_version: AtomicI64::new(!0), + latest_clean_version: AtomicI64::new(!0), + } + } + + /// + /// Creates shallow dirty copy of given object + fn copy_object(&self, obj: &CraqObject, dirty: bool) -> CraqObject { + let mut copied = obj.clone(); + copied.dirty = Some(dirty); + copied + } + + /// + /// Handle all incoming write-mode requests like: test-and-set, write + fn write( + &self, + obj: CraqObject, + expected_version: i64, + ) -> std::result::Result { + if self.tp_tx.is_none() + || self.tp_rx.is_none() + || self.sp_tx.is_none() + || self.sp_rx.is_none() + { + return Err(state_error("Chain is not initialized!")); + } + + if !self.chain_ref.is_head() { + return Err(state_error("Cannot write to non-head!")); + } + + if expected_version != !0 { + // reject if latest version is not the expected version or there are uncommitted writes + let latest_clean_version = self.latest_clean_version.load(Ordering::SeqCst); + let latest_version = self.latest_version.load(Ordering::SeqCst); + + if latest_clean_version != expected_version || latest_version != latest_clean_version { + return Ok(!0); + } + } + + // Record new object version. Do the Harlem Shake... + let mut new_version = self.latest_version.fetch_add(1, Ordering::SeqCst); + new_version += 1; + self.objects.write().insert(new_version, obj.clone()); + + // Send down chain + let s_rx = self.sp_rx.as_ref().unwrap(); + let s_tx = self.sp_tx.as_ref().unwrap(); + // TODO: tryize + let mut successor = s_rx.try_recv().unwrap(); + successor.write_versioned(obj, new_version)?; + // TODO: tryize + let _ = s_tx.try_send(successor); + + // Update current clean version + let old_clean_ver: i64 = { + // TODO: It should be CAS. + let loaded = self.latest_clean_version.load(Ordering::SeqCst); + if loaded < new_version { + self.latest_clean_version + .store(new_version, Ordering::SeqCst); + new_version + } else { + loaded + } + }; + + if new_version > old_clean_ver { + self.remove_old_versions(self.latest_clean_version.load(Ordering::SeqCst)); + } + + Ok(new_version) + } + + /// + /// Strong consistency specific version query + /// This one makes a version request to tail to get the appropriate object + fn get_obj_from_version_query(&self) -> std::result::Result { + // Send a version query + let t_rx = self.tp_rx.as_ref().unwrap(); + let t_tx = self.tp_tx.as_ref().unwrap(); + // TODO: tryize + let mut tail = t_rx.try_recv().unwrap(); + let tail_version = tail.version_query()?; + // TODO: tryize + let _ = t_tx.try_send(tail); + + // If no clean version is around then return an empty obj + if tail_version < 0 { + return Ok(CraqObject::default()); + } + + let mut obj = self.objects.read().get(&tail_version).cloned(); + if obj.is_none() { + // newer version already committed (old one erased), return the latest clean version + obj = self + .objects + .read() + .get(&self.latest_clean_version.load(Ordering::SeqCst)) + .cloned(); + } + + obj.map_or( + Err(state_error("Returning empty object after a version query!")), + Result::Ok, + ) + } + + /// + /// Removes all object versions older than the latest clean one. + fn remove_old_versions(&self, latest_clean_ver: i64) { + let mut objects = self.objects.write(); + objects.retain(|k, _| k >= &latest_clean_ver); + } +} + +impl CraqServiceSyncHandler for CraqProtoServer { + /// + /// Handles versioned query request received from the protocol client + fn handle_version_query(&self) -> std::result::Result { + debug!( + "[Artillery CRAQ Node {}] Received version query...", + self.chain_ref.get_index() + ); + + // only tail should receive version queries + if !self.chain_ref.is_tail() { + return Err(state_error( + "Cannot make a version query to a non-tail node!", + )); + } + + Ok(self.latest_clean_version.load(Ordering::SeqCst)) + } + + /// + /// Handles versioned write request received from the protocol client + fn handle_write_versioned( + &self, + obj: CraqObject, + version: i64, + ) -> std::result::Result<(), thrift::Error> { + debug!( + "[Artillery CRAQ Node {}] Received write with version: {}", + self.chain_ref.get_index(), + version + ); + + // Head should not receive versioned writes + if self.chain_ref.is_head() { + return Err(state_error("Cannot make a versioned write to the head!")); + } + + // Write latest object version + self.objects.write().insert(version, obj.clone()); + + // Update latest version if applicable + if self.latest_version.load(Ordering::SeqCst) < version { + self.latest_version.store(version, Ordering::SeqCst); + } + + // Non-tail: send down chain + if !self.chain_ref.is_tail() { + let s_rx = self.sp_rx.as_ref().unwrap(); + let s_tx = self.sp_tx.as_ref().unwrap(); + // TODO: tryize + let mut successor = s_rx.try_recv().unwrap(); + successor.write_versioned(obj, version)?; + // TODO: tryize + let _ = s_tx.try_send(successor); + } + + // Mark this current version as CLEAN + let old_clean_ver: i64 = { + // TODO: CAS it should be. + let loaded = self.latest_clean_version.load(Ordering::SeqCst); + if loaded < version { + self.latest_clean_version.store(version, Ordering::SeqCst); + version + } else { + loaded + } + }; + + if version > old_clean_ver || self.chain_ref.is_tail() { + self.remove_old_versions(self.latest_clean_version.load(Ordering::SeqCst)); + } + + Ok(()) + } + + /// + /// Handles test-and-set request received from the protocol client + fn handle_test_and_set( + &self, + obj: CraqObject, + expected_version: i64, + ) -> std::result::Result { + debug!( + "[Artillery CRAQ Node {}] Received test-and-set request from client...", + self.chain_ref.get_index() + ); + + self.write(obj, expected_version) + } + + /// + /// Handles write request received from the protocol client + fn handle_write(&self, obj: CraqObject) -> std::result::Result { + debug!( + "[Artillery CRAQ Node {}] Received write request from client...", + self.chain_ref.get_index() + ); + + self.write(obj, !0) + } + + fn handle_read( + &self, + model: CraqConsistencyModel, + version_bound: i64, + ) -> std::result::Result { + debug!( + "[Artillery CRAQ Node {}] Received read request from client...", + self.chain_ref.get_index() + ); + + // Node hasn't initialized? + if !self.chain_ref.is_tail() + && (self.tp_rx.is_none() + || self.tp_tx.is_none() + || self.sp_rx.is_none() + || self.sp_tx.is_none()) + { + return Err(state_error("Chain is not initialized!")); + } + + // Running normal CR: fail if we're not the tail + if self.cr_mode == CRMode::Cr && !self.chain_ref.is_tail() { + return Err(state_error("Cannot read from non-tail node in CR mode!")); + } + + // No objects stored? + if self.objects.read().is_empty() { + return Ok(CraqObject::default()); + } + + // Lazy programmers do the best they say. + // Same people said there's more than one way to do it. + match model { + CraqConsistencyModel::Strong => { + if self.latest_version.load(Ordering::SeqCst) + > self.latest_clean_version.load(Ordering::SeqCst) + && !self.chain_ref.is_tail() + { + // Non-tail: latest known version isn't clean, send a version query + let obj = self.get_obj_from_version_query()?; + return Ok(self.copy_object(&obj, true)); + } + + if self.latest_clean_version.load(Ordering::SeqCst) < 0 { + return Ok(CraqObject::default()); + } + + if self.chain_ref.is_tail() { + let latest_version = self.latest_version.load(Ordering::SeqCst); + if let Some(obj) = self.objects.read().get(&latest_version) { + return Ok(self.copy_object(obj, false)); + } else { + return Err(state_error("Returning null object from the tail")); + } + } + + let latest_clean_version = self.latest_clean_version.load(Ordering::SeqCst); + if let Some(obj) = self.objects.read().get(&latest_clean_version) { + return Ok(self.copy_object(obj, false)); + } else { + return Err(state_error("Returning null object from a clean read!")); + } + } + CraqConsistencyModel::Eventual => { + // Return latest known version + let latest_version = self.latest_version.load(Ordering::SeqCst); + if let Some(obj) = self.objects.read().get(&latest_version) { + // TODO: normally None for dirty. + return Ok(self.copy_object(obj, false)); + } else { + return Err(state_error("Returning null object for an eventual read!")); + } + } + CraqConsistencyModel::EventualMaxBounded => { + // Return latest known version within the given bound + let latest_version = self.latest_version.load(Ordering::SeqCst); + let latest_clean_version = self.latest_clean_version.load(Ordering::SeqCst); + let bounded_version = latest_clean_version.saturating_add(std::cmp::min( + version_bound, + latest_version.saturating_sub(latest_clean_version), + )); + if let Some(obj) = self.objects.read().get(&bounded_version) { + // TODO: normally None for dirty. + return Ok(self.copy_object(obj, false)); + } else { + return Err(state_error( + "Returning null object for a bounded eventual read!", + )); + } + } + CraqConsistencyModel::Debug => { + // make a version query + if self.chain_ref.is_tail() { + return Ok(CraqObject::default()); + } else { + self.copy_object(&self.get_obj_from_version_query()?, true); + } + } + } + + error!("Fatal state error happened."); + Err(state_error("Fatal state error.")) + } +} + +#[inline] +fn state_error(msg: &str) -> thrift::Error { + thrift::Error::from(InvalidState::new(msg.to_owned())) +} diff --git a/artillery-ddata/src/lib.rs b/artillery-ddata/src/lib.rs index 31e1bb2..80d59da 100644 --- a/artillery-ddata/src/lib.rs +++ b/artillery-ddata/src/lib.rs @@ -1,7 +1,5 @@ -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} +#[macro_use] +extern crate log; + +#[macro_use] +pub mod craq; diff --git a/ddata-tests/shutdown.sh b/ddata-tests/shutdown.sh new file mode 100755 index 0000000..f49423e --- /dev/null +++ b/ddata-tests/shutdown.sh @@ -0,0 +1,9 @@ +for i in `seq 0 $CHAIN_LEN` +do + a=`printenv PID$i` + kill $a + echo "kill" $a + export PID$i= +done + +export CHAIN_LEN= diff --git a/ddata-tests/test.sh b/ddata-tests/test.sh new file mode 100755 index 0000000..ce0c0f2 --- /dev/null +++ b/ddata-tests/test.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +set -eo pipefail + +len=$(($#-1)) +export CHAIN_LEN=$len +for i in `seq 0 $(($#-1))` +do + echo $i + target/debug/examples/craq_node server 0 $i $* & + export PID$i=$! + echo ${PID}$i +done