diff --git a/spec/fixtures/testevent.pb.rb b/spec/fixtures/testevent.pb.rb new file mode 100644 index 0000000..e783023 --- /dev/null +++ b/spec/fixtures/testevent.pb.rb @@ -0,0 +1,35 @@ +## +# This file is auto-generated. DO NOT EDIT! +# +require 'protobuf/message' + +module Hermann + + ## + # Enum Classes + # + class States < ::Protobuf::Enum + define :FULFILLED, 1 + define :UNFULFILLED, 2 + define :PENDING, 3 + define :REJECTED, 4 + end + + + ## + # Message Classes + # + class TestEvent < ::Protobuf::Message; end + + + ## + # Message Fields + # + class TestEvent + required :string, :name, 1 + required ::Hermann::States, :state, 2 + optional :int32, :bogomips, 3 + end + +end + diff --git a/spec/fixtures/testevent.proto b/spec/fixtures/testevent.proto new file mode 100644 index 0000000..1782d95 --- /dev/null +++ b/spec/fixtures/testevent.proto @@ -0,0 +1,16 @@ +package hermann; + +// Generate Ruby stubs with: protoc --ruby_out=. spec/fixtures/testevent.proto + +enum States { + FULFILLED = 1; + UNFULFILLED = 2; + PENDING = 3; + REJECTED = 4; +} + +message TestEvent { + required string name = 1; + required States state = 2; + optional int32 bogomips = 3; +} diff --git a/spec/integration/producer_spec.rb b/spec/integration/producer_spec.rb index e336095..4b927d2 100644 --- a/spec/integration/producer_spec.rb +++ b/spec/integration/producer_spec.rb @@ -5,6 +5,9 @@ require 'hermann/consumer' require 'hermann/discovery/zookeeper' require 'concurrent' +require 'protobuf' +require_relative '../fixtures/testevent.pb' + describe 'producer' do include_context 'integration test context' @@ -15,26 +18,50 @@ describe 'producer' do end let(:consumer_promise) do Concurrent::Promise.execute do - valid = false + value = nil puts "consuming off `#{topic}`" consumer.consume(topic) do |dequeued| - puts "received the message: #{dequeued}" - if message == dequeued - consumer.shutdown - valid = true - end + puts "received the message: #{dequeued.inspect}" + value = dequeued + consumer.shutdown end # Return this out of the block - next valid + next value end end - - it 'produces and consumes messages', :type => :integration, :platform => :java do + let(:brokers) do broker_ids = Hermann::Discovery::Zookeeper.new(zookeepers).get_brokers puts "using ZK discovered brokers: #{broker_ids}" - producer = Hermann::Producer.new(nil, broker_ids) + broker_ids + end + let(:producer) { Hermann::Producer.new(nil, brokers) } + + + + it 'produces and consumes messages', :type => :integration, :platform => :java do producer.push(message, :topic => topic).value!(timeout) - expect(consumer_promise.value!(timeout)).to be true + expect(consumer_promise.value!(timeout)).to eql(message) + end + + + context 'with binary data', :type => :integration, :platform => :java do + let(:event) do + Hermann::TestEvent.new(:name => 'rspec', + :state => 3, + :bogomips => 9001) + end + + let(:message) { event.encode } + + it 'should be a thing' do + producer.push(message, :topic => topic).value!(timeout) + dequeued = consumer_promise.value!(timeout) + expect(dequeued).to eql(message) + + expect { + Hermann::TestEvent.decode(dequeued) + }.not_to raise_error + end end after :each do