Extracted rdkafka from librd to its own library.

This commit is contained in:
Magnus Edenhill 2012-09-19 12:26:37 +02:00
parent d911d209cf
commit 96b461570a
25 changed files with 3694 additions and 3 deletions

1
.dir-locals.el Normal file
View File

@ -0,0 +1 @@
( (c-mode . ((c-file-style . "linux"))) )

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
*~
\#*
*.o
*.so
*.so.?
*.a
*.d
core

25
LICENSE Normal file
View File

@ -0,0 +1,25 @@
librdkafka - Apache Kafka C driver library
Copyright (c) 2012, Magnus Edenhill
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

23
LICENSE.pycrc Normal file
View File

@ -0,0 +1,23 @@
The following license applies to the files rdcrc32.c and rdcrc32.h which
have been generated by the pycrc tool.
============================================================================
Copyright (c) 2006-2012, Thomas Pircher <tehpeh@gmx.net>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

68
Makefile Normal file
View File

@ -0,0 +1,68 @@
LIBNAME=librdkafka
LIBVER=0
LIBVER_FULL=$(LIBVER).0.0
PREFIX?=/usr/local
# The preferred way to compile is to have a separate checkout of librd
# and link with it. If that is not desirable or possible the required librd
# functionality is included with librdkafka for compile-time inclusion.
# Define WITH_LIBRD to use an external librd, or leave undefined for the
# integrated version.
#WITH_LIBRD=1
# Use gcc as ld to avoid __stack_chk_fail_error symbol error.
LD=gcc
SRCS= rdkafka.c
ifndef WITH_LIBRD
SRCS+=rdcrc32.c rdgz.c rdaddr.c rdrand.c rdfile.c
endif
HDRS= rdkafka.h
OBJS= $(SRCS:.c=.o)
DEPS= ${OBJS:%.o=%.d}
CFLAGS+=-O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -fPIC -I.
CFLAGS+=-g
# Profiling
#CFLAGS+=-O0
#CFLAGS += -pg
#LDFLAGS += -pg
LDFLAGS+=-shared -g -fPIC -lpthread -lrt -lz -lc
.PHONY:
all: libs
libs: $(LIBNAME).so $(LIBNAME).a
%.o: %.c
$(CC) -MD -MP $(CFLAGS) -c $<
$(LIBNAME).so: $(OBJS)
$(LD) -shared -Wl,-soname,$(LIBNAME).so.$(LIBVER) \
$(LDFLAGS) $(OBJS) -o $@
ln -fs $(LIBNAME).so $(LIBNAME).so.$(LIBVER)
$(LIBNAME).a: $(OBJS)
$(AR) rcs $@ $(OBJS)
install:
install -d $(PREFIX)/include/librdkafka $(PREFIX)/lib
install -t $(PREFIX)/include/$(LIBNAME) $(HDRS)
install -t $(PREFIX)/lib $(LIBNAME).so
install -t $(PREFIX)/lib $(LIBNAME).so.$(LIBVER)
install -t $(PREFIX)/lib $(LIBNAME).a
clean:
rm -f $(OBJS) $(DEPS) $(LIBNAME)*.a $(LIBNAME)*.so $(LIBNAME)*.so.?
-include $(DEPS)

View File

@ -1,4 +1,72 @@
librdkafka
==========
librdkafka - Apache Kafka C client library
==========================================
Copyright (c) 2012, [Magnus Edenhill](http://www.edenhill.se/), et.al.
[https://github.com/edenhill/librdkafka](https://github.com/edenhill/librdkafka)
**librdkafka** is a C implementation of the
[Apache Kafka](http://incubator.apache.org/kafka/) protocol, containing both
Producer and Consumer support.
It currently supports Apache Kafka version 0.7.* (and possibly earlier).
ZooKeeper integration is planned but currently not available.
**librdkafka** is licensed under the 2-clause BSD license.
# Usage
## Requirements
The GNU toolchain
pthreads
zlib
## Instructions
### Building
make all
make install
# or to install in another location than /usr/local:
PREFIX=/my/prefix make install
### Usage in code
See `examples/rdkafka_example.c` for full examples of both
producer and consumer sides.
#include <librdkafka/rdkafka.h>
..
rd_kafka_t *rk;
rk = rd_kafka_new_consumer(broker, topic, partition, 0, &conf)
while (run) {
rko = rd_kafka_consume(rk, RD_POLL_INFINITE);
if (rko->rko_err)
..errhandling..
else if (rko->rko_len)
handle_message(rko->rko_payload, rko->rko_len);
}
rd_kafka_destroy(rk);
Link your program with `-lrdkafka -lz -lpthread -lrt`.
## Documentation
The API is documented in `rdkafka.h`
## Examples
See the `examples/`sub-directory.
Apache Kafka C library

1
examples/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
rdkafka_example

33
examples/Makefile Normal file
View File

@ -0,0 +1,33 @@
CC ?= cc
CFLAGS += -g
CFLAGS += -Wall -Werror -Wfloat-equal -Wpointer-arith -O2 -I../
LDFLAGS += -L../ -lrdkafka
LDFLAGS += -lpthread -lrt -lz
# Profiling
#CFLAGS += -O0 -pg
#LDFLAGS += -pg
all:
@echo "# Examples are built individually, i.e.:"
@echo " make rdkafka_example"
rdkafka_example: rdkafka_example.c
@(test $@ -nt $< || \
$(CC) $(CFLAGS) $< -o $@ $(LDFLAGS))
@echo "# $@ is ready"
@echo "#"
@echo "# Run producer (write messages on stdin)"
@echo "LD_LIBRARY_PATH=../ ./rdkafka_example -P -t <topic> -p <partition>"
@echo ""
@echo "# or consumer"
@echo "LD_LIBRARY_PATH=../ ./rdkafka_example -C -t <topic> -p <partition>"
@echo ""
@echo "#"
@echo "# More usage options:"
@echo "LD_LIBRARY_PATH=../ ./rdkafka_example --help"
clean:
rm -f rdkafka_example

225
examples/rdkafka_example.c Normal file
View File

@ -0,0 +1,225 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Apache Kafka consumer & producer example programs
* using the Kafka driver from librdkafka
* (https://github.com/edenhill/librdkafka)
*/
#include <ctype.h>
#include <signal.h>
/* Typical include path would be <librdkafka/rdkafkah>, but this program
* is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */
static int run = 1;
static void stop (int sig) {
run = 0;
}
static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
const char *p = (const char *)ptr;
int of = 0;
if (name)
fprintf(fp, "%s hexdump (%lu bytes):\n", name, len);
for (of = 0 ; of < len ; of += 16) {
char hexen[16*3+1];
char charen[16+1];
int hof = 0;
int cof = 0;
int i;
for (i = of ; i < of + 16 && i < len ; i++) {
hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff);
cof += sprintf(charen+cof, "%c",
isprint(p[i]) ? p[i] : '.');
}
fprintf(fp, "%08x: %-48s %-16s\n",
of, hexen, charen);
}
}
int main (int argc, char **argv) {
rd_kafka_t *rk;
char *broker = NULL;
char mode = 'C';
char *topic = NULL;
int partition = 0;
int opt;
while ((opt = getopt(argc, argv, "PCt:p:b:")) != -1) {
switch (opt) {
case 'P':
case 'C':
mode = opt;
break;
case 't':
topic = optarg;
break;
case 'p':
partition = atoi(optarg);
break;
case 'b':
broker = optarg;
break;
default:
goto usage;
}
}
if (!topic || optind != argc) {
usage:
fprintf(stderr,
"Usage: %s [-C|-P] -t <topic> "
"[-p <partition>] [-b <broker>]\n"
"\n"
" Options:\n"
" -C | -P Consumer or Producer mode\n"
" -t <topic> Topic to fetch / produce\n"
" -p <num> Partition (defaults to 0)\n"
" -b <broker> Broker address (localhost:9092)\n"
"\n"
" In Consumer mode:\n"
" writes fetched messages to stdout\n"
" In Producer mode:\n"
" reads messages from stdin and sends to broker\n"
"\n",
argv[0]);
exit(1);
}
signal(SIGINT, stop);
if (mode == 'P') {
/*
* Producer
*/
char buf[1024];
/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, broker, NULL))) {
perror("kafka_new producer");
exit(1);
}
fprintf(stderr, "%% Type stuff and hit enter to send\n");
while (run && (fgets(buf, sizeof(buf), stdin))) {
int len = strlen(buf);
/* Send/Produce message. */
rd_kafka_produce(rk, topic, partition, 0, buf, len);
fprintf(stderr, "%% Sent %i bytes to topic "
"%s partition %i\n", len, topic, partition);
}
/* Destroy the handle */
rd_kafka_destroy(rk);
} else if (mode == 'C') {
/*
* Consumer
*/
rd_kafka_op_t *rko;
/* Base our configuration on the default config. */
rd_kafka_conf_t conf = rd_kafka_defaultconf;
/* The offset storage file is optional but its presence
* avoids starting all over from offset 0 again when
* the program restarts.
* ZooKeeper functionality will be implemented in future
* versions and then the offset will be stored there instead. */
conf.consumer.offset_file = "."; /* current directory */
/* Indicate to rdkafka that the application is responsible
* for storing the offset. This allows the application to
* succesfully handle a message before storing the offset.
* If this flag is not set rdkafka will store the offset
* just prior to returning the message from rd_kafka_consume().
*/
conf.flags |= RD_KAFKA_CONF_F_APP_OFFSET_STORE;
/* Use the consumer convenience function
* to create a Kafka handle. */
if (!(rk = rd_kafka_new_consumer(broker, topic,
(uint32_t)partition,
0, &conf))) {
perror("kafka_new_consumer");
exit(1);
}
while (run) {
/* Fetch an "op" which is one of:
* - a kafka message (if rko_len>0 && rko_err==0)
* - an error (if rko_err)
*/
if (!(rko = rd_kafka_consume(rk, 1000/*timeout ms*/)))
continue;
if (rko->rko_err)
fprintf(stderr, "%% Error: %.*s\n",
rko->rko_len, rko->rko_payload);
else if (rko->rko_len) {
fprintf(stderr, "%% Message with "
"next-offset %"PRIu64" is %i bytes\n",
rko->rko_offset, rko->rko_len);
hexdump(stdout, "Message",
rko->rko_payload, rko->rko_len);
}
/* rko_offset contains the offset of the _next_
* message. We store it when we're done processing
* the current message. */
if (rko->rko_offset)
rd_kafka_offset_store(rk, rko->rko_offset);
/* Destroy the op */
rd_kafka_op_destroy(rk, rko);
}
/* Destroy the handle */
rd_kafka_destroy(rk);
}
return 0;
}

37
rd.c Normal file
View File

@ -0,0 +1,37 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rd.h"
void rd_init (void) {
extern void rd_thread_init (void);
extern void rd_timers_init();
rd_thread_init();
rd_timers_init();
}

109
rd.h Normal file
View File

@ -0,0 +1,109 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
#include <alloca.h>
#include <assert.h>
#include <pthread.h>
#include "rdtypes.h"
#ifndef likely
#define likely(x) __builtin_expect((x),1)
#endif
#ifndef unlikely
#define unlikely(x) __builtin_expect((x),0)
#endif
#define RD_UNUSED __attribute__((unused))
#define RD_PACKED __attribute__((packed))
#define RD_ARRAY_SIZE(A) (sizeof((A)) / sizeof(*(A)))
#define RD_ARRAYSIZE(A) RD_ARRAY_SIZE(A)
#define RD_SIZEOF(TYPE,MEMBER) sizeof(((TYPE *)NULL)->MEMBER)
#define RD_OFFSETOF(TYPE,MEMBER) ((size_t) &(((TYPE *)NULL)->MEMBER))
/**
* Returns the 'I'th array element from static sized array 'A'
* or NULL if 'I' is out of range.
* 'PFX' is an optional prefix to provide the correct return type.
*/
#define RD_ARRAY_ELEM(A,I,PFX...) \
((unsigned int)(I) < RD_ARRAY_SIZE(A) ? PFX (A)[(I)] : NULL)
#define RD_STRINGIFY(X) # X
#define RD_MIN(a,b) ((a) < (b) ? (a) : (b))
#define RD_MAX(a,b) ((a) > (b) ? (a) : (b))
/**
* Cap an integer (of any type) to reside within the defined limit.
*/
#define RD_INT_CAP(val,low,hi) \
((val) < (low) ? low : ((val) > (hi) ? (hi) : (val)))
#define rd_atomic_add(PTR,VAL) __sync_add_and_fetch(PTR,VAL)
#define rd_atomic_sub(PTR,VAL) __sync_sub_and_fetch(PTR,VAL)
#define rd_atomic_add_prev(PTR,VAL) __sync_fetch_and_add(PTR,VAL)
#define rd_atomic_sub_prev(PTR,VAL) __sync_fetch_and_sub(PTR,VAL)
#ifndef be64toh
#include <byteswap.h>
#if __BYTE_ORDER == __BIG_ENDIAN
#define be64toh(x) (x)
#else
# if __BYTE_ORDER == __LITTLE_ENDIAN
#define be64toh(x) bswap_64(x)
# endif
#endif
#define htobe64(x) be64toh(x)
#endif
void rd_init (void);

