Merge remote-tracking branch 'upstream/wip/PART-3886'

Conflicts:
	hermann.iml
This commit is contained in:
Stan Campbell 2014-08-19 14:18:55 -07:00
commit d51050c3a6
5 changed files with 192 additions and 201 deletions

View File

@ -1,157 +0,0 @@
SHELL = /bin/sh
#### Start of system configuration section. ####
srcdir = .
topdir = /Users/scampbell/.rbenv/versions/1.8.7-p375/lib/ruby/1.8/i686-darwin13.2.0
hdrdir = $(topdir)
VPATH = $(srcdir):$(topdir):$(hdrdir)
prefix = $(DESTDIR)/Users/scampbell/.rbenv/versions/1.8.7-p375
exec_prefix = $(prefix)
sitedir = $(libdir)/ruby/site_ruby
archdir = $(rubylibdir)/$(arch)
sitelibdir = $(sitedir)/$(ruby_version)
vendorlibdir = $(vendordir)/$(ruby_version)
sbindir = $(exec_prefix)/sbin
localedir = $(datarootdir)/locale
datarootdir = $(prefix)/share
includedir = $(prefix)/include
localstatedir = $(prefix)/var
htmldir = $(docdir)
infodir = $(datarootdir)/info
docdir = $(datarootdir)/doc/$(PACKAGE)
pdfdir = $(docdir)
sitearchdir = $(sitelibdir)/$(sitearch)
libexecdir = $(exec_prefix)/libexec
bindir = $(exec_prefix)/bin
libdir = $(exec_prefix)/lib
psdir = $(docdir)
sysconfdir = $(prefix)/etc
oldincludedir = $(DESTDIR)/usr/include
rubylibdir = $(libdir)/ruby/$(ruby_version)
vendordir = $(libdir)/ruby/vendor_ruby
dvidir = $(docdir)
mandir = $(datarootdir)/man
sharedstatedir = $(prefix)/com
datadir = $(datarootdir)
vendorarchdir = $(vendorlibdir)/$(sitearch)
CC = /usr/local/bin/gcc-4.2
LIBRUBY = $(LIBRUBY_A)
LIBRUBY_A = lib$(RUBY_SO_NAME)-static.a
LIBRUBYARG_SHARED =
LIBRUBYARG_STATIC = -l$(RUBY_SO_NAME)-static
RUBY_EXTCONF_H =
CFLAGS = -fno-common -O3 -Wno-error=shorten-64-to-32 -pipe -fno-common $(cflags)
INCFLAGS = -I. -I. -I/Users/scampbell/.rbenv/versions/1.8.7-p375/lib/ruby/1.8/i686-darwin13.2.0 -I.
DEFS =
CPPFLAGS = -I/Users/scampbell/.rbenv/versions/1.8.7-p375/include -D_XOPEN_SOURCE -D_DARWIN_C_SOURCE -I/Users/scampbell/.rbenv/versions/1.8.7-p375/include
CXXFLAGS = $(CFLAGS)
ldflags = -L. -L/Users/scampbell/.rbenv/versions/1.8.7-p375/lib
dldflags =
archflag =
DLDFLAGS = $(ldflags) $(dldflags) $(archflag)
LDSHARED = cc -dynamic -bundle -undefined suppress -flat_namespace
AR = ar
EXEEXT =
RUBY_INSTALL_NAME = ruby
RUBY_SO_NAME = ruby
arch = i686-darwin13.2.0
sitearch = i686-darwin13.2.0
ruby_version = 1.8
ruby = /Users/scampbell/.rbenv/versions/1.8.7-p375/bin/ruby
RUBY = $(ruby)
RM = rm -f
MAKEDIRS = mkdir -p
INSTALL = /usr/bin/install -c
INSTALL_PROG = $(INSTALL) -m 0755
INSTALL_DATA = $(INSTALL) -m 644
COPY = cp
#### End of system configuration section. ####
preload =
libpath = . $(libdir) /Users/scampbell/.rbenv/versions/1.8.7-p375/lib
LIBPATH = -L. -L$(libdir) -L/Users/scampbell/.rbenv/versions/1.8.7-p375/lib
DEFFILE =
CLEANFILES = mkmf.log
DISTCLEANFILES =
extout =
extout_prefix =
target_prefix = /hermann
LOCAL_LIBS =
LIBS = -lrdkafka -ldl -lobjc
SRCS = hermann_lib.c
OBJS = hermann_lib.o
TARGET = hermann_lib
DLLIB = $(TARGET).bundle
EXTSTATIC =
STATIC_LIB =
BINDIR = $(bindir)
RUBYCOMMONDIR = $(sitedir)$(target_prefix)
RUBYLIBDIR = $(sitelibdir)$(target_prefix)
RUBYARCHDIR = $(sitearchdir)$(target_prefix)
TARGET_SO = $(DLLIB)
CLEANLIBS = $(TARGET).bundle $(TARGET).il? $(TARGET).tds $(TARGET).map
CLEANOBJS = *.o *.a *.s[ol] *.pdb *.exp *.bak
all: $(DLLIB)
static: $(STATIC_LIB)
clean:
@-$(RM) $(CLEANLIBS) $(CLEANOBJS) $(CLEANFILES)
distclean: clean
@-$(RM) Makefile $(RUBY_EXTCONF_H) conftest.* mkmf.log
@-$(RM) core ruby$(EXEEXT) *~ $(DISTCLEANFILES)
realclean: distclean
install: install-so install-rb
install-so: $(RUBYARCHDIR)
install-so: $(RUBYARCHDIR)/$(DLLIB)
$(RUBYARCHDIR)/$(DLLIB): $(DLLIB)
$(INSTALL_PROG) $(DLLIB) $(RUBYARCHDIR)
install-rb: pre-install-rb install-rb-default
install-rb-default: pre-install-rb-default
pre-install-rb: Makefile
pre-install-rb-default: Makefile
$(RUBYARCHDIR):
$(MAKEDIRS) $@
site-install: site-install-so site-install-rb
site-install-so: install-so
site-install-rb: install-rb
.SUFFIXES: .c .m .cc .cxx .cpp .C .o
.cc.o:
$(CXX) $(INCFLAGS) $(CPPFLAGS) $(CXXFLAGS) -c $<
.cxx.o:
$(CXX) $(INCFLAGS) $(CPPFLAGS) $(CXXFLAGS) -c $<
.cpp.o:
$(CXX) $(INCFLAGS) $(CPPFLAGS) $(CXXFLAGS) -c $<
.C.o:
$(CXX) $(INCFLAGS) $(CPPFLAGS) $(CXXFLAGS) -c $<
.c.o:
$(CC) $(INCFLAGS) $(CPPFLAGS) $(CFLAGS) -c $<
$(DLLIB): $(OBJS) Makefile
@-$(RM) $@
$(LDSHARED) -o $@ $(OBJS) $(LIBPATH) $(DLDFLAGS) $(LOCAL_LIBS) $(LIBS)
$(OBJS): ruby.h defines.h

View File

@ -46,6 +46,50 @@ void log_debug(char* msg) {
}
}
/**
* Convenience function
*
* @param config HermannInstanceConfig
* @param outputStream FILE*
*
* Log the contents of the configuration to the provided stream.
*/
void fprintf_hermann_instance_config(HermannInstanceConfig* config, FILE* outputStream) {
const char* topic;
const char* brokers;
int isRkSet;
int isRktSet;
int partition;
int isInitialized;
if(config==NULL) {
fprintf(outputStream, "NULL configuration");
} else {
isRkSet = config->rk != NULL;
isRktSet = config->rkt != NULL;
if(config->topic == NULL) {
topic = NULL;
} else {
topic = config->topic;
}
if(config->brokers == NULL) {
brokers = "NULL";
} else {
brokers = config->brokers;
}
partition = config->partition;
isInitialized = config->isInitialized;
}
fprintf(outputStream, "{ topic: %s, brokers: %s, partition: %d, isInitialized: %d, rkSet: %d, rkTSet: %d }\n",
topic, brokers, partition, isInitialized, isRkSet, isRktSet );
}
/**
* Message delivery report callback.
* Called once for each message.
@ -226,7 +270,11 @@ static void logger (const rd_kafka_t *rk, int level,
*
* @param config HermannInstanceConfig* pointer to the instance configuration for this producer or consumer
*/
void consumer_init_kafka(HermannInstanceConfig* config) {
void consumer_init_kafka(HermannInstanceConfig* config)
{
#ifdef TRACE
fprintf(stderr, "consumer_init_kafka");
#endif
config->quiet = !isatty(STDIN_FILENO);
@ -271,6 +319,10 @@ void consumer_init_kafka(HermannInstanceConfig* config) {
static void consumer_consume_stop_callback(void *ptr) {
HermannInstanceConfig* config = (HermannInstanceConfig*)ptr;
#ifdef TRACE
fprintf(stderr, "consumer_consume_stop_callback");
#endif
config->run = 0;
}
@ -280,6 +332,10 @@ static void consumer_consume_stop_callback(void *ptr) {
*/
void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
#ifdef TRACE
fprintf(stderr, "consumer_consume_loop");
#endif
while (consumerConfig->run) {
rd_kafka_message_t *rkmessage;
@ -304,6 +360,10 @@ static VALUE consumer_consume(VALUE self) {
HermannInstanceConfig* consumerConfig;
#ifdef TRACE
fprintf(stderr, "consumer_consume");
#endif
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
if(consumerConfig->topic==NULL) {
@ -352,6 +412,10 @@ static VALUE consumer_consume(VALUE self) {
*/
void producer_init_kafka(HermannInstanceConfig* config) {
#ifdef TRACE
fprintf(stderr, "producer_init_kafka\n");
#endif
config->quiet = !isatty(STDIN_FILENO);
/* Kafka configuration */
@ -389,6 +453,11 @@ void producer_init_kafka(HermannInstanceConfig* config) {
/* We're now initialized */
config->isInitialized = 1;
#ifdef TRACE
fprintf(stderr, "producer_init_kafka::END\n");
fprintf_hermann_instance_config(config, stderr);
#endif
}
/**
@ -402,6 +471,10 @@ static VALUE producer_push_single(VALUE self, VALUE message) {
HermannInstanceConfig* producerConfig;
char buf[2048];
#ifdef TRACE
fprintf(stderr, "producer_push_single\n");
#endif
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if(producerConfig->topic==NULL) {
@ -420,6 +493,13 @@ static VALUE producer_push_single(VALUE self, VALUE message) {
if (buf[len-1] == '\n')
buf[--len] = '\0';
#ifdef TRACE
fprintf(stderr, "producer_push_single::before_produce message1\n");
fprintf_hermann_instance_config(producerConfig, stderr);
fprintf(stderr, "producer_push_single::before_produce_message2\n");
fflush(stderr);
#endif
/* Send/Produce message. */
if (rd_kafka_produce(producerConfig->rkt, producerConfig->partition, RD_KAFKA_MSG_F_COPY,
/* Payload and length */
@ -442,6 +522,10 @@ static VALUE producer_push_single(VALUE self, VALUE message) {
/* Must poll to handle delivery reports */
rd_kafka_poll(producerConfig->rk, 0);
#ifdef TRACE
fprintf(stderr, "producer_push_single::prior return\n");
#endif
return self;
}
@ -459,6 +543,10 @@ static VALUE producer_push_array(VALUE self, int length, VALUE array) {
int i;
VALUE message;
#ifdef TRACE
fprintf(stderr, "producer_push_array\n");
#endif
for(i=0;i<length;i++) {
message = RARRAY_PTR(array)[i];
producer_push_single(self, message);
@ -479,6 +567,10 @@ static VALUE producer_push(VALUE self, VALUE message) {
VALUE arrayP = rb_check_array_type(message);
#ifdef TRACE
fprintf(stderr, "producer_push\n");
#endif
if(!NIL_P(arrayP)) {
return producer_push_array(self, RARRAY_LEN(arrayP), message);
} else {
@ -497,10 +589,18 @@ static void consumer_free(void * p) {
HermannInstanceConfig* config = (HermannInstanceConfig *)p;
// the p *should* contain a pointer to the consumerConfig which also must be freed
rd_kafka_topic_destroy(config->rkt);
#ifdef TRACE
fprintf(stderr, "consumer_free\n");
#endif
rd_kafka_destroy(config->rk);
// the p *should* contain a pointer to the consumerConfig which also must be freed
if(config->rkt != NULL) {
rd_kafka_topic_destroy(config->rkt);
}
if(config->rk != NULL) {
rd_kafka_destroy(config->rk);
}
// clean up the struct
free(config);
@ -516,8 +616,31 @@ static void consumer_free(void * p) {
static VALUE consumer_allocate(VALUE klass) {
VALUE obj;
HermannInstanceConfig* consumerConfig;
#ifdef TRACE
fprintf(stderr, "consumer_free\n");
#endif
consumerConfig = ALLOC(HermannInstanceConfig);
// Make sure it's initialized
consumerConfig->topic = NULL;
consumerConfig->rk = NULL;
consumerConfig->rkt = NULL;
consumerConfig->brokers = NULL;
consumerConfig->partition = -1;
consumerConfig->topic_conf = NULL;
consumerConfig->errstr[0] = 0;
consumerConfig->conf = NULL;
consumerConfig->debug = NULL;
consumerConfig->start_offset = -1;
consumerConfig->do_conf_dump = -1;
consumerConfig->run = 0;
consumerConfig->exit_eof = 0;
consumerConfig->quiet = 0;
consumerConfig->isInitialized = 0;
HermannInstanceConfig* consumerConfig = ALLOC(HermannInstanceConfig);
obj = Data_Wrap_Struct(klass, 0, consumer_free, consumerConfig);
return obj;
@ -542,6 +665,10 @@ static VALUE consumer_initialize(VALUE self, VALUE topic, VALUE brokers, VALUE p
char* brokersPtr;
int partitionNo;
#ifdef TRACE
fprintf(stderr, "consumer_initialize\n");
#endif
topicPtr = StringValuePtr(topic);
brokersPtr = StringValuePtr(brokers);
partitionNo = FIX2INT(partition);
@ -570,6 +697,10 @@ static VALUE consumer_init_copy(VALUE copy, VALUE orig) {
HermannInstanceConfig* orig_config;
HermannInstanceConfig* copy_config;
#ifdef TRACE
fprintf(stderr, "consumer_init_copy\n");
#endif
if(copy == orig) {
return copy;
}
@ -596,13 +727,27 @@ static VALUE consumer_init_copy(VALUE copy, VALUE orig) {
*/
static void producer_free(void * p) {
HermannInstanceConfig* config = (HermannInstanceConfig *)p;
HermannInstanceConfig* config;
#ifdef TRACE
fprintf(stderr, "producer_free\n");
#endif
config = (HermannInstanceConfig *)p;
if(NULL==p) {
return;
}
// Clean up the topic
rd_kafka_topic_destroy(config->rkt);
if(config->rkt != NULL) {
rd_kafka_topic_destroy(config->rkt);
}
// Take care of the producer instance
rd_kafka_destroy(config->rk);
if(config->rk != NULL) {
rd_kafka_destroy(config->rk);
}
// Free the struct
free(config);
@ -618,8 +763,30 @@ static void producer_free(void * p) {
static VALUE producer_allocate(VALUE klass) {
VALUE obj;
HermannInstanceConfig* producerConfig;
#ifdef TRACE
fprintf(stderr, "producer_allocate\n");
#endif
producerConfig = ALLOC(HermannInstanceConfig);
producerConfig->topic = NULL;
producerConfig->rk = NULL;
producerConfig->rkt = NULL;
producerConfig->brokers = NULL;
producerConfig->partition = -1;
producerConfig->topic_conf = NULL;
producerConfig->errstr[0] = 0;
producerConfig->conf = NULL;
producerConfig->debug = NULL;
producerConfig->start_offset = -1;
producerConfig->do_conf_dump = -1;
producerConfig->run = 0;
producerConfig->exit_eof = 0;
producerConfig->quiet = 0;
producerConfig->isInitialized = 0;
HermannInstanceConfig* producerConfig = ALLOC(HermannInstanceConfig);
obj = Data_Wrap_Struct(klass, 0, producer_free, producerConfig);
return obj;
@ -640,6 +807,10 @@ static VALUE producer_initialize(VALUE self, VALUE topic, VALUE brokers) {
char* topicPtr;
char* brokersPtr;
#ifdef TRACE
fprintf(stderr, "producer_initialize\n");
#endif
topicPtr = StringValuePtr(topic);
brokersPtr = StringValuePtr(brokers);
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
@ -669,6 +840,10 @@ static VALUE producer_init_copy(VALUE copy, VALUE orig) {
HermannInstanceConfig* orig_config;
HermannInstanceConfig* copy_config;
#ifdef TRACE
fprintf(stderr, "producer_init_copy\n");
#endif
if(copy == orig) {
return copy;
}
@ -695,6 +870,10 @@ static VALUE producer_init_copy(VALUE copy, VALUE orig) {
*/
void Init_hermann_lib() {
#ifdef TRACE
fprintf(stderr, "init_hermann_lib\n");
#endif
/* Define the module */
m_hermann = rb_define_module("Hermann");

View File

@ -42,6 +42,8 @@
#include <librdkafka/rdkafka.h>
#undef TRACE
// Holds the defined Ruby module for Hermann
static VALUE m_hermann;

View File

@ -1,34 +0,0 @@
find_header: checking for librdkafka/rdkafka.h... -------------------- yes
"/usr/local/bin/gcc-4.2 -E -I. -I/Users/scampbell/.rbenv/versions/1.8.7-p375/lib/ruby/1.8/i686-darwin13.2.0 -I. -I/Users/scampbell/.rbenv/versions/1.8.7-p375/include -D_XOPEN_SOURCE -D_DARWIN_C_SOURCE -I/Users/scampbell/.rbenv/versions/1.8.7-p375/include -O3 -Wno-error=shorten-64-to-32 -pipe -fno-common conftest.c -o conftest.i"
checked program was:
/* begin */
1: #include <librdkafka/rdkafka.h>
/* end */
--------------------
find_library: checking for rd_kafka_conf_new() in -lrdkafka... -------------------- yes
"/usr/local/bin/gcc-4.2 -o conftest -I. -I/Users/scampbell/.rbenv/versions/1.8.7-p375/lib/ruby/1.8/i686-darwin13.2.0 -I. -I/Users/scampbell/.rbenv/versions/1.8.7-p375/include -D_XOPEN_SOURCE -D_DARWIN_C_SOURCE -I/Users/scampbell/.rbenv/versions/1.8.7-p375/include -O3 -Wno-error=shorten-64-to-32 -pipe -fno-common conftest.c -L. -L/Users/scampbell/.rbenv/versions/1.8.7-p375/lib -L/Users/scampbell/.rbenv/versions/1.8.7-p375/lib -L. -L/Users/scampbell/.rbenv/versions/1.8.7-p375/lib -lruby-static -lrdkafka -ldl -lobjc "
conftest.c: In function t:
conftest.c:3: error: rd_kafka_conf_new undeclared (first use in this function)
conftest.c:3: error: (Each undeclared identifier is reported only once
conftest.c:3: error: for each function it appears in.)
checked program was:
/* begin */
1: /*top*/
2: int main() { return 0; }
3: int t() { void ((*volatile p)()); p = (void ((*)()))rd_kafka_conf_new; return 0; }
/* end */
"/usr/local/bin/gcc-4.2 -o conftest -I. -I/Users/scampbell/.rbenv/versions/1.8.7-p375/lib/ruby/1.8/i686-darwin13.2.0 -I. -I/Users/scampbell/.rbenv/versions/1.8.7-p375/include -D_XOPEN_SOURCE -D_DARWIN_C_SOURCE -I/Users/scampbell/.rbenv/versions/1.8.7-p375/include -O3 -Wno-error=shorten-64-to-32 -pipe -fno-common conftest.c -L. -L/Users/scampbell/.rbenv/versions/1.8.7-p375/lib -L/Users/scampbell/.rbenv/versions/1.8.7-p375/lib -L. -L/Users/scampbell/.rbenv/versions/1.8.7-p375/lib -lruby-static -lrdkafka -ldl -lobjc "
checked program was:
/* begin */
1: /*top*/
2: int main() { return 0; }
3: int t() { rd_kafka_conf_new(); return 0; }
/* end */
--------------------

View File

@ -1,6 +1,6 @@
SPEC = Gem::Specification.new do |s|
s.name = "hermann"
s.version = "0.11"
s.version = "0.13"
s.default_executable = "hermann"
s.authors = ["Stan Campbell"]
@ -15,6 +15,7 @@ SPEC = Gem::Specification.new do |s|
s.require_paths = ["lib", "ext"]
s.rubygems_version = %q{2.2.2}
s.summary = %q{The Kafka consumer is based on the librdkafka C library.}
s.licenses = ['MIT']
s.platform = Gem::Platform::CURRENT