Compare commits

...

44 Commits

Author SHA1 Message Date
R. Tyler Croy d822ae541a Merge pull request #138 from bitwiseman/patch-3
Locked rake version to ~11.3.0
2016-12-13 17:24:47 -08:00
Liam Newman f6f71c98e4 Locked rake version to ~11.3.0
Rake 12.0 replace "last_comment" with "last_description".  

This describes the issue and possible fix:
http://stackoverflow.com/questions/35893584/nomethoderror-undefined-method-last-comment-after-upgrading-to-rake-11

Rather than upgrade rspec to 3.5.x and risk changes to test behavior, I've locked rake to the last version before 12.0.
2016-12-13 15:53:55 -08:00
R. Tyler Croy f9957b5629 Merge pull request #137 from bitwiseman/patch-2
Updated for latest Pipeline fixes
2016-11-17 13:56:48 -08:00
Liam Newman 35eba84dc6 Updated for latest Pipeline fixes
JENKINS-29711 was fixed and stage step now uses curly braces
2016-11-17 13:53:09 -08:00
R. Tyler Croy 68515effc7 Merge pull request #136 from bitwiseman/feature/simplepipe
Simplify Pipeline
2016-07-06 17:52:11 -07:00
Liam Newman 69af2e310f Simplify Pipeline 2016-07-06 17:01:35 -07:00
R. Tyler Croy 9473c8b24b Merge pull request #135 from bitwiseman/feature/jenkinsfile
Add Jenkins pipeline
2016-06-30 10:15:37 -07:00
Liam Newman e0225b6761 Add Jenkins pipeline 2016-06-29 17:36:30 -07:00
R. Tyler Croy a4d678deaf Merge pull request #134 from bitwiseman/patch-1
Update README.md
2016-06-17 13:12:51 -07:00
Liam Newman 5dd952145b Update README.md
Add simple build and unit test instructions.   Similar to what is in releasing document.
2016-06-17 13:09:26 -07:00
R. Tyler Croy 9f5cf12151
No sense supporting ree in any capacity any more 2016-05-29 18:32:37 -07:00
R. Tyler Croy f71ab11ea2 Merge pull request #132 from thedrow/patch-1
Added Ruby 2.2 and 2.3 to the matrix
2016-05-29 18:29:58 -07:00
R. Tyler Croy 436ec9be82 Merge pull request #133 from jfeltesse-mdsol/concurrent_ruby
depend on concurrent-ruby ~> 1.0.0
2016-02-24 08:29:11 -08:00
Julien Feltesse 587f3be16e depend on concurrent-ruby ~> 1.0.0 2016-02-24 18:19:06 +09:00
Omer Katz c2b2274af3 Specify 2.3.0 instead of 2.3 for travis. 2016-02-04 17:06:39 +02:00
Omer Katz c8fe7e06b5 Added Ruby 2.2 and 2.3 to the matrix. 2016-02-04 14:17:55 +02:00
R. Tyler Croy 246f27e76d
Add some code coverage while we're here 2016-02-03 05:32:57 -08:00
R. Tyler Croy bc560f21ea
Add support for generating JUnit-compatible output for Jenkins 2016-02-03 05:09:30 -08:00
R. Tyler Croy 9f8fbe381c Merge pull request #128 from rtyler/minor-doc-fixes
Minor doc fixes
2015-09-30 09:51:42 -07:00
R. Tyler Croy 5b669504af
Document the current janky release process
At least this way I won't forget each time

