diff --git a/lib/hermann/producer.rb b/lib/hermann/producer.rb index 0678e8a..8807413 100644 --- a/lib/hermann/producer.rb +++ b/lib/hermann/producer.rb @@ -55,6 +55,7 @@ module Hermann else if RUBY_PLATFORM == "java" result = @internal.push_single(value) + @children << result else @internal.push_single(value, result) end diff --git a/lib/hermann/provider/java_producer.rb b/lib/hermann/provider/java_producer.rb index 64d71ca..d69e0b7 100644 --- a/lib/hermann/provider/java_producer.rb +++ b/lib/hermann/provider/java_producer.rb @@ -5,7 +5,7 @@ require 'concurrent' module Hermann module Provider class JavaProducer - attr_accessor :topic, :producer + attr_accessor :topic, :producer, :connected def initialize(topic, brokers) @topic = topic @@ -24,7 +24,7 @@ module Hermann Concurrent::Promise.new { data = ProducerUtil::KeyedMessage.new(@topic, msg) @producer.send(data) - }.rescue { |reason| raise reason } + } end private diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index e17121e..6f86c6d 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -7,132 +7,155 @@ describe Hermann::Producer do let(:topic) { 'rspec' } let(:brokers) { 'localhost:1337' } - describe '#connected?' do - subject { producer.connected? } - context 'by default' do - before :each do - expect(producer.internal).to receive(:connected?).and_call_original - end + context "java" do + if RUBY_PLATFORM == "java" + describe '#create_result' do + subject { producer.create_result } - it { should be false } - end - end + it { should be_instance_of Hermann::Result } - describe '#connect' do - let(:timeout) { 0 } - subject(:connect!) { producer.connect(timeout) } - - it 'should delegate connection to the underlying Producer' do - expect(producer.internal).to receive(:connect).and_call_original - connect! - end - end - - describe '#push' do - subject(:result) { producer.push(value) } - - context 'error conditions' do - shared_examples 'an error condition' do - it 'should raise an exception' do - expect { producer.push('rspec') }.to raise_error(RuntimeError) - end - end - - context 'with a bad broker configuration' do - let(:brokers) { '' } - it_behaves_like 'an error condition' - end - - context 'with a non-existing broker' do - let(:brokers) { 'localhost:13337' } - let(:timeout) { 2 } - let(:value) { 'rspec' } - - it 'should reject' do - future = result - expect(future).not_to be_nil - producer.tick_reactor(timeout) - expect(future).to be_rejected - end - end - - context 'with a bad topic' do - let(:topic) { '' } - it_behaves_like 'an error condition' - end - end - - context 'with a single value' do - let(:value) { 'hello' } - - it 'should invoke #push_single' do - expect(producer.internal).to receive(:push_single) - expect(result).to be_instance_of Hermann::Result - end - end - - context 'with an array value' do - let(:value) { ['hello', 'world'] } - - it 'should invoke #push_single for each element' do - value.each do |v| - expect(producer.internal).to receive(:push_single).with(v, anything) - end - - expect(result).to be_instance_of Array - result.each do |elem| - expect(elem).to be_instance_of Hermann::Result + it 'should add the result to the producers children' do + expect(producer.children).to be_empty + expect(subject).to be_instance_of Hermann::Result + expect(producer.children).to_not be_empty end end end end - describe '#create_result' do - subject { producer.create_result } + context "not java" do + if RUBY_PLATFORM != "java" + describe '#push' do + subject(:result) { producer.push(value) } - it { should be_instance_of Hermann::Result } + context 'error conditions' do + shared_examples 'an error condition' do + it 'should raise an exception' do + expect { producer.push('rspec') }.to raise_error(RuntimeError) + end + end - it 'should add the result to the producers children' do - expect(producer.children).to be_empty - expect(subject).to be_instance_of Hermann::Result - expect(producer.children).to_not be_empty - end - end + context 'with a bad broker configuration' do + let(:brokers) { '' } + it_behaves_like 'an error condition' + end - describe '#tick_reactor' do - let(:timeout) { 0 } - let(:internal) { double('Hermann::Lib::Producer mock') } - subject(:tick) { producer.tick_reactor(timeout) } + context 'with a non-existing broker' do + let(:brokers) { 'localhost:13337' } + let(:timeout) { 2 } + let(:value) { 'rspec' } - before :each do - 3.times do - child = Hermann::Result.new(producer) - allow(child).to receive(:reap?) { reap } - producer.children << child + it 'should reject' do + future = result + expect(future).not_to be_nil + producer.tick_reactor(timeout) + expect(future).to be_rejected + end + end + + context 'with a bad topic' do + let(:topic) { '' } + it_behaves_like 'an error condition' + end + end + + context 'with a single value' do + let(:value) { 'hello' } + + it 'should invoke #push_single' do + expect(producer.internal).to receive(:push_single) + expect(result).to be_instance_of Hermann::Result + end + end + + context 'with an array value' do + let(:value) { ['hello', 'world'] } + + it 'should invoke #push_single for each element' do + value.each do |v| + expect(producer.internal).to receive(:push_single).with(v, anything) + end + + expect(result).to be_instance_of Array + result.each do |elem| + expect(elem).to be_instance_of Hermann::Result + end + end + end end - producer.instance_variable_set(:@internal, internal) - expect(internal).to receive(:tick) - end + describe '#create_result' do + subject { producer.create_result } - context 'with no reapable children' do - let(:reap) { false } + it { should be_instance_of Hermann::Result } - it 'should not reap the children' do - count = producer.children.size - expect(tick).to eql(0) - expect(producer.children.size).to eql(count) + it 'should add the result to the producers children' do + expect(producer.children).to be_empty + expect(subject).to be_instance_of Hermann::Result + expect(producer.children).to_not be_empty + end end - end - context 'with reapable children' do - let(:reap) { true } - it 'should not reap the children' do - count = producer.children.size - expect(tick).to eql(count) - expect(producer.children.size).to_not eql(count) + + describe '#connected?' do + subject { producer.connected? } + context 'by default' do + before :each do + expect(producer.internal).to receive(:connected?).and_call_original + end + + it { should be false } + end + end + + describe '#connect' do + let(:timeout) { 0 } + subject(:connect!) { producer.connect(timeout) } + + it 'should delegate connection to the underlying Producer' do + expect(producer.internal).to receive(:connect).and_call_original + connect! + end + end + + describe '#tick_reactor' do + let(:timeout) { 0 } + let(:internal) { double('Hermann::Lib::Producer mock') } + subject(:tick) { producer.tick_reactor(timeout) } + + before :each do + 3.times do + child = Hermann::Result.new(producer) + allow(child).to receive(:reap?) { reap } + producer.children << child + end + + producer.instance_variable_set(:@internal, internal) + expect(internal).to receive(:tick) + end + + context 'with no reapable children' do + let(:reap) { false } + + it 'should not reap the children' do + count = producer.children.size + expect(tick).to eql(0) + expect(producer.children.size).to eql(count) + end + end + + context 'with reapable children' do + let(:reap) { true } + + it 'should not reap the children' do + count = producer.children.size + expect(tick).to eql(count) + expect(producer.children.size).to_not eql(count) + end + end end end end + end diff --git a/spec/providers/java_producer_spec.rb b/spec/providers/java_producer_spec.rb index 6345c68..1ce660f 100644 --- a/spec/providers/java_producer_spec.rb +++ b/spec/providers/java_producer_spec.rb @@ -1,6 +1,5 @@ require 'spec_helper' require 'hermann/provider/java_producer' -require 'hermann_jars' describe Hermann::Provider::JavaProducer do subject(:producer) { described_class.new(topic, brokers) } @@ -16,7 +15,7 @@ describe Hermann::Provider::JavaProducer do it 'should be rejected' do promise = producer.push_single('rspec').execute.wait(1) expect(promise).to be_rejected - expect(promise.reason).to_not be_nil + expect { promise.value! }.to raise_error end end