librdkafka/src-cpp/rdkafkacpp.h

569 lines
17 KiB
C++

/*
* librdkafka - Apache Kafka C/C++ library
*
* Copyright (c) 2014 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Apache Kafka consumer & producer
*
* rdkafkacpp.h contains the public C++ API for librdkafka.
* The API is documented in this file as comments prefixing the class, function,
* type, enum, define, etc.
* For more information, see the C interface in rdkafka.h and read the
* manual in INTRODUCTION.md.
* The C++ interface is STD C++ '03 compliant.
*/
#include <string>
#include <list>
#include <stdint.h>
namespace RdKafka {
/**
 * librdkafka compile-time version.
 *
 * The value is read as four hexadecimal byte fields, MM.mm.rr.xx:
 *   MM = major
 *   mm = minor
 *   rr = revision
 *   xx = currently unused
 *
 * Example: 0x00080100 is version 0.8.1; by the same rule the value
 * defined below, 0x00080400, reads as 0.8.4.
 *
 * NOTE: this is a preprocessor macro, so although the definition sits
 * inside the RdKafka namespace block it is not scoped by that namespace.
 */
#define RD_KAFKA_VERSION 0x00080400
/**
 * Returns the librdkafka version as an integer.
 *
 * NOTE(review): presumably encoded like RD_KAFKA_VERSION (hex MM.mm.rr.xx);
 * confirm against the implementation.
 */
int version ();
/**
 * Returns the librdkafka version as a string.
 *
 * NOTE(review): the exact string format is not visible from this header;
 * confirm against the implementation before parsing it.
 */
std::string version_str ();
/**
 * ErrorCode
 *
 * Codes returned throughout the library to signal errors and events.
 *
 * Two ranges are defined:
 *  - ERR__BEGIN .. ERR__END (double underscore): errors that are internal
 *    to librdkafka.
 *  - ERR_UNKNOWN and upwards (single underscore): standard Kafka errors.
 */
enum ErrorCode {
  /* ---- Internal errors ---- */
  ERR__BEGIN                    = -200,  /**< Begin internal error codes */
  ERR__BAD_MSG                  = -199,  /**< Received message is incorrect */
  ERR__BAD_COMPRESSION          = -198,  /**< Bad/unknown compression */
  ERR__DESTROY                  = -197,  /**< Broker is going away */
  ERR__FAIL                     = -196,  /**< Generic failure */
  ERR__TRANSPORT                = -195,  /**< Broker transport error */
  ERR__CRIT_SYS_RESOURCE        = -194,  /**< Critical system resource failure */
  ERR__RESOLVE                  = -193,  /**< Failed to resolve broker */
  ERR__MSG_TIMED_OUT            = -192,  /**< Produced message timed out */
  ERR__PARTITION_EOF            = -191,  /**< Reached the end of the
                                          *   topic+partition queue on the
                                          *   broker. Not really an error. */
  ERR__UNKNOWN_PARTITION        = -190,  /**< Permanent: partition does not
                                          *   exist in cluster. */
  ERR__FS                       = -189,  /**< File or filesystem error */
  ERR__UNKNOWN_TOPIC            = -188,  /**< Permanent: topic does not exist
                                          *   in cluster. */
  ERR__ALL_BROKERS_DOWN         = -187,  /**< All broker connections are down. */
  ERR__INVALID_ARG              = -186,  /**< Invalid argument, or invalid
                                          *   configuration */
  ERR__TIMED_OUT                = -185,  /**< Operation timed out */
  ERR__QUEUE_FULL               = -184,  /**< Queue is full */
  ERR__ISR_INSUFF               = -183,  /**< ISR count < required.acks */
  ERR__END                      = -100,  /**< End internal error codes */

  /* ---- Standard Kafka errors ---- */
  ERR_UNKNOWN                   = -1,
  ERR_NO_ERROR                  = 0,
  ERR_OFFSET_OUT_OF_RANGE       = 1,
  ERR_INVALID_MSG               = 2,
  ERR_UNKNOWN_TOPIC_OR_PART     = 3,
  ERR_INVALID_MSG_SIZE          = 4,
  ERR_LEADER_NOT_AVAILABLE      = 5,
  ERR_NOT_LEADER_FOR_PARTITION  = 6,
  ERR_REQUEST_TIMED_OUT         = 7,
  ERR_BROKER_NOT_AVAILABLE      = 8,
  ERR_REPLICA_NOT_AVAILABLE     = 9,
  ERR_MSG_SIZE_TOO_LARGE        = 10,
  ERR_STALE_CTRL_EPOCH          = 11,
  ERR_OFFSET_METADATA_TOO_LARGE = 12
};
/**
 * Returns a human readable representation of the Kafka error code 'err'.
 */
std::string err2str (RdKafka::ErrorCode err);
/**
 * Wait for all rd_kafka_t objects to be destroyed.
 *
 * 'timeout_ms' is the timeout referred to below
 * (NOTE(review): presumably milliseconds, going by the name; confirm).
 *
 * Returns 0 if all kafka objects are now destroyed, or -1 if the
 * timeout was reached.
 *
 * Since RdKafka handle deletion is an asynchronous operation the
 * `wait_destroyed()` function can be used for applications where
 * a clean shutdown is required.
 */
int wait_destroyed (int timeout_ms);
/* Forward declarations.
 * These let the callback classes that follow refer to Message, Event and
 * Topic by pointer or reference ahead of their full definitions. */
class Producer;
class Message;
class Event;
class Topic;
/**
 * Delivery Report callback class
 *
 * Interface implemented by the application to receive delivery reports.
 * An instance is registered through Conf::set() with 'name' = "dr_cb" and
 * the callback is served from Handle::poll().
 */
class DeliveryReportCb {
 public:
  /**
   * Delivery report callback.
   * 'message' is the message the report refers to.
   */
  virtual void dr_cb (Message &message) = 0;

  /* Virtual destructor so that an implementation can safely be deleted
   * through a DeliveryReportCb pointer.
   * Declared after dr_cb() to keep the existing virtual function first. */
  virtual ~DeliveryReportCb () { }
};
/**
 * Partitioner callback class
 *
 * Interface implemented by the application to provide a custom partitioner.
 * An instance is registered through Conf::set() with
 * 'name' = "partitioner_cb".
 */
class PartitionerCb {
 public:
  /**
   * Partitioner callback.
   *
   * 'topic' is the topic being produced to, 'key' the optional message key
   * passed to produce(), 'partition_cnt' the partition count and
   * 'msg_opaque' the per-message opaque pointer passed to produce().
   *
   * Returns the partition to use.
   * NOTE(review): presumably a value in 0..partition_cnt-1; confirm the
   * accepted range (and failure value) against the implementation.
   */
  virtual int32_t partitioner_cb (const Topic *topic,
                                  const std::string *key,
                                  int32_t partition_cnt,
                                  void *msg_opaque) = 0;

  /* Virtual destructor so that an implementation can safely be deleted
   * through a PartitionerCb pointer.
   * Declared after partitioner_cb() to keep the existing virtual
   * function first. */
  virtual ~PartitionerCb () { }
};
/**
 * SocketCb callback class
 *
 * Interface implemented by the application to take over socket creation.
 * An instance is registered through Conf::set() with 'name' = "socket_cb".
 */
class SocketCb {
 public:
  /**
   * Socket creation callback.
   *
   * NOTE(review): 'domain', 'type' and 'protocol' presumably carry the same
   * meaning as the socket(2) arguments, and the return value is presumably
   * the new file descriptor; confirm against the implementation.
   */
  virtual int socket_cb (int domain, int type, int protocol) = 0;

  /* Virtual destructor so that an implementation can safely be deleted
   * through a SocketCb pointer.
   * Declared after socket_cb() to keep the existing virtual function first. */
  virtual ~SocketCb () { }
};
/**
 * OpenCb callback class
 *
 * Interface implemented by the application to take over file opening.
 * An instance is registered through Conf::set() with 'name' = "open_cb".
 */
class OpenCb {
 public:
  /**
   * File open callback for 'path'.
   *
   * NOTE(review): 'flags' and 'mode' presumably carry the same meaning as
   * the open(2) arguments, and the return value is presumably the new file
   * descriptor; confirm against the implementation.
   */
  virtual int open_cb (const std::string &path, int flags, int mode) = 0;

  /* Virtual destructor so that an implementation can safely be deleted
   * through an OpenCb pointer.
   * Declared after open_cb() to keep the existing virtual function first. */
  virtual ~OpenCb () { }
};
/**
 * Event callback class
 *
 * Events propagate errors, stats and logs to the application.
 * An instance is registered through Conf::set() with 'name' = "event_cb"
 * and the callback is served from Handle::poll().
 */
class EventCb {
 public:
  /**
   * Event callback.
   * 'event' describes the error, stats or log event being delivered.
   */
  virtual void event_cb (Event &event) = 0;

