/*
 * hermann_lib.c - Ruby wrapper for the librdkafka library
 *
 * Copyright (c) 2014 Stan Campbell
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/* Much of the librdkafka library calls were lifted from rdkafka_example.c */

#include "hermann_lib.h"

/**
 * Utility functions
 */

/**
 * Debug logging helper.
 *
 * Writes msg followed by a newline to stderr, but only when DEBUG evaluates
 * to true; otherwise it does nothing.
 *
 * @param msg   char*   the string to be logged under debugging.
 */
void log_debug(char* msg) {
    if(!(DEBUG)) {
        return;
    }
    fprintf(stderr, "%s\n", msg);
}

/**
 * Convenience function
 *
 * @param config        HermannInstanceConfig
 * @param outputStream  FILE*
 *
 * Log the contents of the configuration to the provided stream.
*/
/*
 * Behaviour of fprintf_hermann_instance_config:
 *  - a NULL config prints "NULL configuration" on its own line and returns;
 *    no field is read in that case.
 *  - a NULL topic or brokers pointer is printed as the text "NULL" rather
 *    than being handed to %s.
 *  - rkSet / rkTSet are 1 when the rk / rkt pointers are non-NULL, else 0.
 */
void fprintf_hermann_instance_config(HermannInstanceConfig* config,
                                     FILE* outputStream) {

    const char* topic;
    const char* brokers;
    int isRkSet;
    int isRktSet;
    int partition;
    int isInitialized;

    if(config==NULL) {
        /* Nothing else can be reported; returning here avoids formatting
         * locals that were never assigned. */
        fprintf(outputStream, "NULL configuration\n");
        return;
    }

    isRkSet = config->rk != NULL;
    isRktSet = config->rkt != NULL;

    topic = (config->topic != NULL) ? config->topic : "NULL";
    brokers = (config->brokers != NULL) ? config->brokers : "NULL";

    partition = config->partition;
    isInitialized = config->isInitialized;

    fprintf(outputStream, "{ topic: %s, brokers: %s, partition: %d, isInitialized: %d, rkSet: %d, rkTSet: %d }\n",
        topic, brokers, partition, isInitialized, isRkSet, isRktSet );
}

/**
 * Message delivery report callback.
 * Called once for each message.
 *
 * Only failures are reported: when error_code is non-zero a line describing
 * the error is written to stderr. The remaining arguments are not used.
 *
 * @param rk            rd_kafka_t* instance of producer or consumer
 * @param payload       void*       the payload of the message
 * @param len           size_t      the length of the payload in bytes
 * @param error_code    int         zero on success, otherwise the error
 * @param opaque        void*       optional context
 * @param msg_opaque    void*       per-message opaque value
 */
static void msg_delivered (rd_kafka_t *rk,
                           void *payload, size_t len,
                           int error_code,
                           void *opaque, void *msg_opaque) {

    if (error_code) {
        fprintf(stderr, "%% Message delivery failed: %s\n",
                rd_kafka_err2str(error_code));
    }
}

/**
 * Producer partitioner callback.
 * Used to determine the target partition within a topic for production.
 *
 * Returns an integer partition number or RD_KAFKA_PARTITION_UA if no
 * available partition could be determined.
* * @param rkt rd_kafka_topic_t* the topic * @param keydata void* key information for calculating the partition * @param keylen size_t key size * @param partition_cnt int32_t the number of partitions * @param rkt_opaque void* opaque topic info * @param msg_opaque void* opaque message info */ static int32_t producer_paritioner_callback( const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) { /* Pick a random partition */ int retry; /* NOTE(review): the for-loop header that follows looks truncated -- the text after retry=0 is not valid C, and the code after it reads rkmessage and cfg, which are not parameters of this function. Restore the missing text from version control before building. */ for(retry=0;retryerr) { if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { fprintf(stderr, "%% Consumer reached end of %s [%"PRId32"] " "message queue at offset %"PRId64"\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset); if (cfg->exit_eof) cfg->run = 0; return; } fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] " "offset %"PRId64": %s\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset, rd_kafka_message_errstr(rkmessage)); return; } if (DEBUG && rkmessage->key_len) { if (output == OUTPUT_HEXDUMP) hexdump(stdout, "Message Key", rkmessage->key, rkmessage->key_len); else printf("Key: %.*s\n", (int)rkmessage->key_len, (char *)rkmessage->key); } if (output == OUTPUT_HEXDUMP) { if(DEBUG) hexdump(stdout, "Message Payload", rkmessage->payload, rkmessage->len); } else { if(DEBUG) printf("%.*s\n", (int)rkmessage->len, (char *)rkmessage->payload); } // Yield the data to the Consumer's block if(rb_block_given_p()) { VALUE value = rb_str_new((char *)rkmessage->payload, rkmessage->len); rb_yield(value); } else { if(DEBUG) fprintf(stderr, "No block given\n"); // todo: should this be an error? 
} } /** * logger * * Kafka logger callback (optional) * * Writes one line to stderr made of a seconds.milliseconds timestamp taken from gettimeofday, the level, fac, the name of the rk handle and buf. * * todo: introduce better logging * * @param rk rd_kafka_t the producer or consumer * @param level int the log level * @param fac char* tag passed in by the caller and printed verbatim (presumably the librdkafka log facility -- confirm) * @param buf char* the log message */ static void logger (const rd_kafka_t *rk, int level, const char *fac, const char *buf) { struct timeval tv; gettimeofday(&tv, NULL); fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", (int)tv.tv_sec, (int)(tv.tv_usec / 1000), level, fac, rd_kafka_name(rk), buf); } /** * consumer_init_kafka * * Initialize the Kafka context and instantiate a consumer. * * Creates the conf and topic_conf objects, the consumer handle and the topic handle, installs the logger, sets start_offset to RD_KAFKA_OFFSET_END and marks the config as initialized. Calls exit(1) if the handle cannot be created or if rd_kafka_brokers_add reports no valid broker. * * @param config HermannInstanceConfig* pointer to the instance configuration for this producer or consumer */ void consumer_init_kafka(HermannInstanceConfig* config) { #ifdef TRACE fprintf(stderr, "consumer_init_kafka"); #endif config->quiet = !isatty(STDIN_FILENO); /* Kafka configuration */ config->conf = rd_kafka_conf_new(); /* Topic configuration */ config->topic_conf = rd_kafka_topic_conf_new(); /* Create Kafka handle */ if (!(config->rk = rd_kafka_new(RD_KAFKA_CONSUMER, config->conf, config->errstr, sizeof(config->errstr)))) { fprintf(stderr, "%% Failed to create new consumer: %s\n", config->errstr); exit(1); } /* Set logger */ rd_kafka_set_logger(config->rk, logger); rd_kafka_set_log_level(config->rk, LOG_DEBUG); /* TODO: offset calculation */ config->start_offset = RD_KAFKA_OFFSET_END; /* Add brokers */ if(rd_kafka_brokers_add(config->rk, config->brokers) == 0) { fprintf(stderr, "%% No valid brokers specified\n"); exit(1); } /* Create topic */ config->rkt = rd_kafka_topic_new(config->rk, config->topic, config->topic_conf); /* We're now initialized */ config->isInitialized = 1; } // Ruby gem extensions /** * Callback invoked if Ruby needs to stop our Consumer's IO loop for any reason (system exit, etc.) 
*/ static void consumer_consume_stop_callback(void *ptr) { HermannInstanceConfig* config = (HermannInstanceConfig*)ptr; #ifdef TRACE fprintf(stderr, "consumer_consume_stop_callback"); #endif config->run = 0; } /** * Loop on a timeout to receive messages from Kafka. When the consumer_consume_stop_callback is invoked by Ruby, * we'll break out of our loop and return. */ void consumer_consume_loop(HermannInstanceConfig* consumerConfig) { #ifdef TRACE fprintf(stderr, "consumer_consume_loop"); #endif while (consumerConfig->run) { rd_kafka_message_t *rkmessage; if(rd_kafka_consume_callback(consumerConfig->rkt, consumerConfig->partition, 1000/*timeout*/, msg_consume, consumerConfig) < 0) { fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str( rd_kafka_errno2err(errno))); } } } /** * Hermann::Consumer.consume * * Begin listening on the configured topic for messages. msg_consume will be called on each message received. * * @param VALUE self the Ruby object for this consumer */ static VALUE consumer_consume(VALUE self) { HermannInstanceConfig* consumerConfig; #ifdef TRACE fprintf(stderr, "consumer_consume"); #endif Data_Get_Struct(self, HermannInstanceConfig, consumerConfig); if(consumerConfig->topic==NULL) { fprintf(stderr, "Topic is null!"); return; } if(!consumerConfig->isInitialized) { consumer_init_kafka(consumerConfig); } /* Start consuming */ if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1){ fprintf(stderr, "%% Failed to start consuming: %s\n", rd_kafka_err2str(rd_kafka_errno2err(errno))); exit(1); } #ifdef RB_THREAD_BLOCKING_REGION /** The consumer will listen for incoming messages in a loop, timing out and checking the consumerConfig->run * flag every second. * * Call rb_thread_blocking_region to release the GVM lock and allow Ruby to amuse itself while we wait on * IO from Kafka. * * If Ruby needs to interrupt the consumer loop, the stop callback will be invoked and the loop should exit. 
*/ rb_thread_blocking_region(consumer_consume_loop, consumerConfig, consumer_consume_stop_callback, consumerConfig); #else consumer_consume_loop(consumerConfig); #endif /* Stop consuming */ rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition); return Qnil; } /** * producer_init_kafka * * Initialize the producer instance, setting up the Kafka topic and context. * * @param config HermannInstanceConfig* the instance configuration associated with this producer. */ void producer_init_kafka(HermannInstanceConfig* config) { #ifdef TRACE fprintf(stderr, "producer_init_kafka\n"); #endif config->quiet = !isatty(STDIN_FILENO); /* Kafka configuration */ config->conf = rd_kafka_conf_new(); /* Topic configuration */ config->topic_conf = rd_kafka_topic_conf_new(); /* Set up a message delivery report callback. * It will be called once for each message, either on successful * delivery to broker, or upon failure to deliver to broker. */ rd_kafka_conf_set_dr_cb(config->conf, msg_delivered); /* Create Kafka handle */ if (!(config->rk = rd_kafka_new(RD_KAFKA_PRODUCER, config->conf, config->errstr, sizeof(config->errstr)))) { fprintf(stderr, "%% Failed to create new producer: %s\n", config->errstr); exit(1); } /* Set logger */ rd_kafka_set_logger(config->rk, logger); rd_kafka_set_log_level(config->rk, LOG_DEBUG); if(rd_kafka_brokers_add(config->rk, config->brokers) == 0) { fprintf(stderr, "%% No valid brokers specified\n"); exit(1); } /* Create topic */ config->rkt = rd_kafka_topic_new(config->rk, config->topic, config->topic_conf); /* Set the partitioner callback */ rd_kafka_topic_conf_set_partitioner_cb( config->topic_conf, producer_paritioner_callback ); /* We're now initialized */ config->isInitialized = 1; #ifdef TRACE fprintf(stderr, "producer_init_kafka::END\n"); fprintf_hermann_instance_config(config, stderr); #endif } /** * producer_push_single * * @param self VALUE the Ruby producer instance * @param message VALUE the ruby String containing the outgoing 
message. */ static VALUE producer_push_single(VALUE self, VALUE message) { HermannInstanceConfig* producerConfig; char buf[2048]; #ifdef TRACE fprintf(stderr, "producer_push_single\n"); #endif Data_Get_Struct(self, HermannInstanceConfig, producerConfig); if(producerConfig->topic==NULL) { fprintf(stderr, "Topic is null!"); return self; } if(!producerConfig->isInitialized) { producer_init_kafka(producerConfig); } char *msg = StringValueCStr(message); strcpy(buf, msg); size_t len = strlen(buf); if (buf[len-1] == '\n') buf[--len] = '\0'; #ifdef TRACE fprintf(stderr, "producer_push_single::before_produce message1\n"); fprintf_hermann_instance_config(producerConfig, stderr); fprintf(stderr, "producer_push_single::before_produce_message2\n"); fflush(stderr); #endif /* Send/Produce message. */ if (rd_kafka_produce(producerConfig->rkt, producerConfig->partition, RD_KAFKA_MSG_F_COPY, /* Payload and length */ buf, len, /* Optional key and its length */ NULL, 0, /* Message opaque, provided in * delivery report callback as * msg_opaque. */ NULL) == -1) { fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n", rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition, rd_kafka_err2str(rd_kafka_errno2err(errno))); /* Poll to handle delivery reports */ rd_kafka_poll(producerConfig->rk, 10); } /* Must poll to handle delivery reports */ rd_kafka_poll(producerConfig->rk, 0); #ifdef TRACE fprintf(stderr, "producer_push_single::prior return\n"); #endif return self; } /** * producer_push_array * * Publish each of the messages in array on the configured topic. 
* * @param self VALUE the instance of the Ruby Producer object * @param length int the length of the outgoing messages array * @param array VALUE the Ruby array of messages */ static VALUE producer_push_array(VALUE self, int length, VALUE array) { int i; VALUE message; #ifdef TRACE fprintf(stderr, "producer_push_array\n"); #endif /* NOTE(review): the for-loop header that follows looks truncated -- the text after i=0 is not valid C, and the code after it uses config, which is not declared in this function. The statements up to free(config) destroy rkt and rk when they are non-NULL and then free the struct, which matches the consumer_free function registered in consumer_allocate below; presumably the end of this function and the start of consumer_free were lost. Restore the missing text from version control before building. */ for(i=0;irkt != NULL) { rd_kafka_topic_destroy(config->rkt); } if(config->rk != NULL) { rd_kafka_destroy(config->rk); } // clean up the struct free(config); } /** * consumer_allocate * * Allocate and wrap an HermannInstanceConfig for this Consumer object. * * Every field is given a default value (pointers NULL, partition -1, start_offset -1, do_conf_dump -1, flags 0) before the struct is wrapped with consumer_free registered as its free function. * * NOTE(review): the TRACE message printed by this function says consumer_free rather than consumer_allocate. * * @param klass VALUE the class of the enclosing Ruby object. */ static VALUE consumer_allocate(VALUE klass) { VALUE obj; HermannInstanceConfig* consumerConfig; #ifdef TRACE fprintf(stderr, "consumer_free\n"); #endif consumerConfig = ALLOC(HermannInstanceConfig); // Make sure it's initialized consumerConfig->topic = NULL; consumerConfig->rk = NULL; consumerConfig->rkt = NULL; consumerConfig->brokers = NULL; consumerConfig->partition = -1; consumerConfig->topic_conf = NULL; consumerConfig->errstr[0] = 0; consumerConfig->conf = NULL; consumerConfig->debug = NULL; consumerConfig->start_offset = -1; consumerConfig->do_conf_dump = -1; consumerConfig->run = 0; consumerConfig->exit_eof = 0; consumerConfig->quiet = 0; consumerConfig->isInitialized = 0; obj = Data_Wrap_Struct(klass, 0, consumer_free, consumerConfig); return obj; } /** * consumer_initialize * * todo: configure the brokers through passed parameter, later through zk * * Set up the Consumer's HermannInstanceConfig context. 
* * @param self VALUE the Ruby instance of the Consumer * @param topic VALUE a Ruby string * @param brokers VALUE a Ruby string containing list of host:port * @param partition VALUE a Ruby number */ static VALUE consumer_initialize(VALUE self, VALUE topic, VALUE brokers, VALUE partition) { HermannInstanceConfig* consumerConfig; char* topicPtr; char* brokersPtr; int partitionNo; #ifdef TRACE fprintf(stderr, "consumer_initialize\n"); #endif topicPtr = StringValuePtr(topic); brokersPtr = StringValuePtr(brokers); partitionNo = FIX2INT(partition); Data_Get_Struct(self, HermannInstanceConfig, consumerConfig); consumerConfig->topic = topicPtr; consumerConfig->brokers = brokersPtr; consumerConfig->partition = partitionNo; consumerConfig->run = 1; consumerConfig->exit_eof = 0; consumerConfig->quiet = 0; return self; } /** * consumer_init_copy * * When copying into a new instance of a Consumer, reproduce the configuration info. * * @param copy VALUE the Ruby Consumer instance (with configuration) as destination * @param orig VALUE the Ruby Consumer instance (with configuration) as source * */ static VALUE consumer_init_copy(VALUE copy, VALUE orig) { HermannInstanceConfig* orig_config; HermannInstanceConfig* copy_config; #ifdef TRACE fprintf(stderr, "consumer_init_copy\n"); #endif if(copy == orig) { return copy; } if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)consumer_free) { rb_raise(rb_eTypeError, "wrong argument type"); } Data_Get_Struct(orig, HermannInstanceConfig, orig_config); Data_Get_Struct(copy, HermannInstanceConfig, copy_config); // Copy over the data from one struct to the other MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1); return copy; } /** * producer_free * * Reclaim memory allocated to the Producer's configuration * * @param p void* the instance's configuration struct */ static void producer_free(void * p) { HermannInstanceConfig* config; #ifdef TRACE fprintf(stderr, "producer_free\n"); #endif config = (HermannInstanceConfig 
*)p; if(NULL==p) { return; } // Clean up the topic if(config->rkt != NULL) { rd_kafka_topic_destroy(config->rkt); } // Take care of the producer instance if(config->rk != NULL) { rd_kafka_destroy(config->rk); } // Free the struct free(config); } /** * producer_allocate * * Allocate the memory for a Producer's configuration * * @param klass VALUE the class of the Producer */ static VALUE producer_allocate(VALUE klass) { VALUE obj; HermannInstanceConfig* producerConfig; #ifdef TRACE fprintf(stderr, "producer_allocate\n"); #endif producerConfig = ALLOC(HermannInstanceConfig); producerConfig->topic = NULL; producerConfig->rk = NULL; producerConfig->rkt = NULL; producerConfig->brokers = NULL; producerConfig->partition = -1; producerConfig->topic_conf = NULL; producerConfig->errstr[0] = 0; producerConfig->conf = NULL; producerConfig->debug = NULL; producerConfig->start_offset = -1; producerConfig->do_conf_dump = -1; producerConfig->run = 0; producerConfig->exit_eof = 0; producerConfig->quiet = 0; producerConfig->isInitialized = 0; obj = Data_Wrap_Struct(klass, 0, producer_free, producerConfig); return obj; } /** * producer_initialize * * Set up the configuration context for the Producer instance * * @param self VALUE the Producer instance * @param topic VALUE the Ruby string naming the topic * @param brokers VALUE a Ruby string containing host:port pairs separated by commas */ static VALUE producer_initialize(VALUE self, VALUE topic, VALUE brokers) { HermannInstanceConfig* producerConfig; char* topicPtr; char* brokersPtr; #ifdef TRACE fprintf(stderr, "producer_initialize\n"); #endif topicPtr = StringValuePtr(topic); brokersPtr = StringValuePtr(brokers); Data_Get_Struct(self, HermannInstanceConfig, producerConfig); producerConfig->topic = topicPtr; producerConfig->brokers = brokersPtr; /** Using RD_KAFKA_PARTITION_UA specifies we want the partitioner callback to be called to determine the target * partition */ producerConfig->partition = RD_KAFKA_PARTITION_UA; 
producerConfig->run = 1; producerConfig->exit_eof = 0; producerConfig->quiet = 0; return self; } /** * producer_init_copy * * Copy the configuration information from orig into copy for the given Producer instances. * * @param copy VALUE destination Producer * @param orign VALUE source Producer */ static VALUE producer_init_copy(VALUE copy, VALUE orig) { HermannInstanceConfig* orig_config; HermannInstanceConfig* copy_config; #ifdef TRACE fprintf(stderr, "producer_init_copy\n"); #endif if(copy == orig) { return copy; } if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)producer_free) { rb_raise(rb_eTypeError, "wrong argument type"); } Data_Get_Struct(orig, HermannInstanceConfig, orig_config); Data_Get_Struct(copy, HermannInstanceConfig, copy_config); // Copy over the data from one struct to the other MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1); return copy; } /** * Init_hermann_lib * * Called by Ruby when the Hermann gem is loaded. * Defines the Hermann module. * Defines the Producer and Consumer classes. 
*/
void Init_hermann_lib() {

    VALUE c_consumer;
    VALUE c_producer;

#ifdef TRACE
    fprintf(stderr, "init_hermann_lib\n");
#endif

    /* Top-level Hermann module; both classes live underneath it. */
    m_hermann = rb_define_module("Hermann");

    /* ---- Hermann::Consumer ---- */
    c_consumer = rb_define_class_under(m_hermann, "Consumer", rb_cObject);

    /* Allocator that wraps a fresh HermannInstanceConfig */
    rb_define_alloc_func(c_consumer, consumer_allocate);

    /* Consumer.new(topic, brokers, partition) and dup/clone support */
    rb_define_method(c_consumer, "initialize", consumer_initialize, 3);
    rb_define_method(c_consumer, "initialize_copy", consumer_init_copy, 1);

    /* Consumer#consume takes no arguments */
    rb_define_method( c_consumer, "consume", consumer_consume, 0 );

    /* ---- Hermann::Producer ---- */
    c_producer = rb_define_class_under(m_hermann, "Producer", rb_cObject);

    /* Allocator that wraps a fresh HermannInstanceConfig */
    rb_define_alloc_func(c_producer, producer_allocate);

    /* Producer.new(topic, brokers) and dup/clone support */
    rb_define_method(c_producer, "initialize", producer_initialize, 2);
    rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1);

    /* Producer#push(msg) */
    rb_define_method( c_producer, "push", producer_push, 1 );

}