From b89870904264cdb1fe56b0c6807d84639ac155b3 Mon Sep 17 00:00:00 2001 From: jway Date: Fri, 3 Oct 2014 11:56:14 -0700 Subject: [PATCH 1/5] first pass adding java kafka provider --- .gitignore | 1 + Gemfile.lock | 13 ++++- hermann.gemspec | 24 +++++++-- lib/hermann/producer.rb | 16 ++++-- lib/hermann/providers/java_producer.rb | 70 ++++++++++++++++++++++++++ lib/hermann/version.rb | 2 +- lib/hermann_jars.rb | 15 ++++++ spec/providers/java_producer_spec.rb | 1 + 8 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 lib/hermann/providers/java_producer.rb create mode 100644 lib/hermann_jars.rb create mode 100644 spec/providers/java_producer_spec.rb diff --git a/.gitignore b/.gitignore index d8b81cf..8c90eae 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ ext/mkmf.log ext/Makefile pkg/ tmp/ +*.jar diff --git a/Gemfile.lock b/Gemfile.lock index ab1266e..36c99a8 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,20 +1,28 @@ PATH remote: . specs: - hermann (0.16.0) + hermann (0.17.0) + concurrent-ruby mini_portile (~> 0.6.0) GEM remote: https://rubygems.org/ specs: coderay (1.1.0) + concurrent-ruby (0.7.0) diff-lcs (1.2.5) + ffi (1.9.5-java) method_source (0.8.2) mini_portile (0.6.0) pry (0.9.12.6) coderay (~> 1.0) method_source (~> 0.8) slop (~> 3.4) + pry (0.9.12.6-java) + coderay (~> 1.0) + method_source (~> 0.8) + slop (~> 3.4) + spoon (~> 0.0) rake (10.3.2) rake-compiler (0.9.3) rake @@ -34,9 +42,12 @@ GEM rspec-support (~> 3.0.0) rspec-support (3.0.4) slop (3.5.0) + spoon (0.0.4) + ffi system_timer (1.2.4) PLATFORMS + java ruby DEPENDENCIES diff --git a/hermann.gemspec b/hermann.gemspec index 6314099..24de6f6 100644 --- a/hermann.gemspec +++ b/hermann.gemspec @@ -18,12 +18,26 @@ SPEC = Gem::Specification.new do |s| s.files += `git ls-files -- lib`.split($\) s.files += `git ls-files -- ext`.split($\) - s.extensions = Dir['ext/**/extconf.rb'] - s.require_paths = ["lib", "ext/hermann"] s.rubygems_version = '2.2.2' - - s.add_dependency('mini_portile', '~> 0.6.0') - s.specification_version = 3 if s.respond_to?(:specification_version) + + s.add_dependency 'concurrent-ruby' + + if RUBY_PLATFORM == "java" + s.add_runtime_dependency 'jar-dependencies', '~>0.1.2' + s.add_development_dependency 'ruby-maven', '~> 3.1.1.0' + s.add_development_dependency 'rake' + s.requirements << "jar org.apache.kafka:kafka_2.10, 0.8.1.1" + s.requirements << "jar org.mod4j.org.eclipse.xtext:log4j, 1.2.15" + s.requirements << "jar org.scala-lang:scala-library, 2.10.1" + s.requirements << "jar com.yammer.metrics:metrics-core, 2.2.0" + s.requirements << "jar org.slf4j:slf4j-api, 1.7.2" + s.requirements << "jar com.101tec:zkclient, 0.3" + s.require_paths = ["lib"] + else + s.add_dependency('mini_portile', '~> 0.6.0') + s.extensions = Dir['ext/**/extconf.rb'] + s.require_paths = ["lib", "ext/hermann"] + end end diff --git a/lib/hermann/producer.rb b/lib/hermann/producer.rb index 41a2907..d23b8d6 100644 --- a/lib/hermann/producer.rb +++ b/lib/hermann/producer.rb @@ -1,6 +1,12 @@ require 'hermann' require 'hermann/result' -require 'hermann_lib' + + +if RUBY_PLATFORM == "java" + require 'hermann/providers/java_producer' +else + require 'hermann_lib' +end module Hermann class Producer @@ -9,7 +15,11 @@ module Hermann def initialize(topic, brokers) @topic = topic @brokers = brokers - @internal = Hermann::Lib::Producer.new(topic, brokers) + if RUBY_PLATFORM == "java" + @internal = Hermann::Providers::JavaProducer.new(topic, brokers) + else + @internal = Hermann::Lib::Producer.new(topic, brokers) + end # We're tracking children so we can make sure that at Producer exit we # make a reasonable attempt to clean up outstanding result objects @children = [] @@ -39,13 +49,11 @@ module Hermann # result from the broker def push(value) result = create_result - if value.kind_of? Array return value.map { |e| self.push(e) } else @internal.push_single(value, result) end - return result end diff --git a/lib/hermann/providers/java_producer.rb b/lib/hermann/providers/java_producer.rb new file mode 100644 index 0000000..36823e2 --- /dev/null +++ b/lib/hermann/providers/java_producer.rb @@ -0,0 +1,70 @@ +require 'java' +require 'hermann' +require 'concurrent' + +module JavaUtil + include_package 'java.util' +end +module ProducerUtil + include_package 'kafka.producer' +end +module JavaApiUtil + include_package 'kafka.javaapi.producer' +end + + +module Hermann + module Providers + class JavaProducer + attr_accessor :topic, :producer + + def initialize(topic, brokers) + @topic = topic + properties = create_properties(brokers: brokers) + config = create_config(properties) + @producer = JavaApiUtil::Producer.new(config) + end + + def defaults + { + string_encoder: 'kafka.serializer.StringEncoder', + partitioner: 'kafka.producer.DefaultPartitioner', + required_acks: "1" + } + end + + def create_config(properties) + ProducerUtil::ProducerConfig.new(properties) + end + + def create_properties(args={}) + brokers = args[:brokers] + str_encoder = defaults[:string_encoder] + partitioner = defaults[:partitioner] + acks = defaults[:required_acks] + + properties = JavaUtil::Properties.new + properties.put('metadata.broker.list', brokers) + properties.put('serializer.class', str_encoder) + properties.put('partitioner.class', partitioner) + properties.put('request.required.acks', acks) + properties + end + + def push(*messages) + messages.flatten.map do |msg| + _push(msg) + end + end + def push_single(msg, result) + _push(msg) + end + + private + def _push(msg) + data = ProducerUtil::KeyedMessage.new(@topic, msg) + @producer.send(data) + end + end + end +end \ No newline at end of file diff --git a/lib/hermann/version.rb b/lib/hermann/version.rb index 673bc01..f90206c 100644 --- a/lib/hermann/version.rb +++ b/lib/hermann/version.rb @@ -1,3 +1,3 @@ module Hermann - VERSION = '0.16.0' + VERSION = '0.17.0' end diff --git a/lib/hermann_jars.rb b/lib/hermann_jars.rb new file mode 100644 index 0000000..ab6132f --- /dev/null +++ b/lib/hermann_jars.rb @@ -0,0 +1,15 @@ +# this is a generated file, to avoid over-writing it just delete this comment +require 'jar_dependencies' + +require_jar( 'org.slf4j', 'slf4j-api', '1.7.2' ) +require_jar( 'org.scala-lang', 'scala-library', '2.10.1' ) +require_jar( 'log4j', 'log4j', '1.2.14' ) +require_jar( 'com.yammer.metrics', 'metrics-core', '2.2.0' ) +require_jar( 'org.apache.zookeeper', 'zookeeper', '3.3.4' ) +require_jar( 'net.sf.jopt-simple', 'jopt-simple', '3.2' ) +require_jar( 'org.apache.kafka', 'kafka_2.10', '0.8.1.1' ) +require_jar( 'jline', 'jline', '0.9.94' ) +require_jar( 'com.101tec', 'zkclient', '0.3' ) +require_jar( 'org.mod4j.org.eclipse.xtext', 'log4j', '1.2.15' ) +require_jar( 'junit', 'junit', '3.8.1' ) +require_jar( 'org.xerial.snappy', 'snappy-java', '1.0.5' ) diff --git a/spec/providers/java_producer_spec.rb b/spec/providers/java_producer_spec.rb new file mode 100644 index 0000000..7aa8404 --- /dev/null +++ b/spec/providers/java_producer_spec.rb @@ -0,0 +1 @@ +java_producer_spec.rb \ No newline at end of file From 07a1dcec8eef10f7b21cd6597330841bd4ac4578 Mon Sep 17 00:00:00 2001 From: jway Date: Mon, 6 Oct 2014 08:08:30 -0700 Subject: [PATCH 2/5] add specs for java_producer --- .gitignore | 2 + Gemfile.lock | 29 +++++++++-- lib/hermann.rb | 11 ++++ lib/hermann/producer.rb | 12 +++-- lib/hermann/provider/java_producer.rb | 50 ++++++++++++++++++ lib/hermann/providers/java_producer.rb | 70 -------------------------- spec/providers/java_producer_spec.rb | 46 ++++++++++++++++- 7 files changed, 143 insertions(+), 77 deletions(-) create mode 100644 lib/hermann/provider/java_producer.rb delete mode 100644 lib/hermann/providers/java_producer.rb diff --git a/.gitignore b/.gitignore index 8c90eae..0045b5a 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ ext/Makefile pkg/ tmp/ *.jar +.ruby-gemset +.ruby-version diff --git a/Gemfile.lock b/Gemfile.lock index 36c99a8..fad3c97 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -3,17 +3,29 @@ PATH specs: hermann (0.17.0) concurrent-ruby - mini_portile (~> 0.6.0) + jar-dependencies (~> 0.1.2) GEM remote: https://rubygems.org/ specs: + axiom-types (0.1.1) + descendants_tracker (~> 0.0.4) + ice_nine (~> 0.11.0) + thread_safe (~> 0.3, >= 0.3.1) coderay (1.1.0) - concurrent-ruby (0.7.0) + coercible (1.0.0) + descendants_tracker (~> 0.0.1) + concurrent-ruby (0.7.0-java) + descendants_tracker (0.0.4) + thread_safe (~> 0.3, >= 0.3.1) diff-lcs (1.2.5) + equalizer (0.0.9) ffi (1.9.5-java) + ice_nine (0.11.0) + jar-dependencies (0.1.2) + maven-tools (1.0.5) + virtus (~> 1.0) method_source (0.8.2) - mini_portile (0.6.0) pry (0.9.12.6) coderay (~> 1.0) method_source (~> 0.8) @@ -41,10 +53,20 @@ GEM rspec-mocks (3.0.4) rspec-support (~> 3.0.0) rspec-support (3.0.4) + ruby-maven (3.1.1.0.8) + maven-tools (~> 1.0.1) + ruby-maven-libs (= 3.1.1) + ruby-maven-libs (3.1.1) slop (3.5.0) spoon (0.0.4) ffi system_timer (1.2.4) + thread_safe (0.3.4-java) + virtus (1.0.3) + axiom-types (~> 0.1) + coercible (~> 1.0) + descendants_tracker (~> 0.0, >= 0.0.3) + equalizer (~> 0.0, >= 0.0.9) PLATFORMS java @@ -57,4 +79,5 @@ DEPENDENCIES rake-compiler rspec (~> 3.0.0) rspec-its + ruby-maven (~> 3.1.1.0) system_timer diff --git a/lib/hermann.rb b/lib/hermann.rb index f201414..77d7f9c 100644 --- a/lib/hermann.rb +++ b/lib/hermann.rb @@ -1,2 +1,13 @@ module Hermann + if RUBY_PLATFORM == "java" + module JavaUtil + include_package 'java.util' + end + module ProducerUtil + include_package 'kafka.producer' + end + module JavaApiUtil + include_package 'kafka.javaapi.producer' + end + end end diff --git a/lib/hermann/producer.rb b/lib/hermann/producer.rb index d23b8d6..0678e8a 100644 --- a/lib/hermann/producer.rb +++ b/lib/hermann/producer.rb @@ -3,7 +3,7 @@ require 'hermann/result' if RUBY_PLATFORM == "java" - require 'hermann/providers/java_producer' + require 'hermann/provider/java_producer' else require 'hermann_lib' end @@ -16,7 +16,7 @@ module Hermann @topic = topic @brokers = brokers if RUBY_PLATFORM == "java" - @internal = Hermann::Providers::JavaProducer.new(topic, brokers) + @internal = Hermann::Provider::JavaProducer.new(topic, brokers) else @internal = Hermann::Lib::Producer.new(topic, brokers) end @@ -49,11 +49,17 @@ module Hermann # result from the broker def push(value) result = create_result + if value.kind_of? Array return value.map { |e| self.push(e) } else - @internal.push_single(value, result) + if RUBY_PLATFORM == "java" + result = @internal.push_single(value) + else + @internal.push_single(value, result) + end end + return result end diff --git a/lib/hermann/provider/java_producer.rb b/lib/hermann/provider/java_producer.rb new file mode 100644 index 0000000..64d71ca --- /dev/null +++ b/lib/hermann/provider/java_producer.rb @@ -0,0 +1,50 @@ +require 'java' +require 'hermann' +require 'concurrent' + +module Hermann + module Provider + class JavaProducer + attr_accessor :topic, :producer + + def initialize(topic, brokers) + @topic = topic + properties = create_properties(:brokers => brokers) + config = create_config(properties) + @producer = JavaApiUtil::Producer.new(config) + end + + DEFAULTS = { + :string_encoder => 'kafka.serializer.StringEncoder', + :partitioner => 'kafka.producer.DefaultPartitioner', + :required_acks => "1" + }.freeze + + def push_single(msg) + Concurrent::Promise.new { + data = ProducerUtil::KeyedMessage.new(@topic, msg) + @producer.send(data) + }.rescue { |reason| raise reason } + end + + private + def create_config(properties) + ProducerUtil::ProducerConfig.new(properties) + end + + def create_properties(args={}) + brokers = args[:brokers] + str_encoder = DEFAULTS[:string_encoder] + partitioner = DEFAULTS[:partitioner] + acks = DEFAULTS[:required_acks] + + properties = JavaUtil::Properties.new + properties.put('metadata.broker.list', brokers) + properties.put('serializer.class', str_encoder) + properties.put('partitioner.class', partitioner) + properties.put('request.required.acks', acks) + properties + end + end + end +end \ No newline at end of file diff --git a/lib/hermann/providers/java_producer.rb b/lib/hermann/providers/java_producer.rb deleted file mode 100644 index 36823e2..0000000 --- a/lib/hermann/providers/java_producer.rb +++ /dev/null @@ -1,70 +0,0 @@ -require 'java' -require 'hermann' -require 'concurrent' - -module JavaUtil - include_package 'java.util' -end -module ProducerUtil - include_package 'kafka.producer' -end -module JavaApiUtil - include_package 'kafka.javaapi.producer' -end - - -module Hermann - module Providers - class JavaProducer - attr_accessor :topic, :producer - - def initialize(topic, brokers) - @topic = topic - properties = create_properties(brokers: brokers) - config = create_config(properties) - @producer = JavaApiUtil::Producer.new(config) - end - - def defaults - { - string_encoder: 'kafka.serializer.StringEncoder', - partitioner: 'kafka.producer.DefaultPartitioner', - required_acks: "1" - } - end - - def create_config(properties) - ProducerUtil::ProducerConfig.new(properties) - end - - def create_properties(args={}) - brokers = args[:brokers] - str_encoder = defaults[:string_encoder] - partitioner = defaults[:partitioner] - acks = defaults[:required_acks] - - properties = JavaUtil::Properties.new - properties.put('metadata.broker.list', brokers) - properties.put('serializer.class', str_encoder) - properties.put('partitioner.class', partitioner) - properties.put('request.required.acks', acks) - properties - end - - def push(*messages) - messages.flatten.map do |msg| - _push(msg) - end - end - def push_single(msg, result) - _push(msg) - end - - private - def _push(msg) - data = ProducerUtil::KeyedMessage.new(@topic, msg) - @producer.send(data) - end - end - end -end \ No newline at end of file diff --git a/spec/providers/java_producer_spec.rb b/spec/providers/java_producer_spec.rb index 7aa8404..6345c68 100644 --- a/spec/providers/java_producer_spec.rb +++ b/spec/providers/java_producer_spec.rb @@ -1 +1,45 @@ -java_producer_spec.rb \ No newline at end of file +require 'spec_helper' +require 'hermann/provider/java_producer' +require 'hermann_jars' + +describe Hermann::Provider::JavaProducer do + subject(:producer) { described_class.new(topic, brokers) } + + let(:topic) { 'rspec' } + let(:brokers) { 'localhost:1337' } + + describe '#push_single' do + subject(:result) { producer.push_single(value) } + + context 'error conditions' do + shared_examples 'an error condition' do + it 'should be rejected' do + promise = producer.push_single('rspec').execute.wait(1) + expect(promise).to be_rejected + expect(promise.reason).to_not be_nil + end + end + + context 'with a bad broker configuration' do + let(:brokers) { '' } + it_behaves_like 'an error condition' + end + + context 'with a non-existing broker' do + let(:brokers) { 'localhost:13337' } + let(:timeout) { 2 } + let(:value) { 'rspec' } + + it 'should reject' do + future = result.execute.wait(1) + expect(future).to be_rejected + end + end + + context 'with a bad topic' do + let(:topic) { '' } + it_behaves_like 'an error condition' + end + end + end +end From de03fa4ebc2c0e52183cb51b8866f3a7b3c031a2 Mon Sep 17 00:00:00 2001 From: jway Date: Mon, 6 Oct 2014 09:33:35 -0700 Subject: [PATCH 3/5] add require jars to top level module --- lib/hermann.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/hermann.rb b/lib/hermann.rb index 77d7f9c..1fd0ca7 100644 --- a/lib/hermann.rb +++ b/lib/hermann.rb @@ -1,5 +1,7 @@ module Hermann if RUBY_PLATFORM == "java" + require 'hermann_jars' + module JavaUtil include_package 'java.util' end From 17b6db78b19f4032ed218d2cfbfe138e232fed5b Mon Sep 17 00:00:00 2001 From: jway Date: Mon, 6 Oct 2014 11:23:32 -0700 Subject: [PATCH 4/5] updates specs, fixup producer exceptional handling --- lib/hermann/producer.rb | 1 + lib/hermann/provider/java_producer.rb | 4 +- spec/producer_spec.rb | 237 ++++++++++++++------------ spec/providers/java_producer_spec.rb | 3 +- 4 files changed, 134 insertions(+), 111 deletions(-) diff --git a/lib/hermann/producer.rb b/lib/hermann/producer.rb index 0678e8a..8807413 100644 --- a/lib/hermann/producer.rb +++ b/lib/hermann/producer.rb @@ -55,6 +55,7 @@ module Hermann else if RUBY_PLATFORM == "java" result = @internal.push_single(value) + @children << result else @internal.push_single(value, result) end diff --git a/lib/hermann/provider/java_producer.rb b/lib/hermann/provider/java_producer.rb index 64d71ca..d69e0b7 100644 --- a/lib/hermann/provider/java_producer.rb +++ b/lib/hermann/provider/java_producer.rb @@ -5,7 +5,7 @@ require 'concurrent' module Hermann module Provider class JavaProducer - attr_accessor :topic, :producer + attr_accessor :topic, :producer, :connected def initialize(topic, brokers) @topic = topic @@ -24,7 +24,7 @@ module Hermann Concurrent::Promise.new { data = ProducerUtil::KeyedMessage.new(@topic, msg) @producer.send(data) - }.rescue { |reason| raise reason } + } end private diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index e17121e..6f86c6d 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -7,132 +7,155 @@ describe Hermann::Producer do let(:topic) { 'rspec' } let(:brokers) { 'localhost:1337' } - describe '#connected?' do - subject { producer.connected? } - context 'by default' do - before :each do - expect(producer.internal).to receive(:connected?).and_call_original - end + context "java" do + if RUBY_PLATFORM == "java" + describe '#create_result' do + subject { producer.create_result } - it { should be false } - end - end + it { should be_instance_of Hermann::Result } - describe '#connect' do - let(:timeout) { 0 } - subject(:connect!) { producer.connect(timeout) } - - it 'should delegate connection to the underlying Producer' do - expect(producer.internal).to receive(:connect).and_call_original - connect! - end - end - - describe '#push' do - subject(:result) { producer.push(value) } - - context 'error conditions' do - shared_examples 'an error condition' do - it 'should raise an exception' do - expect { producer.push('rspec') }.to raise_error(RuntimeError) - end - end - - context 'with a bad broker configuration' do - let(:brokers) { '' } - it_behaves_like 'an error condition' - end - - context 'with a non-existing broker' do - let(:brokers) { 'localhost:13337' } - let(:timeout) { 2 } - let(:value) { 'rspec' } - - it 'should reject' do - future = result - expect(future).not_to be_nil - producer.tick_reactor(timeout) - expect(future).to be_rejected - end - end - - context 'with a bad topic' do - let(:topic) { '' } - it_behaves_like 'an error condition' - end - end - - context 'with a single value' do - let(:value) { 'hello' } - - it 'should invoke #push_single' do - expect(producer.internal).to receive(:push_single) - expect(result).to be_instance_of Hermann::Result - end - end - - context 'with an array value' do - let(:value) { ['hello', 'world'] } - - it 'should invoke #push_single for each element' do - value.each do |v| - expect(producer.internal).to receive(:push_single).with(v, anything) - end - - expect(result).to be_instance_of Array - result.each do |elem| - expect(elem).to be_instance_of Hermann::Result + it 'should add the result to the producers children' do + expect(producer.children).to be_empty + expect(subject).to be_instance_of Hermann::Result + expect(producer.children).to_not be_empty end end end end - describe '#create_result' do - subject { producer.create_result } + context "not java" do + if RUBY_PLATFORM != "java" + describe '#push' do + subject(:result) { producer.push(value) } - it { should be_instance_of Hermann::Result } + context 'error conditions' do + shared_examples 'an error condition' do + it 'should raise an exception' do + expect { producer.push('rspec') }.to raise_error(RuntimeError) + end + end - it 'should add the result to the producers children' do - expect(producer.children).to be_empty - expect(subject).to be_instance_of Hermann::Result - expect(producer.children).to_not be_empty - end - end + context 'with a bad broker configuration' do + let(:brokers) { '' } + it_behaves_like 'an error condition' + end - describe '#tick_reactor' do - let(:timeout) { 0 } - let(:internal) { double('Hermann::Lib::Producer mock') } - subject(:tick) { producer.tick_reactor(timeout) } + context 'with a non-existing broker' do + let(:brokers) { 'localhost:13337' } + let(:timeout) { 2 } + let(:value) { 'rspec' } - before :each do - 3.times do - child = Hermann::Result.new(producer) - allow(child).to receive(:reap?) { reap } - producer.children << child + it 'should reject' do + future = result + expect(future).not_to be_nil + producer.tick_reactor(timeout) + expect(future).to be_rejected + end + end + + context 'with a bad topic' do + let(:topic) { '' } + it_behaves_like 'an error condition' + end + end + + context 'with a single value' do + let(:value) { 'hello' } + + it 'should invoke #push_single' do + expect(producer.internal).to receive(:push_single) + expect(result).to be_instance_of Hermann::Result + end + end + + context 'with an array value' do + let(:value) { ['hello', 'world'] } + + it 'should invoke #push_single for each element' do + value.each do |v| + expect(producer.internal).to receive(:push_single).with(v, anything) + end + + expect(result).to be_instance_of Array + result.each do |elem| + expect(elem).to be_instance_of Hermann::Result + end + end + end end - producer.instance_variable_set(:@internal, internal) - expect(internal).to receive(:tick) - end + describe '#create_result' do + subject { producer.create_result } - context 'with no reapable children' do - let(:reap) { false } + it { should be_instance_of Hermann::Result } - it 'should not reap the children' do - count = producer.children.size - expect(tick).to eql(0) - expect(producer.children.size).to eql(count) + it 'should add the result to the producers children' do + expect(producer.children).to be_empty + expect(subject).to be_instance_of Hermann::Result + expect(producer.children).to_not be_empty + end end - end - context 'with reapable children' do - let(:reap) { true } - it 'should not reap the children' do - count = producer.children.size - expect(tick).to eql(count) - expect(producer.children.size).to_not eql(count) + + describe '#connected?' do + subject { producer.connected? } + context 'by default' do + before :each do + expect(producer.internal).to receive(:connected?).and_call_original + end + + it { should be false } + end + end + + describe '#connect' do + let(:timeout) { 0 } + subject(:connect!) { producer.connect(timeout) } + + it 'should delegate connection to the underlying Producer' do + expect(producer.internal).to receive(:connect).and_call_original + connect! + end + end + + describe '#tick_reactor' do + let(:timeout) { 0 } + let(:internal) { double('Hermann::Lib::Producer mock') } + subject(:tick) { producer.tick_reactor(timeout) } + + before :each do + 3.times do + child = Hermann::Result.new(producer) + allow(child).to receive(:reap?) { reap } + producer.children << child + end + + producer.instance_variable_set(:@internal, internal) + expect(internal).to receive(:tick) + end + + context 'with no reapable children' do + let(:reap) { false } + + it 'should not reap the children' do + count = producer.children.size + expect(tick).to eql(0) + expect(producer.children.size).to eql(count) + end + end + + context 'with reapable children' do + let(:reap) { true } + + it 'should not reap the children' do + count = producer.children.size + expect(tick).to eql(count) + expect(producer.children.size).to_not eql(count) + end + end end end end + end diff --git a/spec/providers/java_producer_spec.rb b/spec/providers/java_producer_spec.rb index 6345c68..1ce660f 100644 --- a/spec/providers/java_producer_spec.rb +++ b/spec/providers/java_producer_spec.rb @@ -1,6 +1,5 @@ require 'spec_helper' require 'hermann/provider/java_producer' -require 'hermann_jars' describe Hermann::Provider::JavaProducer do subject(:producer) { described_class.new(topic, brokers) } @@ -16,7 +15,7 @@ describe Hermann::Provider::JavaProducer do it 'should be rejected' do promise = producer.push_single('rspec').execute.wait(1) expect(promise).to be_rejected - expect(promise.reason).to_not be_nil + expect { promise.value! }.to raise_error end end From 06dda845b836a08f442b7306c1c50a76b55abb55 Mon Sep 17 00:00:00 2001 From: jway Date: Mon, 6 Oct 2014 13:46:00 -0700 Subject: [PATCH 5/5] add rdoc --- lib/hermann/provider/java_producer.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/lib/hermann/provider/java_producer.rb b/lib/hermann/provider/java_producer.rb index d69e0b7..9a1ace1 100644 --- a/lib/hermann/provider/java_producer.rb +++ b/lib/hermann/provider/java_producer.rb @@ -4,6 +4,15 @@ require 'concurrent' module Hermann module Provider + + # Kafka Producer class implemented with jruby and kafka client jar + # + # == Heading + # + # This class simulates the kafka producer class within a java environment. + # If the producer throw an exception within the Promise a call to +.value!+ + # will raise the exception and the rejected flag will be set to true + # class JavaProducer attr_accessor :topic, :producer, :connected @@ -20,6 +29,13 @@ module Hermann :required_acks => "1" }.freeze + # Push a value onto the Kafka topic passed to this +Producer+ + # + # @param [Object] value A single object to push + # + # @return +Concurrent::Promise+ Representa a promise to send the + # data to the kafka broker. Upon execution the Promise's status + # will be set def push_single(msg) Concurrent::Promise.new { data = ProducerUtil::KeyedMessage.new(@topic, msg) @@ -28,10 +44,12 @@ module Hermann end private + # @return [ProducerConfig] - packaged config for +Producer+ def create_config(properties) ProducerUtil::ProducerConfig.new(properties) end + # @return [Properties] properties object for creating +ProducerConfig+ def create_properties(args={}) brokers = args[:brokers] str_encoder = DEFAULTS[:string_encoder]