mirror of https://github.com/reiseburo/hermann
Refactor out the necessity for Hermann::Timeout and just rely in a Ruby-based busyloop
This commit is contained in:
parent
9014551d10
commit
a0697f5b66
|
@ -52,29 +52,18 @@ module Hermann
|
|||
# @param [FixNum] timeout Seconds to block on the internal reactor
|
||||
# @return [FixNum] Number of +Hermann::Result+ children reaped
|
||||
def tick_reactor(timeout=0)
|
||||
# Handle negative numbers, those can be zero
|
||||
if (timeout < 0)
|
||||
timeout = 0
|
||||
end
|
||||
timeout = rounded_timeout(timeout)
|
||||
|
||||
# Since we're going to sleep for each second, round any potential floats
|
||||
# off
|
||||
if timeout.kind_of?(Float)
|
||||
timeout = timeout.round
|
||||
end
|
||||
|
||||
Hermann::Timeout.timeout(timeout) do
|
||||
if timeout == 0
|
||||
@internal.tick(0)
|
||||
else
|
||||
timeout.times do
|
||||
events = @internal.tick(0)
|
||||
# If we have events, that probably means we have a result
|
||||
break if events > 0
|
||||
# Sleep in Ruby instead of letting librdkafka handle the thread
|
||||
# sleeping
|
||||
sleep 1
|
||||
end
|
||||
if timeout == 0
|
||||
@internal.tick(0)
|
||||
else
|
||||
(timeout * 2).times do
|
||||
# We're going to Thread#sleep in Ruby to avoid a
|
||||
# pthread_cond_timedwait(3) inside of librdkafka
|
||||
events = @internal.tick(0)
|
||||
# If we find events, break out early
|
||||
break if events > 0
|
||||
sleep 0.5
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -89,11 +78,16 @@ module Hermann
|
|||
reaped = total_children - children.size
|
||||
end
|
||||
|
||||
# Creates a new Ruby thread to tick the reactor automatically
|
||||
#
|
||||
# @param [Hermann::Producer] producer
|
||||
# @return [Thread] thread created for ticking the reactor
|
||||
def self.run_reactor_for(producer)
|
||||
|
||||
private
|
||||
|
||||
def rounded_timeout(timeout)
|
||||
# Handle negative numbers, those can be zero
|
||||
return 0 if (timeout < 0)
|
||||
# Since we're going to sleep for each second, round any potential floats
|
||||
# off
|
||||
return timeout.round if timeout.kind_of?(Float)
|
||||
return timeout
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -17,7 +17,9 @@ message = ('b' * 4096) + 'a'
|
|||
puts ">> Producer#push('#{message}') (#{message.size} bytes)"
|
||||
r = p.push(message)
|
||||
|
||||
puts "#{r}>> #{r.value(5)}"
|
||||
puts Time.now
|
||||
puts "#{r}>> #{r.value(3)}"
|
||||
puts Time.now
|
||||
|
||||
puts "..exiting"
|
||||
|
||||
|
|
|
@ -40,27 +40,7 @@ describe Hermann::Lib::Producer do
|
|||
end
|
||||
|
||||
it 'should return successfully' do
|
||||
expect(result).to eql(0)
|
||||
end
|
||||
|
||||
context 'with a zero timeout' do
|
||||
it 'should not block on #tick' do
|
||||
start = Time.now.to_i
|
||||
expect(result).to eql(0)
|
||||
finish = Time.now.to_i
|
||||
expect(finish - start).to be < 1
|
||||
end
|
||||
end
|
||||
|
||||
context 'with a non-zero timeout' do
|
||||
let(:timeout) { 2000 }
|
||||
let(:brokers) { 'localhost:1337' }
|
||||
it 'should block #tick' do
|
||||
start = Time.now.to_i
|
||||
expect(result).to eql(0)
|
||||
finish = Time.now.to_i
|
||||
expect(finish - start).to be > 1
|
||||
end
|
||||
expect(result).not_to be_nil
|
||||
end
|
||||
end
|
||||
##########################################################################
|
||||
|
|
Loading…
Reference in New Issue