mirror of https://github.com/reiseburo/hermann
Compare commits
68 Commits
Author | SHA1 | Date |
---|---|---|
R. Tyler Croy | d822ae541a | |
Liam Newman | f6f71c98e4 | |
R. Tyler Croy | f9957b5629 | |
Liam Newman | 35eba84dc6 | |
R. Tyler Croy | 68515effc7 | |
Liam Newman | 69af2e310f | |
R. Tyler Croy | 9473c8b24b | |
Liam Newman | e0225b6761 | |
R. Tyler Croy | a4d678deaf | |
Liam Newman | 5dd952145b | |
R. Tyler Croy | 9f5cf12151 | |
R. Tyler Croy | f71ab11ea2 | |
R. Tyler Croy | 436ec9be82 | |
Julien Feltesse | 587f3be16e | |
Omer Katz | c2b2274af3 | |
Omer Katz | c8fe7e06b5 | |
R. Tyler Croy | 246f27e76d | |
R. Tyler Croy | bc560f21ea | |
R. Tyler Croy | 9f8fbe381c | |
R. Tyler Croy | 5b669504af | |
R. Tyler Croy | d4a350c65c | |
R. Tyler Croy | 17b51359a9 | |
R. Tyler Croy | 0ad9be88f0 | |
R. Tyler Croy | 8422e82125 | |
R. Tyler Croy | 1c7543f730 | |
R. Tyler Croy | 9d5b773542 | |
R. Tyler Croy | ecae27dd65 | |
R. Tyler Croy | e9d301be3d | |
R. Tyler Croy | 4dd732dc63 | |
R. Tyler Croy | 5c20c6c5ba | |
R. Tyler Croy | 9f1e16071f | |
R. Tyler Croy | 74cb8656e1 | |
jakesandlund | 6946d4d82c | |
jakesandlund | c64d38cff3 | |
jakesandlund | c29bb5e4d0 | |
cory and jakesandlund | e8703e1df4 | |
R. Tyler Croy | cd58cb33cd | |
Christian Meier | 763e2cce97 | |
R. Tyler Croy | e276f60b27 | |
Ben Osheroff | 2c99af440e | |
Ben Osheroff | 5b8dd6feef | |
R. Tyler Croy | 60bc473fdd | |
Ben Osheroff | 9edc4b9301 | |
Ben Osheroff | a9d80242dd | |
R. Tyler Croy | c6fe9838d7 | |
R. Tyler Croy | 45fe45cb96 | |
R. Tyler Croy | c5707f5515 | |
Ben Osheroff | 783d7dac0d | |
Ben Osheroff | 09df7ec1b9 | |
Ben Osheroff | e7fce39f83 | |
Ben Osheroff | d8b8f83690 | |
R. Tyler Croy | 7f63e3c0d3 | |
R. Tyler Croy | c272bff063 | |
Ben Osheroff | 5c898144f2 | |
Ben Osheroff | 17e5c5b31d | |
Ben Osheroff | 942fd87728 | |
Ben Osheroff | 3f0da9e0cd | |
Ben Osheroff | 0bc9e9d9ee | |
Ben Osheroff | 83ea24a20e | |
Ben Osheroff | f3b6df06d3 | |
Ben Osheroff | c2540a8410 | |
Ben Osheroff | 7edd297071 | |
Ben Osheroff | e7e1a2a7ac | |
Ben Osheroff | 314ea3d8f8 | |
Ben Osheroff | 326b428b34 | |
Ben Osheroff | 7779654f36 | |
Ben Osheroff | 5942e1810b | |
Ben Osheroff | e38b626b71 |
|
@ -13,3 +13,6 @@ tmp/
|
|||
Gemfile.lock
|
||||
Jarfile.lock
|
||||
.jbundler/
|
||||
lib/hermann_jars.rb
|
||||
coverage/
|
||||
spec/reports
|
||||
|
|
|
@ -1,10 +1,17 @@
|
|||
language: ruby
|
||||
sudo: false
|
||||
rvm:
|
||||
- ruby-head
|
||||
- 2.3.0
|
||||
- 2.2
|
||||
- 2.1
|
||||
- 1.9.3
|
||||
- jruby
|
||||
- ree
|
||||
|
||||
# upgrading bundler since our gemspec relying on lib/hermann_jars.rb being
|
||||
# present causes troubles with bundler .17.x
|
||||
before_install:
|
||||
- gem install bundler -N --version ">= 1.8.0"
|
||||
#deploy:
|
||||
# provider: rubygems
|
||||
# gemspec: hermann.gemspec
|
||||
|
|
6
Gemfile
6
Gemfile
|
@ -4,7 +4,7 @@ gemspec
|
|||
|
||||
group :development do
|
||||
gem 'jbundler', :platform => :jruby
|
||||
gem 'rake'
|
||||
gem 'rake', '~> 11.3.0'
|
||||
gem 'i18n', '~> 0.6.11', :platform => :mri_18
|
||||
gem 'activesupport', '~> 3.x', :platform => :mri_18
|
||||
gem 'ruby-maven', '~> 3.1.1.0', :platform => :jruby
|
||||
|
@ -23,4 +23,8 @@ group :test do
|
|||
# Used for testing encoding protobufs in an out of Hermann in integration
|
||||
# tests
|
||||
gem 'protobuffy'
|
||||
|
||||
gem 'ci_reporter_rspec'
|
||||
gem 'simplecov'
|
||||
gem 'simplecov-rcov'
|
||||
end
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
#!groovy
|
||||
|
||||
/* Only keep the 10 most recent builds. */
|
||||
properties([[$class: 'BuildDiscarderProperty',
|
||||
strategy: [$class: 'LogRotator', numToKeepStr: '10']]])
|
||||
|
||||
stage ('Build') {
|
||||
|
||||
node {
|
||||
// Checkout
|
||||
checkout scm
|
||||
|
||||
// install required bundles
|
||||
sh 'bundle install'
|
||||
|
||||
// build and run tests with coverage
|
||||
sh 'bundle exec rake build spec'
|
||||
|
||||
// Archive the built artifacts
|
||||
archive (includes: 'pkg/*.gem')
|
||||
|
||||
// publish html
|
||||
publishHTML ([
|
||||
allowMissing: false,
|
||||
alwaysLinkToLastBuild: false,
|
||||
keepAll: true,
|
||||
reportDir: 'coverage',
|
||||
reportFiles: 'index.html',
|
||||
reportName: "RCov Report"
|
||||
])
|
||||
|
||||
}
|
||||
}
|
33
README.md
33
README.md
|
@ -1,6 +1,6 @@
|
|||
# Hermann
|
||||
|
||||
[![Gitter chat](https://badges.gitter.im/lookout/Hermann.png)](https://gitter.im/lookout/Hermann) [![Build Status](https://travis-ci.org/lookout/hermann.svg?branch=master)](https://travis-ci.org/lookout/hermann)
|
||||
[![Gitter chat](https://badges.gitter.im/reiseburo/hermann.png)](https://gitter.im/reiseburo/hermann) [![Build Status](https://travis-ci.org/reiseburo/hermann.svg?branch=master)](https://travis-ci.org/reiseburo/hermann)
|
||||
|
||||
A Ruby gem implementing a Kafka Publisher and Consumer
|
||||
|
||||
|
@ -90,11 +90,38 @@ new_topic = 'other_topic'
|
|||
|
||||
the_consumer = Hermann::Consumer.new(topic, brokers: "localhost:9092", partition: 1)
|
||||
|
||||
the_consumer.consume(new_topic) do |msg| # can change topic with optional argument to .consume
|
||||
puts "Recv: #{msg}"
|
||||
the_consumer.consume(new_topic) do |msg, key, offset| # can change topic with optional argument to .consume
|
||||
puts "Recv: #{msg}, key: #{key}, offset: #{offset}"
|
||||
end
|
||||
```
|
||||
|
||||
### Metadata request (MRI-only)
|
||||
|
||||
Topic and cluster metadata may be retrieved in the MRI version by querying the Kafka brokers.
|
||||
|
||||
```ruby
|
||||
require 'hermann'
|
||||
require 'hermann/discovery/metadata'
|
||||
|
||||
c = Hermann::Discovery::Metadata.new( "localhost:9092" )
|
||||
topic = c.topic("topic")
|
||||
|
||||
puts topic.partitions.first
|
||||
|
||||
consumers = topic.partitions.map do |partition|
|
||||
partition.consumer
|
||||
end
|
||||
|
||||
```
|
||||
|
||||
#### Build & Unit Test
|
||||
|
||||
First time (from a clean repository):
|
||||
`bundle install && bundle exec rake`
|
||||
|
||||
Thereafter:
|
||||
`bundle exec rake spec`
|
||||
|
||||
#### Testing
|
||||
|
||||
To run the integration tests:
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
# Releasing Hermann
|
||||
|
||||
Hermann is a multi-platform gem, which means that two actual `.gem` files need
|
||||
to be built and uploaded to [rubygems.org](https://rubygems.org/gems/hermann).
|
||||
|
||||
Here's the current process that [I](https://github.com/rtyler) use:
|
||||
|
||||
* `rvm use ruby@rubygems` (*switch to MRI*)
|
||||
* `bundle install && rake` (*ensure that MRI tests pass*)
|
||||
* `rvm use jruby@rubygems` (*switch to JRuby*)
|
||||
* `bundle install && rake` (*ensure that the JRuby tests pass*)
|
||||
* `rake release` (*tag the release and upload the `-java` platform gem*)
|
||||
* `rvm use ruby@rubygems` (*switch back to MRI*)
|
||||
* `gem build hermann.gemspec` (*build the 'ruby' platform gem*)
|
||||
* `gem push pkg/hermann-0.blah.gem` (*upload the ruby platform gem*)
|
||||
|
||||
This can certainly be cleaned up, but this is the process at it is right now
|
4
Rakefile
4
Rakefile
|
@ -3,10 +3,10 @@ require 'fileutils'
|
|||
require "bundler/gem_tasks"
|
||||
require 'rspec/core/rake_task'
|
||||
require 'rake/extensiontask'
|
||||
|
||||
require 'ci/reporter/rake/rspec'
|
||||
|
||||
Rake::ExtensionTask.new do |t|
|
||||
t.name = 'hermann_lib'
|
||||
t.name = 'hermann_rdkafka'
|
||||
t.ext_dir = 'ext/hermann'
|
||||
t.gem_spec = Gem::Specification.load('hermann.gemspec')
|
||||
end
|
||||
|
|
|
@ -147,7 +147,8 @@ $LOCAL_LIBS << File.join(librdkafka.path, 'lib', 'librdkafka.a')
|
|||
|
||||
have_header('ruby/thread.h')
|
||||
have_header('ruby/intern.h')
|
||||
have_header('ruby/version.h')
|
||||
have_func('rb_thread_blocking_region')
|
||||
have_func('rb_thread_call_without_gvl')
|
||||
|
||||
create_makefile('hermann/hermann_lib')
|
||||
create_makefile('hermann/hermann_rdkafka')
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* hermann_lib.c - Ruby wrapper for the librdkafka library
|
||||
* hermann_rdkafka.c - Ruby wrapper for the librdkafka library
|
||||
*
|
||||
* Copyright (c) 2014 Stan Campbell
|
||||
* Copyright (c) 2014 Lookout, Inc.
|
||||
|
@ -31,8 +31,16 @@
|
|||
|
||||
/* Much of the librdkafka library calls were lifted from rdkafka_example.c */
|
||||
|
||||
#include "hermann_lib.h"
|
||||
#include "hermann_rdkafka.h"
|
||||
|
||||
/* This header file exposes the functions in librdkafka.a that are needed for
|
||||
* consistent partitioning. After librdkafka releases a new tag and Hermann
|
||||
* points to it, this can be removed. */
|
||||
#include "rdcrc32.h"
|
||||
|
||||
#ifdef HAVE_RUBY_VERSION_H
|
||||
#include <ruby/version.h>
|
||||
#endif
|
||||
|
||||
/* how long to let librdkafka block on the socket before returning back to the interpreter.
|
||||
* essentially defines how long we wait before consumer_consume_stop_callback() can fire */
|
||||
|
@ -120,7 +128,7 @@ static void msg_delivered(rd_kafka_t *rk,
|
|||
/* call back into our Hermann::Result if it exists, discarding the
|
||||
* return value
|
||||
*/
|
||||
if (NULL != push_ctx->result) {
|
||||
if (NULL != (void *)push_ctx->result) {
|
||||
rb_funcall(push_ctx->result,
|
||||
hermann_result_fulfill_method,
|
||||
2,
|
||||
|
@ -131,6 +139,18 @@ static void msg_delivered(rd_kafka_t *rk,
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/* This function is in rdkafka.h on librdkafka master. As soon as a new
|
||||
* version is released and Hermann points to it, this can be removed. */
|
||||
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
|
||||
const void *key, size_t keylen,
|
||||
int32_t partition_cnt,
|
||||
void *rkt_opaque,
|
||||
void *msg_opaque) {
|
||||
return rd_crc32(key, keylen) % partition_cnt;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Producer partitioner callback.
|
||||
* Used to determine the target partition within a topic for production.
|
||||
|
@ -151,13 +171,10 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt,
|
|||
int32_t partition_cnt,
|
||||
void *rkt_opaque,
|
||||
void *msg_opaque) {
|
||||
/* Pick a random partition */
|
||||
int retry = 0;
|
||||
for (; retry < partition_cnt; retry++) {
|
||||
int32_t partition = rand() % partition_cnt;
|
||||
if (rd_kafka_topic_partition_available(rkt, partition)) {
|
||||
break; /* this one will do */
|
||||
}
|
||||
if (keylen) {
|
||||
return rd_kafka_msg_partitioner_consistent(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
|
||||
} else {
|
||||
return rd_kafka_msg_partitioner_random(rkt, keydata, keylen, partition_cnt, rkt_opaque, msg_opaque);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -258,8 +275,19 @@ static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cf
|
|||
|
||||
// Yield the data to the Consumer's block
|
||||
if (rb_block_given_p()) {
|
||||
VALUE value = rb_str_new((char *)rkmessage->payload, rkmessage->len);
|
||||
rb_yield(value);
|
||||
VALUE key, data, offset;
|
||||
|
||||
data = rb_str_new((char *)rkmessage->payload, rkmessage->len);
|
||||
offset = rb_ll2inum(rkmessage->offset);
|
||||
|
||||
if ( rkmessage->key_len > 0 ) {
|
||||
key = rb_str_new((char*) rkmessage->key, (int)rkmessage->key_len);
|
||||
} else {
|
||||
key = Qnil;
|
||||
}
|
||||
|
||||
rd_kafka_message_destroy(rkmessage);
|
||||
rb_yield_values(3, data, key, offset);
|
||||
}
|
||||
else {
|
||||
if (DEBUG) {
|
||||
|
@ -388,15 +416,19 @@ static void *consumer_recv_msg(void *ptr)
|
|||
* after every message, to see if the ruby interpreter wants us to exit the
|
||||
* loop.
|
||||
*
|
||||
* @param HermannInstanceConfig* The hermann configuration for this consumer
|
||||
* @param self The consumer instance
|
||||
*/
|
||||
|
||||
static void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
|
||||
static VALUE consumer_consume_loop(VALUE self) {
|
||||
HermannInstanceConfig* consumerConfig;
|
||||
rd_kafka_message_t *msg;
|
||||
|
||||
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
|
||||
|
||||
TRACER("\n");
|
||||
|
||||
while (consumerConfig->run) {
|
||||
#ifdef HAVE_RB_THREAD_BLOCKING_REGION
|
||||
#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2
|
||||
msg = (rd_kafka_message_t *) rb_thread_blocking_region((rb_blocking_function_t *) consumer_recv_msg,
|
||||
consumerConfig,
|
||||
consumer_consume_stop_callback,
|
||||
|
@ -412,13 +444,28 @@ static void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
|
|||
|
||||
if ( msg ) {
|
||||
msg_consume(msg, consumerConfig);
|
||||
rd_kafka_message_destroy(msg);
|
||||
}
|
||||
}
|
||||
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* consumer_consume_loop_stop
|
||||
*
|
||||
* called when we're done with the .consume() loop. lets rdkafa cleanup some internal structures
|
||||
*/
|
||||
static VALUE consumer_consume_loop_stop(VALUE self) {
|
||||
HermannInstanceConfig* consumerConfig;
|
||||
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
|
||||
|
||||
rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hermann::Lib::Consumer.consume
|
||||
* Hermann::Provider::RDKafka::Consumer.consume
|
||||
*
|
||||
* @param VALUE self the Ruby object for this consumer
|
||||
* @param VALUE topic the Ruby string representing a topic to consume
|
||||
|
@ -446,17 +493,12 @@ static VALUE consumer_consume(VALUE self, VALUE topic) {
|
|||
if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) {
|
||||
fprintf(stderr, "%% Failed to start consuming: %s\n",
|
||||
rd_kafka_err2str(rd_kafka_errno2err(errno)));
|
||||
rb_raise(rb_eRuntimeError,
|
||||
rb_raise(rb_eRuntimeError, "%s",
|
||||
rd_kafka_err2str(rd_kafka_errno2err(errno)));
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
consumer_consume_loop(consumerConfig);
|
||||
|
||||
/* Stop consuming */
|
||||
rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);
|
||||
|
||||
return Qnil;
|
||||
return rb_ensure(consumer_consume_loop, self, consumer_consume_loop_stop, self);
|
||||
}
|
||||
|
||||
|
||||
|
@ -558,10 +600,11 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
|
|||
* @param message VALUE the ruby String containing the outgoing message.
|
||||
* @param topic VALUE the ruby String containing the topic to use for the
|
||||
* outgoing message.
|
||||
* @param key VALUE the ruby String containing the key to partition by
|
||||
* @param result VALUE the Hermann::Result object to be fulfilled when the
|
||||
* push completes
|
||||
*/
|
||||
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
|
||||
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE partition_key, VALUE result) {
|
||||
|
||||
HermannInstanceConfig* producerConfig;
|
||||
/* Context pointer, pointing to `result`, for the librdkafka delivery
|
||||
|
@ -569,13 +612,14 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
|
|||
*/
|
||||
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
|
||||
rd_kafka_topic_t *rkt = NULL;
|
||||
rd_kafka_topic_conf_t *rkt_conf = NULL;
|
||||
|
||||
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
|
||||
|
||||
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
|
||||
|
||||
delivery_ctx->producer = producerConfig;
|
||||
delivery_ctx->result = NULL;
|
||||
delivery_ctx->result = (VALUE) NULL;
|
||||
|
||||
TRACER("producerConfig: %p\n", producerConfig);
|
||||
|
||||
|
@ -591,9 +635,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
|
|||
|
||||
TRACER("kafka initialized\n");
|
||||
|
||||
/* Topic configuration */
|
||||
rkt_conf = rd_kafka_topic_conf_new();
|
||||
|
||||
/* Set the partitioner callback */
|
||||
rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, producer_partitioner_callback);
|
||||
|
||||
rkt = rd_kafka_topic_new(producerConfig->rk,
|
||||
RSTRING_PTR(topic),
|
||||
NULL);
|
||||
rkt_conf);
|
||||
|
||||
if (NULL == rkt) {
|
||||
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
|
||||
|
@ -614,8 +664,8 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
|
|||
RD_KAFKA_MSG_F_COPY,
|
||||
RSTRING_PTR(message),
|
||||
RSTRING_LEN(message),
|
||||
NULL,
|
||||
0,
|
||||
RSTRING_PTR(partition_key),
|
||||
RSTRING_LEN(partition_key),
|
||||
delivery_ctx)) {
|
||||
fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
|
||||
rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition,
|
||||
|
@ -666,19 +716,56 @@ static VALUE producer_tick(VALUE self, VALUE timeout) {
|
|||
events = rd_kafka_poll(conf->rk, timeout_ms);
|
||||
|
||||
if (conf->isErrored) {
|
||||
rb_raise(rb_eStandardError, conf->error);
|
||||
rb_raise(rb_eStandardError, "%s", conf->error);
|
||||
}
|
||||
|
||||
return rb_int_new(events);
|
||||
}
|
||||
|
||||
/*
|
||||
* producer_metadata_request_nogvl
|
||||
*
|
||||
* call rd_kafka_metadata without the GVL held. Note that rd_kafka_metadata is not interruptible,
|
||||
* so in case of interrupt the thread will not respond until timeout_ms is reached.
|
||||
*
|
||||
* rd_kafka_metadata will fill in the ctx->data pointer on success
|
||||
*
|
||||
* @param ptr void* the hermann_metadata_ctx_t
|
||||
*/
|
||||
|
||||
static void *producer_metadata_request_nogvl(void *ptr)
|
||||
{
|
||||
hermann_metadata_ctx_t *ctx = (hermann_metadata_ctx_t*)ptr;
|
||||
|
||||
return (void *) rd_kafka_metadata(ctx->rk,
|
||||
ctx->topic ? 0 : 1,
|
||||
ctx->topic,
|
||||
(const struct rd_kafka_metadata **) &(ctx->data),
|
||||
ctx->timeout_ms);
|
||||
}
|
||||
|
||||
|
||||
static int producer_metadata_request(hermann_metadata_ctx_t *ctx)
|
||||
{
|
||||
int err;
|
||||
|
||||
#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2
|
||||
err = (int) rb_thread_blocking_region((rb_blocking_function_t *) producer_metadata_request_nogvl, ctx,
|
||||
NULL, NULL);
|
||||
#elif HAVE_RB_THREAD_CALL_WITHOUT_GVL
|
||||
err = (int) rb_thread_call_without_gvl(producer_metadata_request_nogvl, ctx, NULL, NULL);
|
||||
#else
|
||||
err = (int) producer_metadata_request_nogvl(ctx);
|
||||
#endif
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
static VALUE producer_connect(VALUE self, VALUE timeout) {
|
||||
HermannInstanceConfig *producerConfig;
|
||||
rd_kafka_resp_err_t err;
|
||||
VALUE result = Qfalse;
|
||||
int timeout_ms = rb_num2int(timeout);
|
||||
struct rd_kafka_metadata *data = NULL;
|
||||
hermann_metadata_ctx_t md_context;
|
||||
|
||||
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
|
||||
|
||||
|
@ -686,17 +773,19 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
|
|||
producer_init_kafka(self, producerConfig);
|
||||
}
|
||||
|
||||
err = rd_kafka_metadata(producerConfig->rk,
|
||||
0,
|
||||
producerConfig->rkt,
|
||||
&data,
|
||||
timeout_ms);
|
||||
md_context.rk = producerConfig->rk;
|
||||
md_context.topic = NULL;
|
||||
md_context.data = NULL;
|
||||
md_context.timeout_ms = rb_num2int(timeout);
|
||||
|
||||
err = producer_metadata_request(&md_context);
|
||||
|
||||
TRACER("err: %s (%i)\n", rd_kafka_err2str(err), err);
|
||||
|
||||
if (RD_KAFKA_RESP_ERR_NO_ERROR == err) {
|
||||
TRACER("brokers: %i, topics: %i\n",
|
||||
data->broker_cnt,
|
||||
data->topic_cnt);
|
||||
md_context.data->broker_cnt,
|
||||
md_context.data->topic_cnt);
|
||||
producerConfig->isConnected = 1;
|
||||
result = Qtrue;
|
||||
}
|
||||
|
@ -704,11 +793,118 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
|
|||
producerConfig->isErrored = err;
|
||||
}
|
||||
|
||||
rd_kafka_metadata_destroy(data);
|
||||
if ( md_context.data )
|
||||
rd_kafka_metadata_destroy(md_context.data);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* producer_metadata_make_hash
|
||||
*
|
||||
* transform the rd_kafka_metadata structure into a ruby hash. eg:
|
||||
* { :brokers => [ {:id=>0, :host=>"172.20.10.3", :port=>9092} ],
|
||||
* :topics => { "maxwell" => [ {:id=>0, :leader_id=>0, :replica_ids=>[0], :isr_ids=>[0]}]} }
|
||||
*
|
||||
* @param data struct rd_kafka_metadata* data returned from rd_kafka_metadata
|
||||
*/
|
||||
|
||||
static VALUE producer_metadata_make_hash(struct rd_kafka_metadata *data)
|
||||
{
|
||||
int i, j, k;
|
||||
VALUE broker_hash, topic_hash, partition_ary, partition_hash, partition_replica_ary, partition_isr_ary;
|
||||
VALUE hash = rb_hash_new();
|
||||
VALUE brokers = rb_ary_new2(data->broker_cnt);
|
||||
VALUE topics = rb_hash_new();
|
||||
|
||||
for ( i = 0; i < data->broker_cnt; i++ ) {
|
||||
broker_hash = rb_hash_new();
|
||||
rb_hash_aset(broker_hash, ID2SYM(rb_intern("id")), INT2FIX(data->brokers[i].id));
|
||||
rb_hash_aset(broker_hash, ID2SYM(rb_intern("host")), rb_str_new2(data->brokers[i].host));
|
||||
rb_hash_aset(broker_hash, ID2SYM(rb_intern("port")), INT2FIX(data->brokers[i].port));
|
||||
rb_ary_push(brokers, broker_hash);
|
||||
}
|
||||
|
||||
for ( i = 0; i < data->topic_cnt; i++ ) {
|
||||
partition_ary = rb_ary_new2(data->topics[i].partition_cnt);
|
||||
|
||||
for ( j = 0 ; j < data->topics[i].partition_cnt ; j++ ) {
|
||||
VALUE partition_hash = rb_hash_new();
|
||||
rd_kafka_metadata_partition_t *partition = &(data->topics[i].partitions[j]);
|
||||
|
||||
/* id => 1, leader_id => 0 */
|
||||
rb_hash_aset(partition_hash, ID2SYM(rb_intern("id")), INT2FIX(partition->id));
|
||||
rb_hash_aset(partition_hash, ID2SYM(rb_intern("leader_id")), INT2FIX(partition->leader));
|
||||
|
||||
/* replica_ids => [1, 0] */
|
||||
partition_replica_ary = rb_ary_new2(partition->replica_cnt);
|
||||
for ( k = 0 ; k < partition->replica_cnt ; k++ ) {
|
||||
rb_ary_push(partition_replica_ary, INT2FIX(partition->replicas[k]));
|
||||
}
|
||||
rb_hash_aset(partition_hash, ID2SYM(rb_intern("replica_ids")), partition_replica_ary);
|
||||
|
||||
/* isr_ids => [1, 0] */
|
||||
partition_isr_ary = rb_ary_new2(partition->isr_cnt);
|
||||
for ( k = 0 ; k < partition->isr_cnt ; k++ ) {
|
||||
rb_ary_push(partition_isr_ary, INT2FIX(partition->isrs[k]));
|
||||
}
|
||||
rb_hash_aset(partition_hash, ID2SYM(rb_intern("isr_ids")), partition_isr_ary);
|
||||
|
||||
rb_ary_push(partition_ary, partition_hash);
|
||||
}
|
||||
|
||||
rb_hash_aset(topics, rb_str_new2(data->topics[i].topic), partition_ary);
|
||||
}
|
||||
|
||||
rb_hash_aset(hash, ID2SYM(rb_intern("brokers")), brokers);
|
||||
rb_hash_aset(hash, ID2SYM(rb_intern("topics")), topics);
|
||||
return hash;
|
||||
}
|
||||
|
||||
/*
|
||||
* producer_metadata
|
||||
*
|
||||
* make a metadata request to the kafka server, returning a hash
|
||||
* containing a list of brokers and topics.
|
||||
*
|
||||
* @param data struct rd_kafka_metadata* data returned from rd_kafka_metadata
|
||||
*/
|
||||
|
||||
static VALUE producer_metadata(VALUE self, VALUE topicStr, VALUE timeout) {
|
||||
HermannInstanceConfig *producerConfig;
|
||||
rd_kafka_resp_err_t err;
|
||||
hermann_metadata_ctx_t md_context;
|
||||
VALUE result;
|
||||
|
||||
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
|
||||
|
||||
if (!producerConfig->isInitialized) {
|
||||
producer_init_kafka(self, producerConfig);
|
||||
}
|
||||
|
||||
md_context.rk = producerConfig->rk;
|
||||
md_context.timeout_ms = rb_num2int(timeout);
|
||||
|
||||
if ( !NIL_P(topicStr) ) {
|
||||
Check_Type(topicStr, T_STRING);
|
||||
md_context.topic = rd_kafka_topic_new(producerConfig->rk, StringValuePtr(topicStr), NULL);
|
||||
} else {
|
||||
md_context.topic = NULL;
|
||||
}
|
||||
|
||||
err = producer_metadata_request(&md_context);
|
||||
|
||||
if ( err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
|
||||
// annoyingly, this is always a timeout error -- the rest rdkafka just jams onto STDERR
|
||||
rb_raise( rb_eRuntimeError, "%s", rd_kafka_err2str(err) );
|
||||
} else {
|
||||
result = producer_metadata_make_hash(md_context.data);
|
||||
rd_kafka_metadata_destroy(md_context.data);
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static VALUE producer_is_connected(VALUE self) {
|
||||
HermannInstanceConfig *producerConfig;
|
||||
|
||||
|
@ -1023,21 +1219,21 @@ static VALUE producer_init_copy(VALUE copy,
|
|||
}
|
||||
|
||||
/**
|
||||
* Init_hermann_lib
|
||||
* Init_hermann_rdkafka
|
||||
*
|
||||
* Called by Ruby when the Hermann gem is loaded.
|
||||
* Defines the Hermann module.
|
||||
* Defines the Producer and Consumer classes.
|
||||
*/
|
||||
void Init_hermann_lib() {
|
||||
VALUE lib_module, c_consumer, c_producer;
|
||||
void Init_hermann_rdkafka() {
|
||||
VALUE lib_module, provider_module, c_consumer, c_producer;
|
||||
|
||||
TRACER("setting up Hermann::Lib\n");
|
||||
TRACER("setting up Hermann::Provider::RDKafka\n");
|
||||
|
||||
/* Define the module */
|
||||
hermann_module = rb_define_module("Hermann");
|
||||
lib_module = rb_define_module_under(hermann_module, "Lib");
|
||||
|
||||
provider_module = rb_define_module_under(hermann_module, "Provider");
|
||||
lib_module = rb_define_module_under(provider_module, "RDKafka");
|
||||
|
||||
/* ---- Define the consumer class ---- */
|
||||
c_consumer = rb_define_class_under(lib_module, "Consumer", rb_cObject);
|
||||
|
@ -1063,7 +1259,7 @@ void Init_hermann_lib() {
|
|||
rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1);
|
||||
|
||||
/* Producer.push_single(msg) */
|
||||
rb_define_method(c_producer, "push_single", producer_push_single, 3);
|
||||
rb_define_method(c_producer, "push_single", producer_push_single, 4);
|
||||
|
||||
/* Producer.tick */
|
||||
rb_define_method(c_producer, "tick", producer_tick, 1);
|
||||
|
@ -1076,4 +1272,7 @@ void Init_hermann_lib() {
|
|||
|
||||
/* Producer.connect */
|
||||
rb_define_method(c_producer, "connect", producer_connect, 1);
|
||||
|
||||
/* Producer.metadata */
|
||||
rb_define_method(c_producer, "metadata", producer_metadata, 2);
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* hermann_lib.h - Ruby wrapper for the librdkafka library
|
||||
* hermann_rdkafka.h - Ruby wrapper for the librdkafka library
|
||||
*
|
||||
* Copyright (c) 2014 Stan Campbell
|
||||
* Copyright (c) 2014 Lookout, Inc.
|
||||
|
@ -107,10 +107,17 @@ typedef struct HermannInstanceConfig {
|
|||
typedef HermannInstanceConfig hermann_conf_t;
|
||||
|
||||
typedef struct {
|
||||
/* Hermann::Lib::Producer */
|
||||
/* Hermann::Provider::RDKafka::Producer */
|
||||
hermann_conf_t *producer;
|
||||
/* Hermann::Result */
|
||||
VALUE result;
|
||||
} hermann_push_ctx_t;
|
||||
|
||||
typedef struct {
|
||||
rd_kafka_t *rk;
|
||||
rd_kafka_topic_t *topic;
|
||||
struct rd_kafka_metadata *data;
|
||||
int timeout_ms;
|
||||
} hermann_metadata_ctx_t;
|
||||
|
||||
#endif
|
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* \file rdcrc32.h
|
||||
* Functions and types for CRC checks.
|
||||
*
|
||||
* Generated on Tue May 8 17:36:59 2012,
|
||||
* by pycrc v0.7.10, http://www.tty1.net/pycrc/
|
||||
*
|
||||
* NOTE: Contains librd modifications:
|
||||
* - rd_crc32() helper.
|
||||
* - __RDCRC32___H__ define (was missing the '32' part).
|
||||
*
|
||||
* using the configuration:
|
||||
* Width = 32
|
||||
* Poly = 0x04c11db7
|
||||
* XorIn = 0xffffffff
|
||||
* ReflectIn = True
|
||||
* XorOut = 0xffffffff
|
||||
* ReflectOut = True
|
||||
* Algorithm = table-driven
|
||||
*****************************************************************************/
|
||||
#ifndef __RDCRC32___H__
|
||||
#define __RDCRC32___H__
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
|
||||
/**
|
||||
* The definition of the used algorithm.
|
||||
*****************************************************************************/
|
||||
#define CRC_ALGO_TABLE_DRIVEN 1
|
||||
|
||||
|
||||
/**
|
||||
* The type of the CRC values.
|
||||
*
|
||||
* This type must be big enough to contain at least 32 bits.
|
||||
*****************************************************************************/
|
||||
typedef uint32_t rd_crc32_t;
|
||||
|
||||
|
||||
/**
|
||||
* Reflect all bits of a \a data word of \a data_len bytes.
|
||||
*
|
||||
* \param data The data word to be reflected.
|
||||
* \param data_len The width of \a data expressed in number of bits.
|
||||
* \return The reflected data.
|
||||
*****************************************************************************/
|
||||
rd_crc32_t rd_crc32_reflect(rd_crc32_t data, size_t data_len);
|
||||
|
||||
|
||||
/**
|
||||
* Calculate the initial crc value.
|
||||
*
|
||||
* \return The initial crc value.
|
||||
*****************************************************************************/
|
||||
static inline rd_crc32_t rd_crc32_init(void)
|
||||
{
|
||||
return 0xffffffff;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Update the crc value with new data.
|
||||
*
|
||||
* \param crc The current crc value.
|
||||
* \param data Pointer to a buffer of \a data_len bytes.
|
||||
* \param data_len Number of bytes in the \a data buffer.
|
||||
* \return The updated crc value.
|
||||
*****************************************************************************/
|
||||
rd_crc32_t rd_crc32_update(rd_crc32_t crc, const unsigned char *data, size_t data_len);
|
||||
|
||||
|
||||
/**
|
||||
* Calculate the final crc value.
|
||||
*
|
||||
* \param crc The current crc value.
|
||||
* \return The final crc value.
|
||||
*****************************************************************************/
|
||||
static inline rd_crc32_t rd_crc32_finalize(rd_crc32_t crc)
|
||||
{
|
||||
return crc ^ 0xffffffff;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Wrapper for performing CRC32 on the provided buffer.
|
||||
*/
|
||||
static inline rd_crc32_t rd_crc32 (const char *data, size_t data_len) {
|
||||
return rd_crc32_finalize(rd_crc32_update(rd_crc32_init(),
|
||||
(const unsigned char *)data,
|
||||
data_len));
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
} /* closing brace for extern "C" */
|
||||
#endif
|
||||
|
||||
#endif /* __RDCRC32___H__ */
|
|
@ -16,7 +16,7 @@ Gem::Specification.new do |s|
|
|||
s.homepage = 'https://github.com/lookout/Hermann'
|
||||
s.licenses = ['MIT']
|
||||
|
||||
s.files = [ "Rakefile"]
|
||||
s.files = ['Rakefile']
|
||||
s.files += `git ls-files -- lib`.split($\)
|
||||
s.files += `git ls-files -- ext`.split($\)
|
||||
|
||||
|
@ -28,15 +28,16 @@ Gem::Specification.new do |s|
|
|||
s.add_dependency 'thread_safe', '~> 0.3.4'
|
||||
|
||||
if RUBY_PLATFORM == "java"
|
||||
s.add_dependency 'concurrent-ruby', '~> 0.7.0'
|
||||
s.files << 'lib/hermann_jars.rb'
|
||||
s.add_dependency 'concurrent-ruby', '~> 1.0.0'
|
||||
|
||||
# IMPORTANT: make sure that jar-dependencies is only a development
|
||||
# dependency of your gem. if it is a runtime dependencies the require_jars
|
||||
# file will be overwritten during installation.
|
||||
s.add_dependency 'jar-dependencies', '~>0.1.9'
|
||||
s.requirements << "jar org.apache.kafka:kafka_2.10, ~>0.8.1.1, ['junit:junit']"
|
||||
s.add_dependency 'jar-dependencies', ['~> 0.1', '>= 0.1.10']
|
||||
s.requirements << "jar org.apache.kafka:kafka_2.11, ~> 0.8.2.2"
|
||||
# use log4j-1.2.16+ to as 1.2.15 declares deps which are not in maven central and causes the dep resolution to fail
|
||||
s.requirements << "jar log4j:log4j, ~>1.2.16"
|
||||
s.requirements << "jar log4j:log4j, ~> 1.2.16"
|
||||
s.require_paths = ["lib"]
|
||||
s.platform = 'java'
|
||||
else
|
||||
|
|
|
@ -4,7 +4,7 @@ require 'hermann/errors'
|
|||
if Hermann.jruby?
|
||||
require 'hermann/provider/java_simple_consumer'
|
||||
else
|
||||
require 'hermann_lib'
|
||||
require 'hermann_rdkafka'
|
||||
end
|
||||
|
||||
module Hermann
|
||||
|
@ -37,7 +37,7 @@ module Hermann
|
|||
else
|
||||
brokers, partition = require_values_at(opts, :brokers, :partition)
|
||||
|
||||
@internal = Hermann::Lib::Consumer.new(topic, brokers, partition, offset)
|
||||
@internal = Hermann::Provider::RDKafka::Consumer.new(topic, brokers, partition, offset)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
require 'hermann_rdkafka'
|
||||
require 'hermann/consumer'
|
||||
|
||||
module Hermann
|
||||
module Discovery
|
||||
class Metadata
|
||||
Broker = Struct.new(:id, :host, :port) do
|
||||
def to_s
|
||||
"#{host}:#{port}"
|
||||
end
|
||||
end
|
||||
Topic = Struct.new(:name, :partitions)
|
||||
|
||||
Partition = Struct.new(:id, :leader, :replicas, :insync_replicas, :topic_name) do
|
||||
def consumer(offset=:end)
|
||||
Hermann::Consumer.new(topic_name, brokers: ([leader] + replicas).join(','), partition: id, offset: offset)
|
||||
end
|
||||
end
|
||||
|
||||
DEFAULT_TIMEOUT_MS = 2_000
|
||||
def initialize(brokers, options = {})
|
||||
raise "this is an MRI api only!" if Hermann.jruby?
|
||||
@internal = Hermann::Provider::RDKafka::Producer.new(brokers)
|
||||
@timeout = options[:timeout] || DEFAULT_TIMEOUT_MS
|
||||
end
|
||||
|
||||
#
|
||||
# @internal.metadata returns:
|
||||
# {:brokers => [{:id=>3, :host=>"kafka3.alpha4.sac1.zdsys.com", :port=>9092}],
|
||||
# :topics => {"testtopic"=>[{:id=>0, :leader_id=>3, :replica_ids=>[3, 1], :isr_ids=>[3, 1]}}}
|
||||
#
|
||||
def brokers
|
||||
brokers_from_metadata(@internal.metadata(nil, @timeout))
|
||||
end
|
||||
|
||||
def topic(t)
|
||||
get_topics(t)[t]
|
||||
end
|
||||
|
||||
def topics
|
||||
get_topics
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def get_topics(filter_topics = nil)
|
||||
md = @internal.metadata(filter_topics, @timeout)
|
||||
|
||||
broker_hash = brokers_from_metadata(md).inject({}) do |h, broker|
|
||||
h[broker.id] = broker
|
||||
h
|
||||
end
|
||||
|
||||
md[:topics].inject({}) do |topic_hash, arr|
|
||||
topic_name, raw_partitions = *arr
|
||||
partitions = raw_partitions.map do |p|
|
||||
leader = broker_hash[p[:leader_id]]
|
||||
all_replicas = p[:replica_ids].map { |i| broker_hash[i] }
|
||||
isr_replicas = p[:isr_ids].map { |i| broker_hash[i] }
|
||||
Partition.new(p[:id], leader, all_replicas, isr_replicas, topic_name)
|
||||
end
|
||||
|
||||
topic_hash[topic_name] = Topic.new(topic_name, partitions)
|
||||
topic_hash
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def brokers_from_metadata(md)
|
||||
md[:brokers].map do |h|
|
||||
Broker.new(h[:id], h[:host], h[:port])
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
|
@ -6,7 +6,7 @@ require 'hermann/result'
|
|||
if RUBY_PLATFORM == "java"
|
||||
require 'hermann/provider/java_producer'
|
||||
else
|
||||
require 'hermann_lib'
|
||||
require 'hermann_rdkafka'
|
||||
end
|
||||
|
||||
module Hermann
|
||||
|
@ -23,7 +23,7 @@ module Hermann
|
|||
if Hermann.jruby?
|
||||
@internal = Hermann::Provider::JavaProducer.new(brokers.join(','), opts)
|
||||
else
|
||||
@internal = Hermann::Lib::Producer.new(brokers.join(','))
|
||||
@internal = Hermann::Provider::RDKafka::Producer.new(brokers.join(','))
|
||||
end
|
||||
# We're tracking children so we can make sure that at Producer exit we
|
||||
# make a reasonable attempt to clean up outstanding result objects
|
||||
|
@ -63,8 +63,7 @@ module Hermann
|
|||
end
|
||||
|
||||
if Hermann.jruby?
|
||||
key = opts.has_key?(:partition_key) ? opts[:partition_key].to_java : nil
|
||||
result = @internal.push_single(value, topic, key)
|
||||
result = @internal.push_single(value, topic, opts[:partition_key], nil)
|
||||
unless result.nil?
|
||||
@children << result
|
||||
end
|
||||
|
@ -76,7 +75,7 @@ module Hermann
|
|||
# librdkafka callback queue overflow
|
||||
tick_reactor
|
||||
result = create_result
|
||||
@internal.push_single(value, topic, result)
|
||||
@internal.push_single(value, topic, opts[:partition_key].to_s, result)
|
||||
end
|
||||
|
||||
return result
|
||||
|
@ -103,7 +102,8 @@ module Hermann
|
|||
@children.each do |child|
|
||||
# Skip over any children that should already be reaped for other
|
||||
# reasons
|
||||
next if child.completed?
|
||||
next if (Hermann.jruby? ? child.fulfilled? : child.completed?)
|
||||
|
||||
# Propagate errors to the remaining children
|
||||
child.internal_set_error(ex)
|
||||
end
|
||||
|
@ -119,7 +119,7 @@ module Hermann
|
|||
# Filter all children who are no longer pending/fulfilled
|
||||
total_children = @children.size
|
||||
|
||||
@children = @children.reject { |c| c.completed? }
|
||||
@children = @children.reject { |c| Hermann.jruby? ? c.fulfilled? : c.completed? }
|
||||
|
||||
return (total_children - children.size)
|
||||
end
|
||||
|
|
|
@ -43,7 +43,8 @@ module Hermann
|
|||
# @return +Concurrent::Promise+ Representa a promise to send the
|
||||
# data to the kafka broker. Upon execution the Promise's status
|
||||
# will be set
|
||||
def push_single(msg, topic, key)
|
||||
def push_single(msg, topic, key, _)
|
||||
key = key && key.to_java
|
||||
Concurrent::Promise.execute {
|
||||
data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
|
||||
begin
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
module Hermann
|
||||
VERSION = '0.24.0'
|
||||
VERSION = '0.26.1'
|
||||
end
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
# this is a generated file, to avoid over-writing it just delete this comment
|
||||
require 'jar_dependencies'
|
||||
|
||||
require_jar( 'org.apache.zookeeper', 'zookeeper', '3.3.4' )
|
||||
require_jar( 'net.sf.jopt-simple', 'jopt-simple', '3.2' )
|
||||
require_jar( 'org.xerial.snappy', 'snappy-java', '1.0.5' )
|
||||
require_jar( 'jline', 'jline', '0.9.94' )
|
||||
require_jar( 'com.101tec', 'zkclient', '0.3' )
|
||||
require_jar( 'log4j', 'log4j', '1.2.17' )
|
||||
require_jar( 'org.scala-lang', 'scala-library', '2.10.1' )
|
||||
require_jar( 'org.slf4j', 'slf4j-api', '1.7.2' )
|
||||
require_jar( 'com.yammer.metrics', 'metrics-core', '2.2.0' )
|
||||
require_jar( 'org.apache.kafka', 'kafka_2.10', '0.8.1.1' )
|
|
@ -0,0 +1,15 @@
|
|||
require 'bundler/setup'
|
||||
require 'hermann'
|
||||
require 'hermann/discovery/metadata'
|
||||
require 'hermann/consumer'
|
||||
|
||||
c = Hermann::Discovery::Metadata.new( "localhost:9092" )
|
||||
c.topic("maxwell")
|
||||
puts c.topic("maxwell").inspect
|
||||
|
||||
|
||||
puts c.brokers.inspect
|
||||
consumer = Hermann::Consumer.new("maxwell", brokers: "localhost:9092, localhost:9092", partition: c.topic('maxwell').partitions.first.id, offset: :start)
|
||||
consumer.consume do |c|
|
||||
puts c
|
||||
end
|
|
@ -1,13 +1,13 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe 'Hermann::Lib::Producer', :platform => :mri do
|
||||
describe 'Hermann::Provider::RDKafka::Producer', :platform => :mri do
|
||||
before :all do
|
||||
require 'hermann_lib'
|
||||
require 'hermann_rdkafka'
|
||||
end
|
||||
|
||||
let(:topic) { 'rspec' }
|
||||
let(:brokers) { 'localhost:1337' }
|
||||
subject(:producer) { Hermann::Lib::Producer.new(brokers) }
|
||||
subject(:producer) { Hermann::Provider::RDKafka::Producer.new(brokers) }
|
||||
let(:timeout) { 3000 }
|
||||
|
||||
it { should respond_to :push_single }
|
||||
|
@ -41,7 +41,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
|
|||
let(:brokers) { 'localhost:13337' }
|
||||
|
||||
it 'should error after attempting to connect' do |example|
|
||||
producer.push_single(example.full_description, 'test-topic', nil)
|
||||
producer.push_single(example.full_description, 'test-topic', '', nil)
|
||||
begin
|
||||
producer.tick(timeout)
|
||||
rescue StandardError => ex
|
||||
|
@ -62,7 +62,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
|
|||
|
||||
describe '#push_single', :type => :integration do
|
||||
let(:message) { |example| example.full_description }
|
||||
subject(:push) { producer.push_single(message, topic, nil) }
|
||||
subject(:push) { producer.push_single(message, topic, '', nil) }
|
||||
|
||||
it 'should return' do
|
||||
expect(push).not_to be_nil
|
||||
|
@ -105,7 +105,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
|
|||
|
||||
context 'with a single queued request' do
|
||||
before :each do
|
||||
producer.push_single('hello', topic, nil)
|
||||
producer.push_single('hello', topic, '', nil)
|
||||
end
|
||||
|
||||
it 'should return successfully' do
|
||||
|
|
|
@ -11,10 +11,11 @@ describe Hermann::Producer do
|
|||
describe '#initialize' do
|
||||
context 'with C ruby', :platform => :mri do
|
||||
it 'joins broker array' do
|
||||
expect(Hermann::Lib::Producer).to receive(:new).with(brokers.first)
|
||||
expect(Hermann::Provider::RDKafka::Producer).to receive(:new).with(brokers.first)
|
||||
expect(producer).to be_a Hermann::Producer
|
||||
end
|
||||
end
|
||||
|
||||
context 'with Java', :platform => :java do
|
||||
it 'joins broker array' do
|
||||
expect(Hermann::Provider::JavaProducer).to receive(:new).with(brokers.first, opts)
|
||||
|
@ -42,34 +43,34 @@ describe Hermann::Producer do
|
|||
|
||||
context 'without topic passed' do
|
||||
it 'uses initialized topic' do
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, anything)
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, anything, anything)
|
||||
producer.push(msg)
|
||||
end
|
||||
end
|
||||
context 'without topic passed', :platform => :java do
|
||||
it 'uses initialized topic and does not have a partition key' do
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, nil)
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, nil, anything)
|
||||
producer.push(msg)
|
||||
end
|
||||
end
|
||||
context 'with topic passed' do
|
||||
it 'can change topic' do
|
||||
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything)
|
||||
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything)
|
||||
producer.push(msg, :topic => passed_topic)
|
||||
end
|
||||
|
||||
context 'and an array of messags' do
|
||||
it 'should propagate the topic' do
|
||||
messages = 3.times.map { |i| msg }
|
||||
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything).exactly(messages.size).times
|
||||
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything, anything).exactly(messages.size).times
|
||||
producer.push(messages, :topic => passed_topic)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'with explicit partition key', :platform => :java do
|
||||
context 'with explicit partition key' do
|
||||
it 'uses the partition key' do
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key.to_java)
|
||||
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key, anything)
|
||||
producer.push(msg, :partition_key => partition_key)
|
||||
end
|
||||
end
|
||||
|
@ -177,7 +178,7 @@ describe Hermann::Producer do
|
|||
|
||||
it 'should invoke #push_single for each element' do
|
||||
value.each do |v|
|
||||
expect(producer.internal).to receive(:push_single).with(v, topic, anything)
|
||||
expect(producer.internal).to receive(:push_single).with(v, topic, anything, anything)
|
||||
end
|
||||
|
||||
expect(result).to be_instance_of Array
|
||||
|
@ -190,7 +191,7 @@ describe Hermann::Producer do
|
|||
|
||||
describe '#tick_reactor' do
|
||||
let(:timeout) { 0 }
|
||||
let(:internal) { double('Hermann::Lib::Producer mock') }
|
||||
let(:internal) { double('Hermann::Provider::RDKafka::Producer mock') }
|
||||
subject(:tick) { producer.tick_reactor(timeout) }
|
||||
|
||||
before :each do
|
||||
|
|
|
@ -8,11 +8,11 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
|
|||
let(:topic) { 'rspec' }
|
||||
let(:brokers) { '0:1337'}
|
||||
let(:opts) { {} }
|
||||
let(:part_key) { "key".to_java }
|
||||
let(:part_key) { "key" }
|
||||
let(:msg) { "bar" }
|
||||
|
||||
describe '#push_single' do
|
||||
subject(:result) { producer.push_single(msg, topic, nil) }
|
||||
subject(:result) { producer.push_single(msg, topic, nil, nil) }
|
||||
|
||||
let(:passed_topic) { 'foo' }
|
||||
|
||||
|
@ -22,18 +22,18 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
|
|||
|
||||
it 'can change topic' do
|
||||
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, nil, anything)
|
||||
producer.push_single(msg, passed_topic, nil).wait(1)
|
||||
producer.push_single(msg, passed_topic, nil, nil).wait(1)
|
||||
end
|
||||
|
||||
it 'can change partition key' do
|
||||
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, part_key, anything)
|
||||
producer.push_single(msg, passed_topic, part_key).wait(1)
|
||||
producer.push_single(msg, passed_topic, part_key, nil).wait(1)
|
||||
end
|
||||
|
||||
context 'error conditions' do
|
||||
shared_examples 'an error condition' do
|
||||
it 'should be rejected' do
|
||||
promise = producer.push_single('rspec', topic, nil).wait(1)
|
||||
promise = producer.push_single('rspec', topic, nil, nil).wait(1)
|
||||
expect(promise).to be_rejected
|
||||
expect { promise.value! }.to raise_error
|
||||
end
|
||||
|
|
|
@ -2,6 +2,13 @@ require 'rubygems'
|
|||
require 'yaml'
|
||||
require 'rspec'
|
||||
|
||||
require 'simplecov'
|
||||
require 'simplecov-rcov'
|
||||
|
||||
SimpleCov.start do
|
||||
formatter = SimpleCov::Formatter::RcovFormatter
|
||||
end
|
||||
|
||||
# Add ext/ to the load path so we can load `hermann_lib`
|
||||
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../ext/'))
|
||||
$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../lib/'))
|
||||
|
|
Loading…
Reference in New Issue