Added Producer configuration to struct. Simplified prodcfg and conscfg to common.

This commit is contained in:
Stan Campbell 2014-06-05 09:03:00 -07:00
parent 9a9d377863
commit 66c7cbdbea
4 changed files with 36 additions and 56 deletions

Binary file not shown.

View File

@ -9,12 +9,12 @@
*/
// Allocate a new Producer Configuration struct
HermannProducerConfig newProducerConfig() {
HermannInstanceConfig newProducerConfig() {
}
// Allocate a new Consumer Configuration struct
HermannConsumerConfig newConsumerConfig() {
HermannInstanceConfig newConsumerConfig() {
}
/**
@ -144,9 +144,9 @@ void actAsProducer(char* topic) {
// Main entry point for Consumer behavior
void actAsConsumer(VALUE self) {
HermannConsumerConfig* consumerConfig;
HermannInstanceConfig* consumerConfig;
Data_Get_Struct(self, HermannConsumerConfig, consumerConfig);
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
if(consumerConfig->topic==NULL) {
fprintf(stderr, "Topic is null!");
@ -253,9 +253,9 @@ static VALUE consume(VALUE self) {
/* Hermann::Producer.push */
static VALUE push(VALUE self, VALUE message) {
HermannProducerConfig* producerConfig;
HermannInstanceConfig* producerConfig;
Data_Get_Struct(self, HermannProducerConfig, producerConfig);
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if(producerConfig->topic==NULL) {
fprintf(stderr, "Topic is null!");
@ -265,11 +265,11 @@ static VALUE push(VALUE self, VALUE message) {
quiet = !isatty(STDIN_FILENO);
/* Kafka configuration */
conf = rd_kafka_conf_new();
producerConfig->conf = rd_kafka_conf_new();
fprintf(stderr, "Kafka configuration created\n");
/* Topic configuration */
topic_conf = rd_kafka_topic_conf_new();
producerConfig->topic_conf = rd_kafka_topic_conf_new();
fprintf(stderr, "Topic configuration created\n");
/* todo: This will need to be adapted to maintain the state through multiple invocations. */
@ -280,12 +280,12 @@ static VALUE push(VALUE self, VALUE message) {
/* Set up a message delivery report callback.
* It will be called once for each message, either on successful
* delivery to broker, or upon failure to deliver to broker. */
rd_kafka_conf_set_dr_cb(conf, msg_delivered);
rd_kafka_conf_set_dr_cb(producerConfig->conf, msg_delivered);
/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)))) {
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, producerConfig->conf, producerConfig->errstr, sizeof(producerConfig->errstr)))) {
fprintf(stderr,
"%% Failed to create new producer: %s\n", errstr);
"%% Failed to create new producer: %s\n", producerConfig->errstr);
exit(1);
}
@ -293,13 +293,13 @@ static VALUE push(VALUE self, VALUE message) {
rd_kafka_set_logger(rk, logger);
rd_kafka_set_log_level(rk, LOG_DEBUG);
if(rd_kafka_brokers_add(rk, brokers) == 0) {
if(rd_kafka_brokers_add(rk, producerConfig->brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
exit(1);
}
/* Create topic */
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
producerConfig->rkt = rd_kafka_topic_new(rk, producerConfig->topic, producerConfig->topic_conf);
/* todo: copy message into buf */
@ -308,7 +308,7 @@ static VALUE push(VALUE self, VALUE message) {
buf[--len] = '\0';
/* Send/Produce message. */
if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
if (rd_kafka_produce(producerConfig->rkt, producerConfig->partition, RD_KAFKA_MSG_F_COPY,
/* Payload and length */
buf, len,
/* Optional key and its length */
@ -319,13 +319,13 @@ static VALUE push(VALUE self, VALUE message) {
NULL) == -1) {
fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
rd_kafka_topic_name(rkt), partition,
rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition,
rd_kafka_err2str(rd_kafka_errno2err(errno)));
/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
} else {
fprintf(stderr, "%% Sent %zd bytes to topic %s partition %i\n", len, rd_kafka_topic_name(rkt), partition);
fprintf(stderr, "%% Sent %zd bytes to topic %s partition %i\n", len, rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition);
}
/* Wait for messages to be delivered */
@ -354,7 +354,7 @@ static VALUE consumer_allocate(VALUE klass) {
VALUE obj;
printf("consumer_allocate\n");
HermannConsumerConfig* consumerConfig = ALLOC(HermannConsumerConfig);
HermannInstanceConfig* consumerConfig = ALLOC(HermannInstanceConfig);
obj = Data_Wrap_Struct(klass, 0, consumer_free, consumerConfig);
@ -365,7 +365,7 @@ static VALUE consumer_allocate(VALUE klass) {
static VALUE consumer_initialize(VALUE self, VALUE topic) {
HermannConsumerConfig* consumerConfig;
HermannInstanceConfig* consumerConfig;
char* topicPtr;
printf("consumer_initialize\n");
@ -373,7 +373,7 @@ static VALUE consumer_initialize(VALUE self, VALUE topic) {
topicPtr = StringValuePtr(topic);
fprintf(stderr, "Topic is:%s\n", topicPtr);
Data_Get_Struct(self, HermannConsumerConfig, consumerConfig);
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
/* todo: actually initialize the configuration options */
consumerConfig->topic = topicPtr;
@ -386,8 +386,8 @@ static VALUE consumer_initialize(VALUE self, VALUE topic) {
}
static VALUE consumer_init_copy(VALUE copy, VALUE orig) {
HermannConsumerConfig* orig_config;
HermannConsumerConfig* copy_config;
HermannInstanceConfig* orig_config;
HermannInstanceConfig* copy_config;
if(copy == orig) {
return copy;
@ -397,11 +397,11 @@ static VALUE consumer_init_copy(VALUE copy, VALUE orig) {
rb_raise(rb_eTypeError, "wrong argument type");
}
Data_Get_Struct(orig, HermannConsumerConfig, orig_config);
Data_Get_Struct(copy, HermannConsumerConfig, copy_config);
Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
Data_Get_Struct(copy, HermannInstanceConfig, copy_config);
// Copy over the data from one struct to the other
MEMCPY(copy_config, orig_config, HermannConsumerConfig, 1);
MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);
return copy;
}
@ -415,7 +415,7 @@ static VALUE producer_allocate(VALUE klass) {
VALUE obj;
printf("producer_allocate\n");
HermannProducerConfig* producerConfig = ALLOC(HermannProducerConfig);
HermannInstanceConfig* producerConfig = ALLOC(HermannInstanceConfig);
obj = Data_Wrap_Struct(klass, 0, producer_free, producerConfig);
@ -426,7 +426,7 @@ static VALUE producer_allocate(VALUE klass) {
static VALUE producer_initialize(VALUE self, VALUE topic) {
HermannProducerConfig* producerConfig;
HermannInstanceConfig* producerConfig;
char* topicPtr;
printf("producer_initialize\n");
@ -434,7 +434,7 @@ static VALUE producer_initialize(VALUE self, VALUE topic) {
topicPtr = StringValuePtr(topic);
fprintf(stderr, "Topic is:%s\n", topicPtr);
Data_Get_Struct(self, HermannProducerConfig, producerConfig);
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
/* todo: actually initialize the configuration options */
producerConfig->topic = topicPtr;
@ -447,8 +447,8 @@ static VALUE producer_initialize(VALUE self, VALUE topic) {
}
static VALUE producer_init_copy(VALUE copy, VALUE orig) {
HermannProducerConfig* orig_config;
HermannProducerConfig* copy_config;
HermannInstanceConfig* orig_config;
HermannInstanceConfig* copy_config;
if(copy == orig) {
return copy;
@ -458,11 +458,11 @@ static VALUE producer_init_copy(VALUE copy, VALUE orig) {
rb_raise(rb_eTypeError, "wrong argument type");
}
Data_Get_Struct(orig, HermannProducerConfig, orig_config);
Data_Get_Struct(copy, HermannProducerConfig, copy_config);
Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
Data_Get_Struct(copy, HermannInstanceConfig, copy_config);
// Copy over the data from one struct to the other
MEMCPY(copy_config, orig_config, HermannProducerConfig, 1);
MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);
return copy;
}

View File

@ -19,7 +19,7 @@ static VALUE m_hermann;
// From rdkafka_example.c
static int run = 1;
static rd_kafka_t *rk;
static rd_kafka_t *rk; // todo: this is the instance of the Prod or Consumer, move into struct
static int exit_eof = 0;
static int quiet = 0;
static enum {
@ -27,7 +27,7 @@ static enum {
OUTPUT_RAW,
} output = OUTPUT_HEXDUMP;
typedef struct HermannProducerConfig {
typedef struct HermannInstanceConfig {
char* topic;
@ -41,26 +41,6 @@ typedef struct HermannProducerConfig {
const char *debug;
int64_t start_offset;
int do_conf_dump;
} HermannInstanceConfig;
} HermannProducerConfig;
typedef struct HermannConsumerConfig {
char* topic;
/* Kafka configuration */
rd_kafka_topic_t *rkt;
char *brokers;
int partition;
rd_kafka_topic_conf_t *topic_conf;
char errstr[512];
rd_kafka_conf_t *conf;
const char *debug;
int64_t start_offset;
int do_conf_dump;
} HermannConsumerConfig;
#endif
#endif

Binary file not shown.