201
rdaddr.c Normal file
View File

@ -0,0 +1,201 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rd.h"
#include "rdaddr.h"
#include "rdrand.h"
const char *rd_sockaddr2str (const void *addr, int flags) {
const rd_sockaddr_inx_t *a = (const rd_sockaddr_inx_t *)addr;
static __thread char ret[32][INET6_ADDRSTRLEN + 16];
static __thread int reti = 0;
char portstr[64];
int of = 0;
int niflags = NI_NUMERICSERV;
reti = (reti + 1) % 32;
switch (a->sinx_family)
{
case AF_INET:
case AF_INET6:
if (flags & RD_SOCKADDR2STR_F_FAMILY)
of += sprintf(&ret[reti][of], "ipv%i#",
a->sinx_family == AF_INET ? 4 : 6);
if ((flags & RD_SOCKADDR2STR_F_PORT) &&
a->sinx_family == AF_INET6)
ret[reti][of++] = '[';
if (!(flags & RD_SOCKADDR2STR_F_RESOLVE))
niflags |= NI_NUMERICHOST;
if (getnameinfo((const struct sockaddr *)a,
RD_SOCKADDR_INX_LEN(a),
ret[reti]+of, sizeof(ret[reti])-of,
(flags & RD_SOCKADDR2STR_F_PORT) ?
portstr : NULL,
(flags & RD_SOCKADDR2STR_F_PORT) ?
sizeof(portstr) : 0,
niflags))
break;
if (flags & RD_SOCKADDR2STR_F_PORT) {
int len = strlen(ret[reti]);
snprintf(ret[reti]+len, sizeof(ret[reti])-len,
"%s:%s",
a->sinx_family == AF_INET6 ? "]" : "",
portstr);
}
return ret[reti];
}
/* Error-case */
snprintf(ret[reti], sizeof(ret[reti]), "<unsupported:%s>",
rd_family2str(a->sinx_family));
return ret[reti];
}
const char *rd_addrinfo_prepare (const char *nodesvc,
char **node, char **svc) {
static __thread char snode[256];
static __thread char ssvc[64];
const char *t;
const char *svct = NULL;
int nodelen = 0;
*snode = '\0';
*ssvc = '\0';
if (*nodesvc == '[') {
/* "[host]".. (enveloped node name) */
if (!(t = strchr(nodesvc, ']')))
return "Missing close-']'";
nodesvc++;
nodelen = (int)(t-nodesvc);
svct = t+1;
} else if (*nodesvc == ':' && *(nodesvc+1) != ':') {
/* ":".. (port only) */
nodelen = 0;
svct = nodesvc;
}
if ((svct = strrchr(svct ? : nodesvc, ':')) && (*(svct-1) != ':') &&
*(++svct)) {
/* Optional ":service" definition. */
if (strlen(svct) >= sizeof(ssvc))
return "Service name too long";
strcpy(ssvc, svct);
if (!nodelen)
nodelen = (int)(svct - nodesvc)-1;
} else if (!nodelen)
nodelen = strlen(nodesvc);
if (nodelen) {
strncpy(snode, nodesvc, nodelen);
snode[nodelen] = '\0';
}
*node = snode;
*svc = ssvc;
return NULL;
}
rd_sockaddr_list_t *rd_getaddrinfo (const char *nodesvc, const char *defsvc,
int flags, int family,
int socktype, int protocol,
const char **errstr) {
struct addrinfo hints = { ai_family: family,
ai_socktype: socktype,
ai_protocol: protocol,
ai_flags: flags };
struct addrinfo *ais, *ai;
char *node, *svc;
int r;
int cnt = 0;
rd_sockaddr_list_t *rsal;
if ((*errstr = rd_addrinfo_prepare(nodesvc, &node, &svc))) {
errno = EINVAL;
return NULL;
}
if (*svc)
defsvc = svc;
if ((r = getaddrinfo(node, defsvc, &hints, &ais))) {
*errstr = gai_strerror(r);
errno = EFAULT;
return NULL;
}
/* Count number of addresses */
for (ai = ais ; ai != NULL ; ai = ai->ai_next)
cnt++;
if (cnt == 0) {
/* unlikely? */
freeaddrinfo(ais);
errno = ENOENT;
*errstr = "No addresses";
return NULL;
}
rsal = calloc(1, sizeof(*rsal) + (sizeof(*rsal->rsal_addr) * cnt));
for (ai = ais ; ai != NULL ; ai = ai->ai_next)
memcpy(&rsal->rsal_addr[rsal->rsal_cnt++],
ai->ai_addr, ai->ai_addrlen);
freeaddrinfo(ais);
/* Shuffle address list for proper round-robin */
if (!(flags & RD_AI_NOSHUFFLE))
rd_array_shuffle(rsal->rsal_addr, rsal->rsal_cnt,
sizeof(*rsal->rsal_addr));
return rsal;
}
void rd_sockaddr_list_destroy (rd_sockaddr_list_t *rsal) {
free(rsal);
}

183
rdaddr.h Normal file
View File

