enhance multiple event loop test with async send from a different thread
This commit is contained in:
parent
78dbfe5a14
commit
525ddfef7d
|
@ -26,6 +26,7 @@
|
|||
import java.io.FileReader;
|
||||
import java.io.Reader;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.Test;
|
||||
|
@ -33,6 +34,8 @@ import org.testng.annotations.Test;
|
|||
import com.oracle.avatar.js.eventloop.EventLoop;
|
||||
import com.oracle.avatar.js.eventloop.ThreadPool;
|
||||
import com.oracle.avatar.js.log.Logging;
|
||||
import com.oracle.libuv.cb.AsyncCallback;
|
||||
import com.oracle.libuv.handles.AsyncHandle;
|
||||
|
||||
public class MultipleEventLoopTest {
|
||||
|
||||
|
@ -46,29 +49,76 @@ public class MultipleEventLoopTest {
|
|||
final int CONCURRENCY = 256;
|
||||
final Logging logging = new Logging(false);
|
||||
final EventLoop[] loops = new EventLoop[CONCURRENCY];
|
||||
final boolean[] didRun = new boolean[loops.length];
|
||||
final Thread[] threads = new Thread[loops.length];
|
||||
final AtomicBoolean[] initialized = new AtomicBoolean[loops.length];
|
||||
final AtomicBoolean[] onAsync = new AtomicBoolean[loops.length];
|
||||
final AsyncHandle[] asyncHandles = new AsyncHandle[loops.length];
|
||||
final Throwable[] exceptions = new Throwable[loops.length];
|
||||
|
||||
for (int i=0; i < loops.length; i++) {
|
||||
initialized[i] = new AtomicBoolean(false);
|
||||
onAsync[i] = new AtomicBoolean(false);
|
||||
exceptions[i] = null;
|
||||
}
|
||||
|
||||
for (int i=0; i < loops.length; i++) {
|
||||
didRun[i] = false;
|
||||
final EventLoop loop = loops[i] = new EventLoop(
|
||||
properties.getProperty("source.compatible.version"),
|
||||
properties.getProperty("libuv.compatible.version"),
|
||||
logging,
|
||||
System.getProperty("user.dir"),
|
||||
i,
|
||||
ThreadPool.getInstance(),
|
||||
true);
|
||||
final int fi = i;
|
||||
threads[i] = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
final EventLoop loop = loops[fi] = new EventLoop(
|
||||
properties.getProperty("source.compatible.version"),
|
||||
properties.getProperty("libuv.compatible.version"),
|
||||
logging,
|
||||
System.getProperty("user.dir"),
|
||||
fi,
|
||||
ThreadPool.newInstance(1, 1, 1, Integer.MAX_VALUE),
|
||||
false);
|
||||
|
||||
final AsyncHandle async = asyncHandles[fi] = new AsyncHandle(loop.loop());
|
||||
async.setAsyncCallback(new AsyncCallback() {
|
||||
@Override
|
||||
public void onSend(int status) throws Exception {
|
||||
onAsync[fi].set(true);
|
||||
async.unref();
|
||||
}
|
||||
});
|
||||
|
||||
final AtomicBoolean init = initialized[fi];
|
||||
synchronized (init) {
|
||||
init.set(true);
|
||||
init.notifyAll();
|
||||
}
|
||||
|
||||
loop.run();
|
||||
} catch (Throwable ex) {
|
||||
exceptions[fi] = ex;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
for (int i=0; i < loops.length; i++) {
|
||||
final int fi = i;
|
||||
final AtomicBoolean init = initialized[fi];
|
||||
synchronized (init) {
|
||||
while (!init.get()) {
|
||||
init.wait();
|
||||
}
|
||||
}
|
||||
|
||||
final EventLoop loop = loops[fi];
|
||||
try (EventLoop.Handle handle = loop.acquire()) {
|
||||
loop.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
System.out.println("++ " + fi);
|
||||
try {
|
||||
Thread.sleep((long) (Math.random() * 1000));
|
||||
didRun[fi] = true;
|
||||
} catch (InterruptedException ex) {
|
||||
} finally {
|
||||
System.out.println("-- " + fi);
|
||||
Thread.sleep(100 + (long) (Math.random() * 1000));
|
||||
} catch (Throwable th) {
|
||||
exceptions[fi] = th;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -76,11 +126,16 @@ public class MultipleEventLoopTest {
|
|||
}
|
||||
|
||||
for (int i=0; i < loops.length; i++) {
|
||||
loops[i].run();
|
||||
asyncHandles[i].send();
|
||||
}
|
||||
|
||||
for (int i=0; i < loops.length; i++) {
|
||||
Assert.assertTrue(didRun[i]);
|
||||
threads[i].join();
|
||||
final Throwable th = exceptions[i];
|
||||
if (th != null) {
|
||||
throw new AssertionError(th);
|
||||
}
|
||||
assert onAsync[i].get();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue