add shutdown

* producer/consumer integration tests
This commit is contained in:
jway 2014-10-27 14:07:02 -07:00
parent 1f52bad871
commit 9acce0a64b
7 changed files with 92 additions and 2 deletions

View File

@ -101,3 +101,17 @@ producer = Hermann::Producer.new('topic', kafka_broker_ids)
promise = producer.push("foo") # returns Concurrent::Promise
promise.value!
```
### Integration Testing
* Download Kafka
* Start Zookeeper
* set port 2181
* Start Kafka
* Set properties file ```zookeeper.connect=localhost:2181```
* bundle exec jruby -S rspec spec/integration

View File

@ -27,6 +27,7 @@ Gem::Specification.new do |s|
s.add_dependency "zk", "~> 1.9.4"
if RUBY_PLATFORM == "java"
#IMPORTANT: make sure that jar-dependencies is only a development dependency of your gem. if it is a runtime dependencies the require_jars file will be overwritten during installation.
s.add_dependency 'jar-dependencies', '~>0.1.2'
s.add_development_dependency 'ruby-maven', '~> 3.1.1.0'
s.add_development_dependency 'rake'

View File

@ -38,8 +38,18 @@ module Hermann
end
end
# Delegates the consume method to internal consumer classes
def consume(topic=nil, &block)
@internal.consume(topic, &block)
end
# Delegates the shutdown of kafka messages threads to internal consumer classes
def shutdown
if Hermann.jruby?
@internal.shutdown
else
#no op
end
end
end
end

View File

@ -37,6 +37,13 @@ module Hermann
@do_retry = opts.delete(:do_retry) || true
end
# Shuts down the various threads created by createMessageStreams
# This can be called after the thread executing consume has exited
# to clean up.
def shutdown
@consumer.shutdown
end
# Starts infinite loop to consume messages. hasNext() blocks until a
# message is available at which point it is yielded to the block
#

View File

@ -37,19 +37,38 @@ describe Hermann::Consumer do
it_behaves_like 'an error condition'
end
end
describe '#shutdown' do
it 'does nothing' do
expect(consumer.shutdown).to be nil
end
end
end
context 'on Jruby', :platform => :java do
subject(:consumer) { described_class.new(topic, groupId, zookeepers) }
let(:zookeepers) { 'localhost:2181' }
let(:zookeepers) { 'localhost:2181' }
let(:groupId) { 'groupId' }
let(:do_retry) { true }
let(:sleep_time) { 1 }
let(:internal) { double('Hermann::Provider::JavaSimpleConsumer')}
before do
allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { double }
end
it 'creates a Hermann::Provider::JavaSimpleConsumer' do
allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { double }
expect(subject.internal).to be_a(Hermann::Provider::JavaSimpleConsumer)
end
describe '#shutdown' do
let(:result) { double('NoClass') }
it 'calls shutdown' do
consumer.instance_variable_set(:@internal, internal)
allow(internal).to receive(:shutdown) { result }
expect(consumer.shutdown).to eq result
end
end
end
end

View File

@ -0,0 +1,38 @@
require 'spec_helper'
require 'hermann/producer'
require 'hermann/consumer'
require 'hermann/discovery/zookeeper'
require 'concurrent'
class ConsumerTest
def create_consumer
zookeeper = "localhost:2181"
groupId = "group1"
topic = 'test'
consumer = Hermann::Consumer.new(topic, groupId, zookeeper)
consumer.consume(topic) do |msg|
if msg == 'msg'
consumer.shutdown
return true
end
end
false
end
end
describe 'producer' do
include_context 'integration test context'
let(:message) { 'msg' }
it 'produces and consumes messages', :type => :integration do
test_consumer = Concurrent::Promise.execute { ConsumerTest.new.create_consumer }
broker_ids = Hermann::Discovery::Zookeeper.new(zookeepers).get_brokers
producer = Hermann::Producer.new(nil, broker_ids)
producer.push(message, :topic => topic).wait(1)
expect(test_consumer.value(1)).to be true
end
end

View File

@ -15,6 +15,7 @@ RSpec.configure do |c|
shared_context 'integration test context', :type => :integration do
let(:topic) { $integrationconf['kafka']['topic'] }
let(:brokers) { $integrationconf['kafka']['brokers'] }
let(:zookeepers) { $integrationconf['zookeepers'] }
end
end