Hermann


A Ruby gem implementing a Kafka Publisher and Consumer

On MRI (the C-based Ruby), this library wraps the librdkafka library, which is implemented in C.

On JRuby, this library declares jar dependencies inside its .gemspec to express its dependency on the Java-based Kafka library provided by the Kafka project. Tools like jbundler handle these declarations correctly.
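
For illustration only, here is a hypothetical sketch of how jar dependencies can be declared in a gemspec so that jbundler / jar-dependencies resolve them on JRuby. The gem name, coordinates, and versions below are placeholders, not Hermann's actual declarations:

Gem::Specification.new do |spec|
  spec.name     = 'my-kafka-gem'   # placeholder name
  spec.version  = '0.1.0'
  spec.platform = 'java'

  # On JRuby, jar requirements like these are picked up by jbundler / jar-dependencies
  spec.requirements << "jar 'org.apache.kafka:kafka_2.11', '0.8.2.2'"  # illustrative coordinates
  spec.requirements << "jar 'org.slf4j:slf4j-simple', '1.7.12'"        # illustrative coordinates
end

Running jbundle install then resolves the declared jars for the project.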

Usage

Usage is modelled on the kafka-rb gem and is fairly straightforward.

  • Kafka 0.8 is supported.
  • Ruby 1.9.3, 2.1.1, and JRuby are the tested runtimes

Producer

Zookeeper discovery

Discovers Kafka brokers through Zookeeper by reading /brokers/ids to find the list of registered brokers.

require 'hermann/producer'
require 'hermann/discovery/zookeeper'

broker_ids_array = Hermann::Discovery::Zookeeper.new('localhost:2181').get_brokers
producer = Hermann::Producer.new('topic', broker_ids_array)

promise = producer.push('hello world') # send message to kafka
promise.value                          # forces the Concurrent::Promise to finish executing (#value!)
promise.state                          # the state of the promise
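
Since push returns a Concurrent::Promise from the concurrent-ruby gem, the standard promise API can be used to wait for and inspect the delivery result. A minimal sketch, assuming ordinary concurrent-ruby promise semantics:

promise = producer.push('hello world')
promise.value(5)                     # block up to ~5 seconds for the delivery to settle

if promise.rejected?
  warn "delivery failed: #{promise.reason}"   # reason holds the underlying error
elsif promise.fulfilled?
  puts "delivered: #{promise.value}"
end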

MRI only

require 'hermann/producer'
require 'hermann/discovery/zookeeper'

broker_ids_array = Hermann::Discovery::Zookeeper.new('localhost:2181').get_brokers
p = Hermann::Producer.new('topic', broker_ids_array)  # arguments: topic, list of brokers
f = p.push('hello world from mri')
f.state          # pending until the reactor has processed the delivery
p.tick_reactor   # process outstanding librdkafka events so the promise can settle
f.state

Consumer

Messages can be consumed by calling the consume method and passing a block to handle the yielded messages. The consume method blocks, so take care to handle that appropriately (e.g. wrap it in a Concurrent::Promise, a Thread, etc.).

(JRuby)

require 'hermann'
require 'hermann/consumer'
require 'hermann_jars'

topic     = 'topic'
new_topic = 'other_topic'

the_consumer = Hermann::Consumer.new(topic, zookeepers: "localhost:2181", group_id: "group1")

the_consumer.consume(new_topic) do |msg|   # can change topic with optional argument to .consume
  puts "Recv: #{msg}"
end

(MRI)

MRI currently has no zookeeper / client group support.

require 'hermann'
require 'hermann/consumer'

topic     = 'topic'
new_topic = 'other_topic'

the_consumer = Hermann::Consumer.new(topic, brokers: "localhost:9092", partition: 1)

the_consumer.consume(new_topic) do |msg, key, offset|   # can change topic with optional argument to .consume
  puts "Recv: #{msg}, key: #{key}, offset: #{offset}"
end
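
Because consume blocks, as noted above, here is a minimal sketch of running the MRI consumer from the previous example in a background thread:

consumer_thread = Thread.new do
  the_consumer.consume(topic) do |msg, key, offset|
    puts "Recv: #{msg}, key: #{key}, offset: #{offset}"
  end
end

# ... do other work on the main thread ...

consumer_thread.join   # blocks until the consumer loop exits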

Metadata request (MRI-only)

Topic and cluster metadata may be retrieved in the MRI version by querying the Kafka brokers.

require 'hermann'
require 'hermann/discovery/metadata'

c = Hermann::Discovery::Metadata.new( "localhost:9092" )
topic = c.topic("topic")

puts topic.partitions.first

consumers = topic.partitions.map do |partition|
  partition.consumer
end
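
For illustration, the per-partition consumers collected above could each be run in their own thread. This is only a sketch, and it assumes the objects returned by partition.consumer respond to consume with a block like the consumers shown earlier:

threads = consumers.map do |consumer|
  Thread.new do
    consumer.consume do |msg|
      puts "Recv: #{msg}"
    end
  end
end

threads.each(&:join)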

Build & Unit Test

First time (from a clean repository): bundle install && bundle exec rake

Thereafter: bundle exec rake spec

Testing

To run the integration tests:

  • start up your own instance of Zookeeper/Kafka
  • rspec spec/integration/producer_spec.rb

How to convert from using jruby-kafka

  • Gemfile
    • remove jruby-kafka
    • add gem "hermann" (see the Gemfile sketch after this list)
    • bundle install
  • Jarfile
    • remove unnecessary jars from your Jarfile (e.g. kafka, log4j)
    • jar dependencies are automatically included with Hermann
    • jbundle install
  • Test out one of the Producer/Consumer examples above
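
As a concrete illustration of the Gemfile change (the commented-out gem name and lack of version pin are illustrative, not prescriptive):

# Gemfile
source 'https://rubygems.org'

# gem 'jruby-kafka'      # removed
gem 'hermann'            # on JRuby, Kafka jar dependencies come in through Hermann

Then run bundle install, and on JRuby jbundle install, as described in the list above.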