Add support for passing partition_key in MRI

This commit is contained in:
cory and jakesandlund 2015-08-19 14:47:09 +00:00 committed by jakesandlund
parent cd58cb33cd
commit e8703e1df4
6 changed files with 149 additions and 27 deletions

View File

@ -32,6 +32,7 @@
/* Much of the librdkafka library calls were lifted from rdkafka_example.c */
#include "hermann_rdkafka.h"
#include "rdcrc32.h"
#ifdef HAVE_RUBY_VERSION_H
#include <ruby/version.h>
@ -134,6 +135,16 @@ static void msg_delivered(rd_kafka_t *rk,
}
}
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
return rd_crc32(key, keylen) % partition_cnt;
}
/**
* Producer partitioner callback.
* Used to determine the target partition within a topic for production.
@ -154,17 +165,11 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
/* Pick a random partition */
int retry = 0;
int32_t partition = RD_KAFKA_PARTITION_UA;
for (; retry < partition_cnt; retry++) {
partition = rand() % partition_cnt;
if (rd_kafka_topic_partition_available(rkt, partition)) {
break; /* this one will do */
}
if (keylen) {
return rd_kafka_msg_partitioner_consistent(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
} else {
return rd_kafka_msg_partitioner_random(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
}
return partition;
}
/**
@ -589,10 +594,11 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
* @param message VALUE the ruby String containing the outgoing message.
* @param topic VALUE the ruby String containing the topic to use for the
* outgoing message.
* @param key VALUE the ruby String containing the key to partition by
* @param result VALUE the Hermann::Result object to be fulfilled when the
* push completes
*/
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE partition_key, VALUE result) {
HermannInstanceConfig* producerConfig;
/* Context pointer, pointing to `result`, for the librdkafka delivery
@ -600,6 +606,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
*/
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
rd_kafka_topic_t *rkt = NULL;
rd_kafka_topic_conf_t *rkt_conf = NULL;
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
@ -622,9 +629,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
TRACER("kafka initialized\n");
/* Topic configuration */
rkt_conf = rd_kafka_topic_conf_new();
/* Set the partitioner callback */
rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, producer_partitioner_callback);
rkt = rd_kafka_topic_new(producerConfig->rk,
RSTRING_PTR(topic),
NULL);
rkt_conf);
if (NULL == rkt) {
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
@ -645,8 +658,8 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
RD_KAFKA_MSG_F_COPY,
RSTRING_PTR(message),
RSTRING_LEN(message),
NULL,
0,
RSTRING_PTR(partition_key),
RSTRING_LEN(partition_key),
delivery_ctx)) {
fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition,
@ -1240,7 +1253,7 @@ void Init_hermann_rdkafka() {
rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1);
/* Producer.push_single(msg) */
rb_define_method(c_producer, "push_single", producer_push_single, 3);
rb_define_method(c_producer, "push_single", producer_push_single, 4);
/* Producer.tick */
rb_define_method(c_producer, "tick", producer_tick, 1);

103
ext/hermann/rdcrc32.h Normal file
View File

@ -0,0 +1,103 @@
/**
* \file rdcrc32.h
* Functions and types for CRC checks.
*
* Generated on Tue May 8 17:36:59 2012,
* by pycrc v0.7.10, http://www.tty1.net/pycrc/
*
* NOTE: Contains librd modifications:
* - rd_crc32() helper.
* - __RDCRC32___H__ define (was missing the '32' part).
*
* using the configuration:
* Width = 32
* Poly = 0x04c11db7
* XorIn = 0xffffffff
* ReflectIn = True
* XorOut = 0xffffffff
* ReflectOut = True
* Algorithm = table-driven
*****************************************************************************/
#ifndef __RDCRC32___H__
#define __RDCRC32___H__
#include <stdlib.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* The definition of the used algorithm.
*****************************************************************************/
#define CRC_ALGO_TABLE_DRIVEN 1
/**
* The type of the CRC values.
*
* This type must be big enough to contain at least 32 bits.
*****************************************************************************/
typedef uint32_t rd_crc32_t;
/**
* Reflect all bits of a \a data word of \a data_len bytes.
*
* \param data The data word to be reflected.
* \param data_len The width of \a data expressed in number of bits.
* \return The reflected data.
*****************************************************************************/
rd_crc32_t rd_crc32_reflect(rd_crc32_t data, size_t data_len);
/**
* Calculate the initial crc value.
*
* \return The initial crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_init(void)
{
return 0xffffffff;
}
/**
* Update the crc value with new data.
*
* \param crc The current crc value.
* \param data Pointer to a buffer of \a data_len bytes.
* \param data_len Number of bytes in the \a data buffer.
* \return The updated crc value.
*****************************************************************************/
rd_crc32_t rd_crc32_update(rd_crc32_t crc, const unsigned char *data, size_t data_len);
/**
* Calculate the final crc value.
*
* \param crc The current crc value.
* \return The final crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_finalize(rd_crc32_t crc)
{
return crc ^ 0xffffffff;
}
/**
* Wrapper for performing CRC32 on the provided buffer.
*/
static inline rd_crc32_t rd_crc32 (const char *data, size_t data_len) {
return rd_crc32_finalize(rd_crc32_update(rd_crc32_init(),
(const unsigned char *)data,
data_len));
}
#ifdef __cplusplus
} /* closing brace for extern "C" */
#endif
#endif /* __RDCRC32___H__ */

View File

@ -64,7 +64,7 @@ module Hermann
if Hermann.jruby?
key = opts.has_key?(:partition_key) ? opts[:partition_key].to_java : nil
result = @internal.push_single(value, topic, key)
result = @internal.push_single(value, topic, key, nil)
unless result.nil?
@children << result
end
@ -76,7 +76,7 @@ module Hermann
# librdkafka callback queue overflow
tick_reactor
result = create_result
@internal.push_single(value, topic, result)
@internal.push_single(value, topic, opts[:partition_key].to_s, result)
end
return result

View File

@ -43,7 +43,7 @@ module Hermann
# @return +Concurrent::Promise+ Representa a promise to send the
# data to the kafka broker. Upon execution the Promise's status
# will be set
def push_single(msg, topic, key)
def push_single(msg, topic, key, _)
Concurrent::Promise.execute {
data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
begin

View File

@ -41,7 +41,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
let(:brokers) { 'localhost:13337' }
it 'should error after attempting to connect' do |example|
producer.push_single(example.full_description, 'test-topic', nil)
producer.push_single(example.full_description, 'test-topic', '', nil)
begin
producer.tick(timeout)
rescue StandardError => ex
@ -62,7 +62,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
describe '#push_single', :type => :integration do
let(:message) { |example| example.full_description }
subject(:push) { producer.push_single(message, topic, nil) }
subject(:push) { producer.push_single(message, topic, '', nil) }
it 'should return' do
expect(push).not_to be_nil
@ -105,7 +105,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
context 'with a single queued request' do
before :each do
producer.push_single('hello', topic, nil)
producer.push_single('hello', topic, '', nil)
end
it 'should return successfully' do

View File

@ -43,26 +43,26 @@ describe Hermann::Producer do
context 'without topic passed' do
it 'uses initialized topic' do
expect(producer.internal).to receive(:push_single).with(msg, topic, anything)
expect(producer.internal).to receive(:push_single).with(msg, topic, anything, anything)
producer.push(msg)
end
end
context 'without topic passed', :platform => :java do
it 'uses initialized topic and does not have a partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, nil)
expect(producer.internal).to receive(:push_single).with(msg, topic, nil, anything)
producer.push(msg)
end
end
context 'with topic passed' do
it 'can change topic' do
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything)
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything)
producer.push(msg, :topic => passed_topic)
end
context 'and an array of messags' do
it 'should propagate the topic' do
messages = 3.times.map { |i| msg }
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything).exactly(messages.size).times
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything).exactly(messages.size).times
producer.push(messages, :topic => passed_topic)
end
end
@ -70,7 +70,13 @@ describe Hermann::Producer do
context 'with explicit partition key', :platform => :java do
it 'uses the partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key.to_java)
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key.to_java, anything)
producer.push(msg, :partition_key => partition_key)
end
end
context 'with explicit partition key', :platform => :mri do
it 'uses the partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key, anything)
producer.push(msg, :partition_key => partition_key)
end
end
@ -178,7 +184,7 @@ describe Hermann::Producer do
it 'should invoke #push_single for each element' do
value.each do |v|
expect(producer.internal).to receive(:push_single).with(v, topic, anything)
expect(producer.internal).to receive(:push_single).with(v, topic, anything, anything)
end
expect(result).to be_instance_of Array