@ -0,0 +1,183 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
/**
* rd_sockaddr_inx_t is a union for either ipv4 or ipv6 sockaddrs.
* It provides conveniant abstraction of AF_INET* agnostic operations.
*/
typedef union {
struct sockaddr_in in;
struct sockaddr_in6 in6;
} rd_sockaddr_inx_t;
#define sinx_family in.sin_family
#define sinx_addr in.sin_addr
#define RD_SOCKADDR_INX_LEN(sinx) \
((sinx)->sinx_family == AF_INET ? sizeof(struct sockaddr_in) : \
(sinx)->sinx_family == AF_INET6 ? sizeof(struct sockaddr_in6): \
sizeof(rd_sockaddr_inx_t))
#define RD_SOCKADDR_INX_PORT(sinx) \
((sinx)->sinx_family == AF_INET ? (sinx)->in.sin_port : \
(sinx)->sinx_family == AF_INET6 ? (sinx)->in6.sin6_port : 0)
#define RD_SOCKADDR_INX_PORT_SET(sinx,port) do { \
if ((sinx)->sinx_family == AF_INET) \
(sinx)->in.sin_port = port; \
else if ((sinx)->sinx_family == AF_INET6) \
(sinx)->in6.sin6_port = port; \
} while (0)
/**
* Returns a thread-local temporary string (may be called up to 32 times
* without buffer wrapping) containing the human string representation
* of the sockaddr (which should be AF_INET or AF_INET6 at this point).
* If the RD_SOCKADDR2STR_F_PORT is provided the port number will be
* appended to the string.
* IPv6 address enveloping ("[addr]:port") will also be performed
* if .._F_PORT is set.
*/
#define RD_SOCKADDR2STR_F_PORT 0x1 /* Append the port. */
#define RD_SOCKADDR2STR_F_RESOLVE 0x2 /* Try to resolve address to hostname. */
#define RD_SOCKADDR2STR_F_FAMILY 0x4 /* Prepend address family. */
#define RD_SOCKADDR2STR_F_NICE /* Nice and friendly output */ \
(RD_SOCKADDR2STR_F_PORT | RD_SOCKADDR2STR_F_RESOLVE)
const char *rd_sockaddr2str (const void *addr, int flags);
/**
* Splits a node:service definition up into their node and svc counterparts
* suitable for passing to getaddrinfo().
* Returns NULL on success (and temporarily available pointers in '*node'
* and '*svc') or error string on failure.
*
* Thread-safe but returned buffers in '*node' and '*svc' are only
* usable until the next call to rd_addrinfo_prepare() in the same thread.
*/
const char *rd_addrinfo_prepare (const char *nodesvc,
char **node, char **svc);
typedef struct rd_sockaddr_list_s {
int rsal_cnt;
int rsal_curr;
rd_sockaddr_inx_t rsal_addr[0];
} rd_sockaddr_list_t;
/**
* Returns the next address from a sockaddr list and updates
* the current-index to point to it.
*
* Typical usage is for round-robin connection attempts or similar:
* while (1) {
* rd_sockaddr_inx_t *sinx = rd_sockaddr_list_next(my_server_list);
* if (do_connect((struct sockaddr *)sinx) == -1) {
* sleep(1);
* continue;
* }
* ...
* }
*
*/
static inline rd_sockaddr_inx_t *
rd_sockaddr_list_next (rd_sockaddr_list_t *rsal) RD_UNUSED;
static inline rd_sockaddr_inx_t *
rd_sockaddr_list_next (rd_sockaddr_list_t *rsal) {
rsal->rsal_curr = (rsal->rsal_curr + 1) % rsal->rsal_cnt;
return &rsal->rsal_addr[rsal->rsal_curr];
}
#define RD_SOCKADDR_LIST_FOREACH(sinx, rsal) \
for ((sinx) = &(rsal)->rsal_addr[0] ; \
(sinx) < &(rsal)->rsal_addr[(rsal)->rsal_len] ; \
(sinx)++)
/**
* Wrapper for getaddrinfo(3) that performs these additional tasks:
* - Input is a combined "<node>[:<svc>]" string, with support for
* IPv6 enveloping ("[addr]:port").
* - Returns a rd_sockaddr_list_t which must be freed with
* rd_sockaddr_list_destroy() when done with it.
* - Automatically shuffles the returned address list to provide
* round-robin (unless RD_AI_NOSHUFFLE is provided in 'flags').
*
* Thread-safe.
*/
#define RD_AI_NOSHUFFLE 0x10000000 /* Dont shuffle returned address list.
* FIXME: Guessing non-used bits like this
* is a bad idea. */
rd_sockaddr_list_t *rd_getaddrinfo (const char *nodesvc, const char *defsvc,
int flags, int family,
int socktype, int protocol,
const char **errstr);
/**
* Frees a sockaddr list.
*
* Thread-safe.
*/
void rd_sockaddr_list_destroy (rd_sockaddr_list_t *rsal);
/**
* Returns the human readable name of a socket family.
*/
static const char *rd_family2str (int af) RD_UNUSED;
static const char *rd_family2str (int af) {
static const char *names[] = {
[AF_LOCAL] = "local",
[AF_INET] = "inet",
[AF_INET6] = "inet6",
[AF_NETLINK] = "netlink",
[AF_ROUTE] = "route",
[AF_PACKET] = "packet",
[AF_BLUETOOTH] = "bluetooth",
};
if (unlikely(af >= RD_ARRAYSIZE(names))) {
static __thread char tmp[16];
snprintf(tmp, sizeof(tmp), "af-%i", af);
return tmp;
}
return names[af];
}

133
rdcrc32.c Normal file
View File

@ -0,0 +1,133 @@
/**
* \file rdcrc32.c
* Functions and types for CRC checks.
*
* Generated on Tue May 8 17:37:04 2012,
* by pycrc v0.7.10, http://www.tty1.net/pycrc/
* using the configuration:
* Width = 32
* Poly = 0x04c11db7
* XorIn = 0xffffffff
* ReflectIn = True
* XorOut = 0xffffffff
* ReflectOut = True
* Algorithm = table-driven
*****************************************************************************/
#include "rdcrc32.h" /* include the header file generated with pycrc */
#include <stdlib.h>
#include <stdint.h>
/**
* Static table used for the table_driven implementation.
*****************************************************************************/
static const rd_crc32_t crc_table[256] = {
0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d
};
/**
* Reflect all bits of a \a data word of \a data_len bytes.
*
* \param data The data word to be reflected.
* \param data_len The width of \a data expressed in number of bits.
* \return The reflected data.
*****************************************************************************/
rd_crc32_t rd_crc32_reflect(rd_crc32_t data, size_t data_len)
{
unsigned int i;
rd_crc32_t ret;
ret = data & 0x01;
for (i = 1; i < data_len; i++) {
data >>= 1;
ret = (ret << 1) | (data & 0x01);
}
return ret;
}
/**
* Update the crc value with new data.
*
* \param crc The current crc value.
* \param data Pointer to a buffer of \a data_len bytes.
* \param data_len Number of bytes in the \a data buffer.
* \return The updated crc value.
*****************************************************************************/
rd_crc32_t rd_crc32_update(rd_crc32_t crc, const unsigned char *data, size_t data_len)
{
unsigned int tbl_idx;
while (data_len--) {
tbl_idx = (crc ^ *data) & 0xff;
crc = (crc_table[tbl_idx] ^ (crc >> 8)) & 0xffffffff;
data++;
}
return crc & 0xffffffff;
}

103
rdcrc32.h Normal file
View File

@ -0,0 +1,103 @@
/**
* \file rdcrc32.h
* Functions and types for CRC checks.
*
* Generated on Tue May 8 17:36:59 2012,
* by pycrc v0.7.10, http://www.tty1.net/pycrc/
*
* NOTE: Contains librd modifications:
* - rd_crc32() helper.
* - __RDCRC32___H__ define (was missing the '32' part).
*
* using the configuration:
* Width = 32
* Poly = 0x04c11db7
* XorIn = 0xffffffff
* ReflectIn = True
* XorOut = 0xffffffff
* ReflectOut = True
* Algorithm = table-driven
*****************************************************************************/
#ifndef __RDCRC32___H__
#define __RDCRC32___H__
#include <stdlib.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* The definition of the used algorithm.
*****************************************************************************/
#define CRC_ALGO_TABLE_DRIVEN 1
/**
* The type of the CRC values.
*
* This type must be big enough to contain at least 32 bits.
*****************************************************************************/
typedef uint32_t rd_crc32_t;
/**
* Reflect all bits of a \a data word of \a data_len bytes.
*
* \param data The data word to be reflected.
* \param data_len The width of \a data expressed in number of bits.
* \return The reflected data.
*****************************************************************************/
rd_crc32_t rd_crc32_reflect(rd_crc32_t data, size_t data_len);
/**
* Calculate the initial crc value.
*
* \return The initial crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_init(void)
{
return 0xffffffff;
}
/**
* Update the crc value with new data.
*
* \param crc The current crc value.
* \param data Pointer to a buffer of \a data_len bytes.
* \param data_len Number of bytes in the \a data buffer.
* \return The updated crc value.
*****************************************************************************/
rd_crc32_t rd_crc32_update(rd_crc32_t crc, const unsigned char *data, size_t data_len);
/**
* Calculate the final crc value.
*
* \param crc The current crc value.
* \return The final crc value.
*****************************************************************************/
static inline rd_crc32_t rd_crc32_finalize(rd_crc32_t crc)
{
return crc ^ 0xffffffff;
}
/**
* Wrapper for performing CRC32 on the provided buffer.
*/
static inline rd_crc32_t rd_crc32 (const char *data, size_t data_len) {
return rd_crc32_finalize(rd_crc32_update(rd_crc32_init(),
(const unsigned char *)data,
data_len));
}
#ifdef __cplusplus
} /* closing brace for extern "C" */
#endif
#endif /* __RDCRC32___H__ */

139
rdfile.c Normal file
View File

@ -0,0 +1,139 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rd.h"
#include "rdfile.h"
const char *rd_basename (const char *path) {
const char *t = path;
const char *t2 = t;
while ((t = strchr(t, '/')))
t2 = ++t;
return t2;
}
const char *rd_pwd (void) {
static __thread char path[PATH_MAX];
if (!getcwd(path, sizeof(path)-1))
return NULL;
return path;
}
ssize_t rd_file_size (const char *path) {
struct stat st;
if (stat(path, &st) == -1)
return (ssize_t)-1;
return st.st_size;
}
ssize_t rd_file_size_fd (int fd) {
struct stat st;
if (fstat(fd, &st) == -1)
return (ssize_t)-1;
return st.st_size;
}
mode_t rd_file_mode (const char *path) {
struct stat st;
if (stat(path, &st) == -1)
return 0;
return st.st_mode;
}
char *rd_file_read (const char *path, int *lenp) {
char *buf;
int fd;
ssize_t size;
int r;
if ((fd = open(path, O_RDONLY)) == -1)
return NULL;
if ((size = rd_file_size_fd(fd)) == -1) {
close(fd);
return NULL;
}
if (!(buf = malloc(size+1))) {
close(fd);
return NULL;
}
if ((r = read(fd, buf, size)) == -1) {
close(fd);
free(buf);
return NULL;
}
buf[r] = '\0';
if (lenp)
*lenp = r;
return buf;
}
int rd_file_write (const char *path, const char *buf, int len,
int flags, mode_t mode) {
int fd;
int r;
int of = 0;
if ((fd = open(path, O_CREAT|O_WRONLY|flags, mode)) == -1)
return -1;
while (of < len) {
if ((r = write(fd, buf+of, RD_MIN(16384, len - of))) == -1) {
close(fd);
return -1;
}
of += r;
}
close(fd);
return 0;
}

88
rdfile.h Normal file
View File

@ -0,0 +1,88 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <unistd.h>
#include <limits.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
/**
* Behaves pretty much like basename(3) but does not alter the
* input string in any way.
*/
const char *rd_basename (const char *path);
/**
* Returns the current directory in a static buffer.
*/
const char *rd_pwd (void);
/**
* Returns the size of the file, or -1 on failure.
*/
ssize_t rd_file_size (const char *path);
/**
* Returns the size of the file already opened, or -1 on failure.
*/
ssize_t rd_file_size_fd (int fd);
/**
* Performs stat(2) on 'path' and returns 'struct stat.st_mode' on success
* or 0 on failure.
*
* Example usage:
* if (S_ISDIR(rd_file_mode(mypath)))
* ..
*/
mode_t rd_file_mode (const char *path);
/**
* Opens the specified file and reads the entire content into a malloced
* buffer which is null-terminated. The actual length of the buffer, without
* the conveniant null-terminator, is returned in '*lenp'.
* The buffer is returned, or NULL on failure.
*/
char *rd_file_read (const char *path, int *lenp);
/**
* Writes 'buf' of 'len' bytes to 'path'.
* Hint: Use O_APPEND or O_TRUNC in 'flags'.
* Returns 0 on success or -1 on error.
* See open(2) for more info.
*/
int rd_file_write (const char *path, const char *buf, int len,
int flags, mode_t mode);

121
rdgz.c Normal file
View File

@ -0,0 +1,121 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rd.h"
#include "rdgz.h"
#include <zlib.h>
#define RD_GZ_CHUNK 262144
void *rd_gz_decompress (void *compressed, int compressed_len,
uint64_t *decompressed_lenp) {
int pass = 1;
char *decompressed = NULL;
/* First pass (1): calculate decompressed size.
* (pass-1 is skipped if *decompressed_lenp is
* non-zero).
* Second pass (2): perform actual decompression.
*/
if (*decompressed_lenp != 0LLU)
pass++;
for (; pass <= 2 ; pass++) {
z_stream strm = {};
gz_header hdr;
char buf[512];
char *p;
int len;
int r;
if ((r = inflateInit2(&strm, 15+32)) != Z_OK)
goto fail;
strm.next_in = compressed;
strm.avail_in = compressed_len;
if ((r = inflateGetHeader(&strm, &hdr)) != Z_OK) {
inflateEnd(&strm);
goto fail;
}
if (pass == 1) {
/* Use dummy output buffer */
p = buf;
len = sizeof(buf);
} else {
/* Use real output buffer */
p = decompressed;
len = *decompressed_lenp;
}
do {
strm.next_out = (unsigned char *)p;
strm.avail_out = len;
r = inflate(&strm, Z_NO_FLUSH);
switch (r) {
case Z_STREAM_ERROR:
case Z_NEED_DICT:
case Z_DATA_ERROR:
case Z_MEM_ERROR:
inflateEnd(&strm);
goto fail;
}
p += len - strm.avail_out;
len -= len - strm.avail_out;
} while (strm.avail_out == 0 && r != Z_STREAM_END);
if (pass == 1) {
*decompressed_lenp = strm.total_out;
if (!(decompressed = malloc(*decompressed_lenp+1))) {
inflateEnd(&strm);
return NULL;
}
/* For convenience of the caller we nul-terminate
* the buffer. If it happens to be a string there
* is no need for extra copies. */
decompressed[*decompressed_lenp] = '\0';
}
inflateEnd(&strm);
}
return decompressed;
fail:
if (decompressed)
free(decompressed);
return NULL;
}

42
rdgz.h Normal file
View File

@ -0,0 +1,42 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
/**
* Simple gzip decompression returning the inflated data
* in a malloced buffer.
* '*decompressed_lenp' must be 0 if the length of the uncompressed data
* is not known in which case it will be calculated.
* The returned buffer is nul-terminated (the actual allocated length
* is '*decompressed_lenp'+1.
*
* The decompressed length is returned in '*decompressed_lenp'.
*/
void *rd_gz_decompress (void *compressed, int compressed_len,
uint64_t *decompressed_lenp);

1334
rdkafka.c Normal file

File diff suppressed because it is too large Load Diff

518
rdkafka.h Normal file
View File

@ -0,0 +1,518 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/queue.h>
#ifndef WITH_LIBRD
#include "rd.h"
#include "rdaddr.h"
#define RD_POLL_INFINITE -1
#define RD_POLL_NOWAIT 0
#else
#include <librd/rd.h>
#include <librd/rdaddr.h>
#endif
#define RD_KAFKA_TOPIC_MAXLEN 256
typedef enum {
RD_KAFKA_PRODUCER,
RD_KAFKA_CONSUMER,
} rd_kafka_type_t;
typedef enum {
RD_KAFKA_STATE_DOWN,
RD_KAFKA_STATE_CONNECTING,
RD_KAFKA_STATE_UP,
} rd_kafka_state_t;
typedef enum {
/* Internal errors to rdkafka: */
RD_KAFKA_RESP_ERR__BAD_MSG = -199,
RD_KAFKA_RESP_ERR__BAD_COMPRESSION = -198,
RD_KAFKA_RESP_ERR__FAIL = -197, /* See rko_payload for error string */
/* Standard Kafka errors: */
RD_KAFKA_RESP_ERR_UNKNOWN = -1,
RD_KAFKA_RESP_ERR_NO_ERROR = 0,
RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE = 1,
RD_KAFKA_RESP_ERR_INVALID_MSG = 2,
RD_KAFKA_RESP_ERR_WRONG_PARTITION = 3,
RD_KAFKA_RESP_ERR_INVALID_FETCH_SIZE = 4,
} rd_kafka_resp_err_t;
/**
* Optional configuration struct passed to rd_kafka_new*().
* See head of rdkafka.c for defaults.
* See comment below for rd_kafka_defaultconf use.
*/
typedef struct rd_kafka_conf_s {
int max_msg_size; /* Maximum receive message size.
* This is a safety precaution to
* avoid memory exhaustion in case of
* protocol hickups. */
int flags;
#define RD_KAFKA_CONF_F_APP_OFFSET_STORE 0x1 /* No automatic offset storage
* will be performed. The
* application needs to
* call rd_kafka_offset_store()
* explicitly.
* This may be used to make sure
* a message is properly handled
* before storing the offset.
* If not set, and an offset
* storage is available, the
* offset will be stored
* just prior to passing the
* message to the application.*/
struct {
int poll_interval; /* Time in milliseconds to sleep before
* trying to FETCH again if the broker
* did not return any messages for
* the last FETCH call.
* I.e.: idle poll interval. */
int replyq_low_thres; /* The low water threshold for the
* reply queue.
* I.e.: how many messages we'll try
* to keep in the reply queue at any
* given time.
* The reply queue is the queue of
* read messages from the broker
* that are still to be passed to
* the application. */
uint32_t max_size; /* The maximum size to be returned
* by FETCH. */
char *offset_file; /* File to read/store current
* offset from/in.
* If the path is a directory then a
* filename is generated (including
* the topic and partition) and
* appended. */
int offset_file_flags; /* open(2) flags. */
#define RD_KAFKA_OFFSET_FILE_FLAGMASK (O_SYNC|O_ASYNC)
/* For internal use.
* Use the rd_kafka_new_consumer() API instead. */
char *topic; /* Topic to consume. */
uint32_t partition; /* Partition to consume. */
uint64_t offset; /* Initial offset. */
} consumer;
} rd_kafka_conf_t;
typedef enum {
RD_KAFKA_OP_PRODUCE, /* Application -> Kafka thread */
RD_KAFKA_OP_FETCH, /* Kafka thread -> Application */
RD_KAFKA_OP_ERR, /* Kafka thread -> Application */
} rd_kafka_op_type_t;
typedef struct rd_kafka_op_s {
TAILQ_ENTRY(rd_kafka_op_s) rko_link;
rd_kafka_op_type_t rko_type;
char *rko_topic;
uint32_t rko_partition;
int rko_flags;
#define RD_KAFKA_OP_F_FREE 0x1 /* Free the payload when done with it. */
#define RD_KAFKA_OP_F_FREE_TOPIC 0x2 /* Free the topic when done with it. */
/* For PRODUCE and ERR */
char *rko_payload;
int rko_len;
/* For FETCH */
uint64_t rko_offset;
#define rko_max_size rko_len
/* For replies */
rd_kafka_resp_err_t rko_err;
int8_t rko_compression;
int64_t rko_offset_len; /* Length to use to advance the offset. */
} rd_kafka_op_t;
typedef struct rd_kafka_q_s {
pthread_mutex_t rkq_lock;
pthread_cond_t rkq_cond;
TAILQ_HEAD(, rd_kafka_op_s) rkq_q;
int rkq_qlen;
} rd_kafka_q_t;
/**
* Kafka handle.
*/
typedef struct rd_kafka_s {
rd_kafka_q_t rk_op; /* application -> kafka operation queue */
rd_kafka_q_t rk_rep; /* kafka -> application reply queue */
struct {
char name[128];
rd_sockaddr_list_t *rsal;
int curr_addr;
int s; /* TCP socket */
struct {
uint64_t tx_bytes;
uint64_t tx; /* Kafka-messages (not payload msgs) */
uint64_t rx_bytes;
uint64_t rx; /* Kafka messages (not payload msgs) */
} stats;
} rk_broker;
rd_kafka_conf_t rk_conf;
int rk_flags;
int rk_terminate;
pthread_t rk_thread;
pthread_mutex_t rk_lock;
int rk_refcnt;
rd_kafka_type_t rk_type;
rd_kafka_state_t rk_state;
struct timeval rk_tv_state_change;
union {
struct {
char *topic;
uint32_t partition;
uint64_t offset;
uint64_t app_offset;
int offset_file_fd;
} consumer;
} rk_u;
#define rk_consumer rk_u.consumer
struct {
char msg[512];
int err; /* errno */
} rk_err;
} rd_kafka_t;
/**
* Accessor functions.
*
* Locality: any thread
*/
#define rd_kafka_name(rk) ((rk)->rk_broker.name)
#define rd_kafka_state(rk) ((rk)->rk_state)
/**
* Destroy the Kafka handle.
*
* Locality: application thread
*/
void rd_kafka_destroy (rd_kafka_t *rk);
/**
* Creates a new Kafka handle and starts its operation according to the
* specified 'type'.
*
* The 'broker' argument depicts the address to the Kafka broker (sorry,
* no ZooKeeper support at this point) in the standard "<host>[:<port>]" format
*
* If 'broker' is NULL it defaults to "localhost:9092".
*
* If the 'broker' node name resolves to multiple addresses (and possibly
* address families) all will be used for connection attempts in
* round-robin fashion.
*
* 'conf' is an optional struct that will be copied to replace rdkafka's
* default configuration. See the 'rd_kafka_conf_t' type for more information.
*
*
* Returns the Kafka handle.
*
* To destroy the Kafka handle, use rd_kafka_destroy().
*
* Locality: application thread
*/
rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, const char *broker,
const rd_kafka_conf_t *conf);
/**
* Creates a new Kafka consumer handle and sets it up for fetching messages
* from 'topic' + 'partion', beginning at 'offset'.
*
* If 'conf->consumer.offset_file' is non-NULL then the 'offset' parameter is
* ignored and the file's offset is used instead.
*
* Returns the Kafka handle.
*
* To destroy the Kafka handle, use rd_kafka_destroy().
*
* Locality: application thread
*/
rd_kafka_t *rd_kafka_new_consumer (const char *broker,
const char *topic,
uint32_t partition,
uint64_t offset,
const rd_kafka_conf_t *conf);
/**
* Fetches kafka messages from the internal reply queue that the kafka
* thread tries to keep populated.
*
* Will block until 'timeout_ms' expires (milliseconds, RD_POLL_NOWAIT or
* RD_POLL_INFINITE) or until a message is returned.
*
* The caller must check the reply's rko_err (RD_KAFKA_ERR_*) to distinguish
* between errors and actual data messages.
*
* Communication failure propagation:
* If rko_err is RD_KAFKA_ERR__FAIL it means a critical error has occured
* and the connection to the broker has been torn down. The application
* does not need to take any action but should log the contents of
* rko->rko_payload.
*
* Returns NULL on timeout or an 'rd_kafka_op_t *' reply on success.
*
* Locality: application thread
*/
rd_kafka_op_t *rd_kafka_consume (rd_kafka_t *rk, int timeout_ms);
/**
* Stores the current offset in whatever storage the handle has defined.
* Must only be called by the application if RD_KAFKA_CONF_F_APP_OFFSET_STORE
* is set in conf.flags.
*
* Locality: any thread
*/
int rd_kafka_offset_store (rd_kafka_t *rk, uint64_t offset);
/**
* Produce and send a single message to the broker.
*
* Locality: application thread
*/
void rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
int msgflags, char *payload, size_t len);
/**
* Destroys an op as returned by rd_kafka_consume().
*
* Locality: any thread
*/
void rd_kafka_op_destroy (rd_kafka_t *rk, rd_kafka_op_t *rko);
/**
* Returns a human readable representation of a kafka error.
*/
const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
/**
* Returns the current out queue length (ops waiting to be sent to the broker).
*
* Locality: any thread
*/
static inline int rd_kafka_outq_len (rd_kafka_t *rk) __attribute__((unused));
static inline int rd_kafka_outq_len (rd_kafka_t *rk) {
return rk->rk_op.rkq_qlen;
}
/**
* Returns the current reply queue length (messages from the broker waiting
* for the application thread to consume).
*
* Locality: any thread
*/
static inline int rd_kafka_replyq_len (rd_kafka_t *rk) __attribute__((unused));
static inline int rd_kafka_replyq_len (rd_kafka_t *rk) {
return rk->rk_rep.rkq_qlen;
}
/**
* The default configuration.
* When providing your own configuration to the rd_kafka_new_*() calls
* its advisable to base it on this default configuration and only
* change the relevant parts.
* I.e.:
*
* rd_kafka_conf_t myconf = rd_kafka_defaultconf;
* myconf.consumer.offset_file = "/var/kafka/offsets/";
* rk = rd_kafka_new_consumer(, ... &myconf);
*/
extern const rd_kafka_conf_t rd_kafka_defaultconf;
/**
* Builtin (default) log sink: print to stderr
*/
void rd_kafka_log_print (const rd_kafka_t *rk, int level,
const char *fac, const char *buf);
/**
* Builtin log sink: print to syslog.
*/
void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
const char *fac, const char *buf);
/**
* Set logger function.
* The default is to print to stderr, but a syslog is also available,
* see rd_kafka_log_(print|syslog) for the builtin alternatives.
* Alternatively the application may provide its own logger callback.
* Or pass 'func' as NULL to disable logging.
*
* NOTE: 'rk' may be passed as NULL.
*/
void rd_kafka_set_logger (void (*func) (const rd_kafka_t *rk, int level,
const char *fac, const char *buf));
#ifdef NEED_RD_KAFKAPROTO_DEF
/*
* Kafka protocol definitions.
* This is kept as an opt-in ifdef-space to avoid name space cluttering
* for the application while still keeping the implementation to
* just two files for easy inclusion in applications in case the library
* variant is not desired.
*/
#define RD_KAFKA_PORT 9092
#define RD_KAFKA_PORT_STR "9092"
/**
* Generic Request header.
*/
struct rd_kafkap_req {
uint32_t rkpr_len;
uint16_t rkpr_type;
#define RD_KAFKAP_PRODUCE 0
#define RD_KAFKAP_FETCH 1
#define RD_KAFKAP_MULTIFETCH 2
#define RD_KAFKAP_MULTIPRODUCE 3
#define RD_KAFKAP_OFFSETS 4
uint16_t rkpr_topic_len;
char rkpr_topic[0]; /* TOPIC and PARTITION follows */
} RD_PACKED;
/**
* Generic Multi-Request header.
*/
struct rd_kafkap_multireq {
uint32_t rkpmr_len;
uint16_t rkpmr_type;
uint16_t rkpmr_topicpart_cnt;
uint32_t rkpr_topic_len;
char rkpr_topic[0]; /* TOPIC and PARTITION follows */
} RD_PACKED;
/**
* Generic Response header.
*/
struct rd_kafkap_resp {
uint32_t rkprp_len;
int16_t rkprp_error; /* rd_kafka_resp_err_t */
} RD_PACKED;
/**
* MESSAGE header
*/
struct rd_kafkap_msg {
uint32_t rkpm_len;
uint8_t rkpm_magic;
#define RD_KAFKAP_MSG_MAGIC_NO_COMPRESSION_ATTR 0 /* Not supported. */
#define RD_KAFKAP_MSG_MAGIC_COMPRESSION_ATTR 1
uint8_t rkpm_compression;
#define RD_KAFKAP_MSG_COMPRESSION_NONE 0
#define RD_KAFKAP_MSG_COMPRESSION_GZIP 1
#define RD_KAFKAP_MSG_COMPRESSION_SNAPPY 2
uint32_t rkpm_cksum;
char rkpm_payload[0];
} RD_PACKED;
/**
* PRODUCE header, directly follows the request header.
*/
struct rd_kafkap_produce {
uint32_t rkpp_msgs_len;
struct rd_kafkap_msg rkpp_msgs[0];
} RD_PACKED;
/**
* FETCH request header, directly follows the request header.
*/
struct rd_kafkap_fetch_req {
uint64_t rkpfr_offset;
uint32_t rkpfr_max_size;
} RD_PACKED;
/**
* FETCH response header, directly follows the response header.
*/
struct rd_kafkap_fetch_resp {
struct rd_kafkap_msg rkpfrp_msgs[0];
} RD_PACKED;
/**
* Helper struct containing a protocol-encoded topic+partition.
*/
struct rd_kafkap_topicpart {
int rkptp_len;
char rkptp_buf[0];
};
#endif /* NEED_KAFKAPROTO_DEF */

50
rdrand.c Normal file
View File

@ -0,0 +1,50 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rd.h"
#include "rdrand.h"
void rd_array_shuffle (void *base, size_t nmemb, size_t entry_size) {
int i;
void *tmp = alloca(entry_size);
/* FIXME: Optimized version for word-sized entries. */
for (i = nmemb - 1 ; i > 0 ; i--) {
int j = rd_jitter(0, i);
if (unlikely(i == j))
continue;
memcpy(tmp, (char *)base + (i*entry_size), entry_size);
memcpy((char *)base+(i*entry_size),
(char *)base+(j*entry_size), entry_size);
memcpy((char *)base+(j*entry_size), tmp, entry_size);
}
}

45
rdrand.h Normal file
View File

@ -0,0 +1,45 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
/**
* Returns a random (using rand(3)) number between 'low'..'high' (inclusive).
*/
static inline int rd_jitter (int low, int high) RD_UNUSED;
static inline int rd_jitter (int low, int high) {
return (low + (rand() % (high+1)));
}
/**
* Shuffles (randomizes) an array using the modern Fisher-Yates algorithm.
*/
void rd_array_shuffle (void *base, size_t nmemb, size_t entry_size);

89
rdtime.h Normal file
View File

@ -0,0 +1,89 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#ifndef TIMEVAL_TO_TIMESPEC
#define TIMEVAL_TO_TIMESPEC(tv,ts) do { \
(ts)->tv_sec = (tv)->tv_sec; \
(ts)->tv_nsec = (tv)->tv_usec * 1000; \
} while (0)
#define TIMESPEC_TO_TIMEVAL(tv, ts) do { \
(tv)->tv_sec = (ts)->tv_sec; \
(tv)->tv_usec = (ts)->tv_nsec / 1000; \
} while (0)
#endif
#define TIMESPEC_TO_TS(ts) \
(((rd_ts_t)(ts)->tv_sec * 1000000LLU) + ((ts)->tv_nsec / 1000))
#define TS_TO_TIMESPEC(ts,tsx) do { \
(ts)->tv_sec = (tsx) / 1000000; \
(ts)->tv_nsec = ((tsx) % 1000000) * 1000; \
if ((ts)->tv_nsec > 1000000000LLU) { \
(ts)->tv_sec++; \
(ts)->tv_nsec -= 1000000000LLU; \
} \
} while (0)
#define TIMESPEC_CLEAR(ts) ((ts)->tv_sec = (ts)->tv_nsec = 0LLU)
static inline rd_ts_t rd_clock (void) RD_UNUSED;
static inline rd_ts_t rd_clock (void) {
#ifdef __APPLE__
/* No monotonic clock on Darwin */
struct timeval tv;
gettimeofday(&tv, NULL);
TIMEVAL_TO_TIMESPEC(&tv, &ts);
return ((rd_ts_t)tv.tv_sec * 1000000LLU) + (rd_ts_t)tv.tv_usec;
#else
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ((rd_ts_t)ts.tv_sec * 1000000LLU) +
((rd_ts_t)ts.tv_nsec / 1000LLU);
#endif
}
/**
* Thread-safe version of ctime() that strips the trailing newline.
*/
static inline const char *rd_ctime (const time_t *t) RD_UNUSED;
static inline const char *rd_ctime (const time_t *t) {
static __thread char ret[27];
ctime_r(t, ret);
ret[25] = '\0';
return ret;
}

47
rdtypes.h Normal file
View File

@ -0,0 +1,47 @@
/*
* librd - Rapid Development C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <inttypes.h>
/*
* Fundamental types
*/
typedef pthread_mutex_t rd_mutex_t;
typedef pthread_rwlock_t rd_rwlock_t;
typedef pthread_cond_t rd_cond_t;
/* Timestamp (microseconds) */
typedef uint64_t rd_ts_t;