MemoryStore.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.memory;

import java.io.File;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;

import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategyFactory;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolver;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolverClient;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.DefaultEvaluationStrategyFactory;
import org.eclipse.rdf4j.repository.sparql.federation.SPARQLServiceResolver;
import org.eclipse.rdf4j.sail.NotifyingSailConnection;
import org.eclipse.rdf4j.sail.SailChangedEvent;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.base.SailDataset;
import org.eclipse.rdf4j.sail.base.SailSink;
import org.eclipse.rdf4j.sail.base.SailStore;
import org.eclipse.rdf4j.sail.helpers.AbstractNotifyingSail;
import org.eclipse.rdf4j.sail.helpers.DirectoryLockManager;
import org.eclipse.rdf4j.sail.memory.model.MemValueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of the Sail interface that stores its data in main memory and that can use a file for persistent
 * storage. This Sail implementation supports single, isolated transactions. This means that changes to the data are not
 * visible until a transaction is committed and that concurrent transactions are not possible. When another transaction
 * is active, calls to <var>startTransaction()</var> will block until the active transaction is committed or rolled
 * back.
 * <p>
 * The MemoryStore is designed for datasets with fewer than 100,000 triples. The MemoryStore uses hash tables, and when
 * these hash tables fill up it copies the values to larger hash tables. This can cause strain on the garbage collector
 * due to lots of memory being allocated and freed.
 * <p>
 * When persistence is enabled (see {@link #setPersist(boolean)}), the data is kept in a file named
 * {@value #DATA_FILE_NAME} inside the data directory and a directory lock is requested for that directory. If the lock
 * cannot be obtained for an existing data file, the store is initialized but reports itself as not writable (see
 * {@link #isWritable()}).
 *
 * @author Arjohn Kampman
 * @author jeen
 */
public class MemoryStore extends AbstractNotifyingSail implements FederatedServiceResolverClient {

	private static final Logger logger = LoggerFactory.getLogger(MemoryStore.class);

	/*-----------*
	 * Constants *
	 *-----------*/

	/**
	 * Name of the file, relative to the data directory, that holds the persisted data.
	 */
	protected static final String DATA_FILE_NAME = "memorystore.data";

	/**
	 * Name of the file, relative to the data directory, that is handed to the file writer together with the data file
	 * when the store contents are written.
	 */
	protected static final String SYNC_FILE_NAME = "memorystore.sync";

	/*-----------*
	 * Variables *
	 *-----------*/

	/**
	 * The underlying store holding the explicit and inferred statements; created in {@link #initializeInternal()}.
	 */
	private SailStore store;

	/**
	 * Flag indicating whether the store contents are persisted to the data directory.
	 */
	private volatile boolean persist = false;

	/**
	 * The file used for data persistence, null if this is a volatile RDF store.
	 */
	private volatile File dataFile;

	/**
	 * The file used for serialising data, null if this is a volatile RDF store.
	 */
	private volatile File syncFile;

	/**
	 * The directory lock, null if this is read-only or a volatile RDF store.
	 */
	private volatile Lock dirLock;

	/**
	 * Flag indicating whether the contents of this repository have changed.
	 */
	private volatile boolean contentsChanged;

	/**
	 * The sync delay.
	 *
	 * @see #setSyncDelay
	 */
	private volatile long syncDelay = 0L;

	/**
	 * Semaphore used to synchronize concurrent access to {@link #sync()} and to updates of {@link #contentsChanged}.
	 */
	private final Object syncSemaphore = new Object();

	/**
	 * The timer used to trigger file synchronization.
	 */
	private volatile Timer syncTimer;

	/**
	 * The currently scheduled timer task, if any.
	 */
	private volatile TimerTask syncTimerTask;

	/**
	 * Semaphore used to synchronize concurrent access to {@link #syncTimer} and {@link #syncTimerTask}.
	 */
	private final Object syncTimerSemaphore = new Object();

	/**
	 * The configured evaluation strategy factory; lazily defaulted in {@link #getEvaluationStrategyFactory()}.
	 */
	private EvaluationStrategyFactory evalStratFactory;

	/**
	 * independent life cycle
	 */
	private FederatedServiceResolver serviceResolver;

	/**
	 * dependent life cycle
	 */
	private SPARQLServiceResolver dependentServiceResolver;

	/*--------------*
	 * Constructors *
	 *--------------*/

	/**
	 * Creates a new MemoryStore.
	 */
	public MemoryStore() {
		setSupportedIsolationLevels(IsolationLevels.NONE, IsolationLevels.READ_COMMITTED, IsolationLevels.SNAPSHOT_READ,
				IsolationLevels.SNAPSHOT, IsolationLevels.SERIALIZABLE);
		setDefaultIsolationLevel(IsolationLevels.SNAPSHOT_READ);
	}

	/**
	 * Creates a new persistent MemoryStore. If the specified data directory contains an existing store, its contents
	 * will be restored upon initialization.
	 *
	 * @param dataDir the data directory to be used for persistence.
	 */
	public MemoryStore(File dataDir) {
		this();
		setDataDir(dataDir);
		setPersist(true);
	}

	/*---------*
	 * Methods *
	 *---------*/

	/**
	 * Enables or disables persistence of the store contents to the data directory.
	 *
	 * @param persist <var>true</var> to persist the contents, <var>false</var> for a volatile store.
	 * @throws IllegalStateException if the sail has already been initialized.
	 */
	public void setPersist(boolean persist) {
		if (isInitialized()) {
			throw new IllegalStateException("sail has already been initialized");
		}

		this.persist = persist;
	}

	/**
	 * Indicates whether the store contents are persisted to the data directory.
	 *
	 * @return <var>true</var> if persistence is enabled.
	 */
	public boolean getPersist() {
		return persist;
	}

	/**
	 * Sets the time (in milliseconds) to wait after a transaction was committed before writing the changed data to
	 * file. Setting this variable to 0 will force a file sync immediately after each commit. A negative value will
	 * deactivate file synchronization until the Sail is shut down. A positive value will postpone the synchronization
	 * for at least that amount of milliseconds. If in the meantime a new transaction is started, the file
	 * synchronization will be rescheduled to wait for another <var>syncDelay</var> ms. This way, bursts of transaction
	 * events can be combined in one file sync.
	 * <p>
	 * The default value for this parameter is <var>0</var> (immediate synchronization).
	 *
	 * @param syncDelay The sync delay in milliseconds.
	 * @throws IllegalStateException if the sail has already been initialized.
	 */
	public void setSyncDelay(long syncDelay) {
		if (isInitialized()) {
			throw new IllegalStateException("sail has already been initialized");
		}

		this.syncDelay = syncDelay;
	}

	/**
	 * Gets the currently configured sync delay.
	 *
	 * @return syncDelay The sync delay in milliseconds.
	 * @see #setSyncDelay
	 */
	public long getSyncDelay() {
		return syncDelay;
	}

	/**
	 * Gets the evaluation strategy factory, creating a {@link DefaultEvaluationStrategyFactory} on first use if none
	 * was set. The factory's query solution cache threshold and result size tracking are refreshed from this sail's
	 * current settings on every call.
	 *
	 * @return Returns the {@link EvaluationStrategyFactory}.
	 */
	public synchronized EvaluationStrategyFactory getEvaluationStrategyFactory() {
		if (evalStratFactory == null) {
			evalStratFactory = new DefaultEvaluationStrategyFactory(getFederatedServiceResolver());
		}
		evalStratFactory.setQuerySolutionCacheThreshold(getIterationCacheSyncThreshold());
		evalStratFactory.setTrackResultSize(isTrackResultSize());
		return evalStratFactory;
	}

	/**
	 * Sets the {@link EvaluationStrategyFactory} to use.
	 *
	 * @param factory the factory to use; if <var>null</var>, a default factory is created on the next call to
	 *                {@link #getEvaluationStrategyFactory()}.
	 */
	public synchronized void setEvaluationStrategyFactory(EvaluationStrategyFactory factory) {
		evalStratFactory = factory;
	}

	/**
	 * Gets the SERVICE resolver. If none was set, a {@link SPARQLServiceResolver} owned by this instance is created
	 * and installed; that resolver is shut down together with this sail.
	 *
	 * @return Returns the SERVICE resolver.
	 */
	public synchronized FederatedServiceResolver getFederatedServiceResolver() {
		if (serviceResolver == null) {
			if (dependentServiceResolver == null) {
				dependentServiceResolver = new SPARQLServiceResolver();
			}
			setFederatedServiceResolver(dependentServiceResolver);
		}
		return serviceResolver;
	}

	/**
	 * Overrides the {@link FederatedServiceResolver} used by this instance, but the given resolver is not shutDown when
	 * this instance is.
	 *
	 * @param resolver The SERVICE resolver to set.
	 */
	@Override
	public synchronized void setFederatedServiceResolver(FederatedServiceResolver resolver) {
		this.serviceResolver = resolver;
		// Propagate a non-null resolver to the evaluation strategy factory if it accepts one.
		if (resolver != null && evalStratFactory instanceof FederatedServiceResolverClient) {
			((FederatedServiceResolverClient) evalStratFactory).setFederatedServiceResolver(resolver);
		}
	}

	/**
	 * Initializes this repository. If a persistence file is defined for the store, the contents will be restored.
	 * <p>
	 * If setting up persistence fails, any directory lock that was acquired along the way is released again before the
	 * exception is propagated, so that a later initialization attempt does not find the directory locked by this
	 * instance.
	 *
	 * @throws SailException when initialization of the store failed.
	 */
	@Override
	protected void initializeInternal() throws SailException {
		logger.debug("Initializing MemoryStore...");

		this.store = new MemorySailStore(debugEnabled());

		if (persist) {
			boolean persistenceReady = false;
			try {
				initializePersistence();
				persistenceReady = true;
			} finally {
				if (!persistenceReady) {
					// Do not leave the data directory locked by a store that failed to initialize.
					releaseDirLock();
				}
			}
		}

		contentsChanged = false;

		logger.debug("MemoryStore initialized");
	}

	/**
	 * Resolves the data and sync files inside the data directory and either restores the store from an existing data
	 * file or creates a fresh one.
	 *
	 * @throws SailException when the data file cannot be read or created.
	 */
	private void initializePersistence() throws SailException {
		File dataDir = getDataDir();
		DirectoryLockManager locker = new DirectoryLockManager(dataDir);
		dataFile = new File(dataDir, DATA_FILE_NAME);
		syncFile = new File(dataDir, SYNC_FILE_NAME);

		if (dataFile.exists()) {
			restoreFromDataFile(dataDir, locker);
		} else {
			createDataFile(locker);
		}
	}

	/**
	 * Loads the contents of the existing data file into the store. A directory lock is requested but not required: if
	 * it cannot be obtained a warning is logged and {@link #dirLock} is left null.
	 *
	 * @param dataDir the data directory, used for logging.
	 * @param locker  the lock manager for the data directory.
	 * @throws SailException when the data file is not readable or reading it fails.
	 */
	private void restoreFromDataFile(File dataDir, DirectoryLockManager locker) throws SailException {
		logger.debug("Reading data from {}...", dataFile);

		// Initialize persistent store from file
		if (!dataFile.canRead()) {
			logger.error("Data file is not readable: {}", dataFile);
			throw new SailException("Can't read data file: " + dataFile);
		}
		// try to create a lock for later writing
		dirLock = locker.tryLock();
		if (dirLock == null) {
			logger.warn("Failed to lock directory: {}", dataDir);
		}
		// Don't try to read empty files: this will result in an
		// IOException, and the file doesn't contain any data anyway.
		if (dataFile.length() == 0L) {
			logger.warn("Ignoring empty data file: {}", dataFile);
			return;
		}

		SailSink explicit = store.getExplicitSailSource().sink(IsolationLevels.NONE);
		SailSink inferred = store.getInferredSailSource().sink(IsolationLevels.NONE);
		try {
			new FileIO((MemValueFactory) store.getValueFactory()).read(dataFile, explicit, inferred);
			logger.debug("Data file read successfully");
		} catch (IOException e) {
			logger.error("Failed to read data file", e);
			throw new SailException(e);
		} finally {
			// Both sinks are always closed, even if finishing the first one fails.
			try {
				flushAndClose(explicit);
			} finally {
				flushAndClose(inferred);
			}
		}
	}

	/**
	 * Creates the data file (and its parent directory, if needed) for a store that has no persisted data yet. The
	 * directory lock is mandatory in this case.
	 *
	 * @param locker the lock manager for the data directory.
	 * @throws SailException when the directory cannot be created or locked, or writing the data file fails.
	 */
	private void createDataFile(DirectoryLockManager locker) throws SailException {
		// file specified that does not exist yet, create it
		try {
			File dir = dataFile.getParentFile();
			if (dir != null && !dir.exists()) {
				logger.debug("Creating directory for data file...");
				if (!dir.mkdirs()) {
					logger.debug("Failed to create directory for data file: {}", dir);
					throw new SailException("Failed to create directory for data file: " + dir);
				}
			}
			// try to lock directory or fail
			dirLock = locker.lockOrFail();

			logger.debug("Initializing data file...");
			try (SailDataset explicit = store.getExplicitSailSource().dataset(IsolationLevels.SNAPSHOT);
					SailDataset inferred = store.getInferredSailSource().dataset(IsolationLevels.SNAPSHOT)) {
				new FileIO((MemValueFactory) store.getValueFactory()).write(explicit, inferred, syncFile,
						dataFile);
			}
			logger.debug("Data file initialized");
		} catch (IOException | SailException e) {
			logger.debug("Failed to initialize data file", e);
			throw new SailException("Failed to initialize data file " + dataFile, e);
		}
	}

	/**
	 * Prepares and flushes the supplied sink and closes it, also when preparing or flushing fails.
	 *
	 * @param sink the sink to finish.
	 * @throws SailException when preparing, flushing or closing the sink fails.
	 */
	private static void flushAndClose(SailSink sink) throws SailException {
		try {
			sink.prepare();
			sink.flush();
		} finally {
			sink.close();
		}
	}

	/**
	 * Releases the directory lock, if one is held, and clears {@link #dirLock}.
	 */
	private void releaseDirLock() {
		Lock lock = dirLock;
		if (lock != null) {
			// Clear the field first so a released lock is never reported as held.
			dirLock = null;
			lock.release();
		}
	}

	/**
	 * Shuts down this store: cancels the sync timer, writes pending changes to file, closes the underlying store and
	 * releases the directory lock and the dependent service resolver. The store is closed even when the final sync
	 * fails, and the resolver is shut down even when releasing the lock fails.
	 *
	 * @throws SailException when the final sync or closing the store fails.
	 */
	@Override
	protected void shutDownInternal() throws SailException {
		try {
			try {
				cancelSyncTimer();
				sync();
			} finally {
				store.close();
			}
			dataFile = null;
			syncFile = null;
		} finally {
			try {
				releaseDirLock();
			} finally {
				if (dependentServiceResolver != null) {
					dependentServiceResolver.shutDown();
				}
			}
		}
	}

	/**
	 * Checks whether this Sail object is writable. A MemoryStore is not writable if a read-only data file is used.
	 */
	@Override
	public boolean isWritable() {
		// Sail is not writable when it has a dataDir but no directory lock
		return !persist || dirLock != null;
	}

	@Override
	protected NotifyingSailConnection getConnectionInternal() throws SailException {
		return new MemoryStoreConnection(this);
	}

	/**
	 * Gets the value factory of the underlying store.
	 *
	 * @throws IllegalStateException if the sail has not been initialized yet.
	 */
	@Override
	public ValueFactory getValueFactory() {
		if (store == null) {
			throw new IllegalStateException("sail not initialized.");
		}

		return store.getValueFactory();
	}

	/**
	 * Forwards the event to the registered listeners and records whether statements were added or removed, so that
	 * the next {@link #sync()} knows the file is out of date.
	 */
	@Override
	public void notifySailChanged(SailChangedEvent event) {
		super.notifySailChanged(event);
		synchronized (syncSemaphore) {
			contentsChanged = contentsChanged || event.statementsAdded() || event.statementsRemoved();
		}
	}

	/**
	 * Triggers a file synchronization according to the configured sync delay: immediately for a delay of 0, via a
	 * (re)scheduled timer task for a positive delay, and not at all for a negative delay or a non-persistent store.
	 *
	 * @throws SailException when an immediate sync fails.
	 */
	protected void scheduleSyncTask() throws SailException {
		if (!persist) {
			return;
		}

		if (syncDelay == 0L) {
			// Sync immediately
			sync();
		} else if (syncDelay > 0L) {
			synchronized (syncTimerSemaphore) {
				// Sync in syncDelay milliseconds
				if (syncTimer == null) {
					// Create the syncTimer on a daemon thread
					syncTimer = new Timer("MemoryStore synchronization", true);
				}

				if (syncTimerTask != null) {
					// sync task from (concurrent) other transaction exists.
					// cancel and replace with newly scheduled sync task.
					syncTimerTask.cancel();
				}

				syncTimerTask = new TimerTask() {

					@Override
					public void run() {
						try {
							sync();
						} catch (SailException e) {
							logger.warn("Unable to sync on timer", e);
						}
					}
				};

				syncTimer.schedule(syncTimerTask, syncDelay);
			}
		}
	}

	/**
	 * Cancels the currently scheduled sync task, if any.
	 */
	protected void cancelSyncTask() {
		synchronized (syncTimerSemaphore) {
			if (syncTimerTask != null) {
				syncTimerTask.cancel();
				syncTimerTask = null;
			}
		}
	}

	/**
	 * Cancels the sync timer, if any, and forgets the task that was scheduled on it.
	 */
	protected void cancelSyncTimer() {
		synchronized (syncTimerSemaphore) {
			if (syncTimer != null) {
				syncTimer.cancel();
				syncTimer = null;
				// Cancelling the timer discards its scheduled tasks; drop the stale reference as well.
				syncTimerTask = null;
			}
		}
	}

	/**
	 * Synchronizes the contents of this repository with the data that is stored on disk. Data will only be written when
	 * the contents of the repository and data in the file are out of sync.
	 *
	 * @throws SailException when writing the data to file fails.
	 */
	public void sync() throws SailException {
		// syncSemaphore prevents concurrent file synchronizations
		synchronized (syncSemaphore) {
			if (persist && contentsChanged) {
				logger.debug("syncing data to file...");
				try {
					IsolationLevels level = IsolationLevels.SNAPSHOT;
					try (SailDataset explicit = store.getExplicitSailSource().dataset(level);
							SailDataset inferred = store.getInferredSailSource().dataset(level)) {
						new FileIO((MemValueFactory) store.getValueFactory()).write(explicit, inferred, syncFile,
								dataFile);
					}
					contentsChanged = false;
					logger.debug("Data synced to file");
				} catch (IOException e) {
					logger.error("Failed to sync to file", e);
					throw new SailException(e);
				}
			}
		}
	}

	/**
	 * Gets the underlying store; null until this sail has been initialized.
	 */
	SailStore getSailStore() {
		return store;
	}
}