ServerBootSignalIT.java

/*******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
// Some portions generated by Codex
package org.eclipse.rdf4j.tools.serverboot;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.config.RepositoryConfig;
import org.eclipse.rdf4j.repository.config.RepositoryConfigException;
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
import org.eclipse.rdf4j.repository.sail.config.SailRepositoryConfig;
import org.eclipse.rdf4j.sail.memory.config.MemoryStoreConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that launches the JAR found in {@code target/} as a separate JVM, exercises the running server
 * through a {@link RemoteRepositoryManager}, and then verifies that the process exits with status {@code 0} after
 * being sent a signal.
 * <p>
 * Signals are delivered by invoking the {@code kill} command, which is why the test is restricted to Linux and macOS.
 */
@EnabledOnOs({ OS.LINUX, OS.MAC })
class ServerBootSignalIT {

	private static final Logger LOGGER = LoggerFactory.getLogger(ServerBootSignalIT.class);

	/** Runs the task that drains the server process output. */
	private ExecutorService streamExecutor;

	/** Actions executed by {@link #tearDown()}, e.g. forcibly destroying processes started by a test. */
	private final List<Runnable> cleanupActions = new ArrayList<>();

	/**
	 * Creates the executor used for reading process output. Its threads are daemons so that a reader blocked on a
	 * stream can never keep the test JVM alive.
	 */
	@BeforeEach
	void setUp() {
		streamExecutor = Executors.newFixedThreadPool(2, runnable -> {
			Thread thread = new Thread(runnable);
			thread.setDaemon(true);
			thread.setName("server-boot-signal-it");
			return thread;
		});
	}

	/**
	 * Runs every registered cleanup action on a best-effort basis and then stops the stream executor. The executor is
	 * shut down in a {@code finally} block so it is stopped even when a cleanup action fails unexpectedly.
	 */
	@AfterEach
	void tearDown() {
		try {
			for (Runnable cleanup : cleanupActions) {
				try {
					cleanup.run();
				} catch (Exception ignored) {
					// best-effort cleanup
				}
			}
		} finally {
			cleanupActions.clear();
			if (streamExecutor != null) {
				streamExecutor.shutdownNow();
			}
		}
	}

	@Test
	@Disabled("Disabled due to flakiness on CI servers")
	void gracefullyStopsOnSigint() throws Exception {
		assertGracefulShutdownWithSigintFallback();
	}

	@Test
	void gracefullyStopsOnSigterm() throws Exception {
		assertGracefulShutdown("TERM");
	}

	/** Sends SIGINT and, if the process is still alive after 5 seconds, falls back to SIGTERM. */
	private void assertGracefulShutdownWithSigintFallback() throws Exception {
		assertGracefulShutdown("INT", true);
	}

	/** Sends the given signal without any fallback and waits up to 30 seconds for the process to exit. */
	private void assertGracefulShutdown(String signalName) throws Exception {
		assertGracefulShutdown(signalName, false);
	}

	/**
	 * Starts the server, waits for it to report start-up, exercises it, sends the signal and asserts a clean exit.
	 *
	 * @param signalName           signal name as accepted by {@code kill -s}, e.g. {@code "TERM"} or {@code "INT"}
	 * @param allowSigtermFallback when {@code true}, wait only 5 seconds for the first signal and send SIGTERM if the
	 *                             process has not exited by then
	 */
	private void assertGracefulShutdown(String signalName, boolean allowSigtermFallback) throws Exception {
		Path projectRoot = Path.of("").toAbsolutePath();
		String javaBin = Path.of(System.getProperty("java.home"), "bin", "java").toString();
		int serverPort = findFreePort();
		int managementPort = findFreePort();

		Path jarPath = findExecutableJar(projectRoot.resolve("target"));

		ProcessBuilder processBuilder = new ProcessBuilder(javaBin, "-jar", jarPath.toString(),
				"--server.port=" + serverPort,
				"--management.server.port=" + managementPort);
		processBuilder.directory(projectRoot.toFile());
		// merge stderr into stdout so a single reader captures everything the server prints
		processBuilder.redirectErrorStream(true);

		Process process = processBuilder.start();
		cleanupActions.add(process::destroyForcibly);

		CountDownLatch started = new CountDownLatch(1);
		StringBuilder outputBuffer = new StringBuilder();
		startStreamGobbler(process, started, outputBuffer);

		boolean startedInTime = started.await(90, SECONDS);
		assertThat(startedInTime)
				.as(() -> "Server failed to start within timeout. Output:\n" + outputSnapshot(outputBuffer))
				.isTrue();

		String serverUrl = serverUrl(serverPort);
		exerciseRemoteRepository(serverUrl, outputBuffer);

		long pid = process.pid();
		sendSignal(pid, signalName);

		boolean exited = process.waitFor(allowSigtermFallback ? 5 : 30, SECONDS);
		if (!exited && allowSigtermFallback) {
			LOGGER.warn("Server did not exit on SIGINT within 5 seconds. Sending SIGTERM.");
			sendSignal(pid, "TERM");
			exited = process.waitFor(5, SECONDS);
			assertThat(exited)
					.as(() -> "Process did not exit after SIGTERM. Output:\n" + outputSnapshot(outputBuffer))
					.isTrue();
		}
		assertThat(exited)
				.as(() -> "Process did not exit after SIG" + signalName + ". Output:\n"
						+ outputSnapshot(outputBuffer))
				.isTrue();
		assertThat(process.exitValue())
				.as(() -> "Process exit value after SIG" + signalName + ". Output:\n"
						+ outputSnapshot(outputBuffer))
				.isEqualTo(0);
	}

	/**
	 * Returns the first entry of {@code targetDir}, in path-string order, whose name ends with {@code .jar} but not
	 * with {@code -sources.jar} or {@code -javadoc.jar}.
	 *
	 * @throws IOException           if the directory cannot be listed
	 * @throws IllegalStateException if no matching file exists
	 */
	private Path findExecutableJar(Path targetDir) throws IOException {
		// Files.list holds an open directory handle until the stream is closed
		try (Stream<Path> entries = Files.list(targetDir)) {
			return entries
					.sorted(Comparator.comparing(Path::toString))
					.filter(p -> p.toString().endsWith(".jar"))
					.filter(p -> !p.toString().endsWith("-sources.jar"))
					.filter(p -> !p.toString().endsWith("-javadoc.jar"))
					.findFirst()
					.orElseThrow(() -> new IllegalStateException("Could not find executable JAR in " + targetDir));
		}
	}

	/**
	 * Returns the text captured so far. The buffer is appended to by the stream reader thread under the buffer's
	 * monitor, so it has to be read under the same monitor.
	 */
	private static String outputSnapshot(StringBuilder outputBuffer) {
		synchronized (outputBuffer) {
			return outputBuffer.toString();
		}
	}

	/**
	 * Submits a task that copies every line printed by the process into {@code outputBuffer} and releases
	 * {@code started} the first time one of the expected start-up log lines is seen.
	 */
	private void startStreamGobbler(Process process, CountDownLatch started, StringBuilder outputBuffer) {
		AtomicBoolean signalLogged = new AtomicBoolean(false);
		streamExecutor.submit(() -> {
			try (BufferedReader reader = new BufferedReader(
					new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
				String line;
				while ((line = reader.readLine()) != null) {
					synchronized (outputBuffer) {
						outputBuffer.append(line).append(System.lineSeparator());
					}
					boolean startupLine = line.contains("Started Rdf4jServerWorkbenchApplication")
							|| line.contains("Initializing Spring DispatcherServlet 'rdf4jServer'");
					if (startupLine && signalLogged.compareAndSet(false, true)) {
						started.countDown();
					}
				}
			} catch (IOException e) {
				// record the failure in the captured output so it shows up in assertion messages
				synchronized (outputBuffer) {
					outputBuffer.append("Failed to read process output: ")
							.append(e.getMessage())
							.append(System.lineSeparator());
				}
			}
		});
	}

	/**
	 * Runs {@code kill -s <signalName> <pid>} and waits up to 5 seconds for the command to finish.
	 * <p>
	 * A timeout or non-zero exit status of {@code kill} is only logged, not treated as a failure: in the fallback
	 * scenario the target may already have exited by the time the second signal is sent, and the caller asserts the
	 * outcome on the target process itself.
	 */
	private void sendSignal(long pid, String signalName) throws IOException, InterruptedException {
		Process signalProcess = new ProcessBuilder("kill", "-s", signalName, Long.toString(pid))
				.start();
		cleanupActions.add(signalProcess::destroyForcibly);
		if (!signalProcess.waitFor(5, SECONDS)) {
			LOGGER.warn("kill -s {} {} did not finish within 5 seconds; destroying it.", signalName, pid);
			signalProcess.destroyForcibly();
			signalProcess.waitFor(5, SECONDS);
			return;
		}
		int exitValue = signalProcess.exitValue();
		if (exitValue != 0) {
			LOGGER.warn("kill -s {} {} exited with status {}.", signalName, pid, exitValue);
		}
	}

	/**
	 * Creates a uniquely named in-memory repository on the server, adds one statement, checks that a SPARQL SELECT
	 * returns it, and finally removes the repository and shuts the manager down.
	 */
	private void exerciseRemoteRepository(String serverUrl, StringBuilder outputBuffer)
			throws InterruptedException, RepositoryException, RepositoryConfigException {
		RemoteRepositoryManager manager = awaitRepositoryManager(serverUrl, outputBuffer);
		String repoId = "signal-" + UUID.randomUUID();
		try {
			RepositoryConfig config = new RepositoryConfig(repoId,
					new SailRepositoryConfig(new MemoryStoreConfig()));
			manager.addRepositoryConfig(config);

			Repository repository = manager.getRepository(repoId);
			repository.init();

			ValueFactory valueFactory = SimpleValueFactory.getInstance();
			IRI subject = valueFactory.createIRI("urn:signal:test");
			IRI predicate = valueFactory.createIRI("urn:signal:predicate");
			Literal object = valueFactory.createLiteral("signal");

			try (RepositoryConnection connection = repository.getConnection()) {
				connection.add(subject, predicate, object);
				TupleQuery query = connection.prepareTupleQuery(
						"select ?o where { <urn:signal:test> <urn:signal:predicate> ?o }");
				try (TupleQueryResult result = query.evaluate()) {
					assertThat(result.hasNext())
							.as("Tuple query returned a result row")
							.isTrue();
					assertThat(result.next().getValue("o"))
							.as("Tuple query binding value")
							.isEqualTo(object);
				}
			} finally {
				repository.shutDown();
			}
		} finally {
			try {
				manager.removeRepository(repoId);
			} catch (RepositoryException ignored) {
				// best-effort cleanup
			}
			manager.shutDown();
		}
	}

	/**
	 * Repeatedly tries to obtain a {@link RemoteRepositoryManager} for {@code serverUrl} and list its repository IDs,
	 * sleeping 500 ms after each failed attempt, for up to 90 seconds. Fails the test if no attempt succeeds.
	 *
	 * @return a manager for which {@code getRepositoryIDs()} succeeded; the caller is responsible for shutting it down
	 */
	private RemoteRepositoryManager awaitRepositoryManager(String serverUrl, StringBuilder outputBuffer)
			throws InterruptedException {
		RepositoryException lastException = null;
		long deadline = System.nanoTime() + SECONDS.toNanos(90);
		while (System.nanoTime() < deadline) {
			RemoteRepositoryManager manager = null;
			try {
				manager = RemoteRepositoryManager.getInstance(serverUrl);
				manager.getRepositoryIDs();
				return manager;
			} catch (RepositoryException e) {
				lastException = e;
				if (manager != null) {
					try {
						manager.shutDown();
					} catch (RepositoryException ignored) {
						// ignore cleanup failure
					}
				}
				Thread.sleep(500);
			}
		}
		String errorMessage = "Timed out connecting to " + serverUrl + " Output:\n" + outputSnapshot(outputBuffer)
				+ (lastException == null ? "" : ("\nLast error: " + lastException));
		fail(errorMessage);
		// not reached: fail(...) always throws, but the compiler still requires a return statement
		return null;
	}

	/** Builds the base URL of the RDF4J server running on {@code localhost} at the given port. */
	private String serverUrl(int port) {
		return "http://localhost:" + port + "/rdf4j-server";
	}

	/**
	 * Asks the operating system for a currently unused TCP port by binding a server socket to an ephemeral port and
	 * closing it again.
	 */
	private int findFreePort() throws IOException {
		try (ServerSocket socket = new ServerSocket()) {
			// SO_REUSEADDR must be configured before the socket is bound; changing it afterwards is undefined
			socket.setReuseAddress(true);
			// a null address lets the system pick an ephemeral port and a valid local address
			socket.bind(null);
			return socket.getLocalPort();
		}
	}
}