Fix for AVATAR_JS-70, remove IdleHandle active polling, acquire/release and AsyncHandle

Reviewed-by: asquare
This commit is contained in:
jfdenise 2013-11-19 19:17:19 +01:00
parent 3540487d63
commit ef5b0ad07d
5 changed files with 96 additions and 36 deletions

View File

@ -902,7 +902,7 @@ public final class Crypto {
}
private void submitToLoop(final Callable<?> callable, final Callback cb) {
final EventLoop.Handle handle = eventLoop.grab();
final EventLoop.Handle handle = eventLoop.acquire();
eventLoop.submit(new Runnable() {
@Override
public void run() {

View File

@ -42,7 +42,7 @@ public final class DNS {
public void getHostByAddress(final String address,
final Callback callback) {
final EventLoop.Handle handle = eventLoop.grab();
final EventLoop.Handle handle = eventLoop.acquire();
eventLoop.submit(new Runnable() {
@Override
public void run() {
@ -64,7 +64,7 @@ public final class DNS {
public void getAddressByHost(final String hostname,
final Callback callback) {
final EventLoop.Handle handle = eventLoop.grab();
final EventLoop.Handle handle = eventLoop.acquire();
eventLoop.submit(new Runnable() {
@Override
public void run() {

View File

@ -35,19 +35,17 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.script.ScriptException;
import jdk.nashorn.api.scripting.NashornException;
import net.java.avatar.js.dns.DNS;
import net.java.avatar.js.eventloop.Callback;
import net.java.avatar.js.log.Logger;
import net.java.avatar.js.log.Logging;
import net.java.libuv.LibUV;
import net.java.libuv.cb.AsyncCallback;
import net.java.libuv.cb.CallbackExceptionHandler;
import net.java.libuv.cb.IdleCallback;
import net.java.libuv.handles.IdleHandle;
import net.java.libuv.handles.AsyncHandle;
import net.java.libuv.handles.LoopHandle;
public final class EventLoop {
@ -62,8 +60,7 @@ public final class EventLoop {
private final Logger LOG;
private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
private final AtomicInteger hooks = new AtomicInteger(0);
private final IdleHandle idleHandle;
private final AtomicBoolean idleHandleStarted = new AtomicBoolean(false);
private final AsyncHandle asyncHandle;
private final Thread mainThread;
private Callback isHandlerRegistered = null;
@ -73,15 +70,23 @@ public final class EventLoop {
public static final class Handle implements AutoCloseable {
private final AtomicInteger hooks;
public Handle(final AtomicInteger hooks) {
private final AsyncHandle asyncHandle;
public Handle(final AtomicInteger hooks, final AsyncHandle asyncHandle) {
this.hooks = hooks;
hooks.incrementAndGet();
this.asyncHandle = asyncHandle;
if (hooks.incrementAndGet() == 1) {
asyncHandle.ref();
}
}
@Override
public void close() {
hooks.decrementAndGet();
asyncHandle.send();
}
public void release() {
close();
}
@Override
@ -90,8 +95,8 @@ public final class EventLoop {
}
}
public Handle grab() {
return new Handle(hooks);
public Handle acquire() {
return new Handle(hooks, asyncHandle);
}
public void setUncaughtExceptionHandler(final Callback registered, final Callback handler) {
@ -104,6 +109,10 @@ public final class EventLoop {
eventQueue.add(event);
}
public void post(final Callback cb, Object... args) {
eventQueue.add(new Event(null, cb, args));
}
public void post(final Event event) {
eventQueue.add(event);
}
@ -167,6 +176,7 @@ public final class EventLoop {
public void stop() {
executor.shutdown();
asyncHandle.close();
uvLoop.stop();
}
@ -270,15 +280,6 @@ public final class EventLoop {
}
};
}
// If submit is not called from main thread, there is no guarantee that idleHandle.start()
// will be taken into account by uv loop.
if (Thread.currentThread() != mainThread) {
assert idleHandleStarted.get() : "idleHandle not started although called "
+ "from non-event thread " + Thread.currentThread().getName();
}
if (idleHandleStarted.compareAndSet(false, true)) {
idleHandle.start();
}
return executor.submit(toSubmit);
}
@ -291,8 +292,7 @@ public final class EventLoop {
"tasks: " + executor.queuedTasksCount() + ", " +
"activeThreads: " + executor.getActiveCount() + ", " +
"threads: " + executor.getPoolSize() + ", " +
"pending: " + eventQueue.size() + ", " +
"idleHandleStarted: " + idleHandleStarted.get() +
"pending: " + eventQueue.size() +
"}}";
}
@ -337,20 +337,18 @@ public final class EventLoop {
LibUV.chdir(workDir);
LOG = logger("eventloop");
idleHandle = new IdleHandle(uvLoop);
idleHandle.setIdleCallback(new IdleCallback() {
asyncHandle = new AsyncHandle(uvLoop);
asyncHandle.setAsyncCallback(new AsyncCallback() {
@Override
public void call(int status) throws Exception {
// process pending events in this cycle
// have been posted by background threads
processQueuedEvents();
// No more bg task and no more Events to process, stop idleHandle
if (hooks.get() == 0 && eventQueue.peek() == null) {
idleHandle.stop();
idleHandleStarted.set(false);
public void onSend(int status) throws Exception {
// The side effect of this callback being called is that posted
// events have been processed.
if (hooks.get() <= 0) {
asyncHandle.unref();
}
}
});
asyncHandle.unref();
}

View File

@ -188,7 +188,7 @@ public abstract class Writer {
}
public void submitToLoop(final Callable<?> callable, final Callback cb) {
final EventLoop.Handle handle = eventLoop.grab();
final EventLoop.Handle handle = eventLoop.acquire();
eventLoop.submit(new Runnable() {
@Override
public void run() {

View File

@ -0,0 +1,62 @@
/*
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
var evtloop = __avatar.eventloop;
var count = new java.util.concurrent.atomic.AtomicInteger(0);
var NUM_THREADS = 1000;
var POSTED = 12;
function f2() {
count.incrementAndGet();
}
function createClosure(i) {
return function() {
java.lang.Thread.sleep(500);
print("Thread " + i + " started");
for(var j = 0; j < 10; j++) {
evtloop.post(f2);
}
java.lang.Thread.sleep(100);
evtloop.post(new Packages.net.java.avatar.js.eventloop.Event("", f2));
print("Thread " + i + " done");
evtloop.post(new Packages.net.java.avatar.js.eventloop.Event("", f2));
handle.release();
}
}
for(var i = 0 ; i < NUM_THREADS; i++) {
var handle = evtloop.acquire();
var thr = new java.lang.Thread(createClosure(i));
thr.setDaemon(true);
thr.start();
}
console.log("End synchronous script");
process.on('exit', function(e) {
print("Exiting ");
if(count.get() != NUM_THREADS * POSTED) {
throw new Error("COunt is not the expected one " + count.get());
}
})