Add metadata API to Hermann::Discovery module

Ben Osheroff 2015-05-19 11:13:27 -07:00
parent a362ce153e
commit e38b626b71
4 changed files with 187 additions and 10 deletions


@@ -672,13 +672,50 @@ static VALUE producer_tick(VALUE self, VALUE timeout) {
return rb_int_new(events);
}
/*
 * producer_metadata_request_nogvl
 *
 * Call rd_kafka_metadata without the GVL held. Note that rd_kafka_metadata is not
 * interruptible, so if the thread is interrupted it will not respond until
 * timeout_ms is reached.
 *
 * rd_kafka_metadata will fill in the ctx->data pointer on success.
 *
 * @param ptr void* the hermann_metadata_ctx_t
*/
static void *producer_metadata_request_nogvl(void *ptr)
{
hermann_metadata_ctx_t *ctx = (hermann_metadata_ctx_t*)ptr;
return (void *) rd_kafka_metadata(ctx->rk,
ctx->topic ? 0 : 1, /* all_topics: 1 when no topic filter is given */
ctx->topic,
&(ctx->data),
ctx->timeout_ms);
}
static int producer_metadata_request(hermann_metadata_ctx_t *ctx)
{
int err;
#ifdef HAVE_RB_THREAD_BLOCKING_REGION
/* Ruby 1.9.x: release the GVL around the request via rb_thread_blocking_region */
err = (int) rb_thread_blocking_region((rb_blocking_function_t *) producer_metadata_request_nogvl, ctx,
NULL, NULL);
#elif HAVE_RB_THREAD_CALL_WITHOUT_GVL
/* Ruby 2.x: rb_thread_call_without_gvl replaced the 1.9 API */
err = (int) rb_thread_call_without_gvl(producer_metadata_request_nogvl, ctx, NULL, NULL);
#else
/* no GVL-release API available: the request blocks the whole VM until it returns */
err = (int) producer_metadata_request_nogvl(ctx);
#endif
return err;
}
static VALUE producer_connect(VALUE self, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
VALUE result = Qfalse;
int timeout_ms = rb_num2int(timeout);
struct rd_kafka_metadata *data = NULL;
hermann_metadata_ctx_t md_context;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
@@ -686,17 +723,18 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
producer_init_kafka(self, producerConfig);
}
err = rd_kafka_metadata(producerConfig->rk,
0,
producerConfig->rkt,
&data,
timeout_ms);
md_context.rk = producerConfig->rk;
md_context.topic = NULL;
md_context.timeout_ms = rb_num2int(timeout);
err = producer_metadata_request(&md_context);
TRACER("err: %s (%i)\n", rd_kafka_err2str(err), err);
if (RD_KAFKA_RESP_ERR_NO_ERROR == err) {
TRACER("brokers: %i, topics: %i\n",
data->broker_cnt,
data->topic_cnt);
md_context.data->broker_cnt,
md_context.data->topic_cnt);
producerConfig->isConnected = 1;
result = Qtrue;
}
@@ -704,11 +742,117 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
producerConfig->isErrored = err;
}
rd_kafka_metadata_destroy(data);
rd_kafka_metadata_destroy(md_context.data);
return result;
}
/*
 * producer_metadata_make_hash
 *
 * Transform the rd_kafka_metadata structure into a Ruby hash, e.g.:
 * { :brokers => [ {:id=>0, :host=>"172.20.10.3", :port=>9092} ],
 *   :topics => { "maxwell" => [ {:id=>0, :leader_id=>0, :replica_ids=>[0], :isr_ids=>[0]}]} }
 *
 * @param data struct rd_kafka_metadata* the data returned from rd_kafka_metadata
*/
static VALUE producer_metadata_make_hash(struct rd_kafka_metadata *data)
{
int i, j, k;
VALUE broker_hash, topic_hash, partition_ary, partition_hash, partition_replica_ary, partition_isr_ary;
VALUE hash = rb_hash_new();
VALUE brokers = rb_ary_new2(data->broker_cnt);
VALUE topics = rb_hash_new();
for ( i = 0; i < data->broker_cnt; i++ ) {
broker_hash = rb_hash_new();
rb_hash_aset(broker_hash, ID2SYM(rb_intern("id")), INT2FIX(data->brokers[i].id));
rb_hash_aset(broker_hash, ID2SYM(rb_intern("host")), rb_str_new2(data->brokers[i].host));
rb_hash_aset(broker_hash, ID2SYM(rb_intern("port")), INT2FIX(data->brokers[i].port));
rb_ary_push(brokers, broker_hash);
}
for ( i = 0; i < data->topic_cnt; i++ ) {
partition_ary = rb_ary_new2(data->topics[i].partition_cnt);
for ( j = 0 ; j < data->topics[i].partition_cnt ; j++ ) {
rd_kafka_metadata_partition_t *partition = &(data->topics[i].partitions[j]); /* j, not i: index the partition within the topic */
partition_hash = rb_hash_new();
/* id => 1, leader_id => 0 */
rb_hash_aset(partition_hash, ID2SYM(rb_intern("id")), INT2FIX(partition->id));
rb_hash_aset(partition_hash, ID2SYM(rb_intern("leader_id")), INT2FIX(partition->leader));
/* replica_ids => [1, 0] */
partition_replica_ary = rb_ary_new2(partition->replica_cnt);
for ( k = 0 ; k < partition->replica_cnt ; k++ ) {
rb_ary_push(partition_replica_ary, INT2FIX(partition->replicas[k]));
}
rb_hash_aset(partition_hash, ID2SYM(rb_intern("replica_ids")), partition_replica_ary);
/* isr_ids => [1, 0] */
partition_isr_ary = rb_ary_new2(partition->isr_cnt);
for ( k = 0 ; k < partition->isr_cnt ; k++ ) {
rb_ary_push(partition_isr_ary, INT2FIX(partition->isrs[k]));
}
rb_hash_aset(partition_hash, ID2SYM(rb_intern("isr_ids")), partition_isr_ary);
rb_ary_push(partition_ary, partition_hash);
}
rb_hash_aset(topics, rb_str_new2(data->topics[i].topic), partition_ary);
}
rb_hash_aset(hash, ID2SYM(rb_intern("brokers")), brokers);
rb_hash_aset(hash, ID2SYM(rb_intern("topics")), topics);
return hash;
}
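For illustration, a minimal Ruby-side walk of the hash this function builds; the producer variable and the printed values are hypothetical, but the keys follow the docstring above:

md = producer.metadata(nil, 1000)
md[:brokers].each do |b|
  puts "broker #{b[:id]} at #{b[:host]}:#{b[:port]}"
end
md[:topics].each do |name, partitions|
  partitions.each do |p|
    puts "#{name}[#{p[:id]}] leader=#{p[:leader_id]} isr=#{p[:isr_ids].inspect}"
  end
end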
/*
 * producer_metadata
 *
 * Make a metadata request to the Kafka broker, returning a hash
 * containing a list of brokers and topics.
 *
 * @param topicStr VALUE the topic to fetch metadata for, or nil for all topics
 * @param timeout VALUE request timeout in milliseconds
*/
static VALUE producer_metadata(VALUE self, VALUE topicStr, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
hermann_metadata_ctx_t md_context;
VALUE result;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (!producerConfig->isInitialized) {
producer_init_kafka(self, producerConfig);
}
md_context.rk = producerConfig->rk;
md_context.timeout_ms = rb_num2int(timeout);
if ( !NIL_P(topicStr) ) {
Check_Type(topicStr, T_STRING);
md_context.topic = rd_kafka_topic_new(producerConfig->rk, StringValuePtr(topicStr), NULL);
} else {
md_context.topic = NULL;
}
err = producer_metadata_request(&md_context);
if ( err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
// annoyingly, this is always reported as a timeout error -- rdkafka jams any more specific errors straight onto STDERR
rb_raise( rb_eRuntimeError, "%s", rd_kafka_err2str(err) );
} else {
result = producer_metadata_make_hash(md_context.data);
rd_kafka_metadata_destroy(md_context.data);
return result;
}
}
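Because a failed request surfaces in Ruby as a RuntimeError, a caller would typically wrap it; a sketch, assuming a producer instance and the "maxwell" topic from the docstring example:

begin
  producer.metadata("maxwell", 500)
rescue RuntimeError => e
  # per the comment above, this is effectively always a timeout
  puts "metadata request failed: #{e.message}"
end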
static VALUE producer_is_connected(VALUE self) {
HermannInstanceConfig *producerConfig;
@@ -1076,4 +1220,7 @@ void Init_hermann_lib() {
/* Producer.connect */
rb_define_method(c_producer, "connect", producer_connect, 1);
/* Producer.metadata */
rb_define_method(c_producer, "metadata", producer_metadata, 2);
}
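From Ruby the new method therefore takes (topic, timeout_ms); a hypothetical Hermann::Lib::Producer instance could call it either way:

producer.metadata(nil, 1_000)       # metadata for the whole cluster
producer.metadata("maxwell", 1_000) # just the "maxwell" topic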


@@ -113,4 +113,11 @@ typedef struct {
VALUE result;
} hermann_push_ctx_t;
typedef struct {
rd_kafka_t *rk; /* kafka handle to query */
rd_kafka_topic_t *topic; /* optional topic filter; NULL requests all topics */
struct rd_kafka_metadata *data; /* filled in by rd_kafka_metadata on success */
int timeout_ms; /* metadata request timeout */
} hermann_metadata_ctx_t;
#endif


@@ -0,0 +1,16 @@
require 'hermann_lib'
module Hermann
  module Discovery
    class Metadata
      def initialize(brokers)
        raise "this is an MRI api only!" if Hermann.jruby?
        @internal = Hermann::Lib::Producer.new(brokers)
      end

      def get_topics
        @internal.metadata(nil, 200)
      end
    end
  end
end
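A hypothetical session with the class above (broker string and topic names are illustrative):

md = Hermann::Discovery::Metadata.new("localhost:9092")
md.get_topics[:topics].keys # => e.g. ["maxwell"]

Note that get_topics returns the full metadata hash, brokers included, so callers pick :topics out of it themselves.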

scripts/metadata_mri.rb Normal file

@@ -0,0 +1,7 @@
require 'bundler/setup'
require 'hermann'
require 'hermann/discovery/metadata'
c = Hermann::Discovery::Metadata.new( "localhost:9092" )
puts c.get_topics.inspect
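Against a hypothetical single-broker cluster, the script would print a hash shaped like the docstring example in the C extension:

# {:brokers=>[{:id=>0, :host=>"localhost", :port=>9092}],
#  :topics=>{"maxwell"=>[{:id=>0, :leader_id=>0, :replica_ids=>[0], :isr_ids=>[0]}]}}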