mirror of https://github.com/reiseburo/hermann
Merge pull request #41 from jamescway/add-jruby-producer
Java Producer Provider
This commit is contained in:
commit
a08cfaa155
|
@ -7,3 +7,6 @@ ext/mkmf.log
|
|||
ext/Makefile
|
||||
pkg/
|
||||
tmp/
|
||||
*.jar
|
||||
.ruby-gemset
|
||||
.ruby-version
|
||||
|
|
40
Gemfile.lock
40
Gemfile.lock
|
@ -1,20 +1,40 @@
|
|||
PATH
|
||||
remote: .
|
||||
specs:
|
||||
hermann (0.16.0)
|
||||
mini_portile (~> 0.6.0)
|
||||
hermann (0.17.0)
|
||||
concurrent-ruby
|
||||
jar-dependencies (~> 0.1.2)
|
||||
|
||||
GEM
|
||||
remote: https://rubygems.org/
|
||||
specs:
|
||||
axiom-types (0.1.1)
|
||||
descendants_tracker (~> 0.0.4)
|
||||
ice_nine (~> 0.11.0)
|
||||
thread_safe (~> 0.3, >= 0.3.1)
|
||||
coderay (1.1.0)
|
||||
coercible (1.0.0)
|
||||
descendants_tracker (~> 0.0.1)
|
||||
concurrent-ruby (0.7.0-java)
|
||||
descendants_tracker (0.0.4)
|
||||
thread_safe (~> 0.3, >= 0.3.1)
|
||||
diff-lcs (1.2.5)
|
||||
equalizer (0.0.9)
|
||||
ffi (1.9.5-java)
|
||||
ice_nine (0.11.0)
|
||||
jar-dependencies (0.1.2)
|
||||
maven-tools (1.0.5)
|
||||
virtus (~> 1.0)
|
||||
method_source (0.8.2)
|
||||
mini_portile (0.6.0)
|
||||
pry (0.9.12.6)
|
||||
coderay (~> 1.0)
|
||||
method_source (~> 0.8)
|
||||
slop (~> 3.4)
|
||||
pry (0.9.12.6-java)
|
||||
coderay (~> 1.0)
|
||||
method_source (~> 0.8)
|
||||
slop (~> 3.4)
|
||||
spoon (~> 0.0)
|
||||
rake (10.3.2)
|
||||
rake-compiler (0.9.3)
|
||||
rake
|
||||
|
@ -33,10 +53,23 @@ GEM
|
|||
rspec-mocks (3.0.4)
|
||||
rspec-support (~> 3.0.0)
|
||||
rspec-support (3.0.4)
|
||||
ruby-maven (3.1.1.0.8)
|
||||
maven-tools (~> 1.0.1)
|
||||
ruby-maven-libs (= 3.1.1)
|
||||
ruby-maven-libs (3.1.1)
|
||||
slop (3.5.0)
|
||||
spoon (0.0.4)
|
||||
ffi
|
||||
system_timer (1.2.4)
|
||||
thread_safe (0.3.4-java)
|
||||
virtus (1.0.3)
|
||||
axiom-types (~> 0.1)
|
||||
coercible (~> 1.0)
|
||||
descendants_tracker (~> 0.0, >= 0.0.3)
|
||||
equalizer (~> 0.0, >= 0.0.9)
|
||||
|
||||
PLATFORMS
|
||||
java
|
||||
ruby
|
||||
|
||||
DEPENDENCIES
|
||||
|
@ -46,4 +79,5 @@ DEPENDENCIES
|
|||
rake-compiler
|
||||
rspec (~> 3.0.0)
|
||||
rspec-its
|
||||
ruby-maven (~> 3.1.1.0)
|
||||
system_timer
|
||||
|
|
|
@ -18,12 +18,26 @@ SPEC = Gem::Specification.new do |s|
|
|||
s.files += `git ls-files -- lib`.split($\)
|
||||
s.files += `git ls-files -- ext`.split($\)
|
||||
|
||||
s.extensions = Dir['ext/**/extconf.rb']
|
||||
|
||||
s.require_paths = ["lib", "ext/hermann"]
|
||||
s.rubygems_version = '2.2.2'
|
||||
|
||||
s.add_dependency('mini_portile', '~> 0.6.0')
|
||||
|
||||
s.specification_version = 3 if s.respond_to?(:specification_version)
|
||||
|
||||
s.add_dependency 'concurrent-ruby'
|
||||
|
||||
if RUBY_PLATFORM == "java"
|
||||
s.add_runtime_dependency 'jar-dependencies', '~>0.1.2'
|
||||
s.add_development_dependency 'ruby-maven', '~> 3.1.1.0'
|
||||
s.add_development_dependency 'rake'
|
||||
s.requirements << "jar org.apache.kafka:kafka_2.10, 0.8.1.1"
|
||||
s.requirements << "jar org.mod4j.org.eclipse.xtext:log4j, 1.2.15"
|
||||
s.requirements << "jar org.scala-lang:scala-library, 2.10.1"
|
||||
s.requirements << "jar com.yammer.metrics:metrics-core, 2.2.0"
|
||||
s.requirements << "jar org.slf4j:slf4j-api, 1.7.2"
|
||||
s.requirements << "jar com.101tec:zkclient, 0.3"
|
||||
s.require_paths = ["lib"]
|
||||
else
|
||||
s.add_dependency('mini_portile', '~> 0.6.0')
|
||||
s.extensions = Dir['ext/**/extconf.rb']
|
||||
s.require_paths = ["lib", "ext/hermann"]
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,2 +1,15 @@
|
|||
module Hermann
|
||||
if RUBY_PLATFORM == "java"
|
||||
require 'hermann_jars'
|
||||
|
||||
module JavaUtil
|
||||
include_package 'java.util'
|
||||
end
|
||||
module ProducerUtil
|
||||
include_package 'kafka.producer'
|
||||
end
|
||||
module JavaApiUtil
|
||||
include_package 'kafka.javaapi.producer'
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,6 +1,12 @@
|
|||
require 'hermann'
|
||||
require 'hermann/result'
|
||||
require 'hermann_lib'
|
||||
|
||||
|
||||
if RUBY_PLATFORM == "java"
|
||||
require 'hermann/provider/java_producer'
|
||||
else
|
||||
require 'hermann_lib'
|
||||
end
|
||||
|
||||
module Hermann
|
||||
class Producer
|
||||
|
@ -9,7 +15,11 @@ module Hermann
|
|||
def initialize(topic, brokers)
|
||||
@topic = topic
|
||||
@brokers = brokers
|
||||
@internal = Hermann::Lib::Producer.new(topic, brokers)
|
||||
if RUBY_PLATFORM == "java"
|
||||
@internal = Hermann::Provider::JavaProducer.new(topic, brokers)
|
||||
else
|
||||
@internal = Hermann::Lib::Producer.new(topic, brokers)
|
||||
end
|
||||
# We're tracking children so we can make sure that at Producer exit we
|
||||
# make a reasonable attempt to clean up outstanding result objects
|
||||
@children = []
|
||||
|
@ -43,7 +53,12 @@ module Hermann
|
|||
if value.kind_of? Array
|
||||
return value.map { |e| self.push(e) }
|
||||
else
|
||||
@internal.push_single(value, result)
|
||||
if RUBY_PLATFORM == "java"
|
||||
result = @internal.push_single(value)
|
||||
@children << result
|
||||
else
|
||||
@internal.push_single(value, result)
|
||||
end
|
||||
end
|
||||
|
||||
return result
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
require 'java'
|
||||
require 'hermann'
|
||||
require 'concurrent'
|
||||
|
||||
module Hermann
|
||||
module Provider
|
||||
|
||||
# Kafka Producer class implemented with jruby and kafka client jar
|
||||
#
|
||||
# == Heading
|
||||
#
|
||||
# This class simulates the kafka producer class within a java environment.
|
||||
# If the producer throw an exception within the Promise a call to +.value!+
|
||||
# will raise the exception and the rejected flag will be set to true
|
||||
#
|
||||
class JavaProducer
|
||||
attr_accessor :topic, :producer, :connected
|
||||
|
||||
def initialize(topic, brokers)
|
||||
@topic = topic
|
||||
properties = create_properties(:brokers => brokers)
|
||||
config = create_config(properties)
|
||||
@producer = JavaApiUtil::Producer.new(config)
|
||||
end
|
||||
|
||||
DEFAULTS = {
|
||||
:string_encoder => 'kafka.serializer.StringEncoder',
|
||||
:partitioner => 'kafka.producer.DefaultPartitioner',
|
||||
:required_acks => "1"
|
||||
}.freeze
|
||||
|
||||
# Push a value onto the Kafka topic passed to this +Producer+
|
||||
#
|
||||
# @param [Object] value A single object to push
|
||||
#
|
||||
# @return +Concurrent::Promise+ Representa a promise to send the
|
||||
# data to the kafka broker. Upon execution the Promise's status
|
||||
# will be set
|
||||
def push_single(msg)
|
||||
Concurrent::Promise.new {
|
||||
data = ProducerUtil::KeyedMessage.new(@topic, msg)
|
||||
@producer.send(data)
|
||||
}
|
||||
end
|
||||
|
||||
private
|
||||
# @return [ProducerConfig] - packaged config for +Producer+
|
||||
def create_config(properties)
|
||||
ProducerUtil::ProducerConfig.new(properties)
|
||||
end
|
||||
|
||||
# @return [Properties] properties object for creating +ProducerConfig+
|
||||
def create_properties(args={})
|
||||
brokers = args[:brokers]
|
||||
str_encoder = DEFAULTS[:string_encoder]
|
||||
partitioner = DEFAULTS[:partitioner]
|
||||
acks = DEFAULTS[:required_acks]
|
||||
|
||||
properties = JavaUtil::Properties.new
|
||||
properties.put('metadata.broker.list', brokers)
|
||||
properties.put('serializer.class', str_encoder)
|
||||
properties.put('partitioner.class', partitioner)
|
||||
properties.put('request.required.acks', acks)
|
||||
properties
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,3 +1,3 @@
|
|||
module Hermann
|
||||
VERSION = '0.16.0'
|
||||
VERSION = '0.17.0'
|
||||
end
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
# this is a generated file, to avoid over-writing it just delete this comment
|
||||
require 'jar_dependencies'
|
||||
|
||||
require_jar( 'org.slf4j', 'slf4j-api', '1.7.2' )
|
||||
require_jar( 'org.scala-lang', 'scala-library', '2.10.1' )
|
||||
require_jar( 'log4j', 'log4j', '1.2.14' )
|
||||
require_jar( 'com.yammer.metrics', 'metrics-core', '2.2.0' )
|
||||
require_jar( 'org.apache.zookeeper', 'zookeeper', '3.3.4' )
|
||||
require_jar( 'net.sf.jopt-simple', 'jopt-simple', '3.2' )
|
||||
require_jar( 'org.apache.kafka', 'kafka_2.10', '0.8.1.1' )
|
||||
require_jar( 'jline', 'jline', '0.9.94' )
|
||||
require_jar( 'com.101tec', 'zkclient', '0.3' )
|
||||
require_jar( 'org.mod4j.org.eclipse.xtext', 'log4j', '1.2.15' )
|
||||
require_jar( 'junit', 'junit', '3.8.1' )
|
||||
require_jar( 'org.xerial.snappy', 'snappy-java', '1.0.5' )
|
|
@ -7,132 +7,155 @@ describe Hermann::Producer do
|
|||
let(:topic) { 'rspec' }
|
||||
let(:brokers) { 'localhost:1337' }
|
||||
|
||||
describe '#connected?' do
|
||||
subject { producer.connected? }
|
||||
context 'by default' do
|
||||
before :each do
|
||||
expect(producer.internal).to receive(:connected?).and_call_original
|
||||
end
|
||||
context "java" do
|
||||
if RUBY_PLATFORM == "java"
|
||||
describe '#create_result' do
|
||||
subject { producer.create_result }
|
||||
|
||||
it { should be false }
|
||||
end
|
||||
end
|
||||
it { should be_instance_of Hermann::Result }
|
||||
|
||||
describe '#connect' do
|
||||
let(:timeout) { 0 }
|
||||
subject(:connect!) { producer.connect(timeout) }
|
||||
|
||||
it 'should delegate connection to the underlying Producer' do
|
||||
expect(producer.internal).to receive(:connect).and_call_original
|
||||
connect!
|
||||
end
|
||||
end
|
||||
|
||||
describe '#push' do
|
||||
subject(:result) { producer.push(value) }
|
||||
|
||||
context 'error conditions' do
|
||||
shared_examples 'an error condition' do
|
||||
it 'should raise an exception' do
|
||||
expect { producer.push('rspec') }.to raise_error(RuntimeError)
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a bad broker configuration' do
|
||||
let(:brokers) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
|
||||
context 'with a non-existing broker' do
|
||||
let(:brokers) { 'localhost:13337' }
|
||||
let(:timeout) { 2 }
|
||||
let(:value) { 'rspec' }
|
||||
|
||||
it 'should reject' do
|
||||
future = result
|
||||
expect(future).not_to be_nil
|
||||
producer.tick_reactor(timeout)
|
||||
expect(future).to be_rejected
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a bad topic' do
|
||||
let(:topic) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a single value' do
|
||||
let(:value) { 'hello' }
|
||||
|
||||
it 'should invoke #push_single' do
|
||||
expect(producer.internal).to receive(:push_single)
|
||||
expect(result).to be_instance_of Hermann::Result
|
||||
end
|
||||
end
|
||||
|
||||
context 'with an array value' do
|
||||
let(:value) { ['hello', 'world'] }
|
||||
|
||||
it 'should invoke #push_single for each element' do
|
||||
value.each do |v|
|
||||
expect(producer.internal).to receive(:push_single).with(v, anything)
|
||||
end
|
||||
|
||||
expect(result).to be_instance_of Array
|
||||
result.each do |elem|
|
||||
expect(elem).to be_instance_of Hermann::Result
|
||||
it 'should add the result to the producers children' do
|
||||
expect(producer.children).to be_empty
|
||||
expect(subject).to be_instance_of Hermann::Result
|
||||
expect(producer.children).to_not be_empty
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#create_result' do
|
||||
subject { producer.create_result }
|
||||
context "not java" do
|
||||
if RUBY_PLATFORM != "java"
|
||||
describe '#push' do
|
||||
subject(:result) { producer.push(value) }
|
||||
|
||||
it { should be_instance_of Hermann::Result }
|
||||
context 'error conditions' do
|
||||
shared_examples 'an error condition' do
|
||||
it 'should raise an exception' do
|
||||
expect { producer.push('rspec') }.to raise_error(RuntimeError)
|
||||
end
|
||||
end
|
||||
|
||||
it 'should add the result to the producers children' do
|
||||
expect(producer.children).to be_empty
|
||||
expect(subject).to be_instance_of Hermann::Result
|
||||
expect(producer.children).to_not be_empty
|
||||
end
|
||||
end
|
||||
context 'with a bad broker configuration' do
|
||||
let(:brokers) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
|
||||
describe '#tick_reactor' do
|
||||
let(:timeout) { 0 }
|
||||
let(:internal) { double('Hermann::Lib::Producer mock') }
|
||||
subject(:tick) { producer.tick_reactor(timeout) }
|
||||
context 'with a non-existing broker' do
|
||||
let(:brokers) { 'localhost:13337' }
|
||||
let(:timeout) { 2 }
|
||||
let(:value) { 'rspec' }
|
||||
|
||||
before :each do
|
||||
3.times do
|
||||
child = Hermann::Result.new(producer)
|
||||
allow(child).to receive(:reap?) { reap }
|
||||
producer.children << child
|
||||
it 'should reject' do
|
||||
future = result
|
||||
expect(future).not_to be_nil
|
||||
producer.tick_reactor(timeout)
|
||||
expect(future).to be_rejected
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a bad topic' do
|
||||
let(:topic) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a single value' do
|
||||
let(:value) { 'hello' }
|
||||
|
||||
it 'should invoke #push_single' do
|
||||
expect(producer.internal).to receive(:push_single)
|
||||
expect(result).to be_instance_of Hermann::Result
|
||||
end
|
||||
end
|
||||
|
||||
context 'with an array value' do
|
||||
let(:value) { ['hello', 'world'] }
|
||||
|
||||
it 'should invoke #push_single for each element' do
|
||||
value.each do |v|
|
||||
expect(producer.internal).to receive(:push_single).with(v, anything)
|
||||
end
|
||||
|
||||
expect(result).to be_instance_of Array
|
||||
result.each do |elem|
|
||||
expect(elem).to be_instance_of Hermann::Result
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
producer.instance_variable_set(:@internal, internal)
|
||||
expect(internal).to receive(:tick)
|
||||
end
|
||||
describe '#create_result' do
|
||||
subject { producer.create_result }
|
||||
|
||||
context 'with no reapable children' do
|
||||
let(:reap) { false }
|
||||
it { should be_instance_of Hermann::Result }
|
||||
|
||||
it 'should not reap the children' do
|
||||
count = producer.children.size
|
||||
expect(tick).to eql(0)
|
||||
expect(producer.children.size).to eql(count)
|
||||
it 'should add the result to the producers children' do
|
||||
expect(producer.children).to be_empty
|
||||
expect(subject).to be_instance_of Hermann::Result
|
||||
expect(producer.children).to_not be_empty
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'with reapable children' do
|
||||
let(:reap) { true }
|
||||
|
||||
it 'should not reap the children' do
|
||||
count = producer.children.size
|
||||
expect(tick).to eql(count)
|
||||
expect(producer.children.size).to_not eql(count)
|
||||
|
||||
describe '#connected?' do
|
||||
subject { producer.connected? }
|
||||
context 'by default' do
|
||||
before :each do
|
||||
expect(producer.internal).to receive(:connected?).and_call_original
|
||||
end
|
||||
|
||||
it { should be false }
|
||||
end
|
||||
end
|
||||
|
||||
describe '#connect' do
|
||||
let(:timeout) { 0 }
|
||||
subject(:connect!) { producer.connect(timeout) }
|
||||
|
||||
it 'should delegate connection to the underlying Producer' do
|
||||
expect(producer.internal).to receive(:connect).and_call_original
|
||||
connect!
|
||||
end
|
||||
end
|
||||
|
||||
describe '#tick_reactor' do
|
||||
let(:timeout) { 0 }
|
||||
let(:internal) { double('Hermann::Lib::Producer mock') }
|
||||
subject(:tick) { producer.tick_reactor(timeout) }
|
||||
|
||||
before :each do
|
||||
3.times do
|
||||
child = Hermann::Result.new(producer)
|
||||
allow(child).to receive(:reap?) { reap }
|
||||
producer.children << child
|
||||
end
|
||||
|
||||
producer.instance_variable_set(:@internal, internal)
|
||||
expect(internal).to receive(:tick)
|
||||
end
|
||||
|
||||
context 'with no reapable children' do
|
||||
let(:reap) { false }
|
||||
|
||||
it 'should not reap the children' do
|
||||
count = producer.children.size
|
||||
expect(tick).to eql(0)
|
||||
expect(producer.children.size).to eql(count)
|
||||
end
|
||||
end
|
||||
|
||||
context 'with reapable children' do
|
||||
let(:reap) { true }
|
||||
|
||||
it 'should not reap the children' do
|
||||
count = producer.children.size
|
||||
expect(tick).to eql(count)
|
||||
expect(producer.children.size).to_not eql(count)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
require 'spec_helper'
|
||||
require 'hermann/provider/java_producer'
|
||||
|
||||
describe Hermann::Provider::JavaProducer do
|
||||
subject(:producer) { described_class.new(topic, brokers) }
|
||||
|
||||
let(:topic) { 'rspec' }
|
||||
let(:brokers) { 'localhost:1337' }
|
||||
|
||||
describe '#push_single' do
|
||||
subject(:result) { producer.push_single(value) }
|
||||
|
||||
context 'error conditions' do
|
||||
shared_examples 'an error condition' do
|
||||
it 'should be rejected' do
|
||||
promise = producer.push_single('rspec').execute.wait(1)
|
||||
expect(promise).to be_rejected
|
||||
expect { promise.value! }.to raise_error
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a bad broker configuration' do
|
||||
let(:brokers) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
|
||||
context 'with a non-existing broker' do
|
||||
let(:brokers) { 'localhost:13337' }
|
||||
let(:timeout) { 2 }
|
||||
let(:value) { 'rspec' }
|
||||
|
||||
it 'should reject' do
|
||||
future = result.execute.wait(1)
|
||||
expect(future).to be_rejected
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a bad topic' do
|
||||
let(:topic) { '' }
|
||||
it_behaves_like 'an error condition'
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue