fixup exception handling in consumer

This commit is contained in:
jway 2014-11-13 10:53:39 -08:00
parent 4ef0bb0498
commit 6300705af9
5 changed files with 65 additions and 39 deletions

View File

@ -21,11 +21,9 @@ module Hermann
#
# @params [String] comma separated zookeeper list
#
# @params [Hash] options for consumer
# @params [Hash] options for Consumer
# @option opts [String] :brokers (for MRI) Comma separated list of brokers
# @option opts [String] :partition (for MRI) The kafka partition
# @option opts [Fixnum] :sleep_time (Jruby) Time to sleep between consume retries, defaults to 1sec
# @option opts [Boolean] :do_retry (Jruby) Retry consume attempts if exceptions are thrown, defaults to true
def initialize(topic, groupId, zookeepers, opts={})
@topic = topic
@brokers = brokers

View File

@ -12,11 +12,18 @@ module Hermann
NUM_THREADS = 1
#default zookeeper connection options
DEFAULTS = {
'zookeeper.session.timeout.ms' => '400',
'zookeeper.sync.time.ms' => '200',
'auto.commit.interval.ms' => '1000'
}.freeze
DEFAULTS_HERMANN_OPTS = {
'zookeeper.session.timeout.ms' => '400',
'zookeeper.sync.time.ms' => '200',
'auto.commit.interval.ms' => '1000'
}.freeze
DEFAULT_CONSUMER_OPTIONS = {
:do_retry => true,
:max_retries => 3,
:backoff_time_sec => 1,
:logger => nil
}.freeze
# Instantiate JavaSimpleConsumer
#
@ -27,14 +34,20 @@ module Hermann
# @params [String] Kafka topic
#
# @params [Hash] kafka options for consumer
# @option opts [Fixnum] :sleep_time Time to sleep between consume retries, defaults to 1sec
# @option opts [Fixnum] :backoff_time_sec Time to sleep between consume retries, defaults to 1sec
# @option opts [Boolean] :do_retry Retry consume attempts if exceptions are thrown, defaults to true
# @option opts [Fixnum] :max_retries Number of max_retries to retry #consume when it throws an exception
# @option opts [Logger] :logger Pass in a Logger
def initialize(zookeepers, groupId, topic, opts={})
config = create_config(zookeepers, groupId)
@consumer = ConsumerUtil::Consumer.createJavaConsumerConnector(config)
@topic = topic
@sleep_time = opts.delete(:sleep_time) || 1
@do_retry = opts.delete(:do_retry) || true
config = create_config(zookeepers, groupId)
@consumer = ConsumerUtil::Consumer.createJavaConsumerConnector(config)
@topic = topic
options = DEFAULT_CONSUMER_OPTIONS.merge(opts)
@backoff_time_sec = options[:backoff_time_sec]
@do_retry = options[:do_retry]
@max_retries = options[:max_retries]
@logger = options[:logger]
end
# Shuts down the various threads created by createMessageStreams
@ -63,14 +76,13 @@ module Hermann
message = it.next.message
yield String.from_java_bytes(message)
end
rescue Exception => e
puts "#{self.class.name}#consume exception: #{e.class.name}"
puts "Msg: #{e.message}"
puts e.backtrace.join("\n")
if retry?
sleep @sleep_time
rescue => e
if retry? && @max_retries > 0
sleep @backoff_time_sec
@max_retries -= 1
retry
else
log_exception(e)
raise e
end
end
@ -81,6 +93,13 @@ module Hermann
@do_retry
end
def log_exception(e)
return unless @logger
@logger.error("#{self.class.name}#consume exception: #{e.class.name}")
@logger.error("Msg: #{e.message}")
@logger.error(e.backtrace.join("\n"))
end
# Gets the message stream of the topic. Creates message streams for
# a topic and the number of threads requested. In this case the default
# number of threads is NUM_THREADS.
@ -106,7 +125,7 @@ module Hermann
# @raises [RuntimeError] if options does not contain key value strings
def create_config(zookeepers, groupId, opts={})
config = connect_opts(zookeepers, groupId)
options = DEFAULTS.merge(config).merge(opts)
options = DEFAULTS_HERMANN_OPTS.merge(config).merge(opts)
properties = Hermann.package_properties(options)
ConsumerUtil::ConsumerConfig.new(properties)
end

