From c83dcd31f41bf62659d58d7466a938d0c3a4c34e Mon Sep 17 00:00:00 2001 From: Stan Campbell Date: Tue, 19 Aug 2014 13:48:24 -0700 Subject: [PATCH] Add bulletproofing to better protect during allocation/init/free. JIRA: PART-3886 --- ext/hermann_lib.c | 197 +++++++++++++++++++++++++++++++++++++++++++--- ext/hermann_lib.h | 2 + hermann.gemspec | 12 ++- 3 files changed, 200 insertions(+), 11 deletions(-) diff --git a/ext/hermann_lib.c b/ext/hermann_lib.c index a6089f7..5e91082 100644 --- a/ext/hermann_lib.c +++ b/ext/hermann_lib.c @@ -46,6 +46,50 @@ void log_debug(char* msg) { } } +/** + * Convenience function + * + * @param config HermannInstanceConfig + * @param outputStream FILE* + * + * Log the contents of the configuration to the provided stream. + */ +void fprintf_hermann_instance_config(HermannInstanceConfig* config, FILE* outputStream) { + + const char* topic; + const char* brokers; + int isRkSet; + int isRktSet; + int partition; + int isInitialized; + + if(config==NULL) { + fprintf(outputStream, "NULL configuration"); + } else { + + isRkSet = config->rk != NULL; + isRktSet = config->rkt != NULL; + + if(config->topic == NULL) { + topic = NULL; + } else { + topic = config->topic; + } + + if(config->brokers == NULL) { + brokers = "NULL"; + } else { + brokers = config->brokers; + } + + partition = config->partition; + isInitialized = config->isInitialized; + } + + fprintf(outputStream, "{ topic: %s, brokers: %s, partition: %d, isInitialized: %d, rkSet: %d, rkTSet: %d }\n", + topic, brokers, partition, isInitialized, isRkSet, isRktSet ); +} + /** * Message delivery report callback. * Called once for each message. @@ -226,7 +270,11 @@ static void logger (const rd_kafka_t *rk, int level, * * @param config HermannInstanceConfig* pointer to the instance configuration for this producer or consumer */ -void consumer_init_kafka(HermannInstanceConfig* config) { +void consumer_init_kafka(HermannInstanceConfig* config) +{ +#ifdef TRACE + fprintf(stderr, "consumer_init_kafka"); +#endif config->quiet = !isatty(STDIN_FILENO); @@ -271,6 +319,10 @@ void consumer_init_kafka(HermannInstanceConfig* config) { static void consumer_consume_stop_callback(void *ptr) { HermannInstanceConfig* config = (HermannInstanceConfig*)ptr; +#ifdef TRACE + fprintf(stderr, "consumer_consume_stop_callback"); +#endif + config->run = 0; } @@ -280,6 +332,10 @@ static void consumer_consume_stop_callback(void *ptr) { */ void consumer_consume_loop(HermannInstanceConfig* consumerConfig) { +#ifdef TRACE + fprintf(stderr, "consumer_consume_loop"); +#endif + while (consumerConfig->run) { rd_kafka_message_t *rkmessage; @@ -304,6 +360,10 @@ static VALUE consumer_consume(VALUE self) { HermannInstanceConfig* consumerConfig; +#ifdef TRACE + fprintf(stderr, "consumer_consume"); +#endif + Data_Get_Struct(self, HermannInstanceConfig, consumerConfig); if(consumerConfig->topic==NULL) { @@ -352,6 +412,10 @@ static VALUE consumer_consume(VALUE self) { */ void producer_init_kafka(HermannInstanceConfig* config) { +#ifdef TRACE + fprintf(stderr, "producer_init_kafka\n"); +#endif + config->quiet = !isatty(STDIN_FILENO); /* Kafka configuration */ @@ -389,6 +453,11 @@ void producer_init_kafka(HermannInstanceConfig* config) { /* We're now initialized */ config->isInitialized = 1; + +#ifdef TRACE + fprintf(stderr, "producer_init_kafka::END\n"); + fprintf_hermann_instance_config(config, stderr); +#endif } /** @@ -402,6 +471,10 @@ static VALUE producer_push_single(VALUE self, VALUE message) { HermannInstanceConfig* producerConfig; char buf[2048]; +#ifdef TRACE + fprintf(stderr, "producer_push_single\n"); +#endif + Data_Get_Struct(self, HermannInstanceConfig, producerConfig); if(producerConfig->topic==NULL) { @@ -420,6 +493,13 @@ static VALUE producer_push_single(VALUE self, VALUE message) { if (buf[len-1] == '\n') buf[--len] = '\0'; +#ifdef TRACE + fprintf(stderr, "producer_push_single::before_produce message1\n"); + fprintf_hermann_instance_config(producerConfig, stderr); + fprintf(stderr, "producer_push_single::before_produce_message2\n"); + fflush(stderr); +#endif + /* Send/Produce message. */ if (rd_kafka_produce(producerConfig->rkt, producerConfig->partition, RD_KAFKA_MSG_F_COPY, /* Payload and length */ @@ -442,6 +522,10 @@ static VALUE producer_push_single(VALUE self, VALUE message) { /* Must poll to handle delivery reports */ rd_kafka_poll(producerConfig->rk, 0); +#ifdef TRACE + fprintf(stderr, "producer_push_single::prior return\n"); +#endif + return self; } @@ -459,6 +543,10 @@ static VALUE producer_push_array(VALUE self, int length, VALUE array) { int i; VALUE message; +#ifdef TRACE + fprintf(stderr, "producer_push_array\n"); +#endif + for(i=0;irkt); +#ifdef TRACE + fprintf(stderr, "consumer_free\n"); +#endif - rd_kafka_destroy(config->rk); + // the p *should* contain a pointer to the consumerConfig which also must be freed + if(config->rkt != NULL) { + rd_kafka_topic_destroy(config->rkt); + } + + if(config->rk != NULL) { + rd_kafka_destroy(config->rk); + } // clean up the struct free(config); @@ -516,8 +616,31 @@ static void consumer_free(void * p) { static VALUE consumer_allocate(VALUE klass) { VALUE obj; + HermannInstanceConfig* consumerConfig; + +#ifdef TRACE + fprintf(stderr, "consumer_free\n"); +#endif + + consumerConfig = ALLOC(HermannInstanceConfig); + + // Make sure it's initialized + consumerConfig->topic = NULL; + consumerConfig->rk = NULL; + consumerConfig->rkt = NULL; + consumerConfig->brokers = NULL; + consumerConfig->partition = -1; + consumerConfig->topic_conf = NULL; + consumerConfig->errstr[0] = 0; + consumerConfig->conf = NULL; + consumerConfig->debug = NULL; + consumerConfig->start_offset = -1; + consumerConfig->do_conf_dump = -1; + consumerConfig->run = 0; + consumerConfig->exit_eof = 0; + consumerConfig->quiet = 0; + consumerConfig->isInitialized = 0; - HermannInstanceConfig* consumerConfig = ALLOC(HermannInstanceConfig); obj = Data_Wrap_Struct(klass, 0, consumer_free, consumerConfig); return obj; @@ -542,6 +665,10 @@ static VALUE consumer_initialize(VALUE self, VALUE topic, VALUE brokers, VALUE p char* brokersPtr; int partitionNo; +#ifdef TRACE + fprintf(stderr, "consumer_initialize\n"); +#endif + topicPtr = StringValuePtr(topic); brokersPtr = StringValuePtr(brokers); partitionNo = FIX2INT(partition); @@ -570,6 +697,10 @@ static VALUE consumer_init_copy(VALUE copy, VALUE orig) { HermannInstanceConfig* orig_config; HermannInstanceConfig* copy_config; +#ifdef TRACE + fprintf(stderr, "consumer_init_copy\n"); +#endif + if(copy == orig) { return copy; } @@ -596,13 +727,27 @@ static VALUE consumer_init_copy(VALUE copy, VALUE orig) { */ static void producer_free(void * p) { - HermannInstanceConfig* config = (HermannInstanceConfig *)p; + HermannInstanceConfig* config; + +#ifdef TRACE + fprintf(stderr, "producer_free\n"); +#endif + + config = (HermannInstanceConfig *)p; + + if(NULL==p) { + return; + } // Clean up the topic - rd_kafka_topic_destroy(config->rkt); + if(config->rkt != NULL) { + rd_kafka_topic_destroy(config->rkt); + } // Take care of the producer instance - rd_kafka_destroy(config->rk); + if(config->rk != NULL) { + rd_kafka_destroy(config->rk); + } // Free the struct free(config); @@ -618,8 +763,30 @@ static void producer_free(void * p) { static VALUE producer_allocate(VALUE klass) { VALUE obj; + HermannInstanceConfig* producerConfig; + +#ifdef TRACE + fprintf(stderr, "producer_allocate\n"); +#endif + + producerConfig = ALLOC(HermannInstanceConfig); + + producerConfig->topic = NULL; + producerConfig->rk = NULL; + producerConfig->rkt = NULL; + producerConfig->brokers = NULL; + producerConfig->partition = -1; + producerConfig->topic_conf = NULL; + producerConfig->errstr[0] = 0; + producerConfig->conf = NULL; + producerConfig->debug = NULL; + producerConfig->start_offset = -1; + producerConfig->do_conf_dump = -1; + producerConfig->run = 0; + producerConfig->exit_eof = 0; + producerConfig->quiet = 0; + producerConfig->isInitialized = 0; - HermannInstanceConfig* producerConfig = ALLOC(HermannInstanceConfig); obj = Data_Wrap_Struct(klass, 0, producer_free, producerConfig); return obj; @@ -640,6 +807,10 @@ static VALUE producer_initialize(VALUE self, VALUE topic, VALUE brokers) { char* topicPtr; char* brokersPtr; +#ifdef TRACE + fprintf(stderr, "producer_initialize\n"); +#endif + topicPtr = StringValuePtr(topic); brokersPtr = StringValuePtr(brokers); Data_Get_Struct(self, HermannInstanceConfig, producerConfig); @@ -669,6 +840,10 @@ static VALUE producer_init_copy(VALUE copy, VALUE orig) { HermannInstanceConfig* orig_config; HermannInstanceConfig* copy_config; +#ifdef TRACE + fprintf(stderr, "producer_init_copy\n"); +#endif + if(copy == orig) { return copy; } @@ -695,6 +870,10 @@ static VALUE producer_init_copy(VALUE copy, VALUE orig) { */ void Init_hermann_lib() { +#ifdef TRACE + fprintf(stderr, "init_hermann_lib\n"); +#endif + /* Define the module */ m_hermann = rb_define_module("Hermann"); diff --git a/ext/hermann_lib.h b/ext/hermann_lib.h index b5e4774..580ea3a 100644 --- a/ext/hermann_lib.h +++ b/ext/hermann_lib.h @@ -42,6 +42,8 @@ #include +#undef TRACE + // Holds the defined Ruby module for Hermann static VALUE m_hermann; diff --git a/hermann.gemspec b/hermann.gemspec index bf40933..ae37327 100644 --- a/hermann.gemspec +++ b/hermann.gemspec @@ -1,6 +1,6 @@ SPEC = Gem::Specification.new do |s| s.name = "hermann" - s.version = "0.11" + s.version = "0.13" s.default_executable = "hermann" s.authors = ["Stan Campbell"] @@ -15,8 +15,16 @@ SPEC = Gem::Specification.new do |s| s.require_paths = ["lib", "ext"] s.rubygems_version = %q{2.2.2} s.summary = %q{The Kafka consumer is based on the librdkafka C library.} + s.licenses = ['MIT'] s.platform = Gem::Platform::CURRENT - s.specification_version = 3 if s.respond_to?(:specification_version) + if s.respond_to? :specification_version then + s.specification_version = 3 + + if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then + else + end + else + end end