make acquire safe to call from non-main thread

Reviewed-by: batsatt
This commit is contained in:
akhil 2014-02-02 10:39:24 -08:00
parent edf0d64020
commit e3dd1db9eb
1 changed files with 46 additions and 24 deletions

View File

@ -50,6 +50,7 @@ import com.oracle.libuv.cb.CallbackHandler;
import com.oracle.libuv.cb.CallbackHandlerFactory;
import com.oracle.libuv.cb.ContextProvider;
import com.oracle.libuv.handles.AsyncHandle;
import com.oracle.libuv.handles.CheckHandle;
import com.oracle.libuv.handles.LoopHandle;
import jdk.nashorn.api.scripting.NashornException;
import jdk.nashorn.api.scripting.ScriptObjectMirror;
@ -68,7 +69,9 @@ public final class EventLoop {
private final Logger LOG;
private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
private final AtomicInteger hooks = new AtomicInteger(0);
private final AsyncHandle asyncHandle;
private final CheckHandle checkHandle;
private final AsyncHandle refHandle;
private final AsyncHandle unrefHandle;
private final AsyncHandle interruptMainLoopHandle;
private final Thread mainThread;
private final boolean sharedExecutor;
@ -83,20 +86,25 @@ public final class EventLoop {
public static final class Handle implements AutoCloseable {
private final AtomicInteger hooks;
private final AsyncHandle asyncHandle;
private final AsyncHandle refHandle;
private final AsyncHandle unrefHandle;
public Handle(final AtomicInteger hooks, final AsyncHandle asyncHandle) {
public Handle(final AtomicInteger hooks,
final AsyncHandle refHandle,
final AsyncHandle unrefHandle) {
this.hooks = hooks;
this.asyncHandle = asyncHandle;
this.refHandle = refHandle;
this.unrefHandle = unrefHandle;
if (hooks.incrementAndGet() == 1) {
asyncHandle.ref();
this.refHandle.send();
}
}
@Override
public void close() {
hooks.decrementAndGet();
asyncHandle.send();
if (hooks.decrementAndGet() == 0) {
this.unrefHandle.send();
}
}
public void release() {
@ -110,7 +118,7 @@ public final class EventLoop {
}
public Handle acquire() {
return new Handle(hooks, asyncHandle);
return new Handle(hooks, refHandle, unrefHandle);
}
public void setUncaughtExceptionHandler(final Callback registered, final Callback handler) {
@ -138,13 +146,17 @@ public final class EventLoop {
public void run() throws Throwable {
assert Thread.currentThread() == mainThread : "called from non-event thread " + Thread.currentThread().getName();
executor.allowCoreThreadTimeOut(true);
uvLoop.run();
// throw pending exception, if any
if (pendingException != null) {
final Throwable pex = pendingException;
pendingException = null;
throw pex;
}
do {
processQueuedEvents();
uvLoop.run();
// throw pending exception, if any
if (pendingException != null) {
final Throwable pex = pendingException;
pendingException = null;
throw pex;
}
} while (hooks.get() > 0 || eventQueue.peek() != null);
}
/**
@ -216,7 +228,9 @@ public final class EventLoop {
if (!sharedExecutor) {
executor.shutdown();
}
asyncHandle.close();
checkHandle.close();
refHandle.close();
unrefHandle.close();
uvLoop.stop();
}
}
@ -429,18 +443,26 @@ public final class EventLoop {
LibUV.chdir(workDir);
LOG = logger("eventloop");
asyncHandle = new AsyncHandle(uvLoop);
asyncHandle.setAsyncCallback(new AsyncCallback() {
checkHandle = new CheckHandle(uvLoop);
checkHandle.unref();
refHandle = new AsyncHandle(uvLoop);
refHandle.setAsyncCallback(new AsyncCallback() {
@Override
public void onSend(int status) throws Exception {
// The side effect of this callback being called is that posted
// events have been processed.
if (hooks.get() <= 0) {
asyncHandle.unref();
}
checkHandle.ref();
}
});
asyncHandle.unref();
refHandle.unref();
unrefHandle = new AsyncHandle(uvLoop);
unrefHandle.setAsyncCallback(new AsyncCallback() {
@Override
public void onSend(int status) throws Exception {
checkHandle.unref();
}
});
unrefHandle.unref();
interruptMainLoopHandle = new AsyncHandle(uvLoop);
interruptMainLoopHandle.setAsyncCallback(new AsyncCallback() {