mirror of https://github.com/reiseburo/hermann
commit
b48333524c
|
@ -2,19 +2,34 @@ module Hermann
|
|||
def self.jruby?
|
||||
return RUBY_PLATFORM == "java"
|
||||
end
|
||||
|
||||
if self.jruby?
|
||||
require 'java'
|
||||
require 'hermann_jars'
|
||||
|
||||
module JavaUtil
|
||||
include_package 'java.util'
|
||||
end
|
||||
module ProducerUtil
|
||||
include_package 'kafka.producer'
|
||||
end
|
||||
module JavaApiUtil
|
||||
include_package 'kafka.javaapi.producer'
|
||||
# Validates that the args are non-blank strings
|
||||
#
|
||||
# @param [Object] key to validate
|
||||
#
|
||||
# @param [Object] val to validate
|
||||
#
|
||||
# @raises [ConfigurationErorr] if either values are empty
|
||||
def self.validate_property!(key, val)
|
||||
if key.to_s.empty? || val.to_s.empty?
|
||||
raise Hermann::Errors::ConfigurationError
|
||||
end
|
||||
end
|
||||
|
||||
# Packages options into Java Properties object
|
||||
#
|
||||
# @params [Hash] hash of options to package
|
||||
#
|
||||
# @return [Properties] packaged java properties
|
||||
def self.package_properties(options)
|
||||
properties = JavaUtil::Properties.new
|
||||
options.each do |key, val|
|
||||
Hermann.validate_property!(key, val)
|
||||
properties.put(key, val)
|
||||
end
|
||||
properties
|
||||
end
|
||||
end
|
||||
|
||||
if Hermann.jruby?
|
||||
require 'hermann/java'
|
||||
end
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
require 'hermann'
|
||||
|
||||
unless Hermann.jruby?
|
||||
if Hermann.jruby?
|
||||
require 'hermann/provider/java_simple_consumer'
|
||||
else
|
||||
require 'hermann_lib'
|
||||
end
|
||||
|
||||
|
@ -8,11 +10,30 @@ module Hermann
|
|||
class Consumer
|
||||
attr_reader :topic, :brokers, :partition, :internal
|
||||
|
||||
def initialize(topic, brokers, partition)
|
||||
|
||||
# Instantiate Consumer
|
||||
#
|
||||
# @params [String] kafka topic
|
||||
#
|
||||
# @params [String] group ID
|
||||
#
|
||||
# @params [String] comma separated zookeeper list
|
||||
#
|
||||
# @params [Hash] options for consumer
|
||||
# @option opts [String] :brokers (for MRI) Comma separated list of brokers
|
||||
# @option opts [String] :partition (for MRI) The kafka partition
|
||||
# @option opts [Fixnum] :sleep_time (Jruby) Time to sleep between consume retries, defaults to 1sec
|
||||
# @option opts [Boolean] :do_retry (Jruby) Retry consume attempts if exceptions are thrown, defaults to true
|
||||
def initialize(topic, groupId, zookeepers, opts={})
|
||||
@topic = topic
|
||||
@brokers = brokers
|
||||
@partition = partition
|
||||
unless Hermann.jruby?
|
||||
|
||||
if Hermann.jruby?
|
||||
@internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, groupId, topic, opts)
|
||||
else
|
||||
brokers = opts.delete(:brokers)
|
||||
partition = opts.delete(:partition)
|
||||
@internal = Hermann::Lib::Consumer.new(topic, brokers, partition)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
module Hermann
|
||||
require 'java'
|
||||
require 'hermann_jars'
|
||||
|
||||
module JavaUtil
|
||||
include_package 'java.util'
|
||||
end
|
||||
module ProducerUtil
|
||||
include_package 'kafka.producer'
|
||||
end
|
||||
module ConsumerUtil
|
||||
include_package "kafka.consumer"
|
||||
end
|
||||
module JavaApiUtil
|
||||
include_package 'kafka.javaapi.producer'
|
||||
end
|
||||
module KafkaUtil
|
||||
include_package "kafka.util"
|
||||
end
|
||||
end
|
|
@ -11,6 +11,13 @@ module Hermann
|
|||
class JavaProducer
|
||||
attr_accessor :producer
|
||||
|
||||
#default kafka Producer options
|
||||
DEFAULTS = {
|
||||
'serializer.class' => 'kafka.serializer.StringEncoder',
|
||||
'partitioner.class' => 'kafka.producer.DefaultPartitioner',
|
||||
'request.required.acks' => '1',
|
||||
'message.send.max.retries' => '0'
|
||||
}.freeze
|
||||
|
||||
# Instantiate JavaProducer
|
||||
#
|
||||
|
@ -25,17 +32,10 @@ module Hermann
|
|||
# JavaProducer.new('0:9092', {'request.required.acks' => '1'})
|
||||
#
|
||||
def initialize(brokers, opts={})
|
||||
properties = create_properties(brokers, opts)
|
||||
config = create_config(properties)
|
||||
config = create_config(brokers, opts)
|
||||
@producer = JavaApiUtil::Producer.new(config)
|
||||
end
|
||||
|
||||
DEFAULTS = {
|
||||
'serializer.class' => 'kafka.serializer.StringEncoder',
|
||||
'partitioner.class' => 'kafka.producer.DefaultPartitioner',
|
||||
'request.required.acks' => '1'
|
||||
}.freeze
|
||||
|
||||
# Push a value onto the Kafka topic passed to this +Producer+
|
||||
#
|
||||
# @param [Object] value A single object to push
|
||||
|
@ -67,39 +67,21 @@ module Hermann
|
|||
end
|
||||
|
||||
private
|
||||
|
||||
# Creates a ProducerConfig object
|
||||
#
|
||||
# @param [Properties] object with broker properties
|
||||
#
|
||||
# @return [ProducerConfig] - packaged config for +Producer+
|
||||
def create_config(properties)
|
||||
ProducerUtil::ProducerConfig.new(properties)
|
||||
end
|
||||
|
||||
# Creates Properties Object
|
||||
# @param [String] comma separated list of brokers
|
||||
#
|
||||
# @param [Hash] brokers passed into this function
|
||||
# @option args [String] :brokers - string of brokers
|
||||
#
|
||||
# @return [Properties] properties object for creating +ProducerConfig+
|
||||
# @return [ProducerConfig] - packaged config for +Producer+
|
||||
#
|
||||
# @raises [RuntimeError] if options does not contain key value strings
|
||||
def create_properties(brokers, opts={})
|
||||
brokers = { 'metadata.broker.list' => brokers }
|
||||
options = DEFAULTS.merge(brokers).merge(opts)
|
||||
properties = JavaUtil::Properties.new
|
||||
options.each do |key, val|
|
||||
validate_property!(key, val)
|
||||
properties.put(key, val)
|
||||
end
|
||||
properties
|
||||
end
|
||||
|
||||
def validate_property!(key, val)
|
||||
if key.to_s.empty? || val.to_s.empty?
|
||||
raise Hermann::Errors::ConfigurationError, "Invalid Broker Properties"
|
||||
end
|
||||
def create_config(brokers, opts={})
|
||||
brokers = { 'metadata.broker.list' => brokers }
|
||||
options = DEFAULTS.merge(brokers).merge(opts)
|
||||
properties = Hermann.package_properties(options)
|
||||
ProducerUtil::ProducerConfig.new(properties)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
require 'hermann'
|
||||
require 'hermann/errors'
|
||||
|
||||
module Hermann
|
||||
module Provider
|
||||
|
||||
# Implements a java based consumer. The #consumer method loops infinitely,
|
||||
# the hasNext() method blocks until a message is available.
|
||||
class JavaSimpleConsumer
|
||||
attr_accessor :consumer, :topic, :zookeeper
|
||||
|
||||
NUM_THREADS = 1
|
||||
|
||||
#default zookeeper connection options
|
||||
DEFAULTS = {
|
||||
'zookeeper.session.timeout.ms' => '400',
|
||||
'zookeeper.sync.time.ms' => '200',
|
||||
'auto.commit.interval.ms' => '1000'
|
||||
}.freeze
|
||||
|
||||
# Instantiate JavaSimpleConsumer
|
||||
#
|
||||
# @params [String] list of zookeepers
|
||||
#
|
||||
# @params [String] Group ID
|
||||
#
|
||||
# @params [String] Kafka topic
|
||||
#
|
||||
# @params [Hash] kafka options for consumer
|
||||
# @option opts [Fixnum] :sleep_time Time to sleep between consume retries, defaults to 1sec
|
||||
# @option opts [Boolean] :do_retry Retry consume attempts if exceptions are thrown, defaults to true
|
||||
def initialize(zookeepers, groupId, topic, opts={})
|
||||
config = create_config(zookeepers, groupId)
|
||||
@consumer = ConsumerUtil::Consumer.createJavaConsumerConnector(config)
|
||||
@topic = topic
|
||||
@sleep_time = opts.delete(:sleep_time) || 1
|
||||
@do_retry = opts.delete(:do_retry) || true
|
||||
end
|
||||
|
||||
# Starts infinite loop to consume messages. hasNext() blocks until a
|
||||
# message is available at which point it is yielded to the block
|
||||
#
|
||||
# ==== Examples
|
||||
#
|
||||
# consumer.consume do |message|
|
||||
# puts "Received: #{message}"
|
||||
# end
|
||||
#
|
||||
def consume
|
||||
begin
|
||||
stream = get_stream
|
||||
it = stream.iterator
|
||||
while it.hasNext do
|
||||
yield it.next.message.to_s
|
||||
end
|
||||
rescue Exception => e
|
||||
puts "#{self.class.name}#consume exception: #{e.class.name}"
|
||||
puts "Msg: #{e.message}"
|
||||
puts e.backtrace.join("\n")
|
||||
if retry?
|
||||
sleep @sleep_time
|
||||
retry
|
||||
else
|
||||
raise e
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
def retry?
|
||||
@do_retry
|
||||
end
|
||||
|
||||
# Gets the message stream of the topic. Creates message streams for
|
||||
# a topic and the number of threads requested. In this case the default
|
||||
# number of threads is NUM_THREADS.
|
||||
def get_stream
|
||||
@topicCountMap = JavaUtil::HashMap.new
|
||||
@value = NUM_THREADS.to_java Java::int
|
||||
@topicCountMap.put("#{@topic}", @value)
|
||||
consumerMap = @consumer.createMessageStreams(@topicCountMap)
|
||||
consumerMap[@topic].first
|
||||
end
|
||||
|
||||
# Creates a ConsumerConfig object
|
||||
#
|
||||
# @param [String] zookeepers list
|
||||
#
|
||||
# @param [String] group ID
|
||||
#
|
||||
# @return [ConsumerConfig] - packaged config for +Consumer+
|
||||
#
|
||||
# @raises [RuntimeError] if options does not contain key value strings
|
||||
def create_config(zookeepers, groupId, opts={})
|
||||
config = connect_opts(zookeepers, groupId)
|
||||
options = DEFAULTS.merge(config).merge(opts)
|
||||
properties = Hermann.package_properties(options)
|
||||
ConsumerUtil::ConsumerConfig.new(properties)
|
||||
end
|
||||
|
||||
# Connection options to pass to ConsumerConfig
|
||||
def connect_opts(zookeepers, groupId)
|
||||
{
|
||||
'zookeeper.connect' => zookeepers,
|
||||
'group.id' => groupId
|
||||
}
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -3,35 +3,53 @@ require 'hermann/consumer'
|
|||
|
||||
# XXX: Hermann::Consumer isn't really supported anywhere, MRI included right
|
||||
# now
|
||||
describe Hermann::Consumer, :platform => :mri do
|
||||
subject(:consumer) { described_class.new(topic, brokers, partition) }
|
||||
describe Hermann::Consumer do
|
||||
subject(:consumer) { described_class.new(topic, nil, nil, opts) }
|
||||
|
||||
let(:topic) { 'rspec' }
|
||||
let(:brokers) { 'localhost:1337' }
|
||||
let(:partition) { 1 }
|
||||
let(:opts) { { :brokers => brokers, :partition => partition } }
|
||||
|
||||
it { should respond_to :consume }
|
||||
|
||||
describe '#consume' do
|
||||
shared_examples 'an error condition' do
|
||||
it 'should raise an exception' do
|
||||
expect { consumer.consume }.to raise_error(RuntimeError)
|
||||
context "on C ruby", :platform => :mri do
|
||||
it { should respond_to :consume }
|
||||
|
||||
describe '#consume' do
|
||||
shared_examples 'an error condition' do
|
||||
it 'should raise an exception' do
|
||||
expect { consumer.consume }.to raise_error(RuntimeError)
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a bad partition' do
|
||||
let(:partition) { -1 }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
|
||||
context 'with a bad broker' do
|
||||
let(:brokers) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
|
||||
context 'with a bad topic' do
|
||||
let(:topic) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a bad partition' do
|
||||
let(:partition) { -1 }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
context 'on Jruby', :platform => :java do
|
||||
subject(:consumer) { described_class.new(topic, groupId, zookeepers) }
|
||||
|
||||
context 'with a bad broker' do
|
||||
let(:brokers) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
let(:zookeepers) { 'localhost:2181' }
|
||||
let(:groupId) { 'groupId' }
|
||||
let(:do_retry) { true }
|
||||
let(:sleep_time) { 1 }
|
||||
|
||||
context 'with a bad topic' do
|
||||
let(:topic) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
it 'creates a Hermann::Provider::JavaSimpleConsumer' do
|
||||
allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { double }
|
||||
expect(subject.internal).to be_a(Hermann::Provider::JavaSimpleConsumer)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,6 +1,36 @@
|
|||
require 'spec_helper'
|
||||
require 'hermann/errors'
|
||||
|
||||
describe Hermann do
|
||||
it { should be_instance_of Module }
|
||||
|
||||
describe '.validate_property!' do
|
||||
subject { described_class.send(:validate_property!, foo, bar) }
|
||||
|
||||
context 'with valid property' do
|
||||
let(:foo) { 'foo' }
|
||||
let(:bar) { 'bar' }
|
||||
it 'returns true' do
|
||||
expect{ subject }.to_not raise_error
|
||||
end
|
||||
end
|
||||
|
||||
context 'with valid property' do
|
||||
let(:foo) { '' }
|
||||
let(:bar) { '' }
|
||||
it 'returns false' do
|
||||
expect{ subject }.to raise_error(Hermann::Errors::ConfigurationError)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'on Jruby', :platform => :java do
|
||||
describe '.package_properties' do
|
||||
let(:options) { { 'f' => '1' } }
|
||||
it 'puts options into java Properties' do
|
||||
expect(described_class.package_properties(options)).to eq options
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -3,10 +3,11 @@ require 'hermann/provider/java_producer'
|
|||
require 'hermann/errors'
|
||||
|
||||
describe Hermann::Provider::JavaProducer, :platform => :java do
|
||||
subject(:producer) { described_class.new(brokers) }
|
||||
subject(:producer) { described_class.new(brokers, opts) }
|
||||
|
||||
let(:topic) { 'rspec' }
|
||||
let(:brokers) { '0:1337'}
|
||||
let(:opts) { {} }
|
||||
|
||||
describe '#push_single' do
|
||||
subject(:result) { producer.push_single('foo', topic, nil) }
|
||||
|
@ -43,13 +44,7 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
|
|||
|
||||
context 'with a non-existing broker' do
|
||||
let(:brokers) { 'localhost:13337' }
|
||||
let(:timeout) { 2 }
|
||||
let(:value) { 'rspec' }
|
||||
|
||||
it 'should reject' do
|
||||
future = result.wait(1)
|
||||
expect(future).to be_rejected
|
||||
end
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
|
||||
context 'with a bad topic' do
|
||||
|
@ -59,17 +54,20 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
|
|||
end
|
||||
end
|
||||
|
||||
describe '#create_properties' do
|
||||
subject { producer.send(:create_properties, brokers, opts) }
|
||||
describe '#create_config' do
|
||||
let(:opts) { {'f'=>'1'} }
|
||||
let(:result) {
|
||||
let(:options) {
|
||||
Hermann::Provider::JavaProducer::DEFAULTS.merge({
|
||||
"metadata.broker.list"=>brokers, "f"=>"1"
|
||||
})
|
||||
}
|
||||
let(:producer_config) { double }
|
||||
|
||||
it 'creates Properties' do
|
||||
expect(subject).to eq result
|
||||
expect(Hermann).to receive(:package_properties).with(options)
|
||||
expect(Hermann::ProducerUtil::ProducerConfig).to receive(:new) { producer_config }
|
||||
expect(Hermann::JavaApiUtil::Producer).to receive(:new) { double }
|
||||
expect(subject).to_not be_nil
|
||||
end
|
||||
context 'without brokers' do
|
||||
let(:brokers) { '' }
|
||||
|
@ -78,24 +76,4 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#validate_property!' do
|
||||
subject { producer.send(:validate_property!, foo, bar) }
|
||||
|
||||
context 'with valid property' do
|
||||
let(:foo) { 'foo' }
|
||||
let(:bar) { 'bar' }
|
||||
it 'returns true' do
|
||||
expect{ subject }.to_not raise_error
|
||||
end
|
||||
end
|
||||
|
||||
context 'with valid property' do
|
||||
let(:foo) { '' }
|
||||
let(:bar) { '' }
|
||||
it 'returns false' do
|
||||
expect{ subject }.to raise_error(Hermann::Errors::ConfigurationError)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
require 'spec_helper'
|
||||
require 'hermann/provider/java_simple_consumer'
|
||||
require 'hermann/errors'
|
||||
|
||||
describe Hermann::Provider::JavaSimpleConsumer, :platform => :java do
|
||||
subject(:consumer) { described_class.new(zookeeper, groupId, topic) }
|
||||
|
||||
let(:zookeeper) { 'localhost:2181' }
|
||||
let(:groupId) { 'groupId' }
|
||||
let(:topic) { 'topic' }
|
||||
let(:internal_consumer) { double('ConsumerUtil::Consumer') }
|
||||
|
||||
before do
|
||||
allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { internal_consumer }
|
||||
end
|
||||
|
||||
describe '#consume' do
|
||||
let(:stream) { double }
|
||||
let(:iterator) { double }
|
||||
let(:msg) { double }
|
||||
|
||||
it 'yields messages one at a time' do
|
||||
allow(consumer).to receive(:get_stream) { stream }
|
||||
allow(stream).to receive(:iterator) { iterator }
|
||||
allow(iterator).to receive(:hasNext).and_return(true, false)
|
||||
allow(iterator).to receive_message_chain(:next, :message, :to_s) { msg }
|
||||
|
||||
expect{ |b|
|
||||
subject.consume(&b)
|
||||
}.to yield_with_args(msg)
|
||||
end
|
||||
it 'retries consuming if there is an exception' do
|
||||
allow(consumer).to receive(:get_stream).and_raise(StandardError)
|
||||
#artificially allow one one retry
|
||||
allow(consumer).to receive(:retry?).and_return(true, false)
|
||||
expect(consumer).to receive(:sleep).once
|
||||
expect{ |b| subject.consume(&b) }.to raise_error(StandardError)
|
||||
end
|
||||
end
|
||||
|
||||
describe '#get_stream' do
|
||||
subject { consumer.send(:get_stream) }
|
||||
|
||||
let(:map) { { topic => ['foo'] } }
|
||||
|
||||
it 'gets the consumer stream' do
|
||||
allow(internal_consumer).to receive(:createMessageStreams) { map }
|
||||
expect(subject).to eq 'foo'
|
||||
end
|
||||
end
|
||||
|
||||
describe '#create_config' do
|
||||
subject { consumer.send(:create_config, zookeeper, groupId) }
|
||||
|
||||
it 'creates the consumer config' do
|
||||
expect(subject).to be_a Hermann::ConsumerUtil::ConsumerConfig
|
||||
end
|
||||
end
|
||||
|
||||
describe '#connect_opts' do
|
||||
subject { consumer.send(:connect_opts, zookeeper, groupId) }
|
||||
|
||||
it 'creates a hash of connection options' do
|
||||
expect(subject).to be_a Hash
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue