mirror of https://github.com/reiseburo/whoas
Move InMemoryQueue into Java, refactor some Groovy-specific interfaces away
This commit is contained in:
parent
a79048c9e8
commit
4fa2667c16
|
@ -28,12 +28,12 @@ abstract class AbstractHookQueue {
|
|||
* Return the size of the queue, may not be implemented by some providers
|
||||
* in which case it will return -1
|
||||
*/
|
||||
abstract Long getSize()
|
||||
abstract int getSize()
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
abstract void pop(Closure action)
|
||||
abstract void pop(QueueAction action) throws Exception
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
|
@ -1,44 +1,44 @@
|
|||
package com.github.lookout.whoas
|
||||
package com.github.lookout.whoas;
|
||||
|
||||
import java.util.Queue
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;;
|
||||
|
||||
|
||||
/**
|
||||
* A simple in-memory queue that offers no persistence between process restarts
|
||||
*/
|
||||
class InMemoryQueue extends AbstractHookQueue {
|
||||
private Queue<HookRequest> internalQueue
|
||||
private Logger logger = LoggerFactory.getLogger(InMemoryQueue.class)
|
||||
public class InMemoryQueue extends AbstractHookQueue {
|
||||
private BlockingQueue<HookRequest> internalQueue;
|
||||
private Logger logger = LoggerFactory.getLogger(InMemoryQueue.class);
|
||||
|
||||
/**
|
||||
* Create the InMemoryQueue from configuration
|
||||
*/
|
||||
InMemoryQueue(WhoasQueueConfig queueConfig) {
|
||||
this.internalQueue = new LinkedBlockingQueue<HookRequest>()
|
||||
public InMemoryQueue(WhoasQueueConfig queueConfig) {
|
||||
this.internalQueue = new LinkedBlockingQueue<HookRequest>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Default constructor
|
||||
*/
|
||||
InMemoryQueue() {
|
||||
this.internalQueue = new LinkedBlockingQueue<HookRequest>()
|
||||
public InMemoryQueue() {
|
||||
this.internalQueue = new LinkedBlockingQueue<HookRequest>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the InMemoryQueue with the given Queue object
|
||||
*/
|
||||
InMemoryQueue(Queue<HookRequest> queue) {
|
||||
this.internalQueue = queue
|
||||
public InMemoryQueue(BlockingQueue<HookRequest> queue) {
|
||||
this.internalQueue = queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of elements in the queue
|
||||
*/
|
||||
Long getSize() {
|
||||
return this.internalQueue.size()
|
||||
public int getSize() {
|
||||
return this.internalQueue.size();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -48,21 +48,20 @@ class InMemoryQueue extends AbstractHookQueue {
|
|||
* If the Closure throws an exception, the dequeued item will be returned
|
||||
* to the tail end of the queue
|
||||
*/
|
||||
void pop(Closure action) {
|
||||
public void pop(QueueAction action) throws InterruptedException, Exception {
|
||||
if (action == null) {
|
||||
throw new Exception("Must provide a Closure to InMemoryQueue.pop()")
|
||||
throw new Exception("Must provide a Closure to InMemoryQueue.pop()");
|
||||
}
|
||||
|
||||
Object item = this.internalQueue.take()
|
||||
HookRequest item = this.internalQueue.take();
|
||||
|
||||
try {
|
||||
action.call(item)
|
||||
action.call(item);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
|
||||
/* Put this back on the tail end of the queue */
|
||||
logger.info("\"Pop\" on in-memory queue failed, putting it back on the tail-end", ex)
|
||||
this.internalQueue.put(item)
|
||||
logger.info("\"Pop\" on in-memory queue failed, putting it back on the tail-end", ex);
|
||||
this.internalQueue.put(item);
|
||||
}
|
||||
finally {
|
||||
}
|
||||
|
@ -74,7 +73,7 @@ class InMemoryQueue extends AbstractHookQueue {
|
|||
* If the request cannot be inserted, this method will return false,
|
||||
* otherwise true.
|
||||
*/
|
||||
Boolean push(HookRequest request) {
|
||||
return this.internalQueue.offer(request)
|
||||
public Boolean push(HookRequest request) {
|
||||
return this.internalQueue.offer(request);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
package com.github.lookout.whoas;
|
||||
|
||||
|
||||
public interface QueueAction {
|
||||
public void call(HookRequest request);
|
||||
}
|
|
@ -36,7 +36,7 @@ class RedisQueue extends AbstractHookQueue {
|
|||
/**
|
||||
* Return the number of elements in the queue
|
||||
*/
|
||||
Long getSize() {
|
||||
int getSize() {
|
||||
if (!this.started) {
|
||||
throw new Exception("Queue must be started before this operation is invoked")
|
||||
}
|
||||
|
@ -103,7 +103,7 @@ class RedisQueue extends AbstractHookQueue {
|
|||
* If the Closure throws an exception, the dequeued item will be returned
|
||||
* to the tail end of the queue
|
||||
*/
|
||||
void pop(Closure action) {
|
||||
void pop(QueueAction action) {
|
||||
if (action == null) {
|
||||
throw new Exception("Must provide a Closure to RedisQueue.pop()")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue