ValueStoreWALPurgeWakesProducersTest.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;
import static org.assertj.core.api.Assertions.assertThat;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
/**
* Verifies that purging the WAL wakes any producers blocked on a full queue.
*
* <p>
* Reproduces a deadlock that occurs when {@link java.util.concurrent.ArrayBlockingQueue#clear()} is used during purge:
* it removes elements without signalling {@code notFull}, leaving producers blocked in {@code put()} even though the
* queue is now empty.
*/
class ValueStoreWALPurgeWakesProducersTest {

	/** Upper bound on how long the test waits for the producer to park on the full queue before purging. */
	private static final long BLOCK_WAIT_MILLIS = TimeUnit.SECONDS.toMillis(2);

	@TempDir
	Path tempDir;

	/**
	 * Saturates a capacity-1 queue, starts a producer that calls {@code logMint}, runs the writer's internal purge
	 * and asserts that the producer terminates on its own within one second.
	 *
	 * <p>
	 * The WAL's {@code queue} field is replaced (via reflection) with a {@link TestBlockingQueue} whose
	 * {@code clear()} never signals {@code notFull}, so the purge implementation has to wake producers itself.
	 *
	 * @throws Exception if reflection, file system setup or thread coordination fails
	 */
	@Test
	void purgeWakesBlockedProducer() throws Exception {
		Path walDir = tempDir.resolve("wal-purge-wakeup");
		Files.createDirectories(walDir);
		ValueStoreWalConfig cfg = ValueStoreWalConfig.builder()
				.walDirectory(walDir)
				.storeUuid(UUID.randomUUID().toString())
				.queueCapacity(1) // make saturation easy and deterministic
				.build();

		try (ValueStoreWAL wal = ValueStoreWAL.open(cfg)) {
			// Stop the writer thread to avoid it draining the queue during this focused concurrency check
			Object logWriter = getField(wal, "logWriter");
			invokeNoArg(logWriter, "shutdown");
			Thread writerThread = (Thread) getField(wal, "writerThread");
			writerThread.join(TimeUnit.SECONDS.toMillis(5));
			assertThat(writerThread.isAlive()).as("writer thread should be stopped for this test").isFalse();

			// Swap in a test queue with explicit "clear does not signal notFull" semantics to make the behavior
			// deterministic across JDK versions.
			TestBlockingQueue testQueue = new TestBlockingQueue(1);
			setField(wal, "queue", testQueue);

			// Fill the test queue to capacity so the next mint attempt will block in put()
			boolean offered = testQueue.offer(new ValueStoreWalRecord(1L, 1, ValueStoreWalValueKind.LITERAL,
					"pre-fill", "dt", "", 0));
			assertThat(offered).isTrue();

			AtomicBoolean producerFinished = new AtomicBoolean(false);
			Thread producer = new Thread(() -> {
				try {
					wal.logMint(2, ValueStoreWalValueKind.LITERAL, "after-purge", "dt", "", 0);
					producerFinished.set(true);
				} catch (Exception e) {
					// Any exit from logMint counts as "no longer blocked". Whether the producer left without
					// being interrupted by the cleanup below is asserted separately via finishedNaturally.
					producerFinished.set(true);
				}
			}, "blocked-producer");
			// A producer that never wakes up must not be able to keep the JVM alive
			producer.setDaemon(true);
			producer.start();

			// Wait until the producer is really parked on the full queue instead of trusting a fixed sleep;
			// otherwise the purge could run first and the test would pass without exercising the wake-up path.
			awaitBlockedProducer(testQueue, producer, BLOCK_WAIT_MILLIS);
			assertThat(producer.isAlive()).as("producer should be blocked on a full queue").isTrue();

			// Perform the purge using the internal method to model the writer's purge path without races
			invokeNoArg(logWriter, "performPurgeInternal");

			// Expectation: purge must wake the blocked producer promptly
			producer.join(TimeUnit.SECONDS.toMillis(1));
			boolean finishedNaturally = !producer.isAlive();
			try {
				assertThat(finishedNaturally)
						.as("producer should have completed without external interruption after purge")
						.isTrue();
				assertThat(producerFinished.get())
						.as("purge must wake producers blocked in queue.put()")
						.isTrue();
			} finally {
				if (!finishedNaturally) {
					// ensure no stray thread if assertion failed
					producer.interrupt();
					try {
						// give the interrupted producer a moment to exit before the WAL is closed
						producer.join(TimeUnit.SECONDS.toMillis(1));
					} catch (InterruptedException e) {
						// do not mask the assertion failure; just preserve the interrupt status
						Thread.currentThread().interrupt();
					}
				}
			}
		}
	}

