AbstractReadWriteLockManagerTestIT.java
/*******************************************************************************
* Copyright (c) 2022 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.common.concurrent.locks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
@Tag("slow")
abstract class AbstractReadWriteLockManagerTestIT {
AbstractReadWriteLockManager lockManager;
AbstractReadWriteLockManager lockManagerReleaseAbandoned;
AbstractReadWriteLockManager lockManagerReleaseAbandonedStackTrace;
AbstractReadWriteLockManager lockManagerTracking;
private MemoryAppender memoryAppender;
private String className;
@BeforeEach
void beforeEach() {
if (memoryAppender != null) {
memoryAppender.stop();
}
Stream.of(lockManager, lockManagerReleaseAbandoned, lockManagerReleaseAbandonedStackTrace, lockManagerTracking)
.filter(Objects::nonNull)
.forEach(l -> {
assertFalse(l.isWriterActive());
assertFalse(l.isReaderActive());
});
lockManager = null;
lockManagerReleaseAbandoned = null;
lockManagerReleaseAbandonedStackTrace = null;
lockManagerTracking = null;
setUpLockManagers();
assertNotNull(lockManager);
assertNotNull(lockManagerReleaseAbandoned);
assertNotNull(lockManagerReleaseAbandonedStackTrace);
assertNotNull(lockManagerTracking);
assertEquals(lockManager.getClass(), lockManagerReleaseAbandoned.getClass());
assertEquals(lockManagerReleaseAbandoned.getClass(), lockManagerReleaseAbandonedStackTrace.getClass());
assertEquals(lockManagerReleaseAbandonedStackTrace.getClass(), lockManagerTracking.getClass());
className = lockManager.getClass().getName();
Logger logger = (Logger) LoggerFactory.getLogger(lockManager.getClass());
memoryAppender = new MemoryAppender();
memoryAppender.setContext((LoggerContext) LoggerFactory.getILoggerFactory());
logger.detachAndStopAllAppenders();
logger.setLevel(Level.INFO);
logger.addAppender(memoryAppender);
memoryAppender.start();
}
abstract void setUpLockManagers();
@Test
@Timeout(30)
void testMultipleReadLocksSameThread() {
CountDownLatch acquireReadLockFirst = new CountDownLatch(1);
Stream<Runnable> runnableStream = Stream.of(() -> {
try {
Lock readLock = lockManager.getReadLock();
acquireReadLockFirst.countDown();
// try to force the other thread to begin acquiring a write lock before we continue
Thread.yield();
Thread.sleep(100);
Lock readLock2 = lockManager.getReadLock();
readLock.release();
readLock2.release();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}, () -> {
try {
acquireReadLockFirst.await();
Lock writeLock = lockManager.getWriteLock();
writeLock.release();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
);
runAsThreads(runnableStream);
assertFalse(lockManager.isReaderActive());
assertFalse(lockManager.isWriterActive());
}
private void runAsThreads(Stream<Runnable> runnableStream) {
runnableStream.parallel().forEach(Runnable::run);
}
@Test
@Timeout(30)
void writeLockShouldSucceed() throws InterruptedException {
Runnable runnable = () -> {
while (true) {
Lock readLock = null;
try {
readLock = lockManager.getReadLock();
Thread.yield();
if (Thread.interrupted()) {
break;
}
} catch (InterruptedException e) {
break;
} finally {
if (readLock != null) {
readLock.release();
}
}
}
};
List<Thread> threads = Arrays.asList(
new Thread(runnable),
new Thread(runnable),
new Thread(runnable),
new Thread(runnable)
);
try {
threads.forEach(thread -> {
thread.setDaemon(true);
thread.start();
});
do {
Thread.sleep(10);
} while (!lockManager.isReaderActive());
Lock writeLock = lockManager.getWriteLock();
writeLock.release();
} finally {
for (Thread thread : threads) {
TestHelper.interruptAndJoin(thread);
}
}
}
}