Compare commits

...

107 Commits

Author SHA1 Message Date
R. Tyler Croy d822ae541a Merge pull request #138 from bitwiseman/patch-3
Locked rake version to ~11.3.0
2016-12-13 17:24:47 -08:00
Liam Newman f6f71c98e4 Locked rake version to ~11.3.0
Rake 12.0 replaces "last_comment" with "last_description".

This describes the issue and possible fix:
http://stackoverflow.com/questions/35893584/nomethoderror-undefined-method-last-comment-after-upgrading-to-rake-11

Rather than upgrade rspec to 3.5.x and risk changes to test behavior, I've locked rake to the last version before 12.0.
2016-12-13 15:53:55 -08:00
R. Tyler Croy f9957b5629 Merge pull request #137 from bitwiseman/patch-2
Updated for latest Pipeline fixes
2016-11-17 13:56:48 -08:00
Liam Newman 35eba84dc6 Updated for latest Pipeline fixes
JENKINS-29711 was fixed and stage step now uses curly braces
2016-11-17 13:53:09 -08:00
R. Tyler Croy 68515effc7 Merge pull request #136 from bitwiseman/feature/simplepipe
Simplify Pipeline
2016-07-06 17:52:11 -07:00
Liam Newman 69af2e310f Simplify Pipeline 2016-07-06 17:01:35 -07:00
R. Tyler Croy 9473c8b24b Merge pull request #135 from bitwiseman/feature/jenkinsfile
Add Jenkins pipeline
2016-06-30 10:15:37 -07:00
Liam Newman e0225b6761 Add Jenkins pipeline 2016-06-29 17:36:30 -07:00
R. Tyler Croy a4d678deaf Merge pull request #134 from bitwiseman/patch-1
Update README.md
2016-06-17 13:12:51 -07:00
Liam Newman 5dd952145b Update README.md
Add simple build and unit test instructions, similar to what is in the releasing document.
2016-06-17 13:09:26 -07:00
R. Tyler Croy 9f5cf12151
No sense supporting ree in any capacity any more 2016-05-29 18:32:37 -07:00
R. Tyler Croy f71ab11ea2 Merge pull request #132 from thedrow/patch-1
Added Ruby 2.2 and 2.3 to the matrix
2016-05-29 18:29:58 -07:00
R. Tyler Croy 436ec9be82 Merge pull request #133 from jfeltesse-mdsol/concurrent_ruby
depend on concurrent-ruby ~> 1.0.0
2016-02-24 08:29:11 -08:00
Julien Feltesse 587f3be16e depend on concurrent-ruby ~> 1.0.0 2016-02-24 18:19:06 +09:00
Omer Katz c2b2274af3 Specify 2.3.0 instead of 2.3 for travis. 2016-02-04 17:06:39 +02:00
Omer Katz c8fe7e06b5 Added Ruby 2.2 and 2.3 to the matrix. 2016-02-04 14:17:55 +02:00
R. Tyler Croy 246f27e76d
Add some code coverage while we're here 2016-02-03 05:32:57 -08:00
R. Tyler Croy bc560f21ea
Add support for generating JUnit-compatible output for Jenkins 2016-02-03 05:09:30 -08:00
R. Tyler Croy 9f8fbe381c Merge pull request #128 from rtyler/minor-doc-fixes
Minor doc fixes
2015-09-30 09:51:42 -07:00
R. Tyler Croy 5b669504af
Document the current janky release process
At least this way I won't forget each time

Fixes #126
2015-09-30 09:45:20 -07:00
R. Tyler Croy d4a350c65c
Update the gitter and travis badges after repo move 2015-09-30 09:39:10 -07:00
R. Tyler Croy 17b51359a9 Merge pull request #127 from rtyler/upgrade-scala-dependency
Upgrade scala dependency
2015-09-28 11:18:33 -07:00
R. Tyler Croy 0ad9be88f0
Upgrade bundler for our Travis build 2015-09-28 11:12:33 -07:00
R. Tyler Croy 8422e82125
Move hermann to the newer faster travis infra 2015-09-28 10:25:43 -07:00
R. Tyler Croy 1c7543f730
Upgrade to a Kafka (JVM) version which uses Scala 2.11
Scala 2.10 is three years old and it appears to me that 2.10 is moderately
broken and old, and that 2.11 is the bare minimum folks should be using these
days (see also: https://github.com/twitter/scrooge/pull/198)
2015-09-28 08:43:50 -07:00
R. Tyler Croy 9d5b773542
Properly include hermann_jars.rb in the built gem but exclude from the tree 2015-09-23 13:19:40 -07:00
R. Tyler Croy ecae27dd65 Merge pull request #125 from rtyler/no-fixed-jars
Remove hard-coded hermann_jars.rb
2015-09-23 08:08:24 -07:00
R. Tyler Croy e9d301be3d
Remove hermann_jars.rb so gem installation can generate it properly
Fixes #124
2015-09-23 07:52:21 -07:00
R. Tyler Croy 4dd732dc63
Bump the version for a minor release 2015-09-22 12:08:28 -07:00
R. Tyler Croy 5c20c6c5ba
Another minor version bump to clean our dependency range up 2015-09-22 12:00:04 -07:00
R. Tyler Croy 9f1e16071f
Bump the minor for some new functionality, at least on MRI 2015-09-22 12:00:04 -07:00
R. Tyler Croy 74cb8656e1 Merge pull request #121 from braintree/stable-partition
Add support for passing partition_key in MRI
2015-09-22 11:59:32 -07:00
jakesandlund 6946d4d82c Fix java_producer_spec for modified push_single signature 2015-09-22 18:36:10 +00:00
jakesandlund c64d38cff3 Comment that rdcrc32.h and rd_kafka_msg_partitioner_consistent can be removed once librdkafka tags a release and Hermann updates to it 2015-09-16 16:04:35 +00:00
jakesandlund c29bb5e4d0 Move to_java into java_producer to make push_single signature match MRI 2015-09-16 15:54:42 +00:00
cory and jakesandlund e8703e1df4 Add support for passing partition_key in MRI 2015-09-14 16:11:43 +00:00
R. Tyler Croy cd58cb33cd Merge pull request #120 from mkristian/patch-1
use semantic versioning for jar-dependency runtime dependency
2015-09-08 07:31:58 -07:00
Christian Meier 763e2cce97 use semantic versioning for jar-dependency runtime dependency 2015-09-08 09:26:09 +02:00
R. Tyler Croy e276f60b27 Merge pull request #114 from zendesk/yield_key_and_offset
yield key and offset into Consumer#consume block
2015-07-01 04:49:18 -07:00
Ben Osheroff 2c99af440e surround if with braces 2015-06-30 10:51:19 -07:00
Ben Osheroff 5b8dd6feef yield key and offset into Consumer#consume block 2015-06-30 10:49:37 -07:00
R. Tyler Croy 60bc473fdd Merge pull request #118 from zendesk/namespace
REFACTOR ONLY: Namespace cleanup
2015-06-27 11:04:23 -07:00
Ben Osheroff 9edc4b9301 move hermann_lib -> hermann_rdkafka / Hermann::Provider::RDKafka 2015-06-23 09:01:31 -07:00
Ben Osheroff a9d80242dd rename hermann_lib -> hermann_rdkafka 2015-06-22 19:39:56 -07:00
R. Tyler Croy c6fe9838d7 Merge pull request #116 from rtyler/jar-dep-change
Change jar-dependencies to work with current jbundler installs
2015-06-18 18:15:51 -07:00
R. Tyler Croy 45fe45cb96
Remove the exclusion syntax from the kafka jar-dependency which breaks in the latest jbundler
The version of jar-dependencies we were testing against was yanked, and with
jbundler and jar-dependencies (0.7.[3-4] and 0.1.10 respectively) I cannot get
Hermann to successfully install with jbundler
2015-06-18 17:11:36 -07:00
R. Tyler Croy c5707f5515 Merge pull request #110 from zendesk/cleanup_warnings
cleanup warnings
2015-06-17 08:05:47 -07:00
Ben Osheroff 783d7dac0d cleanup a few merge-induced warnings 2015-06-16 23:45:25 -07:00
Ben Osheroff 09df7ec1b9 Merge remote-tracking branch 'upstream/master' into cleanup_warnings 2015-06-16 23:44:13 -07:00
Ben Osheroff e7fce39f83 Merge remote-tracking branch 'upstream/master' into cleanup_warnings 2015-06-16 23:42:09 -07:00
Ben Osheroff d8b8f83690 1.8.7 has no version.h 2015-06-16 20:55:55 -07:00
R. Tyler Croy 7f63e3c0d3 Merge pull request #109 from zendesk/better_postloop_cleanup
Better postloop cleanup
2015-06-15 06:55:35 -07:00
R. Tyler Croy c272bff063 Merge pull request #108 from zendesk/metadata
Add the ability to request cluster / topic metadata from the brokers
2015-06-15 06:54:31 -07:00
R. Tyler Croy 6fb9e064b4
Up the minor version for a manual deployment 2015-06-15 06:19:30 -07:00
R. Tyler Croy 4e1d359107
Remove auto-deploy until Java- specific versions can be published too 2015-06-15 06:18:02 -07:00
R. Tyler Croy 755d578a75 Merge pull request #111 from zendesk/rdkafka_0_8_6
upgrade to librdkafka 0.8.6
2015-06-15 06:00:49 -07:00
Ben Osheroff 74cba3c513 upgrade to librdkafka 0.8.6
R. Tyler's patches are included in librdkafka 0.8.6
2015-06-14 19:33:41 -07:00
Ben Osheroff 5c898144f2 cleanup warnings
- fix partition selection function
- get ruby >= 2 calling the correct no-gvl function
2015-06-13 19:47:07 -07:00
Ben Osheroff 17e5c5b31d ensure we call consumer_consume_loop_stop at the end of the loop
we need rb_ensure so that if the loop terminates in a "break" statement
we'll still call rd_kafka_consume_stop
2015-06-13 19:06:45 -07:00
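A minimal compilable sketch of the rb_ensure pattern this commit describes; the `Demo` class and `demo_*` names are hypothetical stand-ins, not Hermann's actual API:

```c
#include <ruby.h>

/* Body of the loop: a `break` inside the yielded block unwinds the stack
 * right through rb_yield(), so cleanup placed after the loop would be skipped. */
static VALUE demo_loop(VALUE self)
{
	while (1) {
		rb_yield(Qnil);
	}
	return Qnil;
}

/* Ensure handler: runs even when the block breaks. In Hermann's consumer
 * this is where rd_kafka_consume_stop() gets called. */
static VALUE demo_loop_stop(VALUE self)
{
	/* cleanup goes here */
	return Qnil;
}

static VALUE demo_consume(VALUE self)
{
	return rb_ensure(demo_loop, self, demo_loop_stop, self);
}

void Init_demo(void)
{
	rb_define_method(rb_define_class("Demo", rb_cObject), "consume", demo_consume, 0);
}
```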
Ben Osheroff 942fd87728 call rd_kafka_message_destroy right before rb_yield()
if rb_yield() ends in a "break" statement, it never returns control back
to the caller; thus we leak the message.
2015-06-13 19:05:25 -07:00
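The ordering that fixes the leak, condensed from the msg_consume() change in the diff below into a hypothetical `yield_message` helper (librdkafka 0.8.x types assumed):

```c
#include <ruby.h>
#include <librdkafka/rdkafka.h>

/* Copy everything we need onto the Ruby heap, free the librdkafka message,
 * and only then yield: if the block ends in `break`, rb_yield_values()
 * never returns, so a destroy placed after it would never run. */
static void yield_message(rd_kafka_message_t *rkmessage)
{
	VALUE data   = rb_str_new((char *)rkmessage->payload, rkmessage->len);
	VALUE offset = rb_ll2inum(rkmessage->offset);
	VALUE key    = rkmessage->key_len > 0
	                   ? rb_str_new((char *)rkmessage->key, (long)rkmessage->key_len)
	                   : Qnil;

	rd_kafka_message_destroy(rkmessage);   /* safe: Ruby owns copies now */
	rb_yield_values(3, data, key, offset); /* may not return if the block breaks */
}
```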
Ben Osheroff 3f0da9e0cd stop using named params in Partition#consumer 2015-06-13 14:12:57 -07:00
Ben Osheroff 0bc9e9d9ee don't destroy metadata unless it's allocated 2015-06-12 18:11:34 -07:00
Ben Osheroff 83ea24a20e Merge remote-tracking branch 'upstream/master' into metadata 2015-06-11 16:22:32 -07:00
Ben Osheroff f3b6df06d3 update README.md with metadata request 2015-06-11 16:21:08 -07:00
Ben Osheroff c2540a8410 update api to have #topic(topic) as well as #topics
also add Partition#consumer to instantiate a consumer object on the
particular partition
2015-06-11 15:29:02 -07:00
Ben Osheroff 7edd297071 add Broker#to_s 2015-06-11 15:25:01 -07:00
R. Tyler Croy 2187463a5d Merge pull request #107 from lookout/fix-integ
fix integration tests
2015-06-01 11:50:41 -07:00
jway 2aecf369e6 fix integration tests 2015-06-01 09:42:37 -07:00
Ben Osheroff e7e1a2a7ac raise default timeout and make it configurable 2015-05-19 13:51:59 -07:00
Ben Osheroff 314ea3d8f8 fetching an empty topic doesn't work 2015-05-19 13:44:57 -07:00
Ben Osheroff 326b428b34 slap a friendlier face on the returned metadata interface 2015-05-19 13:34:39 -07:00
Ben Osheroff 7779654f36 Merge remote-tracking branch 'upstream/master' into metadata 2015-05-19 11:38:08 -07:00
Ben Osheroff 5942e1810b fix bug, constantize TIMEOUT_MS, add README.md info 2015-05-19 11:33:13 -07:00
Ben Osheroff e38b626b71 Add metadata API to Hermann::Discovery module 2015-05-19 11:13:27 -07:00
R. Tyler Croy 8c614437bf Merge pull request #106 from pocman/patch-2
Update Hacking.md with mri instructions
2015-05-19 07:54:07 -07:00
R. Tyler Croy a833a7e366 Merge pull request #105 from pocman/patch-4
Fix TypeError conversion of Array into String
2015-05-19 07:52:00 -07:00
Thomas 7100c84059 Fix TypeError conversion of Array into String
When running rake spec:integration with /fixtures/integration.yml

Hermann::Lib::Producer #connect should connect
     Failure/Error: subject(:producer) { Hermann::Lib::Producer.new(brokers) }
     TypeError: no implicit conversion of Array into String
     # ./spec/hermann_lib/producer_spec.rb:10:in `initialize'
2015-05-19 11:00:25 +02:00
Thomas 05814e4f57 Update Hacking.md with mri instructions 2015-05-19 10:33:00 +02:00
R. Tyler Croy 5142115847 Merge pull request #104 from pocman/patch-1
Add Zk discovery availability with MRI to readme
2015-05-18 08:15:26 -07:00
Thomas 6838ec2cdf Add Zk discovery availability with MRI to readme 2015-05-18 12:58:22 +02:00
R. Tyler Croy 6275021f11 Merge pull request #103 from lookout/retro-ruby
I've had requests to run CI against retro versions of Ruby
2015-05-07 16:39:25 -07:00
R. Tyler Croy 7b4ea2f2df Properly handle pulling in fixtures code across Ruby versions 2015-05-07 13:56:36 -07:00
R. Tyler Croy 3fda5ec3f3 Rework the dependencies and some of the code to support retro rubies better 2015-05-07 13:18:41 -07:00
R. Tyler Croy 069150cd1f I've had requests to run CI against retro versions of Ruby 2015-05-07 11:46:16 -07:00
R. Tyler Croy a362ce153e Merge pull request #98 from zendesk/fix_1.9.3
1.9.3 fixes
2015-04-30 08:11:38 -07:00
Ben Osheroff efd1a80a37 1.9.3 fixes
1.9.3 seems like it should actually still be using the
rb_thread_blocking_region() call, as rb_thread_call_without_gvl() isn't
defined in any header (although it is available).

Calling through that implicit declaration was causing crashes.
2015-04-30 07:52:01 -07:00
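A sketch of the resulting version guard, mirroring the checks the diff below adds to extconf.rb and the consume loop; `blocking_read` and `read_without_gvl` are hypothetical stand-ins for the real rd_kafka call sites:

```c
#include <ruby.h>
#ifdef HAVE_RUBY_THREAD_H
#include <ruby/thread.h>   /* declares rb_thread_call_without_gvl on 2.x */
#endif
#ifdef HAVE_RUBY_VERSION_H
#include <ruby/version.h>  /* RUBY_API_VERSION_MAJOR */
#endif

static void *blocking_read(void *arg)
{
	/* blocking I/O, performed without the GVL held */
	return arg;
}

static void *read_without_gvl(void *arg)
{
#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2
	/* 1.9.x: the API that is actually declared there */
	return (void *)rb_thread_blocking_region((rb_blocking_function_t *)blocking_read,
	                                         arg, RUBY_UBF_IO, NULL);
#elif HAVE_RB_THREAD_CALL_WITHOUT_GVL
	/* 2.x: the modern API, declared in ruby/thread.h */
	return rb_thread_call_without_gvl(blocking_read, arg, RUBY_UBF_IO, NULL);
#else
	return blocking_read(arg);  /* no way to drop the GVL; just block */
#endif
}
```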
R. Tyler Croy 4c451b4344 Merge pull request #97 from zendesk/fix_use_after_free
Fix use-after-free bug
2015-04-29 15:54:38 -07:00
Ben Osheroff 30669be4d5 Fix use-after-free bug
it's not safe to simply pluck strings off the heap unless we were also
going to maintain a reference to them.  sidestep this problem by
strdup'ing the topic and brokers strings
2015-04-29 14:33:58 -07:00
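The shape of the fix, as a short sketch of the consumer initializer (simplified from the full consumer_initialize() further down in the diff; `DemoConfig` stands in for HermannInstanceConfig and allocation is omitted):

```c
#include <ruby.h>
#include <string.h>

typedef struct {
	char *topic;
	char *brokers;
} DemoConfig;

static VALUE demo_initialize(VALUE self, VALUE topic, VALUE brokers)
{
	DemoConfig *cfg;
	Data_Get_Struct(self, DemoConfig, cfg);

	/* StringValueCStr() points into Ruby-owned memory that the GC can
	 * reclaim; strdup() gives us copies we own (and free() later). */
	cfg->topic   = strdup(StringValueCStr(topic));
	cfg->brokers = strdup(StringValueCStr(brokers));
	return self;
}
```

The matching free(config->topic) / free(config->brokers) lands in consumer_free(), visible in the diff below.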
R. Tyler Croy a3d8998a6f Image and link, DUH 2015-04-28 15:53:14 -07:00
R. Tyler Croy 76e8841998 Modify the build badge to point to the right travis project 2015-04-28 15:52:12 -07:00
R. Tyler Croy 4aef448dbc Merge pull request #95 from zendesk/mri_offsets
mri, initial offsets
2015-04-28 15:41:33 -07:00
Ben Osheroff 3fbe6969fb update tests with InvalidOffsetError 2015-04-28 15:14:14 -07:00
Ben Osheroff 8832ad28fd update rdoc section with :offset documentation 2015-04-28 11:52:16 -07:00
Ben Osheroff 8e89b75801 add a type to the invalid offset error 2015-04-28 11:50:28 -07:00
R. Tyler Croy e872906465 Merge pull request #92 from zendesk/fix_gvl
use rb_thread_call_without_gvl
2015-04-28 11:39:31 -07:00
Ben Osheroff 4f4e3f9b55 add some documentation to the new MRI consumer functions 2015-04-28 11:07:33 -07:00
Ben Osheroff 8857531cb7 add a constant for the time we ignore the gvl's interrupt requests 2015-04-28 11:06:34 -07:00
Ben Osheroff 752123b83e allow choosing of an inital offset in the MRI consumer 2015-04-28 10:17:09 -07:00
Ben Osheroff 21190d7acd randomize integration test topic 2015-04-28 10:09:30 -07:00
Ben Osheroff 8b7a31a232 add mri script 2015-04-28 10:09:02 -07:00
Ben Osheroff 784a7f3afe 1.9.3 compatibility 2015-04-27 12:32:58 -07:00
R. Tyler Croy dd25bc9586 Merge pull request #94 from zendesk/unify_initialize
Unify Consumer#initialize
2015-04-27 11:16:26 -07:00
Ben Osheroff 71c21d327e delete options in Consumer#initialize before passing along 2015-04-27 10:12:31 -07:00
Ben Osheroff 6db14ceeb7 version 0.23.0 2015-04-27 09:54:02 -07:00
Ben Osheroff 18c73f3f82 update README.md with new consumer info 2015-04-27 09:37:53 -07:00
Ben Osheroff 3792ee206b unify jruby/mri Consumer#initialize
While we do require different options between jruby & mri, there is a
common "topic" option.  The rest we pass in the options hash.
2015-04-27 09:32:16 -07:00
Ben Osheroff d579f2f8ec use rb_thread_call_without_gvl
this allows threaded code to work on modern (>= 2.0) rubies. Also,
switch to the single-message form of the rd_kafka call -- the callback
form isn't safe to run without the GVL held, as it enters the ruby
interpreter in rb_yield().
2015-04-21 14:02:48 -07:00
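The loop structure this commit moves to, reduced to its essentials (the real version, with its 1.9.x fallbacks, is consumer_consume_loop() in the diff below; `DemoConsumer`, `recv_one`, and `stop_cb` are hypothetical names):

```c
#include <ruby.h>
#ifdef HAVE_RUBY_THREAD_H
#include <ruby/thread.h>
#endif
#include <librdkafka/rdkafka.h>

typedef struct {
	rd_kafka_topic_t *rkt;
	int partition;
	int run;
} DemoConsumer;  /* stands in for HermannInstanceConfig */

/* Runs with the GVL released: block on the socket for up to 100ms. */
static void *recv_one(void *ptr)
{
	DemoConsumer *cfg = (DemoConsumer *)ptr;
	return rd_kafka_consume(cfg->rkt, cfg->partition, 100);
}

/* Unblock function: Ruby invokes this when it wants the thread back. */
static void stop_cb(void *ptr)
{
	((DemoConsumer *)ptr)->run = 0;
}

/* One blocking receive per iteration without the GVL; any rb_yield()
 * back into Ruby happens with the GVL held again -- which is why the
 * callback form of rd_kafka consumption had to go. */
static void consume_loop(DemoConsumer *cfg)
{
	rd_kafka_message_t *msg;
	while (cfg->run) {
		msg = rb_thread_call_without_gvl(recv_one, cfg, stop_cb, cfg);
		if (msg) {
			/* GVL held: safe to build VALUEs and yield here */
			rd_kafka_message_destroy(msg);
		}
	}
}
```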
31 changed files with 831 additions and 246 deletions

.gitignore

@@ -13,3 +13,6 @@ tmp/
Gemfile.lock
Jarfile.lock
.jbundler/
lib/hermann_jars.rb
coverage/
spec/reports

.travis.yml

@@ -1,11 +1,19 @@
language: ruby
sudo: false
rvm:
- ruby-head
- 2.3.0
- 2.2
- 2.1
- 1.9.3
- jruby
deploy:
provider: rubygems
gemspec: hermann.gemspec
api_key:
secure: NQDoSKjV0bs2MSZHHwP6gsG3a8JXCCT5nCHiggTvVV2vvFS9WCyBtMDY3WxQzAU/Zbt+FcPobOvbd53HW5hQYkDOpc84j/utVyBBZCtew0wjEY+Z18ygr+oUQtoALoaRh+cr3MUEFA1Q68fsLlzpRH4M6ZQxbUNOQtwNHgLaZco=
# upgrading bundler since our gemspec relying on lib/hermann_jars.rb being
# present causes troubles with bundler .17.x
before_install:
- gem install bundler -N --version ">= 1.8.0"
#deploy:
# provider: rubygems
# gemspec: hermann.gemspec
# api_key:
# secure: NQDoSKjV0bs2MSZHHwP6gsG3a8JXCCT5nCHiggTvVV2vvFS9WCyBtMDY3WxQzAU/Zbt+FcPobOvbd53HW5hQYkDOpc84j/utVyBBZCtew0wjEY+Z18ygr+oUQtoALoaRh+cr3MUEFA1Q68fsLlzpRH4M6ZQxbUNOQtwNHgLaZco=

Gemfile

@@ -4,7 +4,9 @@ gemspec
group :development do
gem 'jbundler', :platform => :jruby
gem 'rake'
gem 'rake', '~> 11.3.0'
gem 'i18n', '~> 0.6.11', :platform => :mri_18
gem 'activesupport', '~> 3.x', :platform => :mri_18
gem 'ruby-maven', '~> 3.1.1.0', :platform => :jruby
gem 'jar-dependencies', :platform => :jruby
gem 'rake-compiler'
@@ -21,4 +23,8 @@ group :test do
# Used for testing encoding protobufs in an out of Hermann in integration
# tests
gem 'protobuffy'
gem 'ci_reporter_rspec'
gem 'simplecov'
gem 'simplecov-rcov'
end

Hacking.md

@@ -8,5 +8,23 @@
* set port 2181
* Start Kafka
* Set properties file ```zookeeper.connect=localhost:2181```
You can also use a docker instance like this one: https://github.com/spotify/docker-kafka
On Mac:
* ```boot2docker start```
* ```$(boot2docker shellinit)```
* ```docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`boot2docker ip` --env ADVERTISED_PORT=9092 spotify/kafka```
* ```export ZOOKEEPER=`boot2docker ip`:2181```
* ```export KAFKA=`boot2docker ip`:9092```
* modify ./fixtures/integration.yml with the values in $KAFKA and $ZOOKEEPER
#### With JRuby
* ```bundle exec jruby -S rspec spec/integration```
#### With MRI
* ```bundle install```
* ```rake default``` or ```rake compile``` and then ```rake spec```
* ```rake spec:integration```

Jenkinsfile (new file)

@@ -0,0 +1,33 @@
#!groovy
/* Only keep the 10 most recent builds. */
properties([[$class: 'BuildDiscarderProperty',
strategy: [$class: 'LogRotator', numToKeepStr: '10']]])
stage ('Build') {
node {
// Checkout
checkout scm
// install required bundles
sh 'bundle install'
// build and run tests with coverage
sh 'bundle exec rake build spec'
// Archive the built artifacts
archive (includes: 'pkg/*.gem')
// publish html
publishHTML ([
allowMissing: false,
alwaysLinkToLastBuild: false,
keepAll: true,
reportDir: 'coverage',
reportFiles: 'index.html',
reportName: "RCov Report"
])
}
}

README.md

@@ -1,6 +1,6 @@
# Hermann
[![Gitter chat](https://badges.gitter.im/lookout/Hermann.png)](https://gitter.im/lookout/Hermann) [![Build Status](https://travis-ci.org/lookout/Hermann.svg?branch=master)](https://travis-ci.org/lookout/Hermann)
[![Gitter chat](https://badges.gitter.im/reiseburo/hermann.png)](https://gitter.im/reiseburo/hermann) [![Build Status](https://travis-ci.org/reiseburo/hermann.svg?branch=master)](https://travis-ci.org/reiseburo/hermann)
A Ruby gem implementing a Kafka Publisher and Consumer
@@ -26,7 +26,7 @@ straightforward.
### Producer
#### Zookeeper discovery (JRuby-only)
#### Zookeeper discovery
Discover Kafka brokers through zookeeper. Looks at ```/brokers/ids``` in Zookeeper to find the list of brokers.
@@ -48,10 +48,11 @@ promise.state # the state of the promise
```ruby
require 'hermann/producer'
p = Hermann::Producer.new('topic', ['localhost:6667']) # arguments topic, list of brokers
f = p.push('hello world from mri')
f.state
p.tick_reactor
broker_ids_array = Hermann::Discovery::Zookeeper.new('localhost:2181').get_brokers
p = Hermann::Producer.new('topic', broker_ids_array) # arguments topic, list of brokers
f = p.push('hello world from mri')
f.state
p.tick_reactor
f.state
```
@@ -59,18 +60,16 @@ f.state
Messages can be consumed by calling the consume method and passing a block to handle the yielded messages. The consume method blocks, so take care to handle that functionality appropriately (i.e. use Concurrent::Promise, Thread, etc).
#### (JRuby-only)
#### (JRuby)
```ruby
require 'hermann'
require 'hermann/consumer'
require 'hermann_jars'
zookeepers = "localhost:2181"
groupId = "group1"
topic = 'topic'
new_topic = 'other_topic'
the_consumer = Hermann::Consumer.new(topic, groupId, zookeepers)
the_consumer = Hermann::Consumer.new(topic, zookeepers: "localhost:2181", group_id: "group1")
the_consumer.consume(new_topic) do |msg| # can change topic with optional argument to .consume
puts "Recv: #{msg}"
@@ -78,6 +77,57 @@ end
```
#### (MRI)
MRI currently has no zookeeper / client group support.
```ruby
require 'hermann'
require 'hermann/consumer'
topic = 'topic'
new_topic = 'other_topic'
the_consumer = Hermann::Consumer.new(topic, brokers: "localhost:9092", partition: 1)
the_consumer.consume(new_topic) do |msg, key, offset| # can change topic with optional argument to .consume
puts "Recv: #{msg}, key: #{key}, offset: #{offset}"
end
```
### Metadata request (MRI-only)
Topic and cluster metadata may be retrieved in the MRI version by querying the Kafka brokers.
```ruby
require 'hermann'
require 'hermann/discovery/metadata'
c = Hermann::Discovery::Metadata.new( "localhost:9092" )
topic = c.topic("topic")
puts topic.partitions.first
consumers = topic.partitions.map do |partition|
partition.consumer
end
```
#### Build & Unit Test
First time (from a clean repository):
`bundle install && bundle exec rake`
Thereafter:
`bundle exec rake spec`
#### Testing
To run the integration tests:
* startup your own instance of zookeeper/kafka
* `rspec spec/integration/producer_spec.rb`
#### How to convert from using jruby-kafka

RELEASING.md (new file)

@@ -0,0 +1,17 @@
# Releasing Hermann
Hermann is a multi-platform gem, which means that two actual `.gem` files need
to be built and uploaded to [rubygems.org](https://rubygems.org/gems/hermann).
Here's the current process that [I](https://github.com/rtyler) use:
* `rvm use ruby@rubygems` (*switch to MRI*)
* `bundle install && rake` (*ensure that MRI tests pass*)
* `rvm use jruby@rubygems` (*switch to JRuby*)
* `bundle install && rake` (*ensure that the JRuby tests pass*)
* `rake release` (*tag the release and upload the `-java` platform gem*)
* `rvm use ruby@rubygems` (*switch back to MRI*)
* `gem build hermann.gemspec` (*build the 'ruby' platform gem*)
* `gem push pkg/hermann-0.blah.gem` (*upload the ruby platform gem*)
This can certainly be cleaned up, but this is the process as it is right now

Rakefile

@@ -3,10 +3,10 @@ require 'fileutils'
require "bundler/gem_tasks"
require 'rspec/core/rake_task'
require 'rake/extensiontask'
require 'ci/reporter/rake/rspec'
Rake::ExtensionTask.new do |t|
t.name = 'hermann_lib'
t.name = 'hermann_rdkafka'
t.ext_dir = 'ext/hermann'
t.gem_spec = Gem::Specification.load('hermann.gemspec')
end

ext/hermann/extconf.rb

@@ -122,10 +122,9 @@ class RdKafkaRecipe < MiniPortile
end
################################################################################
librdkafka = RdKafkaRecipe.new('librdkafka', '0.8.4')
librdkafka = RdKafkaRecipe.new('librdkafka', '0.8.6')
librdkafka.files = ["https://github.com/edenhill/librdkafka/archive/#{librdkafka.version}.tar.gz"]
librdkafka.checksum = '28a3252fd0f31d4a38bea9cd25083a06'
librdkafka.patch_files = Dir["#{File.join(BASE_DIR, 'ext', 'patches', 'librdkafka')}/*.patch"]
librdkafka.checksum = '1b77543f9be82d3f700c0ef98f494990'
checkpoint = ".librdkafka.#{librdkafka.version}.cooked"
unless File.exists?(checkpoint)
@@ -146,4 +145,10 @@ dir_config('rdkafka', HEADER_DIRS, LIB_DIRS)
# <http://blog.zachallett.com/howto-ruby-c-extension-with-a-static-library>
$LOCAL_LIBS << File.join(librdkafka.path, 'lib', 'librdkafka.a')
create_makefile('hermann/hermann_lib')
have_header('ruby/thread.h')
have_header('ruby/intern.h')
have_header('ruby/version.h')
have_func('rb_thread_blocking_region')
have_func('rb_thread_call_without_gvl')
create_makefile('hermann/hermann_rdkafka')

ext/hermann/hermann_rdkafka.c (renamed from ext/hermann/hermann_lib.c)

@@ -1,5 +1,5 @@
/*
* hermann_lib.c - Ruby wrapper for the librdkafka library
* hermann_rdkafka.c - Ruby wrapper for the librdkafka library
*
* Copyright (c) 2014 Stan Campbell
* Copyright (c) 2014 Lookout, Inc.
@@ -31,7 +31,20 @@
/* Much of the librdkafka library calls were lifted from rdkafka_example.c */
#include "hermann_lib.h"
#include "hermann_rdkafka.h"
/* This header file exposes the functions in librdkafka.a that are needed for
* consistent partitioning. After librdkafka releases a new tag and Hermann
* points to it, this can be removed. */
#include "rdcrc32.h"
#ifdef HAVE_RUBY_VERSION_H
#include <ruby/version.h>
#endif
/* how long to let librdkafka block on the socket before returning back to the interpreter.
* essentially defines how long we wait before consumer_consume_stop_callback() can fire */
#define CONSUMER_RECVMSG_TIMEOUT_MS 100
/**
* Convenience function
@@ -115,7 +128,7 @@ static void msg_delivered(rd_kafka_t *rk,
/* call back into our Hermann::Result if it exists, discarding the
* return value
*/
if (NULL != push_ctx->result) {
if (NULL != (void *)push_ctx->result) {
rb_funcall(push_ctx->result,
hermann_result_fulfill_method,
2,
@@ -126,6 +139,18 @@
}
}
/* This function is in rdkafka.h on librdkafka master. As soon as a new
* version is released and Hermann points to it, this can be removed. */
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
return rd_crc32(key, keylen) % partition_cnt;
}
/**
* Producer partitioner callback.
* Used to determine the target partition within a topic for production.
@@ -146,13 +171,10 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
/* Pick a random partition */
int retry = 0;
for (; retry < partition_cnt; retry++) {
int32_t partition = rand() % partition_cnt;
if (rd_kafka_topic_partition_available(rkt, partition)) {
break; /* this one will do */
}
if (keylen) {
return rd_kafka_msg_partitioner_consistent(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
} else {
return rd_kafka_msg_partitioner_random(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
}
}
@@ -204,22 +226,16 @@ static void hexdump(FILE *fp,
* @param rkmessage rd_kafka_message_t* the message
* @param opaque void* opaque context
*/
static void msg_consume(rd_kafka_message_t *rkmessage,
void *opaque) {
HermannInstanceConfig* cfg;
cfg = (HermannInstanceConfig*)opaque;
static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cfg) {
if (rkmessage->err) {
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
fprintf(stderr,
"%% Consumer reached end of %s [%"PRId32"] "
"message queue at offset %"PRId64"\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset);
if (cfg->exit_eof) {
fprintf(stderr,
"%% Consumer reached end of %s [%"PRId32"] "
"message queue at offset %"PRId64"\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset);
cfg->run = 0;
}
@@ -259,8 +275,19 @@ static void msg_consume(rd_kafka_message_t *rkmessage,
// Yield the data to the Consumer's block
if (rb_block_given_p()) {
VALUE value = rb_str_new((char *)rkmessage->payload, rkmessage->len);
rb_yield(value);
VALUE key, data, offset;
data = rb_str_new((char *)rkmessage->payload, rkmessage->len);
offset = rb_ll2inum(rkmessage->offset);
if ( rkmessage->key_len > 0 ) {
key = rb_str_new((char*) rkmessage->key, (int)rkmessage->key_len);
} else {
key = Qnil;
}
rd_kafka_message_destroy(rkmessage);
rb_yield_values(3, data, key, offset);
}
else {
if (DEBUG) {
@@ -322,9 +349,6 @@ void consumer_init_kafka(HermannInstanceConfig* config) {
rd_kafka_set_logger(config->rk, logger);
rd_kafka_set_log_level(config->rk, LOG_DEBUG);
/* TODO: offset calculation */
config->start_offset = RD_KAFKA_OFFSET_END;
/* Add brokers */
if (rd_kafka_brokers_add(config->rk, config->brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
@@ -341,7 +365,7 @@ void consumer_init_kafka(HermannInstanceConfig* config) {
// Ruby gem extensions
#ifdef RB_THREAD_BLOCKING_REGION
#if defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
/* NOTE: We only need this method defined if RB_THREAD_BLOCKING_REGION is
* defined, otherwise it's unused
*/
@@ -360,26 +384,88 @@ static void consumer_consume_stop_callback(void *ptr) {
#endif
/**
* Loop on a timeout to receive messages from Kafka. When the consumer_consume_stop_callback is invoked by Ruby,
* we'll break out of our loop and return.
* consumer_recv_msg
*
* Consume a single message from the kafka stream. The only function that should be invoked
* without the GVL held.
*
* @param HermannInstanceConfig* The hermann configuration for this consumer
*
*/
void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
static void *consumer_recv_msg(void *ptr)
{
rd_kafka_message_t *ret;
HermannInstanceConfig *consumerConfig = (HermannInstanceConfig *) ptr;
ret = rd_kafka_consume(consumerConfig->rkt, consumerConfig->partition, CONSUMER_RECVMSG_TIMEOUT_MS);
if ( ret == NULL ) {
if ( errno != ETIMEDOUT )
fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str( rd_kafka_errno2err(errno)));
}
return (void *) ret;
}
/**
* consumer_consume_loop
*
* A timeout-interrupted loop in which we drop the GVL and attempt to receive
* messages from Kafka. We'll check every CONSUMER_RECVMSG_TIMEOUT_MS, or
* after every message, to see if the ruby interpreter wants us to exit the
* loop.
*
* @param self The consumer instance
*/
static VALUE consumer_consume_loop(VALUE self) {
HermannInstanceConfig* consumerConfig;
rd_kafka_message_t *msg;
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
TRACER("\n");
while (consumerConfig->run) {
if (rd_kafka_consume_callback(consumerConfig->rkt, consumerConfig->partition,
1000/*timeout*/,
msg_consume,
consumerConfig) < 0) {
fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str( rd_kafka_errno2err(errno)));
}
#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2
msg = (rd_kafka_message_t *) rb_thread_blocking_region((rb_blocking_function_t *) consumer_recv_msg,
consumerConfig,
consumer_consume_stop_callback,
consumerConfig);
#elif HAVE_RB_THREAD_CALL_WITHOUT_GVL
msg = rb_thread_call_without_gvl(consumer_recv_msg,
consumerConfig,
consumer_consume_stop_callback,
consumerConfig);
#else
msg = consumer_recv_msg(consumerConfig);
#endif
if ( msg ) {
msg_consume(msg, consumerConfig);
}
}
return Qnil;
}
/**
* consumer_consume_loop_stop
*
* called when we're done with the .consume() loop. lets rdkafka clean up some internal structures
*/
static VALUE consumer_consume_loop_stop(VALUE self) {
HermannInstanceConfig* consumerConfig;
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);
return Qnil;
}
/**
* Hermann::Lib::Consumer.consume
* Hermann::Provider::RDKafka::Consumer.consume
*
* @param VALUE self the Ruby object for this consumer
* @param VALUE topic the Ruby string representing a topic to consume
@@ -407,33 +493,12 @@ static VALUE consumer_consume(VALUE self, VALUE topic) {
if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) {
fprintf(stderr, "%% Failed to start consuming: %s\n",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
rb_raise(rb_eRuntimeError,
rb_raise(rb_eRuntimeError, "%s",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
return Qnil;
}
#ifdef RB_THREAD_BLOCKING_REGION
/** The consumer will listen for incoming messages in a loop, timing out and checking the consumerConfig->run
* flag every second.
*
* Call rb_thread_blocking_region to release the GVM lock and allow Ruby to amuse itself while we wait on
* IO from Kafka.
*
* If Ruby needs to interrupt the consumer loop, the stop callback will be invoked and the loop should exit.
*/
rb_thread_blocking_region(consumer_consume_loop,
consumerConfig,
consumer_consume_stop_callback,
consumerConfig);
#else
consumer_consume_loop(consumerConfig);
#endif
/* Stop consuming */
rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);
return Qnil;
return rb_ensure(consumer_consume_loop, self, consumer_consume_loop_stop, self);
}
@@ -535,10 +600,11 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
* @param message VALUE the ruby String containing the outgoing message.
* @param topic VALUE the ruby String containing the topic to use for the
* outgoing message.
* @param key VALUE the ruby String containing the key to partition by
* @param result VALUE the Hermann::Result object to be fulfilled when the
* push completes
*/
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE partition_key, VALUE result) {
HermannInstanceConfig* producerConfig;
/* Context pointer, pointing to `result`, for the librdkafka delivery
@@ -546,13 +612,14 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
*/
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
rd_kafka_topic_t *rkt = NULL;
rd_kafka_topic_conf_t *rkt_conf = NULL;
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
delivery_ctx->producer = producerConfig;
delivery_ctx->result = NULL;
delivery_ctx->result = (VALUE) NULL;
TRACER("producerConfig: %p\n", producerConfig);
@@ -568,9 +635,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
TRACER("kafka initialized\n");
/* Topic configuration */
rkt_conf = rd_kafka_topic_conf_new();
/* Set the partitioner callback */
rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, producer_partitioner_callback);
rkt = rd_kafka_topic_new(producerConfig->rk,
RSTRING_PTR(topic),
NULL);
rkt_conf);
if (NULL == rkt) {
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
@@ -591,8 +664,8 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
RD_KAFKA_MSG_F_COPY,
RSTRING_PTR(message),
RSTRING_LEN(message),
NULL,
0,
RSTRING_PTR(partition_key),
RSTRING_LEN(partition_key),
delivery_ctx)) {
fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition,
@@ -643,19 +716,56 @@ static VALUE producer_tick(VALUE self, VALUE timeout) {
events = rd_kafka_poll(conf->rk, timeout_ms);
if (conf->isErrored) {
rb_raise(rb_eStandardError, conf->error);
rb_raise(rb_eStandardError, "%s", conf->error);
}
return rb_int_new(events);
}
/*
* producer_metadata_request_nogvl
*
* call rd_kafka_metadata without the GVL held. Note that rd_kafka_metadata is not interruptible,
* so in case of interrupt the thread will not respond until timeout_ms is reached.
*
* rd_kafka_metadata will fill in the ctx->data pointer on success
*
* @param ptr void* the hermann_metadata_ctx_t
*/
static void *producer_metadata_request_nogvl(void *ptr)
{
hermann_metadata_ctx_t *ctx = (hermann_metadata_ctx_t*)ptr;
return (void *) rd_kafka_metadata(ctx->rk,
ctx->topic ? 0 : 1,
ctx->topic,
(const struct rd_kafka_metadata **) &(ctx->data),
ctx->timeout_ms);
}
static int producer_metadata_request(hermann_metadata_ctx_t *ctx)
{
int err;
#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2
err = (int) rb_thread_blocking_region((rb_blocking_function_t *) producer_metadata_request_nogvl, ctx,
NULL, NULL);
#elif HAVE_RB_THREAD_CALL_WITHOUT_GVL
err = (int) rb_thread_call_without_gvl(producer_metadata_request_nogvl, ctx, NULL, NULL);
#else
err = (int) producer_metadata_request_nogvl(ctx);
#endif
return err;
}
static VALUE producer_connect(VALUE self, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
VALUE result = Qfalse;
int timeout_ms = rb_num2int(timeout);
struct rd_kafka_metadata *data = NULL;
hermann_metadata_ctx_t md_context;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
@@ -663,17 +773,19 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
producer_init_kafka(self, producerConfig);
}
err = rd_kafka_metadata(producerConfig->rk,
0,
producerConfig->rkt,
&data,
timeout_ms);
md_context.rk = producerConfig->rk;
md_context.topic = NULL;
md_context.data = NULL;
md_context.timeout_ms = rb_num2int(timeout);
err = producer_metadata_request(&md_context);
TRACER("err: %s (%i)\n", rd_kafka_err2str(err), err);
if (RD_KAFKA_RESP_ERR_NO_ERROR == err) {
TRACER("brokers: %i, topics: %i\n",
data->broker_cnt,
data->topic_cnt);
md_context.data->broker_cnt,
md_context.data->topic_cnt);
producerConfig->isConnected = 1;
result = Qtrue;
}
@@ -681,11 +793,118 @@
producerConfig->isErrored = err;
}
rd_kafka_metadata_destroy(data);
if ( md_context.data )
rd_kafka_metadata_destroy(md_context.data);
return result;
}
/*
* producer_metadata_make_hash
*
* transform the rd_kafka_metadata structure into a ruby hash. eg:
* { :brokers => [ {:id=>0, :host=>"172.20.10.3", :port=>9092} ],
* :topics => { "maxwell" => [ {:id=>0, :leader_id=>0, :replica_ids=>[0], :isr_ids=>[0]}]} }
*
* @param data struct rd_kafka_metadata* data returned from rd_kafka_metadata
*/
static VALUE producer_metadata_make_hash(struct rd_kafka_metadata *data)
{
int i, j, k;
VALUE broker_hash, topic_hash, partition_ary, partition_hash, partition_replica_ary, partition_isr_ary;
VALUE hash = rb_hash_new();
VALUE brokers = rb_ary_new2(data->broker_cnt);
VALUE topics = rb_hash_new();
for ( i = 0; i < data->broker_cnt; i++ ) {
broker_hash = rb_hash_new();
rb_hash_aset(broker_hash, ID2SYM(rb_intern("id")), INT2FIX(data->brokers[i].id));
rb_hash_aset(broker_hash, ID2SYM(rb_intern("host")), rb_str_new2(data->brokers[i].host));
rb_hash_aset(broker_hash, ID2SYM(rb_intern("port")), INT2FIX(data->brokers[i].port));
rb_ary_push(brokers, broker_hash);
}
for ( i = 0; i < data->topic_cnt; i++ ) {
partition_ary = rb_ary_new2(data->topics[i].partition_cnt);
for ( j = 0 ; j < data->topics[i].partition_cnt ; j++ ) {
VALUE partition_hash = rb_hash_new();
rd_kafka_metadata_partition_t *partition = &(data->topics[i].partitions[j]);
/* id => 1, leader_id => 0 */
rb_hash_aset(partition_hash, ID2SYM(rb_intern("id")), INT2FIX(partition->id));
rb_hash_aset(partition_hash, ID2SYM(rb_intern("leader_id")), INT2FIX(partition->leader));
/* replica_ids => [1, 0] */
partition_replica_ary = rb_ary_new2(partition->replica_cnt);
for ( k = 0 ; k < partition->replica_cnt ; k++ ) {
rb_ary_push(partition_replica_ary, INT2FIX(partition->replicas[k]));
}
rb_hash_aset(partition_hash, ID2SYM(rb_intern("replica_ids")), partition_replica_ary);
/* isr_ids => [1, 0] */
partition_isr_ary = rb_ary_new2(partition->isr_cnt);
for ( k = 0 ; k < partition->isr_cnt ; k++ ) {
rb_ary_push(partition_isr_ary, INT2FIX(partition->isrs[k]));
}
rb_hash_aset(partition_hash, ID2SYM(rb_intern("isr_ids")), partition_isr_ary);
rb_ary_push(partition_ary, partition_hash);
}
rb_hash_aset(topics, rb_str_new2(data->topics[i].topic), partition_ary);
}
rb_hash_aset(hash, ID2SYM(rb_intern("brokers")), brokers);
rb_hash_aset(hash, ID2SYM(rb_intern("topics")), topics);
return hash;
}
/*
* producer_metadata
*
* make a metadata request to the kafka server, returning a hash
* containing a list of brokers and topics.
*
* @param data struct rd_kafka_metadata* data returned from rd_kafka_metadata
*/
static VALUE producer_metadata(VALUE self, VALUE topicStr, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
hermann_metadata_ctx_t md_context;
VALUE result;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (!producerConfig->isInitialized) {
producer_init_kafka(self, producerConfig);
}
md_context.rk = producerConfig->rk;
md_context.timeout_ms = rb_num2int(timeout);
if ( !NIL_P(topicStr) ) {
Check_Type(topicStr, T_STRING);
md_context.topic = rd_kafka_topic_new(producerConfig->rk, StringValuePtr(topicStr), NULL);
} else {
md_context.topic = NULL;
}
err = producer_metadata_request(&md_context);
if ( err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
// annoyingly, this is always a timeout error -- the rest rdkafka just jams onto STDERR
rb_raise( rb_eRuntimeError, "%s", rd_kafka_err2str(err) );
} else {
result = producer_metadata_make_hash(md_context.data);
rd_kafka_metadata_destroy(md_context.data);
return result;
}
}
static VALUE producer_is_connected(VALUE self) {
HermannInstanceConfig *producerConfig;
@@ -739,6 +958,9 @@ static void consumer_free(void *p) {
rd_kafka_destroy(config->rk);
}
free(config->topic);
free(config->brokers);
// clean up the struct
free(config);
}
@@ -794,11 +1016,13 @@ static VALUE consumer_allocate(VALUE klass) {
* @param topic VALUE a Ruby string
* @param brokers VALUE a Ruby string containing list of host:port
* @param partition VALUE a Ruby number
* @param offset VALUE a Ruby number
*/
static VALUE consumer_initialize(VALUE self,
VALUE topic,
VALUE brokers,
VALUE partition) {
VALUE partition,
VALUE offset) {
HermannInstanceConfig* consumerConfig;
char* topicPtr;
@@ -812,13 +1036,24 @@ static VALUE consumer_initialize(VALUE self,
partitionNo = FIX2INT(partition);
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
consumerConfig->topic = topicPtr;
consumerConfig->brokers = brokersPtr;
consumerConfig->topic = strdup(topicPtr);
consumerConfig->brokers = strdup(brokersPtr);
consumerConfig->partition = partitionNo;
consumerConfig->run = 1;
consumerConfig->exit_eof = 0;
consumerConfig->quiet = 0;
if ( FIXNUM_P(offset) ) {
consumerConfig->start_offset = FIX2LONG(offset);
} else if ( SYMBOL_P(offset) ) {
if ( offset == ID2SYM(rb_intern("start")) )
consumerConfig->start_offset = RD_KAFKA_OFFSET_BEGINNING;
else if ( offset == ID2SYM(rb_intern("end")) )
consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
} else {
consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
}
return self;
}
@@ -984,21 +1219,21 @@ static VALUE producer_init_copy(VALUE copy,
}
/**
* Init_hermann_lib
* Init_hermann_rdkafka
*
* Called by Ruby when the Hermann gem is loaded.
* Defines the Hermann module.
* Defines the Producer and Consumer classes.
*/
void Init_hermann_lib() {
VALUE lib_module, c_consumer, c_producer;
void Init_hermann_rdkafka() {
VALUE lib_module, provider_module, c_consumer, c_producer;
TRACER("setting up Hermann::Lib\n");
TRACER("setting up Hermann::Provider::RDKafka\n");
/* Define the module */
hermann_module = rb_define_module("Hermann");
lib_module = rb_define_module_under(hermann_module, "Lib");
provider_module = rb_define_module_under(hermann_module, "Provider");
lib_module = rb_define_module_under(provider_module, "RDKafka");
/* ---- Define the consumer class ---- */
c_consumer = rb_define_class_under(lib_module, "Consumer", rb_cObject);
@@ -1007,7 +1242,7 @@ void Init_hermann_lib() {
rb_define_alloc_func(c_consumer, consumer_allocate);
/* Initialize */
rb_define_method(c_consumer, "initialize", consumer_initialize, 3);
rb_define_method(c_consumer, "initialize", consumer_initialize, 4);
rb_define_method(c_consumer, "initialize_copy", consumer_init_copy, 1);
/* Consumer has method 'consume' */
@@ -1024,7 +1259,7 @@ void Init_hermann_lib() {
rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1);
/* Producer.push_single(msg) */
rb_define_method(c_producer, "push_single", producer_push_single, 3);
rb_define_method(c_producer, "push_single", producer_push_single, 4);
/* Producer.tick */
rb_define_method(c_producer, "tick", producer_tick, 1);
@@ -1037,4 +1272,7 @@ void Init_hermann_lib() {
/* Producer.connect */
rb_define_method(c_producer, "connect", producer_connect, 1);
/* Producer.metadata */
rb_define_method(c_producer, "metadata", producer_metadata, 2);
}

ext/hermann/hermann_rdkafka.h (renamed from ext/hermann/hermann_lib.h)

@@ -1,5 +1,5 @@
/*
* hermann_lib.h - Ruby wrapper for the librdkafka library
* hermann_rdkafka.h - Ruby wrapper for the librdkafka library
*
* Copyright (c) 2014 Stan Campbell
* Copyright (c) 2014 Lookout, Inc.
@@ -32,6 +32,14 @@
#include <ruby.h>
#ifdef HAVE_RUBY_THREAD_H
#include <ruby/thread.h>
#endif
#ifdef HAVE_RUBY_INTERN_H
#include <ruby/intern.h>
#endif
#include <ctype.h>
#include <signal.h>
#include <string.h>
@@ -92,16 +100,24 @@ typedef struct HermannInstanceConfig {
int isConnected;
int isErrored;
char *error;
} HermannInstanceConfig;
typedef HermannInstanceConfig hermann_conf_t;
typedef struct {
/* Hermann::Lib::Producer */
/* Hermann::Provider::RDKafka::Producer */
hermann_conf_t *producer;
/* Hermann::Result */
VALUE result;
} hermann_push_ctx_t;
typedef struct {
rd_kafka_t *rk;
rd_kafka_topic_t *topic;
struct rd_kafka_metadata *data;
int timeout_ms;
} hermann_metadata_ctx_t;
#endif

ext/hermann/rdcrc32.h (new file)

@@ -0,0 +1,103 @@
/**
* \file rdcrc32.h
* Functions and types for CRC checks.
*
* Generated on Tue May 8 17:36:59 2012,
* by pycrc v0.7.10, http://www.tty1.net/pycrc/
*
* NOTE: Contains librd modifications:
* - rd_crc32() helper.
* - __RDCRC32___H__ define (was missing the '32' part).
*
* using the configuration:
* Width = 32
* Poly = 0x04c11db7
* XorIn = 0xffffffff
* ReflectIn = True
* XorOut = 0xffffffff
* ReflectOut = True
* Algorithm = table-driven
*****************************************************************************/
#ifndef __RDCRC32___H__
#define __RDCRC32___H__
#include <stdlib.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* The definition of the used algorithm.
*****************************************************************************/
#define CRC_ALGO_TABLE_DRIVEN 1
/**
* The type of the CRC values.
*
* This type must be big enough to contain at least 32 bits.
*****************************************************************************/
typedef uint32_t rd_crc32_t;
/**
* Reflect all bits of a \a data word of \a data_len bytes.
*
* \param data The data word to be reflected.
* \param data_len The width of \a data expressed in number of bits.
* \return The reflected data.
*****************************************************************************/
rd_crc32_t rd_crc32_reflect(rd_crc32_t data, size_t data_len);
/**
* Calculate the initial crc value.
*
* \return The initial crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_init(void)
{
return 0xffffffff;
}
/**
* Update the crc value with new data.
*
* \param crc The current crc value.
* \param data Pointer to a buffer of \a data_len bytes.
* \param data_len Number of bytes in the \a data buffer.
* \return The updated crc value.
*****************************************************************************/
rd_crc32_t rd_crc32_update(rd_crc32_t crc, const unsigned char *data, size_t data_len);
/**
* Calculate the final crc value.
*
* \param crc The current crc value.
* \return The final crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_finalize(rd_crc32_t crc)
{
return crc ^ 0xffffffff;
}
/**
* Wrapper for performing CRC32 on the provided buffer.
*/
static inline rd_crc32_t rd_crc32 (const char *data, size_t data_len) {
return rd_crc32_finalize(rd_crc32_update(rd_crc32_init(),
(const unsigned char *)data,
data_len));
}
#ifdef __cplusplus
} /* closing brace for extern "C" */
#endif
#endif /* __RDCRC32___H__ */

ext/patches/librdkafka (deleted FreeBSD patch)

@@ -1,57 +0,0 @@
From 888ca33b571d99e877d665235b822f7c961c8fdb Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy" <tyler@monkeypox.org>
Date: Thu, 28 Aug 2014 16:24:04 -0700
Subject: [PATCH 6/8] Update some headers to include the right headers to build
on FreeBSD
---
src/rd.h | 9 +++++++++
src/rdaddr.h | 4 ++++
2 files changed, 13 insertions(+)
diff --git a/src/rd.h b/src/rd.h
index c31501e..4789493 100644
--- a/src/rd.h
+++ b/src/rd.h
@@ -37,7 +37,11 @@
#include <errno.h>
#include <time.h>
#include <sys/time.h>
+
+#ifndef __FreeBSD__
+/* alloca(3) is in stdlib on FreeBSD */
#include <alloca.h>
+#endif
#include <assert.h>
#include <pthread.h>
@@ -110,6 +114,11 @@
# endif
#endif /* sun */
+#ifdef __FreeBSD__
+/* FreeBSD defines be64toh() in sys/endian.h */
+#include <sys/endian.h>
+#endif
+
#ifndef be64toh
#ifndef __APPLE__
#ifndef sun
diff --git a/src/rdaddr.h b/src/rdaddr.h
index 0b37354..e55bd55 100644
--- a/src/rdaddr.h
+++ b/src/rdaddr.h
@@ -32,6 +32,10 @@
#include <arpa/inet.h>
#include <netdb.h>
+#ifdef __FreeBSD__
+#include <sys/socket.h>
+#endif
+
/**
* rd_sockaddr_inx_t is a union for either ipv4 or ipv6 sockaddrs.
* It provides conveniant abstraction of AF_INET* agnostic operations.
--
1.9.0

hermann.gemspec

@@ -16,7 +16,7 @@ Gem::Specification.new do |s|
s.homepage = 'https://github.com/lookout/Hermann'
s.licenses = ['MIT']
s.files = [ "Rakefile"]
s.files = ['Rakefile']
s.files += `git ls-files -- lib`.split($\)
s.files += `git ls-files -- ext`.split($\)
@@ -24,17 +24,20 @@ Gem::Specification.new do |s|
s.rubygems_version = '2.2.2'
s.specification_version = 3 if s.respond_to?(:specification_version)
s.add_dependency 'concurrent-ruby', '~> 0.7.0'
s.add_dependency 'json', '~> 1.8.2'
s.add_dependency 'thread_safe', '~> 0.3.4'
if RUBY_PLATFORM == "java"
s.files << 'lib/hermann_jars.rb'
s.add_dependency 'concurrent-ruby', '~> 1.0.0'
# IMPORTANT: make sure that jar-dependencies is only a development
# dependency of your gem. if it is a runtime dependencies the require_jars
# file will be overwritten during installation.
s.add_dependency 'jar-dependencies', '~>0.1.9'
s.requirements << "jar org.apache.kafka:kafka_2.10, ~>0.8.1.1, ['junit:junit']"
s.add_dependency 'jar-dependencies', ['~> 0.1', '>= 0.1.10']
s.requirements << "jar org.apache.kafka:kafka_2.11, ~> 0.8.2.2"
# use log4j-1.2.16+ to as 1.2.15 declares deps which are not in maven central and causes the dep resolution to fail
s.requirements << "jar log4j:log4j, ~>1.2.16"
s.requirements << "jar log4j:log4j, ~> 1.2.16"
s.require_paths = ["lib"]
s.platform = 'java'
else

lib/hermann/consumer.rb

@@ -1,40 +1,43 @@
require 'hermann'
require 'hermann/errors'
if Hermann.jruby?
require 'hermann/provider/java_simple_consumer'
else
require 'hermann_lib'
require 'hermann_rdkafka'
end
module Hermann
# Hermann::Consumer provides a simple consumer API which is only safe to be
# executed in a single thread
class Consumer
attr_reader :topic, :brokers, :partition, :internal
attr_reader :topic, :internal
# Instantiate Consumer
#
# @params [String] kafka topic
#
# @params [String] group ID
#
# @params [String] comma separated zookeeper list
#
# @params [Hash] options for Consumer
# @option opts [String] :brokers (for MRI) Comma separated list of brokers
# @option opts [Integer] :partition (for MRI) The kafka partition
def initialize(topic, groupId, zookeepers, opts={})
# @option opts [String] :brokers (for MRI) Comma separated list of brokers
# @option opts [Integer] :partition (for MRI) The kafka partition
# @option opts [Symbol|Fixnum] :offset (for MRI) Starting consumer offset. either :start, :end, or Fixnum
# @option opts [Integer] :zookeepers (for jruby) list of zookeeper servers
# @option opts [Integer] :group_id (for jruby) client group_id
#
def initialize(topic, opts = {})
@topic = topic
@brokers = brokers
@partition = partition
offset = opts.delete(:offset)
raise Hermann::Errors::InvalidOffsetError.new("Bad offset: #{offset}") unless valid_offset?(offset)
if Hermann.jruby?
@internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, groupId, topic, opts)
zookeepers, group_id = require_values_at(opts, :zookeepers, :group_id)
@internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, group_id, topic, opts)
else
brokers = opts.delete(:brokers)
partition = opts.delete(:partition)
@internal = Hermann::Lib::Consumer.new(topic, brokers, partition)
brokers, partition = require_values_at(opts, :brokers, :partition)
@internal = Hermann::Provider::RDKafka::Consumer.new(topic, brokers, partition, offset)
end
end
@@ -51,5 +54,18 @@ module Hermann
#no op
end
end
private
def valid_offset?(offset)
offset.nil? || offset.is_a?(Fixnum) || offset == :start || offset == :end
end
def require_values_at(opts, *args)
args.map do |a|
raise "Please provide :#{a} option!" unless opts[a]
opts.delete(a)
end
end
end
end

lib/hermann/discovery/metadata.rb (new file)

@@ -0,0 +1,77 @@
require 'hermann_rdkafka'
require 'hermann/consumer'
module Hermann
module Discovery
class Metadata
Broker = Struct.new(:id, :host, :port) do
def to_s
"#{host}:#{port}"
end
end
Topic = Struct.new(:name, :partitions)
Partition = Struct.new(:id, :leader, :replicas, :insync_replicas, :topic_name) do
def consumer(offset=:end)
Hermann::Consumer.new(topic_name, brokers: ([leader] + replicas).join(','), partition: id, offset: offset)
end
end
DEFAULT_TIMEOUT_MS = 2_000
def initialize(brokers, options = {})
raise "this is an MRI api only!" if Hermann.jruby?
@internal = Hermann::Provider::RDKafka::Producer.new(brokers)
@timeout = options[:timeout] || DEFAULT_TIMEOUT_MS
end
#
# @internal.metadata returns:
# {:brokers => [{:id=>3, :host=>"kafka3.alpha4.sac1.zdsys.com", :port=>9092}],
# :topics => {"testtopic"=>[{:id=>0, :leader_id=>3, :replica_ids=>[3, 1], :isr_ids=>[3, 1]}}}
#
def brokers
brokers_from_metadata(@internal.metadata(nil, @timeout))
end
def topic(t)
get_topics(t)[t]
end
def topics
get_topics
end
private
def get_topics(filter_topics = nil)
md = @internal.metadata(filter_topics, @timeout)
broker_hash = brokers_from_metadata(md).inject({}) do |h, broker|
h[broker.id] = broker
h
end
md[:topics].inject({}) do |topic_hash, arr|
topic_name, raw_partitions = *arr
partitions = raw_partitions.map do |p|
leader = broker_hash[p[:leader_id]]
all_replicas = p[:replica_ids].map { |i| broker_hash[i] }
isr_replicas = p[:isr_ids].map { |i| broker_hash[i] }
Partition.new(p[:id], leader, all_replicas, isr_replicas, topic_name)
end
topic_hash[topic_name] = Topic.new(topic_name, partitions)
topic_hash
end
end
def brokers_from_metadata(md)
md[:brokers].map do |h|
Broker.new(h[:id], h[:host], h[:port])
end
end
end
end
end

lib/hermann/errors.rb

@@ -24,6 +24,9 @@ module Hermann
# cannot discover brokers from zookeeper
class NoBrokersError < GeneralError; end
# offsets can only be two symbols or a fixnum
class InvalidOffsetError < GeneralError; end
end
end

lib/hermann.rb

@@ -1,6 +1,7 @@
module Hermann
require 'java'
require 'hermann_jars'
require 'concurrent'
module JavaUtil
include_package 'java.util'

lib/hermann/producer.rb

@@ -6,7 +6,7 @@ require 'hermann/result'
if RUBY_PLATFORM == "java"
require 'hermann/provider/java_producer'
else
require 'hermann_lib'
require 'hermann_rdkafka'
end
module Hermann
@@ -23,7 +23,7 @@ module Hermann
if Hermann.jruby?
@internal = Hermann::Provider::JavaProducer.new(brokers.join(','), opts)
else
@internal = Hermann::Lib::Producer.new(brokers.join(','))
@internal = Hermann::Provider::RDKafka::Producer.new(brokers.join(','))
end
# We're tracking children so we can make sure that at Producer exit we
# make a reasonable attempt to clean up outstanding result objects
@@ -63,8 +63,7 @@ end
end
if Hermann.jruby?
key = opts.has_key?(:partition_key) ? opts[:partition_key].to_java : nil
result = @internal.push_single(value, topic, key)
result = @internal.push_single(value, topic, opts[:partition_key], nil)
unless result.nil?
@children << result
end
@@ -76,7 +75,7 @@
# librdkafka callback queue overflow
tick_reactor
result = create_result
@internal.push_single(value, topic, result)
@internal.push_single(value, topic, opts[:partition_key].to_s, result)
end
return result
@@ -103,7 +102,8 @@
@children.each do |child|
# Skip over any children that should already be reaped for other
# reasons
next if child.completed?
next if (Hermann.jruby? ? child.fulfilled? : child.completed?)
# Propagate errors to the remaining children
child.internal_set_error(ex)
end
@@ -119,7 +119,7 @@
# Filter all children who are no longer pending/fulfilled
total_children = @children.size
@children = @children.reject { |c| c.completed? }
@children = @children.reject { |c| Hermann.jruby? ? c.fulfilled? : c.completed? }
return (total_children - children.size)
end

lib/hermann/provider/java_producer.rb

@@ -1,6 +1,5 @@
require 'hermann'
require 'concurrent'
require 'json'
require 'hermann'
require 'hermann/errors'
module Hermann
@@ -44,7 +43,8 @@
# @return +Concurrent::Promise+ Represents a promise to send the
# data to the kafka broker. Upon execution the Promise's status
# will be set
def push_single(msg, topic, key)
def push_single(msg, topic, key, _)
key = key && key.to_java
Concurrent::Promise.execute {
data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
begin

lib/hermann/version.rb

@@ -1,3 +1,3 @@
module Hermann
VERSION = '0.22.0'
VERSION = '0.26.1'
end

lib/hermann_jars.rb (deleted)

@@ -1,13 +0,0 @@
# this is a generated file, to avoid over-writing it just delete this comment
require 'jar_dependencies'
require_jar( 'org.apache.zookeeper', 'zookeeper', '3.3.4' )
require_jar( 'net.sf.jopt-simple', 'jopt-simple', '3.2' )
require_jar( 'org.xerial.snappy', 'snappy-java', '1.0.5' )
require_jar( 'jline', 'jline', '0.9.94' )
require_jar( 'com.101tec', 'zkclient', '0.3' )
require_jar( 'log4j', 'log4j', '1.2.17' )
require_jar( 'org.scala-lang', 'scala-library', '2.10.1' )
require_jar( 'org.slf4j', 'slf4j-api', '1.7.2' )
require_jar( 'com.yammer.metrics', 'metrics-core', '2.2.0' )
require_jar( 'org.apache.kafka', 'kafka_2.10', '0.8.1.1' )

(consumer demo script)

@@ -2,7 +2,7 @@ require 'rubygems'
require 'lib/hermann'
require 'lib/hermann/consumer'
c = Hermann::Consumer.new( "lms_messages", "localhost:9092", 0 )
c = Hermann::Consumer.new( "lms_messages", :zookeepers => "localhost:2181", :group_id => "lms_message_consumer" )
t1 = 0
c.consume() do
|msg| puts("Received: #{msg}")

(new MRI consumer stress-test script)

@@ -0,0 +1,25 @@
require 'rubygems'
$LOAD_PATH << File.dirname(__FILE__) + "/../lib"
$LOAD_PATH << File.dirname(__FILE__) + "/../ext"
require 'hermann'
require 'hermann/consumer'
t1 = 0
threads = []
100.times do |i|
threads << Thread.new do
puts "booting #{i}"
c = Hermann::Consumer.new( "maxwell", brokers: "localhost:9092", partition: i, offset: :start)
c.consume() do
|msg| puts("Received: #{msg}")
if(t1 == 0)
t1 = Time.now
end
t2 = Time.now
elapsed = t2 - t1
puts("Total elapsed time: #{elapsed} seconds")
end
end
end
threads.each(&:join)

scripts/metadata_mri.rb (new file)

@@ -0,0 +1,15 @@
require 'bundler/setup'

require 'hermann'
require 'hermann/discovery/metadata'
require 'hermann/consumer'

c = Hermann::Discovery::Metadata.new("localhost:9092")
c.topic("maxwell")
puts c.topic("maxwell").inspect
puts c.brokers.inspect

consumer = Hermann::Consumer.new("maxwell", brokers: "localhost:9092, localhost:9092", partition: c.topic('maxwell').partitions.first.id, offset: :start)
consumer.consume do |c|
  puts c
end


@@ -1,15 +1,17 @@
require 'spec_helper'
require 'hermann/consumer'
require 'hermann/errors'
# XXX: Hermann::Consumer isn't really supported anywhere, MRI included right
# now
describe Hermann::Consumer do
subject(:consumer) { described_class.new(topic, nil, nil, opts) }
subject(:consumer) { described_class.new(topic, opts) }
let(:topic) { 'rspec' }
let(:brokers) { 'localhost:1337' }
let(:partition) { 1 }
let(:opts) { { :brokers => brokers, :partition => partition } }
let(:offset) { nil }
let(:opts) { { :brokers => brokers, :partition => partition, :offset => offset } }
context "on C ruby", :platform => :mri do
@@ -36,6 +38,13 @@ describe Hermann::Consumer do
let(:topic) { '' }
it_behaves_like 'an error condition'
end
context 'with a bad offset' do
let(:offset) { :foo }
it "raises an InvalidOffset error" do
expect { subject }.to raise_error(Hermann::Errors::InvalidOffsetError)
end
end
end
describe '#shutdown' do
@@ -46,7 +55,7 @@ describe Hermann::Consumer do
end
context 'on Jruby', :platform => :java do
subject(:consumer) { described_class.new(topic, groupId, zookeepers) }
subject(:consumer) { described_class.new(topic, :group_id => groupId, :zookeepers => zookeepers) }
let(:zookeepers) { 'localhost:2181' }
let(:groupId) { 'groupId' }
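
The consumer specs now build the subject from an options hash on both platforms, and the MRI path validates :offset at construction time. A hedged sketch of the failure mode the new context pins down (the broker address is an assumption):

require 'hermann/consumer'
require 'hermann/errors'

begin
  # :foo is not a recognized offset (the scripts above use :start);
  # on MRI this is expected to raise at construction time.
  Hermann::Consumer.new('rspec', :brokers => 'localhost:9092',
                        :partition => 1, :offset => :foo)
rescue Hermann::Errors::InvalidOffsetError => ex
  puts "rejected: #{ex.class}"
end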


@@ -1,13 +1,13 @@
require 'spec_helper'
describe 'Hermann::Lib::Producer', :platform => :mri do
describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
before :all do
require 'hermann_lib'
require 'hermann_rdkafka'
end
let(:topic) { 'rspec' }
let(:brokers) { 'localhost:1337' }
subject(:producer) { Hermann::Lib::Producer.new(brokers) }
subject(:producer) { Hermann::Provider::RDKafka::Producer.new(brokers) }
let(:timeout) { 3000 }
it { should respond_to :push_single }
@@ -41,7 +41,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
let(:brokers) { 'localhost:13337' }
it 'should error after attempting to connect' do |example|
producer.push_single(example.full_description, 'test-topic', nil)
producer.push_single(example.full_description, 'test-topic', '', nil)
begin
producer.tick(timeout)
rescue StandardError => ex
@@ -62,7 +62,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
describe '#push_single', :type => :integration do
let(:message) { |example| example.full_description }
subject(:push) { producer.push_single(message, topic, nil) }
subject(:push) { producer.push_single(message, topic, '', nil) }
it 'should return' do
expect(push).not_to be_nil
@@ -105,7 +105,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
context 'with a single queued request' do
before :each do
producer.push_single('hello', topic, nil)
producer.push_single('hello', topic, '', nil)
end
it 'should return successfully' do
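
These hunks track the rename of the MRI native extension: hermann_lib becomes hermann_rdkafka, Hermann::Lib::Producer becomes Hermann::Provider::RDKafka::Producer, and push_single takes the same four arguments as the JRuby provider. A sketch under MRI, with an assumed broker address:

require 'hermann_rdkafka'  # the renamed native extension; MRI only

producer = Hermann::Provider::RDKafka::Producer.new('localhost:9092')
producer.push_single('payload', 'test-topic', '', nil)  # empty partition key
producer.tick(3000)  # pump librdkafka delivery callbacks for up to 3000ms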


@@ -3,18 +3,19 @@ require 'spec_helper'
require 'hermann/producer'
require 'hermann/consumer'
require 'hermann/discovery/zookeeper'
require 'concurrent'
require 'protobuf'
require_relative '../fixtures/testevent.pb'
describe 'producer' do
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../'))
require 'fixtures/testevent.pb'
describe 'producer', :platform => :java do
include_context 'integration test context'
let(:timeout) { 10 }
let(:message) { 'msg' }
let(:consumer) do
Hermann::Consumer.new(topic, "rspec-group", zookeepers)
Hermann::Consumer.new(topic, { :group_id => "rspec-group", :zookeepers => zookeepers })
end
let(:consumer_promise) do
Concurrent::Promise.execute do


@@ -11,10 +11,11 @@ describe Hermann::Producer do
describe '#initialize' do
context 'with C ruby', :platform => :mri do
it 'joins broker array' do
expect(Hermann::Lib::Producer).to receive(:new).with(brokers.first)
expect(Hermann::Provider::RDKafka::Producer).to receive(:new).with(brokers.first)
expect(producer).to be_a Hermann::Producer
end
end
context 'with Java', :platform => :java do
it 'joins broker array' do
expect(Hermann::Provider::JavaProducer).to receive(:new).with(brokers.first, opts)
@@ -42,34 +43,34 @@ describe Hermann::Producer do
context 'without topic passed' do
it 'uses initialized topic' do
expect(producer.internal).to receive(:push_single).with(msg, topic, anything)
expect(producer.internal).to receive(:push_single).with(msg, topic, anything, anything)
producer.push(msg)
end
end
context 'without topic passed', :platform => :java do
it 'uses initialized topic and does not have a partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, nil)
expect(producer.internal).to receive(:push_single).with(msg, topic, nil, anything)
producer.push(msg)
end
end
context 'with topic passed' do
it 'can change topic' do
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything)
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything)
producer.push(msg, :topic => passed_topic)
end
context 'and an array of messages' do
it 'should propagate the topic' do
messages = 3.times.map { |i| msg }
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything).exactly(messages.size).times
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything).exactly(messages.size).times
producer.push(messages, :topic => passed_topic)
end
end
end
context 'with explicit partition key', :platform => :java do
context 'with explicit partition key' do
it 'uses the partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key.to_java)
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key, anything)
producer.push(msg, :partition_key => partition_key)
end
end
@@ -177,7 +178,7 @@ describe Hermann::Producer do
it 'should invoke #push_single for each element' do
value.each do |v|
expect(producer.internal).to receive(:push_single).with(v, topic, anything)
expect(producer.internal).to receive(:push_single).with(v, topic, anything, anything)
end
expect(result).to be_instance_of Array
@@ -190,7 +191,7 @@ describe Hermann::Producer do
describe '#tick_reactor' do
let(:timeout) { 0 }
let(:internal) { double('Hermann::Lib::Producer mock') }
let(:internal) { double('Hermann::Provider::RDKafka::Producer mock') }
subject(:tick) { producer.tick_reactor(timeout) }
before :each do


@@ -8,11 +8,11 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
let(:topic) { 'rspec' }
let(:brokers) { '0:1337'}
let(:opts) { {} }
let(:part_key) { "key".to_java }
let(:part_key) { "key" }
let(:msg) { "bar" }
describe '#push_single' do
subject(:result) { producer.push_single(msg, topic, nil) }
subject(:result) { producer.push_single(msg, topic, nil, nil) }
let(:passed_topic) { 'foo' }
@@ -22,18 +22,18 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
it 'can change topic' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, nil, anything)
producer.push_single(msg, passed_topic, nil).wait(1)
producer.push_single(msg, passed_topic, nil, nil).wait(1)
end
it 'can change partition key' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, part_key, anything)
producer.push_single(msg, passed_topic, part_key).wait(1)
producer.push_single(msg, passed_topic, part_key, nil).wait(1)
end
context 'error conditions' do
shared_examples 'an error condition' do
it 'should be rejected' do
promise = producer.push_single('rspec', topic, nil).wait(1)
promise = producer.push_single('rspec', topic, nil, nil).wait(1)
expect(promise).to be_rejected
expect { promise.value! }.to raise_error
end


@@ -2,6 +2,13 @@ require 'rubygems'
require 'yaml'
require 'rspec'
require 'simplecov'
require 'simplecov-rcov'
SimpleCov.start do
formatter = SimpleCov::Formatter::RcovFormatter
end
# Add ext/ to the load path so we can load `hermann_lib`
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../ext/'))
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../lib/'))
@@ -13,8 +20,8 @@ RSpec.configure do |c|
c.formatter = :documentation
shared_context 'integration test context', :type => :integration do
let(:topic) { $integrationconf['kafka']['topic'] }
let(:brokers) { $integrationconf['kafka']['brokers'] }
let(:topic) { "hermann_testing" }
let(:brokers) { $integrationconf['kafka']['brokers'].join(',') }
let(:zookeepers) { $integrationconf['zookeepers'] }
end
end
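
One caveat on the coverage hunk: inside a SimpleCov.start block, the bare assignment "formatter = ..." creates a Ruby local variable and does not configure SimpleCov. The form the simplecov-rcov README documents assigns the formatter on SimpleCov directly, roughly:

require 'simplecov'
require 'simplecov-rcov'

# Assign on SimpleCov itself (or use self.formatter= inside the start block);
# a bare local assignment inside the block has no effect on configuration.
SimpleCov.formatter = SimpleCov::Formatter::RcovFormatter
SimpleCov.start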