Merge remote-tracking branch 'upstream/master' into cleanup_warnings

Ben Osheroff 2015-06-16 23:44:13 -07:00
commit 09df7ec1b9
8 changed files with 307 additions and 84 deletions

View File

@ -5,8 +5,8 @@ rvm:
- 1.9.3
- jruby
- ree
deploy:
provider: rubygems
gemspec: hermann.gemspec
api_key:
secure: NQDoSKjV0bs2MSZHHwP6gsG3a8JXCCT5nCHiggTvVV2vvFS9WCyBtMDY3WxQzAU/Zbt+FcPobOvbd53HW5hQYkDOpc84j/utVyBBZCtew0wjEY+Z18ygr+oUQtoALoaRh+cr3MUEFA1Q68fsLlzpRH4M6ZQxbUNOQtwNHgLaZco=
#deploy:
# provider: rubygems
# gemspec: hermann.gemspec
# api_key:
# secure: NQDoSKjV0bs2MSZHHwP6gsG3a8JXCCT5nCHiggTvVV2vvFS9WCyBtMDY3WxQzAU/Zbt+FcPobOvbd53HW5hQYkDOpc84j/utVyBBZCtew0wjEY+Z18ygr+oUQtoALoaRh+cr3MUEFA1Q68fsLlzpRH4M6ZQxbUNOQtwNHgLaZco=

View File

@ -95,6 +95,25 @@ the_consumer.consume(new_topic) do |msg| # can change topic with optional argu
end
```
### Metadata request (MRI-only)
Topic and cluster metadata may be retrieved in the MRI version by querying the Kafka brokers.
```ruby
require 'hermann'
require 'hermann/discovery/metadata'
c = Hermann::Discovery::Metadata.new( "localhost:9092" )
topic = c.topic("topic")
puts topic.partitions.first
consumers = topic.partitions.map do |partition|
  partition.consumer
end
```
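The same object also exposes the full broker list and per-partition detail; a minimal sketch (assuming a broker on `localhost:9092`, with the output shape taken from the `Hermann::Discovery::Metadata` structs):
```ruby
require 'hermann'
require 'hermann/discovery/metadata'

c = Hermann::Discovery::Metadata.new("localhost:9092", :timeout => 2_000)

# every broker the cluster reports, printed as "host:port"
c.brokers.each { |broker| puts broker }

# partition layout for one topic
topic = c.topic("topic")
topic.partitions.each do |p|
  puts "partition #{p.id}: leader=#{p.leader} replicas=#{p.replicas.map(&:to_s)}"
end
```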
#### Testing
To run the integration tests:

View File

@ -122,10 +122,9 @@ class RdKafkaRecipe < MiniPortile
end
################################################################################
librdkafka = RdKafkaRecipe.new('librdkafka', '0.8.4')
librdkafka = RdKafkaRecipe.new('librdkafka', '0.8.6')
librdkafka.files = ["https://github.com/edenhill/librdkafka/archive/#{librdkafka.version}.tar.gz"]
librdkafka.checksum = '28a3252fd0f31d4a38bea9cd25083a06'
librdkafka.patch_files = Dir["#{File.join(BASE_DIR, 'ext', 'patches', 'librdkafka')}/*.patch"]
librdkafka.checksum = '1b77543f9be82d3f700c0ef98f494990'
checkpoint = ".librdkafka.#{librdkafka.version}.cooked"
unless File.exists?(checkpoint)

View File

@ -265,6 +265,7 @@ static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cf
// Yield the data to the Consumer's block
if (rb_block_given_p()) {
VALUE value = rb_str_new((char *)rkmessage->payload, rkmessage->len);
rd_kafka_message_destroy(rkmessage);
rb_yield(value);
}
else {
@ -394,11 +395,15 @@ static void *consumer_recv_msg(void *ptr)
* after every message, to see if the ruby interpreter wants us to exit the
* loop.
*
* @param HermannInstanceConfig* The hermann configuration for this consumer
* @param self The consumer instance
*/
static void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
static VALUE consumer_consume_loop(VALUE self) {
HermannInstanceConfig* consumerConfig;
rd_kafka_message_t *msg;
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
TRACER("\n");
while (consumerConfig->run) {
@ -418,9 +423,23 @@ static void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
if ( msg ) {
msg_consume(msg, consumerConfig);
rd_kafka_message_destroy(msg);
}
}
return Qnil;
}
/**
* consumer_consume_loop_stop
*
* called when we're done with the .consume() loop; lets rdkafka clean up some internal structures
*/
static VALUE consumer_consume_loop_stop(VALUE self) {
HermannInstanceConfig* consumerConfig;
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);
return Qnil;
}
/**
@ -457,12 +476,7 @@ static VALUE consumer_consume(VALUE self, VALUE topic) {
return Qnil;
}
consumer_consume_loop(consumerConfig);
/* Stop consuming */
rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);
return Qnil;
return rb_ensure(consumer_consume_loop, self, consumer_consume_loop_stop, self);
}
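The `rb_ensure` call above is what guarantees the consumer gets stopped even if the Ruby block passed to `consume` raises or breaks out of the loop; `consumer_consume_loop_stop` plays the role of an `ensure` clause. A minimal Ruby sketch of the semantics being relied on (the method names here are illustrative, not part of the gem):
```ruby
# Ruby-level analogue of rb_ensure(consumer_consume_loop, self,
#                                  consumer_consume_loop_stop, self)
def consume_loop
  3.times do |i|
    raise "broker went away" if i == 2   # simulate an error mid-stream
    puts "message #{i}"
  end
end

def consume_loop_stop
  puts "stopping consumer"               # stands in for rd_kafka_consume_stop
end

begin
  consume_loop
rescue => e
  puts "consume failed: #{e.message}"
ensure
  consume_loop_stop                       # runs on both the happy path and the error path
end
```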
@ -678,13 +692,50 @@ static VALUE producer_tick(VALUE self, VALUE timeout) {
return rb_int_new(events);
}
/*
* producer_metadata_request_nogvl
*
* call rd_kafka_metadata without holding the GVL. Note that rd_kafka_metadata is not interruptible,
* so if the thread is interrupted it will not respond until timeout_ms has elapsed.
*
* rd_kafka_metadata will fill in the ctx->data pointer on success
*
* @param ptr void* the hermann_metadata_ctx_t
*/
static void *producer_metadata_request_nogvl(void *ptr)
{
hermann_metadata_ctx_t *ctx = (hermann_metadata_ctx_t*)ptr;
return (void *) rd_kafka_metadata(ctx->rk,
ctx->topic ? 0 : 1,
ctx->topic,
&(ctx->data),
ctx->timeout_ms);
}
static int producer_metadata_request(hermann_metadata_ctx_t *ctx)
{
int err;
#ifdef HAVE_RB_THREAD_BLOCKING_REGION
err = (int) rb_thread_blocking_region((rb_blocking_function_t *) producer_metadata_request_nogvl, ctx,
NULL, NULL);
#elif HAVE_RB_THREAD_CALL_WITHOUT_GVL
err = (int) rb_thread_call_without_gvl(producer_metadata_request_nogvl, ctx, NULL, NULL);
#else
err = (int) producer_metadata_request_nogvl(ctx);
#endif
return err;
}
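Dropping the GVL here matters because the metadata request can block for up to `timeout_ms`; with the GVL released, other Ruby threads keep running while librdkafka waits. A small sketch of the observable effect (broker address and timings are illustrative):
```ruby
require 'hermann'
require 'hermann/discovery/metadata'

# a heartbeat thread that should keep ticking while the metadata request blocks
heartbeat = Thread.new do
  5.times { puts "tick #{Time.now.strftime('%H:%M:%S.%L')}"; sleep 0.2 }
end

md = Hermann::Discovery::Metadata.new("localhost:9092", :timeout => 5_000)
puts md.brokers.inspect   # blocks in rd_kafka_metadata, but without holding the GVL

heartbeat.join
```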
static VALUE producer_connect(VALUE self, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
VALUE result = Qfalse;
int timeout_ms = rb_num2int(timeout);
struct rd_kafka_metadata *data = NULL;
hermann_metadata_ctx_t md_context;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
@ -692,17 +743,19 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
producer_init_kafka(self, producerConfig);
}
err = rd_kafka_metadata(producerConfig->rk,
0,
producerConfig->rkt,
(const struct rd_kafka_metadata **)&data,
timeout_ms);
md_context.rk = producerConfig->rk;
md_context.topic = NULL;
md_context.data = NULL;
md_context.timeout_ms = rb_num2int(timeout);
err = producer_metadata_request(&md_context);
TRACER("err: %s (%i)\n", rd_kafka_err2str(err), err);
if (RD_KAFKA_RESP_ERR_NO_ERROR == err) {
TRACER("brokers: %i, topics: %i\n",
data->broker_cnt,
data->topic_cnt);
md_context.data->broker_cnt,
md_context.data->topic_cnt);
producerConfig->isConnected = 1;
result = Qtrue;
}
@ -710,11 +763,118 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
producerConfig->isErrored = err;
}
rd_kafka_metadata_destroy(data);
if ( md_context.data )
rd_kafka_metadata_destroy(md_context.data);
return result;
}
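From Ruby this path is reached through the `connect` binding registered at the bottom of the file (one argument, a timeout in milliseconds); a minimal sketch against the low-level class, assuming a broker on `localhost:9092`:
```ruby
require 'hermann_lib'

producer = Hermann::Lib::Producer.new("localhost:9092")

# connect issues a metadata request; true means a broker answered within the timeout
if producer.connect(1_000)
  puts "connected"
else
  puts "no broker reachable within 1s"
end
```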
/*
* producer_metadata_make_hash
*
* transform the rd_kafka_metadata structure into a Ruby hash, e.g.:
* { :brokers => [ {:id=>0, :host=>"172.20.10.3", :port=>9092} ],
* :topics => { "maxwell" => [ {:id=>0, :leader_id=>0, :replica_ids=>[0], :isr_ids=>[0]}]} }
*
* @param data struct rd_kafka_metadata* data returned from rd_kafka_metadata
*/
static VALUE producer_metadata_make_hash(struct rd_kafka_metadata *data)
{
int i, j, k;
VALUE broker_hash, topic_hash, partition_ary, partition_hash, partition_replica_ary, partition_isr_ary;
VALUE hash = rb_hash_new();
VALUE brokers = rb_ary_new2(data->broker_cnt);
VALUE topics = rb_hash_new();
for ( i = 0; i < data->broker_cnt; i++ ) {
broker_hash = rb_hash_new();
rb_hash_aset(broker_hash, ID2SYM(rb_intern("id")), INT2FIX(data->brokers[i].id));
rb_hash_aset(broker_hash, ID2SYM(rb_intern("host")), rb_str_new2(data->brokers[i].host));
rb_hash_aset(broker_hash, ID2SYM(rb_intern("port")), INT2FIX(data->brokers[i].port));
rb_ary_push(brokers, broker_hash);
}
for ( i = 0; i < data->topic_cnt; i++ ) {
partition_ary = rb_ary_new2(data->topics[i].partition_cnt);
for ( j = 0 ; j < data->topics[i].partition_cnt ; j++ ) {
VALUE partition_hash = rb_hash_new();
rd_kafka_metadata_partition_t *partition = &(data->topics[i].partitions[j]);
/* id => 1, leader_id => 0 */
rb_hash_aset(partition_hash, ID2SYM(rb_intern("id")), INT2FIX(partition->id));
rb_hash_aset(partition_hash, ID2SYM(rb_intern("leader_id")), INT2FIX(partition->leader));
/* replica_ids => [1, 0] */
partition_replica_ary = rb_ary_new2(partition->replica_cnt);
for ( k = 0 ; k < partition->replica_cnt ; k++ ) {
rb_ary_push(partition_replica_ary, INT2FIX(partition->replicas[k]));
}
rb_hash_aset(partition_hash, ID2SYM(rb_intern("replica_ids")), partition_replica_ary);
/* isr_ids => [1, 0] */
partition_isr_ary = rb_ary_new2(partition->isr_cnt);
for ( k = 0 ; k < partition->isr_cnt ; k++ ) {
rb_ary_push(partition_isr_ary, INT2FIX(partition->isrs[k]));
}
rb_hash_aset(partition_hash, ID2SYM(rb_intern("isr_ids")), partition_isr_ary);
rb_ary_push(partition_ary, partition_hash);
}
rb_hash_aset(topics, rb_str_new2(data->topics[i].topic), partition_ary);
}
rb_hash_aset(hash, ID2SYM(rb_intern("brokers")), brokers);
rb_hash_aset(hash, ID2SYM(rb_intern("topics")), topics);
return hash;
}
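This hash is what the `metadata` binding (registered in `Init_hermann_lib` below) hands back to Ruby; a short sketch of walking it, assuming the documented shape and a broker on `localhost:9092`:
```ruby
require 'hermann_lib'

producer = Hermann::Lib::Producer.new("localhost:9092")
md = producer.metadata(nil, 2_000)   # nil topic => metadata for every topic

md[:brokers].each do |b|
  puts "broker #{b[:id]} at #{b[:host]}:#{b[:port]}"
end

md[:topics].each do |name, partitions|
  partitions.each do |p|
    puts "#{name}/#{p[:id]} leader=#{p[:leader_id]} " \
         "replicas=#{p[:replica_ids].inspect} isr=#{p[:isr_ids].inspect}"
  end
end
```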
/*
* producer_metadata
*
* make a metadata request to the kafka server, returning a hash
* containing a list of brokers and topics.
*
* @param topicStr VALUE optional topic name (or Qnil to request metadata for all topics)
* @param timeout VALUE metadata request timeout in milliseconds
*/
static VALUE producer_metadata(VALUE self, VALUE topicStr, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
hermann_metadata_ctx_t md_context;
VALUE result;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (!producerConfig->isInitialized) {
producer_init_kafka(self, producerConfig);
}
md_context.rk = producerConfig->rk;
md_context.timeout_ms = rb_num2int(timeout);
if ( !NIL_P(topicStr) ) {
Check_Type(topicStr, T_STRING);
md_context.topic = rd_kafka_topic_new(producerConfig->rk, StringValuePtr(topicStr), NULL);
} else {
md_context.topic = NULL;
}
err = producer_metadata_request(&md_context);
if ( err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
// annoyingly, this is always a timeout error -- the rest of it rdkafka just jams onto STDERR
rb_raise( rb_eRuntimeError, "%s", rd_kafka_err2str(err) );
} else {
result = producer_metadata_make_hash(md_context.data);
rd_kafka_metadata_destroy(md_context.data);
return result;
}
}
static VALUE producer_is_connected(VALUE self) {
HermannInstanceConfig *producerConfig;
@ -1082,4 +1242,7 @@ void Init_hermann_lib() {
/* Producer.connect */
rb_define_method(c_producer, "connect", producer_connect, 1);
/* Producer.metadata */
rb_define_method(c_producer, "metadata", producer_metadata, 2);
}

View File

@ -113,4 +113,11 @@ typedef struct {
VALUE result;
} hermann_push_ctx_t;
typedef struct {
rd_kafka_t *rk;
rd_kafka_topic_t *topic;
struct rd_kafka_metadata *data;
int timeout_ms;
} hermann_metadata_ctx_t;
#endif

View File

@ -1,57 +0,0 @@
From 888ca33b571d99e877d665235b822f7c961c8fdb Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy" <tyler@monkeypox.org>
Date: Thu, 28 Aug 2014 16:24:04 -0700
Subject: [PATCH 6/8] Update some headers to include the right headers to build
on FreeBSD
---
src/rd.h | 9 +++++++++
src/rdaddr.h | 4 ++++
2 files changed, 13 insertions(+)
diff --git a/src/rd.h b/src/rd.h
index c31501e..4789493 100644
--- a/src/rd.h
+++ b/src/rd.h
@@ -37,7 +37,11 @@
#include <errno.h>
#include <time.h>
#include <sys/time.h>
+
+#ifndef __FreeBSD__
+/* alloca(3) is in stdlib on FreeBSD */
#include <alloca.h>
+#endif
#include <assert.h>
#include <pthread.h>
@@ -110,6 +114,11 @@
# endif
#endif /* sun */
+#ifdef __FreeBSD__
+/* FreeBSD defines be64toh() in sys/endian.h */
+#include <sys/endian.h>
+#endif
+
#ifndef be64toh
#ifndef __APPLE__
#ifndef sun
diff --git a/src/rdaddr.h b/src/rdaddr.h
index 0b37354..e55bd55 100644
--- a/src/rdaddr.h
+++ b/src/rdaddr.h
@@ -32,6 +32,10 @@
#include <arpa/inet.h>
#include <netdb.h>
+#ifdef __FreeBSD__
+#include <sys/socket.h>
+#endif
+
/**
* rd_sockaddr_inx_t is a union for either ipv4 or ipv6 sockaddrs.
* It provides conveniant abstraction of AF_INET* agnostic operations.
--
1.9.0

View File

@ -0,0 +1,77 @@
require 'hermann_lib'
require 'hermann/consumer'
module Hermann
  module Discovery
    class Metadata
      Broker = Struct.new(:id, :host, :port) do
        def to_s
          "#{host}:#{port}"
        end
      end

      Topic = Struct.new(:name, :partitions)

      Partition = Struct.new(:id, :leader, :replicas, :insync_replicas, :topic_name) do
        def consumer(offset=:end)
          Hermann::Consumer.new(topic_name, brokers: ([leader] + replicas).join(','), partition: id, offset: offset)
        end
      end

      DEFAULT_TIMEOUT_MS = 2_000

      def initialize(brokers, options = {})
        raise "this is an MRI api only!" if Hermann.jruby?
        @internal = Hermann::Lib::Producer.new(brokers)
        @timeout = options[:timeout] || DEFAULT_TIMEOUT_MS
      end

      #
      # @internal.metadata returns:
      # {:brokers => [{:id=>3, :host=>"kafka3.alpha4.sac1.zdsys.com", :port=>9092}],
      #  :topics => {"testtopic"=>[{:id=>0, :leader_id=>3, :replica_ids=>[3, 1], :isr_ids=>[3, 1]}]}}
      #
      def brokers
        brokers_from_metadata(@internal.metadata(nil, @timeout))
      end

      def topic(t)
        get_topics(t)[t]
      end

      def topics
        get_topics
      end

      private

      def get_topics(filter_topics = nil)
        md = @internal.metadata(filter_topics, @timeout)

        broker_hash = brokers_from_metadata(md).inject({}) do |h, broker|
          h[broker.id] = broker
          h
        end

        md[:topics].inject({}) do |topic_hash, arr|
          topic_name, raw_partitions = *arr

          partitions = raw_partitions.map do |p|
            leader = broker_hash[p[:leader_id]]
            all_replicas = p[:replica_ids].map { |i| broker_hash[i] }
            isr_replicas = p[:isr_ids].map { |i| broker_hash[i] }
            Partition.new(p[:id], leader, all_replicas, isr_replicas, topic_name)
          end

          topic_hash[topic_name] = Topic.new(topic_name, partitions)
          topic_hash
        end
      end

      def brokers_from_metadata(md)
        md[:brokers].map do |h|
          Broker.new(h[:id], h[:host], h[:port])
        end
      end
    end
  end
end

scripts/metadata_mri.rb Normal file
View File

@ -0,0 +1,15 @@
require 'bundler/setup'
require 'hermann'
require 'hermann/discovery/metadata'
require 'hermann/consumer'
c = Hermann::Discovery::Metadata.new( "localhost:9092" )
c.topic("maxwell")
puts c.topic("maxwell").inspect
puts c.brokers.inspect
consumer = Hermann::Consumer.new("maxwell", brokers: "localhost:9092, localhost:9092", partition: c.topic('maxwell').partitions.first.id, offset: :start)
consumer.consume do |msg|
  puts msg
end