Merge pull request #114 from zendesk/yield_key_and_offset

yield key and offset into Consumer#consume block
This commit is contained in:
R. Tyler Croy 2015-07-01 04:49:18 -07:00
commit e276f60b27
2 changed files with 14 additions and 4 deletions

View File

@ -90,8 +90,8 @@ new_topic = 'other_topic'
the_consumer = Hermann::Consumer.new(topic, brokers: "localhost:9092", partition: 1)
the_consumer.consume(new_topic) do |msg| # can change topic with optional argument to .consume
puts "Recv: #{msg}"
the_consumer.consume(new_topic) do |msg, key, offset| # can change topic with optional argument to .consume
puts "Recv: #{msg}, key: #{key}, offset: #{offset}"
end
```

View File

@ -264,9 +264,19 @@ static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cf
// Yield the data to the Consumer's block
if (rb_block_given_p()) {
VALUE value = rb_str_new((char *)rkmessage->payload, rkmessage->len);
VALUE key, data, offset;
data = rb_str_new((char *)rkmessage->payload, rkmessage->len);
offset = rb_ll2inum(rkmessage->offset);
if ( rkmessage->key_len > 0 ) {
key = rb_str_new((char*) rkmessage->key, (int)rkmessage->key_len);
} else {
key = Qnil;
}
rd_kafka_message_destroy(rkmessage);
rb_yield(value);
rb_yield_values(3, data, key, offset);
}
else {
if (DEBUG) {