mirror of https://github.com/reiseburo/hermann
Merge pull request #51 from rtyler/mri-multi-topics
Multiple topics for MRI
This commit is contained in:
commit
d87608645f
|
@ -11,3 +11,5 @@ tmp/
|
|||
.ruby-gemset
|
||||
.ruby-version
|
||||
Gemfile.lock
|
||||
Jarfile.lock
|
||||
.jbundler/
|
||||
|
|
|
@ -44,6 +44,78 @@ class RdKafkaRecipe < MiniPortile
|
|||
raise 'Checksum error!'
|
||||
end
|
||||
end
|
||||
|
||||
def download_file_http(url, full_path, count = 3)
|
||||
filename = File.basename(full_path)
|
||||
uri = URI.parse(url)
|
||||
|
||||
if ENV['http_proxy']
|
||||
_, userinfo, p_host, p_port = URI.split(ENV['http_proxy'])
|
||||
proxy_user, proxy_pass = userinfo.split(/:/) if userinfo
|
||||
http = Net::HTTP.new(uri.host, uri.port, p_host, p_port, proxy_user, proxy_pass)
|
||||
else
|
||||
http = Net::HTTP.new(uri.host, uri.port)
|
||||
|
||||
if URI::HTTPS === uri
|
||||
http.use_ssl = true
|
||||
http.verify_mode = OpenSSL::SSL::VERIFY_PEER
|
||||
|
||||
store = OpenSSL::X509::Store.new
|
||||
|
||||
# Auto-include system-provided certificates
|
||||
store.set_default_paths
|
||||
|
||||
if ENV.has_key?("SSL_CERT_FILE") && File.exist?(ENV["SSL_CERT_FILE"])
|
||||
store.add_file ENV["SSL_CERT_FILE"]
|
||||
end
|
||||
|
||||
http.cert_store = store
|
||||
end
|
||||
end
|
||||
|
||||
message "Downloading #{filename} "
|
||||
http.start do |h|
|
||||
h.request_get(uri.path, 'Accept-Encoding' => 'identity') do |response|
|
||||
case response
|
||||
when Net::HTTPNotFound
|
||||
output "404 - Not Found"
|
||||
return false
|
||||
|
||||
when Net::HTTPClientError
|
||||
output "Error: Client Error: #{response.inspect}"
|
||||
return false
|
||||
|
||||
when Net::HTTPRedirection
|
||||
raise "Too many redirections for the original URL, halting." if count <= 0
|
||||
url = response["location"]
|
||||
return download_file(url, full_path, count - 1)
|
||||
|
||||
when Net::HTTPOK
|
||||
return with_tempfile(filename, full_path) do |temp_file|
|
||||
size = 0
|
||||
progress = 0
|
||||
puts "HEADER: #{response.header['Content-Length']}"
|
||||
total = response.header["Content-Length"].to_i
|
||||
|
||||
if total == 0
|
||||
puts response.headers
|
||||
raise "Failed to properly download o_O"
|
||||
end
|
||||
response.read_body do |chunk|
|
||||
temp_file << chunk
|
||||
size += chunk.size
|
||||
new_progress = (size * 100) / total
|
||||
unless new_progress == progress
|
||||
message "\rDownloading %s (%3d%%) " % [filename, new_progress]
|
||||
end
|
||||
progress = new_progress
|
||||
end
|
||||
output
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
################################################################################
|
||||
|
||||
|
|
|
@ -534,16 +534,19 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
|
|||
*
|
||||
* @param self VALUE the Ruby producer instance
|
||||
* @param message VALUE the ruby String containing the outgoing message.
|
||||
* @param topic VALUE the ruby String containing the topic to use for the
|
||||
* outgoing message.
|
||||
* @param result VALUE the Hermann::Result object to be fulfilled when the
|
||||
* push completes
|
||||
*/
|
||||
static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
|
||||
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
|
||||
|
||||
HermannInstanceConfig* producerConfig;
|
||||
/* Context pointer, pointing to `result`, for the librdkafka delivery
|
||||
* callback
|
||||
*/
|
||||
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
|
||||
rd_kafka_topic_t *rkt = NULL;
|
||||
|
||||
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
|
||||
|
||||
|
@ -554,10 +557,9 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
|
|||
|
||||
TRACER("producerConfig: %p\n", producerConfig);
|
||||
|
||||
if ((NULL == producerConfig->topic) ||
|
||||
(0 == strlen(producerConfig->topic))) {
|
||||
fprintf(stderr, "Topic is null!\n");
|
||||
rb_raise(rb_eRuntimeError, "Topic cannot be empty");
|
||||
if ((Qnil == topic) ||
|
||||
(0 == RSTRING_LEN(topic))) {
|
||||
rb_raise(rb_eArgError, "Topic cannot be empty");
|
||||
return self;
|
||||
}
|
||||
|
||||
|
@ -567,6 +569,15 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
|
|||
|
||||
TRACER("kafka initialized\n");
|
||||
|
||||
rkt = rd_kafka_topic_new(producerConfig->rk,
|
||||
RSTRING_PTR(topic),
|
||||
NULL);
|
||||
|
||||
if (NULL == rkt) {
|
||||
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
|
||||
return self;
|
||||
}
|
||||
|
||||
/* Only pass result through if it's non-nil */
|
||||
if (Qnil != result) {
|
||||
delivery_ctx->result = result;
|
||||
|
@ -576,7 +587,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
|
|||
TRACER("rd_kafka_produce() message of %i bytes\n", RSTRING_LEN(message));
|
||||
|
||||
/* Send/Produce message. */
|
||||
if (-1 == rd_kafka_produce(producerConfig->rkt,
|
||||
if (-1 == rd_kafka_produce(rkt,
|
||||
producerConfig->partition,
|
||||
RD_KAFKA_MSG_F_COPY,
|
||||
RSTRING_PTR(message),
|
||||
|
@ -590,6 +601,10 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
|
|||
/* TODO: raise a Ruby exception here, requires a test though */
|
||||
}
|
||||
|
||||
if (NULL != rkt) {
|
||||
rd_kafka_topic_destroy(rkt);
|
||||
}
|
||||
|
||||
TRACER("returning\n");
|
||||
|
||||
return self;
|
||||
|
@ -913,11 +928,9 @@ static VALUE producer_allocate(VALUE klass) {
|
|||
* Set up the configuration context for the Producer instance
|
||||
*
|
||||
* @param self VALUE the Producer instance
|
||||
* @param topic VALUE the Ruby string naming the topic
|
||||
* @param brokers VALUE a Ruby string containing host:port pairs separated by commas
|
||||
*/
|
||||
static VALUE producer_initialize(VALUE self,
|
||||
VALUE topic,
|
||||
VALUE brokers) {
|
||||
|
||||
HermannInstanceConfig* producerConfig;
|
||||
|
@ -926,12 +939,9 @@ static VALUE producer_initialize(VALUE self,
|
|||
|
||||
TRACER("initialize Producer ruby object\n");
|
||||
|
||||
|
||||
topicPtr = StringValuePtr(topic);
|
||||
brokersPtr = StringValuePtr(brokers);
|
||||
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
|
||||
|
||||
producerConfig->topic = topicPtr;
|
||||
producerConfig->brokers = brokersPtr;
|
||||
/** Using RD_KAFKA_PARTITION_UA specifies we want the partitioner callback to be called to determine the target
|
||||
* partition
|
||||
|
@ -1011,11 +1021,11 @@ void Init_hermann_lib() {
|
|||
rb_define_alloc_func(c_producer, producer_allocate);
|
||||
|
||||
/* Initialize */
|
||||
rb_define_method(c_producer, "initialize", producer_initialize, 2);
|
||||
rb_define_method(c_producer, "initialize", producer_initialize, 1);
|
||||
rb_define_method(c_producer, "initialize_copy", producer_init_copy, 1);
|
||||
|
||||
/* Producer.push_single(msg) */
|
||||
rb_define_method(c_producer, "push_single", producer_push_single, 2);
|
||||
rb_define_method(c_producer, "push_single", producer_push_single, 3);
|
||||
|
||||
/* Producer.tick */
|
||||
rb_define_method(c_producer, "tick", producer_tick, 1);
|
||||
|
|
|
@ -12,13 +12,17 @@ module Hermann
|
|||
class Producer
|
||||
attr_reader :topic, :brokers, :internal, :children
|
||||
|
||||
# Initialize a producer object with a default topic and broker list
|
||||
#
|
||||
# @param [String] topic The default topic to use for pushing messages
|
||||
# @param [Array] brokers An array of "host:port" strings for the brokers
|
||||
def initialize(topic, brokers)
|
||||
@topic = topic
|
||||
@brokers = brokers
|
||||
if RUBY_PLATFORM == "java"
|
||||
@internal = Hermann::Provider::JavaProducer.new(brokers)
|
||||
else
|
||||
@internal = Hermann::Lib::Producer.new(topic, brokers)
|
||||
@internal = Hermann::Lib::Producer.new(brokers)
|
||||
end
|
||||
# We're tracking children so we can make sure that at Producer exit we
|
||||
# make a reasonable attempt to clean up outstanding result objects
|
||||
|
@ -42,28 +46,31 @@ module Hermann
|
|||
|
||||
# Push a value onto the Kafka topic passed to this +Producer+
|
||||
#
|
||||
# @param [Array] value An array of values to push, will push each one
|
||||
# separately
|
||||
# @param [Object] value A single object to push
|
||||
#
|
||||
# @param [Hash] opts to pass to push method
|
||||
# @params opts [String] :topic The topic to push messages to
|
||||
# @option opts [String] :topic The topic to push messages to
|
||||
#
|
||||
# @return [Hermann::Result] A future-like object which will store the
|
||||
# result from the broker
|
||||
def push(value, opts={})
|
||||
topic = opts[:topic] || @topic
|
||||
result = create_result
|
||||
result = nil
|
||||
|
||||
if value.kind_of? Array
|
||||
return value.map { |e| self.push(e) }
|
||||
else
|
||||
if RUBY_PLATFORM == "java"
|
||||
result = @internal.push_single(value, topic)
|
||||
end
|
||||
|
||||
if RUBY_PLATFORM == "java"
|
||||
result = @internal.push_single(value, topic)
|
||||
unless result.nil?
|
||||
@children << result
|
||||
else
|
||||
@internal.push_single(value, result)
|
||||
end
|
||||
# Reaping children on the push just to make sure that it does get
|
||||
# called correctly and we don't leak memory
|
||||
reap_children
|
||||
else
|
||||
result = create_result
|
||||
@internal.push_single(value, topic, result)
|
||||
end
|
||||
|
||||
return result
|
||||
|
|
|
@ -7,7 +7,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
|
|||
|
||||
let(:topic) { 'rspec' }
|
||||
let(:brokers) { 'localhost:1337' }
|
||||
subject(:producer) { Hermann::Lib::Producer.new(topic, brokers) }
|
||||
subject(:producer) { Hermann::Lib::Producer.new(brokers) }
|
||||
let(:timeout) { 3000 }
|
||||
|
||||
it { should respond_to :push_single }
|
||||
|
@ -41,7 +41,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
|
|||
let(:brokers) { 'localhost:13337' }
|
||||
|
||||
it 'should error after attempting to connect' do |example|
|
||||
producer.push_single(example.full_description, nil)
|
||||
producer.push_single(example.full_description, 'test-topic', nil)
|
||||
begin
|
||||
producer.tick(timeout)
|
||||
rescue StandardError => ex
|
||||
|
@ -62,7 +62,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
|
|||
|
||||
describe '#push_single', :type => :integration do
|
||||
let(:message) { |example| example.full_description }
|
||||
subject(:push) { producer.push_single(message, nil) }
|
||||
subject(:push) { producer.push_single(message, topic, nil) }
|
||||
|
||||
it 'should return' do
|
||||
expect(push).not_to be_nil
|
||||
|
@ -105,7 +105,7 @@ describe 'Hermann::Lib::Producer', :platform => :mri do
|
|||
|
||||
context 'with a single queued request' do
|
||||
before :each do
|
||||
producer.push_single('hello', nil)
|
||||
producer.push_single('hello', topic, nil)
|
||||
end
|
||||
|
||||
it 'should return successfully' do
|
||||
|
|
|
@ -38,14 +38,14 @@ describe Hermann::Producer do
|
|||
end
|
||||
end
|
||||
|
||||
context "not java", :platform => :mri do
|
||||
context "on C ruby", :platform => :mri do
|
||||
describe '#push' do
|
||||
subject(:result) { producer.push(value) }
|
||||
|
||||
context 'error conditions' do
|
||||
shared_examples 'an error condition' do
|
||||
it 'should raise an exception' do
|
||||
expect { producer.push('rspec') }.to raise_error(RuntimeError)
|
||||
expect { producer.push('rspec') }.to raise_error
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -87,7 +87,7 @@ describe Hermann::Producer do
|
|||
|
||||
it 'should invoke #push_single for each element' do
|
||||
value.each do |v|
|
||||
expect(producer.internal).to receive(:push_single).with(v, anything)
|
||||
expect(producer.internal).to receive(:push_single).with(v, topic, anything)
|
||||
end
|
||||
|
||||
expect(result).to be_instance_of Array
|
||||
|
|
Loading…
Reference in New Issue