mirror of https://github.com/reiseburo/hermann
Compare commits
36 Commits
Author | SHA1 | Date |
---|---|---|
R. Tyler Croy | d822ae541a | |
Liam Newman | f6f71c98e4 | |
R. Tyler Croy | f9957b5629 | |
Liam Newman | 35eba84dc6 | |
R. Tyler Croy | 68515effc7 | |
Liam Newman | 69af2e310f | |
R. Tyler Croy | 9473c8b24b | |
Liam Newman | e0225b6761 | |
R. Tyler Croy | a4d678deaf | |
Liam Newman | 5dd952145b | |
R. Tyler Croy | 9f5cf12151 | |
R. Tyler Croy | f71ab11ea2 | |
R. Tyler Croy | 436ec9be82 | |
Julien Feltesse | 587f3be16e | |
Omer Katz | c2b2274af3 | |
Omer Katz | c8fe7e06b5 | |
R. Tyler Croy | 246f27e76d | |
R. Tyler Croy | bc560f21ea | |
R. Tyler Croy | 9f8fbe381c | |
R. Tyler Croy | 5b669504af | |
R. Tyler Croy | d4a350c65c | |
R. Tyler Croy | 17b51359a9 | |
R. Tyler Croy | 0ad9be88f0 | |
R. Tyler Croy | 8422e82125 | |
R. Tyler Croy | 1c7543f730 | |
R. Tyler Croy | 9d5b773542 | |
R. Tyler Croy | ecae27dd65 | |
R. Tyler Croy | e9d301be3d | |
R. Tyler Croy | 4dd732dc63 | |
R. Tyler Croy | 5c20c6c5ba | |
R. Tyler Croy | 9f1e16071f | |
R. Tyler Croy | 74cb8656e1 | |
jakesandlund | 6946d4d82c | |
jakesandlund | c64d38cff3 | |
jakesandlund | c29bb5e4d0 | |
cory and jakesandlund | e8703e1df4 |
|
@ -13,3 +13,6 @@ tmp/
|
|||
Gemfile.lock
|
||||
Jarfile.lock
|
||||
.jbundler/
|
||||
lib/hermann_jars.rb
|
||||
coverage/
|
||||
spec/reports
|
||||
|
|
|
@ -1,10 +1,17 @@
|
|||
language: ruby
|
||||
sudo: false
|
||||
rvm:
|
||||
- ruby-head
|
||||
- 2.3.0
|
||||
- 2.2
|
||||
- 2.1
|
||||
- 1.9.3
|
||||
- jruby
|
||||
- ree
|
||||
|
||||
# upgrading bundler since our gemspec relying on lib/hermann_jars.rb being
|
||||
# present causes troubles with bundler .17.x
|
||||
before_install:
|
||||
- gem install bundler -N --version ">= 1.8.0"
|
||||
#deploy:
|
||||
# provider: rubygems
|
||||
# gemspec: hermann.gemspec
|
||||
|
|
6
Gemfile
6
Gemfile
|
@ -4,7 +4,7 @@ gemspec
|
|||
|
||||
group :development do
|
||||
gem 'jbundler', :platform => :jruby
|
||||
gem 'rake'
|
||||
gem 'rake', '~> 11.3.0'
|
||||
gem 'i18n', '~> 0.6.11', :platform => :mri_18
|
||||
gem 'activesupport', '~> 3.x', :platform => :mri_18
|
||||
gem 'ruby-maven', '~> 3.1.1.0', :platform => :jruby
|
||||
|
@ -23,4 +23,8 @@ group :test do
|
|||
# Used for testing encoding protobufs in an out of Hermann in integration
|
||||
# tests
|
||||
gem 'protobuffy'
|
||||
|
||||
gem 'ci_reporter_rspec'
|
||||
gem 'simplecov'
|
||||
gem 'simplecov-rcov'
|
||||
end
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
#!groovy
|
||||
|
||||
/* Only keep the 10 most recent builds. */
|
||||
properties([[$class: 'BuildDiscarderProperty',
|
||||
strategy: [$class: 'LogRotator', numToKeepStr: '10']]])
|
||||
|
||||
stage ('Build') {
|
||||
|
||||
node {
|
||||
// Checkout
|
||||
checkout scm
|
||||
|
||||
// install required bundles
|
||||
sh 'bundle install'
|
||||
|
||||
// build and run tests with coverage
|
||||
sh 'bundle exec rake build spec'
|
||||
|
||||
// Archive the built artifacts
|
||||
archive (includes: 'pkg/*.gem')
|
||||
|
||||
// publish html
|
||||
publishHTML ([
|
||||
allowMissing: false,
|
||||
alwaysLinkToLastBuild: false,
|
||||
keepAll: true,
|
||||
reportDir: 'coverage',
|
||||
reportFiles: 'index.html',
|
||||
reportName: "RCov Report"
|
||||
])
|
||||
|
||||
}
|
||||
}
|
10
README.md
10
README.md
|
@ -1,6 +1,6 @@
|
|||
# Hermann
|
||||
|
||||
[![Gitter chat](https://badges.gitter.im/lookout/Hermann.png)](https://gitter.im/lookout/Hermann) [![Build Status](https://travis-ci.org/lookout/hermann.svg?branch=master)](https://travis-ci.org/lookout/hermann)
|
||||
[![Gitter chat](https://badges.gitter.im/reiseburo/hermann.png)](https://gitter.im/reiseburo/hermann) [![Build Status](https://travis-ci.org/reiseburo/hermann.svg?branch=master)](https://travis-ci.org/reiseburo/hermann)
|
||||
|
||||
A Ruby gem implementing a Kafka Publisher and Consumer
|
||||
|
||||
|
@ -114,6 +114,14 @@ end
|
|||
|
||||
```
|
||||
|
||||
#### Build & Unit Test
|
||||
|
||||
First time (from a clean repository):
|
||||
`bundle install && bundle exec rake`
|
||||
|
||||
Thereafter:
|
||||
`bundle exec rake spec`
|
||||
|
||||
#### Testing
|
||||
|
||||
To run the integration tests:
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
# Releasing Hermann
|
||||
|
||||
Hermann is a multi-platform gem, which means that two actual `.gem` files need
|
||||
to be built and uploaded to [rubygems.org](https://rubygems.org/gems/hermann).
|
||||
|
||||
Here's the current process that [I](https://github.com/rtyler) use:
|
||||
|
||||
* `rvm use ruby@rubygems` (*switch to MRI*)
|
||||
* `bundle install && rake` (*ensure that MRI tests pass*)
|
||||
* `rvm use jruby@rubygems` (*switch to JRuby*)
|
||||
* `bundle install && rake` (*ensure that the JRuby tests pass*)
|
||||
* `rake release` (*tag the release and upload the `-java` platform gem*)
|
||||
* `rvm use ruby@rubygems` (*switch back to MRI*)
|
||||
* `gem build hermann.gemspec` (*build the 'ruby' platform gem*)
|
||||
* `gem push pkg/hermann-0.blah.gem` (*upload the ruby platform gem*)
|
||||
|
||||
This can certainly be cleaned up, but this is the process at it is right now
|
2
Rakefile
2
Rakefile
|
@ -3,7 +3,7 @@ require 'fileutils'
|
|||
require "bundler/gem_tasks"
|
||||
require 'rspec/core/rake_task'
|
||||
require 'rake/extensiontask'
|
||||
|
||||
require 'ci/reporter/rake/rspec'
|
||||
|
||||
Rake::ExtensionTask.new do |t|
|
||||
t.name = 'hermann_rdkafka'
|
||||
|
|
|
@ -33,6 +33,11 @@
|
|||
|
||||
#include "hermann_rdkafka.h"
|
||||
|
||||
/* This header file exposes the functions in librdkafka.a that are needed for
|
||||
* consistent partitioning. After librdkafka releases a new tag and Hermann
|
||||
* points to it, this can be removed. */
|
||||
#include "rdcrc32.h"
|
||||
|
||||
#ifdef HAVE_RUBY_VERSION_H
|
||||
#include <ruby/version.h>
|
||||
#endif
|
||||
|
@ -134,6 +139,18 @@ static void msg_delivered(rd_kafka_t *rk,
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/* This function is in rdkafka.h on librdkafka master. As soon as a new
|
||||
* version is released and Hermann points to it, this can be removed. */
|
||||
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
|
||||
const void *key, size_t keylen,
|
||||
int32_t partition_cnt,
|
||||
void *rkt_opaque,
|
||||
void *msg_opaque) {
|
||||
return rd_crc32(key, keylen) % partition_cnt;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Producer partitioner callback.
|
||||
* Used to determine the target partition within a topic for production.
|
||||
|
@ -154,17 +171,11 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt,
|
|||
int32_t partition_cnt,
|
||||
void *rkt_opaque,
|
||||
void *msg_opaque) {
|
||||
/* Pick a random partition */
|
||||
int retry = 0;
|
||||
int32_t partition = RD_KAFKA_PARTITION_UA;
|
||||
|
||||
for (; retry < partition_cnt; retry++) {
|
||||
partition = rand() % partition_cnt;
|
||||
if (rd_kafka_topic_partition_available(rkt, partition)) {
|
||||
break; /* this one will do */
|
||||
}
|
||||
if (keylen) {
|
||||
return rd_kafka_msg_partitioner_consistent(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
|
||||
} else {
|
||||
return rd_kafka_msg_partitioner_random(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
|
||||
}
|
||||
return partition;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -589,10 +600,11 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
|
|||
* @param message VALUE the ruby String containing the outgoing message.
|
||||
* @param topic VALUE the ruby String containing the topic to use for the
|
||||
* outgoing message.
|
||||
* @param key VALUE the ruby String containing the key to partition by
|
||||
* @param result VALUE the Hermann::Result object to be fulfilled when the
|
||||
* push completes
|
||||
*/
|
||||
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
|
||||
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE partition_key, VALUE result) {
|
||||
|
||||
HermannInstanceConfig* producerConfig;
|
||||
/* Context pointer, pointing to `result`, for the librdkafka delivery
|
||||
|
@ -600,6 +612,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
|
|||
*/
|
||||
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
|
||||
rd_kafka_topic_t *rkt = NULL;
|
||||
rd_kafka_topic_conf_t *rkt_conf = NULL;
|
||||
|
||||
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
|
||||
|
||||
|
@ -622,9 +635,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
|
|||
|
||||
TRACER("kafka initialized\n");
|
||||
|
||||
/* Topic configuration */
|
||||
rkt_conf = rd_kafka_topic_conf_new();
|
||||
|
||||
/* Set the partitioner callback */
|
||||
rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, producer_partitioner_callback);
|
||||
|
||||
rkt = rd_kafka_topic_new(producerConfig->rk,
|
||||
RSTRING_PTR(topic),
|
||||
NULL);
|
||||
rkt_conf);
|
||||
|
||||
if (NULL == rkt) {
|
||||
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
|
||||
|
@ -645,8 +664,8 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
|
|||
RD_KAFKA_MSG_F_COPY,
|
||||
RSTRING_PTR(message),
|
||||
RSTRING_LEN(message),
|
||||
NULL,
|
||||
0,
|
||||
RSTRING_PTR(partition_key),
|
||||
RSTRING_LEN(partition_key),
|
||||
delivery_ctx)) {
|
||||
fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
|
||||
rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition,
|
||||
|
@ -1240,7 +1259,7 @@ void Init_hermann_rdkafka() {
|
|||
rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1);
|
||||
|
||||
/* Producer.push_single(msg) */
|
||||
rb_define_method(c_producer, "push_single", producer_push_single, 3);
|
||||
rb_define_method(c_producer, "push_single", producer_push_single, 4);
|
||||
|
||||
/* Producer.tick */
|
||||
rb_define_method(c_producer, "tick", producer_tick, 1);
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* \file rdcrc32.h
|
||||
* Functions and types for CRC checks.
|
||||
*
|
||||
* Generated on Tue May 8 17:36:59 2012,
|
||||
* by pycrc v0.7.10, http://www.tty1.net/pycrc/
|
||||
*
|
||||
* NOTE: Contains librd modifications:
|
||||
* - rd_crc32() helper.
|
||||
* - __RDCRC32___H__ define (was missing the '32' part).
|
||||
*
|
||||
* using the configuration:
|
||||
* Width = 32
|
||||
* Poly = 0x04c11db7
|
||||
* XorIn = 0xffffffff
|
||||
* ReflectIn = True
|
||||
* XorOut = 0xffffffff
|
||||
* ReflectOut = True
|
||||
* Algorithm = table-driven
|
||||
*****************************************************************************/
|
||||
#ifndef __RDCRC32___H__
|
||||
#define __RDCRC32___H__
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
|
||||
/**
|
||||
* The definition of the used algorithm.
|
||||
*****************************************************************************/
|
||||
#define CRC_ALGO_TABLE_DRIVEN 1
|
||||
|
||||
|
||||
/**
|
||||
* The type of the CRC values.
|
||||
*
|
||||
* This type must be big enough to contain at least 32 bits.
|
||||
*****************************************************************************/
|
||||
typedef uint32_t rd_crc32_t;
|
||||
|
||||
|
||||
/**
|
||||
* Reflect all bits of a \a data word of \a data_len bytes.
|
||||
*
|
||||
* \param data The data word to be reflected.
|
||||
* \param data_len The width of \a data expressed in number of bits.
|
||||
* \return The reflected data.
|
||||
*****************************************************************************/
|
||||
rd_crc32_t rd_crc32_reflect(rd_crc32_t data, size_t data_len);
|
||||
|
||||
|
||||
/**
|
||||
* Calculate the initial crc value.
|
||||
*
|
||||
* \return The initial crc value.
|
||||
*****************************************************************************/
|
||||
static inline rd_crc32_t rd_crc32_init(void)
|
||||
{
|
||||
return 0xffffffff;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Update the crc value with new data.
|
||||
*
|
||||
* \param crc The current crc value.
|
||||
* \param data Pointer to a buffer of \a data_len bytes.
|
||||
* \param data_len Number of bytes in the \a data buffer.
|
||||
* \return The updated crc value.
|
||||
*****************************************************************************/
|
||||
rd_crc32_t rd_crc32_update(rd_crc32_t crc, const unsigned char *data, size_t data_len);
|
||||
|
||||
|
||||
/**
|
||||
* Calculate the final crc value.
|
||||
*
|
||||
* \param crc The current crc value.
|
||||
* \return The final crc value.
|
||||
*****************************************************************************/
|
||||
static inline rd_crc32_t rd_crc32_finalize(rd_crc32_t crc)
|
||||
{
|
||||
return crc ^ 0xffffffff;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Wrapper for performing CRC32 on the provided buffer.
|
||||
*/
|
||||
static inline rd_crc32_t rd_crc32 (const char *data, size_t data_len) {
|
||||
return rd_crc32_finalize(rd_crc32_update(rd_crc32_init(),
|
||||
(const unsigned char *)data,
|
||||
data_len));
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
} /* closing brace for extern "C" */
|
||||
#endif
|
||||
|
||||
#endif /* __RDCRC32___H__ */
|
|
@ -16,7 +16,7 @@ Gem::Specification.new do |s|
|
|||
s.homepage = 'https://github.com/lookout/Hermann'
|
||||
s.licenses = ['MIT']
|
||||
|
||||
s.files = [ "Rakefile"]
|
||||
s.files = ['Rakefile']
|
||||
s.files += `git ls-files -- lib`.split($\)
|
||||
s.files += `git ls-files -- ext`.split($\)
|
||||
|
||||
|
@ -28,15 +28,16 @@ Gem::Specification.new do |s|
|
|||
s.add_dependency 'thread_safe', '~> 0.3.4'
|
||||
|
||||
if RUBY_PLATFORM == "java"
|
||||
s.add_dependency 'concurrent-ruby', '~> 0.7.0'
|
||||
s.files << 'lib/hermann_jars.rb'
|
||||
s.add_dependency 'concurrent-ruby', '~> 1.0.0'
|
||||
|
||||
# IMPORTANT: make sure that jar-dependencies is only a development
|
||||
# dependency of your gem. if it is a runtime dependencies the require_jars
|
||||
# file will be overwritten during installation.
|
||||
s.add_dependency 'jar-dependencies', ['~> 0.1', '>= 0.1.10']
|
||||
s.requirements << "jar org.apache.kafka:kafka_2.10, ~>0.8.1.1"
|
||||
s.requirements << "jar org.apache.kafka:kafka_2.11, ~> 0.8.2.2"
|
||||
# use log4j-1.2.16+ to as 1.2.15 declares deps which are not in maven central and causes the dep resolution to fail
|
||||
s.requirements << "jar log4j:log4j, ~>1.2.16"
|
||||
s.requirements << "jar log4j:log4j, ~> 1.2.16"
|
||||
s.require_paths = ["lib"]
|
||||
s.platform = 'java'
|
||||
else
|
||||
|
|
|
@ -63,8 +63,7 @@ module Hermann
|
|||
end
|
||||
|
||||
if Hermann.jruby?
|
||||
key = opts.has_key?(:partition_key) ? opts[:partition_key].to_java : nil
|
||||
result = @internal.push_single(value, topic, key)
|
||||
result = @internal.push_single(value, topic, opts[:partition_key], nil)
|
||||
unless result.nil?
|
||||
@children << result
|
||||
end
|
||||
|
@ -76,7 +75,7 @@ module Hermann
|
|||
# librdkafka callback queue overflow
|
||||
tick_reactor
|
||||
result = create_result
|
||||
@internal.push_single(value, topic, result)
|
||||
@internal.push_single(value, topic, opts[:partition_key].to_s, result)
|
||||
end
|
||||
|
||||
return result
|
||||
|
@ -103,7 +102,8 @@ module Hermann
|
|||
@children.each do |child|
|
||||
# Skip over any children that should already be reaped for other
|
||||
# reasons
|
||||
next if child.completed?
|
||||
next if (Hermann.jruby? ? child.fulfilled? : child.completed?)
|
||||
|
||||
# Propagate errors to the remaining children
|
||||
child.internal_set_error(ex)
|
||||
end
|
||||
|
@ -119,7 +119,7 @@ module Hermann
|
|||
# Filter all children who are no longer pending/fulfilled
|
||||
total_children = @children.size
|
||||
|
||||
@children = @children.reject { |c| c.completed? }
|
||||
@children = @children.reject { |c| Hermann.jruby? ? c.fulfilled? : c.completed? }
|
||||
|
||||
return (total_children - children.size)
|
||||
end
|
||||
|
|
|
@ -43,7 +43,8 @@ module Hermann
|
|||
# @return +Concurrent::Promise+ Representa a promise to send the
|
||||
# data to the kafka broker. Upon execution the Promise's status
|
||||
# will be set
|
||||
def push_single(msg, topic, key)
|
||||
def push_single(msg, topic, key, _)
|
||||
key = key && key.to_java
|
||||
Concurrent::Promise.execute {
|
||||
data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
|
||||
begin
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
module Hermann
|
||||
VERSION = '0.24.1'
|
||||
VERSION = '0.26.1'
|
||||
end
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
# this is a generated file, to avoid over-writing it just delete this comment
|
||||
require 'jar_dependencies'
|
||||
|
||||
require_jar( 'org.apache.zookeeper', 'zookeeper', '3.3.4' )
|
||||
require_jar( 'net.sf.jopt-simple', 'jopt-simple', '3.2' )
|
||||
require_jar( 'org.xerial.snappy', 'snappy-java', '1.0.5' )
|
||||
require_jar( 'jline', 'jline', '0.9.94' )
|
||||
require_jar( 'com.101tec', 'zkclient', '0.3' )
|
||||
require_jar( 'log4j', 'log4j', '1.2.17' )
|
||||
require_jar( 'org.scala-lang', 'scala-library', '2.10.1' )
|
||||
require_jar( 'org.slf4j', 'slf4j-api', '1.7.2' )
|
||||
require_jar( 'com.yammer.metrics', 'metrics-core', '2.2.0' )
|
||||
require_jar( 'org.apache.kafka', 'kafka_2.10', '0.8.1.1' )
|
|
@ -41,7 +41,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
|
|||
let(:brokers) { 'localhost:13337' }
|
||||
|
||||
it 'should error after attempting to connect' do |example|
|
||||
producer.push_single(example.full_description, 'test-topic', nil)
|
||||
producer.push_single(example.full_description, 'test-topic', '', nil)
|
||||
begin
|
||||
producer.tick(timeout)
|
||||
rescue StandardError => ex
|
||||
|
@ -62,7 +62,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
|
|||
|
||||
describe '#push_single', :type => :integration do
|
||||
let(:message) { |example| example.full_description }
|
||||
subject(:push) { producer.push_single(message, topic, nil) }
|
||||
subject(:push) { producer.push_single(message, topic, '', nil) }
|
||||
|
||||
it 'should return' do
|
||||
expect(push).not_to be_nil
|
||||
|
@ -105,7 +105,7 @@ describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
|
|||
|
||||
context 'with a single queued request' do
|
||||
before :each do
|
||||
producer.push_single('hello', topic, nil)
|
||||
producer.push_single('hello', topic, '', nil)
|
||||
end
|
||||
|
||||
it 'should return successfully' do
|
||||
|
|
|
@ -43,34 +43,34 @@ describe Hermann::Producer do
|
|||
|
||||
context 'without topic passed' do
|
||||
it 'uses initialized topic' do
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, anything)
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, anything, anything)
|
||||
producer.push(msg)
|
||||
end
|
||||
end
|
||||
context 'without topic passed', :platform => :java do
|
||||
it 'uses initialized topic and does not have a partition key' do
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, nil)
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, nil, anything)
|
||||
producer.push(msg)
|
||||
end
|
||||
end
|
||||
context 'with topic passed' do
|
||||
it 'can change topic' do
|
||||
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything)
|
||||
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything)
|
||||
producer.push(msg, :topic => passed_topic)
|
||||
end
|
||||
|
||||
context 'and an array of messags' do
|
||||
it 'should propagate the topic' do
|
||||
messages = 3.times.map { |i| msg }
|
||||
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything).exactly(messages.size).times
|
||||
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything).exactly(messages.size).times
|
||||
producer.push(messages, :topic => passed_topic)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'with explicit partition key', :platform => :java do
|
||||
context 'with explicit partition key' do
|
||||
it 'uses the partition key' do
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key.to_java)
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key, anything)
|
||||
producer.push(msg, :partition_key => partition_key)
|
||||
end
|
||||
end
|
||||
|
@ -178,7 +178,7 @@ describe Hermann::Producer do
|
|||
|
||||
it 'should invoke #push_single for each element' do
|
||||
value.each do |v|
|
||||
expect(producer.internal).to receive(:push_single).with(v, topic, anything)
|
||||
expect(producer.internal).to receive(:push_single).with(v, topic, anything, anything)
|
||||
end
|
||||
|
||||
expect(result).to be_instance_of Array
|
||||
|
|
|
@ -8,11 +8,11 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
|
|||
let(:topic) { 'rspec' }
|
||||
let(:brokers) { '0:1337'}
|
||||
let(:opts) { {} }
|
||||
let(:part_key) { "key".to_java }
|
||||
let(:part_key) { "key" }
|
||||
let(:msg) { "bar" }
|
||||
|
||||
describe '#push_single' do
|
||||
subject(:result) { producer.push_single(msg, topic, nil) }
|
||||
subject(:result) { producer.push_single(msg, topic, nil, nil) }
|
||||
|
||||
let(:passed_topic) { 'foo' }
|
||||
|
||||
|
@ -22,18 +22,18 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
|
|||
|
||||
it 'can change topic' do
|
||||
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, nil, anything)
|
||||
producer.push_single(msg, passed_topic, nil).wait(1)
|
||||
producer.push_single(msg, passed_topic, nil, nil).wait(1)
|
||||
end
|
||||
|
||||
it 'can change partition key' do
|
||||
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, part_key, anything)
|
||||
producer.push_single(msg, passed_topic, part_key).wait(1)
|
||||
producer.push_single(msg, passed_topic, part_key, nil).wait(1)
|
||||
end
|
||||
|
||||
context 'error conditions' do
|
||||
shared_examples 'an error condition' do
|
||||
it 'should be rejected' do
|
||||
promise = producer.push_single('rspec', topic, nil).wait(1)
|
||||
promise = producer.push_single('rspec', topic, nil, nil).wait(1)
|
||||
expect(promise).to be_rejected
|
||||
expect { promise.value! }.to raise_error
|
||||
end
|
||||
|
|
|
@ -2,6 +2,13 @@ require 'rubygems'
|
|||
require 'yaml'
|
||||
require 'rspec'
|
||||
|
||||
require 'simplecov'
|
||||
require 'simplecov-rcov'
|
||||
|
||||
SimpleCov.start do
|
||||
formatter = SimpleCov::Formatter::RcovFormatter
|
||||
end
|
||||
|
||||
# Add ext/ to the load path so we can load `hermann_lib`
|
||||
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../ext/'))
|
||||
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../lib/'))
|
||||
|
|
Loading…
Reference in New Issue