ElasticsearchStore.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.elasticsearchstore;

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.sail.NotifyingSailConnection;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.extensiblestore.ExtensibleStore;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatementHelper;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * An RDF4J SailStore persisted to Elasticsearch.
 * </p>
 *
 * <p>
 * This is an EXPERIMENTAL feature. Use at your own risk!
 * </p>
 * <p>
 * Note that, while RDF4J is licensed under the EDL, several Elasticsearch dependencies are licensed under the Elastic
 * License or the SSPL, which may have implications for some projects. <br>
 * Please consult the Elasticsearch website and the Elastic License FAQ for more information.
 * </p>
 * <p>
 * There is no write-ahead logging, so a failure during a transaction may result in partially persisted changes.
 * </p>
 *
 * @see <a href="https://www.elastic.co/licensing/elastic-license/faq">Elastic License FAQ</a>
 *
 * @author Håvard Mikkelsen Ottestad
 */
@Experimental
public class ElasticsearchStore extends ExtensibleStore<ElasticsearchDataStructure, ElasticsearchNamespaceStore> {

	private static final Logger logger = LoggerFactory.getLogger(ElasticsearchStore.class);

	final ClientProvider clientProvider;
	private final AtomicBoolean shutdown = new AtomicBoolean(false);

	// Connection details are only known when this store creates its own client. They are null (port: 0) when the
	// store was built from a caller-supplied ClientProvider or Client.
	private final String hostname;
	private final int port;
	private final String clusterName;
	private final String index;

	/**
	 * Creates a store that connects to the given Elasticsearch node, using the {@code Cache.EAGER} cache mode.
	 *
	 * @param hostname    host of the Elasticsearch node
	 * @param port        port of the Elasticsearch node
	 * @param clusterName name of the Elasticsearch cluster
	 * @param index       index used for statements; namespaces are stored in {@code index + "_namespaces"}
	 */
	public ElasticsearchStore(String hostname, int port, String clusterName, String index) {
		this(hostname, port, clusterName, index, Cache.EAGER);
	}

	/**
	 * Creates a store that connects to the given Elasticsearch node and owns the resulting client.
	 *
	 * @param hostname    host of the Elasticsearch node
	 * @param port        port of the Elasticsearch node
	 * @param clusterName name of the Elasticsearch cluster
	 * @param index       index used for statements; namespaces are stored in {@code index + "_namespaces"}
	 * @param cache       cache mode passed to {@link ExtensibleStore}
	 */
	public ElasticsearchStore(String hostname, int port, String clusterName, String index, Cache cache) {
		super(cache);
		this.hostname = hostname;
		this.port = port;
		this.clusterName = clusterName;
		this.index = index;

		clientProvider = new SingletonClientProvider(hostname, port, clusterName);

		dataStructure = new ElasticsearchDataStructure(clientProvider, index);
		namespaceStore = new ElasticsearchNamespaceStore(clientProvider, index + "_namespaces");

		// This store owns its client, so release it even if the user drops the store without calling shutDown().
		ReferenceQueue<ElasticsearchStore> referenceQueue = new ReferenceQueue<>();
		startGarbageCollectionMonitoring(referenceQueue, new PhantomReference<>(this, referenceQueue),
				clientProvider);
	}

	/**
	 * Creates a store backed by a caller-supplied client provider, using the {@code Cache.EAGER} cache mode.
	 *
	 * @param clientPool provider owned by the caller
	 * @param index      index used for statements; namespaces are stored in {@code index + "_namespaces"}
	 */
	public ElasticsearchStore(ClientProvider clientPool, String index) {
		this(clientPool, index, Cache.EAGER);
	}

	/**
	 * Creates a store backed by a caller-supplied client provider.
	 *
	 * @param clientPool provider owned by the caller
	 * @param index      index used for statements; namespaces are stored in {@code index + "_namespaces"}
	 * @param cache      cache mode passed to {@link ExtensibleStore}
	 */
	public ElasticsearchStore(ClientProvider clientPool, String index, Cache cache) {
		super(cache);
		// The caller owns the provider; the wrapper is meant to keep shutDown() from closing it.
		this.clientProvider = new UnclosableClientProvider(clientPool);
		this.hostname = null;
		this.port = 0;
		this.clusterName = null;
		this.index = index;

		dataStructure = new ElasticsearchDataStructure(this.clientProvider, index);
		namespaceStore = new ElasticsearchNamespaceStore(this.clientProvider, index + "_namespaces");
	}

	/**
	 * Creates a store backed by a caller-supplied client, using the {@code Cache.EAGER} cache mode.
	 *
	 * @param client client owned by the caller
	 * @param index  index used for statements; namespaces are stored in {@code index + "_namespaces"}
	 */
	public ElasticsearchStore(Client client, String index) {
		this(client, index, Cache.EAGER);
	}

	/**
	 * Creates a store backed by a caller-supplied client.
	 *
	 * @param client client owned by the caller
	 * @param index  index used for statements; namespaces are stored in {@code index + "_namespaces"}
	 * @param cache  cache mode passed to {@link ExtensibleStore}
	 */
	public ElasticsearchStore(Client client, String index, Cache cache) {
		// the delegated constructor already wraps the provider in an UnclosableClientProvider
		this(new UserProvidedClientProvider(client), index, cache);
	}

	/**
	 * Waits up to ten minutes for the cluster to become available, then initializes the underlying store.
	 *
	 * @throws SailException if the store has already been shut down
	 */
	@Override
	protected void initializeInternal() throws SailException {
		if (shutdown.get()) {
			throw new SailException("Can not be initialized after calling shutdown!");
		}
		waitForElasticsearch(10, ChronoUnit.MINUTES);

		super.initializeInternal();
	}

	/**
	 * Shuts down the underlying store and closes the client provider. Only the first call has any effect.
	 * The client provider is closed even if the superclass shutdown fails.
	 *
	 * @throws SailException if closing the client provider fails
	 */
	@Override
	protected void shutDownInternal() throws SailException {
		if (!shutdown.compareAndSet(false, true)) {
			return;
		}

		try {
			super.shutDownInternal();
		} catch (RuntimeException | Error e) {
			// still release the client, but don't let a close failure hide the original problem
			try {
				clientProvider.close();
			} catch (Exception closeFailure) {
				e.addSuppressed(closeFailure);
			}
			throw e;
		}

		try {
			clientProvider.close();
		} catch (Exception e) {
			throw new SailException(e);
		}
	}

