This commit is contained in:
jway 2014-10-06 13:46:00 -07:00
parent 17b6db78b1
commit 06dda845b8
1 changed files with 18 additions and 0 deletions

View File

@ -4,6 +4,15 @@ require 'concurrent'
module Hermann
module Provider
# Kafka Producer class implemented with jruby and kafka client jar
#
# == Heading
#
# This class simulates the kafka producer class within a java environment.
# If the producer throw an exception within the Promise a call to +.value!+
# will raise the exception and the rejected flag will be set to true
#
class JavaProducer
attr_accessor :topic, :producer, :connected
@ -20,6 +29,13 @@ module Hermann
:required_acks => "1"
}.freeze
# Push a value onto the Kafka topic passed to this +Producer+
#
# @param [Object] value A single object to push
#
# @return +Concurrent::Promise+ Representa a promise to send the
# data to the kafka broker. Upon execution the Promise's status
# will be set
def push_single(msg)
Concurrent::Promise.new {
data = ProducerUtil::KeyedMessage.new(@topic, msg)
@ -28,10 +44,12 @@ module Hermann
end
private
# @return [ProducerConfig] - packaged config for +Producer+
def create_config(properties)
ProducerUtil::ProducerConfig.new(properties)
end
# @return [Properties] properties object for creating +ProducerConfig+
def create_properties(args={})
brokers = args[:brokers]
str_encoder = DEFAULTS[:string_encoder]