TxnManager.java

/*******************************************************************************
 * Copyright (c) 2022 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.lmdb;

import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
import static org.lwjgl.system.MemoryStack.stackPush;
import static org.lwjgl.system.MemoryUtil.NULL;
import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_renew;
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_reset;

import java.io.Closeable;
import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.concurrent.locks.StampedLock;

import org.eclipse.rdf4j.sail.lmdb.LmdbUtil.Transaction;
import org.lwjgl.PointerBuffer;
import org.lwjgl.system.MemoryStack;

/**
 * Manager for LMDB transactions.
 */
class TxnManager {

	private final Mode mode;
	private final IdentityHashMap<Txn, Boolean> active = new IdentityHashMap<>();
	private final long[] pool;
	private final StampedLock lock = new StampedLock();
	private final long env;
	private volatile int poolIndex = -1;

	TxnManager(long env, Mode mode) {
		this.env = env;
		this.mode = mode;
		this.pool = mode == Mode.RESET ? new long[128] : null;
	}

	private long startReadTxn() throws IOException {
		long readTxn;
		try (MemoryStack stack = stackPush()) {
			PointerBuffer pp = stack.mallocPointer(1);
			E(mdb_txn_begin(env, NULL, MDB_RDONLY, pp));
			readTxn = pp.get(0);
		}
		return readTxn;
	}

	/**
	 * Wraps an existing transaction into a txn reference object.
	 *
	 * @param txn the existing read or write transactions
	 * @return the txn reference object
	 */
	Txn createTxn(long txn) {
		return new Txn(txn) {
			@Override
			public void close() {
				// do nothing
			}
		};
	}

	/**
	 * Creates a new read-only transaction reference.
	 *
	 * @return the new transaction reference
	 * @throws IOException if the transaction cannot be started for some reason
	 */
	Txn createReadTxn() throws IOException {
		Txn txnRef = new Txn(createReadTxnInternal());
		synchronized (active) {
			active.put(txnRef, Boolean.TRUE);
		}
		return txnRef;
	}

	long createReadTxnInternal() throws IOException {
		long txn = 0;
		if (mode == Mode.RESET) {
			synchronized (pool) {
				if (poolIndex >= 0) {
					txn = pool[poolIndex--];
				}
			}
			if (txn == 0) {
				txn = startReadTxn();
			} else {
				mdb_txn_renew(txn);
			}
		} else {
			txn = startReadTxn();
		}
		return txn;
	}

	<T> T doWith(Transaction<T> transaction) throws IOException {
		long stamp = lock.readLock();
		T ret;
		try (MemoryStack stack = stackPush()) {
			try (Txn txn = createReadTxn()) {
				ret = transaction.exec(stack, txn.get());
			}
		} finally {
			lock.unlockRead(stamp);
		}
		return ret;
	}

	/**
	 * This lock is used to globally block all transactions by using a {@link StampedLock#writeLock()}. This is required
	 * to block transactions while automatic resizing the memory map.
	 *
	 * @return lock for managed transactions
	 */
	StampedLock lock() {
		return lock;
	}

	void activate() throws IOException {
		synchronized (active) {
			for (Txn txn : active.keySet()) {
				txn.setActive(true);
			}
		}
	}

	void deactivate() throws IOException {
		synchronized (active) {
			for (Txn txn : active.keySet()) {
				txn.setActive(false);
			}
		}
	}

	void reset() throws IOException {
		synchronized (active) {
			for (Txn txn : active.keySet()) {
				txn.reset();
			}
		}
	}

	enum Mode {
		RESET,
		ABORT,
		NONE
	}

	class Txn implements Closeable, AutoCloseable {

		private long txn;
		private long version;

		Txn(long txn) {
			this.txn = txn;
		}

		long get() {
			return txn;
		}

		/**
		 * A {@link StampedLock#readLock()} should be acquired while working with the transaction. This is required to
		 * block transactions while automatic resizing the memory map.
		 *
		 * @return lock for managed transactions
		 */
		StampedLock lock() {
			return lock;
		}

		private void free(long txn) {
			switch (mode) {
			case RESET:
				synchronized (pool) {
					if (poolIndex < pool.length - 1) {
						mdb_txn_reset(txn);
						pool[++poolIndex] = txn;
					} else {
						mdb_txn_abort(txn);
					}
				}
				break;
			case ABORT:
				mdb_txn_abort(txn);
				break;
			case NONE:
				break;
			}
		}

		@Override
		public void close() {
			synchronized (active) {
				active.remove(this);
			}
			free(txn);
		}

		/**
		 * Resets current transaction as it points to "old" data.
		 */
		void reset() throws IOException {
			mdb_txn_reset(txn);
			E(mdb_txn_renew(txn));
			version++;
		}

		/**
		 * Triggers active state of current transaction.
		 */
		void setActive(boolean active) throws IOException {
			if (active) {
				E(mdb_txn_renew(txn));
				version++;
			} else {
				mdb_txn_reset(txn);
			}
		}

		long version() {
			return version;
		}
	}
}