	/**
	 * Polls until a thread is waiting for space in {@code queue}, the producer has terminated, or the timeout
	 * elapses. Returns silently in all three cases; the caller decides what to assert.
	 *
	 * @param queue         the test queue the producer is expected to block on
	 * @param producer      the producer thread
	 * @param timeoutMillis maximum time to wait, in milliseconds
	 * @throws InterruptedException if the calling thread is interrupted while polling
	 */
	private static void awaitBlockedProducer(TestBlockingQueue queue, Thread producer, long timeoutMillis)
			throws InterruptedException {
		long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
		while (producer.isAlive() && !queue.hasBlockedProducers() && System.nanoTime() - deadline < 0) {
			Thread.sleep(5);
		}
	}

	/**
	 * Reads a (possibly private) field from {@code target} via reflection.
	 *
	 * @param target the object to read from
	 * @param name   the declared field name
	 * @return the current field value
	 * @throws Exception if the field does not exist or cannot be accessed
	 */
	private static Object getField(Object target, String name) throws Exception {
		return findField(target.getClass(), name).get(target);
	}

	/**
	 * Writes a (possibly private) field on {@code target} via reflection.
	 *
	 * @param target the object to modify
	 * @param name   the declared field name
	 * @param value  the new field value
	 * @throws Exception if the field does not exist or cannot be written
	 */
	private static void setField(Object target, String name, Object value) throws Exception {
		findField(target.getClass(), name).set(target, value);
	}

	/**
	 * Locates a declared field by name, searching {@code type} first and then its superclasses, and makes it
	 * accessible.
	 */
	private static Field findField(Class<?> type, String name) throws NoSuchFieldException {
		for (Class<?> current = type; current != null; current = current.getSuperclass()) {
			try {
				Field field = current.getDeclaredField(name);
				field.setAccessible(true);
				return field;
			} catch (NoSuchFieldException ignored) {
				// not declared on this level; continue with the superclass
			}
		}
		throw new NoSuchFieldException(name);
	}

	/** Invokes a (possibly private) no-argument method declared on the runtime class of {@code target}. */
	private static void invokeNoArg(Object target, String methodName) throws Exception {
		Method method = target.getClass().getDeclaredMethod(methodName);
		method.setAccessible(true);
		method.invoke(target);
	}

	/**
	 * Minimal blocking queue with a fixed capacity whose clear() deliberately does not signal notFull, to reproduce the
	 * deadlock scenario independent of the JDK's ArrayBlockingQueue implementation.
	 *
	 * <p>
	 * All state is guarded by a single {@code ReentrantLock}. Operations that the test does not need throw
	 * {@link UnsupportedOperationException}.
	 */
	private static final class TestBlockingQueue implements BlockingQueue<ValueStoreWalRecord> {
		private final java.util.ArrayDeque<ValueStoreWalRecord> deque = new java.util.ArrayDeque<>();
		private final int capacity;
		private final java.util.concurrent.locks.ReentrantLock lock = new java.util.concurrent.locks.ReentrantLock();
		private final java.util.concurrent.locks.Condition notEmpty = lock.newCondition();
		private final java.util.concurrent.locks.Condition notFull = lock.newCondition();

		/**
		 * @param capacity maximum number of queued records; must be positive
		 * @throws IllegalArgumentException if {@code capacity} is not positive
		 */
		TestBlockingQueue(int capacity) {
			if (capacity <= 0) {
				throw new IllegalArgumentException("capacity must be positive: " + capacity);
			}
			this.capacity = capacity;
		}

		/** Returns {@code true} if at least one thread is currently waiting for space in this queue. */
		boolean hasBlockedProducers() {
			lock.lock();
			try {
				// hasWaiters requires the lock to be held by the caller
				return lock.hasWaiters(notFull);
			} finally {
				lock.unlock();
			}
		}

		/** Appends a record and wakes one consumer; the lock must be held and there must be room. */
		private void enqueue(ValueStoreWalRecord e) {
			deque.addLast(e);
			notEmpty.signal();
		}

		/** Removes the head and wakes one producer; the lock must be held and the deque must be non-empty. */
		private ValueStoreWalRecord dequeue() {
			ValueStoreWalRecord head = deque.removeFirst();
			notFull.signal();
			return head;
		}

		@Override
		public boolean offer(ValueStoreWalRecord e) {
			java.util.Objects.requireNonNull(e, "e");
			lock.lock();
			try {
				if (deque.size() >= capacity) {
					return false;
				}
				enqueue(e);
				return true;
			} finally {
				lock.unlock();
			}
		}

		@Override
		public void put(ValueStoreWalRecord e) throws InterruptedException {
			java.util.Objects.requireNonNull(e, "e");
			lock.lockInterruptibly();
			try {
				while (deque.size() >= capacity) {
					notFull.await();
				}
				enqueue(e);
			} finally {
				lock.unlock();
			}
		}

		@Override
		public boolean offer(ValueStoreWalRecord e, long timeout, java.util.concurrent.TimeUnit unit)
				throws InterruptedException {
			java.util.Objects.requireNonNull(e, "e");
			long nanos = unit.toNanos(timeout);
			lock.lockInterruptibly();
			try {
				while (deque.size() >= capacity) {
					if (nanos <= 0L) {
						return false;
					}
					nanos = notFull.awaitNanos(nanos);
				}
				enqueue(e);
				return true;
			} finally {
				lock.unlock();
			}
		}

		@Override
		public boolean add(ValueStoreWalRecord e) {
			// Queue.add must throw rather than return false when the element cannot be inserted
			if (!offer(e)) {
				throw new IllegalStateException("Queue full");
			}
			return true;
		}

		@Override
		public ValueStoreWalRecord poll(long timeout, java.util.concurrent.TimeUnit unit) throws InterruptedException {
			long nanos = unit.toNanos(timeout);
			lock.lockInterruptibly();
			try {
				while (deque.isEmpty()) {
					if (nanos <= 0L) {
						return null;
					}
					nanos = notEmpty.awaitNanos(nanos);
				}
				return dequeue();
			} finally {
				lock.unlock();
			}
		}

		@Override
		public ValueStoreWalRecord poll() {
			lock.lock();
			try {
				return deque.isEmpty() ? null : dequeue();
			} finally {
				lock.unlock();
			}
		}

		@Override
		public ValueStoreWalRecord peek() {
			lock.lock();
			try {
				return deque.peekFirst();
			} finally {
				lock.unlock();
			}
		}

		@Override
		public void clear() {
			lock.lock();
			try {
				deque.clear();
				// intentionally do NOT signal notFull here
			} finally {
				lock.unlock();
			}
		}

		@Override
		public boolean isEmpty() {
			lock.lock();
			try {
				return deque.isEmpty();
			} finally {
				lock.unlock();
			}
		}

		@Override
		public int size() {
			lock.lock();
			try {
				return deque.size();
			} finally {
				lock.unlock();
			}
		}

		@Override
		public int remainingCapacity() {
			lock.lock();
			try {
				return capacity - deque.size();
			} finally {
				lock.unlock();
			}
		}

		// --- Methods below are unused in this test and throw UnsupportedOperationException ---

		@Override
		public ValueStoreWalRecord take() {
			throw new UnsupportedOperationException();
		}

		@Override
		public ValueStoreWalRecord remove() {
			throw new UnsupportedOperationException();
		}

		@Override
		public ValueStoreWalRecord element() {
			throw new UnsupportedOperationException();
		}

		@Override
		public int drainTo(java.util.Collection<? super ValueStoreWalRecord> c) {
			throw new UnsupportedOperationException();
		}

		@Override
		public int drainTo(java.util.Collection<? super ValueStoreWalRecord> c, int maxElements) {
			throw new UnsupportedOperationException();
		}

		@Override
		public boolean remove(Object o) {
			throw new UnsupportedOperationException();
		}

		@Override
		public boolean contains(Object o) {
			throw new UnsupportedOperationException();
		}

		@Override
		public java.util.Iterator<ValueStoreWalRecord> iterator() {
			throw new UnsupportedOperationException();
		}

		@Override
		public Object[] toArray() {
			throw new UnsupportedOperationException();
		}

		@Override
		public <T> T[] toArray(T[] a) {
			throw new UnsupportedOperationException();
		}

		@Override
		public boolean containsAll(java.util.Collection<?> c) {
			throw new UnsupportedOperationException();
		}

		@Override
		public boolean addAll(java.util.Collection<? extends ValueStoreWalRecord> c) {
			throw new UnsupportedOperationException();
		}

		@Override
		public boolean removeAll(java.util.Collection<?> c) {
			throw new UnsupportedOperationException();
		}

		@Override
		public boolean retainAll(java.util.Collection<?> c) {
			throw new UnsupportedOperationException();
		}
	}
}