  /* Virtual destructor so that an implementation can safely be deleted
   * through an EventCb pointer.
   * Declared after event_cb() to keep the existing virtual function first. */
  virtual ~EventCb () { }
};
/**
 * Event class as provided to the EventCb callback.
 */
class Event {
 public:
  /** Kind of event being delivered. */
  enum Type {
    EVENT_ERROR,
    EVENT_STATS,
    EVENT_LOG
  };

  /** Event severity. These match the syslog(3) severities. */
  enum Severity {
    EVENT_SEVERITY_EMERG = 0,
    EVENT_SEVERITY_ALERT = 1,
    EVENT_SEVERITY_CRITICAL = 2,
    EVENT_SEVERITY_ERROR = 3,
    EVENT_SEVERITY_WARNING = 4,
    EVENT_SEVERITY_NOTICE = 5,
    EVENT_SEVERITY_INFO = 6,
    EVENT_SEVERITY_DEBUG = 7
  };

  /* Accessor functions.
   * NOTE(review): which accessors are meaningful for which event Type is
   * not visible from this header; confirm against the implementation. */

  /** Returns the event type. */
  virtual Type type () const = 0;
  /** Returns the error code of the event. */
  virtual ErrorCode err () const = 0;
  /** Returns the severity of the event. */
  virtual Severity severity () const = 0;
  /** Returns the event's 'fac' string (NOTE(review): presumably the log
   *  facility; confirm). */
  virtual std::string fac () const = 0;
  /** Returns the event's string representation. */
  virtual std::string str () const = 0;

  /* The class is polymorphic (pure virtual accessors), so the public
   * destructor is virtual: destroying a concrete event through an
   * Event pointer or reference is then well defined.
   * Declared last so the accessors keep their declaration order. */
  virtual ~Event () { }
};
/**
 * Configuration object.
 *
 * A Conf holds either the global configuration or a topic configuration,
 * depending on the ConfType it was created with.
 * Instances are obtained from the Conf::create() factory.
 */
class Conf {
 public:
  /** Selects which kind of configuration a Conf object holds. */
  enum ConfType {
    CONF_GLOBAL,
    CONF_TOPIC
  };

  /** Result of a set() call. */
  enum ConfResult {
    CONF_UNKNOWN = -2,
    CONF_INVALID = -1,
    CONF_OK = 0
  };

  /* NOTE(review): presumably the list of valid debug contexts; the value
   * is defined outside this header, confirm there. */
  static const std::string DEBUG_CONTEXTS;

  /**
   * Factory: creates a configuration object of the given 'type'.
   */
  static Conf *create (ConfType type);

  virtual ~Conf () { }

  /**
   * Sets the configuration property 'name' to 'value'.
   *
   * Returns CONF_OK on success. On error, 'errstr' is set to a human
   * readable description of what went wrong.
   */
  virtual ConfResult set (const std::string &name,
                          const std::string &value,
                          std::string &errstr) = 0;

  /** Sets the delivery report callback; use with 'name' = "dr_cb". */
  virtual ConfResult set (const std::string &name,
                          DeliveryReportCb *dr_cb,
                          std::string &errstr) = 0;

  /** Sets the event callback; use with 'name' = "event_cb". */
  virtual ConfResult set (const std::string &name,
                          EventCb *event_cb,
                          std::string &errstr) = 0;

  /** Sets the partitioner callback; use with 'name' = "partitioner_cb". */
  virtual ConfResult set (const std::string &name,
                          PartitionerCb *partitioner_cb,
                          std::string &errstr) = 0;

  /** Sets the socket callback; use with 'name' = "socket_cb". */
  virtual ConfResult set (const std::string &name,
                          SocketCb *socket_cb,
                          std::string &errstr) = 0;

  /** Sets the open callback; use with 'name' = "open_cb". */
  virtual ConfResult set (const std::string &name,
                          OpenCb *open_cb,
                          std::string &errstr) = 0;

  /**
   * Dumps the configuration names and values to a list containing
   * name,value tuples.
   *
   * NOTE(review): the exact list layout and who owns (must delete) the
   * returned list are not visible from this header; confirm.
   */
  virtual std::list<std::string> *dump () = 0;
};
/**
 * Base handle: the common super class of Consumer and Producer.
 */
class Handle {
 public:
  virtual ~Handle () { }

  /**
   * Returns the name of the handle.
   */
  virtual const std::string name () = 0;

  /**
   * Polls the handle for events; pending events cause the application
   * provided callbacks to be called.
   *
   * 'timeout_ms' controls how long (in milliseconds) the call blocks
   * waiting for events:
   *   0  - non-blocking call
   *   -1 - wait indefinitely for an event
   *
   * Events:
   *   - delivery report callbacks (if dr_cb is configured) [producer]
   *   - event callbacks (if event_cb is configured) [producer & consumer]
   *   - stats callbacks (if stats_cb is configured) [producer & consumer]
   *
   * Returns the number of events served.
   */
  virtual int poll (int timeout_ms) = 0;

  /**
   * Returns the current out queue length, i.e. the number of messages
   * waiting to be sent to, or acknowledged by, the broker.
   */
  virtual int outq_len () = 0;
};
/**
 * Topic handle.
 */
class Topic {
 public:
  /**
   * Unassigned partition.
   *
   * Used with the producer API for messages that should be partitioned
   * by the configured (or default) partitioner.
   */
  static const int32_t PARTITION_UA = -1;

  /* Special offsets */
  static const int64_t OFFSET_BEGINNING = -2;    /**< Consume from beginning */
  static const int64_t OFFSET_END       = -1;    /**< Consume from end */
  static const int64_t OFFSET_STORED    = -1000; /**< Consume from the offset
                                                  *   retrieved from the
                                                  *   offset store */

  /**
   * Factory: creates a new topic handle for the topic named 'topic_str'
   * on the handle 'base'.
   *
   * 'conf' is an optional topic configuration that is used instead of the
   * default topic configuration. The 'conf' object is reusable after this
   * call.
   *
   * Returns the new topic handle, or NULL on error (see 'errstr').
   */
  static Topic *create (Handle *base,
                        const std::string &topic_str,
                        Conf *conf,
                        std::string &errstr);

  virtual ~Topic () = 0;

  /**
   * Returns the topic name.
   */
  virtual const std::string name () = 0;

  /**
   * Returns true if 'partition' is available for the topic, i.e. it has
   * a leader.
   *
   * NOTE: MUST ONLY be called from within a PartitionerCb callback.
   */
  virtual bool partition_available (int32_t partition) = 0;

  /**
   * Stores 'offset' for this topic's partition 'partition'.
   * The offset is committed (written) to the offset store according to
   * `auto.commit.interval.ms`.
   *
   * NOTE: `auto.commit.enable` must be set to "false" when using this API.
   *
   * Returns ERR_NO_ERROR on success or an error code on error.
   */
  virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
};
/**
 * Message object, as handed to the DeliveryReportCb callback and returned
 * by Consumer::consume().
 */
class Message {
 public:
  /* Accessor functions.
   * Not all fields are present in all types of callbacks. */

  /** Human readable error string for the message. */
  virtual std::string errstr () const = 0;

  /** Error code for the message; ERR_NO_ERROR denotes a proper message. */
  virtual ErrorCode err () const = 0;

  /** Topic the message belongs to. */
  virtual Topic *topic () const = 0;

  /** Partition the message belongs to. */
  virtual int32_t partition () const = 0;

  /** Pointer to the message payload. */
  virtual void *payload () const = 0;

  /** Length reported for the message (NOTE(review): presumably the
   *  payload size in bytes; confirm). */
  virtual size_t len () const = 0;

  /** Message key (NOTE(review): presumably NULL when the message has no
   *  key; confirm). */
  virtual const std::string *key () const = 0;

  /** Message offset. */
  virtual int64_t offset () const = 0;

  /** Per-message opaque pointer, as passed to Producer::produce(). */
  virtual void *msg_opaque () const = 0;

  virtual ~Message () = 0;
};
/**
 * Consumer handle.
 */
class Consumer : public virtual Handle {
 public:
  /**
   * Factory: creates a new Kafka consumer handle.
   *
   * 'conf' is an optional configuration object that is used instead of
   * the default configuration. The 'conf' object is reusable after this
   * call.
   *
   * Returns the new handle on success. On error NULL is returned and
   * 'errstr' is set to a human readable error message.
   */
  static Consumer *create (Conf *conf, std::string &errstr);

  virtual ~Consumer () = 0;

