add simple java consumer

This commit is contained in:
jway 2014-10-22 17:12:44 -07:00
parent 3c0550151d
commit 972fe47410
9 changed files with 340 additions and 99 deletions

View File

@ -2,19 +2,34 @@ module Hermann
# Reports whether the running interpreter identifies itself as JRuby.
#
# @return [Boolean] true when RUBY_PLATFORM is exactly "java"
def self.jruby?
  RUBY_PLATFORM == "java"
end
if self.jruby?
require 'java'
require 'hermann_jars'
module JavaUtil
include_package 'java.util'
end
module ProducerUtil
include_package 'kafka.producer'
end
module JavaApiUtil
include_package 'kafka.javaapi.producer'
# Validates that neither the key nor the value is empty once converted with
# +to_s+. Whitespace-only strings are NOT treated as empty; nil is, because
# nil.to_s is "".
#
# @param [Object] key property name to validate
#
# @param [Object] val property value to validate
#
# @raise [Hermann::Errors::ConfigurationError] if either value is empty
#
# @return [nil] when both values are acceptable
def self.validate_property!(key, val)
  if key.to_s.empty?
    raise Hermann::Errors::ConfigurationError,
          "Property key must not be empty"
  end

  return unless val.to_s.empty?

  # Name the offending key so a bad configuration can be located quickly.
  raise Hermann::Errors::ConfigurationError,
        "Property #{key.inspect} must not have an empty value"
end
# Packages options into a Java Properties object.
#
# Every pair is validated with Hermann.validate_property! and then stored as
# Strings. Validation already accepts any object whose +to_s+ is non-empty
# (symbols, integers, ...), so the same String form is what gets stored.
#
# @param [Hash] options hash of options to package
#
# @raise [Hermann::Errors::ConfigurationError] if any key or value is empty
#
# @return [Properties] packaged java properties
def self.package_properties(options)
  properties = JavaUtil::Properties.new
  options.each do |key, val|
    Hermann.validate_property!(key, val)
    # Properties#getProperty only returns String values; an entry inserted
    # with #put as a non-String would be silently invisible to readers.
    properties.setProperty(key.to_s, val.to_s)
  end
  properties
end
end
if Hermann.jruby?
require 'hermann/java'
end

View File

@ -1,6 +1,8 @@
require 'hermann'
unless Hermann.jruby?
if Hermann.jruby?
require 'hermann/provider/java_simple_consumer'
else
require 'hermann_lib'
end
@ -8,11 +10,30 @@ module Hermann
class Consumer
attr_reader :topic, :brokers, :partition, :internal
def initialize(topic, brokers, partition)
# Instantiate Consumer
#
# @param [String] topic kafka topic
#
# @param [String] groupId group ID
#
# @param [String] zookeepers comma separated zookeeper list
#
# @param [Hash] opts options for consumer
# @option opts [String] :brokers (for MRI) Comma separated list of brokers
# @option opts [String] :partition (for MRI) The kafka partition
# @option opts [Fixnum] :sleep_time (Jruby) Time to sleep between consume retries, defaults to 1sec
# @option opts [Boolean] :do_retry (Jruby) Retry consume attempts if exceptions are thrown, defaults to true
#
# NOTE(review): this span opens `def`, `unless` and `if` but closes only two
# of them, so it looks like pre-change and post-change lines of the diff are
# interleaved here - confirm against the real file before editing.
def initialize(topic, groupId, zookeepers, opts={})
@topic = topic
# NOTE(review): the next three lines appear to be the removed side of the
# diff. If they are gone in the real file, nothing assigns @brokers or
# @partition any more and the attr_readers declared on the class would
# return nil - confirm whether they should be set from opts.
@brokers = brokers
@partition = partition
unless Hermann.jruby?
if Hermann.jruby?
# JRuby: the whole opts Hash is handed to the Java consumer.
@internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, groupId, topic, opts)
else
# MRI: Hash#delete removes :brokers and :partition from the Hash the caller
# passed in; groupId and zookeepers are not used on this branch.
brokers = opts.delete(:brokers)
partition = opts.delete(:partition)
@internal = Hermann::Lib::Consumer.new(topic, brokers, partition)
end
end

20
lib/hermann/java.rb Normal file
View File

@ -0,0 +1,20 @@
require 'java'
require 'hermann_jars'

# Namespaced access points to the Java packages used by the JRuby code paths.
# Each nested module pulls in exactly one Java package via include_package.
module Hermann
  # java.util
  module JavaUtil
    include_package 'java.util'
  end

  # kafka.producer
  module ProducerUtil
    include_package 'kafka.producer'
  end

  # kafka.consumer
  module ConsumerUtil
    include_package 'kafka.consumer'
  end

  # kafka.javaapi.producer
  module JavaApiUtil
    include_package 'kafka.javaapi.producer'
  end

  # kafka.util
  module KafkaUtil
    include_package 'kafka.util'
  end
end

View File

@ -11,6 +11,13 @@ module Hermann
class JavaProducer
attr_accessor :producer
#default kafka Producer options
DEFAULTS = {
'serializer.class' => 'kafka.serializer.StringEncoder',
'partitioner.class' => 'kafka.producer.DefaultPartitioner',
'request.required.acks' => '1',
'message.send.max.retries' => '0'
}.freeze
# Instantiate JavaProducer
#
@ -25,17 +32,10 @@ module Hermann
# JavaProducer.new('0:9092', {'request.required.acks' => '1'})
#
def initialize(brokers, opts={})
properties = create_properties(brokers, opts)
config = create_config(properties)
config = create_config(brokers, opts)
@producer = JavaApiUtil::Producer.new(config)
end
DEFAULTS = {
'serializer.class' => 'kafka.serializer.StringEncoder',
'partitioner.class' => 'kafka.producer.DefaultPartitioner',
'request.required.acks' => '1'
}.freeze
# Push a value onto the Kafka topic passed to this +Producer+
#
# @param [Object] value A single object to push
@ -67,39 +67,21 @@ module Hermann
end
private
# Creates a ProducerConfig object
#
# @param [Properties] object with broker properties
#
# @return [ProducerConfig] - packaged config for +Producer+
def create_config(properties)
ProducerUtil::ProducerConfig.new(properties)
end
# Creates Properties Object
# @param [String] comma separated list of brokers
#
# @param [Hash] brokers passed into this function
# @option args [String] :brokers - string of brokers
#
# @return [Properties] properties object for creating +ProducerConfig+
# @return [ProducerConfig] - packaged config for +Producer+
#
# @raises [RuntimeError] if options does not contain key value strings
def create_properties(brokers, opts={})
brokers = { 'metadata.broker.list' => brokers }
options = DEFAULTS.merge(brokers).merge(opts)
properties = JavaUtil::Properties.new
options.each do |key, val|
validate_property!(key, val)
properties.put(key, val)
end
properties
end
def validate_property!(key, val)
if key.to_s.empty? || val.to_s.empty?
raise Hermann::Errors::ConfigurationError, "Invalid Broker Properties"
end
# Builds the ProducerConfig for this producer.
#
# Settings are layered in this order, later ones winning: DEFAULTS, the
# 'metadata.broker.list' entry built from +brokers+, then +opts+.
#
# @param [String] brokers comma separated list of brokers
#
# @param [Hash] opts extra producer properties
#
# @return [ProducerConfig] packaged config for +Producer+
def create_config(brokers, opts={})
  broker_setting = { 'metadata.broker.list' => brokers }
  merged = DEFAULTS.merge(broker_setting).merge(opts)
  ProducerUtil::ProducerConfig.new(Hermann.package_properties(merged))
end
end
end

View File

@ -0,0 +1,110 @@
require 'hermann'
require 'hermann/errors'
module Hermann
module Provider
# Implements a java based consumer. The #consume method keeps reading from
# the stream iterator; per the original author's note, hasNext() blocks until
# a message is available.
class JavaSimpleConsumer
  # NOTE(review): :zookeeper is exposed but nothing in this class assigns
  # @zookeeper, so the reader returns nil unless a caller sets it.
  attr_accessor :consumer, :topic, :zookeeper

  NUM_THREADS = 1

  # default zookeeper connection options
  DEFAULTS = {
    'zookeeper.session.timeout.ms' => '400',
    'zookeeper.sync.time.ms'       => '200',
    'auto.commit.interval.ms'      => '1000'
  }.freeze

  # Instantiate JavaSimpleConsumer
  #
  # @param [String] zookeepers list of zookeepers
  #
  # @param [String] groupId Group ID
  #
  # @param [String] topic Kafka topic
  #
  # @param [Hash] opts options for consumer (the two keys below are removed
  #   from the Hash that is passed in)
  # @option opts [Fixnum] :sleep_time Time to sleep between consume retries, defaults to 1sec
  # @option opts [Boolean] :do_retry Retry consume attempts if exceptions are thrown, defaults to true
  def initialize(zookeepers, groupId, topic, opts={})
    # NOTE(review): opts is not forwarded to create_config, so only DEFAULTS
    # plus the zookeeper/group settings reach ConsumerConfig - confirm intent.
    config      = create_config(zookeepers, groupId)
    @consumer   = ConsumerUtil::Consumer.createJavaConsumerConnector(config)
    @topic      = topic
    @sleep_time = opts.delete(:sleep_time) || 1

    # `value || true` can never be false; only fall back to the default when
    # the option is absent (or nil) so :do_retry => false is honoured.
    do_retry  = opts.delete(:do_retry)
    @do_retry = do_retry.nil? ? true : do_retry
  end

  # Starts infinite loop to consume messages. hasNext() blocks until a
  # message is available at which point it is yielded to the block.
  #
  # Any failure (including one raised by the caller's block) is printed and,
  # when retries are enabled, the whole method body is re-run after sleeping
  # for the configured time; otherwise the failure is re-raised. The number
  # of retries is not bounded.
  #
  # ==== Examples
  #
  # consumer.consume do |message|
  #   puts "Received: #{message}"
  # end
  #
  def consume
    stream   = get_stream
    iterator = stream.iterator
    yield iterator.next.message.to_s while iterator.hasNext
  rescue SignalException, SystemExit, NoMemoryError
    # Never retry on interrupts/signals, exit requests or memory exhaustion:
    # doing so would make a retrying consumer impossible to stop.
    raise
  rescue Exception => e
    # NOTE(review): Exception (not StandardError) is kept from the original,
    # presumably so errors surfaced by the Java client are retried as well -
    # confirm before narrowing it.
    puts "#{self.class.name}#consume exception: #{e.class.name}"
    puts "Msg: #{e.message}"
    puts e.backtrace.join("\n")
    raise unless retry?

    sleep @sleep_time
    retry
  end

  private

  # @return [Boolean] whether #consume should retry after a failure
  def retry?
    @do_retry
  end

  # Gets the message stream of the topic. Creates message streams for
  # a topic and the number of threads requested. In this case the default
  # number of threads is NUM_THREADS.
  #
  # @return the first stream the connector created for the topic
  def get_stream
    @topicCountMap = JavaUtil::HashMap.new
    @value         = NUM_THREADS.to_java Java::int
    @topicCountMap.put("#{@topic}", @value)

    consumerMap = @consumer.createMessageStreams(@topicCountMap)
    consumerMap[@topic].first
  end

  # Creates a ConsumerConfig object
  #
  # @param [String] zookeepers zookeepers list
  #
  # @param [String] groupId group ID
  #
  # @param [Hash] opts extra consumer properties, merged over DEFAULTS and
  #   the connection options
  #
  # @return [ConsumerConfig] - packaged config for +Consumer+
  #
  # @raise whatever Hermann.package_properties raises for invalid options
  def create_config(zookeepers, groupId, opts={})
    config     = connect_opts(zookeepers, groupId)
    options    = DEFAULTS.merge(config).merge(opts)
    properties = Hermann.package_properties(options)
    ConsumerUtil::ConsumerConfig.new(properties)
  end

  # Connection options to pass to ConsumerConfig
  #
  # @return [Hash] 'zookeeper.connect' and 'group.id' entries
  def connect_opts(zookeepers, groupId)
    {
      'zookeeper.connect' => zookeepers,
      'group.id'          => groupId
    }
  end
end
end
end

View File

@ -3,35 +3,53 @@ require 'hermann/consumer'
# XXX: Hermann::Consumer isn't really supported anywhere, MRI included right
# now
describe Hermann::Consumer, :platform => :mri do
subject(:consumer) { described_class.new(topic, brokers, partition) }
describe Hermann::Consumer do
subject(:consumer) { described_class.new(topic, nil, nil, opts) }
let(:topic) { 'rspec' }
let(:brokers) { 'localhost:1337' }
let(:partition) { 1 }
let(:opts) { { :brokers => brokers, :partition => partition } }
it { should respond_to :consume }
describe '#consume' do
shared_examples 'an error condition' do
it 'should raise an exception' do
expect { consumer.consume }.to raise_error(RuntimeError)
context "on C ruby", :platform => :mri do
it { should respond_to :consume }
describe '#consume' do
shared_examples 'an error condition' do
it 'should raise an exception' do
expect { consumer.consume }.to raise_error(RuntimeError)
end
end
context 'with a bad partition' do
let(:partition) { -1 }
it_behaves_like 'an error condition'
end
context 'with a bad broker' do
let(:brokers) { '' }
it_behaves_like 'an error condition'
end
context 'with a bad topic' do
let(:topic) { '' }
it_behaves_like 'an error condition'
end
end
end
context 'with a bad partition' do
let(:partition) { -1 }
it_behaves_like 'an error condition'
end
context 'on Jruby', :platform => :java do
subject(:consumer) { described_class.new(topic, groupId, zookeepers) }
context 'with a bad broker' do
let(:brokers) { '' }
it_behaves_like 'an error condition'
end
let(:zookeepers) { 'localhost:2181' }
let(:groupId) { 'groupId' }
let(:do_retry) { true }
let(:sleep_time) { 1 }
context 'with a bad topic' do
let(:topic) { '' }
it_behaves_like 'an error condition'
it 'creates a Hermann::Provider::JavaSimpleConsumer' do
allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { double }
expect(subject.internal).to be_a(Hermann::Provider::JavaSimpleConsumer)
end
end
end

View File

@ -1,6 +1,36 @@
require 'spec_helper'
require 'hermann/errors'
# Specs for the module-level helpers on Hermann.
describe Hermann do
  it { should be_instance_of Module }

  describe '.validate_property!' do
    subject { described_class.send(:validate_property!, foo, bar) }

    context 'with valid property' do
      let(:foo) { 'foo' }
      let(:bar) { 'bar' }

      # The method signals failure by raising, so the assertion is on the
      # absence of an error rather than on a return value.
      it 'does not raise' do
        expect{ subject }.to_not raise_error
      end
    end

    context 'with empty property' do
      let(:foo) { '' }
      let(:bar) { '' }

      it 'raises ConfigurationError' do
        expect{ subject }.to raise_error(Hermann::Errors::ConfigurationError)
      end
    end
  end

  context 'on Jruby', :platform => :java do
    describe '.package_properties' do
      let(:options) { { 'f' => '1' } }

      it 'puts options into java Properties' do
        expect(described_class.package_properties(options)).to eq options
      end
    end
  end
end

View File

@ -3,10 +3,11 @@ require 'hermann/provider/java_producer'
require 'hermann/errors'
describe Hermann::Provider::JavaProducer, :platform => :java do
subject(:producer) { described_class.new(brokers) }
subject(:producer) { described_class.new(brokers, opts) }
let(:topic) { 'rspec' }
let(:brokers) { '0:1337'}
let(:opts) { {} }
describe '#push_single' do
subject(:result) { producer.push_single('foo', topic, nil) }
@ -43,13 +44,7 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
context 'with a non-existing broker' do
let(:brokers) { 'localhost:13337' }
let(:timeout) { 2 }
let(:value) { 'rspec' }
it 'should reject' do
future = result.wait(1)
expect(future).to be_rejected
end
it_behaves_like 'an error condition'
end
context 'with a bad topic' do
@ -59,17 +54,20 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
end
end
describe '#create_properties' do
subject { producer.send(:create_properties, brokers, opts) }
describe '#create_config' do
let(:opts) { {'f'=>'1'} }
let(:result) {
let(:options) {
Hermann::Provider::JavaProducer::DEFAULTS.merge({
"metadata.broker.list"=>brokers, "f"=>"1"
})
}
let(:producer_config) { double }
it 'creates Properties' do
expect(subject).to eq result
expect(Hermann).to receive(:package_properties).with(options)
expect(Hermann::ProducerUtil::ProducerConfig).to receive(:new) { producer_config }
expect(Hermann::JavaApiUtil::Producer).to receive(:new) { double }
expect(subject).to_not be_nil
end
context 'without brokers' do
let(:brokers) { '' }
@ -78,24 +76,4 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
end
end
end
describe '#validate_property!' do
subject { producer.send(:validate_property!, foo, bar) }
context 'with valid property' do
let(:foo) { 'foo' }
let(:bar) { 'bar' }
it 'returns true' do
expect{ subject }.to_not raise_error
end
end
context 'with valid property' do
let(:foo) { '' }
let(:bar) { '' }
it 'returns false' do
expect{ subject }.to raise_error(Hermann::Errors::ConfigurationError)
end
end
end
end

View File

@ -0,0 +1,67 @@
require 'spec_helper'
require 'hermann/provider/java_simple_consumer'
require 'hermann/errors'
# Specs for the JRuby consumer. The connector factory is stubbed in the
# `before` hook below, so building the subject hands back a double.
describe Hermann::Provider::JavaSimpleConsumer, :platform => :java do
subject(:consumer) { described_class.new(zookeeper, groupId, topic) }
let(:zookeeper) { 'localhost:2181' }
let(:groupId) { 'groupId' }
let(:topic) { 'topic' }
let(:internal_consumer) { double('ConsumerUtil::Consumer') }
before do
# Replace the connector factory so @consumer becomes internal_consumer.
allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { internal_consumer }
end
describe '#consume' do
let(:stream) { double }
let(:iterator) { double }
let(:msg) { double }
it 'yields messages one at a time' do
allow(consumer).to receive(:get_stream) { stream }
allow(stream).to receive(:iterator) { iterator }
# hasNext answers true once and then false, so the loop body runs once.
allow(iterator).to receive(:hasNext).and_return(true, false)
allow(iterator).to receive_message_chain(:next, :message, :to_s) { msg }
expect{ |b|
subject.consume(&b)
}.to yield_with_args(msg)
end
it 'retries consuming if there is an exception' do
# get_stream fails on every call; retry? says yes once and then no, so
# exactly one sleep happens before the error is re-raised.
allow(consumer).to receive(:get_stream).and_raise(StandardError)
# artificially allow exactly one retry
allow(consumer).to receive(:retry?).and_return(true, false)
expect(consumer).to receive(:sleep).once
expect{ |b| subject.consume(&b) }.to raise_error(StandardError)
end
end
describe '#get_stream' do
subject { consumer.send(:get_stream) }
# Stand-in for the connector's result: streams keyed by topic.
let(:map) { { topic => ['foo'] } }
it 'gets the consumer stream' do
allow(internal_consumer).to receive(:createMessageStreams) { map }
# get_stream returns the first stream listed for the topic.
expect(subject).to eq 'foo'
end
end
describe '#create_config' do
subject { consumer.send(:create_config, zookeeper, groupId) }
it 'creates the consumer config' do
expect(subject).to be_a Hermann::ConsumerUtil::ConsumerConfig
end
end
describe '#connect_opts' do
subject { consumer.send(:connect_opts, zookeeper, groupId) }
it 'creates a hash of connection options' do
expect(subject).to be_a Hash
end
end
end