DataFile.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.datastore;

import static org.eclipse.rdf4j.sail.nativerdf.NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;

import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
import org.eclipse.rdf4j.common.io.NioFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class supplying access to a data file. A data file stores data sequentially. Each entry starts with the entry's
 * length (4 bytes), followed by the data itself. File offsets are used to identify entries.
 *
 * @author Arjohn Kampman
 */
public class DataFile implements Closeable {

	private static final Logger logger = LoggerFactory.getLogger(DataFile.class);

	/*-----------*
	 * Constants *
	 *-----------*/

	/**
	 * Magic number "Native Data File" to detect whether the file is actually a data file. The first three bytes of the
	 * file should be equal to this magic number.
	 */
	private static final byte[] MAGIC_NUMBER = new byte[] { 'n', 'd', 'f' };

	/**
	 * File format version, stored as the fourth byte in data files.
	 */
	private static final byte FILE_FORMAT_VERSION = 1;

	private static final long HEADER_LENGTH = MAGIC_NUMBER.length + 1;

	// Guard parameters
	private static final int FALLBACK_LARGE_READ_THRESHOLD = 128 * 1024 * 1024; // 128MB fallback
	public static final String LARGE_READ_THRESHOLD_PROPERTY = "org.eclipse.rdf4j.sail.nativerdf.datastore.DataFile.largeReadThresholdBytes";
	public static final int LARGE_READ_THRESHOLD = getConfiguredLargeReadThreshold();
	private static final int SOFT_FAIL_CAP_BYTES = 32 * 1024 * 1024; // 32MB

	/*-----------*
	 * Variables *
	 *-----------*/

	private final NioFile nioFile;

	private final boolean forceSync;

	// cached file size, also reflects buffer usage
	private volatile long nioFileSize;

	// 4KB write buffer that is flushed on sync, close and any read operations
	private final ByteBuffer buffer = ByteBuffer.allocate(4 * 1024);

	/*--------------*
	 * Constructors *
	 *--------------*/

	public DataFile(File file) throws IOException {
		this(file, false);
	}

	public DataFile(File file, boolean forceSync) throws IOException {
		this.nioFile = new NioFile(file);
		this.forceSync = forceSync;

		try {
			// Open a read/write channel to the file

			if (nioFile.size() == 0) {
				// Empty file, write header
				nioFile.writeBytes(MAGIC_NUMBER, 0);
				nioFile.writeByte(FILE_FORMAT_VERSION, MAGIC_NUMBER.length);

				sync();
			} else if (nioFile.size() < HEADER_LENGTH) {
				throw new IOException("File too small to be a compatible data file");
			} else {
				// Verify file header
				if (!Arrays.equals(MAGIC_NUMBER, nioFile.readBytes(0, MAGIC_NUMBER.length))) {
					throw new IOException("File doesn't contain compatible data records");
				}

				byte version = nioFile.readByte(MAGIC_NUMBER.length);
				if (version > FILE_FORMAT_VERSION) {
					throw new IOException("Unable to read data file; it uses a newer file format");
				} else if (version != FILE_FORMAT_VERSION) {
					throw new IOException("Unable to read data file; invalid file format version: " + version);
				}
			}
		} catch (IOException e) {
			this.nioFile.close();
			throw e;
		}

		this.nioFileSize = nioFile.size();

	}

	/*---------*
	 * Methods *
	 *---------*/

	public File getFile() {
		return nioFile.getFile();
	}

	/**
	 * Returns the current file size (after flushing any pending writes).
	 */
	public long getFileSize() throws IOException {
		flush();
		return nioFileSize;
	}

	/**
	 * Attempts to recover data bytes between two known entry offsets when the length field at {@code startOffset} is
	 * corrupt (e.g., zero). This returns up to {@code endOffset - startOffset - 4} bytes starting after the length
	 * field, capped to a reasonable maximum to avoid large allocations.
	 */
	public byte[] tryRecoverBetweenOffsets(long startOffset, long endOffset) throws IOException {
		flush();
		if (endOffset <= startOffset + 4) {
			return new byte[0];
		}
		long available = endOffset - (startOffset + 4);
		int cap = 32 * 1024 * 1024; // 32MB cap for recovery
		int toRead = (int) Math.min(Math.max(available, 0), cap);
		return nioFile.readBytes(startOffset + 4L, toRead);
	}

	/**
	 * Stores the specified data and returns the byte-offset at which it has been stored.
	 *
	 * @param data The data to store, must not be <var>null</var>.
	 * @return The byte-offset in the file at which the data was stored.
	 */
	synchronized public long storeData(byte[] data) throws IOException {
		assert data != null : "data must not be null";

		long offset = nioFileSize;

		if (data.length + 4 > buffer.capacity()) {
			// direct write because we are writing more data than the buffer can hold

			flush();

			// TODO: two writes could be more efficient since it prevent array copies
			ByteBuffer buf = ByteBuffer.allocate(data.length + 4);
			buf.putInt(data.length);
			buf.put(data);
			buf.rewind();

			nioFile.write(buf, offset);

			nioFileSize += buf.array().length;

		} else {
			if (data.length + 4 > remainingBufferCapacity()) {
				flush();
			}

			buffer.putInt(data.length);
			buffer.put(data);
			nioFileSize += data.length + 4;
		}

		return offset;

	}

	synchronized private void flush() throws IOException {
		int position = buffer.position();
		if (position == 0) {
			return;
		}
		buffer.position(0);

		byte[] byteToWrite = new byte[position];
		buffer.get(byteToWrite, 0, position);

		nioFile.write(ByteBuffer.wrap(byteToWrite), nioFileSize - byteToWrite.length);

		buffer.position(0);

	}

	synchronized private int remainingBufferCapacity() {
		return buffer.capacity() - buffer.position();
	}

	// This variable is used for predicting the number of bytes to read in getData(long offset). This helps us to only
	// need to execute a single IO read instead of first one read to find the length and then one read to read the data.
	int dataLengthApproximateAverage = 25;

	/**
	 * Gets the data that is stored at the specified offset.
	 *
	 * @param offset An offset in the data file, must be larger than 0.
	 * @return The data that was found on the specified offset.
	 * @throws IOException If an I/O error occurred.
	 */
	public byte[] getData(long offset) throws IOException {
		assert offset > 0 : "offset must be larger than 0, is: " + offset;
		flush();

		// Read in twice the average length because multiple small read operations take more time than one single larger
		// operation even if that larger operation is unnecessarily large (within sensible limits).
		byte[] data = new byte[(dataLengthApproximateAverage * 2) + 4];
		ByteBuffer buf = ByteBuffer.wrap(data);
		nioFile.read(buf, offset);

		int dataLength = (data[0] << 24) & 0xff000000 |
				(data[1] << 16) & 0x00ff0000 |
				(data[2] << 8) & 0x0000ff00 |
				(data[3]) & 0x000000ff;

		// Validate and possibly reduce the length before allocating a large array
		dataLength = guardedDataLength(dataLength);

		try {

			// We have either managed to read enough data and can return the required subset of the data, or we have
			// read
			// too little so we need to execute another read to get the correct data.
			if (dataLength <= data.length - 4) {

				// adjust the approximate average with 1 part actual length and 99 parts previous average up to a
				// sensible
				// max of 200
				dataLengthApproximateAverage = (int) Math.max(0, Math.min(200,
						((dataLengthApproximateAverage / 100.0) * 99) + (dataLength / 100.0)));

				int i = dataLength + 4;
				if (i < 0 || i > data.length) {
					throw new IOException("Corrupt data record at offset " + offset + ". Data length: " + dataLength);
				}

				return Arrays.copyOfRange(data, 4, i);

			} else {

				// adjust the approximate average, but favour the actual dataLength since dataLength predictions misses
				// are costly
				dataLengthApproximateAverage = Math.max(0,
						Math.min(200, (dataLengthApproximateAverage + dataLength) / 2));

				// we didn't read enough data so we need to execute a new read
				data = new byte[dataLength];
				buf = ByteBuffer.wrap(data);
				nioFile.read(buf, offset + 4L);

				return data;
			}
		} catch (OutOfMemoryError e) {
			if (dataLength > LARGE_READ_THRESHOLD) {
				logger.error(
						"Trying to read large amounts of data may be a sign of data corruption. Consider setting the system property org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes to true");
			}
			throw e;
		}

	}

