WIP: start fleshing out an FFI provider
This commit is contained in:
parent
8a1d2d0f63
commit
2540ec770a
|
@ -0,0 +1,56 @@
|
|||
require 'ffi'
|
||||
|
||||
require 'hermann/errors'
|
||||
require 'hermann/provider/abstract_producer'
|
||||
|
||||
module Hermann
|
||||
module Provider
|
||||
class FFIProducer < AbstractProducer
|
||||
extend FFI::Library
|
||||
ffi_lib 'rdkafka'
|
||||
|
||||
attr_reader :brokers
|
||||
|
||||
# Initialize a librdkafka-backed Producer class which relies on FFI to
|
||||
# call into the shared library
|
||||
#
|
||||
# @param [String] brokers A list of comma separated host:port pairs
|
||||
# @return [FFIProducer]
|
||||
def initialize(brokers)
|
||||
validate_brokers!(brokers)
|
||||
@brokers = brokers
|
||||
end
|
||||
|
||||
# Connect to the Kafka brokers
|
||||
#
|
||||
# @param [Integer] timeout_sec Seconds to timeout a connection
|
||||
# attempt after
|
||||
# @return [Boolean] True if we've successfully connected
|
||||
def connect(timeout_sec)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def validate_brokers!(brokers)
|
||||
if brokers.nil?
|
||||
raise Hermann::Errors::ConfigurationError
|
||||
end
|
||||
|
||||
brokers.split(',').each do |broker|
|
||||
# Make sure we've got a host:port pair
|
||||
host, port = broker.split(':')
|
||||
if host.nil? || port.nil?
|
||||
raise Hermann::Errors::ConfigurationError
|
||||
end
|
||||
|
||||
port = port.to_i
|
||||
|
||||
# We should have a valid port number
|
||||
if (port <= 0) || (port > 65536)
|
||||
raise Hermann::Errors::ConfigurationError
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,45 @@
|
|||
require 'spec_helper'
|
||||
require 'hermann/provider/ffi_producer'
|
||||
|
||||
describe Hermann::Provider::FFIProducer do
|
||||
describe '.initialize' do
|
||||
subject(:producer) { described_class.new(brokers) }
|
||||
shared_examples_for 'a configuration error' do
|
||||
it 'should raise' do
|
||||
expect { subject }.to raise_error(Hermann::Errors::ConfigurationError)
|
||||
end
|
||||
end
|
||||
|
||||
context 'with nil brokers' do
|
||||
let(:brokers) { nil }
|
||||
it_behaves_like 'a configuration error'
|
||||
end
|
||||
|
||||
context 'without host:port pairs' do
|
||||
let(:brokers) { 'foo' }
|
||||
it_behaves_like 'a configuration error'
|
||||
end
|
||||
|
||||
context 'without a numeric port' do
|
||||
let(:brokers) { 'rspec:rules' }
|
||||
it_behaves_like 'a configuration error'
|
||||
end
|
||||
end
|
||||
|
||||
context 'instance methods' do
|
||||
describe '#connect' do
|
||||
end
|
||||
|
||||
describe '#connected?' do
|
||||
end
|
||||
|
||||
describe '#errored?' do
|
||||
end
|
||||
|
||||
describe '#push_single' do
|
||||
end
|
||||
|
||||
describe '#tick' do
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue