Merge pull request #121 from braintree/stable-partition

Add support for passing partition_key in MRI
R. Tyler Croy 2015-09-22 11:59:32 -07:00
commit 74cb8656e1
7 changed files with 156 additions and 34 deletions
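
A minimal usage sketch of the option this PR wires through on MRI (the producer construction is omitted and the payload, topic, and key values are made up; the push calls mirror the ones exercised in the specs below):

    # producer is an already-constructed Hermann::Producer
    producer.push('some payload', :topic => 'test-topic', :partition_key => 'user-1234')

    # Without :partition_key, the MRI extension falls back to random partitioning.
    producer.push('some payload', :topic => 'test-topic')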


@@ -33,6 +33,11 @@
#include "hermann_rdkafka.h"
/* This header file exposes the functions in librdkafka.a that are needed for
* consistent partitioning. After librdkafka releases a new tag and Hermann
* points to it, this can be removed. */
#include "rdcrc32.h"
#ifdef HAVE_RUBY_VERSION_H
#include <ruby/version.h>
#endif
@@ -134,6 +139,18 @@ static void msg_delivered(rd_kafka_t *rk,
}
}
/* This function is in rdkafka.h on librdkafka master. As soon as a new
* version is released and Hermann points to it, this can be removed. */
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
return rd_crc32(key, keylen) % partition_cnt;
}
/**
* Producer partitioner callback.
* Used to determine the target partition within a topic for production.
@@ -154,17 +171,11 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
/* Pick a random partition */
int retry = 0;
int32_t partition = RD_KAFKA_PARTITION_UA;
for (; retry < partition_cnt; retry++) {
partition = rand() % partition_cnt;
if (rd_kafka_topic_partition_available(rkt, partition)) {
break; /* this one will do */
}
if (keylen) {
return rd_kafka_msg_partitioner_consistent(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
} else {
return rd_kafka_msg_partitioner_random(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
}
return partition;
}
/**
@@ -589,10 +600,11 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
* @param message VALUE the ruby String containing the outgoing message.
* @param topic VALUE the ruby String containing the topic to use for the
* outgoing message.
* @param key VALUE the ruby String containing the key to partition by
* @param result VALUE the Hermann::Result object to be fulfilled when the
* push completes
*/
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE partition_key, VALUE result) {
HermannInstanceConfig* producerConfig;
/* Context pointer, pointing to `result`, for the librdkafka delivery
@@ -600,6 +612,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
*/
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
rd_kafka_topic_t *rkt = NULL;
rd_kafka_topic_conf_t *rkt_conf = NULL;
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
@@ -622,9 +635,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
TRACER("kafka initialized\n");
/* Topic configuration */
rkt_conf = rd_kafka_topic_conf_new();
/* Set the partitioner callback */
rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, producer_partitioner_callback);
rkt = rd_kafka_topic_new(producerConfig->rk,
RSTRING_PTR(topic),
NULL);
rkt_conf);
if (NULL == rkt) {
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
@@ -645,8 +664,8 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
RD_KAFKA_MSG_F_COPY,
RSTRING_PTR(message),
RSTRING_LEN(message),
NULL,
0,
RSTRING_PTR(partition_key),
RSTRING_LEN(partition_key),
delivery_ctx)) {
fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition,
@@ -1240,7 +1259,7 @@ void Init_hermann_rdkafka() {
rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1);
/* Producer.push_single(msg) */
rb_define_method(c_producer, "push_single", producer_push_single, 3);
rb_define_method(c_producer, "push_single", producer_push_single, 4);
/* Producer.tick */
rb_define_method(c_producer, "tick", producer_tick, 1);

ext/hermann/rdcrc32.h Normal file

@@ -0,0 +1,103 @@
/**
* \file rdcrc32.h
* Functions and types for CRC checks.
*
* Generated on Tue May 8 17:36:59 2012,
* by pycrc v0.7.10, http://www.tty1.net/pycrc/
*
* NOTE: Contains librd modifications:
* - rd_crc32() helper.
* - __RDCRC32___H__ define (was missing the '32' part).
*
* using the configuration:
* Width = 32
* Poly = 0x04c11db7
* XorIn = 0xffffffff
* ReflectIn = True
* XorOut = 0xffffffff
* ReflectOut = True
* Algorithm = table-driven
*****************************************************************************/
#ifndef __RDCRC32___H__
#define __RDCRC32___H__
#include <stdlib.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* The definition of the used algorithm.
*****************************************************************************/
#define CRC_ALGO_TABLE_DRIVEN 1
/**
* The type of the CRC values.
*
* This type must be big enough to contain at least 32 bits.
*****************************************************************************/
typedef uint32_t rd_crc32_t;
/**
* Reflect all bits of a \a data word of \a data_len bytes.
*
* \param data The data word to be reflected.
* \param data_len The width of \a data expressed in number of bits.
* \return The reflected data.
*****************************************************************************/
rd_crc32_t rd_crc32_reflect(rd_crc32_t data, size_t data_len);
/**
* Calculate the initial crc value.
*
* \return The initial crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_init(void)
{
return 0xffffffff;
}
/**
* Update the crc value with new data.
*
* \param crc The current crc value.
* \param data Pointer to a buffer of \a data_len bytes.
* \param data_len Number of bytes in the \a data buffer.
* \return The updated crc value.
*****************************************************************************/
rd_crc32_t rd_crc32_update(rd_crc32_t crc, const unsigned char *data, size_t data_len);
/**
* Calculate the final crc value.
*
* \param crc The current crc value.
* \return The final crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_finalize(rd_crc32_t crc)
{
return crc ^ 0xffffffff;
}
/**
* Wrapper for performing CRC32 on the provided buffer.
*/
static inline rd_crc32_t rd_crc32 (const char *data, size_t data_len) {
return rd_crc32_finalize(rd_crc32_update(rd_crc32_init(),
(const unsigned char *)data,
data_len));
}
#ifdef __cplusplus
} /* closing brace for extern "C" */
#endif
#endif /* __RDCRC32___H__ */
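
The table-driven configuration above (poly 0x04c11db7, reflected input and output, 0xffffffff xor in and out) is the standard CRC-32, so rd_crc32() agrees with zlib's crc32. As an illustration only (not part of Hermann's API), the partition that rd_kafka_msg_partitioner_consistent will pick for a given key can be previewed from Ruby, assuming a known partition count:

    require 'zlib'

    partition_count = 8                 # assumed partition count for the topic
    key             = 'user-1234'
    # Same computation as rd_crc32(key, keylen) % partition_cnt in the C code above.
    partition       = Zlib.crc32(key) % partition_count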


@@ -63,8 +63,7 @@ module Hermann
end
if Hermann.jruby?
key = opts.has_key?(:partition_key) ? opts[:partition_key].to_java : nil
result = @internal.push_single(value, topic, key)
result = @internal.push_single(value, topic, opts[:partition_key], nil)
unless result.nil?
@children << result
end
@@ -76,7 +75,7 @@
# librdkafka callback queue overflow
tick_reactor
result = create_result
@internal.push_single(value, topic, result)
@internal.push_single(value, topic, opts[:partition_key].to_s, result)
end
return result
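
Note the asymmetry between the two branches above: on MRI the key is coerced with opts[:partition_key].to_s, so a missing key becomes the empty string, keylen is 0 in the C extension, and the random partitioner is used; on JRuby the key is passed through as-is and converted later. A small sketch of the MRI coercion, with made-up values:

    opts = { :partition_key => 'user-1234' }
    opts[:partition_key].to_s        # => "user-1234"  (keylen > 0, consistent partitioner)

    opts = {}                        # no :partition_key supplied
    opts[:partition_key].to_s        # => ""           (keylen == 0, random partitioner)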


@@ -43,7 +43,8 @@ module Hermann
# @return +Concurrent::Promise+ Represents a promise to send the
# data to the kafka broker. Upon execution the Promise's status
# will be set
def push_single(msg, topic, key)
def push_single(msg, topic, key, _)
key = key && key.to_java
Concurrent::Promise.execute {
data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
begin


@@ -41,7 +41,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
let(:brokers) { 'localhost:13337' }
it 'should error after attempting to connect' do |example|
producer.push_single(example.full_description, 'test-topic', nil)
producer.push_single(example.full_description, 'test-topic', '', nil)
begin
producer.tick(timeout)
rescue StandardError => ex
@@ -62,7 +62,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
describe '#push_single', :type => :integration do
let(:message) { |example| example.full_description }
subject(:push) { producer.push_single(message, topic, nil) }
subject(:push) { producer.push_single(message, topic, '', nil) }
it 'should return' do
expect(push).not_to be_nil
@@ -105,7 +105,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
context 'with a single queued request' do
before :each do
producer.push_single('hello', topic, nil)
producer.push_single('hello', topic, '', nil)
end
it 'should return successfully' do


@@ -43,34 +43,34 @@ describe Hermann::Producer do
context 'without topic passed' do
it 'uses initialized topic' do
expect(producer.internal).to receive(:push_single).with(msg, topic, anything)
expect(producer.internal).to receive(:push_single).with(msg, topic, anything, anything)
producer.push(msg)
end
end
context 'without topic passed', :platform => :java do
it 'uses initialized topic and does not have a partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, nil)
expect(producer.internal).to receive(:push_single).with(msg, topic, nil, anything)
producer.push(msg)
end
end
context 'with topic passed' do
it 'can change topic' do
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything)
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything)
producer.push(msg, :topic => passed_topic)
end
context 'and an array of messages' do
it 'should propagate the topic' do
messages = 3.times.map { |i| msg }
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything).exactly(messages.size).times
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything).exactly(messages.size).times
producer.push(messages, :topic => passed_topic)
end
end
end
context 'with explicit partition key', :platform => :java do
context 'with explicit partition key' do
it 'uses the partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key.to_java)
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key, anything)
producer.push(msg, :partition_key => partition_key)
end
end
@@ -178,7 +178,7 @@ describe Hermann::Producer do
it 'should invoke #push_single for each element' do
value.each do |v|
expect(producer.internal).to receive(:push_single).with(v, topic, anything)
expect(producer.internal).to receive(:push_single).with(v, topic, anything, anything)
end
expect(result).to be_instance_of Array


@@ -8,11 +8,11 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
let(:topic) { 'rspec' }
let(:brokers) { '0:1337'}
let(:opts) { {} }
let(:part_key) { "key".to_java }
let(:part_key) { "key" }
let(:msg) { "bar" }
describe '#push_single' do
subject(:result) { producer.push_single(msg, topic, nil) }
subject(:result) { producer.push_single(msg, topic, nil, nil) }
let(:passed_topic) { 'foo' }
@@ -22,18 +22,18 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
it 'can change topic' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, nil, anything)
producer.push_single(msg, passed_topic, nil).wait(1)
producer.push_single(msg, passed_topic, nil, nil).wait(1)
end
it 'can change partition key' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, part_key, anything)
producer.push_single(msg, passed_topic, part_key).wait(1)
producer.push_single(msg, passed_topic, part_key, nil).wait(1)
end
context 'error conditions' do
shared_examples 'an error condition' do
it 'should be rejected' do
promise = producer.push_single('rspec', topic, nil).wait(1)
promise = producer.push_single('rspec', topic, nil, nil).wait(1)
expect(promise).to be_rejected
expect { promise.value! }.to raise_error
end