Dynamic topic naming in unit-tests

This commit is contained in:
Magnus Edenhill 2014-08-25 15:26:58 +02:00
parent d424a13453
commit c52ac9f563
13 changed files with 125 additions and 36 deletions

View File

@ -42,11 +42,11 @@
#include "rdkafka.h" /* for Kafka driver */
int main (int argc, char **argv) {
char *topic = "rdkafkatest1";
int partition = RD_KAFKA_PARTITION_UA; /* random */
int i;
const int NUM_ITER = 100;
struct rlimit rlim = {};
const char *topic = NULL;
/*
* Put some limits to catch bad cleanups by librdkafka (issue #20)
@ -69,6 +69,9 @@ int main (int argc, char **argv) {
test_conf_init(&conf, &topic_conf, 30);
if (!topic)
topic = test_mk_topic_name("generic", 0);
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr));
if (!rk)

View File

@ -70,7 +70,6 @@ static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
int main (int argc, char **argv) {
char *topic = "rdkafkatest1";
int partition = 99; /* non-existent */
int r;
rd_kafka_t *rk;
@ -81,6 +80,7 @@ int main (int argc, char **argv) {
char msg[128];
int msgcnt = 10;
int i;
const struct rd_kafka_metadata *metadata;
test_conf_init(&conf, &topic_conf, 10);
@ -95,7 +95,8 @@ int main (int argc, char **argv) {
TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
rkt = rd_kafka_topic_new(rk, test_mk_topic_name("generic", 0),
topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
strerror(errno));

View File

@ -70,7 +70,6 @@ static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
int main (int argc, char **argv) {
char *topic = "rdkafkatest1";
int partition = 0;
int r;
rd_kafka_t *rk;
@ -100,7 +99,8 @@ int main (int argc, char **argv) {
TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
rkt = rd_kafka_topic_new(rk, test_mk_topic_name("generic", 0),
topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
strerror(errno));

View File

@ -108,7 +108,6 @@ static void conf_cmp (const char *desc,
int main (int argc, char **argv) {
char *topic = "rdkafkatest1";
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
rd_kafka_conf_t *ignore_conf, *conf, *conf2;
@ -117,6 +116,7 @@ int main (int argc, char **argv) {
const char **arr_orig, **arr_dup;
size_t cnt_orig, cnt_dup;
int i;
const char *topic;
static const char *gconfs[] = {
"message.max.bytes", "12345", /* int property */
"client.id", "my id", /* string property */
@ -136,6 +136,8 @@ int main (int argc, char **argv) {
rd_kafka_conf_destroy(ignore_conf);
rd_kafka_topic_conf_destroy(ignore_topic_conf);
topic = test_mk_topic_name("generic", 0);
/* Set up a global config object */
conf = rd_kafka_conf_new();

View File

@ -70,7 +70,6 @@ static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
int main (int argc, char **argv) {
char *topic = "rdkafkatest1";
int partition = 0;
int r;
rd_kafka_t *rk;
@ -95,7 +94,8 @@ int main (int argc, char **argv) {
TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
rkt = rd_kafka_topic_new(rk, test_mk_topic_name("generic", 0),
topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
strerror(errno));

View File

@ -72,7 +72,6 @@ static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
int main (int argc, char **argv) {
char topic[64];
int partition = 0;
int r;
rd_kafka_t *rk;
@ -87,9 +86,6 @@ int main (int argc, char **argv) {
/* Generate unique topic name */
test_conf_init(&conf, &topic_conf, 10);
snprintf(topic, sizeof(topic), "rdkafkatest1_auto_%x%x",
rand(), rand());
TEST_SAY("\033[33mNOTE! This test requires "
"auto.create.topics.enable=true to be configured on "
"the broker!\033[0m\n");
@ -105,7 +101,8 @@ int main (int argc, char **argv) {
TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
rkt = rd_kafka_topic_new(rk, test_mk_topic_name("autotopic", 1),
topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
strerror(errno));

View File

@ -70,7 +70,6 @@ static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
int main (int argc, char **argv) {
char *topic = "rdkafkatest1";
int partition = 0;
int r;
rd_kafka_t *rk;
@ -83,6 +82,14 @@ int main (int argc, char **argv) {
int i;
int reqacks;
int idbase = 0;
const char *topic = NULL;
TEST_SAY("\033[33mNOTE! This test requires at "
"least 3 brokers!\033[0m\n");
TEST_SAY("\033[33mNOTE! This test requires "
"default.replication.factor=3 to be configured on "
"all brokers!\033[0m\n");
/* Try different request.required.acks settings (issue #75) */
for (reqacks = -1 ; reqacks <= 2 ; reqacks++) {
@ -90,6 +97,9 @@ int main (int argc, char **argv) {
test_conf_init(&conf, &topic_conf, 10);
if (!topic)
topic = test_mk_topic_name("generic", 0);
snprintf(tmp, sizeof(tmp), "%i", reqacks);
if (rd_kafka_topic_conf_set(topic_conf, "request.required.acks",

View File

@ -74,7 +74,6 @@ static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
int main (int argc, char **argv) {
char *topic = "rdkafkatest1";
int partition = 0;
int r;
rd_kafka_t *rk;
@ -87,6 +86,7 @@ int main (int argc, char **argv) {
int i;
int idbase = 0;
int pass;
const char *topic = NULL;
for (pass = 0 ; pass < 2 ; pass++) {
int enforce;
@ -102,6 +102,10 @@ int main (int argc, char **argv) {
}
test_conf_init(&conf, &topic_conf, 10);
if (!topic)
topic = test_mk_topic_name("generic", 0);
if (rd_kafka_topic_conf_set(topic_conf,
"enforce.isr.cnt",
enforce ? "5" : "0",

View File

@ -71,7 +71,6 @@ static void dr_single_partition_cb (rd_kafka_t *rk, void *payload, size_t len,
/* Produce a batch of messages to a single partition. */
static void test_single_partition (void) {
char *topic = "rdkafkatest1";
int partition = 0;
int r;
rd_kafka_t *rk;
@ -100,7 +99,8 @@ static void test_single_partition (void) {
TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
rkt = rd_kafka_topic_new(rk, test_mk_topic_name("generic", 0),
topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
strerror(errno));
@ -198,7 +198,6 @@ static void dr_partitioner_cb (rd_kafka_t *rk, void *payload, size_t len,
/* Produce a batch of messages using random (default) partitioner */
static void test_partitioner (void) {
char *topic = "rdkafkatest1";
int partition = RD_KAFKA_PARTITION_UA;
int r;
rd_kafka_t *rk;
@ -227,7 +226,8 @@ static void test_partitioner (void) {
TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
rkt = rd_kafka_topic_new(rk, test_mk_topic_name("generic", 0),
topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
strerror(errno));

View File

@ -86,6 +86,11 @@ static void produce_messages (uint64_t testid, const char *topic,
rd_kafka_conf_set_dr_cb(conf, dr_cb);
/* Make sure all replicas are in-sync after producing
* so that consume test wont fail. */
rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
errstr, sizeof(errstr));
/* Create kafka instance */
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr));
@ -465,17 +470,21 @@ static void consume_messages_with_queues (uint64_t testid, const char *topic,
* Consume with queue interface from both, simultanously.
*/
static void test_produce_consume (void) {
const char *topic = "rdkafkatest0012";
int msgcnt = 10000;
int partition_cnt = 2;
int i;
uint64_t testid;
int msg_base = 0;
const char *topic;
/* Generate a testid so we can differentiate messages
* from other tests */
testid = test_id_generate();
/* Read test.conf to configure topic name */
test_conf_init(NULL, NULL, 20);
topic = test_mk_topic_name("0012", 0);
TEST_SAY("Topic %s, testid %"PRIu64"\n", topic, testid);
/* Produce messages */

View File

@ -37,6 +37,9 @@
int test_level = 2;
int test_seed = 0;
static char test_topic_prefix[128] = "rdkafkatest";
static int test_topic_random = 0;
static void sig_alarm (int sig) {
TEST_FAIL("Test timed out");
}
@ -65,6 +68,21 @@ static void test_init (void) {
}
const char *test_mk_topic_name (const char *suffix, int randomized) {
static __thread char ret[128];
if (test_topic_random || randomized)
snprintf(ret, sizeof(ret), "%s_%"PRIx64"_%s",
test_topic_prefix, test_id_generate(), suffix);
else
snprintf(ret, sizeof(ret), "%s_%s", test_topic_prefix, suffix);
TEST_SAY("Using topic \"%s\"\n", ret);
return ret;
}
/**
* Creates and sets up kafka configuration objects.
* Will read "test.conf" file if it exists.
@ -79,14 +97,12 @@ void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf,
test_init();
/* Limit the test run time. */
alarm(timeout);
signal(SIGALRM, sig_alarm);
if (conf) {
*conf = rd_kafka_conf_new();
*topic_conf = rd_kafka_topic_conf_new();
*conf = rd_kafka_conf_new();
*topic_conf = rd_kafka_topic_conf_new();
rd_kafka_conf_set_error_cb(*conf, test_error_cb);
rd_kafka_conf_set_error_cb(*conf, test_error_cb);
}
/* Open and read optional local test configuration file, if any. */
if (!(fp = fopen(test_conf, "r"))) {
@ -100,7 +116,7 @@ void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf,
while (fgets(buf, sizeof(buf)-1, fp)) {
char *t;
char *b = buf;
rd_kafka_conf_res_t res;
rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;
char *name, *val;
line++;
@ -118,15 +134,40 @@ void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf,
*t = '\0';
val = t+1;
if (!strncmp(name, "topic.", strlen("topic."))) {
if (!strcmp(name, "test.timeout.multiplier")) {
timeout = (float)timeout * strtod(val, NULL);
res = RD_KAFKA_CONF_OK;
} else if (!strcmp(name, "test.topic.prefix")) {
strncpy(test_topic_prefix, val,
sizeof(test_topic_prefix)-1);
res = RD_KAFKA_CONF_OK;
} else if (!strcmp(name, "test.topic.random")) {
if (!strcmp(val, "true") ||
!strcmp(val, "1"))
test_topic_random = 1;
else
test_topic_random = 0;
res = RD_KAFKA_CONF_OK;
} else if (!strncmp(name, "topic.", strlen("topic."))) {
name += strlen("topic.");
res = rd_kafka_topic_conf_set(*topic_conf,
name, val,
errstr, sizeof(errstr));
} else
res = rd_kafka_conf_set(*conf,
name, val,
errstr, sizeof(errstr));
if (conf)
res = rd_kafka_topic_conf_set(*topic_conf,
name, val,
errstr,
sizeof(errstr));
else
res = RD_KAFKA_CONF_OK;
name -= strlen("topic.");
}
if (res == RD_KAFKA_CONF_UNKNOWN) {
if (conf)
res = rd_kafka_conf_set(*conf,
name, val,
errstr, sizeof(errstr));
else
res = RD_KAFKA_CONF_OK;
}
if (res != RD_KAFKA_CONF_OK)
TEST_FAIL("%s:%i: %s\n",
@ -134,6 +175,10 @@ void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf,
}
fclose(fp);
/* Limit the test run time. */
alarm(timeout);
signal(SIGALRM, sig_alarm);
}

View File

@ -1,5 +1,21 @@
# Copy this file to test.conf and set up according to your configuration.
#
# Test configuration
#
# For slow connections: multiply test timeouts by this much (float)
#test.timeout.multiplier=3.5
# Test topic names are constructed by:
# <prefix>_<suffix>, where default topic prefix is "rdkafkatest".
# suffix is specified by the tests.
#test.topic.prefix=bib
# Make topic names random:
# <prefix>_<randomnumber>_<suffix>
#test.topic.random=true
# Bootstrap broker(s)
metadata.broker.list=localhost:9092

View File

@ -45,6 +45,8 @@ extern int test_seed;
} while(0)
const char *test_mk_topic_name (const char *suffix, int randomized);
void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf,
int timeout);