	/**
	 * For very large reads, ensure there appears to be sufficient free heap to allocate the requested record. If soft
	 * fail mode is enabled and insufficient memory is observed, returns a reduced cap to allow recovery; otherwise
	 * throws an IOException with guidance for remediation.
	 */
	private int guardedDataLength(int requested) throws IOException {
		if (requested <= 0) {
			return requested;
		}

		// Soft-fail corruption cap remains in effect for oversized claims
		if (requested > LARGE_READ_THRESHOLD && SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
			logger.error(
					"Data length is {}MB which is larger than {}MB. This is likely data corruption. Truncating length to {} MB.",
					requested / (1024 * 1024), LARGE_READ_THRESHOLD / (1024 * 1024),
					SOFT_FAIL_CAP_BYTES / (1024 * 1024));
			return SOFT_FAIL_CAP_BYTES;
		}

		if (requested <= LARGE_READ_THRESHOLD) {
			return requested;
		}

		Runtime rt = Runtime.getRuntime();
		for (int i = 0; i < 6; i++) { // initial check + up to 5 GC attempts
			long free = getFreeMemory(rt);
			if (free >= requested) {
				return requested;
			}
			if (i < 5) {
				System.gc();
				try {
					TimeUnit.MILLISECONDS.sleep(1);
				} catch (InterruptedException ie) {
					Thread.currentThread().interrupt();
					break;
				}
			}
		}

		long free = getFreeMemory(rt);
		if (SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
			logger.error(
					"Attempt to read {} MB but only {} MB free heap available. Truncating to {} MB due to soft-fail mode.",
					requested / (1024 * 1024), free / (1024 * 1024), SOFT_FAIL_CAP_BYTES / (1024 * 1024));
			return SOFT_FAIL_CAP_BYTES;
		}
		throw new IOException("Attempt to read " + (requested / (1024 * 1024)) + " MB but only "
				+ (free / (1024 * 1024))
				+ " MB free heap available. This may indicate corrupted data length. Consider enabling soft-fail mode via system property 'org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes'=true to attempt recovery.");
	}

	@InternalUseOnly
	public long getFreeMemory(Runtime rt) {
		// this method is overridden in tests to simulate low-heap conditions
		long allocated = rt.totalMemory() - rt.freeMemory();
		return (rt.maxMemory() - allocated);
	}

	private static int getConfiguredLargeReadThreshold() {
		int defaultThreshold = defaultLargeReadThreshold();
		String configured = System.getProperty(LARGE_READ_THRESHOLD_PROPERTY);
		if (configured == null || configured.isBlank()) {
			logger.debug(
					"Using default large read threshold of {} MB. To configure, set system property {} to a positive integer value in bytes.",
					defaultThreshold / (1024 * 1024), LARGE_READ_THRESHOLD_PROPERTY);
			return defaultThreshold;
		}
		try {
			int parsed = Integer.parseInt(configured.trim());
			if (parsed <= 0) {
				logger.warn(
						"Ignoring non-positive value {} for system property {}. Falling back to {} MB.",
						configured, LARGE_READ_THRESHOLD_PROPERTY, defaultThreshold / (1024 * 1024));
				return defaultThreshold;
			}
			return parsed;
		} catch (NumberFormatException e) {
			logger.warn(
					"Ignoring non-numeric value {} for system property {}. Falling back to {} MB.",
					configured, LARGE_READ_THRESHOLD_PROPERTY, defaultThreshold / (1024 * 1024));
			return defaultThreshold;
		}
	}

	private static int defaultLargeReadThreshold() {
		long maxMemory = Runtime.getRuntime().maxMemory();
		if (maxMemory <= 0) {
			return FALLBACK_LARGE_READ_THRESHOLD;
		}
		long threshold = maxMemory / 16L;
		if (threshold <= 0) {
			return FALLBACK_LARGE_READ_THRESHOLD;
		}
		return Math.toIntExact(Math.min(threshold, Integer.MAX_VALUE));
	}

	/**
	 * Discards all stored data.
	 *
	 * @throws IOException If an I/O error occurred.
	 */
	synchronized public void clear() throws IOException {
		nioFile.truncate(HEADER_LENGTH);
		nioFileSize = HEADER_LENGTH;
		buffer.clear();
	}

	/**
	 * Syncs any unstored data to the hash file.
	 */
	synchronized public void sync() throws IOException {
		flush();

		if (forceSync) {
			nioFile.force(false);
		}
	}

	synchronized public void sync(boolean force) throws IOException {
		flush();

		nioFile.force(force);
	}

	/**
	 * Closes the data file, releasing any file locks that it might have.
	 *
	 * @throws IOException
	 */
	@Override
	synchronized public void close() throws IOException {
		flush();
		nioFile.force(true);
		nioFile.close();
	}

	/**
	 * Gets an iterator that can be used to iterate over all stored data.
	 *
	 * @return a DataIterator.
	 */
	public DataIterator iterator() {
		try {
			flush();
		} catch (IOException e) {
			throw new RuntimeException(e);
		}

		return new DataIterator();
	}

	/**
	 * An iterator that iterates over the data that is stored in a data file.
	 */
	public class DataIterator {

		private long position = HEADER_LENGTH;

		public boolean hasNext() {
			return position < nioFileSize;
		}

		public byte[] next() throws IOException {
			if (!hasNext()) {
				throw new NoSuchElementException();
			}

			byte[] data = getData(position);
			position += (4 + data.length);
			return data;
		}
	}
}