View File

@ -1,15 +1,15 @@
# this is a generated file, to avoid over-writing it just delete this comment
require 'jar_dependencies'
require_jar( 'junit', 'junit', '3.8.1' )
require_jar( 'com.101tec', 'zkclient', '0.3' )
require_jar( 'com.yammer.metrics', 'metrics-core', '2.2.0' )
require_jar( 'log4j', 'log4j', '1.2.17' )
require_jar( 'jline', 'jline', '0.9.94' )
require_jar( 'net.sf.jopt-simple', 'jopt-simple', '3.2' )
require_jar( 'org.apache.zookeeper', 'zookeeper', '3.3.4' )
require_jar( 'org.mod4j.org.eclipse.xtext', 'log4j', '1.2.15' )
require_jar( 'org.slf4j', 'slf4j-api', '1.7.2' )
require_jar( 'org.apache.kafka', 'kafka_2.10', '0.8.1.1' )
require_jar( 'org.scala-lang', 'scala-library', '2.10.1' )
require_jar( 'com.yammer.metrics', 'metrics-core', '2.2.0' )
require_jar( 'org.apache.zookeeper', 'zookeeper', '3.3.4' )
require_jar( 'net.sf.jopt-simple', 'jopt-simple', '3.2' )
require_jar( 'org.apache.kafka', 'kafka_2.10', '0.8.1.1' )
require_jar( 'jline', 'jline', '0.9.94' )
require_jar( 'com.101tec', 'zkclient', '0.3' )
require_jar( 'org.mod4j.org.eclipse.xtext', 'log4j', '1.2.15' )
require_jar( 'junit', 'junit', '3.8.1' )
require_jar( 'org.xerial.snappy', 'snappy-java', '1.0.5' )

View File

@ -29,14 +29,14 @@ describe 'producer' do
next value
end
end
let(:brokers) do
let(:broker_ids) do
broker_ids = Hermann::Discovery::Zookeeper.new(zookeepers).get_brokers
puts ""
puts "using ZK discovered brokers: #{broker_ids}"
puts ""
broker_ids
end
let(:producer) { Hermann::Producer.new(nil, brokers) }
let(:producer) { Hermann::Producer.new(nil, broker_ids) }
it 'produces and consumes messages', :type => :integration, :platform => :java do
producer.push(message, :topic => topic).value!(timeout)
@ -48,7 +48,7 @@ describe 'producer' do
let(:event) do
Hermann::TestEvent.new(:name => 'rspec',
:state => 3,
:bogomips => 9001)
:bogomips => 9001)
end
let(:message) { event.encode }

View File

@ -3,12 +3,13 @@ require 'hermann/provider/java_simple_consumer'
require 'hermann/errors'
describe Hermann::Provider::JavaSimpleConsumer, :platform => :java do
subject(:consumer) { described_class.new(zookeeper, groupId, topic) }
subject(:consumer) { described_class.new(zookeeper, groupId, topic, {:logger => logger}) }
let(:zookeeper) { 'localhost:2181' }
let(:groupId) { 'groupId' }
let(:topic) { 'topic' }
let(:internal_consumer) { double('ConsumerUtil::Consumer') }
let(:logger) { double('logger') }
before do
allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { internal_consumer }
@ -31,10 +32,18 @@ describe Hermann::Provider::JavaSimpleConsumer, :platform => :java do
end
it 'retries consuming if there is an exception' do
allow(consumer).to receive(:get_stream).and_raise(StandardError)
#artificially allow one one retry
allow(consumer).to receive(:retry?).and_return(true, false)
expect(consumer).to receive(:sleep).once
expect{ |b| subject.consume(&b) }.to raise_error(StandardError)
expect(consumer).to receive(:sleep).exactly(3).times
expect(logger).to receive(:error).exactly(3).times
expect{ |b| consumer.consume(&b) }.to raise_error(StandardError)
end
context 'with no logger' do
subject(:consumer) { described_class.new(zookeeper, groupId, topic, {}) }
it 'does not call logger' do
allow(consumer).to receive(:get_stream).and_raise(StandardError)
allow(consumer).to receive(:sleep)
expect{ |b| consumer.consume(&b) }.to raise_error(StandardError)
end
end
end