Use librdkafka 0.9.2

Upgrades the underlying `librdkafka` to 0.9.2. This removes the copy of
`rd_kafka_msg_partitioner_consistent` which is now included. It also
changes the `rd_kafka_set_logger`, which were deprecated, to
`rd_kafka_conf_set_log_cb`.
This commit is contained in:
Thijs Cadier 2017-01-15 22:51:22 +01:00
parent d822ae541a
commit 9110298a18
3 changed files with 12 additions and 22 deletions

1
.gitignore vendored
View File

@ -13,6 +13,7 @@ tmp/
Gemfile.lock
Jarfile.lock
.jbundler/
lib/hermann_rdkafka.bundle
lib/hermann_jars.rb
coverage/
spec/reports

View File

@ -33,15 +33,16 @@ class RdKafkaRecipe < MiniPortile
# Overriding this from MiniPortile because it includes autoconf defaults that
# don't apply to librdkafka's mklove-based configure script
def configure_defaults
[]
['--disable-sasl']
end
def download_file(url, full_path, count=3)
super(url, full_path, count)
# Support some simple checksumming
unless Digest::MD5.hexdigest(File.read(full_path)) == checksum
raise 'Checksum error!'
file_checksum = Digest::MD5.hexdigest(File.read(full_path))
unless file_checksum == checksum
raise "Checksum error: #{file_checksum} did not match #{checksum}"
end
end
@ -122,9 +123,9 @@ class RdKafkaRecipe < MiniPortile
end
################################################################################
librdkafka = RdKafkaRecipe.new('librdkafka', '0.8.6')
librdkafka = RdKafkaRecipe.new('librdkafka', 'v0.9.2')
librdkafka.files = ["https://github.com/edenhill/librdkafka/archive/#{librdkafka.version}.tar.gz"]
librdkafka.checksum = '1b77543f9be82d3f700c0ef98f494990'
librdkafka.checksum = 'f2cc5ca6a149928c3cb34398379a5024'
checkpoint = ".librdkafka.#{librdkafka.version}.cooked"
unless File.exists?(checkpoint)

View File

@ -139,18 +139,6 @@ static void msg_delivered(rd_kafka_t *rk,
}
}
/* This function is in rdkafka.h on librdkafka master. As soon as a new
* version is released and Hermann points to it, this can be removed. */
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
return rd_crc32(key, keylen) % partition_cnt;
}
/**
* Producer partitioner callback.
* Used to determine the target partition within a topic for production.
@ -308,7 +296,7 @@ static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cf
* @param fac char* something of which I am unaware
* @param buf char* the log message
*/
static void logger(const rd_kafka_t *rk,
static void log_cb(const rd_kafka_t *rk,
int level,
const char *fac,
const char *buf) {
@ -345,8 +333,8 @@ void consumer_init_kafka(HermannInstanceConfig* config) {
rb_raise(rb_eRuntimeError, "%% Failed to create new consumer: %s\n", config->errstr);
}
/* Set logger */
rd_kafka_set_logger(config->rk, logger);
/* Set log callback */
rd_kafka_conf_set_log_cb(config->conf, log_cb);
rd_kafka_set_log_level(config->rk, LOG_DEBUG);
/* Add brokers */
@ -570,8 +558,8 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
rb_raise(rb_eRuntimeError, "%% Failed to create new producer: %s\n", config->errstr);
}
/* Set logger */
rd_kafka_set_logger(config->rk, logger);
/* Set log_cb */
rd_kafka_conf_set_log_cb(config->conf, log_cb);
rd_kafka_set_log_level(config->rk, LOG_DEBUG);
if (rd_kafka_brokers_add(config->rk, config->brokers) == 0) {