LockTracking.java

/*******************************************************************************
 * Copyright (c) 2022 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.common.concurrent.locks.diagnostics;

import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.concurrent.locks.Properties;
import org.slf4j.Logger;

/**
 * Full tracking of locks with simple deadlock detection and logging as well as automatic release of abandoned locks
 * (same as LockCleaner).
 *
 * @author Håvard M. Ottestad
 */
@InternalUseOnly
public class LockTracking<T extends Lock> implements LockMonitoring<T> {

	/**
	 * Lower bound, in milliseconds, applied to the cleanup interval once stalled locks have been logged, so that the
	 * same stalled locks are not logged over and over when {@code waitToCollect} is very small.
	 */
	public static final int LOGGED_STALLED_LOCKS_MINIMUM_WAIT_TO_COLLECT = 1000;

	private final Logger logger;

	// shared by every LockTracking instance
	private static final ConcurrentCleaner cleaner = new ConcurrentCleaner();
	// source of the ids handed to tracked locks; shared by every LockTracking instance
	private static final AtomicLong seq = new AtomicLong();

	// ensures that at most one thread at a time performs the cleanup pass in runCleanup()
	private final ReentrantLock staleLoggingLock = new ReentrantLock();

	// locks that have not been GCed yet
	private final Map<SimpleLock<T>, WeakReference<SimpleLock<T>>> locks = Collections
			.synchronizedMap(new WeakHashMap<>());

	private final Lock.ExtendedSupplier<T> supplier;
	private final boolean stacktrace;
	private final int waitToCollect;
	private final String alias;

	// The two fields below are read (and previousCleanup is also written) by arbitrary threads without holding
	// staleLoggingLock, so they must be volatile to guarantee visibility and, for the long, atomic access.
	private volatile int currentWaitToCollect;
	private volatile long previousCleanup = 0;

	// only read and written while holding staleLoggingLock
	private long previousActiveLocksSignature = 0;

	/**
	 * Creates a new lock tracker.
	 *
	 * @param stacktrace    if {@code true}, a {@link Throwable} is created every time a lock is tracked so that the
	 *                      acquiring stack trace can be included in log messages
	 * @param alias         default alias used in log messages for locks created without an explicit alias
	 * @param logger        logger that receives the stalled/abandoned lock messages
	 * @param waitToCollect minimum number of milliseconds between two cleanup passes of {@link #runCleanup()}
	 * @param supplier      supplier of the underlying locks that get wrapped and tracked
	 */
	public LockTracking(boolean stacktrace, String alias, Logger logger, int waitToCollect,
			Lock.ExtendedSupplier<T> supplier) {
		this.stacktrace = stacktrace;
		this.supplier = supplier;
		this.waitToCollect = waitToCollect;
		this.currentWaitToCollect = waitToCollect;
		this.logger = logger;
		this.alias = alias;
	}

	/**
	 * Computes a cheap fingerprint of the currently active tracked locks: the sum of their ids. Two consecutive passes
	 * yielding the same value are treated by {@link #runCleanup()} as "nothing has changed".
	 */
	private long getActiveLocksSignature() {
		// iterating a Collections.synchronizedMap view requires holding the map's monitor
		synchronized (locks) {
			return locks.keySet()
					.stream()
					.filter(Objects::nonNull)
					.filter(SimpleLock::isActive)
					.mapToLong(s -> s.state.acquiredId)
					.sum();
		}
	}

	/**
	 * Logs every tracked lock that is still active. Locks acquired by the calling thread are reported as a possible
	 * deadlock (warn), all others as possibly stalled (info).
	 */
	private void logStalledLocks() {
		// Take a snapshot while holding the map's monitor and log afterwards, so that slow logging does not block
		// other threads that are registering new locks in the map.
		List<SimpleLock<T>> activeLocks;
		synchronized (locks) {
			activeLocks = locks.keySet()
					.stream()
					.filter(Objects::nonNull)
					.filter(SimpleLock::isActive)
					.collect(Collectors.toList());
		}

		Thread currentThread = Thread.currentThread();
		for (SimpleLock<T> simpleLock : activeLocks) {
			SimpleLock.State<T> state = simpleLock.state;
			if (state.thread == currentThread) {
				logger.warn("{} is possibly deadlocked waiting on \"{}\" with id {} acquired in the same thread",
						currentThread.getName(), state.alias, state.acquiredId, state.stack);
			} else {
				logger.info(
						"Current thread ({}) is waiting on a possibly stalled lock \"{}\" with id {} acquired in {}",
						currentThread.getName(), state.alias, state.acquiredId, state.thread.getName(),
						state.stack);
			}
		}
	}

	/**
	 * Performs a cleanup pass if more than the current wait interval has elapsed since the previous one.
	 * <p>
	 * The first call (or the first call after a failed pass) only records the current time. A pass requests a garbage
	 * collection and then compares the signature of the active locks with the one from the previous pass: if it is
	 * unchanged the active locks are logged as stalled and the interval is raised to at least
	 * {@link #LOGGED_STALLED_LOCKS_MINIMUM_WAIT_TO_COLLECT}; otherwise the interval is reset to the configured value.
	 * <p>
	 * May be called concurrently; if another thread is already running a pass this method returns immediately.
	 */
	public void runCleanup() {
		if (previousCleanup == 0) {
			previousCleanup = System.currentTimeMillis();
			return;
		}

		// cheap check without the lock; re-checked below once the lock is held
		if (!isCleanupDue()) {
			return;
		}

		if (!staleLoggingLock.tryLock()) {
			return;
		}

		try {
			if (isCleanupDue()) {
				// if something should fail we don't want to perform a new cleanup immediately
				previousCleanup = 0;

				// presumably gives the cleaner a chance to release abandoned (unreachable) locks before we look
				System.gc();
				long activeLocksSignature = getActiveLocksSignature();
				if (previousActiveLocksSignature == activeLocksSignature) {
					logStalledLocks();
					// avoid logging the same stalled locks over and over when waitToCollect is very small
					currentWaitToCollect = Math.max(currentWaitToCollect,
							LOGGED_STALLED_LOCKS_MINIMUM_WAIT_TO_COLLECT);
				} else {
					currentWaitToCollect = waitToCollect;
				}
				previousActiveLocksSignature = activeLocksSignature;
				previousCleanup = System.currentTimeMillis();
			}
		} finally {
			staleLoggingLock.unlock();
		}
	}

	/**
	 * @return {@code true} if more than {@code currentWaitToCollect} milliseconds have passed since
	 *         {@code previousCleanup}
	 */
	private boolean isCleanupDue() {
		return System.currentTimeMillis() - previousCleanup > currentWaitToCollect;
	}

	/**
	 * Obtains a lock from the supplier and tracks it under this tracker's default alias.
	 *
	 * @return the tracked lock
	 * @throws InterruptedException if thrown by the supplier while obtaining the lock
	 */
	public Lock getLock() throws InterruptedException {
		return getLock(alias);
	}

	/**
	 * Obtains a lock from the supplier and tracks it under the given alias.
	 *
	 * @param alias name used for this lock in log messages
	 * @return the tracked lock
	 * @throws InterruptedException if thrown by the supplier while obtaining the lock
	 */
	public Lock getLock(String alias) throws InterruptedException {
		return getLockInner(supplier.getLock(), alias);
	}

	/**
	 * Returns the underlying lock wrapped by a lock that was handed out by a tracker of this type.
	 *
	 * @param lock a {@link SimpleLock}
	 * @return the wrapped lock
	 * @throws IllegalArgumentException if {@code lock} is not a {@link SimpleLock}
	 */
	@Override
	public T unsafeInnerLock(Lock lock) {
		if (!(lock instanceof SimpleLock)) {
			throw new IllegalArgumentException("Supplied lock is not instanceof SimpleLock");
		}

		// unchecked: the type parameter of the SimpleLock cannot be verified at runtime
		@SuppressWarnings("unchecked")
		SimpleLock<T> simpleLock = (SimpleLock<T>) lock;
		return simpleLock.state.lock;
	}

	/**
	 * Asks the supplier for a lock via {@code tryLock()} and, if one was obtained, tracks it under the default alias.
	 *
	 * @return the tracked lock, or {@code null} if the supplier returned {@code null}
	 */
	@Override
	public Lock tryLock() {
		T lock = supplier.tryLock();
		if (lock == null) {
			return null;
		}
		return getLockInner(lock, alias);
	}

	/**
	 * Wraps the given lock in a {@link SimpleLock} carrying a fresh id, the calling thread and (when stack traces are
	 * enabled) the current stack, and adds it to the map of tracked locks.
	 */
	private SimpleLock<T> getLockInner(T lock, String alias) {
		Thread thread = Thread.currentThread();
		long sequenceNumber = seq.incrementAndGet();

		Throwable stack = null;
		if (stacktrace) {
			stack = new Throwable(alias + " lock " + sequenceNumber + " acquired in " + thread.getName());
		}

		SimpleLock<T> simpleLock = new SimpleLock<>(lock, alias, sequenceNumber, stack, thread, logger);

		// weak key and weak value: the map must not keep the lock reachable
		locks.put(simpleLock, new WeakReference<>(simpleLock));

		return simpleLock;
	}

	/**
	 * @return always {@code true}
	 */
	@Override
	public boolean requiresManualCleanup() {
		return true;
	}

	/**
	 * Wraps the supplied lock in a tracked {@link SimpleLock} using this tracker's default alias.
	 *
	 * @param lock the lock to track
	 * @return the tracked lock
	 */
	@Override
	public Lock register(T lock) {
		return getLockInner(lock, alias);
	}

	/**
	 * Runs the cleaning action of the supplied tracked lock. The lock is expected to be inactive already (checked with
	 * an {@code assert}).
	 *
	 * @param lock a {@link SimpleLock}
	 * @throws IllegalArgumentException if {@code lock} is not a {@link SimpleLock}
	 */
	@Override
	public void unregister(Lock lock) {
		assert !lock.isActive();
		if (!(lock instanceof SimpleLock)) {
			throw new IllegalArgumentException("Supplied lock is not instanceof SimpleLock");
		}
		((SimpleLock<?>) lock).cleanable.clean();
	}

	/**
	 * A tracked lock: delegates to the wrapped lock and registers a cleaning action that releases the wrapped lock
	 * (and logs a warning) if it is still active when the action runs. Equality is based solely on the acquired id.
	 */
	public static class SimpleLock<T extends Lock> implements Lock {

		private final State<T> state;
		private final Cleaner.Cleanable cleanable;

		/**
		 * @param lock       the lock to wrap
		 * @param alias      name used for this lock in log messages
		 * @param acquiredId id of this lock; used for {@link #equals(Object)} and {@link #hashCode()}
		 * @param stack      throwable carrying the acquiring stack trace, or {@code null} if not recorded
		 * @param thread     the thread that acquired the lock
		 * @param logger     logger used when the lock is found to be abandoned
		 */
		public SimpleLock(T lock, String alias, long acquiredId, Throwable stack, Thread thread, Logger logger) {
			this.state = new State<>(lock, alias, acquiredId, stack, thread, logger);
			this.cleanable = cleaner.register(this, state);
		}

		@Override
		public boolean isActive() {
			return state.lock.isActive();
		}

		/**
		 * Releases the wrapped lock and then runs the cleaning action.
		 */
		@Override
		public void release() {
			state.lock.release();
			cleanable.clean();
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			if (!(o instanceof SimpleLock)) {
				return false;
			}
			SimpleLock<?> that = (SimpleLock<?>) o;
			return state.acquiredId == that.state.acquiredId;
		}

		@Override
		public int hashCode() {
			return Long.hashCode(state.acquiredId);
		}

		/**
		 * Everything the cleaning action needs. This is a static nested class holding no reference to the owning
		 * {@link SimpleLock} (presumably so that the registered action does not keep the SimpleLock reachable).
		 */
		static class State<T extends Lock> implements Runnable {

			private final T lock;
			private final String alias;
			private final long acquiredId;
			private final Throwable stack;
			private final Thread thread;
			private final Logger logger;

			public State(T lock, String alias, long acquiredId, Throwable stack, Thread thread, Logger logger) {
				this.lock = lock;
				this.alias = alias;
				this.acquiredId = acquiredId;
				this.stack = stack;
				this.logger = logger;
				this.thread = thread;
			}

			@Override
			public void run() {
				// cleanup action accessing State, executed at most once
				if (lock.isActive()) {
					lock.release();
					logAbandoned(logger);
				}
			}

			/**
			 * Logs a warning that this lock was abandoned. Without a recorded stack trace the message additionally
			 * points at the {@link Properties#TRACK_LOCKS} system property.
			 */
			void logAbandoned(Logger logger) {
				if (stack == null) {
					logger.warn(
							"\"{}\" lock abandoned; lock was acquired in {}; consider setting the {} system property",
							alias, thread.getName(), Properties.TRACK_LOCKS);
				} else {
					logger.warn("\"{}\" lock abandoned; lock was acquired in {}", alias, thread.getName(), stack);
				}
			}

		}

	}
}