mirror of https://github.com/reiseburo/hermann
Initial files commit. Consumer working from Hermann::Consumer.
This commit is contained in:
parent
53d950ea0e
commit
937eaea8fc
|
@ -0,0 +1,4 @@
|
|||
# A sample Gemfile
|
||||
source "https://rubygems.org"
|
||||
|
||||
gem 'rake'
|
|
@ -0,0 +1,31 @@
|
|||
|
||||
require 'rake/clean'
|
||||
|
||||
EXT_CONF = "ext/extconf.rb"
|
||||
MAKEFILE = 'ext/Makefile'
|
||||
MODULE = 'ext/hermann_lib.so'
|
||||
SRC = Dir.glob('ext/*.c')
|
||||
SRC << MAKEFILE
|
||||
|
||||
CLEAN.include [ 'ext/*.o', 'ext/depend', 'ext/hermann_lib.bundle', MODULE ]
|
||||
CLOBBER.include [ 'config.save', 'ext/mkmf.log', 'ext/hermann_lib.bundle', MAKEFILE ]
|
||||
|
||||
file MAKEFILE => EXT_CONF do |t|
|
||||
Dir::chdir(File::dirname(EXT_CONF)) do
|
||||
unless sh "ruby #{File::basename(EXT_CONF)}"
|
||||
$stderr.puts "Failed to run extconf"
|
||||
break
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
file MODULE => SRC do |t|
|
||||
Dir::chdir(File::dirname(EXT_CONF)) do
|
||||
unless sh "make"
|
||||
$stderr.puts "make failed"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
desc "Build the native library"
|
||||
task :build => MODULE
|
|
@ -0,0 +1,4 @@
|
|||
#!ruby
|
||||
|
||||
require 'hermann'
|
||||
puts "Placeholder..."
|
|
@ -0,0 +1 @@
|
|||
system
|
|
@ -0,0 +1,239 @@
|
|||
|
||||
SHELL = /bin/sh
|
||||
|
||||
# V=0 quiet, V=1 verbose. other values don't work.
|
||||
V = 0
|
||||
Q1 = $(V:1=)
|
||||
Q = $(Q1:0=@)
|
||||
ECHO1 = $(V:1=@:)
|
||||
ECHO = $(ECHO1:0=@echo)
|
||||
|
||||
#### Start of system configuration section. ####
|
||||
|
||||
srcdir = .
|
||||
topdir = /Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0
|
||||
hdrdir = $(topdir)
|
||||
arch_hdrdir = /Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0/x86_64-darwin13.0
|
||||
PATH_SEPARATOR = :
|
||||
VPATH = $(srcdir):$(arch_hdrdir)/ruby:$(hdrdir)/ruby
|
||||
prefix = $(DESTDIR)/Users/scampbell/.rbenv/versions/2.1.1
|
||||
rubysitearchprefix = $(rubylibprefix)/$(sitearch)
|
||||
rubyarchprefix = $(rubylibprefix)/$(arch)
|
||||
rubylibprefix = $(libdir)/$(RUBY_BASE_NAME)
|
||||
exec_prefix = $(prefix)
|
||||
vendorarchhdrdir = $(vendorhdrdir)/$(sitearch)
|
||||
sitearchhdrdir = $(sitehdrdir)/$(sitearch)
|
||||
rubyarchhdrdir = $(rubyhdrdir)/$(arch)
|
||||
vendorhdrdir = $(rubyhdrdir)/vendor_ruby
|
||||
sitehdrdir = $(rubyhdrdir)/site_ruby
|
||||
rubyhdrdir = $(includedir)/$(RUBY_VERSION_NAME)
|
||||
vendorarchdir = $(vendorlibdir)/$(sitearch)
|
||||
vendorlibdir = $(vendordir)/$(ruby_version)
|
||||
vendordir = $(rubylibprefix)/vendor_ruby
|
||||
sitearchdir = $(sitelibdir)/$(sitearch)
|
||||
sitelibdir = $(sitedir)/$(ruby_version)
|
||||
sitedir = $(rubylibprefix)/site_ruby
|
||||
rubyarchdir = $(rubylibdir)/$(arch)
|
||||
rubylibdir = $(rubylibprefix)/$(ruby_version)
|
||||
sitearchincludedir = $(includedir)/$(sitearch)
|
||||
archincludedir = $(includedir)/$(arch)
|
||||
sitearchlibdir = $(libdir)/$(sitearch)
|
||||
archlibdir = $(libdir)/$(arch)
|
||||
ridir = $(datarootdir)/$(RI_BASE_NAME)
|
||||
mandir = $(datarootdir)/man
|
||||
localedir = $(datarootdir)/locale
|
||||
libdir = $(exec_prefix)/lib
|
||||
psdir = $(docdir)
|
||||
pdfdir = $(docdir)
|
||||
dvidir = $(docdir)
|
||||
htmldir = $(docdir)
|
||||
infodir = $(datarootdir)/info
|
||||
docdir = $(datarootdir)/doc/$(PACKAGE)
|
||||
oldincludedir = $(DESTDIR)/usr/include
|
||||
includedir = $(prefix)/include
|
||||
localstatedir = $(prefix)/var
|
||||
sharedstatedir = $(prefix)/com
|
||||
sysconfdir = $(prefix)/etc
|
||||
datadir = $(datarootdir)
|
||||
datarootdir = $(prefix)/share
|
||||
libexecdir = $(exec_prefix)/libexec
|
||||
sbindir = $(exec_prefix)/sbin
|
||||
bindir = $(exec_prefix)/bin
|
||||
archdir = $(rubyarchdir)
|
||||
|
||||
|
||||
CC = /usr/local/bin/gcc-4.2
|
||||
CXX = /usr/local/bin/g++-4.2
|
||||
LIBRUBY = $(LIBRUBY_A)
|
||||
LIBRUBY_A = lib$(RUBY_SO_NAME)-static.a
|
||||
LIBRUBYARG_SHARED =
|
||||
LIBRUBYARG_STATIC = -l$(RUBY_SO_NAME)-static -framework CoreFoundation
|
||||
empty =
|
||||
OUTFLAG = -o $(empty)
|
||||
COUTFLAG = -o $(empty)
|
||||
|
||||
RUBY_EXTCONF_H =
|
||||
cflags = $(optflags) $(debugflags) $(warnflags)
|
||||
optflags = -O3 -fno-fast-math
|
||||
debugflags = -ggdb3
|
||||
warnflags = -Wall -Wextra -Wno-unused-parameter -Wno-parentheses -Wno-long-long -Wno-missing-field-initializers -Wunused-variable -Wpointer-arith -Wwrite-strings -Wdeclaration-after-statement -Wshorten-64-to-32 -Wimplicit-function-declaration -Wextra-tokens
|
||||
CCDLFLAGS = -fno-common
|
||||
CFLAGS = $(CCDLFLAGS) -O3 -Wno-error=shorten-64-to-32 -pipe $(ARCH_FLAG)
|
||||
INCFLAGS = -I. -I$(arch_hdrdir) -I$(hdrdir)/ruby/backward -I$(hdrdir) -I$(srcdir)
|
||||
DEFS =
|
||||
CPPFLAGS = -I/Users/scampbell/.rbenv/versions/2.1.1/include -D_XOPEN_SOURCE -D_DARWIN_C_SOURCE -D_DARWIN_UNLIMITED_SELECT -D_REENTRANT $(DEFS) $(cppflags)
|
||||
CXXFLAGS = $(CCDLFLAGS) $(cxxflags) $(ARCH_FLAG)
|
||||
ldflags = -L. -L/Users/scampbell/.rbenv/versions/2.1.1/lib -fstack-protector
|
||||
dldflags = -Wl,-undefined,dynamic_lookup -Wl,-multiply_defined,suppress
|
||||
ARCH_FLAG =
|
||||
DLDFLAGS = $(ldflags) $(dldflags) $(ARCH_FLAG)
|
||||
LDSHARED = $(CC) -dynamic -bundle
|
||||
LDSHAREDXX = $(CXX) -dynamic -bundle
|
||||
AR = ar
|
||||
EXEEXT =
|
||||
|
||||
RUBY_INSTALL_NAME = ruby
|
||||
RUBY_SO_NAME = ruby
|
||||
RUBYW_INSTALL_NAME =
|
||||
RUBY_VERSION_NAME = $(RUBY_BASE_NAME)-$(ruby_version)
|
||||
RUBYW_BASE_NAME = rubyw
|
||||
RUBY_BASE_NAME = ruby
|
||||
|
||||
arch = x86_64-darwin13.0
|
||||
sitearch = $(arch)
|
||||
ruby_version = 2.1.0
|
||||
ruby = $(bindir)/ruby
|
||||
RUBY = $(ruby)
|
||||
ruby_headers = $(hdrdir)/ruby.h $(hdrdir)/ruby/ruby.h $(hdrdir)/ruby/defines.h $(hdrdir)/ruby/missing.h $(hdrdir)/ruby/intern.h $(hdrdir)/ruby/st.h $(hdrdir)/ruby/subst.h $(arch_hdrdir)/ruby/config.h
|
||||
|
||||
RM = rm -f
|
||||
RM_RF = $(RUBY) -run -e rm -- -rf
|
||||
RMDIRS = rmdir -p
|
||||
MAKEDIRS = mkdir -p
|
||||
INSTALL = /usr/bin/install -c
|
||||
INSTALL_PROG = $(INSTALL) -m 0755
|
||||
INSTALL_DATA = $(INSTALL) -m 644
|
||||
COPY = cp
|
||||
TOUCH = exit >
|
||||
|
||||
#### End of system configuration section. ####
|
||||
|
||||
preload =
|
||||
|
||||
libpath = . $(libdir) /Users/scampbell/.rbenv/versions/2.1.1/lib
|
||||
LIBPATH = -L. -L$(libdir) -L/Users/scampbell/.rbenv/versions/2.1.1/lib
|
||||
DEFFILE =
|
||||
|
||||
CLEANFILES = mkmf.log
|
||||
DISTCLEANFILES =
|
||||
DISTCLEANDIRS =
|
||||
|
||||
extout =
|
||||
extout_prefix =
|
||||
target_prefix = /hermann
|
||||
LOCAL_LIBS =
|
||||
LIBS = -lrdkafka -lpthread -lgmp -ldl -lobjc
|
||||
ORIG_SRCS = hermann_lib.c
|
||||
SRCS = $(ORIG_SRCS)
|
||||
OBJS = hermann_lib.o
|
||||
HDRS = $(srcdir)/hermann_lib.h
|
||||
TARGET = hermann_lib
|
||||
TARGET_NAME = hermann_lib
|
||||
TARGET_ENTRY = Init_$(TARGET_NAME)
|
||||
DLLIB = $(TARGET).bundle
|
||||
EXTSTATIC =
|
||||
STATIC_LIB =
|
||||
|
||||
TIMESTAMP_DIR = .
|
||||
BINDIR = $(bindir)
|
||||
RUBYCOMMONDIR = $(sitedir)$(target_prefix)
|
||||
RUBYLIBDIR = $(sitelibdir)$(target_prefix)
|
||||
RUBYARCHDIR = $(sitearchdir)$(target_prefix)
|
||||
HDRDIR = $(rubyhdrdir)/ruby$(target_prefix)
|
||||
ARCHHDRDIR = $(rubyhdrdir)/$(arch)/ruby$(target_prefix)
|
||||
|
||||
TARGET_SO = $(DLLIB)
|
||||
CLEANLIBS = $(TARGET).bundle
|
||||
CLEANOBJS = *.o *.bak
|
||||
|
||||
all: $(DLLIB)
|
||||
static: $(STATIC_LIB)
|
||||
.PHONY: all install static install-so install-rb
|
||||
.PHONY: clean clean-so clean-static clean-rb
|
||||
|
||||
clean-static::
|
||||
clean-rb-default::
|
||||
clean-rb::
|
||||
clean-so::
|
||||
clean: clean-so clean-static clean-rb-default clean-rb
|
||||
-$(Q)$(RM) $(CLEANLIBS) $(CLEANOBJS) $(CLEANFILES) .*.time
|
||||
|
||||
distclean-rb-default::
|
||||
distclean-rb::
|
||||
distclean-so::
|
||||
distclean-static::
|
||||
distclean: clean distclean-so distclean-static distclean-rb-default distclean-rb
|
||||
-$(Q)$(RM) Makefile $(RUBY_EXTCONF_H) conftest.* mkmf.log
|
||||
-$(Q)$(RM) core ruby$(EXEEXT) *~ $(DISTCLEANFILES)
|
||||
-$(Q)$(RMDIRS) $(DISTCLEANDIRS) 2> /dev/null || true
|
||||
|
||||
realclean: distclean
|
||||
install: install-so install-rb
|
||||
|
||||
install-so: $(DLLIB) $(TIMESTAMP_DIR)/.RUBYARCHDIR.-.hermann.time
|
||||
$(INSTALL_PROG) $(DLLIB) $(RUBYARCHDIR)
|
||||
clean-static::
|
||||
-$(Q)$(RM) $(STATIC_LIB)
|
||||
install-rb: pre-install-rb install-rb-default
|
||||
install-rb-default: pre-install-rb-default
|
||||
pre-install-rb: Makefile
|
||||
pre-install-rb-default: Makefile
|
||||
pre-install-rb-default:
|
||||
$(ECHO) installing default hermann_lib libraries
|
||||
$(TIMESTAMP_DIR)/.RUBYARCHDIR.-.hermann.time:
|
||||
$(Q) $(MAKEDIRS) $(@D) $(RUBYARCHDIR)
|
||||
$(Q) $(TOUCH) $@
|
||||
|
||||
site-install: site-install-so site-install-rb
|
||||
site-install-so: install-so
|
||||
site-install-rb: install-rb
|
||||
|
||||
.SUFFIXES: .c .m .cc .mm .cxx .cpp .C .o
|
||||
|
||||
.cc.o:
|
||||
$(ECHO) compiling $(<)
|
||||
$(Q) $(CXX) $(INCFLAGS) $(CPPFLAGS) $(CXXFLAGS) $(COUTFLAG)$@ -c $<
|
||||
|
||||
.mm.o:
|
||||
$(ECHO) compiling $(<)
|
||||
$(Q) $(CXX) $(INCFLAGS) $(CPPFLAGS) $(CXXFLAGS) $(COUTFLAG)$@ -c $<
|
||||
|
||||
.cxx.o:
|
||||
$(ECHO) compiling $(<)
|
||||
$(Q) $(CXX) $(INCFLAGS) $(CPPFLAGS) $(CXXFLAGS) $(COUTFLAG)$@ -c $<
|
||||
|
||||
.cpp.o:
|
||||
$(ECHO) compiling $(<)
|
||||
$(Q) $(CXX) $(INCFLAGS) $(CPPFLAGS) $(CXXFLAGS) $(COUTFLAG)$@ -c $<
|
||||
|
||||
.C.o:
|
||||
$(ECHO) compiling $(<)
|
||||
$(Q) $(CXX) $(INCFLAGS) $(CPPFLAGS) $(CXXFLAGS) $(COUTFLAG)$@ -c $<
|
||||
|
||||
.c.o:
|
||||
$(ECHO) compiling $(<)
|
||||
$(Q) $(CC) $(INCFLAGS) $(CPPFLAGS) $(CFLAGS) $(COUTFLAG)$@ -c $<
|
||||
|
||||
.m.o:
|
||||
$(ECHO) compiling $(<)
|
||||
$(Q) $(CC) $(INCFLAGS) $(CPPFLAGS) $(CFLAGS) $(COUTFLAG)$@ -c $<
|
||||
|
||||
$(DLLIB): $(OBJS) Makefile
|
||||
$(ECHO) linking shared-object hermann/$(DLLIB)
|
||||
-$(Q)$(RM) $(@)
|
||||
$(Q) $(LDSHARED) -o $@ $(OBJS) $(LIBPATH) $(DLDFLAGS) $(LOCAL_LIBS) $(LIBS)
|
||||
$(Q) $(POSTLINK)
|
||||
|
||||
|
||||
|
||||
$(OBJS): $(HDRS) $(ruby_headers)
|
|
@ -0,0 +1,25 @@
|
|||
# External configuration for Hermann Gem
|
||||
|
||||
require 'mkmf'
|
||||
|
||||
RbConfig::MAKEFILE_CONFIG['CC'] = ENV['CC'] if ENV['CC']
|
||||
|
||||
LIBDIR = RbConfig::CONFIG['libdir']
|
||||
INCLUDEDIR = RbConfig::CONFIG['includedir']
|
||||
|
||||
HEADER_DIRS = [INCLUDEDIR]
|
||||
|
||||
LIB_DIRS = [LIBDIR]
|
||||
|
||||
dir_config('rdkafka', HEADER_DIRS, LIB_DIRS)
|
||||
|
||||
unless find_header('librdkafka/rdkafka.h')
|
||||
abort "librdkafka not installed"
|
||||
end
|
||||
|
||||
unless find_library('rdkafka', 'rd_kafka_conf_new')
|
||||
abort "librdkafka not installed"
|
||||
end
|
||||
|
||||
# create_header('hermann_lib.h')
|
||||
create_makefile('hermann/hermann_lib')
|
Binary file not shown.
|
@ -0,0 +1,236 @@
|
|||
// Hermann.c
|
||||
|
||||
#include "hermann_lib.h"
|
||||
|
||||
/**
|
||||
* Message delivery report callback.
|
||||
* Called once for each message.
|
||||
* See rdkafka.h for more information.
|
||||
*/
|
||||
static void msg_delivered (rd_kafka_t *rk,
|
||||
void *payload, size_t len,
|
||||
int error_code,
|
||||
void *opaque, void *msg_opaque) {
|
||||
|
||||
if (error_code)
|
||||
fprintf(stderr, "%% Message delivery failed: %s\n",
|
||||
rd_kafka_err2str(error_code));
|
||||
else if (!quiet)
|
||||
fprintf(stderr, "%% Message delivered (%zd bytes)\n", len);
|
||||
}
|
||||
|
||||
static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
|
||||
const char *p = (const char *)ptr;
|
||||
int of = 0;
|
||||
|
||||
|
||||
if (name)
|
||||
fprintf(fp, "%s hexdump (%zd bytes):\n", name, len);
|
||||
|
||||
for (of = 0 ; of < len ; of += 16) {
|
||||
char hexen[16*3+1];
|
||||
char charen[16+1];
|
||||
int hof = 0;
|
||||
|
||||
int cof = 0;
|
||||
int i;
|
||||
|
||||
for (i = of ; i < of + 16 && i < len ; i++) {
|
||||
hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff);
|
||||
cof += sprintf(charen+cof, "%c",
|
||||
isprint((int)p[i]) ? p[i] : '.');
|
||||
}
|
||||
fprintf(fp, "%08x: %-48s %-16s\n",
|
||||
of, hexen, charen);
|
||||
}
|
||||
}
|
||||
|
||||
static void msg_consume (rd_kafka_message_t *rkmessage,
|
||||
void *opaque) {
|
||||
if (rkmessage->err) {
|
||||
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
|
||||
fprintf(stderr,
|
||||
"%% Consumer reached end of %s [%"PRId32"] "
|
||||
"message queue at offset %"PRId64"\n",
|
||||
rd_kafka_topic_name(rkmessage->rkt),
|
||||
rkmessage->partition, rkmessage->offset);
|
||||
|
||||
if (exit_eof)
|
||||
run = 0;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
|
||||
"offset %"PRId64": %s\n",
|
||||
rd_kafka_topic_name(rkmessage->rkt),
|
||||
rkmessage->partition,
|
||||
rkmessage->offset,
|
||||
rd_kafka_message_errstr(rkmessage));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!quiet)
|
||||
fprintf(stdout, "%% Message (offset %"PRId64", %zd bytes):\n",
|
||||
rkmessage->offset, rkmessage->len);
|
||||
|
||||
if (rkmessage->key_len) {
|
||||
if (output == OUTPUT_HEXDUMP)
|
||||
hexdump(stdout, "Message Key",
|
||||
rkmessage->key, rkmessage->key_len);
|
||||
else
|
||||
printf("Key: %.*s\n",
|
||||
(int)rkmessage->key_len, (char *)rkmessage->key);
|
||||
}
|
||||
|
||||
if (output == OUTPUT_HEXDUMP)
|
||||
hexdump(stdout, "Message Payload",
|
||||
rkmessage->payload, rkmessage->len);
|
||||
else
|
||||
printf("%.*s\n",
|
||||
(int)rkmessage->len, (char *)rkmessage->payload);
|
||||
}
|
||||
|
||||
static void sig_usr1 (int sig) {
|
||||
rd_kafka_dump(stdout, rk);
|
||||
}
|
||||
|
||||
static void stop (int sig) {
|
||||
run = 0;
|
||||
fclose(stdin); /* abort fgets() */
|
||||
}
|
||||
|
||||
/**
|
||||
* Kafka logger callback (optional)
|
||||
*/
|
||||
static void logger (const rd_kafka_t *rk, int level,
|
||||
const char *fac, const char *buf) {
|
||||
struct timeval tv;
|
||||
gettimeofday(&tv, NULL);
|
||||
fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
|
||||
(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
|
||||
level, fac, rd_kafka_name(rk), buf);
|
||||
}
|
||||
|
||||
// Main entry point for Consumer behavior
|
||||
void actAsConsumer(char* topic) {
|
||||
|
||||
fprintf(stderr, "actAsConsumer for topic %s\n", topic);
|
||||
|
||||
/* Kafka configuration */
|
||||
rd_kafka_topic_t *rkt;
|
||||
char *brokers = "localhost:9092";
|
||||
char mode = 'C';
|
||||
int partition = 0; // todo: handle proper partitioning
|
||||
int opt;
|
||||
rd_kafka_conf_t *conf;
|
||||
rd_kafka_topic_conf_t *topic_conf;
|
||||
char errstr[512];
|
||||
const char *debug = NULL;
|
||||
int64_t start_offset = 0;
|
||||
int do_conf_dump = 0;
|
||||
|
||||
quiet = !isatty(STDIN_FILENO);
|
||||
|
||||
/* Kafka configuration */
|
||||
conf = rd_kafka_conf_new();
|
||||
fprintf(stderr, "Kafka configuration created\n");
|
||||
|
||||
/* Topic configuration */
|
||||
topic_conf = rd_kafka_topic_conf_new();
|
||||
fprintf(stderr, "Topic configuration created\n");
|
||||
|
||||
/* TODO: offset calculation */
|
||||
start_offset = RD_KAFKA_OFFSET_END;
|
||||
|
||||
signal(SIGINT, stop);
|
||||
signal(SIGUSR1, sig_usr1);
|
||||
fprintf(stderr, "Signals sent\n");
|
||||
|
||||
/* Consumer specific code */
|
||||
|
||||
fprintf(stderr, "Create a Kafka handle\n");
|
||||
/* Create Kafka handle */
|
||||
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) {
|
||||
fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Set logger */
|
||||
/*rd_kafka_set_logger(rk, logger);
|
||||
fprintf(stderr, "Logger set\n");
|
||||
rd_kafka_set_log_level(rk, LOG_DEBUG);
|
||||
fprintf(stderr, "Loglevel configured\n");*/
|
||||
|
||||
/* Add brokers */
|
||||
fprintf(stderr, "About to add brokers..");
|
||||
if (rd_kafka_brokers_add(rk, brokers) == 0) {
|
||||
fprintf(stderr, "%% No valid brokers specified\n");
|
||||
exit(1);
|
||||
}
|
||||
fprintf(stderr, "Brokers added\n");
|
||||
|
||||
/* Create topic */
|
||||
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
|
||||
fprintf(stderr, "Topic created\n");
|
||||
|
||||
/* Start consuming */
|
||||
if (rd_kafka_consume_start(rkt, partition, start_offset) == -1){
|
||||
fprintf(stderr, "%% Failed to start consuming: %s\n",
|
||||
rd_kafka_err2str(rd_kafka_errno2err(errno)));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
fprintf(stderr, "Consume started\n");
|
||||
|
||||
/* Run loop */
|
||||
while (run) {
|
||||
rd_kafka_message_t *rkmessage;
|
||||
|
||||
/* Consume single message.
|
||||
* See rdkafka_performance.c for high speed
|
||||
* consuming of messages. */
|
||||
rkmessage = rd_kafka_consume(rkt, partition, 1000);
|
||||
if (!rkmessage) /* timeout */
|
||||
continue;
|
||||
|
||||
msg_consume(rkmessage, NULL);
|
||||
|
||||
/* Return message to rdkafka */
|
||||
rd_kafka_message_destroy(rkmessage);
|
||||
}
|
||||
|
||||
fprintf(stderr, "Run loop exited\n");
|
||||
|
||||
/* Stop consuming */
|
||||
rd_kafka_consume_stop(rkt, partition);
|
||||
|
||||
rd_kafka_topic_destroy(rkt);
|
||||
|
||||
rd_kafka_destroy(rk);
|
||||
|
||||
/* Let background threads clean up and terminate cleanly. */
|
||||
rd_kafka_wait_destroyed(2000);
|
||||
}
|
||||
|
||||
// Ruby gem extensions
|
||||
static VALUE consume(VALUE c, VALUE topicValue) {
|
||||
fprintf(stderr, "Called consume with one argument\n");
|
||||
|
||||
char* topic = StringValueCStr(topicValue);
|
||||
|
||||
fprintf(stderr, "Topic is: %s\n", topic);
|
||||
|
||||
actAsConsumer(topic);
|
||||
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
void Init_hermann_lib() {
|
||||
|
||||
m_hermann = rb_define_module("Hermann");
|
||||
|
||||
VALUE c_consumer = rb_define_class_under(m_hermann, "Consumer", rb_cObject);
|
||||
|
||||
rb_define_method( c_consumer, "consume", consume, 1 );
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
#ifndef HERMANN_H
|
||||
#define HERMANN_H
|
||||
|
||||
#include <ruby.h>
|
||||
|
||||
#include <ctype.h>
|
||||
#include <signal.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <syslog.h>
|
||||
#include <sys/time.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include <librdkafka/rdkafka.h>
|
||||
|
||||
// Holds the defined Ruby module for Hermann
|
||||
static VALUE m_hermann;
|
||||
|
||||
// From rdkafka_example.c
|
||||
static int run = 1;
|
||||
static rd_kafka_t *rk;
|
||||
static int exit_eof = 0;
|
||||
static int quiet = 0;
|
||||
static enum {
|
||||
OUTPUT_HEXDUMP,
|
||||
OUTPUT_RAW,
|
||||
} output = OUTPUT_HEXDUMP;
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
Binary file not shown.
|
@ -0,0 +1,67 @@
|
|||
find_header: checking for librdkafka/rdkafka.h... -------------------- yes
|
||||
|
||||
"/usr/local/bin/gcc-4.2 -o conftest -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0/x86_64-darwin13.0 -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0/ruby/backward -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0 -I. -I/Users/scampbell/.rbenv/versions/2.1.1/include -D_XOPEN_SOURCE -D_DARWIN_C_SOURCE -D_DARWIN_UNLIMITED_SELECT -D_REENTRANT -O3 -Wno-error=shorten-64-to-32 -pipe conftest.c -L. -L/Users/scampbell/.rbenv/versions/2.1.1/lib -L/Users/scampbell/.rbenv/versions/2.1.1/lib -L. -L/Users/scampbell/.rbenv/versions/2.1.1/lib -fstack-protector -lruby-static -framework CoreFoundation -lpthread -lgmp -ldl -lobjc "
|
||||
checked program was:
|
||||
/* begin */
|
||||
1: #include "ruby.h"
|
||||
2:
|
||||
3: int main(int argc, char **argv)
|
||||
4: {
|
||||
5: return 0;
|
||||
6: }
|
||||
/* end */
|
||||
|
||||
"/usr/local/bin/cpp-4.2 -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0/x86_64-darwin13.0 -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0/ruby/backward -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0 -I. -I/Users/scampbell/.rbenv/versions/2.1.1/include -D_XOPEN_SOURCE -D_DARWIN_C_SOURCE -D_DARWIN_UNLIMITED_SELECT -D_REENTRANT -O3 -Wno-error=shorten-64-to-32 -pipe conftest.c -o conftest.i"
|
||||
checked program was:
|
||||
/* begin */
|
||||
1: #include "ruby.h"
|
||||
2:
|
||||
3: #include <librdkafka/rdkafka.h>
|
||||
/* end */
|
||||
|
||||
--------------------
|
||||
|
||||
find_library: checking for rd_kafka_conf_new() in -lrdkafka... -------------------- yes
|
||||
|
||||
"/usr/local/bin/gcc-4.2 -o conftest -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0/x86_64-darwin13.0 -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0/ruby/backward -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0 -I. -I/Users/scampbell/.rbenv/versions/2.1.1/include -D_XOPEN_SOURCE -D_DARWIN_C_SOURCE -D_DARWIN_UNLIMITED_SELECT -D_REENTRANT -O3 -Wno-error=shorten-64-to-32 -pipe conftest.c -L. -L/Users/scampbell/.rbenv/versions/2.1.1/lib -L/Users/scampbell/.rbenv/versions/2.1.1/lib -L. -L/Users/scampbell/.rbenv/versions/2.1.1/lib -fstack-protector -lruby-static -framework CoreFoundation -lrdkafka -lpthread -lgmp -ldl -lobjc "
|
||||
conftest.c: In function ‘t’:
|
||||
conftest.c:13: error: ‘rd_kafka_conf_new’ undeclared (first use in this function)
|
||||
conftest.c:13: error: (Each undeclared identifier is reported only once
|
||||
conftest.c:13: error: for each function it appears in.)
|
||||
checked program was:
|
||||
/* begin */
|
||||
1: #include "ruby.h"
|
||||
2:
|
||||
3: /*top*/
|
||||
4: extern int t(void);
|
||||
5: int main(int argc, char **argv)
|
||||
6: {
|
||||
7: if (argc > 1000000) {
|
||||
8: printf("%p", &t);
|
||||
9: }
|
||||
10:
|
||||
11: return 0;
|
||||
12: }
|
||||
13: int t(void) { void ((*volatile p)()); p = (void ((*)()))rd_kafka_conf_new; return 0; }
|
||||
/* end */
|
||||
|
||||
"/usr/local/bin/gcc-4.2 -o conftest -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0/x86_64-darwin13.0 -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0/ruby/backward -I/Users/scampbell/.rbenv/versions/2.1.1/include/ruby-2.1.0 -I. -I/Users/scampbell/.rbenv/versions/2.1.1/include -D_XOPEN_SOURCE -D_DARWIN_C_SOURCE -D_DARWIN_UNLIMITED_SELECT -D_REENTRANT -O3 -Wno-error=shorten-64-to-32 -pipe conftest.c -L. -L/Users/scampbell/.rbenv/versions/2.1.1/lib -L/Users/scampbell/.rbenv/versions/2.1.1/lib -L. -L/Users/scampbell/.rbenv/versions/2.1.1/lib -fstack-protector -lruby-static -framework CoreFoundation -lrdkafka -lpthread -lgmp -ldl -lobjc "
|
||||
checked program was:
|
||||
/* begin */
|
||||
1: #include "ruby.h"
|
||||
2:
|
||||
3: /*top*/
|
||||
4: extern int t(void);
|
||||
5: int main(int argc, char **argv)
|
||||
6: {
|
||||
7: if (argc > 1000000) {
|
||||
8: printf("%p", &t);
|
||||
9: }
|
||||
10:
|
||||
11: return 0;
|
||||
12: }
|
||||
13: int t(void) { rd_kafka_conf_new(); return 0; }
|
||||
/* end */
|
||||
|
||||
--------------------
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
|
||||
SPEC = Gem::Specification.new do |s|
|
||||
s.name = "hermann"
|
||||
s.version = "0.0.19"
|
||||
s.default_executable = "hermann"
|
||||
|
||||
s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
|
||||
s.authors = ["Stan Campbell"]
|
||||
s.date = %q{2014-05-29}
|
||||
s.description = %q{Ruby gem wrapper for a C based Kafka Consumer}
|
||||
s.email = %q{stan.campbell3@gmail.com}
|
||||
s.files = [ "Rakefile", "ext/hermann_lib.h", "ext/hermann_lib.c", "ext/extconf.rb", "bin/hermann",
|
||||
"lib/hermann.rb", "ext/hermann_lib.o"]
|
||||
s.extensions = [ "ext/extconf.rb"]
|
||||
s.test_files = ["test/test_hermann.rb"]
|
||||
s.homepage = %q{http://rubygems.org/gems/hermann}
|
||||
s.require_paths = ["lib", "ext"]
|
||||
s.rubygems_version = %q{2.2.2}
|
||||
s.summary = %q{The Kafka consumer is based on the librdkafka C library.}
|
||||
|
||||
s.platform = Gem::Platform::CURRENT
|
||||
|
||||
if s.respond_to? :specification_version then
|
||||
s.specification_version = 3
|
||||
|
||||
if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
|
||||
else
|
||||
end
|
||||
else
|
||||
end
|
||||
end
|
|
@ -0,0 +1,14 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="RUBY_MODULE" version="4">
|
||||
<component name="CompassSettings">
|
||||
<option name="compassSupportEnabled" value="true" />
|
||||
</component>
|
||||
<component name="NewModuleRootManager" inherit-compiler-output="true">
|
||||
<exclude-output />
|
||||
<content url="file://$MODULE_DIR$" />
|
||||
<orderEntry type="jdk" jdkName="rbenv: 2.1.1" jdkType="RUBY_SDK" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
<orderEntry type="library" scope="PROVIDED" name="bundler (v1.6.2, rbenv: 2.1.1) [gem]" level="application" />
|
||||
</component>
|
||||
</module>
|
||||
|
|
@ -0,0 +1 @@
|
|||
require 'hermann_lib'
|
|
@ -0,0 +1,9 @@
|
|||
require 'test/unit'
|
||||
require 'hermann'
|
||||
|
||||
class HermannTest < Test::Unit::TestCase
|
||||
|
||||
def test_stub
|
||||
assert_equal "Apples", "Apples"
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue