diff --git a/README.md b/README.md index c233198..475f8fd 100644 --- a/README.md +++ b/README.md @@ -90,8 +90,8 @@ new_topic = 'other_topic' the_consumer = Hermann::Consumer.new(topic, brokers: "localhost:9092", partition: 1) -the_consumer.consume(new_topic) do |msg| # can change topic with optional argument to .consume - puts "Recv: #{msg}" +the_consumer.consume(new_topic) do |msg, key, offset| # can change topic with optional argument to .consume + puts "Recv: #{msg}, key: #{key}, offset: #{offset}" end ``` diff --git a/ext/hermann/hermann_rdkafka.c b/ext/hermann/hermann_rdkafka.c index fa747c4..1d84a1e 100644 --- a/ext/hermann/hermann_rdkafka.c +++ b/ext/hermann/hermann_rdkafka.c @@ -264,9 +264,19 @@ static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cf // Yield the data to the Consumer's block if (rb_block_given_p()) { - VALUE value = rb_str_new((char *)rkmessage->payload, rkmessage->len); + VALUE key, data, offset; + + data = rb_str_new((char *)rkmessage->payload, rkmessage->len); + offset = rb_ll2inum(rkmessage->offset); + + if ( rkmessage->key_len > 0 ) { + key = rb_str_new((char*) rkmessage->key, (int)rkmessage->key_len); + } else { + key = Qnil; + } + rd_kafka_message_destroy(rkmessage); - rb_yield(value); + rb_yield_values(3, data, key, offset); } else { if (DEBUG) {