From fad37247d3e714feabca3560638e4c05a3f5021e Mon Sep 17 00:00:00 2001 From: jway Date: Thu, 30 Oct 2014 17:59:34 -0700 Subject: [PATCH] exploratory work --- lib/hermann/provider/java_producer.rb | 12 ++++++---- lib/hermann/provider/java_simple_consumer.rb | 2 +- spec/integration/producer_spec.rb | 24 ++++++++++++-------- spec/spec_helper.rb | 1 + 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/lib/hermann/provider/java_producer.rb b/lib/hermann/provider/java_producer.rb index 680eeac..2a6ec09 100644 --- a/lib/hermann/provider/java_producer.rb +++ b/lib/hermann/provider/java_producer.rb @@ -46,12 +46,16 @@ module Hermann # will be set def push_single(msg, topic, unused) Concurrent::Promise.execute { - data = ProducerUtil::KeyedMessage.new(topic, msg) begin + data = ProducerUtil::KeyedMessage.new(topic, msg) + # begin @producer.send(data) - rescue Java::KafkaCommon::FailedToSendMessageException => jexc - raise Hermann::Errors::ConnectivityError.new(jexc.message, - :java_exception => jexc) + rescue Exception => e + # rescue Java::KafkaCommon::FailedToSendMessageException => jexc + # raise Hermann::Errors::ConnectivityError.new(jexc.message, + # :java_exception => jexc) + puts "............#{e.message}" + puts e.backtrace.join("\n") end } end diff --git a/lib/hermann/provider/java_simple_consumer.rb b/lib/hermann/provider/java_simple_consumer.rb index 9f1447e..a9cd181 100644 --- a/lib/hermann/provider/java_simple_consumer.rb +++ b/lib/hermann/provider/java_simple_consumer.rb @@ -60,7 +60,7 @@ module Hermann stream = get_stream(topic) it = stream.iterator while it.hasNext do - yield it.next.message.to_s + yield it.next end rescue Exception => e puts "#{self.class.name}#consume exception: #{e.class.name}" diff --git a/spec/integration/producer_spec.rb b/spec/integration/producer_spec.rb index 4b927d2..22f909e 100644 --- a/spec/integration/producer_spec.rb +++ b/spec/integration/producer_spec.rb @@ -12,7 +12,7 @@ describe 'producer' do include_context 'integration test context' let(:timeout) { 10 } - let(:message) { 'msg' } + let(:message) { 'msg' } let(:consumer) do Hermann::Consumer.new(topic, 'rspec-group', zookeepers) end @@ -22,33 +22,37 @@ describe 'producer' do puts "consuming off `#{topic}`" consumer.consume(topic) do |dequeued| puts "received the message: #{dequeued.inspect}" - value = dequeued + + buffer = dequeued.message.to_a + # value = String.from_java_bytes(dequeued.message) + value = buffer.pack('C*').encode('UTF-8', {:invalid => :replace, :undef => :replace, :replace => ''}) + # puts ".......................encoding: #{buffer.pack('c*')} #{buffer.pack('c*').encoding}" + + # value = buffer.pack('c*').encode("ASCII-8BIT", {:invalid => :replace, :undef => :replace, :replace => ''}) + puts value consumer.shutdown end # Return this out of the block - next value + value end end - let(:brokers) do + let(:brokers_array) do broker_ids = Hermann::Discovery::Zookeeper.new(zookeepers).get_brokers puts "using ZK discovered brokers: #{broker_ids}" broker_ids end - let(:producer) { Hermann::Producer.new(nil, brokers) } - - + let(:producer) { Hermann::Producer.new(nil, brokers_array) } it 'produces and consumes messages', :type => :integration, :platform => :java do producer.push(message, :topic => topic).value!(timeout) expect(consumer_promise.value!(timeout)).to eql(message) end - context 'with binary data', :type => :integration, :platform => :java do let(:event) do Hermann::TestEvent.new(:name => 'rspec', :state => 3, - :bogomips => 9001) + :bogomips => 9001) end let(:message) { event.encode } @@ -56,7 +60,7 @@ describe 'producer' do it 'should be a thing' do producer.push(message, :topic => topic).value!(timeout) dequeued = consumer_promise.value!(timeout) - expect(dequeued).to eql(message) + expect(dequeued).to eql(message.encode!('UTF-8', {:invalid => :replace, :undef => :replace, :replace => ''})) expect { Hermann::TestEvent.decode(dequeued) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 33d6ab7..195e6b5 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,6 +1,7 @@ require 'rubygems' require 'yaml' require 'rspec' +require 'pry' # Add ext/ to the load path so we can load `hermann_lib` $LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__) + '/../ext/'))