AsyncConcurrencyTest.java
package com.fasterxml.jackson.core.json.async;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.core.*;
import com.fasterxml.jackson.core.async.AsyncTestBase;
import com.fasterxml.jackson.core.testsupport.AsyncReaderWrapper;
import static org.junit.jupiter.api.Assertions.fail;
class AsyncConcurrencyTest extends AsyncTestBase
{
final static JsonFactory JSON_F = new JsonFactory();
static {
// To make it pass, try:
// JSON_F.disable(JsonFactory.Feature.USE_THREAD_LOCAL_FOR_BUFFER_RECYCLING);
}
private final static String TEXT1 = "Short";
private final static String TEXT2 = "Some longer text";
private final static String TEXT3 = "and yet more";
private final static String TEXT4 = "... Longest yet although not superbly long still (see 'apos'?)";
final static byte[] JSON_DOC = utf8Bytes(String.format(
"[\"%s\", \"%s\",\n\"%s\",\"%s\" ]", TEXT1, TEXT2, TEXT3, TEXT4));
class WorkUnit implements AutoCloseable {
private int stage = 0;
private AsyncReaderWrapper parser;
private boolean errored = false;
public boolean process() throws Exception {
// short-cut through if this instance has already failed
if (errored) {
return false;
}
try {
switch (stage++) {
case 0:
parser = createParser();
break;
case 1:
_assert(JsonToken.START_ARRAY);
break;
case 2:
_assert(TEXT1);
break;
case 3:
_assert(TEXT2);
break;
case 4:
_assert(TEXT3);
break;
case 5:
_assert(TEXT4);
break;
case 6:
_assert(JsonToken.END_ARRAY);
break;
default:
/*
if (parser.nextToken() != null) {
throw new IOException("Unexpected token at "+stage+"; expected `null`, got "+parser.currentToken());
}
*/
close();
return true;
}
} catch (Exception e) {
errored = true;
throw e;
}
return false;
}
private void _assert(String exp) throws IOException {
_assert(JsonToken.VALUE_STRING);
String str = parser.currentText();
if (!exp.equals(str)) {
throw new IOException("Unexpected VALUE_STRING: expected '"+exp+"', got '"+str+"'");
}
}
private void _assert(JsonToken exp) throws IOException {
JsonToken t = parser.nextToken();
if (t != exp) {
throw new IOException("Unexpected token at "+stage+"; expected "+exp+", got "+t);
}
}
@Override
public void close() throws Exception {
if (parser != null) {
parser.close();
parser = null;
stage = 0;
}
}
}
// [jackson-core#476]
@Test
void concurrentAsync() throws Exception
{
final int MAX_ROUNDS = 30;
for (int i = 0; i < MAX_ROUNDS; ++i) {
_testConcurrentAsyncOnce(i, MAX_ROUNDS);
}
}
void _testConcurrentAsyncOnce(final int round, final int maxRounds) throws Exception
{
final int numThreads = 3;
final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
final AtomicInteger errorCount = new AtomicInteger(0);
final AtomicInteger completedCount = new AtomicInteger(0);
final AtomicReference<String> errorRef = new AtomicReference<>();
// First, add a few shared work units
final ArrayBlockingQueue<WorkUnit> q = new ArrayBlockingQueue<>(20);
for (int i = 0; i < 7; ++i) {
q.add(new WorkUnit());
}
// then invoke swarm of workers on it...
final int REP_COUNT = 99000;
ArrayList<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < REP_COUNT; i++) {
Callable<Void> c = () -> {
WorkUnit w = q.take();
try {
if (w.process()) {
completedCount.incrementAndGet();
}
} catch (Throwable t) {
if (errorCount.getAndIncrement() == 0) {
errorRef.set(t.toString());
}
} finally {
q.add(w);
}
return null;
};
futures.add(executor.submit(c));
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
int count = errorCount.get();
if (count > 0) {
fail("Expected no problems (round "+round+"/"+maxRounds
+"); got "+count+", first with: "+errorRef.get());
}
final int EXP_COMPL = ((REP_COUNT + 7) / 8);
int compl = completedCount.get();
if (compl < (EXP_COMPL-10) || compl > EXP_COMPL) {
fail("Expected about "+EXP_COMPL+" completed rounds, got: "+compl);
}
while (!q.isEmpty()) {
q.take().close();
}
}
protected AsyncReaderWrapper createParser() throws IOException {
return asyncForBytes(JSON_F, 100, JSON_DOC, 0);
}
}