  /**
   * Starts consuming messages for 'topic' and 'partition' at 'offset'.
   * 'offset' is either a proper offset (0..N) or one of the special
   * offsets `OFFSET_BEGINNING` or `OFFSET_END`.
   *
   * rdkafka attempts to keep 'queued.min.messages' (config property)
   * messages in the local queue by repeatedly fetching batches of
   * messages from the broker until that threshold is reached.
   *
   * The application uses one of the `..->consume*()` functions to take
   * messages off the local queue; each kafka message is represented as a
   * `RdKafka::Message *` object.
   *
   * `..->start()` must not be called more than once for the same topic
   * and partition without first stopping consumption with `..->stop()`.
   *
   * Returns an ErrorCode to indicate success or failure.
   */
  virtual ErrorCode start (Topic *topic,
                           int32_t partition,
                           int64_t offset) = 0;

  /**
   * Stops consuming messages for 'topic' and 'partition' and purges all
   * messages currently in the local queue.
   *
   * The application needs to stop all consumers before destroying the
   * Consumer handle.
   *
   * Returns an ErrorCode to indicate success or failure.
   * NOTE(review): the previous text described a C-style "0 / -1 (see
   * errno)" result, which does not match the ErrorCode return type;
   * confirm the exact codes against the implementation.
   */
  virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;

  /**
   * Consumes a single message from 'topic' and 'partition'.
   *
   * 'timeout_ms' is the maximum amount of time to wait for a message to
   * be received. The consumer must previously have been started with
   * `..->start()`.
   *
   * Returns a Message object. The application must check whether it is
   * an error or a proper message by comparing `Message->err()` with
   * `ERR_NO_ERROR`.
   *
   * The message object must be destroyed when the application is done
   * with it.
   *
   * Errors (in Message->err()):
   *   ERR__TIMED_OUT - 'timeout_ms' was reached with no new messages
   *                    fetched.
   */
  virtual Message *consume (Topic *topic,
                            int32_t partition,
                            int timeout_ms) = 0;
};
/**
 * Producer handle.
 */
class Producer : public virtual Handle {
 public:
  /**
   * Factory: creates a new Kafka producer handle.
   *
   * 'conf' is an optional configuration object that is used instead of
   * the default configuration. The 'conf' object is reusable after this
   * call.
   *
   * Returns the new handle on success. On error NULL is returned and
   * 'errstr' is set to a human readable error message.
   */
  static Producer *create (Conf *conf, std::string &errstr);

  virtual ~Producer () = 0;

  /* Flags for the 'msgflags' argument of produce() */
  static const int MSG_FREE = 0x1;  /**< rdkafka frees 'payload' when done */
  static const int MSG_COPY = 0x2;  /**< rdkafka copies 'payload' */

  /**
   * Produces and sends a single message to the broker.
   *
   * This is an asynchronous, non-blocking API.
   *
   * 'partition' is the target partition, either:
   *   - RdKafka::Topic::PARTITION_UA (unassigned) for automatic
   *     partitioning using the topic's partitioner function, or
   *   - a fixed partition (0..N)
   *
   * 'msgflags' is zero or more of the following flags OR:ed together:
   *   MSG_FREE - rdkafka will free(3) 'payload' when it is done with it.
   *   MSG_COPY - the 'payload' data will be copied and the 'payload'
   *              pointer will not be used by rdkafka after the call
   *              returns.
   *
   * NOTE: MSG_FREE and MSG_COPY are mutually exclusive.
   *
   * 'payload' is the message payload, 'len' bytes in size.
   *
   * 'key' is an optional message key. If non-NULL it is passed to the
   * topic partitioner, sent with the message to the broker and passed on
   * to the consumer.
   *
   * 'msg_opaque' is an optional application-provided per-message opaque
   * pointer that will be provided in the delivery report callback
   * (`dr_cb`) for referencing this message.
   *
   * Returns an ErrorCode to indicate success or failure:
   *   ERR__QUEUE_FULL        - the maximum number of outstanding messages
   *                            has been reached:
   *                            "queue.buffering.max.messages"
   *   ERR_MSG_SIZE_TOO_LARGE - the message is larger than the configured
   *                            max size: "message.max.bytes"
   *   ERR__UNKNOWN_PARTITION - the requested 'partition' is unknown in
   *                            the Kafka cluster.
   *   ERR__UNKNOWN_TOPIC     - the topic is unknown in the Kafka cluster.
   */
  virtual ErrorCode produce (Topic *topic,
                             int32_t partition,
                             int msgflags,
                             void *payload,
                             size_t len,
                             const std::string *key,
                             void *msg_opaque) = 0;
};
};