RangeIterator.java

/*******************************************************************************
 * Copyright (c) 2018 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.btree;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.rdf4j.common.io.ByteArrayUtil;

class RangeIterator implements RecordIterator, NodeListener {

	private final BTree tree;

	private final byte[] searchKey;

	private final byte[] searchMask;

	private final byte[] minValue;

	private final byte[] maxValue;

	private volatile boolean started;

	private volatile Node currentNode;

	private final AtomicBoolean revisitValue = new AtomicBoolean();

	/**
	 * Tracks the parent nodes of {@link #currentNode}.
	 */
	private final LinkedList<Node> parentNodeStack = new LinkedList<>();

	/**
	 * Tracks the index of child nodes in parent nodes.
	 */
	private final LinkedList<Integer> parentIndexStack = new LinkedList<>();

	private volatile int currentIdx;

	private volatile boolean closed = false;

	public RangeIterator(BTree tree, byte[] searchKey, byte[] searchMask, byte[] minValue, byte[] maxValue) {
		this.tree = tree;
		this.searchKey = searchKey;
		this.searchMask = searchMask;
		this.minValue = minValue;
		this.maxValue = maxValue;
		this.started = false;
	}

	@Override
	public byte[] next() throws IOException {
		tree.btreeLock.readLock().lock();
		try {
			if (!started) {
				started = true;
				findMinimum();
			}

			byte[] value = findNext(revisitValue.getAndSet(false));
			while (value != null) {
				if (maxValue != null && tree.comparator.compareBTreeValues(maxValue, value, 0, value.length) < 0) {
					// Reached maximum value, stop iterating
					close();
					value = null;
					break;
				} else if (searchKey != null && !ByteArrayUtil.matchesPattern(value, searchMask, searchKey)) {
					// Value doesn't match search key/mask
					value = findNext(false);
					continue;
				} else {
					// Matching value found
					break;
				}
			}

			return value;
		} finally {
			tree.btreeLock.readLock().unlock();
		}
	}

	private void findMinimum() {
		Node nextCurrentNode = currentNode = tree.readRootNode();

		if (nextCurrentNode == null) {
			// Empty BTree
			return;
		}

		nextCurrentNode.register(this);
		currentIdx = 0;

		// Search first value >= minValue, or the left-most value in case
		// minValue is null
		while (true) {
			if (minValue != null) {
				currentIdx = nextCurrentNode.search(minValue);

				if (currentIdx >= 0) {
					// Found exact match with minimum value
					break;
				} else {
					// currentIdx indicates the first value larger than the
					// minimum value
					currentIdx = -currentIdx - 1;
				}
			}

			if (nextCurrentNode.isLeaf()) {
				break;
			} else {
				// [SES-725] must change stacks after node loading has succeeded
				Node childNode = nextCurrentNode.getChildNode(currentIdx);
				pushStacks(childNode);
				// pushStacks updates the current node
				nextCurrentNode = currentNode;
			}
		}
	}

	private byte[] findNext(boolean returnedFromRecursion) throws IOException {
		Node nextCurrentNode = currentNode;
		if (nextCurrentNode == null) {
			return null;
		}

		if (returnedFromRecursion || nextCurrentNode.isLeaf()) {
			if (currentIdx >= nextCurrentNode.getValueCount()) {
				// No more values in this node, continue with parent node
				popStacks();
				return findNext(true);
			} else {
				return nextCurrentNode.getValue(currentIdx++);
			}
		} else {
			// [SES-725] must change stacks after node loading has succeeded
			Node childNode = nextCurrentNode.getChildNode(currentIdx);
			pushStacks(childNode);
			return findNext(false);
		}
	}

	@Override
	public void set(byte[] value) {
		tree.btreeLock.readLock().lock();
		try {
			Node nextCurrentNode = currentNode;
			if (nextCurrentNode == null || currentIdx > nextCurrentNode.getValueCount()) {
				throw new IllegalStateException();
			}

			nextCurrentNode.setValue(currentIdx - 1, value);
		} finally {
			tree.btreeLock.readLock().unlock();
		}
	}

	@Override
	public void close() throws IOException {
		if (!closed) {
			synchronized (this) {
				if (!closed) {
					closed = true;
					tree.btreeLock.readLock().lock();
					try {
						while (popStacks()) {
						}

						assert parentNodeStack.isEmpty();
						assert parentIndexStack.isEmpty();
					} finally {
						tree.btreeLock.readLock().unlock();
					}
				}
			}
		}
	}

	private void pushStacks(Node newChildNode) {
		newChildNode.register(this);
		parentNodeStack.add(currentNode);
		parentIndexStack.add(currentIdx);
		currentNode = newChildNode;
		currentIdx = 0;
	}

	private synchronized boolean popStacks() throws IOException {
		Node nextCurrentNode = currentNode;
		if (nextCurrentNode == null) {
			// There's nothing to pop
			return false;
		}

		nextCurrentNode.deregister(this);
		nextCurrentNode.release();

		if (!parentNodeStack.isEmpty()) {
			currentNode = parentNodeStack.removeLast();
			currentIdx = parentIndexStack.removeLast();
			return true;
		} else {
			currentNode = null;
			currentIdx = 0;
			return false;
		}
	}

	@Override
	public boolean valueAdded(Node node, int addedIndex) {
		assert tree.btreeLock.isWriteLockedByCurrentThread();

		if (node == currentNode) {
			if (addedIndex < currentIdx) {
				currentIdx++;
			}
		} else {
			for (int i = 0; i < parentNodeStack.size(); i++) {
				if (node == parentNodeStack.get(i)) {
					int parentIdx = parentIndexStack.get(i);
					if (addedIndex < parentIdx) {
						parentIndexStack.set(i, parentIdx + 1);
					}

					break;
				}
			}
		}

		return false;
	}

	@Override
	public boolean valueRemoved(Node node, int removedIndex) {
		assert tree.btreeLock.isWriteLockedByCurrentThread();

		if (node == currentNode) {
			if (removedIndex < currentIdx) {
				currentIdx--;
			}
		} else {
			for (int i = 0; i < parentNodeStack.size(); i++) {
				if (node == parentNodeStack.get(i)) {
					int parentIdx = parentIndexStack.get(i);
					if (removedIndex < parentIdx) {
						parentIndexStack.set(i, parentIdx - 1);
					}

					break;
				}
			}
		}

		return false;
	}

	@Override
	public boolean rotatedLeft(Node node, int valueIndex, Node leftChildNode, Node rightChildNode) throws IOException {
		Node nextCurrentNode = currentNode;
		if (nextCurrentNode == node) {
			if (valueIndex == currentIdx - 1) {
				// the value that was removed had just been visited
				currentIdx = valueIndex;
				revisitValue.set(true);

				if (!node.isLeaf()) {
					pushStacks(leftChildNode);
					leftChildNode.use();
				}
			}
		} else if (nextCurrentNode == rightChildNode) {
			if (currentIdx == 0) {
				// the value that would be visited next has been moved to the
				// parent node
				popStacks();
				currentIdx = valueIndex;
				revisitValue.set(true);
			}
		} else {
			for (int i = 0; i < parentNodeStack.size(); i++) {
				Node stackNode = parentNodeStack.get(i);

				if (stackNode == rightChildNode) {
					int stackIdx = parentIndexStack.get(i);

					if (stackIdx == 0) {
						// this node is no longer the parent, replace with left
						// sibling
						rightChildNode.deregister(this);
						rightChildNode.release();

						leftChildNode.use();
						leftChildNode.register(this);

						parentNodeStack.set(i, leftChildNode);
						parentIndexStack.set(i, leftChildNode.getValueCount());
					}

					break;
				}
			}
		}

		return false;
	}

	@Override
	public boolean rotatedRight(Node node, int valueIndex, Node leftChildNode, Node rightChildNode) throws IOException {
		for (int i = 0; i < parentNodeStack.size(); i++) {
			Node stackNode = parentNodeStack.get(i);

			if (stackNode == leftChildNode) {
				int stackIdx = parentIndexStack.get(i);

				if (stackIdx == leftChildNode.getValueCount()) {
					// this node is no longer the parent, replace with right
					// sibling
					leftChildNode.deregister(this);
					leftChildNode.release();

					rightChildNode.use();
					rightChildNode.register(this);

					parentNodeStack.set(i, rightChildNode);
					parentIndexStack.set(i, 0);
				}

				break;
			}
		}

		return false;
	}

	@Override
	public boolean nodeSplit(Node node, Node newNode, int medianIdx) throws IOException {
		assert tree.btreeLock.isWriteLockedByCurrentThread();

		boolean deregister = false;

		Node nextCurrentNode = currentNode;
		if (node == nextCurrentNode) {
			if (currentIdx > medianIdx) {
				nextCurrentNode.release();
				deregister = true;

				newNode.use();
				newNode.register(this);

				currentNode = newNode;
				currentIdx -= medianIdx + 1;
			}
		} else {
			for (int i = 0; i < parentNodeStack.size(); i++) {
				Node parentNode = parentNodeStack.get(i);

				if (node == parentNode) {
					int parentIdx = parentIndexStack.get(i);

					if (parentIdx > medianIdx) {
						parentNode.release();
						deregister = true;

						newNode.use();
						newNode.register(this);

						parentNodeStack.set(i, newNode);
						parentIndexStack.set(i, parentIdx - medianIdx - 1);
					}

					break;
				}
			}
		}

		return deregister;
	}

	@Override
	public boolean nodeMergedWith(Node sourceNode, Node targetNode, int mergeIdx) throws IOException {
		assert tree.btreeLock.isWriteLockedByCurrentThread();

		boolean deregister = false;

		Node nextCurrentNode = currentNode;
		if (sourceNode == nextCurrentNode) {
			nextCurrentNode.release();
			deregister = true;

			targetNode.use();
			targetNode.register(this);

			currentNode = targetNode;
			currentIdx += mergeIdx;
		} else {
			for (int i = 0; i < parentNodeStack.size(); i++) {
				Node parentNode = parentNodeStack.get(i);

				if (sourceNode == parentNode) {
					parentNode.release();
					deregister = true;

					targetNode.use();
					targetNode.register(this);

					parentNodeStack.set(i, targetNode);
					parentIndexStack.set(i, mergeIdx + parentIndexStack.get(i));

					break;
				}
			}
		}

		return deregister;
	}

	@Override
	public String toString() {
		return "RangeIterator{" +
				"tree=" + tree +
				'}';
	}
}