	/**
	 * Blocks until the cluster reports a GREEN or YELLOW health status, retrying roughly once per second.
	 * <p>
	 * Every failed health check closes the client provider before the next attempt. When the deadline passes, the
	 * client provider is closed and an exception is thrown.
	 *
	 * @param time     maximum amount of time to wait
	 * @param timeUnit unit of {@code time}
	 * @throws RuntimeException if the cluster did not become available in time, or closing the client provider
	 *                          after a failed health check fails
	 * @throws SailException    if the calling thread is interrupted while waiting (the interrupt flag is restored)
	 */
	public void waitForElasticsearch(int time, TemporalUnit timeUnit) {
		LocalDateTime deadline = LocalDateTime.now().plus(time, timeUnit);
		String timeoutMessage = "Could not connect to Elasticsearch after " + time + " " + timeUnit.toString()
				+ " of trying!";

		logger.info("Waiting for Elasticsearch to start");

		while (true) {
			if (LocalDateTime.now().isAfter(deadline)) {
				logger.error(timeoutMessage);

				RuntimeException timeout = new RuntimeException(timeoutMessage);
				try {
					clientProvider.close();
				} catch (Exception e) {
					// keep the timeout as the primary error; the close failure is secondary
					timeout.addSuppressed(e);
				}
				throw timeout;
			}

			if (isClusterAvailable()) {
				logger.info("Elasticsearch started!");
				return;
			}

			logger.info(".");

			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new SailException("Interrupted while waiting for Elasticsearch to start", e);
			}
		}
	}

	// Runs a single cluster-health check. On failure the client provider is closed so the next attempt starts fresh.
	private boolean isClusterAvailable() {
		try {
			Client client = clientProvider.getClient();

			ClusterHealthResponse clusterHealthResponse = client.admin()
					.cluster()
					.health(new ClusterHealthRequest())
					.actionGet();
			ClusterHealthStatus status = clusterHealthResponse.getStatus();
			logger.info("Cluster status: {}", status.name());

			return status == ClusterHealthStatus.GREEN || status == ClusterHealthStatus.YELLOW;
		} catch (Exception e) {
			logger.info("Unable to connect to elasticsearch cluster due to {}", e.getClass().getSimpleName());
			logger.debug("Elasticsearch health check failed", e);

			try {
				clientProvider.close();
			} catch (Exception closeFailure) {
				RuntimeException wrapped = new RuntimeException(closeFailure);
				wrapped.addSuppressed(e);
				throw wrapped;
			}
			return false;
		}
	}

	// Final safety cleanup: closes the client provider once the store has been garbage collected. Static so the
	// monitoring task can never capture (and thereby keep alive) the store itself.
	private static void startGarbageCollectionMonitoring(ReferenceQueue<ElasticsearchStore> referenceQueue,
			Reference<ElasticsearchStore> ref, ClientProvider clientProvider) {

		ExecutorService ex = Executors.newSingleThreadExecutor(r -> {
			Thread t = Executors.defaultThreadFactory().newThread(r);
			// daemon, so this thread never keeps the JVM alive on its own
			t.setDaemon(true);
			return t;
		});

		// The task captures `ref`, which keeps the PhantomReference itself strongly reachable. The reference must
		// stay reachable to be enqueued.
		ex.execute(() -> {
			try {
				// Block until the phantom reference is enqueued, i.e. the store has become unreachable.
				Reference<? extends ElasticsearchStore> enqueued;
				do {
					enqueued = referenceQueue.remove();
				} while (enqueued != ref);
			} catch (InterruptedException e) {
				// The store may still be alive, so it is not safe to close its client.
				Thread.currentThread().interrupt();
				return;
			}

			if (!clientProvider.isClosed()) {
				logger.warn(
						"Closing ClientPool in ElasticsearchStore due to store having no references and shutdown() never being called()");
			}

			try {
				clientProvider.close();
			} catch (Exception ignored) {
				// best effort: there is no caller left to report the failure to
			}
		});
		// soft shutdown: the already-submitted task keeps running until it completes
		ex.shutdown();
	}

	/**
	 * Sets the scroll timeout used by the underlying data structure when reading from Elasticsearch.
	 *
	 * @param timeout scroll timeout, passed through unchanged (unit as interpreted by the data structure)
	 */
	public void setElasticsearchScrollTimeout(int timeout) {
		dataStructure.setElasticsearchScrollTimeout(timeout);
	}

	@Override
	protected NotifyingSailConnection getConnectionInternal() throws SailException {
		return new ElasticsearchStoreConnection(this);
	}

	@Override
	public boolean isWritable() throws SailException {
		return true;
	}

	/**
	 * @return the configured hostname, or {@code null} if the store was created from a {@link ClientProvider} or
	 *         {@link Client}
	 */
	public String getHostname() {
		return hostname;
	}

	/**
	 * @return the configured port, or {@code 0} if the store was created from a {@link ClientProvider} or
	 *         {@link Client}
	 */
	public int getPort() {
		return port;
	}

	/**
	 * @return the configured cluster name, or {@code null} if the store was created from a {@link ClientProvider}
	 *         or {@link Client}
	 */
	public String getClusterName() {
		return clusterName;
	}

	/**
	 * @return the index that statements are stored in
	 */
	public String getIndex() {
		return index;
	}

	/**
	 * Sets the bulk size used by the underlying data structure when writing to Elasticsearch.
	 *
	 * @param size bulk size, passed through unchanged to the data structure
	 */
	public void setElasticsearchBulkSize(int size) {
		dataStructure.setElasticsearchBulkSize(size);
	}

	@Override
	public ExtensibleStatementHelper getExtensibleStatementHelper() {
		return (ExtensibleStatementHelper) ElasticsearchValueFactory.getInstance();
	}
}