Roll back to using 0.8.4 APIs since we're bundling now

This commit is contained in:
R. Tyler Croy 2014-09-12 16:15:04 -07:00
parent 5de9612882
commit 44efb9e2b5
1 changed files with 10 additions and 14 deletions

View File

@ -86,31 +86,28 @@ void fprintf_hermann_instance_config(HermannInstanceConfig *config,
*
*/
static void msg_delivered(rd_kafka_t *rk,
void *payload,
size_t len,
int error_code,
void *opaque,
void *msg_opaque) {
const rd_kafka_message_t *message,
void *ctx) {
hermann_push_ctx_t *push_ctx;
VALUE is_error = Qfalse;
ID hermann_result_fulfill_method = rb_intern("internal_set_value");
TRACER("ctx: %p, err: %i\n", ctx, error_code);
TRACER("ctx: %p, err: %i\n", ctx, message->err);
if (error_code) {
if (message->err) {
is_error = Qtrue;
fprintf(stderr, "%% Message delivery failed: %s\n",
rd_kafka_err2str(error_code));
rd_kafka_err2str(message->err));
/* todo: should raise an error? */
}
/* according to @edenhill rd_kafka_message_t._private is ABI safe to call
* and represents the `msg_opaque` argument passed into `rd_kafka_produce`
*/
if (NULL != msg_opaque) {
push_ctx = (hermann_push_ctx_t *)msg_opaque;
if (NULL != message->_private) {
push_ctx = (hermann_push_ctx_t *)message->_private;
if (!error_code) {
if (!message->err) {
/* if we have not errored, great! let's say we're connected */
push_ctx->producer->isConnected = 1;
}
@ -122,10 +119,9 @@ static void msg_delivered(rd_kafka_t *rk,
rb_funcall(push_ctx->result,
hermann_result_fulfill_method,
2,
rb_str_new((char *)payload, len), /* value */
rb_str_new((char *)message->payload, message->len), /* value */
is_error /* is_error */ );
}
/* we malloc()d a push context in producer_push_single, get rid of it */
free(push_ctx);
}
}
@ -497,7 +493,7 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
/* Set up a message delivery report callback.
* It will be called once for each message, either on successful
* delivery to broker, or upon failure to deliver to broker. */
rd_kafka_conf_set_dr_cb(config->conf, msg_delivered);
rd_kafka_conf_set_dr_msg_cb(config->conf, msg_delivered);
/* Create Kafka handle */
if (!(config->rk = rd_kafka_new(RD_KAFKA_PRODUCER,