mirror of https://github.com/reiseburo/whoas
Finish implementing the InMemoryQueue
This will provide just the simplest of queueing operations for testing of Whoas but also for integration testing in users of Whoas Fixes #2
This commit is contained in:
parent
8087920b71
commit
f57db21bba
|
@ -10,7 +10,7 @@ class HookRequest {
|
|||
private DateTime deliverAfter
|
||||
|
||||
|
||||
/* Constructor for Jackson */
|
||||
/** Constructor for Jackson */
|
||||
HookRequest() { }
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,5 +22,5 @@ interface IHookQueue {
|
|||
/**
|
||||
*
|
||||
*/
|
||||
Boolean enqueue(HookRequest request)
|
||||
Boolean push(HookRequest request)
|
||||
}
|
||||
|
|
|
@ -7,29 +7,61 @@ import java.util.concurrent.LinkedBlockingQueue
|
|||
* A simple in-memory queue that offers no persistence between process restarts
|
||||
*/
|
||||
class InMemoryQueue implements IHookQueue {
|
||||
private Queue internalQueue
|
||||
private Queue<HookRequest> internalQueue
|
||||
|
||||
/**
|
||||
* Create the InMemoryQueue with it's own internal queueing implementation
|
||||
*/
|
||||
InMemoryQueue() {
|
||||
this.internalQueue = new LinkedBlockingQueue()
|
||||
this.internalQueue = new LinkedBlockingQueue<HookRequest>()
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the InMemoryQueue with the given Queue object
|
||||
*/
|
||||
InMemoryQueue(Queue queue) {
|
||||
InMemoryQueue(Queue<HookRequest> queue) {
|
||||
this.internalQueue = queue
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of elements in the queue
|
||||
*/
|
||||
Long getSize() {
|
||||
return this.internalQueue.size()
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs a blocking pop on the queue and invokes the closure with the
|
||||
* item popped from the queue
|
||||
*
|
||||
* If the Closure throws an exception, the dequeued item will be returned
|
||||
* to the tail end of the queue
|
||||
*/
|
||||
void pop(Closure action) {
|
||||
if (action == null) {
|
||||
throw new Exception("Must provide a Closure to InMemoryQueue.pop()")
|
||||
}
|
||||
|
||||
Object item = this.internalQueue.take()
|
||||
|
||||
try {
|
||||
action.call(item)
|
||||
}
|
||||
catch (Exception ex) {
|
||||
/* Put this back on the tail end of the queue */
|
||||
this.internalQueue.put(item)
|
||||
}
|
||||
finally {
|
||||
}
|
||||
}
|
||||
|
||||
Boolean enqueue(HookRequest request) {
|
||||
/**
|
||||
* Attempt to insert the request into the queue
|
||||
*
|
||||
* If the request cannot be inserted, this method will return false,
|
||||
* otherwise true.
|
||||
*/
|
||||
Boolean push(HookRequest request) {
|
||||
return this.internalQueue.offer(request)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,16 +12,89 @@ class InMemoryQueueSpec extends Specification {
|
|||
expect:
|
||||
queue.size == 0
|
||||
}
|
||||
|
||||
def "pop()ing without a closure should throw"() {
|
||||
given:
|
||||
InMemoryQueue q = new InMemoryQueue()
|
||||
|
||||
when:
|
||||
q.pop()
|
||||
|
||||
then:
|
||||
thrown Exception
|
||||
}
|
||||
|
||||
def "push() should put onto the internal queue"() {
|
||||
given:
|
||||
InMemoryQueue queue = new InMemoryQueue()
|
||||
|
||||
when:
|
||||
queue.push(new HookRequest())
|
||||
|
||||
then:
|
||||
queue.size == 1
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class InMemoryQueueWithGivenQueueSpec extends Specification {
|
||||
def "getSize() should return 0 by default"() {
|
||||
given:
|
||||
ArrayBlockingQueue internal = new ArrayBlockingQueue(1)
|
||||
InMemoryQueue queue = new InMemoryQueue(internal)
|
||||
|
||||
protected ArrayBlockingQueue internal
|
||||
protected InMemoryQueue queue
|
||||
|
||||
def setup() {
|
||||
internal = new ArrayBlockingQueue(1)
|
||||
queue = new InMemoryQueue(internal)
|
||||
}
|
||||
|
||||
def "getSize() should return the size of the intenral queue"() {
|
||||
expect:
|
||||
queue.size == 0
|
||||
queue.size == internal.size()
|
||||
}
|
||||
}
|
||||
|
||||
class InMemoryQueueSpecWithMessage extends InMemoryQueueWithGivenQueueSpec {
|
||||
|
||||
private HookRequest request
|
||||
|
||||
def setup() {
|
||||
request = new HookRequest()
|
||||
/* Throw our req on the internal queue */
|
||||
internal.put(request)
|
||||
}
|
||||
|
||||
def "pop() should call the closure"() {
|
||||
given:
|
||||
Boolean executedClosure = false
|
||||
Boolean receivedMessage = false
|
||||
|
||||
when:
|
||||
queue.pop {
|
||||
executedClosure = true
|
||||
receivedMessage = (it == request)
|
||||
}
|
||||
|
||||
then:
|
||||
executedClosure
|
||||
receivedMessage
|
||||
}
|
||||
|
||||
def "pop() should requeue on exceptions"() {
|
||||
setup:
|
||||
queue.pop {
|
||||
throw new Exception("Spock'd!")
|
||||
}
|
||||
|
||||
expect:
|
||||
queue.size == 1
|
||||
}
|
||||
|
||||
def "push()ing more than the internal queue can handle should return false"() {
|
||||
setup:
|
||||
queue.push(request)
|
||||
|
||||
expect:
|
||||
queue.size == 1
|
||||
queue.push(request) == false
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue