TestInstrumentedLock.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
/**
* A test class for InstrumentedLock.
*/
public class TestInstrumentedLock {
static final Logger LOG = LoggerFactory.getLogger(TestInstrumentedLock.class);
@Rule public TestName name = new TestName();
/**
* Test exclusive access of the lock.
* @throws Exception
*/
@Test(timeout=10000)
public void testMultipleThread() throws Exception {
String testname = name.getMethodName();
InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
lock.lock();
try {
Thread competingThread = new Thread() {
@Override
public void run() {
assertFalse(lock.tryLock());
}
};
competingThread.start();
competingThread.join();
} finally {
lock.unlock();
}
}
/**
* Test the correctness with try-with-resource syntax.
* @throws Exception
*/
@Test(timeout=10000)
public void testTryWithResourceSyntax() throws Exception {
String testname = name.getMethodName();
final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
@Override
public void lock() {
super.lock();
lockThread.set(Thread.currentThread());
}
@Override
public void unlock() {
super.unlock();
lockThread.set(null);
}
};
AutoCloseableLock acl = new AutoCloseableLock(lock);
try (AutoCloseable localLock = acl.acquire()) {
assertEquals(acl, localLock);
Thread competingThread = new Thread() {
@Override
public void run() {
assertNotEquals(Thread.currentThread(), lockThread.get());
assertFalse(lock.tryLock());
}
};
competingThread.start();
competingThread.join();
assertEquals(Thread.currentThread(), lockThread.get());
}
assertNull(lockThread.get());
}
/**
* Test the lock logs warning when lock held time is greater than threshold
* and not log warning otherwise.
* @throws Exception
*/
@Test(timeout=10000)
public void testLockLongHoldingReport() throws Exception {
String testname = name.getMethodName();
final AtomicLong time = new AtomicLong(0);
Timer mclock = new Timer() {
@Override
public long monotonicNow() {
return time.get();
}
};
Lock mlock = mock(Lock.class);
final AtomicLong wlogged = new AtomicLong(0);
final AtomicLong wsuppresed = new AtomicLong(0);
final AtomicLong wMaxWait = new AtomicLong(0);
InstrumentedLock lock = new InstrumentedLock(
testname, LOG, mlock, 2000, 300, mclock) {
@Override
void logWarning(long lockHeldTime, SuppressedSnapshot stats) {
wlogged.incrementAndGet();
wsuppresed.set(stats.getSuppressedCount());
wMaxWait.set(stats.getMaxSuppressedWait());
}
};
// do not log warning when the lock held time is short
lock.lock(); // t = 0
time.set(200);
lock.unlock(); // t = 200
assertEquals(0, wlogged.get());
assertEquals(0, wsuppresed.get());
assertEquals(0, wMaxWait.get());
lock.lock(); // t = 200
time.set(700);
lock.unlock(); // t = 700
assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get());
assertEquals(0, wMaxWait.get());
// despite the lock held time is greater than threshold
// suppress the log warning due to the logging gap
// (not recorded in wsuppressed until next log message)
lock.lock(); // t = 700
time.set(1100);
lock.unlock(); // t = 1100
assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get());
assertEquals(0, wMaxWait.get());
// log a warning message when the lock held time is greater the threshold
// and the logging time gap is satisfied. Also should display suppressed
// previous warnings.
time.set(2400);
lock.lock(); // t = 2400
time.set(2800);
lock.unlock(); // t = 2800
assertEquals(2, wlogged.get());
assertEquals(1, wsuppresed.get());
assertEquals(400, wMaxWait.get());
}
/**
* Test the lock logs warning when lock wait / queue time is greater than
* threshold and not log warning otherwise.
* @throws Exception
*/
@Test(timeout=10000)
public void testLockLongWaitReport() throws Exception {
String testname = name.getMethodName();
final AtomicLong time = new AtomicLong(0);
Timer mclock = new Timer() {
@Override
public long monotonicNow() {
return time.get();
}
};
Lock mlock = new ReentrantLock(true); //mock(Lock.class);
final AtomicLong wlogged = new AtomicLong(0);
final AtomicLong wsuppresed = new AtomicLong(0);
final AtomicLong wMaxWait = new AtomicLong(0);
InstrumentedLock lock = new InstrumentedLock(
testname, LOG, mlock, 2000, 300, mclock) {
@Override
void logWaitWarning(long lockHeldTime, SuppressedSnapshot stats) {
wlogged.incrementAndGet();
wsuppresed.set(stats.getSuppressedCount());
wMaxWait.set(stats.getMaxSuppressedWait());
}
};
// do not log warning when the lock held time is short
lock.lock(); // t = 0
Thread competingThread = lockUnlockThread(lock);
time.set(200);
lock.unlock(); // t = 200
competingThread.join();
assertEquals(0, wlogged.get());
assertEquals(0, wsuppresed.get());
assertEquals(0, wMaxWait.get());
lock.lock(); // t = 200
competingThread = lockUnlockThread(lock);
time.set(700);
lock.unlock(); // t = 700
competingThread.join();
// The competing thread will have waited for 500ms, so it should log
assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get());
assertEquals(0, wMaxWait.get());
// despite the lock wait time is greater than threshold
// suppress the log warning due to the logging gap
// (not recorded in wsuppressed until next log message)
lock.lock(); // t = 700
competingThread = lockUnlockThread(lock);
time.set(1100);
lock.unlock(); // t = 1100
competingThread.join();
assertEquals(1, wlogged.get());
assertEquals(0, wsuppresed.get());
assertEquals(0, wMaxWait.get());
// log a warning message when the lock held time is greater the threshold
// and the logging time gap is satisfied. Also should display suppressed
// previous warnings.
time.set(2400);
lock.lock(); // t = 2400
competingThread = lockUnlockThread(lock);
time.set(2800);
lock.unlock(); // t = 2800
competingThread.join();
assertEquals(2, wlogged.get());
assertEquals(1, wsuppresed.get());
assertEquals(400, wMaxWait.get());
}
private Thread lockUnlockThread(Lock lock) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Thread t = new Thread(() -> {
try {
assertFalse(lock.tryLock());
countDownLatch.countDown();
lock.lock();
} finally {
lock.unlock();
}
});
t.start();
countDownLatch.await();
// Even with the countdown latch, the main thread releases the lock
// before this thread actually starts waiting on it, so introducing a
// short sleep so the competing thread can block on the lock as intended.
Thread.sleep(3);
return t;
}
}