diff --git a/ext/hermann/hermann_rdkafka.c b/ext/hermann/hermann_rdkafka.c index 1d84a1e..09663b9 100644 --- a/ext/hermann/hermann_rdkafka.c +++ b/ext/hermann/hermann_rdkafka.c @@ -33,6 +33,11 @@ #include "hermann_rdkafka.h" +/* This header file exposes the functions in librdkafka.a that are needed for + * consistent partitioning. After librdkafka releases a new tag and Hermann + * points to it, this can be removed. */ +#include "rdcrc32.h" + #ifdef HAVE_RUBY_VERSION_H #include #endif @@ -134,6 +139,18 @@ static void msg_delivered(rd_kafka_t *rk, } } + +/* This function is in rdkafka.h on librdkafka master. As soon as a new + * version is released and Hermann points to it, this can be removed. */ +int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt, + const void *key, size_t keylen, + int32_t partition_cnt, + void *rkt_opaque, + void *msg_opaque) { + return rd_crc32(key, keylen) % partition_cnt; +} + + /** * Producer partitioner callback. * Used to determine the target partition within a topic for production. @@ -154,17 +171,11 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { - /* Pick a random partition */ - int retry = 0; - int32_t partition = RD_KAFKA_PARTITION_UA; - - for (; retry < partition_cnt; retry++) { - partition = rand() % partition_cnt; - if (rd_kafka_topic_partition_available(rkt, partition)) { - break; /* this one will do */ - } + if (keylen) { + return rd_kafka_msg_partitioner_consistent(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque); + } else { + return rd_kafka_msg_partitioner_random(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque); } - return partition; } /** @@ -589,10 +600,11 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) { * @param message VALUE the ruby String containing the outgoing message. * @param topic VALUE the ruby String containing the topic to use for the * outgoing message. + * @param key VALUE the ruby String containing the key to partition by * @param result VALUE the Hermann::Result object to be fulfilled when the * push completes */ -static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) { +static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE partition_key, VALUE result) { HermannInstanceConfig* producerConfig; /* Context pointer, pointing to `result`, for the librdkafka delivery @@ -600,6 +612,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE */ hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t)); rd_kafka_topic_t *rkt = NULL; + rd_kafka_topic_conf_t *rkt_conf = NULL; TRACER("self: %p, message: %p, result: %p)\n", self, message, result); @@ -622,9 +635,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE TRACER("kafka initialized\n"); + /* Topic configuration */ + rkt_conf = rd_kafka_topic_conf_new(); + + /* Set the partitioner callback */ + rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, producer_partitioner_callback); + rkt = rd_kafka_topic_new(producerConfig->rk, RSTRING_PTR(topic), - NULL); + rkt_conf); if (NULL == rkt) { rb_raise(rb_eRuntimeError, "Could not construct a topic structure"); @@ -645,8 +664,8 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE RD_KAFKA_MSG_F_COPY, RSTRING_PTR(message), RSTRING_LEN(message), - NULL, - 0, + RSTRING_PTR(partition_key), + RSTRING_LEN(partition_key), delivery_ctx)) { fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n", rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition, @@ -1240,7 +1259,7 @@ void Init_hermann_rdkafka() { rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1); /* Producer.push_single(msg) */ - rb_define_method(c_producer, "push_single", producer_push_single, 3); + rb_define_method(c_producer, "push_single", producer_push_single, 4); /* Producer.tick */ rb_define_method(c_producer, "tick", producer_tick, 1); diff --git a/ext/hermann/rdcrc32.h b/ext/hermann/rdcrc32.h new file mode 100644 index 0000000..8ee546f --- /dev/null +++ b/ext/hermann/rdcrc32.h @@ -0,0 +1,103 @@ +/** + * \file rdcrc32.h + * Functions and types for CRC checks. + * + * Generated on Tue May 8 17:36:59 2012, + * by pycrc v0.7.10, http://www.tty1.net/pycrc/ + * + * NOTE: Contains librd modifications: + * - rd_crc32() helper. + * - __RDCRC32___H__ define (was missing the '32' part). + * + * using the configuration: + * Width = 32 + * Poly = 0x04c11db7 + * XorIn = 0xffffffff + * ReflectIn = True + * XorOut = 0xffffffff + * ReflectOut = True + * Algorithm = table-driven + *****************************************************************************/ +#ifndef __RDCRC32___H__ +#define __RDCRC32___H__ + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + + +/** + * The definition of the used algorithm. + *****************************************************************************/ +#define CRC_ALGO_TABLE_DRIVEN 1 + + +/** + * The type of the CRC values. + * + * This type must be big enough to contain at least 32 bits. + *****************************************************************************/ +typedef uint32_t rd_crc32_t; + + +/** + * Reflect all bits of a \a data word of \a data_len bytes. + * + * \param data The data word to be reflected. + * \param data_len The width of \a data expressed in number of bits. + * \return The reflected data. + *****************************************************************************/ +rd_crc32_t rd_crc32_reflect(rd_crc32_t data, size_t data_len); + + +/** + * Calculate the initial crc value. + * + * \return The initial crc value. + *****************************************************************************/ +static inline rd_crc32_t rd_crc32_init(void) +{ + return 0xffffffff; +} + + +/** + * Update the crc value with new data. + * + * \param crc The current crc value. + * \param data Pointer to a buffer of \a data_len bytes. + * \param data_len Number of bytes in the \a data buffer. + * \return The updated crc value. + *****************************************************************************/ +rd_crc32_t rd_crc32_update(rd_crc32_t crc, const unsigned char *data, size_t data_len); + + +/** + * Calculate the final crc value. + * + * \param crc The current crc value. + * \return The final crc value. + *****************************************************************************/ +static inline rd_crc32_t rd_crc32_finalize(rd_crc32_t crc) +{ + return crc ^ 0xffffffff; +} + + +/** + * Wrapper for performing CRC32 on the provided buffer. + */ +static inline rd_crc32_t rd_crc32 (const char *data, size_t data_len) { + return rd_crc32_finalize(rd_crc32_update(rd_crc32_init(), + (const unsigned char *)data, + data_len)); +} + +#ifdef __cplusplus +} /* closing brace for extern "C" */ +#endif + +#endif /* __RDCRC32___H__ */ diff --git a/lib/hermann/producer.rb b/lib/hermann/producer.rb index ea722ac..50ab838 100644 --- a/lib/hermann/producer.rb +++ b/lib/hermann/producer.rb @@ -63,8 +63,7 @@ module Hermann end if Hermann.jruby? - key = opts.has_key?(:partition_key) ? opts[:partition_key].to_java : nil - result = @internal.push_single(value, topic, key) + result = @internal.push_single(value, topic, opts[:partition_key], nil) unless result.nil? @children << result end @@ -76,7 +75,7 @@ module Hermann # librdkafka callback queue overflow tick_reactor result = create_result - @internal.push_single(value, topic, result) + @internal.push_single(value, topic, opts[:partition_key].to_s, result) end return result diff --git a/lib/hermann/provider/java_producer.rb b/lib/hermann/provider/java_producer.rb index a16df24..241454b 100644 --- a/lib/hermann/provider/java_producer.rb +++ b/lib/hermann/provider/java_producer.rb @@ -43,7 +43,8 @@ module Hermann # @return +Concurrent::Promise+ Representa a promise to send the # data to the kafka broker. Upon execution the Promise's status # will be set - def push_single(msg, topic, key) + def push_single(msg, topic, key, _) + key = key && key.to_java Concurrent::Promise.execute { data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes) begin diff --git a/spec/hermann_lib/producer_spec.rb b/spec/hermann_lib/producer_spec.rb index 3d1344f..b777913 100644 --- a/spec/hermann_lib/producer_spec.rb +++ b/spec/hermann_lib/producer_spec.rb @@ -41,7 +41,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do let(:brokers) { 'localhost:13337' } it 'should error after attempting to connect' do |example| - producer.push_single(example.full_description, 'test-topic', nil) + producer.push_single(example.full_description, 'test-topic', '', nil) begin producer.tick(timeout) rescue StandardError => ex @@ -62,7 +62,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do describe '#push_single', :type => :integration do let(:message) { |example| example.full_description } - subject(:push) { producer.push_single(message, topic, nil) } + subject(:push) { producer.push_single(message, topic, '', nil) } it 'should return' do expect(push).not_to be_nil @@ -105,7 +105,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do context 'with a single queued request' do before :each do - producer.push_single('hello', topic, nil) + producer.push_single('hello', topic, '', nil) end it 'should return successfully' do diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index a5bf34f..9db2d1a 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -43,34 +43,34 @@ describe Hermann::Producer do context 'without topic passed' do it 'uses initialized topic' do - expect(producer.internal).to receive(:push_single).with(msg, topic, anything) + expect(producer.internal).to receive(:push_single).with(msg, topic, anything, anything) producer.push(msg) end end context 'without topic passed', :platform => :java do it 'uses initialized topic and does not have a partition key' do - expect(producer.internal).to receive(:push_single).with(msg, topic, nil) + expect(producer.internal).to receive(:push_single).with(msg, topic, nil, anything) producer.push(msg) end end context 'with topic passed' do it 'can change topic' do - expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything) + expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything) producer.push(msg, :topic => passed_topic) end context 'and an array of messags' do it 'should propagate the topic' do messages = 3.times.map { |i| msg } - expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything).exactly(messages.size).times + expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything).exactly(messages.size).times producer.push(messages, :topic => passed_topic) end end end - context 'with explicit partition key', :platform => :java do + context 'with explicit partition key' do it 'uses the partition key' do - expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key.to_java) + expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key, anything) producer.push(msg, :partition_key => partition_key) end end @@ -178,7 +178,7 @@ describe Hermann::Producer do it 'should invoke #push_single for each element' do value.each do |v| - expect(producer.internal).to receive(:push_single).with(v, topic, anything) + expect(producer.internal).to receive(:push_single).with(v, topic, anything, anything) end expect(result).to be_instance_of Array diff --git a/spec/providers/java_producer_spec.rb b/spec/providers/java_producer_spec.rb index acfdee4..650746c 100644 --- a/spec/providers/java_producer_spec.rb +++ b/spec/providers/java_producer_spec.rb @@ -8,11 +8,11 @@ describe Hermann::Provider::JavaProducer, :platform => :java do let(:topic) { 'rspec' } let(:brokers) { '0:1337'} let(:opts) { {} } - let(:part_key) { "key".to_java } + let(:part_key) { "key" } let(:msg) { "bar" } describe '#push_single' do - subject(:result) { producer.push_single(msg, topic, nil) } + subject(:result) { producer.push_single(msg, topic, nil, nil) } let(:passed_topic) { 'foo' } @@ -22,18 +22,18 @@ describe Hermann::Provider::JavaProducer, :platform => :java do it 'can change topic' do expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, nil, anything) - producer.push_single(msg, passed_topic, nil).wait(1) + producer.push_single(msg, passed_topic, nil, nil).wait(1) end it 'can change partition key' do expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, part_key, anything) - producer.push_single(msg, passed_topic, part_key).wait(1) + producer.push_single(msg, passed_topic, part_key, nil).wait(1) end context 'error conditions' do shared_examples 'an error condition' do it 'should be rejected' do - promise = producer.push_single('rspec', topic, nil).wait(1) + promise = producer.push_single('rspec', topic, nil, nil).wait(1) expect(promise).to be_rejected expect { promise.value! }.to raise_error end