mirror of https://github.com/reiseburo/hermann
Refactor the Hermann::Producer Java-based integration test to respect integration.yml
I've also refactored/cleaned it up a bit to make sure it runs properly on my machine
This commit is contained in:
parent
3c9c234c43
commit
7f4de34be0
|
@ -5,39 +5,40 @@ require 'hermann/consumer'
|
|||
require 'hermann/discovery/zookeeper'
|
||||
require 'concurrent'
|
||||
|
||||
describe 'producer' do
|
||||
include_context 'integration test context'
|
||||
|
||||
if Hermann.jruby?
|
||||
|
||||
class ConsumerTest
|
||||
def create_consumer
|
||||
zookeeper = "localhost:2181"
|
||||
groupId = "group1"
|
||||
topic = 'test'
|
||||
|
||||
consumer = Hermann::Consumer.new(topic, groupId, zookeeper)
|
||||
|
||||
consumer.consume(topic) do |msg|
|
||||
if msg == 'msg'
|
||||
let(:timeout) { 10 }
|
||||
let(:message) { 'msg' }
|
||||
let(:consumer) do
|
||||
Hermann::Consumer.new(topic, 'rspec-group', zookeepers)
|
||||
end
|
||||
let(:consumer_promise) do
|
||||
Concurrent::Promise.execute do
|
||||
valid = false
|
||||
puts "consuming off `#{topic}`"
|
||||
consumer.consume(topic) do |dequeued|
|
||||
puts "received the message: #{dequeued}"
|
||||
if message == dequeued
|
||||
consumer.shutdown
|
||||
return true
|
||||
valid = true
|
||||
end
|
||||
end
|
||||
false
|
||||
# Return this out of the block
|
||||
next valid
|
||||
end
|
||||
end
|
||||
|
||||
describe 'producer' do
|
||||
include_context 'integration test context'
|
||||
|
||||
let(:message) { 'msg' }
|
||||
|
||||
it 'produces and consumes messages', :type => :integration, :platform => :java do
|
||||
test_consumer = Concurrent::Promise.execute { ConsumerTest.new.create_consumer }
|
||||
broker_ids = Hermann::Discovery::Zookeeper.new(zookeepers).get_brokers
|
||||
producer = Hermann::Producer.new(nil, broker_ids)
|
||||
producer.push(message, :topic => topic).wait(1)
|
||||
expect(test_consumer.value(1)).to be true
|
||||
end
|
||||
it 'produces and consumes messages', :type => :integration, :platform => :java do
|
||||
broker_ids = Hermann::Discovery::Zookeeper.new(zookeepers).get_brokers
|
||||
puts "using ZK discovered brokers: #{broker_ids}"
|
||||
producer = Hermann::Producer.new(nil, broker_ids)
|
||||
producer.push(message, :topic => topic).value!(timeout)
|
||||
expect(consumer_promise.value!(timeout)).to be true
|
||||
end
|
||||
|
||||
end
|
||||
after :each do
|
||||
# Make sure we shut down our connection in any case
|
||||
consumer.shutdown
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue