diff --git a/README.md b/README.md index 4f409f9..35cca5a 100644 --- a/README.md +++ b/README.md @@ -101,3 +101,17 @@ producer = Hermann::Producer.new('topic', kafka_broker_ids) promise = producer.push("foo") # returns Concurrent::Promise promise.value! ``` + +### Integration Testing + +* Download Kafka +* Start Zookeeper + * set port 2181 +* Start Kafka + * Set properties file ```zookeeper.connect=localhost:2181``` +* bundle exec jruby -S rspec spec/integration + + + + + diff --git a/hermann.gemspec b/hermann.gemspec index 73c5d78..d72c5cd 100644 --- a/hermann.gemspec +++ b/hermann.gemspec @@ -27,6 +27,7 @@ Gem::Specification.new do |s| s.add_dependency "zk", "~> 1.9.4" if RUBY_PLATFORM == "java" + #IMPORTANT: make sure that jar-dependencies is only a development dependency of your gem. if it is a runtime dependencies the require_jars file will be overwritten during installation. s.add_dependency 'jar-dependencies', '~>0.1.2' s.add_development_dependency 'ruby-maven', '~> 3.1.1.0' s.add_development_dependency 'rake' diff --git a/lib/hermann/consumer.rb b/lib/hermann/consumer.rb index fa32518..d805bd2 100644 --- a/lib/hermann/consumer.rb +++ b/lib/hermann/consumer.rb @@ -38,8 +38,18 @@ module Hermann end end + # Delegates the consume method to internal consumer classes def consume(topic=nil, &block) @internal.consume(topic, &block) end + + # Delegates the shutdown of kafka messages threads to internal consumer classes + def shutdown + if Hermann.jruby? + @internal.shutdown + else + #no op + end + end end end diff --git a/lib/hermann/provider/java_simple_consumer.rb b/lib/hermann/provider/java_simple_consumer.rb index 6c6dc52..9f1447e 100644 --- a/lib/hermann/provider/java_simple_consumer.rb +++ b/lib/hermann/provider/java_simple_consumer.rb @@ -37,6 +37,13 @@ module Hermann @do_retry = opts.delete(:do_retry) || true end + # Shuts down the various threads created by createMessageStreams + # This can be called after the thread executing consume has exited + # to clean up. + def shutdown + @consumer.shutdown + end + # Starts infinite loop to consume messages. hasNext() blocks until a # message is available at which point it is yielded to the block # diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index 7cf6754..3b8b3e1 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -37,19 +37,38 @@ describe Hermann::Consumer do it_behaves_like 'an error condition' end end + + describe '#shutdown' do + it 'does nothing' do + expect(consumer.shutdown).to be nil + end + end end context 'on Jruby', :platform => :java do subject(:consumer) { described_class.new(topic, groupId, zookeepers) } - let(:zookeepers) { 'localhost:2181' } + let(:zookeepers) { 'localhost:2181' } let(:groupId) { 'groupId' } let(:do_retry) { true } let(:sleep_time) { 1 } + let(:internal) { double('Hermann::Provider::JavaSimpleConsumer')} + + before do + allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { double } + end it 'creates a Hermann::Provider::JavaSimpleConsumer' do - allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { double } expect(subject.internal).to be_a(Hermann::Provider::JavaSimpleConsumer) end + + describe '#shutdown' do + let(:result) { double('NoClass') } + it 'calls shutdown' do + consumer.instance_variable_set(:@internal, internal) + allow(internal).to receive(:shutdown) { result } + expect(consumer.shutdown).to eq result + end + end end end diff --git a/spec/integration/producer_spec.rb b/spec/integration/producer_spec.rb new file mode 100644 index 0000000..a6d3e0d --- /dev/null +++ b/spec/integration/producer_spec.rb @@ -0,0 +1,38 @@ +require 'spec_helper' + +require 'hermann/producer' +require 'hermann/consumer' +require 'hermann/discovery/zookeeper' +require 'concurrent' + +class ConsumerTest + def create_consumer + zookeeper = "localhost:2181" + groupId = "group1" + topic = 'test' + + consumer = Hermann::Consumer.new(topic, groupId, zookeeper) + + consumer.consume(topic) do |msg| + if msg == 'msg' + consumer.shutdown + return true + end + end + false + end +end + +describe 'producer' do + include_context 'integration test context' + + let(:message) { 'msg' } + + it 'produces and consumes messages', :type => :integration do + test_consumer = Concurrent::Promise.execute { ConsumerTest.new.create_consumer } + broker_ids = Hermann::Discovery::Zookeeper.new(zookeepers).get_brokers + producer = Hermann::Producer.new(nil, broker_ids) + producer.push(message, :topic => topic).wait(1) + expect(test_consumer.value(1)).to be true + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 75c72c4..33d6ab7 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -15,6 +15,7 @@ RSpec.configure do |c| shared_context 'integration test context', :type => :integration do let(:topic) { $integrationconf['kafka']['topic'] } let(:brokers) { $integrationconf['kafka']['brokers'] } + let(:zookeepers) { $integrationconf['zookeepers'] } end end