MediumConcurrencyTestIT.java
/*******************************************************************************
* Copyright (c) 2019 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
/**
* This test is particularly flaky, and it is very hard to debug the root cause.
*/
public class MediumConcurrencyTestIT extends SPARQLBaseTest {
static final String[] queries = new String[] {
"query01", "query02", "query03", "query04", "query05", "query06", "query07", "query08", "query09",
"query10", "query11", "query12"
};
private static ExecutorService executor;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
@BeforeAll
public static void beforeClass() {
executor = Executors.newFixedThreadPool(10);
}
@AfterAll
public static void afterClass() throws InterruptedException {
if (executor != null) {
executor.shutdownNow();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
}
// @RepeatedTest(1000)
@Test
@Disabled("Test is very flaky, but it's too difficult to figure out what's wrong.")
public void queryMix() throws Throwable {
/* test select query retrieving all persons (2 endpoints) */
prepareTest(Arrays.asList("/tests/medium/data1.ttl", "/tests/medium/data2.ttl", "/tests/medium/data3.ttl",
"/tests/medium/data4.ttl"));
final int MAX_QUERIES = 500;
final Random rand = new Random(12345);
final List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < MAX_QUERIES; i++) {
Future<String> f = submit(queries[rand.nextInt(queries.length)], i);
futures.add(f);
}
countDownLatch.countDown();
try {
final String message = Assertions.assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
for (Future<String> f : futures) {
f.get(30, TimeUnit.SECONDS);
}
return "OK";
});
Assertions.assertEquals("OK", message);
} catch (Throwable t) {
futures.forEach(future -> future.cancel(true));
throw t;
}
log.info("Done");
}
protected Future<String> submit(final String query, final int queryId) {
return executor.submit(() -> {
countDownLatch.await();
log.info("Executing query " + queryId + ": " + query);
execute("/tests/medium/" + query + ".rq", "/tests/medium/" + query + ".srx", false, true);
// uncomment to simulate canceling case
// executeReadPartial("/tests/medium/" + query + ".rq");
return "Ok";
});
}
}