Support passing in a topic for every #push in MRI

This commit also fixes a memory leak with the JRuby version of the gem where we
were not properly cleaning up children objects and holding onto references of
them forever

Fixes #46
This commit is contained in:
R. Tyler Croy 2014-10-14 09:51:24 -07:00
parent 0b5a9b4de5
commit 5b6fa5075f
4 changed files with 47 additions and 32 deletions

View File

@ -534,16 +534,19 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
*
* @param self VALUE the Ruby producer instance
* @param message VALUE the ruby String containing the outgoing message.
* @param topic VALUE the ruby String containing the topic to use for the
* outgoing message.
* @param result VALUE the Hermann::Result object to be fulfilled when the
* push completes
*/
static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
HermannInstanceConfig* producerConfig;
/* Context pointer, pointing to `result`, for the librdkafka delivery
* callback
*/
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
rd_kafka_topic_t *rkt = NULL;
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
@ -554,10 +557,9 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
TRACER("producerConfig: %p\n", producerConfig);
if ((NULL == producerConfig->topic) ||
(0 == strlen(producerConfig->topic))) {
fprintf(stderr, "Topic is null!\n");
rb_raise(rb_eRuntimeError, "Topic cannot be empty");
if ((Qnil == topic) ||
(0 == RSTRING_LEN(topic))) {
rb_raise(rb_eArgError, "Topic cannot be empty");
return self;
}
@ -567,6 +569,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
TRACER("kafka initialized\n");
rkt = rd_kafka_topic_new(producerConfig->rk,
RSTRING_PTR(topic),
NULL);
if (NULL == rkt) {
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
return self;
}
/* Only pass result through if it's non-nil */
if (Qnil != result) {
delivery_ctx->result = result;
@ -576,7 +587,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
TRACER("rd_kafka_produce() message of %i bytes\n", RSTRING_LEN(message));
/* Send/Produce message. */
if (-1 == rd_kafka_produce(producerConfig->rkt,
if (-1 == rd_kafka_produce(rkt,
producerConfig->partition,
RD_KAFKA_MSG_F_COPY,
RSTRING_PTR(message),
@ -590,6 +601,10 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
/* TODO: raise a Ruby exception here, requires a test though */
}
if (NULL != rkt) {
rd_kafka_topic_destroy(rkt);
}
TRACER("returning\n");
return self;
@ -913,11 +928,9 @@ static VALUE producer_allocate(VALUE klass) {
* Set up the configuration context for the Producer instance
*
* @param self VALUE the Producer instance
* @param topic VALUE the Ruby string naming the topic
* @param brokers VALUE a Ruby string containing host:port pairs separated by commas
*/
static VALUE producer_initialize(VALUE self,
VALUE topic,
VALUE brokers) {
HermannInstanceConfig* producerConfig;
@ -926,12 +939,9 @@ static VALUE producer_initialize(VALUE self,
TRACER("initialize Producer ruby object\n");
topicPtr = StringValuePtr(topic);
brokersPtr = StringValuePtr(brokers);
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
producerConfig->topic = topicPtr;
producerConfig->brokers = brokersPtr;
/** Using RD_KAFKA_PARTITION_UA specifies we want the partitioner callback to be called to determine the target
* partition
@ -1011,11 +1021,11 @@ void Init_hermann_lib() {
rb_define_alloc_func(c_producer, producer_allocate);
/* Initialize */
rb_define_method(c_producer, "initialize", producer_initialize, 2);
rb_define_method(c_producer, "initialize", producer_initialize, 1);
rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1);
/* Producer.push_single(msg) */
rb_define_method(c_producer, "push_single", producer_push_single, 2);
rb_define_method(c_producer, "push_single", producer_push_single, 3);
/* Producer.tick */
rb_define_method(c_producer, "tick", producer_tick, 1);

View File

@ -12,13 +12,17 @@ module Hermann
class Producer
attr_reader :topic, :brokers, :internal, :children
# Initialize a producer object with a default topic and broker list
#
# @param [String] topic The default topic to use for pushing messages
# @param [Array] brokers An array of "host:port" strings for the brokers
def initialize(topic, brokers)
@topic = topic
@brokers = brokers
if RUBY_PLATFORM == "java"
@internal = Hermann::Provider::JavaProducer.new(brokers)
else
@internal = Hermann::Lib::Producer.new(topic, brokers)
@internal = Hermann::Lib::Producer.new(brokers)
end
# We're tracking children so we can make sure that at Producer exit we
# make a reasonable attempt to clean up outstanding result objects
@ -42,28 +46,29 @@ module Hermann
# Push a value onto the Kafka topic passed to this +Producer+
#
# @param [Array] value An array of values to push, will push each one
# separately
# @param [Object] value A single object to push
#
# @param [Hash] opts to pass to push method
# @params opts [String] :topic The topic to push messages to
# @option opts [String] :topic The topic to push messages to
#
# @return [Hermann::Result] A future-like object which will store the
# result from the broker
def push(value, opts={})
topic = opts[:topic] || @topic
result = create_result
result = nil
if value.kind_of? Array
return value.map { |e| self.push(e) }
end
if RUBY_PLATFORM == "java"
result = @internal.push_single(value, topic)
@children << result
# Reaping children on the push just to make sure that it does get
# called correctly and we don't leak memory
reap_children
else
if RUBY_PLATFORM == "java"
result = @internal.push_single(value, topic)
@children << result
else
@internal.push_single(value, result)
end
result = create_result
@internal.push_single(value, topic, result)
end
return result

View File

@ -7,7 +7,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
let(:topic) { 'rspec' }
let(:brokers) { 'localhost:1337' }
subject(:producer) { Hermann::Lib::Producer.new(topic, brokers) }
subject(:producer) { Hermann::Lib::Producer.new(brokers) }
let(:timeout) { 3000 }
it { should respond_to :push_single }
@ -41,7 +41,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
let(:brokers) { 'localhost:13337' }
it 'should error after attempting to connect' do |example|
producer.push_single(example.full_description, nil)
producer.push_single(example.full_description, 'test-topic', nil)
begin
producer.tick(timeout)
rescue StandardError => ex
@ -62,7 +62,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
describe '#push_single', :type => :integration do
let(:message) { |example| example.full_description }
subject(:push) { producer.push_single(message, nil) }
subject(:push) { producer.push_single(message, topic, nil) }
it 'should return' do
expect(push).not_to be_nil
@ -105,7 +105,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
context 'with a single queued request' do
before :each do
producer.push_single('hello', nil)
producer.push_single('hello', topic, nil)
end
it 'should return successfully' do

View File

@ -38,14 +38,14 @@ describe Hermann::Producer do
end
end
context "not java", :platform => :mri do
context "on C ruby", :platform => :mri do
describe '#push' do
subject(:result) { producer.push(value) }
context 'error conditions' do
shared_examples 'an error condition' do
it 'should raise an exception' do
expect { producer.push('rspec') }.to raise_error(RuntimeError)
expect { producer.push('rspec') }.to raise_error
end
end
@ -87,7 +87,7 @@ describe Hermann::Producer do
it 'should invoke #push_single for each element' do
value.each do |v|
expect(producer.internal).to receive(:push_single).with(v, anything)
expect(producer.internal).to receive(:push_single).with(v, topic, anything)
end
expect(result).to be_instance_of Array