// ServerBootSignalIT.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
// Some portions generated by Codex
package org.eclipse.rdf4j.tools.serverboot;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.config.RepositoryConfig;
import org.eclipse.rdf4j.repository.config.RepositoryConfigException;
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
import org.eclipse.rdf4j.repository.sail.config.SailRepositoryConfig;
import org.eclipse.rdf4j.sail.memory.config.MemoryStoreConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Integration test that launches the packaged JAR found in {@code target/} as a child JVM, exercises it through a
 * {@link RemoteRepositoryManager}, then delivers a POSIX signal with the {@code kill} command and verifies that the
 * process terminates with exit status 0.
 * <p>
 * Only enabled on Linux and macOS because signals are sent via the external {@code kill} command.
 */
@EnabledOnOs({ OS.LINUX, OS.MAC })
class ServerBootSignalIT {

	private static final Logger LOGGER = LoggerFactory.getLogger(ServerBootSignalIT.class);

	/** Maximum time to wait for the start-up marker in the server output and for the repository API to respond. */
	private static final long STARTUP_TIMEOUT_SECONDS = 90;

	/** Maximum time to wait for the server to exit after the requested signal when no fallback is allowed. */
	private static final long SIGNAL_EXIT_TIMEOUT_SECONDS = 30;

	/** Maximum time to wait for the server to exit per signal when the SIGTERM fallback is allowed. */
	private static final long FALLBACK_EXIT_TIMEOUT_SECONDS = 5;

	/** Maximum time to wait for the {@code kill} helper process itself. */
	private static final long KILL_TIMEOUT_SECONDS = 5;

	/** Pause between attempts to reach the repository manager endpoint. */
	private static final long CONNECT_RETRY_DELAY_MILLIS = 500;

	private ExecutorService streamExecutor;

	private final List<Runnable> cleanupActions = new ArrayList<>();

	/**
	 * Creates the daemon-thread executor used to drain the child process output.
	 */
	@BeforeEach
	void setUp() {
		streamExecutor = Executors.newFixedThreadPool(2, runnable -> {
			Thread thread = new Thread(runnable);
			thread.setDaemon(true);
			thread.setName("server-boot-signal-it");
			return thread;
		});
	}

	/**
	 * Runs every registered cleanup action (forcibly destroying spawned processes) and stops the output reader.
	 * Failures of individual actions are ignored so that the remaining actions still run.
	 */
	@AfterEach
	void tearDown() {
		for (Runnable cleanup : cleanupActions) {
			try {
				cleanup.run();
			} catch (Exception ignored) {
				// best-effort cleanup
			}
		}
		cleanupActions.clear();
		if (streamExecutor != null) {
			streamExecutor.shutdownNow();
		}
	}

	@Test
	@Disabled("Disabled due to flakiness on CI servers")
	void gracefullyStopsOnSigint() throws Exception {
		assertGracefulShutdownWithSigintFallback();
	}

	@Test
	void gracefullyStopsOnSigterm() throws Exception {
		assertGracefulShutdown("TERM");
	}

	/**
	 * Sends SIGINT and, if the server has not exited within the fallback timeout, follows up with SIGTERM.
	 */
	private void assertGracefulShutdownWithSigintFallback() throws Exception {
		assertGracefulShutdown("INT", true);
	}

	/**
	 * Sends the given signal without any fallback.
	 *
	 * @param signalName signal name as accepted by {@code kill -s}, e.g. {@code "TERM"}
	 */
	private void assertGracefulShutdown(String signalName) throws Exception {
		assertGracefulShutdown(signalName, false);
	}

	/**
	 * Starts the server, waits for it to become usable, exercises a repository, signals the process and asserts that
	 * it exits with status 0.
	 *
	 * @param signalName           signal name as accepted by {@code kill -s}
	 * @param allowSigtermFallback when {@code true}, wait only briefly for the first signal and send SIGTERM if the
	 *                             process is still running afterwards
	 */
	private void assertGracefulShutdown(String signalName, boolean allowSigtermFallback) throws Exception {
		Path projectRoot = Path.of("").toAbsolutePath();
		String javaBin = Path.of(System.getProperty("java.home"), "bin", "java").toString();
		int serverPort = findFreePort();
		int managementPort = findFreePort();

		Path jarPath = findExecutableJar(projectRoot.resolve("target"));

		ProcessBuilder processBuilder = new ProcessBuilder(javaBin, "-jar", jarPath.toString(),
				"--server.port=" + serverPort,
				"--management.server.port=" + managementPort);
		processBuilder.directory(projectRoot.toFile());
		// Merge stderr into stdout so a single reader drains everything the server prints.
		processBuilder.redirectErrorStream(true);
		Process process = processBuilder.start();
		cleanupActions.add(() -> process.destroyForcibly());

		CountDownLatch started = new CountDownLatch(1);
		StringBuilder outputBuffer = new StringBuilder();
		startStreamGobbler(process, started, outputBuffer);

		boolean startedInTime = started.await(STARTUP_TIMEOUT_SECONDS, SECONDS);
		assertThat(startedInTime)
				.as(() -> "Server failed to start within timeout. Output:\n" + outputOf(outputBuffer))
				.isTrue();

		exerciseRemoteRepository(serverUrl(serverPort), outputBuffer);

		long pid = process.pid();
		sendSignal(pid, signalName);
		String deliveredSignal = signalName;
		boolean exited = process.waitFor(
				allowSigtermFallback ? FALLBACK_EXIT_TIMEOUT_SECONDS : SIGNAL_EXIT_TIMEOUT_SECONDS, SECONDS);
		if (!exited && allowSigtermFallback) {
			LOGGER.warn("Server did not exit on SIGINT within 5 seconds. Sending SIGTERM.");
			sendSignal(pid, "TERM");
			deliveredSignal = "TERM";
			exited = process.waitFor(FALLBACK_EXIT_TIMEOUT_SECONDS, SECONDS);
		}

		// Report the signal that was actually sent last, not merely the one originally requested.
		String lastSignal = deliveredSignal;
		assertThat(exited)
				.as(() -> "Process did not exit after SIG" + lastSignal + ". Output:\n" + outputOf(outputBuffer))
				.isTrue();
		assertThat(process.exitValue())
				.as(() -> "Process exit value after SIG" + lastSignal + ". Output:\n" + outputOf(outputBuffer))
				.isEqualTo(0);
	}

	/**
	 * Picks the JAR to launch: the first entry of {@code targetDir}, ordered by path string, whose name ends in
	 * {@code .jar} but not in {@code -sources.jar} or {@code -javadoc.jar}.
	 *
	 * @param targetDir directory to scan (not recursive)
	 * @return path of the selected JAR
	 * @throws IOException           if the directory cannot be listed
	 * @throws IllegalStateException if no matching JAR exists
	 */
	private static Path findExecutableJar(Path targetDir) throws IOException {
		// Files.list holds an open directory handle until the stream is closed.
		try (var candidates = Files.list(targetDir)) {
			return candidates
					.filter(p -> p.toString().endsWith(".jar"))
					.filter(p -> !p.toString().endsWith("-sources.jar"))
					.filter(p -> !p.toString().endsWith("-javadoc.jar"))
					.min(Comparator.comparing(Path::toString))
					.orElseThrow(() -> new IllegalStateException("Could not find executable JAR in " + targetDir));
		}
	}

	/**
	 * Returns a snapshot of the captured process output. The reader thread appends while holding the buffer's
	 * monitor, so reads take the same monitor.
	 */
	private static String outputOf(StringBuilder outputBuffer) {
		synchronized (outputBuffer) {
			return outputBuffer.toString();
		}
	}

	/**
	 * Drains the process output on the stream executor, appending every line to {@code outputBuffer} and releasing
	 * {@code started} once a line containing one of the known start-up markers is seen.
	 */
	private void startStreamGobbler(Process process, CountDownLatch started, StringBuilder outputBuffer) {
		AtomicBoolean signalLogged = new AtomicBoolean(false);
		streamExecutor.submit(() -> {
			try (BufferedReader reader = new BufferedReader(
					new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
				String line;
				while ((line = reader.readLine()) != null) {
					synchronized (outputBuffer) {
						outputBuffer.append(line).append(System.lineSeparator());
					}
					if (!signalLogged.get() && (line.contains("Started Rdf4jServerWorkbenchApplication")
							|| line.contains("Initializing Spring DispatcherServlet 'rdf4jServer'"))) {
						started.countDown();
						signalLogged.set(true);
					}
				}
			} catch (IOException e) {
				// Record the failure in the captured output so it shows up in assertion messages.
				synchronized (outputBuffer) {
					outputBuffer.append("Failed to read process output: ")
							.append(e.getMessage())
							.append(System.lineSeparator());
				}
			}
		});
	}

	/**
	 * Runs {@code kill -s <signalName> <pid>} and waits briefly for it to finish. Problems with the {@code kill}
	 * command are logged rather than raised, because the target may already have exited; the caller's assertions on
	 * the target process decide the test outcome.
	 *
	 * @param pid        process id of the target
	 * @param signalName signal name as accepted by {@code kill -s}
	 */
	private void sendSignal(long pid, String signalName) throws IOException, InterruptedException {
		Process signalProcess = new ProcessBuilder("kill", "-s", signalName, Long.toString(pid))
				.start();
		cleanupActions.add(() -> signalProcess.destroyForcibly());
		if (!signalProcess.waitFor(KILL_TIMEOUT_SECONDS, SECONDS)) {
			LOGGER.warn("kill -s {} {} did not finish within {} seconds; destroying it", signalName, pid,
					KILL_TIMEOUT_SECONDS);
			signalProcess.destroyForcibly();
			signalProcess.waitFor(KILL_TIMEOUT_SECONDS, SECONDS);
			return;
		}
		int exitCode = signalProcess.exitValue();
		if (exitCode != 0) {
			LOGGER.warn("kill -s {} {} exited with status {}", signalName, pid, exitCode);
		}
	}

	/**
	 * Creates a temporary in-memory repository on the server, adds one statement, reads it back with a tuple query
	 * and finally removes the repository and shuts the manager down.
	 *
	 * @param serverUrl    base URL of the RDF4J server
	 * @param outputBuffer captured server output, used for diagnostics if the server cannot be reached
	 */
	private void exerciseRemoteRepository(String serverUrl, StringBuilder outputBuffer)
			throws InterruptedException, RepositoryException, RepositoryConfigException {
		RemoteRepositoryManager manager = awaitRepositoryManager(serverUrl, outputBuffer);
		String repoId = "signal-" + UUID.randomUUID();
		try {
			RepositoryConfig config = new RepositoryConfig(repoId,
					new SailRepositoryConfig(new MemoryStoreConfig()));
			manager.addRepositoryConfig(config);
			Repository repository = manager.getRepository(repoId);
			repository.init();
			ValueFactory valueFactory = SimpleValueFactory.getInstance();
			IRI subject = valueFactory.createIRI("urn:signal:test");
			IRI predicate = valueFactory.createIRI("urn:signal:predicate");
			Literal object = valueFactory.createLiteral("signal");
			try (RepositoryConnection connection = repository.getConnection()) {
				connection.add(subject, predicate, object);
				TupleQuery query = connection.prepareTupleQuery(
						"select ?o where { <urn:signal:test> <urn:signal:predicate> ?o }");
				try (TupleQueryResult result = query.evaluate()) {
					assertThat(result.hasNext())
							.as("Tuple query returned a result row")
							.isTrue();
					assertThat(result.next().getValue("o"))
							.as("Tuple query binding value")
							.isEqualTo(object);
				}
			} finally {
				repository.shutDown();
			}
		} finally {
			try {
				manager.removeRepository(repoId);
			} catch (RepositoryException ignored) {
				// best-effort cleanup
			} finally {
				// Always release the manager, even if removal fails with an unexpected exception type.
				manager.shutDown();
			}
		}
	}

	/**
	 * Repeatedly tries to obtain a repository manager for {@code serverUrl} and list its repository ids until that
	 * succeeds or the start-up timeout elapses, in which case the test fails with the captured output and the last
	 * error.
	 *
	 * @return a manager for which listing repository ids succeeded; the caller must shut it down
	 */
	private RemoteRepositoryManager awaitRepositoryManager(String serverUrl, StringBuilder outputBuffer)
			throws InterruptedException {
		RepositoryException lastException = null;
		long deadline = System.nanoTime() + SECONDS.toNanos(STARTUP_TIMEOUT_SECONDS);
		while (System.nanoTime() < deadline) {
			RemoteRepositoryManager manager = null;
			try {
				manager = RemoteRepositoryManager.getInstance(serverUrl);
				manager.getRepositoryIDs();
				return manager;
			} catch (RepositoryException e) {
				lastException = e;
				if (manager != null) {
					try {
						manager.shutDown();
					} catch (RepositoryException ignored) {
						// ignore cleanup failure
					}
				}
				Thread.sleep(CONNECT_RETRY_DELAY_MILLIS);
			}
		}
		String errorMessage = "Timed out connecting to " + serverUrl + " Output:\n" + outputOf(outputBuffer)
				+ (lastException == null ? "" : ("\nLast error: " + lastException));
		fail(errorMessage);
		// Only present to satisfy the compiler; the fail(...) call above is expected to throw.
		return null;
	}

	/**
	 * Builds the RDF4J server base URL for the given local port.
	 */
	private String serverUrl(int port) {
		return "http://localhost:" + port + "/rdf4j-server";
	}

	/**
	 * Asks the operating system for an ephemeral port and returns its number. The probing socket is closed before
	 * returning, so the port is only free at the time of the call.
	 *
	 * @return a port number that was free when probed
	 * @throws IOException if the probing socket cannot be opened or bound
	 */
	private int findFreePort() throws IOException {
		try (ServerSocket socket = new ServerSocket()) {
			// SO_REUSEADDR must be configured before binding; changing it on a bound socket is undefined.
			socket.setReuseAddress(true);
			// A null endpoint lets the system pick an ephemeral port and a valid local address.
			socket.bind(null);
			return socket.getLocalPort();
		}
	}
}