AbstractReadWriteLockManagerTest.java
/*******************************************************************************
* Copyright (c) 2022 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.common.concurrent.locks;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
abstract class AbstractReadWriteLockManagerTest {
AbstractReadWriteLockManager lockManager;
AbstractReadWriteLockManager lockManagerReleaseAbandoned;
AbstractReadWriteLockManager lockManagerReleaseAbandonedStackTrace;
AbstractReadWriteLockManager lockManagerTracking;
private MemoryAppender memoryAppender;
private String className;
@BeforeEach
void beforeEach() {
if (memoryAppender != null) {
memoryAppender.stop();
}
Stream.of(lockManager, lockManagerReleaseAbandoned, lockManagerReleaseAbandonedStackTrace, lockManagerTracking)
.filter(Objects::nonNull)
.forEach(l -> {
assertFalse(l.isWriterActive());
assertFalse(l.isReaderActive());
});
lockManager = null;
lockManagerReleaseAbandoned = null;
lockManagerReleaseAbandonedStackTrace = null;
lockManagerTracking = null;
setUpLockManagers();
assertNotNull(lockManager);
assertNotNull(lockManagerReleaseAbandoned);
assertNotNull(lockManagerReleaseAbandonedStackTrace);
assertNotNull(lockManagerTracking);
assertEquals(lockManager.getClass(), lockManagerReleaseAbandoned.getClass());
assertEquals(lockManagerReleaseAbandoned.getClass(), lockManagerReleaseAbandonedStackTrace.getClass());
assertEquals(lockManagerReleaseAbandonedStackTrace.getClass(), lockManagerTracking.getClass());
className = lockManager.getClass().getName();
Logger logger = (Logger) LoggerFactory.getLogger(lockManager.getClass());
memoryAppender = new MemoryAppender();
memoryAppender.setContext((LoggerContext) LoggerFactory.getILoggerFactory());
logger.detachAndStopAllAppenders();
logger.setLevel(Level.INFO);
logger.addAppender(memoryAppender);
memoryAppender.start();
}
abstract void setUpLockManagers();
@Test
void createWriteLock() throws InterruptedException {
Lock lock = lockManager.getWriteLock();
assertTrue(lock.isActive());
lock.release();
assertFalse(lock.isActive());
}
@Test
void createReadLock() throws InterruptedException {
Lock lock = lockManager.getReadLock();
Lock lock2 = lockManager.getReadLock();
assertTrue(lock.isActive());
lock.release();
assertFalse(lock.isActive());
assertTrue(lock2.isActive());
lock2.release();
}
@Test
void tryWriteLock() {
Lock lock = lockManager.tryWriteLock();
assertTrue(lock.isActive());
lock.release();
assertFalse(lock.isActive());
}
@Test
void tryReadLock() {
Lock lock = lockManager.tryReadLock();
Lock lock2 = lockManager.tryReadLock();
assertTrue(lock.isActive());
lock.release();
assertFalse(lock.isActive());
assertTrue(lock2.isActive());
lock2.release();
}
@Test
@Timeout(2)
void cleanupUnreleasedLocks() throws InterruptedException {
writeLock(lockManagerReleaseAbandoned);
TestHelper.callGC(lockManagerReleaseAbandoned);
Lock writeLock = lockManagerReleaseAbandoned.getWriteLock();
writeLock.release();
}
@Test
@Timeout(2)
void cleanupUnreleasedReadLocks() throws InterruptedException {
readLock(lockManagerReleaseAbandoned);
TestHelper.callGC(lockManagerReleaseAbandoned);
Lock writeLock = lockManagerReleaseAbandoned.getWriteLock();
writeLock.release();
}
@Test
@Timeout(2)
void cleanupUnreleasedWriteLocksWithStackTrace() throws InterruptedException {
writeLock(lockManagerReleaseAbandonedStackTrace);
TestHelper.callGC(lockManagerReleaseAbandonedStackTrace);
Lock writeLock = lockManagerReleaseAbandonedStackTrace.getWriteLock();
writeLock.release();
memoryAppender.waitForEvents();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("\"_WRITE\" lock abandoned; lock was acquired in main", Level.WARN);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.cleanupUnreleasedWriteLocksWithStackTrace",
Level.WARN);
}
@Test
@Timeout(2)
void cleanupUnreleasedTryWriteLocksWithStackTrace() throws InterruptedException {
writeLockTry(lockManagerReleaseAbandonedStackTrace);
TestHelper.callGC(lockManagerReleaseAbandonedStackTrace);
Lock writeLock = lockManagerReleaseAbandonedStackTrace.getWriteLock();
writeLock.release();
memoryAppender.waitForEvents();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("\"_WRITE\" lock abandoned; lock was acquired in main", Level.WARN);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.cleanupUnreleasedTryWriteLocksWithStackTrace",
Level.WARN);
}
@Test
@Timeout(2)
void cleanupUnreleasedWriteLocksWithoutStackTrace() throws InterruptedException {
writeLock(lockManagerReleaseAbandoned);
TestHelper.callGC(lockManagerReleaseAbandoned);
Lock writeLock = lockManagerReleaseAbandoned.getWriteLock();
writeLock.release();
memoryAppender.waitForEvents();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("\"_WRITE\" lock abandoned; consider setting the ", Level.WARN);
memoryAppender.assertNotContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.cleanupUnreleasedWriteLocksWithStackTrace",
Level.WARN);
}
@Test
@Timeout(2)
void cleanupUnreleasedWriteLocksWithTracking() throws InterruptedException {
writeLock(lockManagerTracking);
Lock writeLock = lockManagerTracking.getWriteLock();
writeLock.release();
memoryAppender.waitForEvents();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("\"_WRITE\" lock abandoned; lock was acquired in main", Level.WARN);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.cleanupUnreleasedWriteLocksWithTracking(AbstractReadWriteLockManagerTest.java",
Level.WARN);
}
@Test
@Timeout(2)
void cleanupUnreleasedReadLocksWithTracking() throws InterruptedException {
readLock(lockManagerTracking);
Lock writeLock = lockManagerTracking.getWriteLock();
writeLock.release();
memoryAppender.waitForEvents();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.cleanupUnreleasedReadLocksWithTracking",
Level.WARN);
}
@Test
@Timeout(2)
void cleanupUnreleasedTryReadLocksWithTracking() throws InterruptedException {
readLockTry(lockManagerTracking);
Lock writeLock = lockManagerTracking.getWriteLock();
writeLock.release();
memoryAppender.waitForEvents();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.cleanupUnreleasedTryReadLocksWithTracking",
Level.WARN);
}
@Test
@Timeout(2)
void stalledTestReadWrite() throws InterruptedException {
Lock readLock = lockManagerTracking.getReadLock();
Thread thread = null;
try {
thread = TestHelper.getStartedDaemonThread(lockManagerTracking::getWriteLock);
memoryAppender.waitForEvents();
} finally {
TestHelper.interruptAndJoin(thread);
}
readLock.release();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains(" is waiting on a possibly stalled lock \"_READ\" with id ", Level.INFO);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.stalledTestReadWrite(AbstractReadWriteLockManagerTest.java:",
Level.INFO);
}
@Test
@Timeout(2)
void stalledTestWriteRead() throws InterruptedException {
Lock writeLock = lockManagerTracking.getWriteLock();
Thread thread = null;
try {
thread = TestHelper.getStartedDaemonThread(lockManagerTracking::getReadLock);
memoryAppender.waitForEvents();
} finally {
TestHelper.interruptAndJoin(thread);
}
writeLock.release();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains(" is waiting on a possibly stalled lock \"_WRITE\" with id ", Level.INFO);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.stalledTestWriteRead(AbstractReadWriteLockManagerTest.java:",
Level.INFO);
}
@Test
@Timeout(2)
void stalledTestWriteWrite() throws InterruptedException {
Lock writeLock = lockManagerTracking.getWriteLock();
Thread thread = null;
try {
thread = TestHelper.getStartedDaemonThread(lockManagerTracking::getWriteLock);
memoryAppender.waitForEvents();
} finally {
TestHelper.interruptAndJoin(thread);
}
writeLock.release();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains(" is waiting on a possibly stalled lock \"_WRITE\" with id ", Level.INFO);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.stalledTestWriteWrite(AbstractReadWriteLockManagerTest.java:",
Level.INFO);
}
@Test
@Timeout(2)
void deadlockTestReadWrite() throws InterruptedException {
Thread thread = null;
try {
thread = TestHelper.getStartedDaemonThread(lockManagerTracking::getReadLock,
lockManagerTracking::getWriteLock);
memoryAppender.waitForEvents();
} finally {
TestHelper.interruptAndJoin(thread);
}
Lock writeLock = lockManagerTracking.getWriteLock();
assertTrue(writeLock.isActive());
writeLock.release();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("is possibly deadlocked waiting on \"_READ\" with id ", Level.WARN);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.TestHelper.lambda$getStartedDaemonThread", Level.WARN);
}
@Test
@Timeout(2)
void deadlockTestWriteRead() throws InterruptedException {
Thread thread = null;
try {
thread = TestHelper.getStartedDaemonThread(lockManagerTracking::getWriteLock,
lockManagerTracking::getReadLock);
memoryAppender.waitForEvents();
} finally {
TestHelper.interruptAndJoin(thread);
}
Lock writeLock = lockManagerTracking.getWriteLock();
assertTrue(writeLock.isActive());
writeLock.release();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("is possibly deadlocked waiting on \"_WRITE\" with id ", Level.WARN);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.TestHelper.lambda$getStartedDaemonThread", Level.WARN);
}
@Test
@Timeout(2)
void deadlockTestWriteWrite() throws InterruptedException {
Thread thread = null;
try {
thread = TestHelper.getStartedDaemonThread(lockManagerTracking::getWriteLock,
lockManagerTracking::getWriteLock);
memoryAppender.waitForEvents();
} finally {
TestHelper.interruptAndJoin(thread);
}
Lock writeLock = lockManagerTracking.getWriteLock();
assertTrue(writeLock.isActive());
writeLock.release();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("is possibly deadlocked waiting on \"_WRITE\" with id ", Level.WARN);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.TestHelper.lambda$getStartedDaemonThread", Level.WARN);
}
@Test
@Timeout(2)
void interruptWaitForActiveWriter() throws InterruptedException {
Lock lock = lockManagerTracking.getWriteLock();
Thread thread = null;
try {
thread = TestHelper.getStartedDaemonThread(lockManagerTracking::waitForActiveWriter);
memoryAppender.waitForEvents();
} finally {
TestHelper.interruptAndJoin(thread);
}
lock.release();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("is waiting on a possibly stalled lock \"_WRITE\" with id", Level.INFO);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.interruptWaitForActiveWriter(AbstractReadWriteLockManagerTest.java:",
Level.INFO);
}
@Test
@Timeout(2)
void waitForActiveWriter() throws InterruptedException {
Lock lock = lockManagerTracking.getWriteLock();
Thread thread = null;
try {
thread = TestHelper.getStartedDaemonThread(lockManagerTracking::waitForActiveWriter);
memoryAppender.waitForEvents();
lock.release();
} finally {
TestHelper.join(thread);
}
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("is waiting on a possibly stalled lock \"_WRITE\" with id", Level.INFO);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.waitForActiveWriter(AbstractReadWriteLockManagerTest.java:",
Level.INFO);
}
@Test
@Timeout(2)
void interruptWaitForActiveReader() throws InterruptedException {
Lock lock = lockManagerTracking.getReadLock();
Thread thread = null;
try {
thread = TestHelper.getStartedDaemonThread(lockManagerTracking::waitForActiveReaders);
memoryAppender.waitForEvents();
} finally {
TestHelper.interruptAndJoin(thread);
}
lock.release();
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("is waiting on a possibly stalled lock \"_READ\" with id", Level.INFO);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.interruptWaitForActiveReader(AbstractReadWriteLockManagerTest.java:",
Level.INFO);
}
@Test
@Timeout(2)
void waitForActiveReader() throws InterruptedException {
Lock writeLock = lockManagerTracking.getReadLock();
Thread thread = null;
try {
thread = TestHelper.getStartedDaemonThread(lockManagerTracking::waitForActiveReaders);
memoryAppender.waitForEvents();
writeLock.release();
} finally {
TestHelper.join(thread);
}
assertThat(memoryAppender.countEventsForLogger(className)).isEqualTo(1);
memoryAppender.assertContains("is waiting on a possibly stalled lock \"_READ\" with id", Level.INFO);
memoryAppender.assertContains(
"at org.eclipse.rdf4j.common.concurrent.locks.AbstractReadWriteLockManagerTest.waitForActiveReader(AbstractReadWriteLockManagerTest.java:",
Level.INFO);
}
@Test
void interruptTestWriteRead() throws InterruptedException {
Lock writeLock = lockManager.getWriteLock();
Thread thread1 = TestHelper.getStartedDaemonThread(lockManager::getReadLock);
Thread thread2 = TestHelper.getStartedDaemonThread(lockManager::getReadLock);
Thread thread3 = TestHelper.getStartedDaemonThread(lockManager::getReadLock);
Thread thread4 = TestHelper.getStartedDaemonThread(lockManager::getReadLock);
thread2.interrupt();
thread4.interrupt();
TestHelper.join(thread2);
TestHelper.join(thread4);
assertTrue(writeLock.isActive());
writeLock.release();
assertFalse(writeLock.isActive());
TestHelper.join(thread1);
TestHelper.join(thread3);
}
@Test
void interruptTestWriteWrite() throws InterruptedException {
Lock writeLock = lockManager.getWriteLock();
Thread thread = TestHelper.getStartedDaemonThread(lockManager::getWriteLock);
TestHelper.interruptAndJoin(thread);
assertTrue(writeLock.isActive());
writeLock.release();
assertFalse(writeLock.isActive());
}
@Test
void counter() {
long[] counter = { 0, 0, 0 };
ExecutorService executorService = Executors.newFixedThreadPool(8);
Random random = new Random(475824);
IntStream.range(0, 100)
.mapToObj(i -> {
int randomInt = random.nextInt(3);
return new Runnable() {
@Override
public void run() {
try {
for (int j = 0; j < 1000; j++) {
if (randomInt == 0) {
long counter2 = counter[2];
long counter0 = counter[0];
long counter1 = counter[1];
if (counter2 < counter1) {
throw new IllegalStateException();
}
if (counter1 < counter0) {
throw new IllegalStateException();
}
Lock writeLock = lockManager.getWriteLock();
counter[2]++;
writeLock.release();
} else if (randomInt == 1) {
long counter1 = counter[1];
long counter0 = counter[0];
long counter2 = counter[2];
if (counter2 < counter1) {
throw new IllegalStateException();
}
if (counter1 < counter0) {
throw new IllegalStateException();
}
Lock writeLock = lockManager.getWriteLock();
counter[1] = Math.max(counter[2], counter[1]);
writeLock.release();
} else if (randomInt == 2) {
long counter2 = counter[2];
long counter1 = counter[1];
long counter0 = counter[0];
if (counter2 < counter1) {
throw new IllegalStateException();
}
if (counter1 < counter0) {
throw new IllegalStateException();
}
Lock writeLock = lockManager.getWriteLock();
counter[0] = Math.max(counter[1], counter[0]);
writeLock.release();
}
}
} catch (InterruptedException ignored) {
}
}
};
})
.collect(Collectors.toList())
.stream()
.map(executorService::submit)
.forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
executorService.shutdownNow();
assertEquals(34000, counter[0]);
assertEquals(34000, counter[1]);
assertEquals(35000, counter[2]);
}
@Test
void interruptTestReadWrite() throws InterruptedException {
Lock readLock = lockManagerTracking.getReadLock();
Thread thread = TestHelper.getStartedDaemonThread(lockManagerTracking::getWriteLock);
TestHelper.interruptAndJoin(thread);
assertTrue(readLock.isActive());
readLock.release();
assertFalse(readLock.isActive());
}
private void writeLock(AbstractReadWriteLockManager lockManager) throws InterruptedException {
Lock lock = lockManager.getWriteLock();
assertTrue(lock.isActive());
}
private void readLock(AbstractReadWriteLockManager lockManager) throws InterruptedException {
Lock lock = lockManager.getReadLock();
assertTrue(lock.isActive());
}
private void writeLockTry(AbstractReadWriteLockManager lockManager) {
Lock lock = lockManager.tryWriteLock();
assertNotNull(lock);
assertTrue(lock.isActive());
}
private void readLockTry(AbstractReadWriteLockManager lockManager) {
Lock lock = lockManager.tryReadLock();
assertNotNull(lock);
assertTrue(lock.isActive());
}
}