Initial coding, tests for RedisQueue for Redis as a queue provider

References #3
This commit is contained in:
Mahesh V Kelkar 2015-03-18 11:40:02 -04:00 committed by R. Tyler Croy
parent a702b21f75
commit 000737c163
6 changed files with 374 additions and 30 deletions

View File

@ -35,7 +35,9 @@ dependencies {
compile 'com.fasterxml.jackson.core:jackson-databind:2.3.3+'
/* Needed for better time management/sanity */
compile 'joda-time:joda-time:2.6+'
/* redis client */
compile 'redis.clients:jedis:2.6+'
compile 'com.fiftyonred:mock-jedis:0.4.0'
testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
testCompile 'cglib:cglib-nodep:2.2.+'

View File

@ -7,6 +7,23 @@ package com.github.lookout.whoas
* This allows for different queueing implementations behind whoas
*/
abstract class AbstractHookQueue {
protected Boolean started = false
void start() {
if (started) {
throw new IllegalStateException()
}
started = true
}
void stop() {
if (!started) {
throw new IllegalStateException()
return
}
started = false
}
/**
* Return the size of the queue, may not be implemented by some providers
* in which case it will return -1

View File

@ -3,12 +3,23 @@ package com.github.lookout.whoas
import com.fasterxml.jackson.annotation.JsonProperty
import org.joda.time.DateTime
class HookRequest {
private Long retries
private String url
private String postData
private DateTime deliverAfter
class HookRequest {
@JsonProperty
private Long id
@JsonProperty
private Long retries
@JsonProperty
private String url
@JsonProperty
private String postData
@JsonProperty
private DateTime deliverAfter
/** Constructor for Jackson */
HookRequest() { }
@ -22,28 +33,4 @@ class HookRequest {
this.url = hookUrl
this.postData = hookData
}
@JsonProperty
Long getRetries() {
return this.retries
}
void setRetries(Long newRetries) {
this.retries = newRetries
}
@JsonProperty
String getUrl() {
return this.url
}
@JsonProperty
String getPostData() {
return this.postData
}
@JsonProperty
String getDeliverAfter() {
return this.deliverAfter.toString()
}
}

View File

@ -0,0 +1,62 @@
package com.github.lookout.whoas
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisClientFactory {
private static final RedisClientFactory instance = new RedisClientFactory();
private static JedisPool pool;
private RedisClientFactory() {}
public final static RedisClientFactory getInstance() {
return instance;
}
public void start() {
/* Set JedisPoolConfig */
JedisPoolConfig poolConfig = new JedisPoolConfig();
/* Set max active connections to Redis instance */
poolConfig.setMaxTotal(10);
/* Tests whether connection is dead when connection
* retrieval method is called
*/
poolConfig.setTestOnBorrow(true);
/* Tests whether connection is dead when returning a connection
* to the pool
* */
poolConfig.setTestOnReturn(true);
/* Number of connections to Redis that just sit there and do
* nothing
* */
poolConfig.setMaxIdle(5);
/* Minimum number of idle connections to Redis
* These can be seen as always open and ready to serve
* */
poolConfig.setMinIdle(1);
/* Tests whether connections are dead during idle periods */
poolConfig.setTestWhileIdle(true);
/* Maximum number of connections to test in each idle check */
poolConfig.setNumTestsPerEvictionRun(10);
/* Idle connection checking period */
poolConfig.setTimeBetweenEvictionRunsMillis(60000);
/* Create the jedisPool */
pool = new JedisPool(poolConfig, "localhost", 6379);
}
public void stop() {
pool.destroy();
}
public Jedis getJedis() {
return pool.getResource();
}
public void returnJedis(Jedis jedis) {
jedis.close()
}
}

View File

@ -0,0 +1,125 @@
package com.github.lookout.whoas
import com.fasterxml.jackson.databind.ObjectMapper
import redis.clients.jedis.Jedis;
/**
* A simple in-memory queue that offers no persistence between process restarts
*/
class RedisQueue extends AbstractHookQueue {
private final RedisClientFactory redisClientFactory
private static nextId = 0
/**
* Create the RedisQueue
*/
RedisQueue() {
redisClientFactory = RedisClientFactory.getInstance()
}
/**
* Return the number of elements in the queue
*/
Long getSize() {
if (!this.started) {
throw new Exception("Queue must be started before this operation is invoked")
}
Jedis redisclient = null
Long queueSize = 0
try {
redisclient = redisClientFactory.getJedis()
queueSize = redisclient.llen("queue")
} catch (all) {
} finally {
if (redisclient != null) {
redisClientFactory.returnJedis(redisclient)
}
}
return queueSize
}
/**
* Setup the Redis client factory
*/
@Override
void start() {
super.start()
redisClientFactory.start()
}
/**
* Stop the Redis client factory
*/
@Override
void stop() {
super.stop()
redisClientFactory.stop()
}
/**
* Performs a blocking pop on the queue and invokes the closure with the
* item popped from the queue
*
* If the Closure throws an exception, the dequeued item will be returned
* to the tail end of the queue
*/
void pop(Closure action) {
if (action == null) {
throw new Exception("Must provide a Closure to RedisQueue.pop()")
}
if (!this.started) {
throw new Exception("Queue must be started before this operation is invoked")
}
List<String> messages = null
Jedis redisclient = null
try {
redisclient = redisClientFactory.getJedis()
messages = redisclient.blpop(0, "queue");
/* Decode message */
if (messages) {
ObjectMapper mapper = new ObjectMapper()
HookRequest request = mapper.readValue(messages.get(1), HookRequest.class)
action.call(request)
}
} catch (all) {
/* Put this back on the front of the queue */
if (messages) {
redisclient.lpush("queue", messages.get(1))
}
} finally {
if (redisclient != null) {
redisClientFactory.returnJedis(redisclient)
}
}
}
/**
* Attempt to insert the request into the queue
*
* If the request cannot be inserted, this method will return false,
* otherwise true.
*/
Boolean push(HookRequest request) {
if (!this.started) {
throw new Exception("Queue must be started before this operation is invoked")
}
request.id = ++nextId
ObjectMapper mapper = new ObjectMapper()
String jsonPayload = mapper.writeValueAsString(request)
Jedis redisclient = null
Integer ret = 0
try {
redisclient = redisClientFactory.getJedis()
ret = redisclient.rpush("queue", jsonPayload)
} catch (Exception e) {
} finally {
if (redisclient != null) {
redisClientFactory.returnJedis(redisclient)
}
}
return ret == 1;
}
}

View File

@ -0,0 +1,151 @@
package com.github.lookout.whoas
import com.fiftyonred.mock_jedis.MockJedis
import redis.clients.jedis.Jedis;
import spock.lang.*
class RedisQueueSpec extends Specification {
def "getSize()ing without a start should throw"() {
given:
RedisQueue q = new RedisQueue()
when:
q.getSize()
then:
thrown Exception
}
def "getSize() should return 0 by default"() {
given:
RedisQueue queue = new RedisQueue()
//Jedis redisClient = new MockJedis("test");
//queue.redisClientFactory.metaClass.getJedis = {redisClient}
when:
queue.start()
then:
queue.getSize() == 0
}
def "pop()ing without a closure should throw"() {
given:
RedisQueue q = new RedisQueue()
when:
q.pop()
then:
thrown Exception
}
def "pop()ing without a start should throw"() {
given:
RedisQueue q = new RedisQueue()
when:
q.pop()
then:
thrown Exception
}
def "push()ing without a start should throw"() {
given:
RedisQueue q = new RedisQueue()
when:
queue.push(new HookRequest())
then:
thrown Exception
}
def "push() should put onto the internal queue"() {
given:
RedisQueue queue = new RedisQueue()
Jedis redisClient = new MockJedis("test");
queue.redisClientFactory.metaClass.getJedis = {redisClient}
redisClient.metaClass.rpush = {String key, String payload -> redisClient.lpush(key, payload)}
when:
queue.start()
queue.push(new HookRequest())
queue.push(new HookRequest())
then:
queue.getSize() == 2
}
def "pop() after push should receive a request"() {
given:
RedisQueue queue = new RedisQueue()
Jedis redisClient = new MockJedis("test");
queue.redisClientFactory.metaClass.getJedis = {redisClient}
redisClient.metaClass.rpush = {String key, String payload -> redisClient.lpush(key, payload)}
redisClient.metaClass.blpop = {Integer timeout, String key -> [key, redisClient.lpop(key)]}
when:
queue.start()
HookRequest test = new HookRequest()
queue.push(test)
then:
queue.getSize() == 1
queue.pop() { HookRequest fetched -> fetched == test}
queue.getSize() == 0
}
def "push() on rpush exception should return false"() {
given:
RedisQueue queue = new RedisQueue()
Jedis redisClient = new MockJedis("test");
queue.redisClientFactory.metaClass.getJedis = {redisClient}
redisClient.metaClass.rpush = {String key, String payload -> throw new Exception("Test Exception")}
when:
queue.start()
then:
queue.push(new HookRequest()) == false
}
def "pop() on blpop exception simple return, nothing to requeue "() {
given:
RedisQueue queue = new RedisQueue()
Jedis redisClient = new MockJedis("test");
queue.redisClientFactory.metaClass.getJedis = {redisClient}
redisClient.metaClass.blpop = {Integer timeout, String key -> throw new Exception("Test Exception")}
when:
queue.start()
queue.pop() { }
then:
queue.getSize() == 0
}
def "pop() on exception while executing closure should requeue"() {
given:
RedisQueue queue = new RedisQueue()
Jedis redisClient = new MockJedis("test");
queue.redisClientFactory.metaClass.getJedis = {redisClient}
redisClient.metaClass.rpush = {String key, String payload -> redisClient.lpush(key, payload)}
redisClient.metaClass.blpop = {Integer timeout, String key -> [key, redisClient.lpop(key)]}
redisClient.metaClass.blpop = {Integer timeout, String key -> }
when:
queue.start()
queue.push(new HookRequest())
queue.pop() { throw new Exception("Test Exception") }
then:
queue.getSize() == 1
}
}