mirror of https://github.com/reiseburo/hermann
commit
0526361011
14
README.md
14
README.md
|
@ -101,3 +101,17 @@ producer = Hermann::Producer.new('topic', kafka_broker_ids)
|
|||
promise = producer.push("foo") # returns Concurrent::Promise
|
||||
promise.value!
|
||||
```
|
||||
|
||||
### Integration Testing
|
||||
|
||||
* Download Kafka
|
||||
* Start Zookeeper
|
||||
* set port 2181
|
||||
* Start Kafka
|
||||
* Set properties file ```zookeeper.connect=localhost:2181```
|
||||
* bundle exec jruby -S rspec spec/integration
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ Gem::Specification.new do |s|
|
|||
s.add_dependency "zk", "~> 1.9.4"
|
||||
|
||||
if RUBY_PLATFORM == "java"
|
||||
#IMPORTANT: make sure that jar-dependencies is only a development dependency of your gem. if it is a runtime dependencies the require_jars file will be overwritten during installation.
|
||||
s.add_dependency 'jar-dependencies', '~>0.1.2'
|
||||
s.add_development_dependency 'ruby-maven', '~> 3.1.1.0'
|
||||
s.add_development_dependency 'rake'
|
||||
|
|
|
@ -38,8 +38,18 @@ module Hermann
|
|||
end
|
||||
end
|
||||
|
||||
# Delegates the consume method to internal consumer classes
|
||||
def consume(topic=nil, &block)
|
||||
@internal.consume(topic, &block)
|
||||
end
|
||||
|
||||
# Delegates the shutdown of kafka messages threads to internal consumer classes
|
||||
def shutdown
|
||||
if Hermann.jruby?
|
||||
@internal.shutdown
|
||||
else
|
||||
#no op
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -37,6 +37,13 @@ module Hermann
|
|||
@do_retry = opts.delete(:do_retry) || true
|
||||
end
|
||||
|
||||
# Shuts down the various threads created by createMessageStreams
|
||||
# This can be called after the thread executing consume has exited
|
||||
# to clean up.
|
||||
def shutdown
|
||||
@consumer.shutdown
|
||||
end
|
||||
|
||||
# Starts infinite loop to consume messages. hasNext() blocks until a
|
||||
# message is available at which point it is yielded to the block
|
||||
#
|
||||
|
|
|
@ -37,19 +37,38 @@ describe Hermann::Consumer do
|
|||
it_behaves_like 'an error condition'
|
||||
end
|
||||
end
|
||||
|
||||
describe '#shutdown' do
|
||||
it 'does nothing' do
|
||||
expect(consumer.shutdown).to be nil
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'on Jruby', :platform => :java do
|
||||
subject(:consumer) { described_class.new(topic, groupId, zookeepers) }
|
||||
|
||||
let(:zookeepers) { 'localhost:2181' }
|
||||
let(:zookeepers) { 'localhost:2181' }
|
||||
let(:groupId) { 'groupId' }
|
||||
let(:do_retry) { true }
|
||||
let(:sleep_time) { 1 }
|
||||
let(:internal) { double('Hermann::Provider::JavaSimpleConsumer')}
|
||||
|
||||
before do
|
||||
allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { double }
|
||||
end
|
||||
|
||||
it 'creates a Hermann::Provider::JavaSimpleConsumer' do
|
||||
allow(Hermann::ConsumerUtil::Consumer).to receive(:createJavaConsumerConnector).with(any_args) { double }
|
||||
expect(subject.internal).to be_a(Hermann::Provider::JavaSimpleConsumer)
|
||||
end
|
||||
|
||||
describe '#shutdown' do
|
||||
let(:result) { double('NoClass') }
|
||||
it 'calls shutdown' do
|
||||
consumer.instance_variable_set(:@internal, internal)
|
||||
allow(internal).to receive(:shutdown) { result }
|
||||
expect(consumer.shutdown).to eq result
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
require 'spec_helper'
|
||||
|
||||
require 'hermann/producer'
|
||||
require 'hermann/consumer'
|
||||
require 'hermann/discovery/zookeeper'
|
||||
require 'concurrent'
|
||||
|
||||
class ConsumerTest
|
||||
def create_consumer
|
||||
zookeeper = "localhost:2181"
|
||||
groupId = "group1"
|
||||
topic = 'test'
|
||||
|
||||
consumer = Hermann::Consumer.new(topic, groupId, zookeeper)
|
||||
|
||||
consumer.consume(topic) do |msg|
|
||||
if msg == 'msg'
|
||||
consumer.shutdown
|
||||
return true
|
||||
end
|
||||
end
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
describe 'producer' do
|
||||
include_context 'integration test context'
|
||||
|
||||
let(:message) { 'msg' }
|
||||
|
||||
it 'produces and consumes messages', :type => :integration do
|
||||
test_consumer = Concurrent::Promise.execute { ConsumerTest.new.create_consumer }
|
||||
broker_ids = Hermann::Discovery::Zookeeper.new(zookeepers).get_brokers
|
||||
producer = Hermann::Producer.new(nil, broker_ids)
|
||||
producer.push(message, :topic => topic).wait(1)
|
||||
expect(test_consumer.value(1)).to be true
|
||||
end
|
||||
end
|
|
@ -15,6 +15,7 @@ RSpec.configure do |c|
|
|||
shared_context 'integration test context', :type => :integration do
|
||||
let(:topic) { $integrationconf['kafka']['topic'] }
|
||||
let(:brokers) { $integrationconf['kafka']['brokers'] }
|
||||
let(:zookeepers) { $integrationconf['zookeepers'] }
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue