From c2540a84108473253b9ec910e7b657a2a12084f1 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Thu, 11 Jun 2015 15:28:41 -0700 Subject: [PATCH] update api to have #topic(topic) as well as #topics also add Partition#consumer to instantiate a coonsumer object on the particular partition --- lib/hermann/discovery/metadata.rb | 28 +++++++++++++++++++--------- scripts/metadata_mri.rb | 7 +++++++ 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/lib/hermann/discovery/metadata.rb b/lib/hermann/discovery/metadata.rb index 6db2a09..e0f300b 100644 --- a/lib/hermann/discovery/metadata.rb +++ b/lib/hermann/discovery/metadata.rb @@ -1,4 +1,5 @@ require 'hermann_lib' +require 'hermann/consumer' module Hermann module Discovery @@ -8,9 +9,13 @@ module Hermann "#{host}:#{port}" end end - Topic = Struct.new(:name, :partitions) - Partition = Struct.new(:id, :leader, :replicas, :insync_replicas) + + Partition = Struct.new(:id, :leader, :replicas, :insync_replicas, :topic_name) do + def consumer(offset: :end) + Hermann::Consumer.new(topic_name, brokers: ([leader] + replicas).join(','), partition: id, offset: offset) + end + end DEFAULT_TIMEOUT_MS = 2_000 def initialize(brokers, options = {}) @@ -28,7 +33,17 @@ module Hermann brokers_from_metadata(@internal.metadata(nil, @timeout)) end - def topics(filter_topics = nil) + def topic(t) + get_topics(t)[t] + end + + def topics + get_topics + end + + private + + def get_topics(filter_topics = nil) md = @internal.metadata(filter_topics, @timeout) broker_hash = brokers_from_metadata(md).inject({}) do |h, broker| @@ -42,7 +57,7 @@ module Hermann leader = broker_hash[p[:leader_id]] all_replicas = p[:replica_ids].map { |i| broker_hash[i] } isr_replicas = p[:isr_ids].map { |i| broker_hash[i] } - Partition.new(p[:id], leader, all_replicas, isr_replicas) + Partition.new(p[:id], leader, all_replicas, isr_replicas, topic_name) end topic_hash[topic_name] = Topic.new(topic_name, partitions) @@ -50,11 +65,6 @@ module Hermann end end - def topic(t) - topics(t)[t] - end - - private def brokers_from_metadata(md) md[:brokers].map do |h| diff --git a/scripts/metadata_mri.rb b/scripts/metadata_mri.rb index 46ef5cd..5a192b5 100644 --- a/scripts/metadata_mri.rb +++ b/scripts/metadata_mri.rb @@ -1,8 +1,15 @@ require 'bundler/setup' require 'hermann' require 'hermann/discovery/metadata' +require 'hermann/consumer' c = Hermann::Discovery::Metadata.new( "localhost:9092" ) c.topic("maxwell") puts c.topic("maxwell").inspect + +puts c.brokers.inspect +consumer = Hermann::Consumer.new("maxwell", brokers: "localhost:9092, localhost:9092", partition: c.topic('maxwell').partitions.first.id, offset: :start) +consumer.consume do |c| + puts c +end