mirror of https://github.com/reiseburo/hermann
Properly marshal bytes in and out of the Kafka client library
The primary issue with marshalling bytes back and forth previously was the use of the StringEncoder class. By default the Kafka client library uses the DefaultEncoder which is a no-op and simply allwos byte arrays through
This commit is contained in:
parent
61c0426cbe
commit
f2ceab296c
|
@ -13,7 +13,6 @@ module Hermann
|
|||
|
||||
#default kafka Producer options
|
||||
DEFAULTS = {
|
||||
'serializer.class' => 'kafka.serializer.StringEncoder',
|
||||
'partitioner.class' => 'kafka.producer.DefaultPartitioner',
|
||||
'request.required.acks' => '1',
|
||||
'message.send.max.retries' => '0'
|
||||
|
@ -46,7 +45,7 @@ module Hermann
|
|||
# will be set
|
||||
def push_single(msg, topic, unused)
|
||||
Concurrent::Promise.execute {
|
||||
data = ProducerUtil::KeyedMessage.new(topic, msg)
|
||||
data = ProducerUtil::KeyedMessage.new(topic, msg.to_java_bytes)
|
||||
begin
|
||||
@producer.send(data)
|
||||
rescue Java::KafkaCommon::FailedToSendMessageException => jexc
|
||||
|
|
|
@ -60,7 +60,8 @@ module Hermann
|
|||
stream = get_stream(topic)
|
||||
it = stream.iterator
|
||||
while it.hasNext do
|
||||
yield it.next.message.to_s
|
||||
message = it.next.message
|
||||
yield String.from_java_bytes(message)
|
||||
end
|
||||
rescue Exception => e
|
||||
puts "#{self.class.name}#consume exception: #{e.class.name}"
|
||||
|
|
Loading…
Reference in New Issue