Compare commits

...

68 Commits

Author SHA1 Message Date
R. Tyler Croy d822ae541a Merge pull request #138 from bitwiseman/patch-3
Locked rake version to ~11.3.0
2016-12-13 17:24:47 -08:00
Liam Newman f6f71c98e4 Locked rake version to ~11.3.0
Rake 12.0 replaces "last_comment" with "last_description".

This describes the issue and possible fix:
http://stackoverflow.com/questions/35893584/nomethoderror-undefined-method-last-comment-after-upgrading-to-rake-11

Rather than upgrade rspec to 3.5.x and risk changes to test behavior, I've locked rake to the last version before 12.0.
2016-12-13 15:53:55 -08:00
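For context on the fix: Bundler's `~>` ("pessimistic") constraint on a three-part version resolves to >= 11.3.0 and < 11.4.0, so neither Rake 11.4 nor 12.0 can be pulled in. A minimal Gemfile sketch of the pin (the real change is in the Gemfile diff below):

```ruby
# Stay on Rake 11.3.x; Rake 12 replaced `last_comment`, which the
# pinned rspec rake tasks still call.
gem 'rake', '~> 11.3.0'   # >= 11.3.0, < 11.4.0
```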
R. Tyler Croy f9957b5629 Merge pull request #137 from bitwiseman/patch-2
Updated for latest Pipeline fixes
2016-11-17 13:56:48 -08:00
Liam Newman 35eba84dc6 Updated for latest Pipeline fixes
JENKINS-29711 was fixed and stage step now uses curly braces
2016-11-17 13:53:09 -08:00
R. Tyler Croy 68515effc7 Merge pull request #136 from bitwiseman/feature/simplepipe
Simplify Pipeline
2016-07-06 17:52:11 -07:00
Liam Newman 69af2e310f Simplify Pipeline 2016-07-06 17:01:35 -07:00
R. Tyler Croy 9473c8b24b Merge pull request #135 from bitwiseman/feature/jenkinsfile
Add Jenkins pipeline
2016-06-30 10:15:37 -07:00
Liam Newman e0225b6761 Add Jenkins pipeline 2016-06-29 17:36:30 -07:00
R. Tyler Croy a4d678deaf Merge pull request #134 from bitwiseman/patch-1
Update README.md
2016-06-17 13:12:51 -07:00
Liam Newman 5dd952145b Update README.md
Add simple build and unit test instructions, similar to what is in the releasing document.
2016-06-17 13:09:26 -07:00
R. Tyler Croy 9f5cf12151
No sense supporting ree in any capacity any more 2016-05-29 18:32:37 -07:00
R. Tyler Croy f71ab11ea2 Merge pull request #132 from thedrow/patch-1
Added Ruby 2.2 and 2.3 to the matrix
2016-05-29 18:29:58 -07:00
R. Tyler Croy 436ec9be82 Merge pull request #133 from jfeltesse-mdsol/concurrent_ruby
depend on concurrent-ruby ~> 1.0.0
2016-02-24 08:29:11 -08:00
Julien Feltesse 587f3be16e depend on concurrent-ruby ~> 1.0.0 2016-02-24 18:19:06 +09:00
Omer Katz c2b2274af3 Specify 2.3.0 instead of 2.3 for travis. 2016-02-04 17:06:39 +02:00
Omer Katz c8fe7e06b5 Added Ruby 2.2 and 2.3 to the matrix. 2016-02-04 14:17:55 +02:00
R. Tyler Croy 246f27e76d
Add some code coverage while we're here 2016-02-03 05:32:57 -08:00
R. Tyler Croy bc560f21ea
Add support for generating JUnit-compatible output for Jenkins 2016-02-03 05:09:30 -08:00
R. Tyler Croy 9f8fbe381c Merge pull request #128 from rtyler/minor-doc-fixes
Minor doc fixes
2015-09-30 09:51:42 -07:00
R. Tyler Croy 5b669504af
Document the current janky release process
At least this way I won't forget each time

Fixes #126
2015-09-30 09:45:20 -07:00
R. Tyler Croy d4a350c65c
Update the gitter and travis badges after repo move 2015-09-30 09:39:10 -07:00
R. Tyler Croy 17b51359a9 Merge pull request #127 from rtyler/upgrade-scala-dependency
Upgrade scala dependency
2015-09-28 11:18:33 -07:00
R. Tyler Croy 0ad9be88f0
Upgrade bundler for our Travis build 2015-09-28 11:12:33 -07:00
R. Tyler Croy 8422e82125
Move hermann to the newer faster travis infra 2015-09-28 10:25:43 -07:00
R. Tyler Croy 1c7543f730
Upgrade to a Kafka (JVM) version which uses Scala 2.11
Scala 2.10 is three years old and it appears to me that 2.10 is moderately
broken and old; at this point 2.11 is the bare minimum folks should be using these
days (see also: https://github.com/twitter/scrooge/pull/198)
2015-09-28 08:43:50 -07:00
R. Tyler Croy 9d5b773542
Properly include hermann_jars.rb in the built gem but exclude from the tree 2015-09-23 13:19:40 -07:00
R. Tyler Croy ecae27dd65 Merge pull request #125 from rtyler/no-fixed-jars
Remove hard-coded hermann_jars.rb
2015-09-23 08:08:24 -07:00
R. Tyler Croy e9d301be3d
Remove hermann_jars.rb so gem installation generates it properly
Fixes #124
2015-09-23 07:52:21 -07:00
R. Tyler Croy 4dd732dc63
Bump the version for a minor release 2015-09-22 12:08:28 -07:00
R. Tyler Croy 5c20c6c5ba
Another minor version bump to clean our dependency range up 2015-09-22 12:00:04 -07:00
R. Tyler Croy 9f1e16071f
Bump the minor for some new functionality, at least on MRI 2015-09-22 12:00:04 -07:00
R. Tyler Croy 74cb8656e1 Merge pull request #121 from braintree/stable-partition
Add support for passing partition_key in MRI
2015-09-22 11:59:32 -07:00
jakesandlund 6946d4d82c Fix java_producer_spec for modified push_single signature 2015-09-22 18:36:10 +00:00
jakesandlund c64d38cff3 Comment that rdcrc32.h and rd_kafka_msg_partitioner_consistent can be removed when librdkafka tags a release and Hermann updates to it 2015-09-16 16:04:35 +00:00
jakesandlund c29bb5e4d0 Move to_java into java_producer to make push_single signature match MRI 2015-09-16 15:54:42 +00:00
cory and jakesandlund e8703e1df4 Add support for passing partition_key in MRI 2015-09-14 16:11:43 +00:00
R. Tyler Croy cd58cb33cd Merge pull request #120 from mkristian/patch-1
use semantic versioning for jar-dependency runtime dependency
2015-09-08 07:31:58 -07:00
Christian Meier 763e2cce97 use semantic versioning for jar-dependency runtime dependency 2015-09-08 09:26:09 +02:00
R. Tyler Croy e276f60b27 Merge pull request #114 from zendesk/yield_key_and_offset
yield key and offset into Consumer#consume block
2015-07-01 04:49:18 -07:00
Ben Osheroff 2c99af440e surround if with braces 2015-06-30 10:51:19 -07:00
Ben Osheroff 5b8dd6feef yield key and offset into Consumer#consume block 2015-06-30 10:49:37 -07:00
R. Tyler Croy 60bc473fdd Merge pull request #118 from zendesk/namespace
REFACTOR ONLY: Namespace cleanup
2015-06-27 11:04:23 -07:00
Ben Osheroff 9edc4b9301 move hermann_lib -> hermann_rdkafka / Hermann::Provider::RDKafka 2015-06-23 09:01:31 -07:00
Ben Osheroff a9d80242dd rename hermann_lib -> hermann_rdkafka 2015-06-22 19:39:56 -07:00
R. Tyler Croy c6fe9838d7 Merge pull request #116 from rtyler/jar-dep-change
Change jar-dependencies to work with current jbundler installs
2015-06-18 18:15:51 -07:00
R. Tyler Croy 45fe45cb96
Remove the exclusion syntax from the kafka jar-dependency which breaks in the latest jbundler
The version of jar-dependencies we were testing against was yanked and with
jbundler and jar-dependencies (0.7.[3-4] and 0.1.10 respectively) I cannot get
Hermann to successfully install with jbundler
2015-06-18 17:11:36 -07:00
R. Tyler Croy c5707f5515 Merge pull request #110 from zendesk/cleanup_warnings
cleanup warnings
2015-06-17 08:05:47 -07:00
Ben Osheroff 783d7dac0d cleanup a few merge-induced warnings 2015-06-16 23:45:25 -07:00
Ben Osheroff 09df7ec1b9 Merge remote-tracking branch 'upstream/master' into cleanup_warnings 2015-06-16 23:44:13 -07:00
Ben Osheroff e7fce39f83 Merge remote-tracking branch 'upstream/master' into cleanup_warnings 2015-06-16 23:42:09 -07:00
Ben Osheroff d8b8f83690 1.8.7 has no version.h 2015-06-16 20:55:55 -07:00
R. Tyler Croy 7f63e3c0d3 Merge pull request #109 from zendesk/better_postloop_cleanup
Better postloop cleanup
2015-06-15 06:55:35 -07:00
R. Tyler Croy c272bff063 Merge pull request #108 from zendesk/metadata
Add the ability to request cluster / topic metadata from the brokers
2015-06-15 06:54:31 -07:00
Ben Osheroff 5c898144f2 cleanup warnings
- fix partition selection function
- get ruby >= 2 calling the correct no-gvl function
2015-06-13 19:47:07 -07:00
Ben Osheroff 17e5c5b31d ensure we call consumer_consume_loop_stop at the end of the loop
we need rb_ensure so that if the loop terminates in a "break" statement
we'll still call rd_kafka_consume_stop
2015-06-13 19:06:45 -07:00
Ben Osheroff 942fd87728 call rd_kafka_message_destroy right before rb_yield()
if rb_yield() ends in a "break" statement, it never returns control back
to the caller; thus we leak the message.
2015-06-13 19:05:25 -07:00
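The pair of fixes above has a direct Ruby analogue: when the block passed to a yielding method executes `break`, control never returns to the code after the `yield`, so any cleanup placed there is skipped. Putting the cleanup in an `ensure` (the C API's `rb_ensure`, used below in `consumer_consume`) guarantees it runs. A minimal sketch with hypothetical `fetch_message`/`stop_consumer` helpers:

```ruby
def consume_loop
  loop do
    # If the caller's block hits `break`, this method unwinds immediately...
    yield fetch_message
  end
ensure
  # ...but `ensure` still runs — the Ruby-level equivalent of calling
  # rd_kafka_consume_stop via rb_ensure in the C extension.
  stop_consumer
end
```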
Ben Osheroff 3f0da9e0cd stop using named params in Partition#consumer 2015-06-13 14:12:57 -07:00
Ben Osheroff 0bc9e9d9ee don't destroy metadata unless it's allocated 2015-06-12 18:11:34 -07:00
Ben Osheroff 83ea24a20e Merge remote-tracking branch 'upstream/master' into metadata 2015-06-11 16:22:32 -07:00
Ben Osheroff f3b6df06d3 update README.md with metadata request 2015-06-11 16:21:08 -07:00
Ben Osheroff c2540a8410 update api to have #topic(topic) as well as #topics
also add Partition#consumer to instantiate a consumer object on the
particular partition
2015-06-11 15:29:02 -07:00
Ben Osheroff 7edd297071 add Broker#to_s 2015-06-11 15:25:01 -07:00
Ben Osheroff e7e1a2a7ac raise default timeout and make it configurable 2015-05-19 13:51:59 -07:00
Ben Osheroff 314ea3d8f8 fetching an empty topic doesn't work 2015-05-19 13:44:57 -07:00
Ben Osheroff 326b428b34 slap a friendlier face on the returned metadata interface 2015-05-19 13:34:39 -07:00
Ben Osheroff 7779654f36 Merge remote-tracking branch 'upstream/master' into metadata 2015-05-19 11:38:08 -07:00
Ben Osheroff 5942e1810b fix bug, constantize TIMEOUT_MS, add README.md info 2015-05-19 11:33:13 -07:00
Ben Osheroff e38b626b71 Add metadata API to Hermann::Discovery module 2015-05-19 11:13:27 -07:00
23 changed files with 596 additions and 106 deletions

.gitignore vendored (3 changed lines)

@ -13,3 +13,6 @@ tmp/
Gemfile.lock
Jarfile.lock
.jbundler/
lib/hermann_jars.rb
coverage/
spec/reports

.travis.yml

@ -1,10 +1,17 @@
language: ruby
sudo: false
rvm:
- ruby-head
- 2.3.0
- 2.2
- 2.1
- 1.9.3
- jruby
- ree
# upgrading bundler since our gemspec relying on lib/hermann_jars.rb being
# present causes troubles with bundler 1.7.x
before_install:
- gem install bundler -N --version ">= 1.8.0"
#deploy:
# provider: rubygems
# gemspec: hermann.gemspec

Gemfile

@ -4,7 +4,7 @@ gemspec
group :development do
gem 'jbundler', :platform => :jruby
gem 'rake'
gem 'rake', '~> 11.3.0'
gem 'i18n', '~> 0.6.11', :platform => :mri_18
gem 'activesupport', '~> 3.x', :platform => :mri_18
gem 'ruby-maven', '~> 3.1.1.0', :platform => :jruby
@ -23,4 +23,8 @@ group :test do
# Used for testing encoding protobufs in an out of Hermann in integration
# tests
gem 'protobuffy'
gem 'ci_reporter_rspec'
gem 'simplecov'
gem 'simplecov-rcov'
end

Jenkinsfile vendored Normal file (33 changed lines)

@ -0,0 +1,33 @@
#!groovy
/* Only keep the 10 most recent builds. */
properties([[$class: 'BuildDiscarderProperty',
strategy: [$class: 'LogRotator', numToKeepStr: '10']]])
stage ('Build') {
node {
// Checkout
checkout scm
// install required bundles
sh 'bundle install'
// build and run tests with coverage
sh 'bundle exec rake build spec'
// Archive the built artifacts
archive (includes: 'pkg/*.gem')
// publish html
publishHTML ([
allowMissing: false,
alwaysLinkToLastBuild: false,
keepAll: true,
reportDir: 'coverage',
reportFiles: 'index.html',
reportName: "RCov Report"
])
}
}

README.md

@ -1,6 +1,6 @@
# Hermann
[![Gitter chat](https://badges.gitter.im/lookout/Hermann.png)](https://gitter.im/lookout/Hermann) [![Build Status](https://travis-ci.org/lookout/hermann.svg?branch=master)](https://travis-ci.org/lookout/hermann)
[![Gitter chat](https://badges.gitter.im/reiseburo/hermann.png)](https://gitter.im/reiseburo/hermann) [![Build Status](https://travis-ci.org/reiseburo/hermann.svg?branch=master)](https://travis-ci.org/reiseburo/hermann)
A Ruby gem implementing a Kafka Publisher and Consumer
@ -90,11 +90,38 @@ new_topic = 'other_topic'
the_consumer = Hermann::Consumer.new(topic, brokers: "localhost:9092", partition: 1)
the_consumer.consume(new_topic) do |msg| # can change topic with optional argument to .consume
puts "Recv: #{msg}"
the_consumer.consume(new_topic) do |msg, key, offset| # can change topic with optional argument to .consume
puts "Recv: #{msg}, key: #{key}, offset: #{offset}"
end
```
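As the provider change later in this diff shows, `key` is `nil` when the message was produced without a key, and `offset` is the message's numeric position in the partition log. A small usage sketch:

```ruby
the_consumer.consume do |msg, key, offset|
  label = key.nil? ? 'unkeyed' : key
  puts "#{label} @ #{offset}: #{msg}"
end
```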
### Metadata request (MRI-only)
Topic and cluster metadata may be retrieved in the MRI version by querying the Kafka brokers.
```ruby
require 'hermann'
require 'hermann/discovery/metadata'
c = Hermann::Discovery::Metadata.new( "localhost:9092" )
topic = c.topic("topic")
puts topic.partitions.first
consumers = topic.partitions.map do |partition|
partition.consumer
end
```
#### Build & Unit Test
First time (from a clean repository):
`bundle install && bundle exec rake`
Thereafter:
`bundle exec rake spec`
#### Testing
To run the integration tests:

RELEASING.md Normal file (17 changed lines)

@ -0,0 +1,17 @@
# Releasing Hermann
Hermann is a multi-platform gem, which means that two actual `.gem` files need
to be built and uploaded to [rubygems.org](https://rubygems.org/gems/hermann).
Here's the current process that [I](https://github.com/rtyler) use:
* `rvm use ruby@rubygems` (*switch to MRI*)
* `bundle install && rake` (*ensure that MRI tests pass*)
* `rvm use jruby@rubygems` (*switch to JRuby*)
* `bundle install && rake` (*ensure that the JRuby tests pass*)
* `rake release` (*tag the release and upload the `-java` platform gem*)
* `rvm use ruby@rubygems` (*switch back to MRI*)
* `gem build hermann.gemspec` (*build the 'ruby' platform gem*)
* `gem push pkg/hermann-0.blah.gem` (*upload the ruby platform gem*)
This can certainly be cleaned up, but this is the process as it is right now.
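The two-gem dance exists because the gemspec (diffed further down) branches on the running platform; building under JRuby yields the `-java` platform gem, building under MRI yields the plain ruby one. A trimmed sketch of that branch:

```ruby
Gem::Specification.new do |s|
  s.name = 'hermann'
  # ... shared configuration elided ...
  if RUBY_PLATFORM == 'java'
    s.platform = 'java'   # `rake release` under JRuby => hermann-x.y.z-java.gem
  end                     # under MRI the default 'ruby' platform gem is built
end
```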

Rakefile

@ -3,10 +3,10 @@ require 'fileutils'
require "bundler/gem_tasks"
require 'rspec/core/rake_task'
require 'rake/extensiontask'
require 'ci/reporter/rake/rspec'
Rake::ExtensionTask.new do |t|
t.name = 'hermann_lib'
t.name = 'hermann_rdkafka'
t.ext_dir = 'ext/hermann'
t.gem_spec = Gem::Specification.load('hermann.gemspec')
end

ext/hermann/extconf.rb

@ -147,7 +147,8 @@ $LOCAL_LIBS << File.join(librdkafka.path, 'lib', 'librdkafka.a')
have_header('ruby/thread.h')
have_header('ruby/intern.h')
have_header('ruby/version.h')
have_func('rb_thread_blocking_region')
have_func('rb_thread_call_without_gvl')
create_makefile('hermann/hermann_lib')
create_makefile('hermann/hermann_rdkafka')

ext/hermann/hermann_rdkafka.c

@ -1,5 +1,5 @@
/*
* hermann_lib.c - Ruby wrapper for the librdkafka library
* hermann_rdkafka.c - Ruby wrapper for the librdkafka library
*
* Copyright (c) 2014 Stan Campbell
* Copyright (c) 2014 Lookout, Inc.
@ -31,8 +31,16 @@
/* Much of the librdkafka library calls were lifted from rdkafka_example.c */
#include "hermann_lib.h"
#include "hermann_rdkafka.h"
/* This header file exposes the functions in librdkafka.a that are needed for
* consistent partitioning. After librdkafka releases a new tag and Hermann
* points to it, this can be removed. */
#include "rdcrc32.h"
#ifdef HAVE_RUBY_VERSION_H
#include <ruby/version.h>
#endif
/* how long to let librdkafka block on the socket before returning back to the interpreter.
* essentially defines how long we wait before consumer_consume_stop_callback() can fire */
@ -120,7 +128,7 @@ static void msg_delivered(rd_kafka_t *rk,
/* call back into our Hermann::Result if it exists, discarding the
* return value
*/
if (NULL != push_ctx->result) {
if (NULL != (void *)push_ctx->result) {
rb_funcall(push_ctx->result,
hermann_result_fulfill_method,
2,
@ -131,6 +139,18 @@ static void msg_delivered(rd_kafka_t *rk,
}
}
/* This function is in rdkafka.h on librdkafka master. As soon as a new
* version is released and Hermann points to it, this can be removed. */
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
return rd_crc32(key, keylen) % partition_cnt;
}
/**
* Producer partitioner callback.
* Used to determine the target partition within a topic for production.
@ -151,13 +171,10 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
/* Pick a random partition */
int retry = 0;
for (; retry < partition_cnt; retry++) {
int32_t partition = rand() % partition_cnt;
if (rd_kafka_topic_partition_available(rkt, partition)) {
break; /* this one will do */
}
if (keylen) {
return rd_kafka_msg_partitioner_consistent(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
} else {
return rd_kafka_msg_partitioner_random(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
}
}
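The keyed branch above delegates to `rd_kafka_msg_partitioner_consistent`, which per the shim earlier in this file is simply `rd_crc32(key, keylen) % partition_cnt`. Since `rdcrc32.h` implements standard reflected CRC-32 (poly 0x04c11db7, xor 0xffffffff), the same stable key-to-partition mapping can be sketched in Ruby with `Zlib.crc32`:

```ruby
require 'zlib'

# Same key => same partition, for a fixed partition count.
def consistent_partition(key, partition_count)
  Zlib.crc32(key) % partition_count
end

consistent_partition('user-42', 8)  # stable across calls and processes
```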
@ -258,8 +275,19 @@ static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cf
// Yield the data to the Consumer's block
if (rb_block_given_p()) {
VALUE value = rb_str_new((char *)rkmessage->payload, rkmessage->len);
rb_yield(value);
VALUE key, data, offset;
data = rb_str_new((char *)rkmessage->payload, rkmessage->len);
offset = rb_ll2inum(rkmessage->offset);
if ( rkmessage->key_len > 0 ) {
key = rb_str_new((char*) rkmessage->key, (int)rkmessage->key_len);
} else {
key = Qnil;
}
rd_kafka_message_destroy(rkmessage);
rb_yield_values(3, data, key, offset);
}
else {
if (DEBUG) {
@ -388,15 +416,19 @@ static void *consumer_recv_msg(void *ptr)
* after every message, to see if the ruby interpreter wants us to exit the
* loop.
*
* @param HermannInstanceConfig* The hermann configuration for this consumer
* @param self The consumer instance
*/
static void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
static VALUE consumer_consume_loop(VALUE self) {
HermannInstanceConfig* consumerConfig;
rd_kafka_message_t *msg;
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
TRACER("\n");
while (consumerConfig->run) {
#ifdef HAVE_RB_THREAD_BLOCKING_REGION
#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2
msg = (rd_kafka_message_t *) rb_thread_blocking_region((rb_blocking_function_t *) consumer_recv_msg,
consumerConfig,
consumer_consume_stop_callback,
@ -412,13 +444,28 @@ static void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
if ( msg ) {
msg_consume(msg, consumerConfig);
rd_kafka_message_destroy(msg);
}
}
return Qnil;
}
/**
* consumer_consume_loop_stop
*
called when we're done with the .consume() loop; lets rdkafka clean up some internal structures
*/
static VALUE consumer_consume_loop_stop(VALUE self) {
HermannInstanceConfig* consumerConfig;
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);
return Qnil;
}
/**
* Hermann::Lib::Consumer.consume
* Hermann::Provider::RDKafka::Consumer.consume
*
* @param VALUE self the Ruby object for this consumer
* @param VALUE topic the Ruby string representing a topic to consume
@ -446,17 +493,12 @@ static VALUE consumer_consume(VALUE self, VALUE topic) {
if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) {
fprintf(stderr, "%% Failed to start consuming: %s\n",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
rb_raise(rb_eRuntimeError,
rb_raise(rb_eRuntimeError, "%s",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
return Qnil;
}
consumer_consume_loop(consumerConfig);
/* Stop consuming */
rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);
return Qnil;
return rb_ensure(consumer_consume_loop, self, consumer_consume_loop_stop, self);
}
@ -558,10 +600,11 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
* @param message VALUE the ruby String containing the outgoing message.
* @param topic VALUE the ruby String containing the topic to use for the
* outgoing message.
* @param key VALUE the ruby String containing the key to partition by
* @param result VALUE the Hermann::Result object to be fulfilled when the
* push completes
*/
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE partition_key, VALUE result) {
HermannInstanceConfig* producerConfig;
/* Context pointer, pointing to `result`, for the librdkafka delivery
@ -569,13 +612,14 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
*/
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
rd_kafka_topic_t *rkt = NULL;
rd_kafka_topic_conf_t *rkt_conf = NULL;
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
delivery_ctx->producer = producerConfig;
delivery_ctx->result = NULL;
delivery_ctx->result = (VALUE) NULL;
TRACER("producerConfig: %p\n", producerConfig);
@ -591,9 +635,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
TRACER("kafka initialized\n");
/* Topic configuration */
rkt_conf = rd_kafka_topic_conf_new();
/* Set the partitioner callback */
rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, producer_partitioner_callback);
rkt = rd_kafka_topic_new(producerConfig->rk,
RSTRING_PTR(topic),
NULL);
rkt_conf);
if (NULL == rkt) {
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
@ -614,8 +664,8 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
RD_KAFKA_MSG_F_COPY,
RSTRING_PTR(message),
RSTRING_LEN(message),
NULL,
0,
RSTRING_PTR(partition_key),
RSTRING_LEN(partition_key),
delivery_ctx)) {
fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition,
@ -666,19 +716,56 @@ static VALUE producer_tick(VALUE self, VALUE timeout) {
events = rd_kafka_poll(conf->rk, timeout_ms);
if (conf->isErrored) {
rb_raise(rb_eStandardError, conf->error);
rb_raise(rb_eStandardError, "%s", conf->error);
}
return rb_int_new(events);
}
/*
* producer_metadata_request_nogvl
*
* call rd_kafka_metadata without the GVL held. Note that rd_kafka_metadata is not interruptible,
* so in case of interrupt the thread will not respond until timeout_ms is reached.
*
* rd_kafka_metadata will fill in the ctx->data pointer on success
*
* @param ptr void* the hermann_metadata_ctx_t
*/
static void *producer_metadata_request_nogvl(void *ptr)
{
hermann_metadata_ctx_t *ctx = (hermann_metadata_ctx_t*)ptr;
return (void *) rd_kafka_metadata(ctx->rk,
ctx->topic ? 0 : 1,
ctx->topic,
(const struct rd_kafka_metadata **) &(ctx->data),
ctx->timeout_ms);
}
static int producer_metadata_request(hermann_metadata_ctx_t *ctx)
{
int err;
#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2
err = (int) rb_thread_blocking_region((rb_blocking_function_t *) producer_metadata_request_nogvl, ctx,
NULL, NULL);
#elif HAVE_RB_THREAD_CALL_WITHOUT_GVL
err = (int) rb_thread_call_without_gvl(producer_metadata_request_nogvl, ctx, NULL, NULL);
#else
err = (int) producer_metadata_request_nogvl(ctx);
#endif
return err;
}
static VALUE producer_connect(VALUE self, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
VALUE result = Qfalse;
int timeout_ms = rb_num2int(timeout);
struct rd_kafka_metadata *data = NULL;
hermann_metadata_ctx_t md_context;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
@ -686,17 +773,19 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
producer_init_kafka(self, producerConfig);
}
err = rd_kafka_metadata(producerConfig->rk,
0,
producerConfig->rkt,
&data,
timeout_ms);
md_context.rk = producerConfig->rk;
md_context.topic = NULL;
md_context.data = NULL;
md_context.timeout_ms = rb_num2int(timeout);
err = producer_metadata_request(&md_context);
TRACER("err: %s (%i)\n", rd_kafka_err2str(err), err);
if (RD_KAFKA_RESP_ERR_NO_ERROR == err) {
TRACER("brokers: %i, topics: %i\n",
data->broker_cnt,
data->topic_cnt);
md_context.data->broker_cnt,
md_context.data->topic_cnt);
producerConfig->isConnected = 1;
result = Qtrue;
}
@ -704,11 +793,118 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
producerConfig->isErrored = err;
}
rd_kafka_metadata_destroy(data);
if ( md_context.data )
rd_kafka_metadata_destroy(md_context.data);
return result;
}
/*
* producer_metadata_make_hash
*
* transform the rd_kafka_metadata structure into a ruby hash. eg:
* { :brokers => [ {:id=>0, :host=>"172.20.10.3", :port=>9092} ],
* :topics => { "maxwell" => [ {:id=>0, :leader_id=>0, :replica_ids=>[0], :isr_ids=>[0]}]} }
*
* @param data struct rd_kafka_metadata* data returned from rd_kafka_metadata
*/
static VALUE producer_metadata_make_hash(struct rd_kafka_metadata *data)
{
int i, j, k;
VALUE broker_hash, topic_hash, partition_ary, partition_hash, partition_replica_ary, partition_isr_ary;
VALUE hash = rb_hash_new();
VALUE brokers = rb_ary_new2(data->broker_cnt);
VALUE topics = rb_hash_new();
for ( i = 0; i < data->broker_cnt; i++ ) {
broker_hash = rb_hash_new();
rb_hash_aset(broker_hash, ID2SYM(rb_intern("id")), INT2FIX(data->brokers[i].id));
rb_hash_aset(broker_hash, ID2SYM(rb_intern("host")), rb_str_new2(data->brokers[i].host));
rb_hash_aset(broker_hash, ID2SYM(rb_intern("port")), INT2FIX(data->brokers[i].port));
rb_ary_push(brokers, broker_hash);
}
for ( i = 0; i < data->topic_cnt; i++ ) {
partition_ary = rb_ary_new2(data->topics[i].partition_cnt);
for ( j = 0 ; j < data->topics[i].partition_cnt ; j++ ) {
VALUE partition_hash = rb_hash_new();
rd_kafka_metadata_partition_t *partition = &(data->topics[i].partitions[j]);
/* id => 1, leader_id => 0 */
rb_hash_aset(partition_hash, ID2SYM(rb_intern("id")), INT2FIX(partition->id));
rb_hash_aset(partition_hash, ID2SYM(rb_intern("leader_id")), INT2FIX(partition->leader));
/* replica_ids => [1, 0] */
partition_replica_ary = rb_ary_new2(partition->replica_cnt);
for ( k = 0 ; k < partition->replica_cnt ; k++ ) {
rb_ary_push(partition_replica_ary, INT2FIX(partition->replicas[k]));
}
rb_hash_aset(partition_hash, ID2SYM(rb_intern("replica_ids")), partition_replica_ary);
/* isr_ids => [1, 0] */
partition_isr_ary = rb_ary_new2(partition->isr_cnt);
for ( k = 0 ; k < partition->isr_cnt ; k++ ) {
rb_ary_push(partition_isr_ary, INT2FIX(partition->isrs[k]));
}
rb_hash_aset(partition_hash, ID2SYM(rb_intern("isr_ids")), partition_isr_ary);
rb_ary_push(partition_ary, partition_hash);
}
rb_hash_aset(topics, rb_str_new2(data->topics[i].topic), partition_ary);
}
rb_hash_aset(hash, ID2SYM(rb_intern("brokers")), brokers);
rb_hash_aset(hash, ID2SYM(rb_intern("topics")), topics);
return hash;
}
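To make the returned shape concrete, here is the example hash from the comment above (host and topic names come from that comment, not a live cluster), walked with plain Ruby:

```ruby
md = { :brokers => [ { :id => 0, :host => "172.20.10.3", :port => 9092 } ],
       :topics  => { "maxwell" => [ { :id => 0, :leader_id => 0,
                                      :replica_ids => [0], :isr_ids => [0] } ] } }

md[:topics].each do |name, partitions|
  leaders = partitions.map { |p| p[:leader_id] }
  puts "#{name}: #{partitions.size} partition(s), leader ids #{leaders.inspect}"
end
```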
/*
* producer_metadata
*
* make a metadata request to the kafka server, returning a hash
* containing a list of brokers and topics.
*
@param topicStr VALUE optional topic name to restrict the request to (nil for all topics)
@param timeout VALUE request timeout in milliseconds
*/
static VALUE producer_metadata(VALUE self, VALUE topicStr, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
hermann_metadata_ctx_t md_context;
VALUE result;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (!producerConfig->isInitialized) {
producer_init_kafka(self, producerConfig);
}
md_context.rk = producerConfig->rk;
md_context.timeout_ms = rb_num2int(timeout);
if ( !NIL_P(topicStr) ) {
Check_Type(topicStr, T_STRING);
md_context.topic = rd_kafka_topic_new(producerConfig->rk, StringValuePtr(topicStr), NULL);
} else {
md_context.topic = NULL;
}
err = producer_metadata_request(&md_context);
if ( err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
// annoyingly, this is always a timeout error -- the rest, librdkafka just jams onto STDERR
rb_raise( rb_eRuntimeError, "%s", rd_kafka_err2str(err) );
} else {
result = producer_metadata_make_hash(md_context.data);
rd_kafka_metadata_destroy(md_context.data);
return result;
}
}
static VALUE producer_is_connected(VALUE self) {
HermannInstanceConfig *producerConfig;
@ -1023,21 +1219,21 @@ static VALUE producer_init_copy(VALUE copy,
}
/**
* Init_hermann_lib
* Init_hermann_rdkafka
*
* Called by Ruby when the Hermann gem is loaded.
* Defines the Hermann module.
* Defines the Producer and Consumer classes.
*/
void Init_hermann_lib() {
VALUE lib_module, c_consumer, c_producer;
void Init_hermann_rdkafka() {
VALUE lib_module, provider_module, c_consumer, c_producer;
TRACER("setting up Hermann::Lib\n");
TRACER("setting up Hermann::Provider::RDKafka\n");
/* Define the module */
hermann_module = rb_define_module("Hermann");
lib_module = rb_define_module_under(hermann_module, "Lib");
provider_module = rb_define_module_under(hermann_module, "Provider");
lib_module = rb_define_module_under(provider_module, "RDKafka");
/* ---- Define the consumer class ---- */
c_consumer = rb_define_class_under(lib_module, "Consumer", rb_cObject);
@ -1063,7 +1259,7 @@ void Init_hermann_lib() {
rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1);
/* Producer.push_single(msg) */
rb_define_method(c_producer, "push_single", producer_push_single, 3);
rb_define_method(c_producer, "push_single", producer_push_single, 4);
/* Producer.tick */
rb_define_method(c_producer, "tick", producer_tick, 1);
@ -1076,4 +1272,7 @@ void Init_hermann_lib() {
/* Producer.connect */
rb_define_method(c_producer, "connect", producer_connect, 1);
/* Producer.metadata */
rb_define_method(c_producer, "metadata", producer_metadata, 2);
}

ext/hermann/hermann_rdkafka.h

@ -1,5 +1,5 @@
/*
* hermann_lib.h - Ruby wrapper for the librdkafka library
* hermann_rdkafka.h - Ruby wrapper for the librdkafka library
*
* Copyright (c) 2014 Stan Campbell
* Copyright (c) 2014 Lookout, Inc.
@ -107,10 +107,17 @@ typedef struct HermannInstanceConfig {
typedef HermannInstanceConfig hermann_conf_t;
typedef struct {
/* Hermann::Lib::Producer */
/* Hermann::Provider::RDKafka::Producer */
hermann_conf_t *producer;
/* Hermann::Result */
VALUE result;
} hermann_push_ctx_t;
typedef struct {
rd_kafka_t *rk;
rd_kafka_topic_t *topic;
struct rd_kafka_metadata *data;
int timeout_ms;
} hermann_metadata_ctx_t;
#endif

ext/hermann/rdcrc32.h Normal file (103 changed lines)

@ -0,0 +1,103 @@
/**
* \file rdcrc32.h
* Functions and types for CRC checks.
*
* Generated on Tue May 8 17:36:59 2012,
* by pycrc v0.7.10, http://www.tty1.net/pycrc/
*
* NOTE: Contains librd modifications:
* - rd_crc32() helper.
* - __RDCRC32___H__ define (was missing the '32' part).
*
* using the configuration:
* Width = 32
* Poly = 0x04c11db7
* XorIn = 0xffffffff
* ReflectIn = True
* XorOut = 0xffffffff
* ReflectOut = True
* Algorithm = table-driven
*****************************************************************************/
#ifndef __RDCRC32___H__
#define __RDCRC32___H__
#include <stdlib.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* The definition of the used algorithm.
*****************************************************************************/
#define CRC_ALGO_TABLE_DRIVEN 1
/**
* The type of the CRC values.
*
* This type must be big enough to contain at least 32 bits.
*****************************************************************************/
typedef uint32_t rd_crc32_t;
/**
* Reflect all bits of a \a data word of \a data_len bytes.
*
* \param data The data word to be reflected.
* \param data_len The width of \a data expressed in number of bits.
* \return The reflected data.
*****************************************************************************/
rd_crc32_t rd_crc32_reflect(rd_crc32_t data, size_t data_len);
/**
* Calculate the initial crc value.
*
* \return The initial crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_init(void)
{
return 0xffffffff;
}
/**
* Update the crc value with new data.
*
* \param crc The current crc value.
* \param data Pointer to a buffer of \a data_len bytes.
* \param data_len Number of bytes in the \a data buffer.
* \return The updated crc value.
*****************************************************************************/
rd_crc32_t rd_crc32_update(rd_crc32_t crc, const unsigned char *data, size_t data_len);
/**
* Calculate the final crc value.
*
* \param crc The current crc value.
* \return The final crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_finalize(rd_crc32_t crc)
{
return crc ^ 0xffffffff;
}
/**
* Wrapper for performing CRC32 on the provided buffer.
*/
static inline rd_crc32_t rd_crc32 (const char *data, size_t data_len) {
return rd_crc32_finalize(rd_crc32_update(rd_crc32_init(),
(const unsigned char *)data,
data_len));
}
#ifdef __cplusplus
} /* closing brace for extern "C" */
#endif
#endif /* __RDCRC32___H__ */

hermann.gemspec

@ -16,7 +16,7 @@ Gem::Specification.new do |s|
s.homepage = 'https://github.com/lookout/Hermann'
s.licenses = ['MIT']
s.files = [ "Rakefile"]
s.files = ['Rakefile']
s.files += `git ls-files -- lib`.split($\)
s.files += `git ls-files -- ext`.split($\)
@ -28,15 +28,16 @@ Gem::Specification.new do |s|
s.add_dependency 'thread_safe', '~> 0.3.4'
if RUBY_PLATFORM == "java"
s.add_dependency 'concurrent-ruby', '~> 0.7.0'
s.files << 'lib/hermann_jars.rb'
s.add_dependency 'concurrent-ruby', '~> 1.0.0'
# IMPORTANT: make sure that jar-dependencies is only a development
# dependency of your gem. if it is a runtime dependencies the require_jars
# file will be overwritten during installation.
s.add_dependency 'jar-dependencies', '~>0.1.9'
s.requirements << "jar org.apache.kafka:kafka_2.10, ~>0.8.1.1, ['junit:junit']"
s.add_dependency 'jar-dependencies', ['~> 0.1', '>= 0.1.10']
s.requirements << "jar org.apache.kafka:kafka_2.11, ~> 0.8.2.2"
# use log4j-1.2.16+ as 1.2.15 declares deps which are not in maven central and causes the dep resolution to fail
s.requirements << "jar log4j:log4j, ~>1.2.16"
s.requirements << "jar log4j:log4j, ~> 1.2.16"
s.require_paths = ["lib"]
s.platform = 'java'
else

lib/hermann/consumer.rb

@ -4,7 +4,7 @@ require 'hermann/errors'
if Hermann.jruby?
require 'hermann/provider/java_simple_consumer'
else
require 'hermann_lib'
require 'hermann_rdkafka'
end
module Hermann
@ -37,7 +37,7 @@ module Hermann
else
brokers, partition = require_values_at(opts, :brokers, :partition)
@internal = Hermann::Lib::Consumer.new(topic, brokers, partition, offset)
@internal = Hermann::Provider::RDKafka::Consumer.new(topic, brokers, partition, offset)
end
end

lib/hermann/discovery/metadata.rb

@ -0,0 +1,77 @@
require 'hermann_rdkafka'
require 'hermann/consumer'
module Hermann
module Discovery
class Metadata
Broker = Struct.new(:id, :host, :port) do
def to_s
"#{host}:#{port}"
end
end
Topic = Struct.new(:name, :partitions)
Partition = Struct.new(:id, :leader, :replicas, :insync_replicas, :topic_name) do
def consumer(offset=:end)
Hermann::Consumer.new(topic_name, brokers: ([leader] + replicas).join(','), partition: id, offset: offset)
end
end
DEFAULT_TIMEOUT_MS = 2_000
def initialize(brokers, options = {})
raise "this is an MRI api only!" if Hermann.jruby?
@internal = Hermann::Provider::RDKafka::Producer.new(brokers)
@timeout = options[:timeout] || DEFAULT_TIMEOUT_MS
end
#
# @internal.metadata returns:
# {:brokers => [{:id=>3, :host=>"kafka3.alpha4.sac1.zdsys.com", :port=>9092}],
# :topics => {"testtopic"=>[{:id=>0, :leader_id=>3, :replica_ids=>[3, 1], :isr_ids=>[3, 1]}]}}
#
def brokers
brokers_from_metadata(@internal.metadata(nil, @timeout))
end
def topic(t)
get_topics(t)[t]
end
def topics
get_topics
end
private
def get_topics(filter_topics = nil)
md = @internal.metadata(filter_topics, @timeout)
broker_hash = brokers_from_metadata(md).inject({}) do |h, broker|
h[broker.id] = broker
h
end
md[:topics].inject({}) do |topic_hash, arr|
topic_name, raw_partitions = *arr
partitions = raw_partitions.map do |p|
leader = broker_hash[p[:leader_id]]
all_replicas = p[:replica_ids].map { |i| broker_hash[i] }
isr_replicas = p[:isr_ids].map { |i| broker_hash[i] }
Partition.new(p[:id], leader, all_replicas, isr_replicas, topic_name)
end
topic_hash[topic_name] = Topic.new(topic_name, partitions)
topic_hash
end
end
def brokers_from_metadata(md)
md[:brokers].map do |h|
Broker.new(h[:id], h[:host], h[:port])
end
end
end
end
end

lib/hermann/producer.rb

@ -6,7 +6,7 @@ require 'hermann/result'
if RUBY_PLATFORM == "java"
require 'hermann/provider/java_producer'
else
require 'hermann_lib'
require 'hermann_rdkafka'
end
module Hermann
@ -23,7 +23,7 @@ module Hermann
if Hermann.jruby?
@internal = Hermann::Provider::JavaProducer.new(brokers.join(','), opts)
else
@internal = Hermann::Lib::Producer.new(brokers.join(','))
@internal = Hermann::Provider::RDKafka::Producer.new(brokers.join(','))
end
# We're tracking children so we can make sure that at Producer exit we
# make a reasonable attempt to clean up outstanding result objects
@ -63,8 +63,7 @@ module Hermann
end
if Hermann.jruby?
key = opts.has_key?(:partition_key) ? opts[:partition_key].to_java : nil
result = @internal.push_single(value, topic, key)
result = @internal.push_single(value, topic, opts[:partition_key], nil)
unless result.nil?
@children << result
end
@ -76,7 +75,7 @@ module Hermann
# librdkafka callback queue overflow
tick_reactor
result = create_result
@internal.push_single(value, topic, result)
@internal.push_single(value, topic, opts[:partition_key].to_s, result)
end
return result
@ -103,7 +102,8 @@ module Hermann
@children.each do |child|
# Skip over any children that should already be reaped for other
# reasons
next if child.completed?
next if (Hermann.jruby? ? child.fulfilled? : child.completed?)
# Propagate errors to the remaining children
child.internal_set_error(ex)
end
@ -119,7 +119,7 @@ module Hermann
# Filter all children who are no longer pending/fulfilled
total_children = @children.size
@children = @children.reject { |c| c.completed? }
@children = @children.reject { |c| Hermann.jruby? ? c.fulfilled? : c.completed? }
return (total_children - children.size)
end
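With the widened `push_single` signature, a partition key flows through the public `Producer#push` API on both platforms — stringified for the MRI extension, handed to the Java provider on JRuby. A sketch assuming a broker on localhost and a `Producer.new(topic, brokers)` constructor (the exact construction is not shown in this diff):

```ruby
require 'hermann'
require 'hermann/producer'

producer = Hermann::Producer.new('rspec', ['localhost:9092'])
producer.push('hello', :partition_key => 'user-42')  # keyed, consistently partitioned
```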

lib/hermann/provider/java_producer.rb

@ -43,7 +43,8 @@ module Hermann
# @return +Concurrent::Promise+ Represents a promise to send the
# data to the kafka broker. Upon execution the Promise's status
# will be set
def push_single(msg, topic, key)
def push_single(msg, topic, key, _)
key = key && key.to_java
Concurrent::Promise.execute {
data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
begin

lib/hermann/version.rb

@ -1,3 +1,3 @@
module Hermann
VERSION = '0.24.0'
VERSION = '0.26.1'
end

lib/hermann_jars.rb (deleted)

@ -1,13 +0,0 @@
# this is a generated file, to avoid over-writing it just delete this comment
require 'jar_dependencies'
require_jar( 'org.apache.zookeeper', 'zookeeper', '3.3.4' )
require_jar( 'net.sf.jopt-simple', 'jopt-simple', '3.2' )
require_jar( 'org.xerial.snappy', 'snappy-java', '1.0.5' )
require_jar( 'jline', 'jline', '0.9.94' )
require_jar( 'com.101tec', 'zkclient', '0.3' )
require_jar( 'log4j', 'log4j', '1.2.17' )
require_jar( 'org.scala-lang', 'scala-library', '2.10.1' )
require_jar( 'org.slf4j', 'slf4j-api', '1.7.2' )
require_jar( 'com.yammer.metrics', 'metrics-core', '2.2.0' )
require_jar( 'org.apache.kafka', 'kafka_2.10', '0.8.1.1' )

scripts/metadata_mri.rb Normal file (15 changed lines)

@ -0,0 +1,15 @@
require 'bundler/setup'
require 'hermann'
require 'hermann/discovery/metadata'
require 'hermann/consumer'
c = Hermann::Discovery::Metadata.new( "localhost:9092" )
c.topic("maxwell")
puts c.topic("maxwell").inspect
puts c.brokers.inspect
consumer = Hermann::Consumer.new("maxwell", brokers: "localhost:9092, localhost:9092", partition: c.topic('maxwell').partitions.first.id, offset: :start)
consumer.consume do |c|
puts c
end


@ -1,13 +1,13 @@
require 'spec_helper'
describe 'Hermann::Lib::Producer', :platform => :mri do
describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
before :all do
require 'hermann_lib'
require 'hermann_rdkafka'
end
let(:topic) { 'rspec' }
let(:brokers) { 'localhost:1337' }
subject(:producer) { Hermann::Lib::Producer.new(brokers) }
subject(:producer) { Hermann::Provider::RDKafka::Producer.new(brokers) }
let(:timeout) { 3000 }
it { should respond_to :push_single }
@ -41,7 +41,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
let(:brokers) { 'localhost:13337' }
it 'should error after attempting to connect' do |example|
producer.push_single(example.full_description, 'test-topic', nil)
producer.push_single(example.full_description, 'test-topic', '', nil)
begin
producer.tick(timeout)
rescue StandardError => ex
@ -62,7 +62,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
describe '#push_single', :type => :integration do
let(:message) { |example| example.full_description }
subject(:push) { producer.push_single(message, topic, nil) }
subject(:push) { producer.push_single(message, topic, '', nil) }
it 'should return' do
expect(push).not_to be_nil
@ -105,7 +105,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
context 'with a single queued request' do
before :each do
producer.push_single('hello', topic, nil)
producer.push_single('hello', topic, '', nil)
end
it 'should return successfully' do

spec/producer_spec.rb

@ -11,10 +11,11 @@ describe Hermann::Producer do
describe '#initialize' do
context 'with C ruby', :platform => :mri do
it 'joins broker array' do
expect(Hermann::Lib::Producer).to receive(:new).with(brokers.first)
expect(Hermann::Provider::RDKafka::Producer).to receive(:new).with(brokers.first)
expect(producer).to be_a Hermann::Producer
end
end
context 'with Java', :platform => :java do
it 'joins broker array' do
expect(Hermann::Provider::JavaProducer).to receive(:new).with(brokers.first, opts)
@ -42,34 +43,34 @@ describe Hermann::Producer do
context 'without topic passed' do
it 'uses initialized topic' do
expect(producer.internal).to receive(:push_single).with(msg, topic, anything)
expect(producer.internal).to receive(:push_single).with(msg, topic, anything, anything)
producer.push(msg)
end
end
context 'without topic passed', :platform => :java do
it 'uses initialized topic and does not have a partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, nil)
expect(producer.internal).to receive(:push_single).with(msg, topic, nil, anything)
producer.push(msg)
end
end
context 'with topic passed' do
it 'can change topic' do
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything)
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything)
producer.push(msg, :topic => passed_topic)
end
context 'and an array of messages' do
it 'should propagate the topic' do
messages = 3.times.map { |i| msg }
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything).exactly(messages.size).times
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything).exactly(messages.size).times
producer.push(messages, :topic => passed_topic)
end
end
end
context 'with explicit partition key', :platform => :java do
context 'with explicit partition key' do
it 'uses the partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key.to_java)
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key, anything)
producer.push(msg, :partition_key => partition_key)
end
end
@ -177,7 +178,7 @@ describe Hermann::Producer do
it 'should invoke #push_single for each element' do
value.each do |v|
expect(producer.internal).to receive(:push_single).with(v, topic, anything)
expect(producer.internal).to receive(:push_single).with(v, topic, anything, anything)
end
expect(result).to be_instance_of Array
@ -190,7 +191,7 @@ describe Hermann::Producer do
describe '#tick_reactor' do
let(:timeout) { 0 }
let(:internal) { double('Hermann::Lib::Producer mock') }
let(:internal) { double('Hermann::Provider::RDKafka::Producer mock') }
subject(:tick) { producer.tick_reactor(timeout) }
before :each do

spec/provider/java_producer_spec.rb

@ -8,11 +8,11 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
let(:topic) { 'rspec' }
let(:brokers) { '0:1337'}
let(:opts) { {} }
let(:part_key) { "key".to_java }
let(:part_key) { "key" }
let(:msg) { "bar" }
describe '#push_single' do
subject(:result) { producer.push_single(msg, topic, nil) }
subject(:result) { producer.push_single(msg, topic, nil, nil) }
let(:passed_topic) { 'foo' }
@ -22,18 +22,18 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
it 'can change topic' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, nil, anything)
producer.push_single(msg, passed_topic, nil).wait(1)
producer.push_single(msg, passed_topic, nil, nil).wait(1)
end
it 'can change partition key' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, part_key, anything)
producer.push_single(msg, passed_topic, part_key).wait(1)
producer.push_single(msg, passed_topic, part_key, nil).wait(1)
end
context 'error conditions' do
shared_examples 'an error condition' do
it 'should be rejected' do
promise = producer.push_single('rspec', topic, nil).wait(1)
promise = producer.push_single('rspec', topic, nil, nil).wait(1)
expect(promise).to be_rejected
expect { promise.value! }.to raise_error
end
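These specs lean on the `Concurrent::Promise` each JRuby push returns: `.wait(1)` blocks for up to a second, and a failed send leaves the promise rejected. A standalone sketch of that pattern (`send_message` is a hypothetical stand-in for the KeyedMessage send):

```ruby
require 'concurrent'

promise = Concurrent::Promise.execute { send_message }  # runs on a pool thread
promise.wait(1)                                         # block up to 1 second
promise.reason if promise.rejected?                     # the exception, when the send failed
```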

spec/spec_helper.rb

@ -2,6 +2,13 @@ require 'rubygems'
require 'yaml'
require 'rspec'
require 'simplecov'
require 'simplecov-rcov'
SimpleCov.start do
formatter = SimpleCov::Formatter::RcovFormatter
end
# Add ext/ to the load path so we can load `hermann_lib`
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../ext/'))
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../lib/'))