mirror of https://github.com/reiseburo/hermann
updates specs, fixup producer exceptional handling
This commit is contained in:
parent
de03fa4ebc
commit
17b6db78b1
|
@ -55,6 +55,7 @@ module Hermann
|
|||
else
|
||||
if RUBY_PLATFORM == "java"
|
||||
result = @internal.push_single(value)
|
||||
@children << result
|
||||
else
|
||||
@internal.push_single(value, result)
|
||||
end
|
||||
|
|
|
@ -5,7 +5,7 @@ require 'concurrent'
|
|||
module Hermann
|
||||
module Provider
|
||||
class JavaProducer
|
||||
attr_accessor :topic, :producer
|
||||
attr_accessor :topic, :producer, :connected
|
||||
|
||||
def initialize(topic, brokers)
|
||||
@topic = topic
|
||||
|
@ -24,7 +24,7 @@ module Hermann
|
|||
Concurrent::Promise.new {
|
||||
data = ProducerUtil::KeyedMessage.new(@topic, msg)
|
||||
@producer.send(data)
|
||||
}.rescue { |reason| raise reason }
|
||||
}
|
||||
end
|
||||
|
||||
private
|
||||
|
|
|
@ -7,132 +7,155 @@ describe Hermann::Producer do
|
|||
let(:topic) { 'rspec' }
|
||||
let(:brokers) { 'localhost:1337' }
|
||||
|
||||
describe '#connected?' do
|
||||
subject { producer.connected? }
|
||||
context 'by default' do
|
||||
before :each do
|
||||
expect(producer.internal).to receive(:connected?).and_call_original
|
||||
end
|
||||
context "java" do
|
||||
if RUBY_PLATFORM == "java"
|
||||
describe '#create_result' do
|
||||
subject { producer.create_result }
|
||||
|
||||
it { should be false }
|
||||
end
|
||||
end
|
||||
it { should be_instance_of Hermann::Result }
|
||||
|
||||
describe '#connect' do
|
||||
let(:timeout) { 0 }
|
||||
subject(:connect!) { producer.connect(timeout) }
|
||||
|
||||
it 'should delegate connection to the underlying Producer' do
|
||||
expect(producer.internal).to receive(:connect).and_call_original
|
||||
connect!
|
||||
end
|
||||
end
|
||||
|
||||
describe '#push' do
|
||||
subject(:result) { producer.push(value) }
|
||||
|
||||
context 'error conditions' do
|
||||
shared_examples 'an error condition' do
|
||||
it 'should raise an exception' do
|
||||
expect { producer.push('rspec') }.to raise_error(RuntimeError)
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a bad broker configuration' do
|
||||
let(:brokers) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
|
||||
context 'with a non-existing broker' do
|
||||
let(:brokers) { 'localhost:13337' }
|
||||
let(:timeout) { 2 }
|
||||
let(:value) { 'rspec' }
|
||||
|
||||
it 'should reject' do
|
||||
future = result
|
||||
expect(future).not_to be_nil
|
||||
producer.tick_reactor(timeout)
|
||||
expect(future).to be_rejected
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a bad topic' do
|
||||
let(:topic) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a single value' do
|
||||
let(:value) { 'hello' }
|
||||
|
||||
it 'should invoke #push_single' do
|
||||
expect(producer.internal).to receive(:push_single)
|
||||
expect(result).to be_instance_of Hermann::Result
|
||||
end
|
||||
end
|
||||
|
||||
context 'with an array value' do
|
||||
let(:value) { ['hello', 'world'] }
|
||||
|
||||
it 'should invoke #push_single for each element' do
|
||||
value.each do |v|
|
||||
expect(producer.internal).to receive(:push_single).with(v, anything)
|
||||
end
|
||||
|
||||
expect(result).to be_instance_of Array
|
||||
result.each do |elem|
|
||||
expect(elem).to be_instance_of Hermann::Result
|
||||
it 'should add the result to the producers children' do
|
||||
expect(producer.children).to be_empty
|
||||
expect(subject).to be_instance_of Hermann::Result
|
||||
expect(producer.children).to_not be_empty
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#create_result' do
|
||||
subject { producer.create_result }
|
||||
context "not java" do
|
||||
if RUBY_PLATFORM != "java"
|
||||
describe '#push' do
|
||||
subject(:result) { producer.push(value) }
|
||||
|
||||
it { should be_instance_of Hermann::Result }
|
||||
context 'error conditions' do
|
||||
shared_examples 'an error condition' do
|
||||
it 'should raise an exception' do
|
||||
expect { producer.push('rspec') }.to raise_error(RuntimeError)
|
||||
end
|
||||
end
|
||||
|
||||
it 'should add the result to the producers children' do
|
||||
expect(producer.children).to be_empty
|
||||
expect(subject).to be_instance_of Hermann::Result
|
||||
expect(producer.children).to_not be_empty
|
||||
end
|
||||
end
|
||||
context 'with a bad broker configuration' do
|
||||
let(:brokers) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
|
||||
describe '#tick_reactor' do
|
||||
let(:timeout) { 0 }
|
||||
let(:internal) { double('Hermann::Lib::Producer mock') }
|
||||
subject(:tick) { producer.tick_reactor(timeout) }
|
||||
context 'with a non-existing broker' do
|
||||
let(:brokers) { 'localhost:13337' }
|
||||
let(:timeout) { 2 }
|
||||
let(:value) { 'rspec' }
|
||||
|
||||
before :each do
|
||||
3.times do
|
||||
child = Hermann::Result.new(producer)
|
||||
allow(child).to receive(:reap?) { reap }
|
||||
producer.children << child
|
||||
it 'should reject' do
|
||||
future = result
|
||||
expect(future).not_to be_nil
|
||||
producer.tick_reactor(timeout)
|
||||
expect(future).to be_rejected
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a bad topic' do
|
||||
let(:topic) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a single value' do
|
||||
let(:value) { 'hello' }
|
||||
|
||||
it 'should invoke #push_single' do
|
||||
expect(producer.internal).to receive(:push_single)
|
||||
expect(result).to be_instance_of Hermann::Result
|
||||
end
|
||||
end
|
||||
|
||||
context 'with an array value' do
|
||||
let(:value) { ['hello', 'world'] }
|
||||
|
||||
it 'should invoke #push_single for each element' do
|
||||
value.each do |v|
|
||||
expect(producer.internal).to receive(:push_single).with(v, anything)
|
||||
end
|
||||
|
||||
expect(result).to be_instance_of Array
|
||||
result.each do |elem|
|
||||
expect(elem).to be_instance_of Hermann::Result
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
producer.instance_variable_set(:@internal, internal)
|
||||
expect(internal).to receive(:tick)
|
||||
end
|
||||
describe '#create_result' do
|
||||
subject { producer.create_result }
|
||||
|
||||
context 'with no reapable children' do
|
||||
let(:reap) { false }
|
||||
it { should be_instance_of Hermann::Result }
|
||||
|
||||
it 'should not reap the children' do
|
||||
count = producer.children.size
|
||||
expect(tick).to eql(0)
|
||||
expect(producer.children.size).to eql(count)
|
||||
it 'should add the result to the producers children' do
|
||||
expect(producer.children).to be_empty
|
||||
expect(subject).to be_instance_of Hermann::Result
|
||||
expect(producer.children).to_not be_empty
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'with reapable children' do
|
||||
let(:reap) { true }
|
||||
|
||||
it 'should not reap the children' do
|
||||
count = producer.children.size
|
||||
expect(tick).to eql(count)
|
||||
expect(producer.children.size).to_not eql(count)
|
||||
|
||||
describe '#connected?' do
|
||||
subject { producer.connected? }
|
||||
context 'by default' do
|
||||
before :each do
|
||||
expect(producer.internal).to receive(:connected?).and_call_original
|
||||
end
|
||||
|
||||
it { should be false }
|
||||
end
|
||||
end
|
||||
|
||||
describe '#connect' do
|
||||
let(:timeout) { 0 }
|
||||
subject(:connect!) { producer.connect(timeout) }
|
||||
|
||||
it 'should delegate connection to the underlying Producer' do
|
||||
expect(producer.internal).to receive(:connect).and_call_original
|
||||
connect!
|
||||
end
|
||||
end
|
||||
|
||||
describe '#tick_reactor' do
|
||||
let(:timeout) { 0 }
|
||||
let(:internal) { double('Hermann::Lib::Producer mock') }
|
||||
subject(:tick) { producer.tick_reactor(timeout) }
|
||||
|
||||
before :each do
|
||||
3.times do
|
||||
child = Hermann::Result.new(producer)
|
||||
allow(child).to receive(:reap?) { reap }
|
||||
producer.children << child
|
||||
end
|
||||
|
||||
producer.instance_variable_set(:@internal, internal)
|
||||
expect(internal).to receive(:tick)
|
||||
end
|
||||
|
||||
context 'with no reapable children' do
|
||||
let(:reap) { false }
|
||||
|
||||
it 'should not reap the children' do
|
||||
count = producer.children.size
|
||||
expect(tick).to eql(0)
|
||||
expect(producer.children.size).to eql(count)
|
||||
end
|
||||
end
|
||||
|
||||
context 'with reapable children' do
|
||||
let(:reap) { true }
|
||||
|
||||
it 'should not reap the children' do
|
||||
count = producer.children.size
|
||||
expect(tick).to eql(count)
|
||||
expect(producer.children.size).to_not eql(count)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
require 'spec_helper'
|
||||
require 'hermann/provider/java_producer'
|
||||
require 'hermann_jars'
|
||||
|
||||
describe Hermann::Provider::JavaProducer do
|
||||
subject(:producer) { described_class.new(topic, brokers) }
|
||||
|
@ -16,7 +15,7 @@ describe Hermann::Provider::JavaProducer do
|
|||
it 'should be rejected' do
|
||||
promise = producer.push_single('rspec').execute.wait(1)
|
||||
expect(promise).to be_rejected
|
||||
expect(promise.reason).to_not be_nil
|
||||
expect { promise.value! }.to raise_error
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue