Add suppot for optionally using Curator if it's on the classpath for broker discovery

This change makes the loading of the 'zk' gem optional and implicit, as well as
the reliance on Curator being available in the JRuby runtime's classpath.

If neither Zookeeper client implementation are available, an exception will be
raised when attmepting to use Zookeeper-based discovery

Fixes #75
This commit is contained in:
R. Tyler Croy 2014-11-21 18:06:56 -08:00
parent c46156a7d1
commit 136e0ace47
7 changed files with 184 additions and 75 deletions

View File

@ -3,9 +3,10 @@ source "https://rubygems.org"
gemspec
group :development do
gem 'jbundler', :platform => :jruby
gem 'rake'
gem' ruby-maven', :platform => :java
gem 'jar-dependencies', :platform => :java
gem 'ruby-maven', '~> 3.1.1.0', :platform => :jruby
gem 'jar-dependencies', :platform => :jruby
gem 'rake-compiler'
gem 'pry'

12
HACKING.md Normal file
View File

@ -0,0 +1,12 @@
# Hacking on Hermann
### Integration Testing
* Download Kafka
* Start Zookeeper
* set port 2181
* Start Kafka
* Set properties file ```zookeeper.connect=localhost:2181```
* ```bundle exec jruby -S rspec spec/integration```

1
Jarfile Normal file
View File

@ -0,0 +1 @@
jar 'org.apache.curator:curator-framework:2.4.0'

View File

@ -93,15 +93,6 @@ end
* ```jbundle install```
* Test out one of the Producer/Consumer examples above
### Integration Testing
* Download Kafka
* Start Zookeeper
* set port 2181
* Start Kafka
* Set properties file ```zookeeper.connect=localhost:2181```
* ```bundle exec jruby -S rspec spec/integration```

View File

@ -8,10 +8,10 @@ Gem::Specification.new do |s|
s.name = "hermann"
s.version = Hermann::VERSION
s.authors = ['R. Tyler Croy', "Stan Campbell"]
s.authors = ['R. Tyler Croy', 'James Way', "Stan Campbell"]
s.description = 'Ruby gem for talking to Kafka'
s.summary = 'A Kafka consumer/producer gem supporting both MRI and JRuby'
s.email = ['rtyler.croy@lookout.com', 'stan.campbell3@gmail.com']
s.email = ['rtyler.croy@lookout.com', 'james.way@lookout.com', 'stan.campbell3@gmail.com']
s.homepage = 'https://github.com/lookout/Hermann'
s.licenses = ['MIT']

View File

@ -1,44 +1,51 @@
require 'hermann'
require 'zk'
if RUBY_PLATFORM == 'java'
require 'java'
end
require 'json'
require 'hermann/errors'
module Hermann
module Discovery
# Communicates with Zookeeper to discover kafka broker ids
#
class Zookeeper
attr_reader :zookeepers
attr_reader :zookeepers, :impl
BROKERS_PATH = "/brokers/ids".freeze
def initialize(zookeepers)
@zookeepers = zookeepers
end
# Gets comma separated string of brokers
#
# @param [Fixnum] timeout to connect to zookeeper, "2 times the
# tickTime (as set in the server configuration) and a maximum
# of 20 times the tickTime2 times the tick time set on server"
#
# @return [Array] List of brokers from ZK
# @raises [NoBrokersError] if could not discover brokers thru zookeeper
def get_brokers(timeout=0)
brokers = []
ZK.open(zookeepers, {:timeout => timeout}) do |zk|
brokers = fetch_brokers(zk)
# The ZkGemImpl class is an implementation of simple broker discovery
# using the `zk` gem if it is available
class ZkGemImpl
def self.usable?
begin
require 'zk'
return true
rescue LoadError
return false
end
end
if brokers.empty?
raise Hermann::Errors::NoBrokersError
def initialize(zks)
@zookeepers = zks
end
return brokers
end
private
def each_broker(&block)
brokers = []
ZK.open(@zookeepers, {:timeout => timeout}) do |zk|
brokers = fetch_brokers(zk)
if block_given?
brokers.each do |broker|
yield broker
end
end
end
return brokers
end
private
# Gets an Array of broker strings
#
# @param [ZK::Client] zookeeper client
@ -49,9 +56,9 @@ module Hermann
zk.children(BROKERS_PATH).each do |id|
node = fetch_znode(zk, id)
next if node.nil? # whatever error could happen from ZK#get
brokers << format_broker_from_znode(node)
brokers << node
end
brokers.compact
return brokers
end
# Gets node from zookeeper
@ -65,20 +72,89 @@ module Hermann
rescue ZK::Exceptions::NoNode
nil
end
end
# Formats the node data into string
#
# @param [String] node data
#
# @return [String] formatted node data or empty string if error
def format_broker_from_znode(znode)
hash = JSON.parse(znode)
host = hash['host']
port = hash['port']
host && port ? "#{host}:#{port}" : nil
rescue JSON::ParserError
nil
# The CuratorImpl is an implementation of simple broker discovery using
# Apache Curator libraries, if they are made available on the classpath
# for the process running Hermann::Discovery::Zookeeper.
#
# For a number of reasons this is preferred over the `zk` gem, namely
# being a much more simple and mature Zookeeper client interface
class CuratorImpl
def self.usable?
begin
Java::OrgApacheCuratorFramework::CuratorFrameworkFactory
return true
rescue NameError
return false
end
end
def initialize(zks)
retry_policy = Java::OrgApacheCuratorRetry::ExponentialBackoffRetry.new(1000, 3)
@curator = Java::OrgApacheCuratorFramework::CuratorFrameworkFactory.newClient(zks, retry_policy)
@curator.start
end
def each_broker(&block)
brokers = []
@curator.children.for_path(BROKERS_PATH).each do |id|
path = "#{BROKERS_PATH}/#{id}"
data = @curator.data.for_path(path).to_s
if block_given?
yield data
else
brokers << data
end
end
return brokers
end
end
def initialize(zookeepers)
@zookeepers = zookeepers
@impl = nil
if CuratorImpl.usable?
@impl = CuratorImpl.new(zookeepers)
elsif ZkGemImpl.usable?
@impl = ZkGemImpl.new(zookeepers)
else
raise Hermann::Errors::GeneralError, "Could not find a usable Zookeeper implementation, please make sure either the `zk` gem is installed or Curator is on the classpath"
end
end
# Gets comma separated string of brokers
#
# @param [Fixnum] timeout to connect to zookeeper, "2 times the
# tickTime (as set in the server configuration) and a maximum
# of 20 times the tickTime2 times the tick time set on server"
#
# @return [Array] List of brokers from ZK
# @raises [NoBrokersError] if could not discover brokers thru zookeeper
def get_brokers(timeout=0)
brokers = impl.each_broker.map { |b| format_broker_from_znode(b) }
if brokers.empty?
raise Hermann::Errors::NoBrokersError
end
return brokers
end
private
# Formats the node data into string
#
# @param [String] node data
#
# @return [String] formatted node data or empty string if error
def format_broker_from_znode(znode)
hash = JSON.parse(znode)
host = hash['host']
port = hash['port']
host && port ? "#{host}:#{port}" : nil
rescue JSON::ParserError
nil
end
end
end
end

