ElasticsearchStoreTestContainerSupport.java

/*******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
// Some portions generated by Codex
package org.eclipse.rdf4j.sail.elasticsearchstore;

import java.util.concurrent.TimeUnit;

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.opentest4j.TestAbortedException;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

/**
 * Test-only helper that lazily starts a single Elasticsearch container and exposes its connection details.
 */
@Testcontainers
public final class ElasticsearchStoreTestContainerSupport {

	private static final String CLUSTER_NAME = "test";

	@Container
	private static final GenericContainer<?> container = createContainer();
	private static String host;
	private static int httpPort;
	private static int transportPort;

	private ElasticsearchStoreTestContainerSupport() {
	}

	public static synchronized void start() {
		try {
			if (!container.isRunning()) {
				container.start();
			}
		} catch (IllegalStateException e) {
			throw new TestAbortedException("Docker is required to run Elasticsearch store tests. Container logs:\n"
					+ safeLogs(container), e);
		}

		host = container.getHost();
		httpPort = container.getMappedPort(9200);
		transportPort = container.getMappedPort(9300);

		if (!container.isRunning()) {
			throw new TestAbortedException("Elasticsearch test container failed to stay running. Logs:\n"
					+ safeLogs(container));
		}

		waitForClusterReady();
	}

	private static void waitForClusterReady() {
		if (container == null || !container.isRunning()) {
			throw new IllegalStateException(
					"Elasticsearch test container stopped before health check. Logs:\n" + safeLogs(container));
		}

		long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(60);
		Exception lastFailure = null;

		while (System.nanoTime() < deadline) {
			if (!container.isRunning()) {
				throw new IllegalStateException(
						"Elasticsearch test container stopped during health check. Logs:\n" + safeLogs(container));
			}
			try (RestHighLevelClient client = new RestHighLevelClient(
					RestClient.builder(new HttpHost(host, httpPort, "http")))) {
				ClusterHealthRequest request = new ClusterHealthRequest()
						.waitForYellowStatus()
						.timeout(TimeValue.timeValueSeconds(5));
				ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
				if (!response.isTimedOut()) {
					return;
				}
				lastFailure = new IllegalStateException("Cluster health timed out waiting for YELLOW status");
			} catch (Exception e) {
				lastFailure = e;
			}

			try {
				Thread.sleep(10);
			} catch (InterruptedException ie) {
				Thread.currentThread().interrupt();
				throw new IllegalStateException("Interrupted while waiting for Elasticsearch test cluster", ie);
			}
		}

		throw new IllegalStateException("Timed out waiting for Elasticsearch test cluster to become ready",
				lastFailure);
	}

	public static String getHost() {
		start();
		return host;
	}

	public static int getHttpPort() {
		start();
		return httpPort;
	}

	public static int getTransportPort() {
		start();
		return transportPort;
	}

	public static String getClusterName() {
		return CLUSTER_NAME;
	}

	public static GenericContainer<?> getContainer() {
		return container;
	}

	private static GenericContainer<?> createContainer() {
		String esVersion = System.getProperty("elasticsearch.docker.version",
				System.getProperty("elasticsearch.version", "7.15.2"));

		DockerImageName imageName = DockerImageName
				.parse("docker.elastic.co/elasticsearch/elasticsearch:" + esVersion)
				.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");

		return new GenericContainer<>(imageName)
				.withEnv("discovery.type", "single-node")
				.withEnv("cluster.name", CLUSTER_NAME)
				.withEnv("ES_JAVA_OPTS",
						"-Djdk.disableLastUsageTracking=true -XX:-UseContainerSupport -Xms512m -Xmx512m")
				.withEnv("JDK_JAVA_OPTIONS",
						"-Djdk.disableLastUsageTracking=true -XX:-UseContainerSupport -Xms512m -Xmx512m")
				.withEnv("JAVA_TOOL_OPTIONS",
						"-Djdk.disableLastUsageTracking=true -XX:-UseContainerSupport -Xms512m -Xmx512m")
				.withExposedPorts(9200, 9300);
	}

	private static String safeLogs(GenericContainer<?> c) {
		if (c == null) {
			return "Container not created";
		}
		try {
			return c.getLogs();
		} catch (Exception e) {
			return "Unable to read container logs: " + e.getMessage();
		}
	}
}