ActiveTransactionRegistry.java

/*******************************************************************************
 * Copyright (c) 2016 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.http.server.repository.transaction;

import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.eclipse.rdf4j.http.protocol.Protocol;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;

/**
 * Registry keeping track of active transactions identified by a {@link UUID}.
 *
 * @author Jeen Broekstra
 */
public enum ActiveTransactionRegistry {

	INSTANCE;

	private int timeout = DEFAULT_TIMEOUT;

	private final Logger logger = LoggerFactory.getLogger(ActiveTransactionRegistry.class);

	/**
	 * Configurable system property {@code rdf4j.server.txn.registry.timeout} for specifying the transaction cache
	 * timeout (in seconds).
	 *
	 * @deprecated Use {@link Protocol#CACHE_TIMEOUT_PROPERTY}
	 */
	@Deprecated(since = "2.3")
	public static final String CACHE_TIMEOUT_PROPERTY = Protocol.TIMEOUT.CACHE_PROPERTY;

	/**
	 * Default timeout setting for transaction cache entries (in seconds).
	 *
	 * @deprecated Use {@link Protocol#DEFAULT_TIMEOUT}
	 */
	@Deprecated(since = "2.3")
	public final static int DEFAULT_TIMEOUT = Protocol.TIMEOUT.DEFAULT;

	/**
	 * primary cache for transactions, accessible via transaction ID. Cache entries are kept until a transaction signals
	 * it has ended, or until the secondary cache finds an "orphaned" transaction entry.
	 */
	private final Cache<UUID, Transaction> primaryCache;

	/**
	 * The secondary cache does automatic cleanup of its entries based on the configured timeout. If an expired
	 * transaction is no longer active, it is considered "orphaned" and discarded from the primary cache.
	 */
	private final Cache<UUID, Transaction> secondaryCache;

	/**
	 * a scheduler that routinely cleanup the secondary cache there is no other way to remove stale transactions from
	 * there if remote clients are gone
	 */
	private final ScheduledExecutorService cleaupSecondaryCacheScheduler;
	private ScheduledFuture<?> cleanupTask;

	private Cache<UUID, Transaction> getSecondaryCache() {
		return secondaryCache;
	}

	/**
	 * private constructor.
	 */
	ActiveTransactionRegistry() {
		final String configuredValue = System.getProperty(Protocol.CACHE_TIMEOUT_PROPERTY);
		if (configuredValue != null) {
			try {
				timeout = Integer.parseInt(configuredValue);
			} catch (NumberFormatException e) {
				logger.warn("Expected integer value for property {}. Timeout will default to {} seconds. ",
						Protocol.CACHE_TIMEOUT_PROPERTY, Protocol.DEFAULT_TIMEOUT);
			}
		}
		primaryCache = CacheBuilder.newBuilder()
				.removalListener((RemovalNotification<UUID, Transaction> notification) -> {
					UUID transactionId = notification.getKey();
					Transaction entry = notification.getValue();
					try {
						logger.debug("primary cache removal txid {}", transactionId);
						entry.close();
					} catch (RepositoryException | InterruptedException | ExecutionException e) {
						// fall through
					}
				})
				.build();

		secondaryCache = CacheBuilder.newBuilder()
				.removalListener((RemovalNotification<UUID, Transaction> notification) -> {
					logger.debug("secondary cache removal");
					if (RemovalCause.EXPIRED.equals(notification.getCause())) {
						final UUID transactionId = notification.getKey();
						final Transaction entry = notification.getValue();
						logger.debug("expired transaction to be removed {}", transactionId);
						synchronized (primaryCache) {
							// no operation active, we can decommission this entry
							primaryCache.invalidate(transactionId);
							logger.debug("deregistered expired transaction {}", transactionId);
							try {
								logger.debug("try close() invoked on transaction !!!{}", transactionId);
								entry.close();
							} catch (Throwable t) {
								logger.debug("error on close when purging {}", t.getMessage());
							}
						}
					}
				})
				.expireAfterAccess(timeout, TimeUnit.SECONDS)
				.build();
		cleaupSecondaryCacheScheduler = Executors.newSingleThreadScheduledExecutor((

				Runnable runnable) -> {
			Thread thread = Executors.defaultThreadFactory().newThread(runnable);
			thread.setName("rdf4j-cleanup-stn-scheduler");
			thread.setDaemon(true);
			return thread;
		});

		// timeout + 10% to force cleanup
		cleanupTask = cleaupSecondaryCacheScheduler.schedule(() -> {
			cleanUpSecondaryCache();
		}, timeout + timeout / 10, TimeUnit.SECONDS);
		logger.debug("secondary cache expire time {} seconds", timeout);
	}

