SharedHttpClientSessionManager.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.http.client;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.http.HttpConnection;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.ServiceUnavailableRetryStrategy;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.protocol.HttpContext;
import org.eclipse.rdf4j.http.client.util.HttpClientBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A manager for HTTP sessions that uses a shared {@link HttpClient} to manage HTTP connections.
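 * <p>
 * A minimal usage sketch (the endpoint URL below is purely illustrative):
 * </p>
 *
 * <pre>{@code
 * SharedHttpClientSessionManager manager = new SharedHttpClientSessionManager();
 * try {
 *     SPARQLProtocolSession session = manager.createSPARQLProtocolSession(
 *             "http://example.org/sparql", "http://example.org/sparql");
 *     // ... run queries or updates via the session ...
 * } finally {
 *     manager.shutDown(); // closes any sessions still open and releases the managed client
 * }
 * }</pre>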
 *
 * @author James Leigh
 */
public class SharedHttpClientSessionManager implements HttpClientSessionManager, HttpClientDependent {

	private static final AtomicLong threadCount = new AtomicLong();

	/**
	 * Configurable system property {@code org.eclipse.rdf4j.client.executors.corePoolSize} for specifying the
	 * background executor core thread pool size. Default is 5.
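	 * <p>
	 * The value is read once, when this class is initialized, so it is best supplied at JVM startup, for example (the
	 * value 10 is purely illustrative):
	 * </p>
	 *
	 * <pre>{@code
	 * -Dorg.eclipse.rdf4j.client.executors.corePoolSize=10
	 * }</pre>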
	 */
	public static final String CORE_POOL_SIZE_PROPERTY = "org.eclipse.rdf4j.client.executors.corePoolSize";

	/**
	 * Configurable system property {@code org.eclipse.rdf4j.client.http.maxConnPerRoute} for specifying the maximum
	 * number of connections per route (per host). Default is 25.
	 *
	 * <p>
	 * This property determines the maximum number of concurrent connections to a single host (route). Adjusting this
	 * value can improve performance when communicating with a server that supports multiple concurrent connections.
	 * </p>
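	 *
	 * <p>
	 * For example, to allow 50 connections per route and 100 in total (illustrative values; keep the per-route limit
	 * at or below the total limit), start the JVM with:
	 * </p>
	 *
	 * <pre>{@code
	 * -Dorg.eclipse.rdf4j.client.http.maxConnPerRoute=50 -Dorg.eclipse.rdf4j.client.http.maxConnTotal=100
	 * }</pre>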
	 */
	public static final String MAX_CONN_PER_ROUTE_PROPERTY = "org.eclipse.rdf4j.client.http.maxConnPerRoute";

	/**
	 * Configurable system property {@code org.eclipse.rdf4j.client.http.maxConnTotal} for specifying the maximum total
	 * number of connections. Default is 50.
	 *
	 * <p>
	 * This property sets the maximum total number of concurrent connections that can be open at the same time.
	 * Increasing this value allows more simultaneous connections to different hosts, which can improve throughput in
	 * multi-threaded environments.
	 * </p>
	 */
	public static final String MAX_CONN_TOTAL_PROPERTY = "org.eclipse.rdf4j.client.http.maxConnTotal";

	/**
	 * Configurable system property {@code org.eclipse.rdf4j.client.http.connectionTimeout} for specifying the HTTP
	 * connection timeout in milliseconds for general use. Default is 30 seconds.
	 *
	 * <p>
	 * The connection timeout determines the maximum time the client will wait to establish a TCP connection to the
	 * server. A default of 30 seconds is set to allow for potential network delays without causing unnecessary
	 * timeouts.
	 * </p>
	 */
	public static final String CONNECTION_TIMEOUT_PROPERTY = "org.eclipse.rdf4j.client.http.connectionTimeout";

	/**
	 * Configurable system property {@code org.eclipse.rdf4j.client.http.connectionRequestTimeout} for specifying the
	 * HTTP connection request timeout in milliseconds for general use. Default is 1 hour.
	 *
	 * <p>
	 * The connection request timeout defines how long the client will wait for a connection from the connection pool.
	 * </p>
	 */
	public static final String CONNECTION_REQUEST_TIMEOUT_PROPERTY = "org.eclipse.rdf4j.client.http.connectionRequestTimeout";

	/**
	 * Configurable system property {@code org.eclipse.rdf4j.client.http.socketTimeout} for specifying the HTTP socket
	 * timeout in milliseconds for general use. Default is 10 days.
	 *
	 * <p>
	 * The socket timeout controls the maximum period of inactivity between data packets during data transfer. A longer
	 * timeout is appropriate for large data transfers, ensuring that operations are not interrupted prematurely.
	 * </p>
	 */
	public static final String SOCKET_TIMEOUT_PROPERTY = "org.eclipse.rdf4j.client.http.socketTimeout";

	// System property constants for SPARQL SERVICE timeouts

	/**
	 * Configurable system property {@code org.eclipse.rdf4j.client.sparql.http.connectionTimeout} for specifying the
	 * HTTP connection timeout in milliseconds when used in SPARQL SERVICE calls. Default is 5 seconds.
	 *
	 * <p>
	 * A shorter connection timeout is set for SPARQL SERVICE calls to quickly detect unresponsive endpoints in
	 * federated queries, improving overall query performance by avoiding long waits for unreachable servers.
	 * </p>
	 */
	public static final String SPARQL_CONNECTION_TIMEOUT_PROPERTY = "org.eclipse.rdf4j.client.sparql.http.connectionTimeout";

	/**
	 * Configurable system property {@code org.eclipse.rdf4j.client.sparql.http.connectionRequestTimeout} for specifying
	 * the HTTP connection request timeout in milliseconds when used in SPARQL SERVICE calls. Default is 10 minutes.
	 *
	 * <p>
	 * This timeout controls how long the client waits for a connection from the pool when making SPARQL SERVICE calls.
	 * A shorter timeout than the general-use default ensures that queries fail fast if pool resources are constrained,
	 * maintaining responsiveness.
	 * </p>
	 */
	public static final String SPARQL_CONNECTION_REQUEST_TIMEOUT_PROPERTY = "org.eclipse.rdf4j.client.sparql.http.connectionRequestTimeout";

	/**
	 * Configurable system property {@code org.eclipse.rdf4j.client.sparql.http.socketTimeout} for specifying the HTTP
	 * socket timeout in milliseconds when used in SPARQL SERVICE calls. Default is 1 hour.
	 *
	 * <p>
	 * The socket timeout for SPARQL SERVICE calls is set to a shorter duration to detect unresponsive servers during
	 * data transfer, ensuring that the client does not wait indefinitely for data that may never arrive.
	 * </p>
	 */
	public static final String SPARQL_SOCKET_TIMEOUT_PROPERTY = "org.eclipse.rdf4j.client.sparql.http.socketTimeout";

	// Defaults

	/**
	 * Default core pool size for the executor service. Set to 5.
	 *
	 * <p>
	 * This value determines the number of threads to keep in the pool, even if they are idle. Adjusting this value can
	 * help manage resource utilization in high-load scenarios.
	 * </p>
	 */
	public static final int DEFAULT_CORE_POOL_SIZE = 5;

	/**
	 * Default maximum number of connections per route (per host). Set to 25.
	 *
	 * <p>
	 * This value limits the number of concurrent connections to a single host. Increasing it can improve performance
	 * when communicating with a server that can handle multiple connections.
	 * </p>
	 */
	public static final int DEFAULT_MAX_CONN_PER_ROUTE = 25;

	/**
	 * Default maximum total number of connections. Set to 50.
	 *
	 * <p>
	 * This value limits the total number of concurrent connections that can be open at the same time. Increasing it
	 * allows for more simultaneous connections to different hosts.
	 * </p>
	 */
	public static final int DEFAULT_MAX_CONN_TOTAL = 50;

	/**
	 * Default HTTP connection timeout in milliseconds for general use. Set to 30 seconds.
	 *
	 * <p>
	 * The connection timeout determines the maximum time the client will wait to establish a TCP connection to the
	 * server.
	 * </p>
	 */
	public static final int DEFAULT_CONNECTION_TIMEOUT = 30 * 1000; // 30 seconds

	/**
	 * Default HTTP connection request timeout in milliseconds for general use. Set to 1 hour.
	 *
	 * <p>
	 * The connection request timeout defines how long the client will wait for a connection from the connection pool.
	 * </p>
	 */
	public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 60 * 60 * 1000; // 1 hour

	/**
	 * Default HTTP socket timeout in milliseconds for general use. Set to 10 days.
	 *
	 * <p>
	 * The socket timeout controls the maximum period of inactivity between data packets during data transfer. A longer
	 * timeout is appropriate for large data transfers.
	 * </p>
	 */
	public static final int DEFAULT_SOCKET_TIMEOUT = 10 * 24 * 60 * 60 * 1000; // 10 days

	// Default timeout values for SPARQL SERVICE calls

	/**
	 * Default HTTP connection timeout in milliseconds for SPARQL SERVICE calls. Set to 5 seconds.
	 *
	 * <p>
	 * A shorter connection timeout is set for SPARQL SERVICE calls to quickly detect unresponsive endpoints in
	 * federated queries.
	 * </p>
	 */
	public static final int DEFAULT_SPARQL_CONNECTION_TIMEOUT = 5 * 1000; // 5 seconds

	/**
	 * Default HTTP connection request timeout in milliseconds for SPARQL SERVICE calls. Set to 10 minutes.
	 *
	 * <p>
	 * This timeout controls how long the client waits for a connection from the pool when making SPARQL SERVICE calls.
	 * </p>
	 */
	public static final int DEFAULT_SPARQL_CONNECTION_REQUEST_TIMEOUT = 10 * 60 * 1000; // 10 minutes

	/**
	 * Default HTTP socket timeout in milliseconds for SPARQL SERVICE calls. Set to 1 hour.
	 *
	 * <p>
	 * The socket timeout for SPARQL SERVICE calls is set to a shorter duration to detect unresponsive servers during
	 * data transfer.
	 * </p>
	 */
	public static final int DEFAULT_SPARQL_SOCKET_TIMEOUT = 60 * 60 * 1000; // 1 hour

	// Values as read from system properties or defaults

	/**
	 * Core pool size for the executor service, as read from system properties or defaults.
	 */
	public static final int CORE_POOL_SIZE = Integer
			.parseInt(System.getProperty(CORE_POOL_SIZE_PROPERTY, String.valueOf(DEFAULT_CORE_POOL_SIZE)));

	/**
	 * Maximum number of connections per route (per host), as read from system properties or defaults.
	 */
	public static final int MAX_CONN_PER_ROUTE = Integer
			.parseInt(System.getProperty(MAX_CONN_PER_ROUTE_PROPERTY, String.valueOf(DEFAULT_MAX_CONN_PER_ROUTE)));

	/**
	 * Maximum total number of connections, as read from system properties or defaults.
	 */
	public static final int MAX_CONN_TOTAL = Integer
			.parseInt(System.getProperty(MAX_CONN_TOTAL_PROPERTY, String.valueOf(DEFAULT_MAX_CONN_TOTAL)));

	/**
	 * HTTP connection timeout in milliseconds for general use.
	 */
	public static final int CONNECTION_TIMEOUT = Integer.parseInt(
			System.getProperty(CONNECTION_TIMEOUT_PROPERTY, String.valueOf(DEFAULT_CONNECTION_TIMEOUT)));

	/**
	 * HTTP connection request timeout in milliseconds for general use.
	 */
	public static final int CONNECTION_REQUEST_TIMEOUT = Integer.parseInt(
			System.getProperty(CONNECTION_REQUEST_TIMEOUT_PROPERTY,
					String.valueOf(DEFAULT_CONNECTION_REQUEST_TIMEOUT)));

	/**
	 * HTTP socket timeout in milliseconds for general use.
	 */
	public static final int SOCKET_TIMEOUT = Integer
			.parseInt(System.getProperty(SOCKET_TIMEOUT_PROPERTY, String.valueOf(DEFAULT_SOCKET_TIMEOUT)));

	/**
	 * HTTP connection timeout in milliseconds for SPARQL SERVICE calls.
	 */
	public static final int SPARQL_CONNECTION_TIMEOUT = Integer.parseInt(
			System.getProperty(SPARQL_CONNECTION_TIMEOUT_PROPERTY, String.valueOf(DEFAULT_SPARQL_CONNECTION_TIMEOUT)));

	/**
	 * HTTP connection request timeout in milliseconds for SPARQL SERVICE calls.
	 */
	public static final int SPARQL_CONNECTION_REQUEST_TIMEOUT = Integer.parseInt(
			System.getProperty(SPARQL_CONNECTION_REQUEST_TIMEOUT_PROPERTY,
					String.valueOf(DEFAULT_SPARQL_CONNECTION_REQUEST_TIMEOUT)));

	/**
	 * HTTP socket timeout in milliseconds for SPARQL SERVICE calls.
	 */
	public static final int SPARQL_SOCKET_TIMEOUT = Integer.parseInt(
			System.getProperty(SPARQL_SOCKET_TIMEOUT_PROPERTY, String.valueOf(DEFAULT_SPARQL_SOCKET_TIMEOUT)));

	// Variables for the currently used timeouts

	private int currentConnectionTimeout = CONNECTION_TIMEOUT;
	private int currentConnectionRequestTimeout = CONNECTION_REQUEST_TIMEOUT;
	private int currentSocketTimeout = SOCKET_TIMEOUT;

	private final Logger logger = LoggerFactory.getLogger(SharedHttpClientSessionManager.class);

	/**
	 * The {@link HttpClient} currently in use. It has an independent life cycle when supplied externally via
	 * {@link #setHttpClient(HttpClient)}: such a client is not closed by this manager.
	 */
	private volatile HttpClient httpClient;

	/**
	 * A client whose life cycle depends on this manager: created lazily (or supplied via the constructor) and closed
	 * on {@link #shutDown()}.
	 */
	private volatile CloseableHttpClient dependentClient;

	private final ExecutorService executor;

	/**
	 * Optional {@link HttpClientBuilder} to create the inner {@link #httpClient} (if not provided externally)
	 */
	private volatile HttpClientBuilder httpClientBuilder;

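	/** Sessions created by this manager, tracked so that any still open can be closed on {@link #shutDown()}. */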
	private final Map<SPARQLProtocolSession, Boolean> openSessions = new ConcurrentHashMap<>();

	private static final HttpRequestRetryHandler retryHandlerStale = new RetryHandlerStale();
	private static final ServiceUnavailableRetryStrategy serviceUnavailableRetryHandler = new ServiceUnavailableRetryHandler();

	/**
	 * Retry handler that closes stale connections and signals that the HTTP request should simply be retried once.
	 * Just closing the
	 * stale connection is enough: the connection will be reopened elsewhere. This seems to be necessary for Jetty
	 * 9.4.24+.
	 * <p>
	 * Other HTTP issues are considered to be more severe, so these requests are not retried.
	 */
	private static class RetryHandlerStale implements HttpRequestRetryHandler {
		private final Logger logger = LoggerFactory.getLogger(RetryHandlerStale.class);

		@Override
		public boolean retryRequest(IOException ioe, int count, HttpContext context) {
			// only try this once
			if (count > 1) {
				return false;
			}
			HttpClientContext clientContext = HttpClientContext.adapt(context);
			HttpConnection conn = clientContext.getConnection();
			if (conn != null) {
				synchronized (this) {
					if (conn.isStale()) {
						try {
							logger.warn("Closing stale connection");
							conn.close();
							return true;
						} catch (IOException e) {
							logger.error("Error closing stale connection", e);
						}
					}
				}
			}
			return false;
		}
	}

	private static class ServiceUnavailableRetryHandler implements ServiceUnavailableRetryStrategy {
		private final Logger logger = LoggerFactory.getLogger(ServiceUnavailableRetryHandler.class);

		@Override
		public boolean retryRequest(HttpResponse response, int executionCount, HttpContext context) {
			// only retry on HTTP 408 (Request Timeout)
			if (response.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_CLIENT_TIMEOUT) {
				return false;
			}

			// When `keepAlive` is disabled every connection is fresh (with the default `useSystemProperties` HTTP
			// client configuration we use); a 408 in that case is an unexpected issue that we don't handle here.
			String keepAlive = System.getProperty("http.keepAlive", "true");
			if (!"true".equalsIgnoreCase(keepAlive)) {
				return false;
			}

			// Worst case, the connection pool is filled to the max and all of its connections have already idled out
			// on the server. We then need to clean up the pool and retry with a fresh connection. Hence, we need at
			// most pooledConnections + 1 retries.
			int pooledConnections = MAX_CONN_PER_ROUTE;
			if (executionCount > (pooledConnections + 1)) {
				return false;
			}

			HttpClientContext clientContext = HttpClientContext.adapt(context);
			HttpConnection conn = clientContext.getConnection();
			if (conn == null) {
				// no connection is associated with the context (it may already have been released); nothing to clean up
				return false;
			}

			synchronized (this) {
				try {
					logger.info("Cleaning up closed connection");
					conn.close();
					return true;
				} catch (IOException e) {
					logger.error("Error cleaning up closed connection", e);
				}
			}
			return false;
		}

		@Override
		public long getRetryInterval() {
			return 1000;
		}
	}

	/*--------------*
	 * Constructors *
	 *--------------*/

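	/**
	 * Creates a session manager that uses its own daemon-thread executor and lazily creates and manages its own
	 * {@link HttpClient}, unless a client is supplied via {@link #setHttpClient(HttpClient)}.
	 */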
	public SharedHttpClientSessionManager() {
		final ThreadFactory backingThreadFactory = Executors.defaultThreadFactory();

		ExecutorService threadPoolExecutor = Executors.newCachedThreadPool((Runnable runnable) -> {
			Thread thread = backingThreadFactory.newThread(runnable);
			thread.setName(
					String.format("rdf4j-SharedHttpClientSessionManager-%d", threadCount.getAndIncrement()));
			thread.setDaemon(true);
			return thread;
		});

		((ThreadPoolExecutor) threadPoolExecutor).setCorePoolSize(CORE_POOL_SIZE);
		this.executor = threadPoolExecutor;
	}

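	/**
	 * Creates a session manager that uses the supplied client and executor service. Both are taken over by this
	 * manager: the client is closed and the executor is shut down when {@link #shutDown()} is called.
	 *
	 * @param dependentClient          the {@link CloseableHttpClient} to use, never {@code null}
	 * @param dependentExecutorService the executor service used by sessions created by this manager, never
	 *                                 {@code null}
	 */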
	public SharedHttpClientSessionManager(CloseableHttpClient dependentClient,
			ScheduledExecutorService dependentExecutorService) {
		this.httpClient = this.dependentClient = Objects.requireNonNull(dependentClient, "HTTP client was null");
		this.executor = Objects.requireNonNull(dependentExecutorService, "Executor service was null");
	}

	@Override
	public HttpClient getHttpClient() {
		HttpClient result = httpClient;
		if (result == null) {
			synchronized (this) {
				result = httpClient;
				if (result == null) {
					result = httpClient = dependentClient = createHttpClient();
				}
			}
		}
		return result;
	}

	/**
	 * @param httpClient The httpClient to use for remote/service calls.
	 */
	@Override
	public void setHttpClient(HttpClient httpClient) {
		synchronized (this) {
			this.httpClient = Objects.requireNonNull(httpClient, "HTTP Client cannot be null");
			// If they set a client, we need to check whether we need to
			// close any existing dependentClient
			CloseableHttpClient toCloseDependentClient = dependentClient;
			dependentClient = null;
			if (toCloseDependentClient != null) {
				HttpClientUtils.closeQuietly(toCloseDependentClient);
			}
		}
	}

	/**
	 * Set an optional {@link HttpClientBuilder} to create the inner {@link #httpClient} (if the latter is not provided
	 * externally as a dependent client).
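	 *
	 * <p>
	 * For example (a sketch with an arbitrary builder configuration; {@code manager} stands for an instance of this
	 * class):
	 * </p>
	 *
	 * <pre>{@code
	 * manager.setHttpClientBuilder(HttpClientBuilder.create()
	 * 		.useSystemProperties()
	 * 		.setMaxConnTotal(100));
	 * }</pre>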
	 *
	 * @param httpClientBuilder the builder for the managed HttpClient
	 * @see HttpClientBuilders
	 */
	public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) {
		this.httpClientBuilder = httpClientBuilder;
	}

	@Override
	public SPARQLProtocolSession createSPARQLProtocolSession(String queryEndpointUrl, String updateEndpointUrl) {
		SPARQLProtocolSession session = new SPARQLProtocolSession(getHttpClient(), executor) {

			@Override
			public void close() {
				try {
					super.close();
				} finally {
					openSessions.remove(this);
				}
			}
		};
		session.setQueryURL(queryEndpointUrl);
		session.setUpdateURL(updateEndpointUrl);
		openSessions.put(session, true);
		return session;
	}

	@Override
	public RDF4JProtocolSession createRDF4JProtocolSession(String serverURL) {
		RDF4JProtocolSession session = new RDF4JProtocolSession(getHttpClient(), executor) {

			@Override
			public void close() {
				try {
					super.close();
				} finally {
					openSessions.remove(this);
				}
			}
		};
		session.setServerURL(serverURL);
		openSessions.put(session, true);
		return session;
	}

	@Override
	public void shutDown() {
		try {
			// Close open sessions
			openSessions.keySet().forEach(session -> {
				try {
					session.close();
				} catch (Exception e) {
					logger.error(e.toString(), e);
				}
			});
			CloseableHttpClient toCloseDependentClient = dependentClient;
			dependentClient = null;
			if (toCloseDependentClient != null) {
				HttpClientUtils.closeQuietly(toCloseDependentClient);
			}
		} finally {
			// Shutdown the executor
			try {
				executor.shutdown();
				executor.awaitTermination(10, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			} finally {
				if (!executor.isTerminated()) {
					executor.shutdownNow();
				}
			}
		}
	}

	/**
	 * No-op
	 *
	 * @deprecated Create a new instance instead of trying to reactivate an old instance.
	 */
	@Deprecated
	public void initialize() {
	}

	/**
	 * Get the {@link ExecutorService} used by this session manager.
	 *
	 * @return an {@link ExecutorService} used by all {@link SPARQLProtocolSession} and {@link RDF4JProtocolSession}
	 *         instances created by this session manager.
	 */
	protected final ExecutorService getExecutorService() {
		return this.executor;
	}

	private CloseableHttpClient createHttpClient() {

		HttpClientBuilder nextHttpClientBuilder = httpClientBuilder;
		if (nextHttpClientBuilder != null) {
			return nextHttpClientBuilder.build();
		}

		RequestConfig requestConfig = getDefaultRequestConfig();

		return HttpClientBuilder.create()
				.evictExpiredConnections()
				.evictIdleConnections(30, TimeUnit.MINUTES)
				.setRetryHandler(retryHandlerStale)
				.setServiceUnavailableRetryStrategy(serviceUnavailableRetryHandler)
				.setMaxConnPerRoute(MAX_CONN_PER_ROUTE)
				.setMaxConnTotal(MAX_CONN_TOTAL)
				.useSystemProperties()
				.setDefaultRequestConfig(requestConfig)
				.build();
	}

	/**
	 * Returns the default {@link RequestConfig} using the currently set timeout values.
	 *
	 * @return a configured {@link RequestConfig} with the current timeouts.
	 */
	public RequestConfig getDefaultRequestConfig() {
		return RequestConfig.custom()
				.setConnectTimeout(currentConnectionTimeout)
				.setConnectionRequestTimeout(currentConnectionRequestTimeout)
				.setSocketTimeout(currentSocketTimeout)
				.setCookieSpec(CookieSpecs.STANDARD)
				.build();
	}

	/**
	 * Switches the current timeout settings to use the SPARQL-specific timeouts. This method should be called when
	 * making SPARQL SERVICE calls to apply shorter timeout values.
	 *
	 * <p>
	 * The SPARQL-specific timeouts are shorter to ensure that unresponsive or slow SPARQL endpoints do not cause long
	 * delays in federated query processing. Quick detection of such issues improves the responsiveness and reliability
	 * of SPARQL queries.
	 * </p>
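	 *
	 * <p>
	 * For example (a sketch; {@code manager} stands for an instance of this class):
	 * </p>
	 *
	 * <pre>{@code
	 * manager.setDefaultSparqlServiceTimeouts();
	 * RequestConfig sparqlConfig = manager.getDefaultRequestConfig(); // uses the SPARQL-specific values
	 *
	 * manager.setDefaultTimeouts(); // revert to the general-purpose values afterwards
	 * }</pre>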
	 */
	public void setDefaultSparqlServiceTimeouts() {
		this.currentConnectionTimeout = SPARQL_CONNECTION_TIMEOUT;
		this.currentConnectionRequestTimeout = SPARQL_CONNECTION_REQUEST_TIMEOUT;
		this.currentSocketTimeout = SPARQL_SOCKET_TIMEOUT;
	}

	/**
	 * Resets the current timeout settings to the general timeouts. This method should be called to revert any changes
	 * made by {@link #setDefaultSparqlServiceTimeouts()} and apply the general timeout values.
	 *
	 * <p>
	 * The general timeouts are longer to accommodate operations that may take more time, such as large data transfers
	 * or extensive processing, without causing premature timeouts.
	 * </p>
	 */
	public void setDefaultTimeouts() {
		this.currentConnectionTimeout = CONNECTION_TIMEOUT;
		this.currentConnectionRequestTimeout = CONNECTION_REQUEST_TIMEOUT;
		this.currentSocketTimeout = SOCKET_TIMEOUT;
	}

}