Fix Consumer behavior to properly handle passed blocks.

This commit is contained in:
Stan Campbell 2014-09-04 15:25:02 -07:00
parent 5cf9f352e1
commit ab4f4cca5d
4 changed files with 24 additions and 7 deletions

View File

@ -252,8 +252,11 @@ static void msg_consume(rd_kafka_message_t *rkmessage,
rb_yield(value);
}
else {
if (DEBUG) {
fprintf(stderr, "No block given\n"); // todo: should this be an error?
VALUE value = rb_str_new((char *)rkmessage->payload, rkmessage->len);
// If there is a defined executable block, provide the value to it
ID sym_method_call = rb_intern("call");
if(NULL != cfg->block) {
rb_funcall(*(cfg->block), sym_method_call, 1, value);
}
}
}
@ -378,8 +381,9 @@ void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
* Begin listening on the configured topic for messages. msg_consume will be called on each message received.
*
* @param VALUE self the Ruby object for this consumer
* @param VALUE block the Ruby object (a Proc) referring to the passed block
*/
static VALUE consumer_consume(VALUE self) {
static VALUE consumer_consume(VALUE self, VALUE block) {
HermannInstanceConfig* consumerConfig;
@ -400,6 +404,9 @@ static VALUE consumer_consume(VALUE self) {
consumer_init_kafka(consumerConfig);
}
/* If present, save the executable block in the context */
consumerConfig->block = █
/* Start consuming */
if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) {
fprintf(stderr, "%% Failed to start consuming: %s\n",
@ -638,6 +645,7 @@ static VALUE consumer_allocate(VALUE klass) {
consumerConfig->debug = NULL;
consumerConfig->start_offset = -1;
consumerConfig->do_conf_dump = -1;
consumerConfig->block = NULL;
consumerConfig->run = 0;
consumerConfig->exit_eof = 0;
consumerConfig->quiet = 0;
@ -788,6 +796,7 @@ static VALUE producer_allocate(VALUE klass) {
producerConfig->debug = NULL;
producerConfig->start_offset = -1;
producerConfig->do_conf_dump = -1;
producerConfig->block = NULL;
producerConfig->run = 0;
producerConfig->exit_eof = 0;
producerConfig->quiet = 0;
@ -900,7 +909,7 @@ void Init_hermann_lib() {
rb_define_method(c_consumer, "initialize_copy", consumer_init_copy, 1);
/* Consumer has method 'consume' */
rb_define_method( c_consumer, "consume", consumer_consume, 0 );
rb_define_method( c_consumer, "consume", consumer_consume, 1 );
/* ---- Define the producer class ---- */
c_producer = rb_define_class_under(lib_module, "Producer", rb_cObject);

View File

@ -75,6 +75,9 @@ typedef struct HermannInstanceConfig {
int64_t start_offset;
int do_conf_dump;
/* Optional pointer to executable block */
VALUE *block;
int run;
int exit_eof;
int quiet;

View File

@ -12,8 +12,13 @@ module Hermann
@internal = Hermann::Lib::Consumer.new(topic, brokers, partition)
end
def consume(&block)
@internal.consume(&block)
def consume
begin
proc = Proc.new # obtain reference to any passed block
rescue
proc = NIL
end
@internal.consume(proc)
end
end
end

View File

@ -1,6 +1,6 @@
require 'rubygems'
require 'lib/hermann'
require 'lib/hermann/consumer'
require 'lib/hermann/producer'
p = Hermann::Producer.new("lms_messages", "localhost:9092")
arr = (0..1000000).to_a.map { |x| "message_#{x}"}