WhoasQueueConfig
Whoas Queue Configuration
diff --git a/docs/html5/javadoc/com/github/lookout/whoas/package-tree.html b/docs/html5/javadoc/com/github/lookout/whoas/package-tree.html
index 1d20bae..daf3954 100644
--- a/docs/html5/javadoc/com/github/lookout/whoas/package-tree.html
+++ b/docs/html5/javadoc/com/github/lookout/whoas/package-tree.html
@@ -2,9 +2,9 @@
-
+
com.github.lookout.whoas Class Hierarchy (whoas 0.3.0 API)
-
+
@@ -78,6 +78,7 @@
com.github.lookout.whoas.AbstractHookQueue
com.github.lookout.whoas.AbstractHookRunner
@@ -95,6 +96,7 @@
Interface Hierarchy
diff --git a/docs/html5/javadoc/constant-values.html b/docs/html5/javadoc/constant-values.html
index a6b6587..9fe20e2 100644
--- a/docs/html5/javadoc/constant-values.html
+++ b/docs/html5/javadoc/constant-values.html
@@ -2,9 +2,9 @@
-
+
Constant Field Values (whoas 0.3.0 API)
-
+
diff --git a/docs/html5/javadoc/deprecated-list.html b/docs/html5/javadoc/deprecated-list.html
index e724468..2a157fd 100644
--- a/docs/html5/javadoc/deprecated-list.html
+++ b/docs/html5/javadoc/deprecated-list.html
@@ -2,9 +2,9 @@
-
+
Deprecated List (whoas 0.3.0 API)
-
+
diff --git a/docs/html5/javadoc/help-doc.html b/docs/html5/javadoc/help-doc.html
index 7f38328..d26b90b 100644
--- a/docs/html5/javadoc/help-doc.html
+++ b/docs/html5/javadoc/help-doc.html
@@ -2,9 +2,9 @@
-
+
API Help (whoas 0.3.0 API)
-
+
diff --git a/docs/html5/javadoc/index-all.html b/docs/html5/javadoc/index-all.html
index 1068af5..34d919b 100644
--- a/docs/html5/javadoc/index-all.html
+++ b/docs/html5/javadoc/index-all.html
@@ -2,9 +2,9 @@
-
+
Index (whoas 0.3.0 API)
-
+
@@ -108,6 +108,8 @@
call(HookRequest) - Method in interface com.github.lookout.whoas.QueueAction
+call(Jedis) - Method in interface com.github.lookout.whoas.RedisQueueAction
+
com.github.lookout.whoas - package com.github.lookout.whoas
contentType - Variable in class com.github.lookout.whoas.HookRequest
@@ -134,6 +136,10 @@
Return the number of elements in the queue
+getSize() - Method in class com.github.lookout.whoas.RedisQueue
+
+Return the number of elements in the queue
+
getWhoasFactory(T) - Method in interface com.github.lookout.whoas.WhoasConfiguration
@@ -219,6 +225,14 @@
If the Closure throws an exception, the dequeued item will be returned
to the tail end of the queue
+pop(QueueAction) - Method in class com.github.lookout.whoas.RedisQueue
+
+Performs a blocking pop on the queue and invokes the closure with the
+ item popped from the queue
+
+ If the Closure throws an exception, the dequeued item will be returned
+ to the tail end of the queue
+
port - Variable in class com.github.lookout.whoas.WhoasQueueConfig
Port number of the distributed queue server
@@ -250,6 +264,13 @@
If the request cannot be inserted, this method will return false,
otherwise true.
+push(HookRequest) - Method in class com.github.lookout.whoas.RedisQueue
+
+Attempt to insert the request into the queue
+
+ If the request cannot be inserted, this method will return false,
+ otherwise true.
+
@@ -270,6 +291,24 @@
R
+RedisQueue - Class in com.github.lookout.whoas
+
+A redis queue that offers distributed and persistent queue
+
+RedisQueue(WhoasQueueConfig) - Constructor for class com.github.lookout.whoas.RedisQueue
+
+Create the RedisQueue with valid config
+
+RedisQueue() - Constructor for class com.github.lookout.whoas.RedisQueue
+
+Default constructor
+
+RedisQueue(JedisPool) - Constructor for class com.github.lookout.whoas.RedisQueue
+
+Allow users to provide their own JedisPool
instance
+
+RedisQueueAction <T > - Interface in com.github.lookout.whoas
+
retries - Variable in class com.github.lookout.whoas.HookRequest
run() - Method in class com.github.lookout.whoas.AbstractHookRunner
@@ -305,6 +344,10 @@
start() - Method in class com.github.lookout.whoas.AbstractHookQueue
+start() - Method in class com.github.lookout.whoas.RedisQueue
+
+Setup the Redis client
+
started - Variable in class com.github.lookout.whoas.AbstractHookQueue
stop() - Method in class com.github.lookout.whoas.AbstractHookQueue
@@ -316,6 +359,10 @@
This will only come into effect after the runner has completed it's
currently executing work
+stop() - Method in class com.github.lookout.whoas.RedisQueue
+
+Stop the Redis client
+
@@ -361,6 +408,10 @@
WhoasQueueConfig() - Constructor for class com.github.lookout.whoas.WhoasQueueConfig
+withRedis(RedisQueueAction) - Method in class com.github.lookout.whoas.RedisQueue
+
+Allocate redis client from the pool
+
A B C D G H I K L P Q R S T U W
diff --git a/docs/html5/javadoc/index.html b/docs/html5/javadoc/index.html
index 5a9def9..e468cd2 100644
--- a/docs/html5/javadoc/index.html
+++ b/docs/html5/javadoc/index.html
@@ -2,7 +2,7 @@
-
+
whoas 0.3.0 API
@@ -82,6 +82,7 @@
com.github.lookout.whoas.AbstractHookQueue
com.github.lookout.whoas.AbstractHookRunner
@@ -99,6 +100,7 @@
Interface Hierarchy
diff --git a/src/main/java/com/github/lookout/whoas/AbstractHookQueue.java b/src/main/java/com/github/lookout/whoas/AbstractHookQueue.java
index 2459ef2..501e957 100644
--- a/src/main/java/com/github/lookout/whoas/AbstractHookQueue.java
+++ b/src/main/java/com/github/lookout/whoas/AbstractHookQueue.java
@@ -24,8 +24,9 @@ public abstract class AbstractHookQueue {
/**
* @return Size of the queue, if not implemented by the provider, returns -1
+ * @throws Exception implementors may throw Exceptions
*/
- public abstract int getSize();
+ public abstract int getSize() throws Exception;
/**
* @param action a {@code QueueAction} to invoke
@@ -36,6 +37,7 @@ public abstract class AbstractHookQueue {
/**
* @param request A valid {@code HookRequest}
* @return true if the {@code HookRequest} was successfully added to the queue
+ * @throws Exception implementors may throw Exceptions
*/
- public abstract Boolean push(HookRequest request);
+ public abstract Boolean push(HookRequest request) throws Exception;
}
diff --git a/src/main/java/com/github/lookout/whoas/InMemoryQueue.java b/src/main/java/com/github/lookout/whoas/InMemoryQueue.java
index 6a1e0d7..1b9fce0 100644
--- a/src/main/java/com/github/lookout/whoas/InMemoryQueue.java
+++ b/src/main/java/com/github/lookout/whoas/InMemoryQueue.java
@@ -3,7 +3,7 @@ package com.github.lookout.whoas;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;;
+import org.slf4j.LoggerFactory;
/**
diff --git a/src/main/java/com/github/lookout/whoas/Publisher.java b/src/main/java/com/github/lookout/whoas/Publisher.java
index f49785a..2f09d83 100644
--- a/src/main/java/com/github/lookout/whoas/Publisher.java
+++ b/src/main/java/com/github/lookout/whoas/Publisher.java
@@ -49,14 +49,14 @@ public class Publisher {
String responseBody = response.readEntity(String.class);
}
catch (ProcessingException exc) {
- logger.warn("\"POST\" to url: \"${request.url}\" failed", exc);
+ logger.warn("POST to url \"{}\" failed", request.url, exc);
retryableExc = true;
}
if ((retryableExc) || (shouldRetry(response))) {
if (request.retries >= this.maxRetries) {
- logger.error("Giving up on \"POST\" to url: \"${request.url}\" " +
- "after ${request.retries} retries");
+ logger.error("Giving up on POST to url \"{}\" after {} retries",
+ request.url, request.retries);
return false;
}
request.retries = (request.retries + 1);
@@ -64,7 +64,7 @@ public class Publisher {
return this.publish(request);
}
- logger.debug("\"POST\" to url: \"${request.url}\" succeeded");
+ logger.debug("POST to url \"{}\" succeeded", request.url);
return true;
}
@@ -80,7 +80,8 @@ public class Publisher {
}
/* Enhance your calm and try again */
- if (response.getStatus() == 420) {
+ if ((response.getStatus() == 420) ||
+ (response.getStatus() == 429)) {
return true;
}
diff --git a/src/main/java/com/github/lookout/whoas/RedisQueue.java b/src/main/java/com/github/lookout/whoas/RedisQueue.java
index eba9f0f..add99ae 100644
--- a/src/main/java/com/github/lookout/whoas/RedisQueue.java
+++ b/src/main/java/com/github/lookout/whoas/RedisQueue.java
@@ -1,59 +1,83 @@
-package com.github.lookout.whoas
+package com.github.lookout.whoas;
-import com.fasterxml.jackson.databind.ObjectMapper
-import redis.clients.jedis.Jedis
-import redis.clients.jedis.JedisPool
-import redis.clients.jedis.JedisPoolConfig
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
+import java.util.List;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A redis queue that offers distributed and persistent queue
*/
-class RedisQueue extends AbstractHookQueue {
- private WhoasQueueConfig queueConfig
- private JedisPool pool = null
- private static Integer maxActiveConnections = 10
- private static Integer maxIdleConnections = 5
- private static Integer minIdleConnections = 1
- private Logger logger = LoggerFactory.getLogger(RedisQueue.class)
+public class RedisQueue extends AbstractHookQueue {
+ private WhoasQueueConfig queueConfig;
+ private JedisPool pool = null;
+ private static Integer maxActiveConnections = 10;
+ private static Integer maxIdleConnections = 5;
+ private static Integer minIdleConnections = 1;
+ private Logger logger = LoggerFactory.getLogger(RedisQueue.class);
/**
* Create the RedisQueue with valid config
+ *
+ * @param queueConfig necessary configuration to connect to Redis
*/
- RedisQueue(WhoasQueueConfig queueConfig) {
- this.queueConfig = queueConfig
+ public RedisQueue(WhoasQueueConfig queueConfig) {
+ this.queueConfig = queueConfig;
}
/**
* Default constructor
*/
- RedisQueue() {
- queueConfig = new WhoasQueueConfig()
+ public RedisQueue() {
+ queueConfig = new WhoasQueueConfig();
+ }
+
+ /**
+ * Allow users to provide their own {@code JedisPool} instance
+ *
+ * @param pool an already set up pool
+ */
+ public RedisQueue(JedisPool pool) {
+ this();
+ this.pool = pool;
}
/**
* Return the number of elements in the queue
*/
- int getSize() {
+ public int getSize() throws Exception {
if (!this.started) {
- throw new Exception("Queue must be started before this operation is invoked")
- }
- return withRedis() { Jedis redisClient ->
- return redisClient.llen(this.queueConfig.key)
+ throw new Exception("Queue must be started before this operation is invoked");
}
+ return ((Integer)withRedis(new RedisQueueAction() {
+ @Override
+ public Integer call(Jedis redisClient) {
+ Long size = redisClient.llen(queueConfig.key);
+ return size.intValue();
+ }
+ })).intValue();
}
/**
* Setup the Redis client
*/
@Override
- void start() {
- super.start()
+ public void start() {
+ super.start();
+
+ /* Bail early if we already have a valid pool */
+ if (this.pool instanceof JedisPool) {
+ return;
+ }
logger.debug("Setting up redis queue \"${this.queueConfig.key}\" on the server " +
- "\"${this.queueConfig.hostname}:${this.queueConfig.port}")
+ "\"${this.queueConfig.hostname}:${this.queueConfig.port}");
+
/**
* Setup jedis pool
@@ -63,37 +87,26 @@ class RedisQueue extends AbstractHookQueue {
* multiple jedis instances and use them reliably and efficiently across different
* threads
*/
- JedisPoolConfig poolConfig = new JedisPoolConfig()
- poolConfig.setMaxTotal(maxActiveConnections)
- poolConfig.setTestOnBorrow(true)
- poolConfig.setTestOnReturn(true)
- poolConfig.setMaxIdle(maxIdleConnections)
- poolConfig.setMinIdle(minIdleConnections)
- poolConfig.setTestWhileIdle(true)
+ JedisPoolConfig poolConfig = new JedisPoolConfig();
+ poolConfig.setMaxTotal(maxActiveConnections);
+ poolConfig.setTestOnBorrow(true);
+ poolConfig.setTestOnReturn(true);
+ poolConfig.setMaxIdle(maxIdleConnections);
+ poolConfig.setMinIdle(minIdleConnections);
+ poolConfig.setTestWhileIdle(true);
/* Create the pool */
- pool = new JedisPool(poolConfig, this.queueConfig.hostname, this.queueConfig.port)
+ pool = new JedisPool(poolConfig, this.queueConfig.hostname, this.queueConfig.port);
}
/**
* Stop the Redis client
*/
@Override
- void stop() {
- super.stop()
- pool.destroy()
- pool = null
- }
-
- /** Allocate redis client from the pool */
- Object withRedis(Closure closure) {
- Jedis redisClient = pool.resource
- try {
- return closure.call(redisClient)
- }
- finally {
- redisClient.close()
- }
+ public void stop() {
+ super.stop();
+ pool.destroy();
+ pool = null;
}
/**
@@ -103,35 +116,38 @@ class RedisQueue extends AbstractHookQueue {
* If the Closure throws an exception, the dequeued item will be returned
* to the tail end of the queue
*/
- void pop(QueueAction action) {
+ public void pop(final QueueAction action) throws Exception {
if (action == null) {
- throw new Exception("Must provide a Closure to RedisQueue.pop()")
+ throw new Exception("Must provide a Closure to RedisQueue.pop()");
}
if (!this.started) {
- throw new Exception("Queue must be started before this operation is invoked")
+ throw new Exception("Queue must be started before this operation is invoked");
}
- withRedis() { Jedis redisClient ->
+ withRedis(new RedisQueueAction() {
+ @Override
+ public Long call(Jedis redisClient) throws Exception {
+ /**
+ * The blpop returns list of strings (key and value)
+ */
+ List messages = redisClient.blpop(0, queueConfig.key);
- /**
- * The blpop returns list of strings (key and value)
- */
- List messages = redisClient.blpop(0, this.queueConfig.key)
-
- /* If valid, decode message */
- if (messages) {
- ObjectMapper mapper = new ObjectMapper()
- HookRequest request = mapper.readValue(messages.get(1), HookRequest.class)
- try {
- action.call(request)
- } catch (Exception ex) {
- /* Put this back on the front of the queue */
- logger.info("\"Pop\" on redis queue failed, pushing it back on front of the queue", ex)
- redisClient.lpush(this.queueConfig.key, messages.get(1))
+ /* If valid, decode message */
+ if ((messages != null) && (!messages.isEmpty())) {
+ ObjectMapper mapper = new ObjectMapper();
+ HookRequest request = mapper.readValue(messages.get(1), HookRequest.class);
+ try {
+ action.call(request);
+ } catch (Exception ex) {
+ /* Put this back on the front of the queue */
+ logger.info("\"Pop\" on redis queue failed, pushing it back on front of the queue", ex);
+ return redisClient.lpush(queueConfig.key, messages.get(1));
+ }
}
+ return new Long(-1);
}
- }
+ });
}
/**
@@ -139,16 +155,43 @@ class RedisQueue extends AbstractHookQueue {
*
* If the request cannot be inserted, this method will return false,
* otherwise true.
+ *
+ * @param request A {@code HookRequest} to enqueue
*/
- Boolean push(HookRequest request) {
+ @Override
+ public Boolean push(HookRequest request) throws JsonProcessingException, Exception {
if (!this.started) {
- throw new Exception("Queue must be started before this operation is invoked")
+ throw new Exception("Queue must be started before this operation is invoked");
}
- ObjectMapper mapper = new ObjectMapper()
- String jsonPayload = mapper.writeValueAsString(request)
- return withRedis() { Jedis redisClient ->
- return redisClient.rpush(this.queueConfig.key, jsonPayload) != 0
+ ObjectMapper mapper = new ObjectMapper();
+ final String jsonPayload = mapper.writeValueAsString(request);
+ return (Boolean)withRedis(new RedisQueueAction() {
+ @Override
+ public Boolean call(Jedis redisClient) {
+ System.out.println(String.format("%s %s", queueConfig.key, jsonPayload));
+ redisClient.rpush(queueConfig.key, jsonPayload);
+ return true;
+ }
+ });
+ }
+
+ /** Allocate redis client from the pool
+ *
+ * @param action callback to invoke with a {@code Jedis} object from the
+ * pool
+ * @throws Exception propogates underlying Jedis exceptions
+ * @return propogates a generic {@code Object} up from the {@code RedisQueueAction}
+ */
+ protected Object withRedis(RedisQueueAction action) throws Exception {
+ Jedis redisClient = this.pool.getResource();
+ System.out.println(redisClient.toString());
+ try {
+ return action.call(redisClient);
+ }
+ finally {
+ redisClient.close();
}
}
+
}
diff --git a/src/main/java/com/github/lookout/whoas/RedisQueueAction.java b/src/main/java/com/github/lookout/whoas/RedisQueueAction.java
new file mode 100644
index 0000000..aab3059
--- /dev/null
+++ b/src/main/java/com/github/lookout/whoas/RedisQueueAction.java
@@ -0,0 +1,7 @@
+package com.github.lookout.whoas;
+
+import redis.clients.jedis.Jedis;
+
+public interface RedisQueueAction {
+ public T call(Jedis jedis) throws Exception;
+}
diff --git a/src/test/groovy/com/github/lookout/whoas/PublisherSpec.groovy b/src/test/groovy/com/github/lookout/whoas/PublisherSpec.groovy
index d7d505d..6b25080 100644
--- a/src/test/groovy/com/github/lookout/whoas/PublisherSpec.groovy
+++ b/src/test/groovy/com/github/lookout/whoas/PublisherSpec.groovy
@@ -40,6 +40,7 @@ class PublisherSpec extends Specification {
201 | false
400 | false
420 | true
+ 429 | true
500 | true
599 | true
}
diff --git a/src/test/groovy/com/github/lookout/whoas/RedisQueueSpec.groovy b/src/test/groovy/com/github/lookout/whoas/RedisQueueSpec.groovy
index 6c7df57..57614e0 100644
--- a/src/test/groovy/com/github/lookout/whoas/RedisQueueSpec.groovy
+++ b/src/test/groovy/com/github/lookout/whoas/RedisQueueSpec.groovy
@@ -1,11 +1,18 @@
package com.github.lookout.whoas
import com.fiftyonred.mock_jedis.MockJedis
+import com.fiftyonred.mock_jedis.MockJedisPool
import redis.clients.jedis.Jedis
+import redis.clients.jedis.JedisPool
+import redis.clients.jedis.JedisPoolConfig
+
import spock.lang.*
class RedisQueueSpec extends Specification {
+ protected MockJedisPool mockPool() {
+ return new MockJedisPool(new JedisPoolConfig(), 'example.com')
+ }
def "getSize()ing without a start should throw"() {
given:
@@ -20,15 +27,13 @@ class RedisQueueSpec extends Specification {
def "getSize() should return 0 by default"() {
given:
- RedisQueue queue = new RedisQueue()
- Jedis redisClient = new MockJedis("test")
+ RedisQueue queue = new RedisQueue(mockPool())
when:
queue.start()
- queue.pool.metaClass.getResource = {redisClient}
then:
- queue.getSize() == 0
+ queue.size == 0
}
def "pop()ing without a closure should throw"() {
@@ -64,50 +69,62 @@ class RedisQueueSpec extends Specification {
thrown Exception
}
+}
+
+/** Spec for testing Jedis interactions with mock-jedis */
+class RedisQueueMockIntegrationSpec extends RedisQueueSpec {
+ protected RedisQueue queue
+ protected Jedis client
+ protected List store = []
+
+ def setup() {
+ this.queue = new RedisQueue(mockPool())
+ this.client = Mock(MockJedis, constructorArgs: ['example.com'])
+ this.queue.pool.client = this.client
+
+ _ * client.llen(_) >> { this.store.size }
+ }
+
def "push() should put onto the internal queue"() {
given:
- RedisQueue queue = new RedisQueue()
- Jedis redisClient = new MockJedis("test")
- redisClient.metaClass.rpush = {String key, String payload -> redisClient.lpush(key, payload)}
+ 2 * client.rpush(_, _) >> { key, payload -> store << payload[]; return 1 }
when:
queue.start()
- queue.pool.metaClass.getResource = {redisClient}
queue.push(new HookRequest())
queue.push(new HookRequest())
then:
- queue.getSize() == 2
+ queue.size == 2
}
def "pop() after push should receive a request"() {
given:
- RedisQueue queue = new RedisQueue()
- Jedis redisClient = new MockJedis("test")
- redisClient.metaClass.rpush = {String key, String payload -> redisClient.lpush(key, payload)}
- redisClient.metaClass.blpop = {Integer timeout, String key -> [key, redisClient.lpop(key)]}
+ /* Mock rpush() and just say it pushed one element
+ * Due to some weirdness in Spock's mocking, `payload` comes in as a
+ * list of strings
+ */
+ 1 * client.rpush(_, _) >> { key, payload -> store << payload[0]; return 1 }
+ /* Need to return a stupid List since jedis */
+ 1 * client.blpop(_, _) >> { [null, store.pop()] }
when:
queue.start()
- queue.pool.metaClass.getResource = {redisClient}
HookRequest test = new HookRequest()
queue.push(test)
then:
- queue.getSize() == 1
- queue.pop() { HookRequest fetched -> fetched == test}
- queue.getSize() == 0
+ store.size == 1
+ queue.pop { HookRequest fetched -> fetched == test}
+ store.size == 0
}
def "push() on rpush exception should return false"() {
given:
- RedisQueue queue = new RedisQueue()
- Jedis redisClient = new MockJedis("test")
- redisClient.metaClass.rpush = {String key, String payload -> throw new Exception("Test Exception")}
+ 1 * client.rpush(*_) >> { throw new Exception('Spockd!') }
when:
queue.start()
- queue.pool.metaClass.getResource = {redisClient}
queue.push(new HookRequest())
then:
@@ -116,13 +133,10 @@ class RedisQueueSpec extends Specification {
def "pop() on blpop exception simple return, nothing to requeue "() {
given:
- RedisQueue queue = new RedisQueue()
- Jedis redisClient = new MockJedis("test")
- redisClient.metaClass.blpop = {Integer timeout, String key -> throw new Exception("Test Exception")}
+ 1 * client.blpop(*_) >> { throw new Exception('Spockd!') }
when:
queue.start()
- queue.pool.metaClass.getResource = {redisClient}
queue.pop() { }
then:
@@ -131,19 +145,16 @@ class RedisQueueSpec extends Specification {
def "pop() on exception while executing closure should requeue"() {
given:
- RedisQueue queue = new RedisQueue()
- Jedis redisClient = new MockJedis("test")
- redisClient.metaClass.rpush = {String key, String payload -> redisClient.lpush(key, payload)}
- redisClient.metaClass.blpop = {Integer timeout, String key -> [key, redisClient.lpop(key)]}
+ 1 * client.rpush(_, _) >> { key, payload -> this.store << payload[0]; return 1 }
+ 1 * client.blpop(*_) >> { [null, this.store.pop()] }
+ 1 * client.lpush(_, _) >> { key, payload -> this.store << payload[0]; return 1 }
when:
queue.start()
- queue.pool.metaClass.getResource = {redisClient}
queue.push(new HookRequest())
queue.pop() { throw new Exception("Test Exception") }
then:
- queue.getSize() == 1
+ queue.size == 1
}
}
-