WeakObjectRegistry.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.memory.model;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.lang.ref.WeakReference;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An object registry that uses weak references to keep track of the stored objects. The registry can be used to
 * retrieve stored objects using another, equivalent object. As such, it can be used to prevent the use of duplicates in
 * another data structure, reducing memory usage. The objects that are being stored should properly implement the
 * {@link Object#equals} and {@link Object#hashCode} methods.
 */
public class WeakObjectRegistry<K, E extends K> extends AbstractSet<E> {

	private static final Logger logger = LoggerFactory.getLogger(WeakObjectRegistry.class);

	/*-----------*
	 * Variables *
	 *-----------*/

	/**
	 * The hash map that is used to store the objects.
	 */
	private final Map<E, WeakReference<E>>[] objectMap;
	private final AdderBasedReadWriteLock[] locks;

	/*--------------*
	 * Constructors *
	 *--------------*/

	/**
	 * Constructs a new, empty object registry.
	 */
	public WeakObjectRegistry() {
		super();
		int concurrency = Runtime.getRuntime().availableProcessors() * 2;

		objectMap = new WeakHashMap[concurrency];
		for (int i = 0; i < objectMap.length; i++) {
			objectMap[i] = new WeakHashMap<>();
		}

		locks = new AdderBasedReadWriteLock[objectMap.length];
		for (int index = 0; index < locks.length; index++) {
			locks[index] = new AdderBasedReadWriteLock();
		}
	}

	/**
	 * Constructs a new WeakObjectRegistry containing the elements in the specified collection.
	 *
	 * @param c The collection whose elements are to be placed into this object registry.
	 * @throws NullPointerException If the specified collection is null.
	 */
	public WeakObjectRegistry(int cacheSize, Collection<? extends E> c) {
		this();
		addAll(c);
	}

	/*---------*
	 * Methods *
	 *---------*/

	/**
	 * Retrieves the stored object that is equal to the supplied <var>key</var> object.
	 *
	 * @param key The object that should be used as the search key for the operation.
	 * @return A stored object that is equal to the supplied key, or <var>null</var> if no such object was found.
	 */
	public E get(K key) {
		if (key == null) {
			return null;
		}

		int index = getIndex(key);
		boolean readLock = locks[index].readLock();
		try {
			Map<E, WeakReference<E>> weakReferenceMap = objectMap[index];

			WeakReference<E> weakRef = weakReferenceMap.get(key);
			if (weakRef != null) {
				return weakRef.get(); // may be null
			} else {
				return null;
			}

		} finally {
			locks[index].unlockReader(readLock);
		}

	}

	private int getIndex(Object key) {
		int i = Math.abs(key.hashCode());
		return i % objectMap.length;
	}

	public AutoCloseableIterator<E> closeableIterator() {
		return new AutoCloseableIterator<>(objectMap, locks);
	}

	@Override
	public Iterator<E> iterator() {
		logger.warn("This method is not thread safe! Use closeableIterator() instead.");
		return new AutoCloseableIterator<>(objectMap, null);
	}

	public static class AutoCloseableIterator<E> implements Iterator<E>, AutoCloseable {

		private final Iterator<Map<E, WeakReference<E>>> iterator;
		private final AdderBasedReadWriteLock[] locks;

		Iterator<E> currentIterator;
		boolean[] readLocks;
		boolean init = false;

		public AutoCloseableIterator(Map<E, WeakReference<E>>[] objectMap, AdderBasedReadWriteLock[] locks) {
			this.iterator = Arrays.asList(objectMap).iterator();
			this.locks = locks;
		}

		public void init() {
			if (!init) {
				init = true;
				if (locks != null) {
					readLocks = new boolean[locks.length];
					for (int index = 0; index < locks.length; index++) {
						readLocks[index] = locks[index].readLock();
					}
				}
				currentIterator = iterator.next().keySet().iterator();
			}
		}

		@Override
		public boolean hasNext() {
			init();
			if (currentIterator == null) {
				return false;
			}
			while (currentIterator != null) {
				if (currentIterator.hasNext()) {
					return true;
				} else {
					currentIterator = null;
					if (iterator.hasNext()) {
						currentIterator = iterator.next().keySet().iterator();
					}
				}
			}

			return false;
		}

		@Override
		public E next() {
			init();
			return currentIterator.next();
		}

		@Override
		public void close() {
			if (init) {
				if (locks != null) {
					for (int index = 0; index < locks.length; index++) {
						if (readLocks[index]) {
							locks[index].unlockReader(readLocks[index]);
							readLocks[index] = false;
						}
					}
				}
			}
		}

	}

	@Override
	public int size() {
		int size = 0;
		for (Map<E, WeakReference<E>> weakReferenceMap : objectMap) {
			size += weakReferenceMap.size();
		}
		return size;
	}

	@Override
	public boolean contains(Object key) {
		return get((K) key) != null;
	}

	@Override
	public boolean add(E object) {
		int index = getIndex(object);
		boolean writeLock = locks[index].writeLock();
		try {
			Map<E, WeakReference<E>> weakReferenceMap = objectMap[index];
			WeakReference<E> ref = new WeakReference<>(object);

			ref = weakReferenceMap.put(object, ref);

			if (ref != null) {
				E e = ref.get();
				if (e != null) {
					// A duplicate was added which replaced the existing object. Undo this operation.
					weakReferenceMap.put(e, ref);
					return false;
				}
			}

			return true;

		} finally {
			locks[index].unlockWriter(writeLock);
		}

	}

	public E getOrAdd(K key, Supplier<E> supplier) {

		int index = getIndex(key);
		Map<E, WeakReference<E>> weakReferenceMap = objectMap[index];

		boolean readLock = locks[index].readLock();
		try {
			WeakReference<E> ref = weakReferenceMap.get(key);
			if (ref != null) {
				E e = ref.get();
				if (e != null) {
					// we found the object
					return e;
				}
			}
		} finally {
			locks[index].unlockReader(readLock);
		}

		// we could not find the object, so we will use the supplier to create a new object and add that
		boolean writeLock = locks[index].writeLock();
		try {
			E object = supplier.get();
			WeakReference<E> ref = weakReferenceMap.put(object, new WeakReference<>(object));
			if (ref != null) {
				E e = ref.get();
				if (e != null) {
					// Between releasing the read-lock and acquiring the write-lock another thread put the object in the
					// weakReferenceMap. We need to put back the one that was there before and return that one to the
					// user.
					weakReferenceMap.put(e, ref);
					object = e;
				}
			}
			assert object != null;
			return object;

		} finally {
			locks[index].unlockWriter(writeLock);
		}

	}

	@Override
	public boolean remove(Object object) {

		int index = getIndex(object);
		boolean writeLock = locks[index].writeLock();
		try {
			Map<E, WeakReference<E>> weakReferenceMap = objectMap[index];
			WeakReference<E> ref = weakReferenceMap.remove(object);
			return ref != null && ref.get() != null;

		} finally {
			locks[index].unlockWriter(writeLock);
		}
	}

	@Override
	public void clear() {

		for (int index = 0; index < objectMap.length; index++) {
			boolean writeLock = locks[index].writeLock();
			try {
				objectMap[index].clear();
			} finally {
				locks[index].unlockWriter(writeLock);
			}
		}

	}

	private static class AdderBasedReadWriteLock {

		// StampedLock for handling writers.
		private volatile boolean writeLocked;

		private static final VarHandle WRITE_LOCKED;

		static {
			try {
				WRITE_LOCKED = MethodHandles.lookup()
						.in(AdderBasedReadWriteLock.class)
						.findVarHandle(AdderBasedReadWriteLock.class, "writeLocked", boolean.class);
			} catch (ReflectiveOperationException e) {
				throw new Error(e);
			}
		}

		// LongAdder for handling readers. When the count is equal then there are no active readers.
		private final LongAdder readersLocked = new LongAdder();
		private final LongAdder readersUnlocked = new LongAdder();

		public boolean readLock() {
			while (true) {
				readersLocked.increment();
				if (!((boolean) WRITE_LOCKED.getAcquire(this))) {
					// Everything is good! We have acquired a read-lock and there are no active writers.
					return true;
				} else {
					// Release our read lock so we don't block any writers.
					readersUnlocked.increment();
					while (((boolean) WRITE_LOCKED.getAcquire(this))) {
						Thread.onSpinWait();
					}
				}
			}
		}

		public void unlockReader(boolean locked) {
			if (locked) {
				readersUnlocked.increment();
			} else {
				throw new IllegalMonitorStateException();
			}
		}

		public boolean writeLock() {

			// Acquire a write-lock.
			boolean writeLocked;
			do {
				writeLocked = WRITE_LOCKED.compareAndSet(this, false, true);
			} while (!writeLocked);

			// Wait for active readers to finish.
			while (true) {
				// The order is important here.
				long unlockedSum = readersUnlocked.sum();
				long lockedSum = readersLocked.sum();
				if (unlockedSum == lockedSum) {
					// No active readers.
					return writeLocked;
				} else {
					Thread.onSpinWait();
				}

			}
		}

		public void unlockWriter(boolean writeLocked) {
			if (writeLocked) {
				// Make sure that readers in other threads will be able to read the writes that were made by the user
				// within the write-locked section. The stamped lock only guarantees that writes are visible to other
				// threads if those threads use a stamped lock read-lock.
				VarHandle.fullFence();
				WRITE_LOCKED.setRelease(this, false);
			}
		}
	}
}