TxnRecordCache.java

/*******************************************************************************
 * Copyright (c) 2022 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.lmdb;

import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.openDatabase;
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.readTransaction;
import static org.lwjgl.system.MemoryStack.stackPush;
import static org.lwjgl.system.MemoryUtil.NULL;
import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE;
import static org.lwjgl.util.lmdb.LMDB.MDB_NEXT;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOMETASYNC;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOSYNC;
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS;
import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY;
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get;
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_open;
import static org.lwjgl.util.lmdb.LMDB.mdb_del;
import static org.lwjgl.util.lmdb.LMDB.mdb_env_close;
import static org.lwjgl.util.lmdb.LMDB.mdb_env_create;
import static org.lwjgl.util.lmdb.LMDB.mdb_env_open;
import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_mapsize;
import static org.lwjgl.util.lmdb.LMDB.mdb_env_set_maxdbs;
import static org.lwjgl.util.lmdb.LMDB.mdb_get;
import static org.lwjgl.util.lmdb.LMDB.mdb_put;
import static org.lwjgl.util.lmdb.LMDB.mdb_stat;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_commit;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.commons.io.FileUtils;
import org.eclipse.rdf4j.sail.SailException;
import org.lwjgl.PointerBuffer;
import org.lwjgl.system.MemoryStack;
import org.lwjgl.util.lmdb.MDBStat;
import org.lwjgl.util.lmdb.MDBVal;

/**
 * A cache for quads with an associated value. This cache uses a temporary file to store the records. This file is
 * deleted upon calling {@link #close()}.
 */
final class TxnRecordCache {

	private final Path dbDir;
	private final long env;
	private final int dbiExplicit;
	private final int dbiInferred;
	private long writeTxn;
	private long mapSize = 1048576; // 1 MiB
	private long pageSize;

	public TxnRecordCache(File cacheDir) throws IOException {
		try (MemoryStack stack = stackPush()) {
			PointerBuffer pp = stack.mallocPointer(1);
			E(mdb_env_create(pp));
			env = pp.get(0);

			E(mdb_env_set_maxdbs(env, 2));
			E(mdb_env_set_mapsize(env, mapSize));

			int flags = MDB_NOTLS | MDB_NOSYNC | MDB_NOMETASYNC;

			dbDir = Files.createTempDirectory(cacheDir.toPath(), "txncache");
			E(mdb_env_open(env, dbDir.toAbsolutePath().toString(), flags, 0664));
			dbiExplicit = openDatabase(env, "quads", MDB_CREATE, null);
			dbiInferred = openDatabase(env, "quads-inf", MDB_CREATE, null);

			MDBStat stat = MDBStat.malloc(stack);
			readTransaction(env, (stack2, txn) -> {
				E(mdb_stat(txn, dbiExplicit, stat));
				pageSize = stat.ms_psize();
				return null;
			});

			// directly start a write transaction
			E(mdb_txn_begin(env, NULL, 0, pp));
			writeTxn = pp.get(0);
		}
	}

	public void close() throws IOException {
		mdb_env_close(env);
		FileUtils.deleteDirectory(dbDir.toFile());
	}

	protected void commit() throws IOException {
		if (writeTxn != 0) {
			E(mdb_txn_commit(writeTxn));
			writeTxn = 0;
		}
	}

	protected boolean storeRecord(long[] quad, boolean explicit) throws IOException {
		return update(quad, explicit, true);
	}

	protected void removeRecord(long[] quad, boolean explicit) throws IOException {
		update(quad, explicit, false);
	}

	protected boolean update(long[] quad, boolean explicit, boolean add) throws IOException {
		if (LmdbUtil.requiresResize(mapSize, pageSize, writeTxn, 0)) {
			// resize map if required
			E(mdb_txn_commit(writeTxn));
			mapSize = LmdbUtil.autoGrowMapSize(mapSize, pageSize, 0);
			E(mdb_env_set_mapsize(env, mapSize));
			try (MemoryStack stack = stackPush()) {
				PointerBuffer pp = stack.mallocPointer(1);
				E(mdb_txn_begin(env, NULL, 0, pp));
				writeTxn = pp.get(0);
			}
		}
		try (MemoryStack stack = MemoryStack.stackPush()) {
			MDBVal keyVal = MDBVal.malloc(stack);
			// use calloc to get an empty data value
			MDBVal dataVal = MDBVal.calloc(stack);
			ByteBuffer keyBuf = stack.malloc(TripleStore.MAX_KEY_LENGTH);
			Varint.writeListUnsigned(keyBuf, quad);
			keyBuf.flip();
			keyVal.mv_data(keyBuf);

			boolean foundExplicit = mdb_get(writeTxn, dbiExplicit, keyVal, dataVal) == MDB_SUCCESS &&
					(dataVal.mv_data().get(0) & 0b1) != 0;
			boolean foundImplicit = !foundExplicit && mdb_get(writeTxn, dbiInferred, keyVal, dataVal) == MDB_SUCCESS
					&&
					(dataVal.mv_data().get(0) & 0b1) != 0;

			boolean found = foundExplicit || foundImplicit;
			if (add) {
				if (!found || explicit && foundImplicit) {
					if (explicit && foundImplicit) {
						E(mdb_del(writeTxn, dbiInferred, keyVal, dataVal));
					}
					// mark as add
					dataVal.mv_data(stack.bytes((byte) 1));
					E(mdb_put(writeTxn, explicit ? dbiExplicit : dbiInferred, keyVal, dataVal, 0));
				}
				return !found;
			} else {
				if (foundExplicit && explicit || foundImplicit && !explicit) {
					// simply delete quad from cache
					E(mdb_del(writeTxn, explicit ? dbiExplicit : dbiInferred, keyVal, dataVal));
				} else {
					// mark as remove
					dataVal.mv_data(stack.bytes((byte) 0));
					E(mdb_put(writeTxn, explicit ? dbiExplicit : dbiInferred, keyVal, dataVal, 0));
				}
				return true;
			}
		}
	}

	protected RecordCacheIterator getRecords(boolean explicit) throws IOException {
		return new RecordCacheIterator(explicit ? dbiExplicit : dbiInferred);
	}

	static class Record {
		long quad[];
		boolean add;
	}

	protected class RecordCacheIterator {
		private final MDBVal keyData = MDBVal.malloc();
		private final MDBVal valueData = MDBVal.malloc();
		private long txn;
		private final long cursor;
		private final int dbi;
		private final long[] quad = new long[4];

		protected RecordCacheIterator(int dbi) throws IOException {
			this.dbi = dbi;
			try (MemoryStack stack = MemoryStack.stackPush()) {
				PointerBuffer pp = stack.mallocPointer(1);

				E(mdb_txn_begin(env, NULL, MDB_RDONLY, pp));
				txn = pp.get(0);

				E(mdb_cursor_open(txn, dbi, pp));
				cursor = pp.get(0);
			}
		}

		public Record next() {
			if (mdb_cursor_get(cursor, keyData, valueData, MDB_NEXT) == MDB_SUCCESS) {
				Varint.readListUnsigned(keyData.mv_data(), quad);
				byte op = valueData.mv_data().get(0);
				Record r = new Record();
				r.quad = quad;
				r.add = op == 1;
				return r;
			}
			close();
			return null;
		}

		public void close() {
			if (txn != 0) {
				keyData.close();
				valueData.close();
				mdb_cursor_close(cursor);
				mdb_txn_abort(txn);
				txn = 0;
			}
		}
	}
}