diff --git a/ext/hermann/hermann_lib.c b/ext/hermann/hermann_lib.c index 4aafa9d..69a40c8 100644 --- a/ext/hermann/hermann_lib.c +++ b/ext/hermann/hermann_lib.c @@ -534,16 +534,19 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) { * * @param self VALUE the Ruby producer instance * @param message VALUE the ruby String containing the outgoing message. + * @param topic VALUE the ruby String containing the topic to use for the + * outgoing message. * @param result VALUE the Hermann::Result object to be fulfilled when the * push completes */ -static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) { +static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) { HermannInstanceConfig* producerConfig; /* Context pointer, pointing to `result`, for the librdkafka delivery * callback */ hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t)); + rd_kafka_topic_t *rkt = NULL; TRACER("self: %p, message: %p, result: %p)\n", self, message, result); @@ -554,10 +557,9 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) { TRACER("producerConfig: %p\n", producerConfig); - if ((NULL == producerConfig->topic) || - (0 == strlen(producerConfig->topic))) { - fprintf(stderr, "Topic is null!\n"); - rb_raise(rb_eRuntimeError, "Topic cannot be empty"); + if ((Qnil == topic) || + (0 == RSTRING_LEN(topic))) { + rb_raise(rb_eArgError, "Topic cannot be empty"); return self; } @@ -567,6 +569,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) { TRACER("kafka initialized\n"); + rkt = rd_kafka_topic_new(producerConfig->rk, + RSTRING_PTR(topic), + NULL); + + if (NULL == rkt) { + rb_raise(rb_eRuntimeError, "Could not construct a topic structure"); + return self; + } + /* Only pass result through if it's non-nil */ if (Qnil != result) { delivery_ctx->result = result; @@ -576,7 +587,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) { TRACER("rd_kafka_produce() message of %i bytes\n", RSTRING_LEN(message)); /* Send/Produce message. */ - if (-1 == rd_kafka_produce(producerConfig->rkt, + if (-1 == rd_kafka_produce(rkt, producerConfig->partition, RD_KAFKA_MSG_F_COPY, RSTRING_PTR(message), @@ -590,6 +601,10 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) { /* TODO: raise a Ruby exception here, requires a test though */ } + if (NULL != rkt) { + rd_kafka_topic_destroy(rkt); + } + TRACER("returning\n"); return self; @@ -913,11 +928,9 @@ static VALUE producer_allocate(VALUE klass) { * Set up the configuration context for the Producer instance * * @param self VALUE the Producer instance - * @param topic VALUE the Ruby string naming the topic * @param brokers VALUE a Ruby string containing host:port pairs separated by commas */ static VALUE producer_initialize(VALUE self, - VALUE topic, VALUE brokers) { HermannInstanceConfig* producerConfig; @@ -926,12 +939,9 @@ static VALUE producer_initialize(VALUE self, TRACER("initialize Producer ruby object\n"); - - topicPtr = StringValuePtr(topic); brokersPtr = StringValuePtr(brokers); Data_Get_Struct(self, HermannInstanceConfig, producerConfig); - producerConfig->topic = topicPtr; producerConfig->brokers = brokersPtr; /** Using RD_KAFKA_PARTITION_UA specifies we want the partitioner callback to be called to determine the target * partition @@ -1011,11 +1021,11 @@ void Init_hermann_lib() { rb_define_alloc_func(c_producer, producer_allocate); /* Initialize */ - rb_define_method(c_producer, "initialize", producer_initialize, 2); + rb_define_method(c_producer, "initialize", producer_initialize, 1); rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1); /* Producer.push_single(msg) */ - rb_define_method(c_producer, "push_single", producer_push_single, 2); + rb_define_method(c_producer, "push_single", producer_push_single, 3); /* Producer.tick */ rb_define_method(c_producer, "tick", producer_tick, 1); diff --git a/lib/hermann/producer.rb b/lib/hermann/producer.rb index ab270aa..9dccd46 100644 --- a/lib/hermann/producer.rb +++ b/lib/hermann/producer.rb @@ -12,13 +12,17 @@ module Hermann class Producer attr_reader :topic, :brokers, :internal, :children + # Initialize a producer object with a default topic and broker list + # + # @param [String] topic The default topic to use for pushing messages + # @param [Array] brokers An array of "host:port" strings for the brokers def initialize(topic, brokers) @topic = topic @brokers = brokers if RUBY_PLATFORM == "java" @internal = Hermann::Provider::JavaProducer.new(brokers) else - @internal = Hermann::Lib::Producer.new(topic, brokers) + @internal = Hermann::Lib::Producer.new(brokers) end # We're tracking children so we can make sure that at Producer exit we # make a reasonable attempt to clean up outstanding result objects @@ -42,28 +46,29 @@ module Hermann # Push a value onto the Kafka topic passed to this +Producer+ # - # @param [Array] value An array of values to push, will push each one - # separately # @param [Object] value A single object to push - # # @param [Hash] opts to pass to push method - # @params opts [String] :topic The topic to push messages to + # @option opts [String] :topic The topic to push messages to # # @return [Hermann::Result] A future-like object which will store the # result from the broker def push(value, opts={}) topic = opts[:topic] || @topic - result = create_result + result = nil if value.kind_of? Array return value.map { |e| self.push(e) } + end + + if RUBY_PLATFORM == "java" + result = @internal.push_single(value, topic) + @children << result + # Reaping children on the push just to make sure that it does get + # called correctly and we don't leak memory + reap_children else - if RUBY_PLATFORM == "java" - result = @internal.push_single(value, topic) - @children << result - else - @internal.push_single(value, result) - end + result = create_result + @internal.push_single(value, topic, result) end return result diff --git a/spec/hermann_lib/producer_spec.rb b/spec/hermann_lib/producer_spec.rb index 184ef51..01ab1e1 100644 --- a/spec/hermann_lib/producer_spec.rb +++ b/spec/hermann_lib/producer_spec.rb @@ -7,7 +7,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do let(:topic) { 'rspec' } let(:brokers) { 'localhost:1337' } - subject(:producer) { Hermann::Lib::Producer.new(topic, brokers) } + subject(:producer) { Hermann::Lib::Producer.new(brokers) } let(:timeout) { 3000 } it { should respond_to :push_single } @@ -41,7 +41,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do let(:brokers) { 'localhost:13337' } it 'should error after attempting to connect' do |example| - producer.push_single(example.full_description, nil) + producer.push_single(example.full_description, 'test-topic', nil) begin producer.tick(timeout) rescue StandardError => ex @@ -62,7 +62,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do describe '#push_single', :type => :integration do let(:message) { |example| example.full_description } - subject(:push) { producer.push_single(message, nil) } + subject(:push) { producer.push_single(message, topic, nil) } it 'should return' do expect(push).not_to be_nil @@ -105,7 +105,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do context 'with a single queued request' do before :each do - producer.push_single('hello', nil) + producer.push_single('hello', topic, nil) end it 'should return successfully' do diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index d953ea1..777885e 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -38,14 +38,14 @@ describe Hermann::Producer do end end - context "not java", :platform => :mri do + context "on C ruby", :platform => :mri do describe '#push' do subject(:result) { producer.push(value) } context 'error conditions' do shared_examples 'an error condition' do it 'should raise an exception' do - expect { producer.push('rspec') }.to raise_error(RuntimeError) + expect { producer.push('rspec') }.to raise_error end end @@ -87,7 +87,7 @@ describe Hermann::Producer do it 'should invoke #push_single for each element' do value.each do |v| - expect(producer.internal).to receive(:push_single).with(v, anything) + expect(producer.internal).to receive(:push_single).with(v, topic, anything) end expect(result).to be_instance_of Array