mirror of https://github.com/reiseburo/hermann
yield key and offset into Consumer#consume block
This commit is contained in:
parent
60bc473fdd
commit
5b8dd6feef
|
@ -90,8 +90,8 @@ new_topic = 'other_topic'
|
|||
|
||||
the_consumer = Hermann::Consumer.new(topic, brokers: "localhost:9092", partition: 1)
|
||||
|
||||
the_consumer.consume(new_topic) do |msg| # can change topic with optional argument to .consume
|
||||
puts "Recv: #{msg}"
|
||||
the_consumer.consume(new_topic) do |msg, key, offset| # can change topic with optional argument to .consume
|
||||
puts "Recv: #{msg}, key: #{key}, offset: #{offset}"
|
||||
end
|
||||
```
|
||||
|
||||
|
|
|
@ -264,9 +264,18 @@ static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cf
|
|||
|
||||
// Yield the data to the Consumer's block
|
||||
if (rb_block_given_p()) {
|
||||
VALUE value = rb_str_new((char *)rkmessage->payload, rkmessage->len);
|
||||
VALUE key, data, offset;
|
||||
|
||||
data = rb_str_new((char *)rkmessage->payload, rkmessage->len);
|
||||
offset = rb_ll2inum(rkmessage->offset);
|
||||
|
||||
if ( rkmessage->key_len > 0 )
|
||||
key = rb_str_new((char*) rkmessage->key, (int)rkmessage->key_len);
|
||||
else
|
||||
key = Qnil;
|
||||
|
||||
rd_kafka_message_destroy(rkmessage);
|
||||
rb_yield(value);
|
||||
rb_yield_values(3, data, key, offset);
|
||||
}
|
||||
else {
|
||||
if (DEBUG) {
|
||||
|
|
Loading…
Reference in New Issue