Tie Hermann::Result#value to the underlying reactor to bring values up to Ruby

This ensures that we're getting async values out of librdkafka into the calling
Ruby thread. Currently errors aren't being brought up properly, but we're getting there

Example:

    [14:47:30] tyler:Hermann git:(issues/11-producer-feedback*) $ pry -I lib -r 'hermann/producer'
    [1] pry(main)> p = Hermann::Producer.new('topic', 'kafka0.REDACTED.com:6667')
    => #<Hermann::Producer:0x00000803b3b450
    @brokers="kafka0.REDACTED.com:6667",
    @children=[],
    @internal=#<Hermann::Lib::Producer:0x00000803b3b3d8>,
    @topic="topic">
    [2] pry(main)> r = p.push('hello world!')
    => #<Hermann::Result:0x00000803b8cb20
    @producer=
    #<Hermann::Producer:0x00000803b3b450
    @brokers="kafka0.REDACTED.com:6667",
    @children=[#<Hermann::Result:0x00000803b8cb20 ...>],
    @internal=#<Hermann::Lib::Producer:0x00000803b3b3d8>,
    @topic="topic">,
    @reason=nil,
    @state=:unfulfilled,
    @value=nil>
    [3] pry(main)> r.state
    => :unfulfilled
    [4] pry(main)> r.value
    ticking rdkafka reactor
    ticked
    => "hello world!"
    [5] pry(main)> r.state
    => :fulfilled
    [6] pry(main)> r.rejected?
    => false
    [7] pry(main)>
    [14:47:56] tyler:Hermann git:(issues/11-producer-feedback*) $

Fixes #11
This commit is contained in:
R. Tyler Croy 2014-09-04 14:29:19 -07:00
parent ed36300265
commit 9bfd7ad2e6
4 changed files with 46 additions and 8 deletions

View File

@ -26,12 +26,14 @@ module Hermann
result = create_result
if value.kind_of? Array
value.each { |element| self.push(element) }
return value.map do |element|
self.push(element)
end
else
@internal.push_single(value, result)
end
return value
return result
end
# Create a +Hermann::Result+ that is tracked in the Producer's children
@ -43,19 +45,27 @@ module Hermann
return @children.last
end
# Tick the underlying librdkafka reacter and clean up any unreaped but
# reapable children results
#
# @param [FixNum] timeout Milliseconds to block on the internal reactor
# @return [NilClass]
def tick_reactor(timeout=0)
# Filter all children who are no longer pending/fulfilled
@children = @children.reject { |c| c.reap? }
puts 'ticking'
# Punt rd_kafka reactor
@internal.tick(0)
puts 'ticked'
@internal.tick(timeout)
return nil
end
# Creates a new Ruby thread to tick the reactor automatically
#
# @param [Hermann::Producer] producer
# @return [Thread] thread created for ticking the reactor
def self.run_reactor_for(producer)
end
end
end

View File

@ -28,6 +28,11 @@ module Hermann
return false
end
def value(timeout=0)
@producer.tick_reactor(timeout)
return @value
end
# INTERNAL METHOD ONLY. Do not use
#
# This method will be invoked by the underlying extension to indicate set
@ -37,10 +42,10 @@ module Hermann
# @param [Boolean] is_error True if the result was errored for whatever
# reason
def internal_set_value(value, is_error)
puts "Hermann::Result#set_internal_value(#{value.class}:\"#{value}\", error?:#{is_error})"
@value = value
if is_error
puts "Hermann::Result#set_internal_value(#{value.class}:\"#{value}\", error?:#{is_error})"
@state = :rejected
else
@state = :fulfilled

View File

@ -33,7 +33,7 @@ describe Hermann::Producer do
it 'should invoke #push_single' do
expect(producer.internal).to receive(:push_single)
expect(result).not_to be_nil
expect(result).to be_instance_of Hermann::Result
end
end
@ -45,7 +45,10 @@ describe Hermann::Producer do
expect(producer.internal).to receive(:push_single).with(v, anything)
end
expect(result).not_to be_nil
expect(result).to be_instance_of Array
result.each do |elem|
expect(elem).to be_instance_of Hermann::Result
end
end
end
end

View File

@ -5,6 +5,26 @@ describe Hermann::Result do
let(:producer) { double('Mock Hermann::Producer') }
subject(:result) { described_class.new(producer) }
describe '#value' do
let(:timeout) { 0 }
subject { result.value(timeout) }
before :each do
# We cannot resolve a value unless we've ticked the reactor at least once
expect(producer).to receive(:tick_reactor).with(timeout)
end
context 'by default' do
it { should be_nil }
end
context 'after a value has been set internally' do
let(:value) { 'rspec-payload-value' }
before(:each) { result.internal_set_value(value, false) }
it { should eql(value) }
end
end
describe '#reap?' do
subject { result.reap? }