View File

@ -10,45 +10,73 @@ describe Hermann::Discovery::Zookeeper do
subject { described_class.new(zookeepers) }
describe '#get_brokers' do
let(:broker_array) { ['f:1','a:2'] }
before do
allow(ZK).to receive(:open).with(any_args).and_yield(zk)
allow(subject).to receive(:fetch_brokers).with(any_args) { brokers }
let(:broker_array) do
[
JSON.dump({:host => 'f', :port => 1}),
JSON.dump({:host => 'g', :port => 2}),
]
end
before do
impl = double('ZK underlying impl')
allow(subject).to receive(:impl).and_return(impl)
expect(impl).to receive(:each_broker).and_return(broker_array)
end
context 'with valid brokers' do
let(:brokers) { broker_array }
it 'gets valid string' do
expect(subject.get_brokers).to eq broker_array
expect(subject.get_brokers).to eq ['f:1', 'g:2']
end
end
context 'with no brokers' do
let(:brokers) { [] }
let(:broker_array) { [] }
it 'raises an error' do
expect{ subject.get_brokers }.to raise_error(Hermann::Errors::NoBrokersError)
end
end
end
describe '#fetch_brokers' do
let(:broker_ids) { [1] }
it 'fetches the formatted broker list' do
allow(zk).to receive(:children).with(any_args) { broker_ids }
allow(subject).to receive(:fetch_znode).with(any_args) { node }
expect(subject.send(:fetch_brokers, zk)).to eq ['f:1']
end
# Not implementing many tests here on purpose, the use of the Curator
# libraries are relatively straight-forward and adding unit tetss to the
# CuratorImpl doesn't seem worth the trouble of getting the curator libraries
# properly loaded into the RSpec runtime
describe Hermann::Discovery::Zookeeper::CuratorImpl do
subject { described_class }
it { should respond_to :usable? }
end
describe '#fetch_znode' do
let(:id) { 1 }
let(:result) { ['foo'] }
it 'fetches the znode from zookeeper' do
allow(zk).to receive(:get).with(any_args) { result }
expect(subject.send(:fetch_znode, zk, id)).to eq result.first
describe Hermann::Discovery::Zookeeper::ZkGemImpl do
context 'class methods' do
subject { described_class }
it { should respond_to :usable? }
end
it 'returns nil node not found' do
allow(zk).to receive(:get).and_raise(ZK::Exceptions::NoNode)
expect(subject.send(:fetch_znode, zk, id)).to be_nil
context 'instance methods' do
subject { described_class.new(zookeepers) }
describe '#each_broker' do
let(:broker_ids) { [1] }
it 'fetches the formatted broker list' do
allow(zk).to receive(:children).with(any_args) { broker_ids }
allow(subject).to receive(:fetch_znode).with(any_args) { node }
expect(subject.send(:fetch_brokers, zk)).to be_instance_of Array
end
end
describe '#fetch_znode' do
let(:id) { 1 }
let(:result) { ['foo'] }
it 'fetches the znode from zookeeper' do
allow(zk).to receive(:get).with(any_args) { result }
expect(subject.send(:fetch_znode, zk, id)).to eq result.first
end
it 'returns nil node not found' do
allow(zk).to receive(:get).and_raise(ZK::Exceptions::NoNode)
expect(subject.send(:fetch_znode, zk, id)).to be_nil
end
end
end
end