add specs for java_producer

This commit is contained in:
jway 2014-10-06 08:08:30 -07:00
parent b898709042
commit 07a1dcec8e
7 changed files with 143 additions and 77 deletions

2
.gitignore vendored
View File

@ -8,3 +8,5 @@ ext/Makefile
pkg/
tmp/
*.jar
.ruby-gemset
.ruby-version

View File

@ -3,17 +3,29 @@ PATH
specs:
hermann (0.17.0)
concurrent-ruby
mini_portile (~> 0.6.0)
jar-dependencies (~> 0.1.2)
GEM
remote: https://rubygems.org/
specs:
axiom-types (0.1.1)
descendants_tracker (~> 0.0.4)
ice_nine (~> 0.11.0)
thread_safe (~> 0.3, >= 0.3.1)
coderay (1.1.0)
concurrent-ruby (0.7.0)
coercible (1.0.0)
descendants_tracker (~> 0.0.1)
concurrent-ruby (0.7.0-java)
descendants_tracker (0.0.4)
thread_safe (~> 0.3, >= 0.3.1)
diff-lcs (1.2.5)
equalizer (0.0.9)
ffi (1.9.5-java)
ice_nine (0.11.0)
jar-dependencies (0.1.2)
maven-tools (1.0.5)
virtus (~> 1.0)
method_source (0.8.2)
mini_portile (0.6.0)
pry (0.9.12.6)
coderay (~> 1.0)
method_source (~> 0.8)
@ -41,10 +53,20 @@ GEM
rspec-mocks (3.0.4)
rspec-support (~> 3.0.0)
rspec-support (3.0.4)
ruby-maven (3.1.1.0.8)
maven-tools (~> 1.0.1)
ruby-maven-libs (= 3.1.1)
ruby-maven-libs (3.1.1)
slop (3.5.0)
spoon (0.0.4)
ffi
system_timer (1.2.4)
thread_safe (0.3.4-java)
virtus (1.0.3)
axiom-types (~> 0.1)
coercible (~> 1.0)
descendants_tracker (~> 0.0, >= 0.0.3)
equalizer (~> 0.0, >= 0.0.9)
PLATFORMS
java
@ -57,4 +79,5 @@ DEPENDENCIES
rake-compiler
rspec (~> 3.0.0)
rspec-its
ruby-maven (~> 3.1.1.0)
system_timer

View File

@ -1,2 +1,13 @@
module Hermann
if RUBY_PLATFORM == "java"
module JavaUtil
include_package 'java.util'
end
module ProducerUtil
include_package 'kafka.producer'
end
module JavaApiUtil
include_package 'kafka.javaapi.producer'
end
end
end

View File

@ -3,7 +3,7 @@ require 'hermann/result'
if RUBY_PLATFORM == "java"
require 'hermann/providers/java_producer'
require 'hermann/provider/java_producer'
else
require 'hermann_lib'
end
@ -16,7 +16,7 @@ module Hermann
@topic = topic
@brokers = brokers
if RUBY_PLATFORM == "java"
@internal = Hermann::Providers::JavaProducer.new(topic, brokers)
@internal = Hermann::Provider::JavaProducer.new(topic, brokers)
else
@internal = Hermann::Lib::Producer.new(topic, brokers)
end
@ -49,11 +49,17 @@ module Hermann
# result from the broker
def push(value)
result = create_result
if value.kind_of? Array
return value.map { |e| self.push(e) }
else
@internal.push_single(value, result)
if RUBY_PLATFORM == "java"
result = @internal.push_single(value)
else
@internal.push_single(value, result)
end
end
return result
end

View File

@ -0,0 +1,50 @@
require 'java'
require 'hermann'
require 'concurrent'
module Hermann
module Provider
class JavaProducer
attr_accessor :topic, :producer
def initialize(topic, brokers)
@topic = topic
properties = create_properties(:brokers => brokers)
config = create_config(properties)
@producer = JavaApiUtil::Producer.new(config)
end
DEFAULTS = {
:string_encoder => 'kafka.serializer.StringEncoder',
:partitioner => 'kafka.producer.DefaultPartitioner',
:required_acks => "1"
}.freeze
def push_single(msg)
Concurrent::Promise.new {
data = ProducerUtil::KeyedMessage.new(@topic, msg)
@producer.send(data)
}.rescue { |reason| raise reason }
end
private
def create_config(properties)
ProducerUtil::ProducerConfig.new(properties)
end
def create_properties(args={})
brokers = args[:brokers]
str_encoder = DEFAULTS[:string_encoder]
partitioner = DEFAULTS[:partitioner]
acks = DEFAULTS[:required_acks]
properties = JavaUtil::Properties.new
properties.put('metadata.broker.list', brokers)
properties.put('serializer.class', str_encoder)
properties.put('partitioner.class', partitioner)
properties.put('request.required.acks', acks)
properties
end
end
end
end

View File

@ -1,70 +0,0 @@
require 'java'
require 'hermann'
require 'concurrent'
module JavaUtil
include_package 'java.util'
end
module ProducerUtil
include_package 'kafka.producer'
end
module JavaApiUtil
include_package 'kafka.javaapi.producer'
end
module Hermann
module Providers
class JavaProducer
attr_accessor :topic, :producer
def initialize(topic, brokers)
@topic = topic
properties = create_properties(brokers: brokers)
config = create_config(properties)
@producer = JavaApiUtil::Producer.new(config)
end
def defaults
{
string_encoder: 'kafka.serializer.StringEncoder',
partitioner: 'kafka.producer.DefaultPartitioner',
required_acks: "1"
}
end
def create_config(properties)
ProducerUtil::ProducerConfig.new(properties)
end
def create_properties(args={})
brokers = args[:brokers]
str_encoder = defaults[:string_encoder]
partitioner = defaults[:partitioner]
acks = defaults[:required_acks]
properties = JavaUtil::Properties.new
properties.put('metadata.broker.list', brokers)
properties.put('serializer.class', str_encoder)
properties.put('partitioner.class', partitioner)
properties.put('request.required.acks', acks)
properties
end
def push(*messages)
messages.flatten.map do |msg|
_push(msg)
end
end
def push_single(msg, result)
_push(msg)
end
private
def _push(msg)
data = ProducerUtil::KeyedMessage.new(@topic, msg)
@producer.send(data)
end
end
end
end

View File

@ -1 +1,45 @@
java_producer_spec.rb
require 'spec_helper'
require 'hermann/provider/java_producer'
require 'hermann_jars'
describe Hermann::Provider::JavaProducer do
subject(:producer) { described_class.new(topic, brokers) }
let(:topic) { 'rspec' }
let(:brokers) { 'localhost:1337' }
describe '#push_single' do
subject(:result) { producer.push_single(value) }
context 'error conditions' do
shared_examples 'an error condition' do
it 'should be rejected' do
promise = producer.push_single('rspec').execute.wait(1)
expect(promise).to be_rejected
expect(promise.reason).to_not be_nil
end
end
context 'with a bad broker configuration' do
let(:brokers) { '' }
it_behaves_like 'an error condition'
end
context 'with a non-existing broker' do
let(:brokers) { 'localhost:13337' }
let(:timeout) { 2 }
let(:value) { 'rspec' }
it 'should reject' do
future = result.execute.wait(1)
expect(future).to be_rejected
end
end
context 'with a bad topic' do
let(:topic) { '' }
it_behaves_like 'an error condition'
end
end
end
end