Merge pull request #62 from lookout/consume_topic

Allow passing topic into consume method
This commit is contained in:
R. Tyler Croy 2014-10-27 14:04:10 -07:00
commit 1f52bad871
4 changed files with 32 additions and 16 deletions

View File

@ -379,13 +379,12 @@ void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
}
/**
* Hermann::Consumer.consume
*
* Begin listening on the configured topic for messages. msg_consume will be called on each message received.
* Hermann::Lib::Consumer.consume
*
* @param VALUE self the Ruby object for this consumer
* @param VALUE topic the Ruby string representing a topic to consume
*/
static VALUE consumer_consume(VALUE self) {
static VALUE consumer_consume(VALUE self, VALUE topic) {
HermannInstanceConfig* consumerConfig;
@ -1012,7 +1011,7 @@ void Init_hermann_lib() {
rb_define_method(c_consumer, "initialize_copy", consumer_init_copy, 1);
/* Consumer has method 'consume' */
rb_define_method( c_consumer, "consume", consumer_consume, 0 );
rb_define_method( c_consumer, "consume", consumer_consume, 1);
/* ---- Define the producer class ---- */
c_producer = rb_define_class_under(lib_module, "Producer", rb_cObject);

View File

@ -38,8 +38,8 @@ module Hermann
end
end
def consume(&block)
@internal.consume(&block)
def consume(topic=nil, &block)
@internal.consume(topic, &block)
end
end
end

View File

@ -40,15 +40,17 @@ module Hermann
# Starts infinite loop to consume messages. hasNext() blocks until a
# message is available at which point it is yielded to the block
#
# @params [String] optional topic to override initialized topic
#
# ==== Examples
#
# consumer.consume do |message|
# puts "Received: #{message}"
# end
#
def consume
def consume(topic=nil)
begin
stream = get_stream
stream = get_stream(topic)
it = stream.iterator
while it.hasNext do
yield it.next.message.to_s
@ -74,12 +76,15 @@ module Hermann
# Gets the message stream of the topic. Creates message streams for
# a topic and the number of threads requested. In this case the default
# number of threads is NUM_THREADS.
def get_stream
#
# @params [String] optional topic to override initialized topic
def get_stream(topic)
current_topic = topic || @topic
@topicCountMap = JavaUtil::HashMap.new
@value = NUM_THREADS.to_java Java::int
@topicCountMap.put("#{@topic}", @value)
@topicCountMap.put("#{current_topic}", @value)
consumerMap = @consumer.createMessageStreams(@topicCountMap)
consumerMap[@topic].first
consumerMap[current_topic].first
end
# Creates a ConsumerConfig object

View File

@ -39,13 +39,25 @@ describe Hermann::Provider::JavaSimpleConsumer, :platform => :java do
end
describe '#get_stream' do
subject { consumer.send(:get_stream) }
subject { consumer.send(:get_stream, topic) }
let(:map) { { topic => ['foo'] } }
it 'gets the consumer stream' do
allow(internal_consumer).to receive(:createMessageStreams) { map }
expect(subject).to eq 'foo'
context 'without topic' do
let(:topic) { nil }
it 'gets the consumer stream' do
allow(internal_consumer).to receive(:createMessageStreams) { map }
expect(subject).to eq 'foo'
end
end
context 'with topic' do
let(:topic) { 'topic' }
it 'gets the consumer stream' do
allow(internal_consumer).to receive(:createMessageStreams) { map }
expect(map).to receive(:[]).with(topic) { ['foo'] }
expect(subject).to eq 'foo'
end
end
end