	protected void cleanUpSecondaryCache() {
		synchronized (primaryCache) {
			logger.debug("performing secondary cache cleanup. {}", getSecondaryCache().size());
			getSecondaryCache().cleanUp();
		}
		cleanupTask = cleaupSecondaryCacheScheduler.schedule(() -> {
			cleanUpSecondaryCache();
		}, timeout + timeout / 10, TimeUnit.SECONDS);
	}

	// stops the secondary cache cleanup scheduler. invoked by TransactionController.destroy()
	public void destroyScheduler() {
		if (cleanupTask != null)
			cleanupTask.cancel(false);
		cleanupTask = null;
		cleaupSecondaryCacheScheduler.shutdownNow();
		logger.debug("ActiveTransactionCache destroy invoked!");
	}

	public long getTimeout(TimeUnit unit) {
		return unit.convert(timeout, TimeUnit.SECONDS);
	}

	/**
	 * @param txn
	 */
	public void register(Transaction txn) {
		synchronized (primaryCache) {
			Transaction existingTxn = primaryCache.getIfPresent(txn.getID());
			if (existingTxn == null) {
				primaryCache.put(txn.getID(), txn);
				secondaryCache.put(txn.getID(), txn);
				logger.debug("registered transaction {} ", txn.getID());
			} else {
				logger.error("transaction already registered: {}", txn.getID());
				throw new RepositoryException("transaction with id " + txn.getID().toString() + " already registered.");
			}
		}
	}

	public Transaction getTransaction(UUID id) {
		synchronized (primaryCache) {
			Transaction entry = primaryCache.getIfPresent(id);
			if (entry == null) {
				throw new RepositoryException("transaction with id " + id.toString() + " not registered.");
			}
			updateSecondaryCache(entry);
			return entry;
		}
	}

	/**
	 * Resets transaction timeout. If transaction has already timed-out, reinsert the transaction.
	 *
	 * @param txn
	 */
	public void active(Transaction txn) {
		synchronized (primaryCache) {
			updateSecondaryCache(txn);
			Transaction existingTxn = primaryCache.getIfPresent(txn.getID());
			if (existingTxn == null) {
				// reinstate transaction that timed-out too soon
				primaryCache.put(txn.getID(), txn);
				logger.debug("reinstated transaction {} ", txn.getID());
			}
		}
	}

	/**
	 * @param transaction
	 */
	public void deregister(Transaction transaction) {

		synchronized (primaryCache) {
			Transaction entry = primaryCache.getIfPresent(transaction.getID());
			if (entry == null) {
				throw new RepositoryException(
						"transaction with id " + transaction.getID().toString() + " not registered.");
			} else {
				primaryCache.invalidate(transaction.getID());
				secondaryCache.invalidate(transaction.getID());
				logger.debug("deregistered transaction {}", transaction.getID());
			}
		}
	}

	/**
	 * Checks if the given transaction entry is still in the secondary cache (resetting its last access time in the
	 * process) and if not reinserts it.
	 *
	 * @param transaction the transaction to check
	 */
	private void updateSecondaryCache(final Transaction transaction) {
		try {
			secondaryCache.get(transaction.getID(), () -> transaction);
			logger.debug("secondary cache update transaction {}", transaction.getID());
		} catch (ExecutionException e) {
			throw new RuntimeException(e);
		}
	}
}