mirror of https://github.com/reiseburo/hermann
Merge 9110298a18
into d822ae541a
This commit is contained in:
commit
655b11d255
|
@ -13,6 +13,7 @@ tmp/
|
|||
Gemfile.lock
|
||||
Jarfile.lock
|
||||
.jbundler/
|
||||
lib/hermann_rdkafka.bundle
|
||||
lib/hermann_jars.rb
|
||||
coverage/
|
||||
spec/reports
|
||||
|
|
|
@ -33,15 +33,16 @@ class RdKafkaRecipe < MiniPortile
|
|||
# Overriding this from MiniPortile because it includes autoconf defaults that
|
||||
# don't apply to librdkafka's mklove-based configure script
|
||||
def configure_defaults
|
||||
[]
|
||||
['--disable-sasl']
|
||||
end
|
||||
|
||||
def download_file(url, full_path, count=3)
|
||||
super(url, full_path, count)
|
||||
|
||||
# Support some simple checksumming
|
||||
unless Digest::MD5.hexdigest(File.read(full_path)) == checksum
|
||||
raise 'Checksum error!'
|
||||
file_checksum = Digest::MD5.hexdigest(File.read(full_path))
|
||||
unless file_checksum == checksum
|
||||
raise "Checksum error: #{file_checksum} did not match #{checksum}"
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -122,9 +123,9 @@ class RdKafkaRecipe < MiniPortile
|
|||
end
|
||||
################################################################################
|
||||
|
||||
librdkafka = RdKafkaRecipe.new('librdkafka', '0.8.6')
|
||||
librdkafka = RdKafkaRecipe.new('librdkafka', 'v0.9.2')
|
||||
librdkafka.files = ["https://github.com/edenhill/librdkafka/archive/#{librdkafka.version}.tar.gz"]
|
||||
librdkafka.checksum = '1b77543f9be82d3f700c0ef98f494990'
|
||||
librdkafka.checksum = 'f2cc5ca6a149928c3cb34398379a5024'
|
||||
checkpoint = ".librdkafka.#{librdkafka.version}.cooked"
|
||||
|
||||
unless File.exists?(checkpoint)
|
||||
|
|
|
@ -139,18 +139,6 @@ static void msg_delivered(rd_kafka_t *rk,
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/* This function is in rdkafka.h on librdkafka master. As soon as a new
|
||||
* version is released and Hermann points to it, this can be removed. */
|
||||
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
|
||||
const void *key, size_t keylen,
|
||||
int32_t partition_cnt,
|
||||
void *rkt_opaque,
|
||||
void *msg_opaque) {
|
||||
return rd_crc32(key, keylen) % partition_cnt;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Producer partitioner callback.
|
||||
* Used to determine the target partition within a topic for production.
|
||||
|
@ -308,7 +296,7 @@ static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cf
|
|||
* @param fac char* something of which I am unaware
|
||||
* @param buf char* the log message
|
||||
*/
|
||||
static void logger(const rd_kafka_t *rk,
|
||||
static void log_cb(const rd_kafka_t *rk,
|
||||
int level,
|
||||
const char *fac,
|
||||
const char *buf) {
|
||||
|
@ -345,8 +333,8 @@ void consumer_init_kafka(HermannInstanceConfig* config) {
|
|||
rb_raise(rb_eRuntimeError, "%% Failed to create new consumer: %s\n", config->errstr);
|
||||
}
|
||||
|
||||
/* Set logger */
|
||||
rd_kafka_set_logger(config->rk, logger);
|
||||
/* Set log callback */
|
||||
rd_kafka_conf_set_log_cb(config->conf, log_cb);
|
||||
rd_kafka_set_log_level(config->rk, LOG_DEBUG);
|
||||
|
||||
/* Add brokers */
|
||||
|
@ -570,8 +558,8 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
|
|||
rb_raise(rb_eRuntimeError, "%% Failed to create new producer: %s\n", config->errstr);
|
||||
}
|
||||
|
||||
/* Set logger */
|
||||
rd_kafka_set_logger(config->rk, logger);
|
||||
/* Set log_cb */
|
||||
rd_kafka_conf_set_log_cb(config->conf, log_cb);
|
||||
rd_kafka_set_log_level(config->rk, LOG_DEBUG);
|
||||
|
||||
if (rd_kafka_brokers_add(config->rk, config->brokers) == 0) {
|
||||
|
|
Loading…
Reference in New Issue