Fixes #126
2015-09-30 09:45:20 -07:00
R. Tyler Croy d4a350c65c
Update the gitter and travis badges after repo move 2015-09-30 09:39:10 -07:00
R. Tyler Croy 17b51359a9 Merge pull request #127 from rtyler/upgrade-scala-dependency
Upgrade scala dependency
2015-09-28 11:18:33 -07:00
R. Tyler Croy 0ad9be88f0
Upgrade bundler for our Travis build 2015-09-28 11:12:33 -07:00
R. Tyler Croy 8422e82125
Move hermann to the newer faster travis infra 2015-09-28 10:25:43 -07:00
R. Tyler Croy 1c7543f730
Upgrade to a Kafka (JVM) version which uses Scala 2.11
Scala 2.10 is three years old and it appears to me tha 2.10 is moderately
broken and old, at that 2.11 is the bare minimum folks should be using these
days (see also: https://github.com/twitter/scrooge/pull/198)
2015-09-28 08:43:50 -07:00
R. Tyler Croy 9d5b773542
Properly include hermann_jars.rb in the built gem but exclude from the tree 2015-09-23 13:19:40 -07:00
R. Tyler Croy ecae27dd65 Merge pull request #125 from rtyler/no-fixed-jars
Remove hard-coded hermann_jars.rb
2015-09-23 08:08:24 -07:00
R. Tyler Croy e9d301be3d
Remove hermann_jars.rb to gem installation to generate it properly
Fixes #124
2015-09-23 07:52:21 -07:00
R. Tyler Croy 4dd732dc63
Bump the version for a minor release 2015-09-22 12:08:28 -07:00
R. Tyler Croy 5c20c6c5ba
Another minor version bump to clean our dependency range up 2015-09-22 12:00:04 -07:00
R. Tyler Croy 9f1e16071f
Bump the minor for some new functionality, at least on MRI 2015-09-22 12:00:04 -07:00
R. Tyler Croy 74cb8656e1 Merge pull request #121 from braintree/stable-partition
Add support for passing partition_key in MRI
2015-09-22 11:59:32 -07:00
jakesandlund 6946d4d82c Fix java_producer_spec for modified push_single signature 2015-09-22 18:36:10 +00:00
jakesandlund c64d38cff3 Comment that rdcrc32.h and rd_kafka_msg_partitioner_consistent can be removed when librdkafka tags and Hermann updates 2015-09-16 16:04:35 +00:00
jakesandlund c29bb5e4d0 Move to_java into java_producer to make push_single signature match MRI 2015-09-16 15:54:42 +00:00
cory and jakesandlund e8703e1df4 Add support for passing partition_key in MRI 2015-09-14 16:11:43 +00:00
R. Tyler Croy cd58cb33cd Merge pull request #120 from mkristian/patch-1
use semantic versioning for jar-dependency runtime dependency
2015-09-08 07:31:58 -07:00
Christian Meier 763e2cce97 use semantic versioning for jar-dependency runtime dependency 2015-09-08 09:26:09 +02:00
R. Tyler Croy e276f60b27 Merge pull request #114 from zendesk/yield_key_and_offset
yield key and offset into Consumer#consume block
2015-07-01 04:49:18 -07:00
Ben Osheroff 2c99af440e surround if with braces 2015-06-30 10:51:19 -07:00
Ben Osheroff 5b8dd6feef yield key and offset into Consumer#consume block 2015-06-30 10:49:37 -07:00
R. Tyler Croy 60bc473fdd Merge pull request #118 from zendesk/namespace
REFACTOR ONLY: Namespace cleanup
2015-06-27 11:04:23 -07:00
Ben Osheroff 9edc4b9301 move hermann_lib -> hermann_rdkafka / Hermann::Provider::RDKafka 2015-06-23 09:01:31 -07:00
Ben Osheroff a9d80242dd rename hermann_lib -> hermann_rdkafka 2015-06-22 19:39:56 -07:00
22 changed files with 288 additions and 87 deletions

3
.gitignore vendored
View File

@ -13,3 +13,6 @@ tmp/
Gemfile.lock
Jarfile.lock
.jbundler/
lib/hermann_jars.rb
coverage/
spec/reports

View File

@ -1,10 +1,17 @@
language: ruby
sudo: false
rvm:
- ruby-head
- 2.3.0
- 2.2
- 2.1
- 1.9.3
- jruby
- ree
# upgrading bundler since our gemspec relying on lib/hermann_jars.rb being
# present causes troubles with bundler .17.x
before_install:
- gem install bundler -N --version ">= 1.8.0"
#deploy:
# provider: rubygems
# gemspec: hermann.gemspec

View File

@ -4,7 +4,7 @@ gemspec
group :development do
gem 'jbundler', :platform => :jruby
gem 'rake'
gem 'rake', '~> 11.3.0'
gem 'i18n', '~> 0.6.11', :platform => :mri_18
gem 'activesupport', '~> 3.x', :platform => :mri_18
gem 'ruby-maven', '~> 3.1.1.0', :platform => :jruby
@ -23,4 +23,8 @@ group :test do
# Used for testing encoding protobufs in an out of Hermann in integration
# tests
gem 'protobuffy'
gem 'ci_reporter_rspec'
gem 'simplecov'
gem 'simplecov-rcov'
end

33
Jenkinsfile vendored Normal file
View File

@ -0,0 +1,33 @@
#!groovy
/* Only keep the 10 most recent builds. */
properties([[$class: 'BuildDiscarderProperty',
strategy: [$class: 'LogRotator', numToKeepStr: '10']]])
stage ('Build') {
node {
// Checkout
checkout scm
// install required bundles
sh 'bundle install'
// build and run tests with coverage
sh 'bundle exec rake build spec'
// Archive the built artifacts
archive (includes: 'pkg/*.gem')
// publish html
publishHTML ([
allowMissing: false,
alwaysLinkToLastBuild: false,
keepAll: true,
reportDir: 'coverage',
reportFiles: 'index.html',
reportName: "RCov Report"
])
}
}

View File

@ -1,6 +1,6 @@
# Hermann
[![Gitter chat](https://badges.gitter.im/lookout/Hermann.png)](https://gitter.im/lookout/Hermann) [![Build Status](https://travis-ci.org/lookout/hermann.svg?branch=master)](https://travis-ci.org/lookout/hermann)
[![Gitter chat](https://badges.gitter.im/reiseburo/hermann.png)](https://gitter.im/reiseburo/hermann) [![Build Status](https://travis-ci.org/reiseburo/hermann.svg?branch=master)](https://travis-ci.org/reiseburo/hermann)
A Ruby gem implementing a Kafka Publisher and Consumer
@ -90,8 +90,8 @@ new_topic = 'other_topic'
the_consumer = Hermann::Consumer.new(topic, brokers: "localhost:9092", partition: 1)
the_consumer.consume(new_topic) do |msg| # can change topic with optional argument to .consume
puts "Recv: #{msg}"
the_consumer.consume(new_topic) do |msg, key, offset| # can change topic with optional argument to .consume
puts "Recv: #{msg}, key: #{key}, offset: #{offset}"
end
```
@ -114,6 +114,14 @@ end
```
#### Build & Unit Test
First time (from a clean repository):
`bundle install && bundle exec rake`
Thereafter:
`bundle exec rake spec`
#### Testing
To run the integration tests:

17
RELEASING.md Normal file
View File

@ -0,0 +1,17 @@
# Releasing Hermann
Hermann is a multi-platform gem, which means that two actual `.gem` files need
to be built and uploaded to [rubygems.org](https://rubygems.org/gems/hermann).
Here's the current process that [I](https://github.com/rtyler) use:
* `rvm use ruby@rubygems` (*switch to MRI*)
* `bundle install && rake` (*ensure that MRI tests pass*)
* `rvm use jruby@rubygems` (*switch to JRuby*)
* `bundle install && rake` (*ensure that the JRuby tests pass*)
* `rake release` (*tag the release and upload the `-java` platform gem*)
* `rvm use ruby@rubygems` (*switch back to MRI*)
* `gem build hermann.gemspec` (*build the 'ruby' platform gem*)
* `gem push pkg/hermann-0.blah.gem` (*upload the ruby platform gem*)
This can certainly be cleaned up, but this is the process at it is right now

View File

@ -3,10 +3,10 @@ require 'fileutils'
require "bundler/gem_tasks"
require 'rspec/core/rake_task'
require 'rake/extensiontask'
require 'ci/reporter/rake/rspec'
Rake::ExtensionTask.new do |t|
t.name = 'hermann_lib'
t.name = 'hermann_rdkafka'
t.ext_dir = 'ext/hermann'
t.gem_spec = Gem::Specification.load('hermann.gemspec')
end

View File

@ -151,4 +151,4 @@ have_header('ruby/version.h')
have_func('rb_thread_blocking_region')
have_func('rb_thread_call_without_gvl')
create_makefile('hermann/hermann_lib')
create_makefile('hermann/hermann_rdkafka')

View File

@ -1,5 +1,5 @@
/*
* hermann_lib.c - Ruby wrapper for the librdkafka library
* hermann_rdkafka.c - Ruby wrapper for the librdkafka library
*
* Copyright (c) 2014 Stan Campbell
* Copyright (c) 2014 Lookout, Inc.
@ -31,7 +31,12 @@
/* Much of the librdkafka library calls were lifted from rdkafka_example.c */
#include "hermann_lib.h"
#include "hermann_rdkafka.h"
/* This header file exposes the functions in librdkafka.a that are needed for
* consistent partitioning. After librdkafka releases a new tag and Hermann
* points to it, this can be removed. */
#include "rdcrc32.h"
#ifdef HAVE_RUBY_VERSION_H
#include <ruby/version.h>
@ -134,6 +139,18 @@ static void msg_delivered(rd_kafka_t *rk,
}
}
/* This function is in rdkafka.h on librdkafka master. As soon as a new
* version is released and Hermann points to it, this can be removed. */
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
return rd_crc32(key, keylen) % partition_cnt;
}
/**
* Producer partitioner callback.
* Used to determine the target partition within a topic for production.
@ -154,17 +171,11 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
/* Pick a random partition */
int retry = 0;
int32_t partition = RD_KAFKA_PARTITION_UA;
for (; retry < partition_cnt; retry++) {
partition = rand() % partition_cnt;
if (rd_kafka_topic_partition_available(rkt, partition)) {
break; /* this one will do */
}
if (keylen) {
return rd_kafka_msg_partitioner_consistent(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
} else {
return rd_kafka_msg_partitioner_random(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
}
return partition;
}
/**
@ -264,9 +275,19 @@ static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cf
// Yield the data to the Consumer's block
if (rb_block_given_p()) {
VALUE value = rb_str_new((char *)rkmessage->payload, rkmessage->len);
VALUE key, data, offset;
data = rb_str_new((char *)rkmessage->payload, rkmessage->len);
offset = rb_ll2inum(rkmessage->offset);
if ( rkmessage->key_len > 0 ) {
key = rb_str_new((char*) rkmessage->key, (int)rkmessage->key_len);
} else {
key = Qnil;
}
rd_kafka_message_destroy(rkmessage);
rb_yield(value);
rb_yield_values(3, data, key, offset);
}
else {
if (DEBUG) {
@ -444,7 +465,7 @@ static VALUE consumer_consume_loop_stop(VALUE self) {
}
/**
* Hermann::Lib::Consumer.consume
* Hermann::Provider::RDKafka::Consumer.consume
*
* @param VALUE self the Ruby object for this consumer
* @param VALUE topic the Ruby string representing a topic to consume
@ -579,10 +600,11 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
* @param message VALUE the ruby String containing the outgoing message.
* @param topic VALUE the ruby String containing the topic to use for the
* outgoing message.
* @param key VALUE the ruby String containing the key to partition by
* @param result VALUE the Hermann::Result object to be fulfilled when the
* push completes
*/
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE partition_key, VALUE result) {
HermannInstanceConfig* producerConfig;
/* Context pointer, pointing to `result`, for the librdkafka delivery
@ -590,6 +612,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
*/
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
rd_kafka_topic_t *rkt = NULL;
rd_kafka_topic_conf_t *rkt_conf = NULL;
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
@ -612,9 +635,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
TRACER("kafka initialized\n");
/* Topic configuration */
rkt_conf = rd_kafka_topic_conf_new();
/* Set the partitioner callback */
rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, producer_partitioner_callback);
rkt = rd_kafka_topic_new(producerConfig->rk,
RSTRING_PTR(topic),
NULL);
rkt_conf);
if (NULL == rkt) {
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
@ -635,8 +664,8 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
RD_KAFKA_MSG_F_COPY,
RSTRING_PTR(message),
RSTRING_LEN(message),
NULL,
0,
RSTRING_PTR(partition_key),
RSTRING_LEN(partition_key),
delivery_ctx)) {
fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition,
@ -1190,21 +1219,21 @@ static VALUE producer_init_copy(VALUE copy,
}
/**
* Init_hermann_lib
* Init_hermann_rdkafka
*
* Called by Ruby when the Hermann gem is loaded.
* Defines the Hermann module.
* Defines the Producer and Consumer classes.
*/
void Init_hermann_lib() {
VALUE lib_module, c_consumer, c_producer;
void Init_hermann_rdkafka() {
VALUE lib_module, provider_module, c_consumer, c_producer;
TRACER("setting up Hermann::Lib\n");
TRACER("setting up Hermann::Provider::RDKafka\n");
/* Define the module */
hermann_module = rb_define_module("Hermann");
lib_module = rb_define_module_under(hermann_module, "Lib");
provider_module = rb_define_module_under(hermann_module, "Provider");
lib_module = rb_define_module_under(provider_module, "RDKafka");
/* ---- Define the consumer class ---- */
c_consumer = rb_define_class_under(lib_module, "Consumer", rb_cObject);
@ -1230,7 +1259,7 @@ void Init_hermann_lib() {
rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1);
/* Producer.push_single(msg) */
rb_define_method(c_producer, "push_single", producer_push_single, 3);
rb_define_method(c_producer, "push_single", producer_push_single, 4);
/* Producer.tick */
rb_define_method(c_producer, "tick", producer_tick, 1);

View File

@ -1,5 +1,5 @@
/*
* hermann_lib.h - Ruby wrapper for the librdkafka library
* hermann_rdkafka.h - Ruby wrapper for the librdkafka library
*
* Copyright (c) 2014 Stan Campbell
* Copyright (c) 2014 Lookout, Inc.
@ -107,7 +107,7 @@ typedef struct HermannInstanceConfig {
typedef HermannInstanceConfig hermann_conf_t;
typedef struct {
/* Hermann::Lib::Producer */
/* Hermann::Provider::RDKafka::Producer */
hermann_conf_t *producer;
/* Hermann::Result */
VALUE result;

103
ext/hermann/rdcrc32.h Normal file
View File

@ -0,0 +1,103 @@
/**
* \file rdcrc32.h
* Functions and types for CRC checks.
*
* Generated on Tue May 8 17:36:59 2012,
* by pycrc v0.7.10, http://www.tty1.net/pycrc/
*
* NOTE: Contains librd modifications:
* - rd_crc32() helper.
* - __RDCRC32___H__ define (was missing the '32' part).
*
* using the configuration:
* Width = 32
* Poly = 0x04c11db7
* XorIn = 0xffffffff
* ReflectIn = True
* XorOut = 0xffffffff
* ReflectOut = True
* Algorithm = table-driven
*****************************************************************************/
#ifndef __RDCRC32___H__
#define __RDCRC32___H__
#include <stdlib.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* The definition of the used algorithm.
*****************************************************************************/
#define CRC_ALGO_TABLE_DRIVEN 1
/**
* The type of the CRC values.
*
* This type must be big enough to contain at least 32 bits.
*****************************************************************************/
typedef uint32_t rd_crc32_t;
/**
* Reflect all bits of a \a data word of \a data_len bytes.
*
* \param data The data word to be reflected.
* \param data_len The width of \a data expressed in number of bits.
* \return The reflected data.
*****************************************************************************/
rd_crc32_t rd_crc32_reflect(rd_crc32_t data, size_t data_len);
/**
* Calculate the initial crc value.
*
* \return The initial crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_init(void)
{
return 0xffffffff;
}
/**
* Update the crc value with new data.
*
* \param crc The current crc value.
* \param data Pointer to a buffer of \a data_len bytes.
* \param data_len Number of bytes in the \a data buffer.
* \return The updated crc value.
*****************************************************************************/
rd_crc32_t rd_crc32_update(rd_crc32_t crc, const unsigned char *data, size_t data_len);
/**
* Calculate the final crc value.
*
* \param crc The current crc value.
* \return The final crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_finalize(rd_crc32_t crc)
{
return crc ^ 0xffffffff;
}
/**
* Wrapper for performing CRC32 on the provided buffer.
*/
static inline rd_crc32_t rd_crc32 (const char *data, size_t data_len) {
return rd_crc32_finalize(rd_crc32_update(rd_crc32_init(),
(const unsigned char *)data,
data_len));
}
#ifdef __cplusplus
} /* closing brace for extern "C" */
#endif
#endif /* __RDCRC32___H__ */

View File

@ -16,7 +16,7 @@ Gem::Specification.new do |s|
s.homepage = 'https://github.com/lookout/Hermann'
s.licenses = ['MIT']
s.files = [ "Rakefile"]
s.files = ['Rakefile']
s.files += `git ls-files -- lib`.split($\)
s.files += `git ls-files -- ext`.split($\)
@ -28,15 +28,16 @@ Gem::Specification.new do |s|
s.add_dependency 'thread_safe', '~> 0.3.4'
if RUBY_PLATFORM == "java"
s.add_dependency 'concurrent-ruby', '~> 0.7.0'
s.files << 'lib/hermann_jars.rb'
s.add_dependency 'concurrent-ruby', '~> 1.0.0'
# IMPORTANT: make sure that jar-dependencies is only a development
# dependency of your gem. if it is a runtime dependencies the require_jars
# file will be overwritten during installation.
s.add_dependency 'jar-dependencies', '~> 0.1.10'
s.requirements << "jar org.apache.kafka:kafka_2.10, ~>0.8.1.1"
s.add_dependency 'jar-dependencies', ['~> 0.1', '>= 0.1.10']
s.requirements << "jar org.apache.kafka:kafka_2.11, ~> 0.8.2.2"
# use log4j-1.2.16+ to as 1.2.15 declares deps which are not in maven central and causes the dep resolution to fail
s.requirements << "jar log4j:log4j, ~>1.2.16"
s.requirements << "jar log4j:log4j, ~> 1.2.16"
s.require_paths = ["lib"]
s.platform = 'java'
else

View File

@ -4,7 +4,7 @@ require 'hermann/errors'
if Hermann.jruby?
require 'hermann/provider/java_simple_consumer'
else
require 'hermann_lib'
require 'hermann_rdkafka'
end
module Hermann
@ -37,7 +37,7 @@ module Hermann
else
brokers, partition = require_values_at(opts, :brokers, :partition)
@internal = Hermann::Lib::Consumer.new(topic, brokers, partition, offset)
@internal = Hermann::Provider::RDKafka::Consumer.new(topic, brokers, partition, offset)
end
end

View File

@ -1,4 +1,4 @@
require 'hermann_lib'
require 'hermann_rdkafka'
require 'hermann/consumer'
module Hermann
@ -20,7 +20,7 @@ module Hermann
DEFAULT_TIMEOUT_MS = 2_000
def initialize(brokers, options = {})
raise "this is an MRI api only!" if Hermann.jruby?
@internal = Hermann::Lib::Producer.new(brokers)
@internal = Hermann::Provider::RDKafka::Producer.new(brokers)
@timeout = options[:timeout] || DEFAULT_TIMEOUT_MS
end

View File

@ -6,7 +6,7 @@ require 'hermann/result'
if RUBY_PLATFORM == "java"
require 'hermann/provider/java_producer'
else
require 'hermann_lib'
require 'hermann_rdkafka'
end
module Hermann
@ -23,7 +23,7 @@ module Hermann
if Hermann.jruby?
@internal = Hermann::Provider::JavaProducer.new(brokers.join(','), opts)
else
@internal = Hermann::Lib::Producer.new(brokers.join(','))
@internal = Hermann::Provider::RDKafka::Producer.new(brokers.join(','))
end
# We're tracking children so we can make sure that at Producer exit we
# make a reasonable attempt to clean up outstanding result objects
@ -63,8 +63,7 @@ module Hermann
end
if Hermann.jruby?
key = opts.has_key?(:partition_key) ? opts[:partition_key].to_java : nil
result = @internal.push_single(value, topic, key)
result = @internal.push_single(value, topic, opts[:partition_key], nil)
unless result.nil?
@children << result
end
@ -76,7 +75,7 @@ module Hermann
# librdkafka callback queue overflow
tick_reactor
result = create_result
@internal.push_single(value, topic, result)
@internal.push_single(value, topic, opts[:partition_key].to_s, result)
end
return result
@ -103,7 +102,8 @@ module Hermann
@children.each do |child|
# Skip over any children that should already be reaped for other
# reasons
next if child.completed?
next if (Hermann.jruby? ? child.fulfilled? : child.completed?)
# Propagate errors to the remaining children
child.internal_set_error(ex)
end
@ -119,7 +119,7 @@ module Hermann
# Filter all children who are no longer pending/fulfilled
total_children = @children.size
@children = @children.reject { |c| c.completed? }
@children = @children.reject { |c| Hermann.jruby? ? c.fulfilled? : c.completed? }
return (total_children - children.size)
end

View File

@ -43,7 +43,8 @@ module Hermann
# @return +Concurrent::Promise+ Representa a promise to send the
# data to the kafka broker. Upon execution the Promise's status
# will be set
def push_single(msg, topic, key)
def push_single(msg, topic, key, _)
key = key && key.to_java
Concurrent::Promise.execute {
data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
begin

View File

@ -1,3 +1,3 @@
module Hermann
VERSION = '0.24.1'
VERSION = '0.26.1'
end

View File

@ -1,13 +0,0 @@
# this is a generated file, to avoid over-writing it just delete this comment
require 'jar_dependencies'
require_jar( 'org.apache.zookeeper', 'zookeeper', '3.3.4' )
require_jar( 'net.sf.jopt-simple', 'jopt-simple', '3.2' )
require_jar( 'org.xerial.snappy', 'snappy-java', '1.0.5' )
require_jar( 'jline', 'jline', '0.9.94' )
require_jar( 'com.101tec', 'zkclient', '0.3' )
require_jar( 'log4j', 'log4j', '1.2.17' )
require_jar( 'org.scala-lang', 'scala-library', '2.10.1' )
require_jar( 'org.slf4j', 'slf4j-api', '1.7.2' )
require_jar( 'com.yammer.metrics', 'metrics-core', '2.2.0' )
require_jar( 'org.apache.kafka', 'kafka_2.10', '0.8.1.1' )

View File

@ -1,13 +1,13 @@
require 'spec_helper'
describe 'Hermann::Lib::Producer', :platform => :mri do
describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
before :all do
require 'hermann_lib'
require 'hermann_rdkafka'
end
let(:topic) { 'rspec' }
let(:brokers) { 'localhost:1337' }
subject(:producer) { Hermann::Lib::Producer.new(brokers) }
subject(:producer) { Hermann::Provider::RDKafka::Producer.new(brokers) }
let(:timeout) { 3000 }
it { should respond_to :push_single }
@ -41,7 +41,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
let(:brokers) { 'localhost:13337' }
it 'should error after attempting to connect' do |example|
producer.push_single(example.full_description, 'test-topic', nil)
producer.push_single(example.full_description, 'test-topic', '', nil)
begin
producer.tick(timeout)
rescue StandardError => ex
@ -62,7 +62,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
describe '#push_single', :type => :integration do
let(:message) { |example| example.full_description }
subject(:push) { producer.push_single(message, topic, nil) }
subject(:push) { producer.push_single(message, topic, '', nil) }
it 'should return' do
expect(push).not_to be_nil
@ -105,7 +105,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
context 'with a single queued request' do
before :each do
producer.push_single('hello', topic, nil)
producer.push_single('hello', topic, '', nil)
end
it 'should return successfully' do

View File

@ -11,10 +11,11 @@ describe Hermann::Producer do
describe '#initialize' do
context 'with C ruby', :platform => :mri do
it 'joins broker array' do
expect(Hermann::Lib::Producer).to receive(:new).with(brokers.first)
expect(Hermann::Provider::RDKafka::Producer).to receive(:new).with(brokers.first)
expect(producer).to be_a Hermann::Producer
end
end
context 'with Java', :platform => :java do
it 'joins broker array' do
expect(Hermann::Provider::JavaProducer).to receive(:new).with(brokers.first, opts)
@ -42,34 +43,34 @@ describe Hermann::Producer do
context 'without topic passed' do
it 'uses initialized topic' do
expect(producer.internal).to receive(:push_single).with(msg, topic, anything)
expect(producer.internal).to receive(:push_single).with(msg, topic, anything, anything)
producer.push(msg)
end
end
context 'without topic passed', :platform => :java do
it 'uses initialized topic and does not have a partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, nil)
expect(producer.internal).to receive(:push_single).with(msg, topic, nil, anything)
producer.push(msg)
end
end
context 'with topic passed' do
it 'can change topic' do
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything)
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything)
producer.push(msg, :topic => passed_topic)
end
context 'and an array of messags' do
it 'should propagate the topic' do
messages = 3.times.map { |i| msg }
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything).exactly(messages.size).times
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything).exactly(messages.size).times
producer.push(messages, :topic => passed_topic)
end
end
end
context 'with explicit partition key', :platform => :java do
context 'with explicit partition key' do
it 'uses the partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key.to_java)
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key, anything)
producer.push(msg, :partition_key => partition_key)
end
end
@ -177,7 +178,7 @@ describe Hermann::Producer do
it 'should invoke #push_single for each element' do
value.each do |v|
expect(producer.internal).to receive(:push_single).with(v, topic, anything)
expect(producer.internal).to receive(:push_single).with(v, topic, anything, anything)
end
expect(result).to be_instance_of Array
@ -190,7 +191,7 @@ describe Hermann::Producer do
describe '#tick_reactor' do
let(:timeout) { 0 }
let(:internal) { double('Hermann::Lib::Producer mock') }
let(:internal) { double('Hermann::Provider::RDKafka::Producer mock') }
subject(:tick) { producer.tick_reactor(timeout) }
before :each do

View File

@ -8,11 +8,11 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
let(:topic) { 'rspec' }
let(:brokers) { '0:1337'}
let(:opts) { {} }
let(:part_key) { "key".to_java }
let(:part_key) { "key" }
let(:msg) { "bar" }
describe '#push_single' do
subject(:result) { producer.push_single(msg, topic, nil) }
subject(:result) { producer.push_single(msg, topic, nil, nil) }
let(:passed_topic) { 'foo' }
@ -22,18 +22,18 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
it 'can change topic' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, nil, anything)
producer.push_single(msg, passed_topic, nil).wait(1)
producer.push_single(msg, passed_topic, nil, nil).wait(1)
end
it 'can change partition key' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, part_key, anything)
producer.push_single(msg, passed_topic, part_key).wait(1)
producer.push_single(msg, passed_topic, part_key, nil).wait(1)
end
context 'error conditions' do
shared_examples 'an error condition' do
it 'should be rejected' do
promise = producer.push_single('rspec', topic, nil).wait(1)
promise = producer.push_single('rspec', topic, nil, nil).wait(1)
expect(promise).to be_rejected
expect { promise.value! }.to raise_error
end

View File

@ -2,6 +2,13 @@ require 'rubygems'
require 'yaml'
require 'rspec'
require 'simplecov'
require 'simplecov-rcov'
SimpleCov.start do
formatter = SimpleCov::Formatter::RcovFormatter
end
# Add ext/ to the load path so we can load `hermann_lib`
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../ext/'))
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../lib/'))