Use the appropriate pattern when doing a multipass with wildcards

Fixes #12
This commit is contained in:
R. Tyler Croy 2015-09-02 09:30:17 -07:00
parent 85c474de3b
commit fd7db966ff
No known key found for this signature in database
GPG Key ID: 1426C7DC3F51E16F
1 changed files with 18 additions and 17 deletions

View File

@ -8,24 +8,24 @@ import groovy.util.logging.Slf4j
*/ */
@Slf4j @Slf4j
class OfftopicClient { class OfftopicClient {
public int clientId = 0 int clientId = 0
private Closure messageCallback = null private Closure messageCallback
private String topicsPattern = null private String topicsPattern
private ArrayList<KafkaSubscriber> subscribers = null private List<KafkaSubscriber> subscribers
private Configuration config = null private Configuration config
public OfftopicClient(Configuration configuration) { OfftopicClient(Configuration configuration) {
this.clientId = new Random().nextInt() this.clientId = new Random().nextInt()
this.config = configuration this.config = configuration
this.subscribers = new ArrayList<KafkaSubscriber>() this.subscribers = new ArrayList<KafkaSubscriber>()
} }
public ArrayList<KafkaSubscriber> getSubscribers() { List<KafkaSubscriber> getSubscribers() {
return this.subscribers return this.subscribers
} }
public void createSubscribersFor(String topicsPattern) { void createSubscribersFor(String topicsPattern) {
topicsFrom(topicsPattern).each { topic -> topicsFrom(topicsPattern).each { topic ->
if (topic.length() == 0) { if (topic.length() == 0) {
return return
@ -39,11 +39,11 @@ class OfftopicClient {
} }
} }
public void setOnMessageCallback(Closure c) { void setOnMessageCallback(Closure c) {
this.messageCallback = c this.messageCallback = c
} }
public void startSubscribers() { void startSubscribers() {
this.subscribers.each { subscriber -> this.subscribers.each { subscriber ->
Thread runner = new Thread({ Thread runner = new Thread({
subscriber.connect() subscriber.connect()
@ -55,20 +55,21 @@ class OfftopicClient {
} }
} }
public void shutdown() { void shutdown() {
this.subscribers.each { subscriber -> this.subscribers.each { subscriber ->
subscriber.shutdown() subscriber.shutdown()
} }
} }
public ArrayList<String> topicsFrom(String topicsPattern) { List <String> topicsFrom(String topicsPattern) {
ArrayList<String> topics = new ArrayList<String>() List<String> topics = []
topicsPattern.split("\\+").each { topic -> topicsPattern.split("\\+").each { topic ->
if (topic.length() == 0) { if (topic.length() == 0) {
return return
} }
if (topic.indexOf('*') >= 0) { if (topic.indexOf('*') >= 0) {
topics.addAll(lookupTopicsFor(topicsPattern)) /* in this case our `topic` is actually a topic pattern */
topics.addAll(lookupTopicsFor(topic))
} }
else { else {
topics.add(topic) topics.add(topic)
@ -77,8 +78,8 @@ class OfftopicClient {
return topics return topics
} }
private ArrayList<String> lookupTopicsFor(String topicPattern) { private List<String> lookupTopicsFor(String topicPattern) {
ArrayList<String> topics = new ArrayList<String>() List<String> topics = []
KafkaService.fetchTopics().each { topic -> KafkaService.fetchTopics().each { topic ->
if (topic =~ topicPattern) { if (topic =~ topicPattern) {
topics.add(topic) topics.add(topic)