AbstractMemoryOverflowModel.java

/*******************************************************************************
 * Copyright (c) 2024 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 ******************************************************************************/

package org.eclipse.rdf4j.model.impl;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

import javax.management.NotificationEmitter;
import javax.management.openmbean.CompositeData;

import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.management.GarbageCollectionNotificationInfo;
import com.sun.management.GcInfo;

/**
 * A {@link Model} that keeps its statements in a {@link LinkedHashModel} in main memory until the JVM comes under
 * memory pressure (detected via GC load and free heap), at which point the contents are moved to a disk-based model
 * provided by the subclass.
 */
@InternalUseOnly
public abstract class AbstractMemoryOverflowModel<T extends AbstractModel> extends AbstractModel {

	private static final long serialVersionUID = 4119844228099208169L;

	private static final Runtime RUNTIME = Runtime.getRuntime();

	/**
	 * The default batch size is 1024. When adding a large collection of statements, they are added in batches of this
	 * size, with a memory-overflow check between batches.
	 */
	@SuppressWarnings("StaticNonFinalField")
	public static int BATCH_SIZE = 1024;

	/**
	 * Milliseconds of GC activity over the past second above which overflow to disk is triggered.
	 */
	@SuppressWarnings("StaticNonFinalField")
	public static int MEMORY_THRESHOLD_HIGH = 300;

	/**
	 * Milliseconds of GC activity over the past second below which overflow to disk is disabled again.
	 */
	@SuppressWarnings("StaticNonFinalField")
	public static int MEMORY_THRESHOLD_MEDIUM = 200;

	/**
	 * Milliseconds of GC activity over the past second below which overflow to disk is skipped in a low-memory
	 * situation, in anticipation of the GC freeing up memory.
	 */
	@SuppressWarnings("StaticNonFinalField")
	public static int MEMORY_THRESHOLD_LOW = 100;

	private static volatile boolean overflow;

	/**
	 * The minimum amount of free memory (in MB) before overflowing to disk. To reduce the chance of an
	 * OutOfMemoryError we always overflow once we get this close to running out of memory, even if we think we have
	 * space for one more block. Defaults to 32 MB for heaps smaller than 1 GB and 128 MB for heaps of 1 GB or more.
	 */
	@SuppressWarnings("StaticNonFinalField")
	public static int MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING = RUNTIME.maxMemory() >= 1024 * 1024 * 1024 ? 128 : 32;

	static final Logger logger = LoggerFactory.getLogger(AbstractMemoryOverflowModel.class);

	// The in-memory model; set to null once the contents have been moved to disk.
	private volatile LinkedHashModel memory;

	// The disk-based model; expected to be initialized by the subclass when overflowToDiskInner(Model) is called.
	protected transient volatile T disk;

	private final SimpleValueFactory vf = SimpleValueFactory.getInstance();

	// Ring buffer of GC time (in ms) per 100 ms bucket, covering roughly the last second.
	private static final int[] GC_LOAD = new int[10];
	// Index of the bucket that was current when the previous GC notification was processed.
	private static int prevBucket;

	private static volatile boolean highGcLoad = false;
	// GC durations reported by notifications, drained into GC_LOAD when the next notification arrives.
	private static final Queue<GcInfo> gcInfos = new ConcurrentLinkedQueue<>();

	// If we are in a low-memory situation and the GC load is low, we will not overflow to disk since it is likely
	// that the GC will be able to free up enough memory.
	private static volatile boolean lowMemLowGcSum = false;

	private final ReentrantLock lock = new ReentrantLock();

	static {
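		// Register a listener on every garbage collector so that we can estimate how much time is spent in GC over
		// roughly the last second. The resulting highGcLoad flag drives checkMemoryOverflow().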
		List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
		for (GarbageCollectorMXBean gcBean : gcBeans) {
			NotificationEmitter emitter = (NotificationEmitter) gcBean;
			emitter.addNotificationListener((notification, o) -> {

				// Advance the ring buffer to the current 100 ms bucket, zeroing out any buckets that were skipped
				// since the previous notification.
				long currentBucket = (System.currentTimeMillis() / 100) % 10;
				while (currentBucket != prevBucket) {
					prevBucket = (prevBucket + 1) % 10;
					GC_LOAD[prevBucket] = 0;
				}

				// Drain GC durations queued by earlier notifications into the current bucket.
				while (true) {
					GcInfo poll = gcInfos.poll();
					if (poll == null) {
						break;
					}
					GC_LOAD[(int) currentBucket] += (int) poll.getDuration();
				}

				// extract garbage collection information from notification.
				GarbageCollectionNotificationInfo gcNotificationInfo = GarbageCollectionNotificationInfo
						.from((CompositeData) notification.getUserData());
				GcInfo gcInfo = gcNotificationInfo.getGcInfo();
				gcInfos.add(gcInfo);
				// Total GC time over the last second and the single busiest 100 ms bucket.
				long gcSum = 0;
				long gcMax = 0;
				for (int i : GC_LOAD) {
					gcSum += i;
					if (i > gcMax) {
						gcMax = i;
					}
				}

				// If a single bucket dominates the total (one long GC pause rather than sustained GC activity),
				// discount half of that peak so a single collection does not trigger overflow on its own.
				if (gcSum < gcMax * 1.3) {
					gcSum -= (gcMax / 2);
				}

				// Estimated free heap in MB.
				double freeMemoryMb = mbFree();

				// Low on memory but little GC activity: hold off on flagging high GC load once, in anticipation of
				// the GC freeing up enough memory before the next notification.
				if (!highGcLoad && !lowMemLowGcSum && freeMemoryMb < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING
						&& gcSum < MEMORY_THRESHOLD_LOW) {
					lowMemLowGcSum = true;
					return;
				}

				lowMemLowGcSum = false;

				if (!highGcLoad && freeMemoryMb < 256
						&& (gcSum > MEMORY_THRESHOLD_HIGH || freeMemoryMb < MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING)) {
					logger.debug("High GC load detected. Free memory: {} MB, GC sum: {} ms in past 1000 ms",
							freeMemoryMb, gcSum);
					highGcLoad = true;
				} else if ((freeMemoryMb > 256 || gcSum < MEMORY_THRESHOLD_MEDIUM) && highGcLoad
						&& freeMemoryMb > MIN_AVAILABLE_MEM_BEFORE_OVERFLOWING) {
					logger.debug("GC load back to normal. Free memory: {} MB, GC sum: {} ms in past 1000 ms",
							freeMemoryMb, gcSum);
					highGcLoad = false;
				}

			}, null, null);
		}
	}

	private static double mbFree() {
		// maximum heap size the JVM can allocate
		long maxMemory = RUNTIME.maxMemory();

		// total currently allocated JVM memory
		long totalMemory = RUNTIME.totalMemory();

		// amount of memory free in the currently allocated JVM memory
		long freeMemory = RUNTIME.freeMemory();

		// estimated memory used
		long used = totalMemory - freeMemory;

		// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
		long freeToAllocateMemory = maxMemory - used;

		return (freeToAllocateMemory / 1024.0 / 1024.0);

	}

	private volatile boolean closed;

	public AbstractMemoryOverflowModel() {
		memory = new LinkedHashModel();
	}

	public AbstractMemoryOverflowModel(Set<Namespace> namespaces) {
		memory = new LinkedHashModel(namespaces, 0);
	}

	@Override
	public synchronized Set<Namespace> getNamespaces() {
		return memory.getNamespaces();
	}

	@Override
	public synchronized Optional<Namespace> getNamespace(String prefix) {
		return memory.getNamespace(prefix);
	}

	@Override
	public synchronized Namespace setNamespace(String prefix, String name) {
		return memory.setNamespace(prefix, name);
	}

	@Override
	public synchronized void setNamespace(Namespace namespace) {
		memory.setNamespace(namespace);
	}

	@Override
	public synchronized Optional<Namespace> removeNamespace(String prefix) {
		return memory.removeNamespace(prefix);
	}

	@Override
	public boolean contains(Resource subj, IRI pred, Value obj, Resource... contexts) {
		return getDelegate().contains(subj, pred, obj, contexts);
	}

	@Override
	public boolean add(Resource subj, IRI pred, Value obj, Resource... contexts) {
		checkMemoryOverflow();
		return getDelegate().add(subj, pred, obj, contexts);
	}

	@Override
	public boolean add(Statement st) {
		checkMemoryOverflow();
		return getDelegate().add(st);
	}

	@Override
	public boolean addAll(Collection<? extends Statement> c) {
		checkMemoryOverflow();
		if (disk != null || c.size() <= BATCH_SIZE) {
			return getDelegate().addAll(c);
		} else {
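			// Add in batches of BATCH_SIZE so that checkMemoryOverflow() gets a chance to run between batches.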
			boolean ret = false;
			HashSet<Statement> buffer = new HashSet<>();
			for (Statement st : c) {
				buffer.add(st);
				if (buffer.size() >= BATCH_SIZE) {
					ret |= getDelegate().addAll(buffer);
					buffer.clear();
					checkMemoryOverflow();
				}
			}
			if (!buffer.isEmpty()) {
				ret |= getDelegate().addAll(buffer);
				buffer.clear();
			}

			return ret;

		}

	}

	@Override
	public boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) {
		return getDelegate().remove(subj, pred, obj, contexts);
	}

	@Override
	public int size() {
		return getDelegate().size();
	}

	@Override
	public Iterator<Statement> iterator() {
		return getDelegate().iterator();
	}

	@Override
	public boolean clear(Resource... contexts) {
		return getDelegate().clear(contexts);
	}

	@Override
	public Model filter(final Resource subj, final IRI pred, final Value obj, final Resource... contexts) {
		return new FilteredModel(this, subj, pred, obj, contexts) {

			private static final long serialVersionUID = -475666402618133101L;

			@Override
			public int size() {
				return getDelegate().filter(subj, pred, obj, contexts).size();
			}

			@Override
			public Iterator<Statement> iterator() {
				return getDelegate().filter(subj, pred, obj, contexts).iterator();
			}

			@Override
			protected void removeFilteredTermIteration(Iterator<Statement> iter, Resource subj, IRI pred, Value obj,
					Resource... contexts) {
				AbstractMemoryOverflowModel.this.removeTermIteration(iter, subj, pred, obj, contexts);
			}
		};
	}

	@Override
	public synchronized void removeTermIteration(Iterator<Statement> iter, Resource subj, IRI pred, Value obj,
			Resource... contexts) {
		if (disk == null) {
			memory.removeTermIteration(iter, subj, pred, obj, contexts);
		} else {
			disk.removeTermIteration(iter, subj, pred, obj, contexts);
		}
	}

	private Model getDelegate() {
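		// Normally exactly one of memory and disk is non-null. During an overflow both may briefly be null, so fall
		// back to re-checking under the lock before giving up.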
		var memory = this.memory;
		if (memory != null) {
			return memory;
		} else {
			var disk = this.disk;
			if (disk != null) {
				return disk;
			}

			try {
				lock.lockInterruptibly();
				try {
					if (this.memory != null) {
						return this.memory;
					}
					if (this.disk != null) {
						return this.disk;
					}
					if (closed) {
						throw new IllegalStateException("MemoryOverflowModel is closed");
					}
					throw new IllegalStateException("MemoryOverflowModel is in an inconsistent state");
				} finally {
					lock.unlock();
				}

			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}

		}
	}

	private void writeObject(ObjectOutputStream s) throws IOException {
		// Write out any hidden serialization magic
		s.defaultWriteObject();
		// Write out size
		Model delegate = getDelegate();
		s.writeInt(delegate.size());
		// Write out all elements
		for (Statement st : delegate) {
			Resource subj = st.getSubject();
			IRI pred = st.getPredicate();
			Value obj = st.getObject();
			Resource ctx = st.getContext();
			s.writeObject(vf.createStatement(subj, pred, obj, ctx));
		}
	}

	private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
		// Read in any hidden serialization magic
		s.defaultReadObject();
		// Read in size
		int size = s.readInt();
		// Read in all elements
		for (int i = 0; i < size; i++) {
			add((Statement) s.readObject());
		}
	}

	/**
	 * Checks whether we are under memory pressure (high GC load or a previous decision to overflow) and, if so,
	 * moves the in-memory contents to disk.
	 */
	private synchronized void checkMemoryOverflow() {
		try {
			lock.lockInterruptibly();
			try {

				// Nothing to do if the contents have already been moved to disk.
				if (disk == getDelegate()) {
					return;
				}

				if (overflow || highGcLoad) {
					logger.debug("Syncing triples to disk due to gc load");
					overflowToDisk();
					if (!highGcLoad) {
						overflow = false;
					}
				}
			} finally {
				lock.unlock();
			}

		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		}

	}

	/**
	 * Moves the contents of the in-memory model to disk via {@link #overflowToDiskInner(Model)} and clears the
	 * in-memory reference so that {@link #getDelegate()} switches to the disk-based model.
	 */
	private synchronized void overflowToDisk() {

		try {
			lock.lockInterruptibly();
			try {
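				// Note that 'overflow' is shared between all instances: once one model starts overflowing, other
				// models will also overflow on their next write until the GC load has returned to normal.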
				overflow = true;
				if (memory == null) {
					assert disk != null;
					return;
				}

				LinkedHashModel memory = this.memory;
				this.memory = null;
				overflowToDiskInner(memory);

				logger.debug("overflow synced to disk");
				System.gc();
			} finally {
				lock.unlock();
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		}

	}

	/**
	 * Called once when the model overflows. Implementations are expected to initialize the disk-based model
	 * {@link #disk} and transfer all statements from the given in-memory model to it.
	 *
	 * @param memory the in-memory model whose contents should be moved to disk
	 */
	protected abstract void overflowToDiskInner(Model memory);
}