TestThrottledAsyncCheckerTimeout.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
public class TestThrottledAsyncCheckerTimeout {
public static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(TestThrottledAsyncCheckerTimeout.class);
@Rule
public TestName testName = new TestName();
@Rule
public Timeout testTimeout = new Timeout(300_000);
private static final long DISK_CHECK_TIMEOUT = 10;
private ReentrantLock lock;
private ExecutorService getExecutorService() {
return new ScheduledThreadPoolExecutor(1);
}
@Before
public void initializeLock() {
lock = new ReentrantLock();
}
@Test
public void testDiskCheckTimeout() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final DummyCheckable target = new DummyCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, 0, DISK_CHECK_TIMEOUT,
getExecutorService());
// Acquire lock to halt checker. Release after timeout occurs.
lock.lock();
final Optional<ListenableFuture<Boolean>> olf = checker
.schedule(target, true);
final AtomicLong numCallbackInvocationsSuccess = new AtomicLong(0);
final AtomicLong numCallbackInvocationsFailure = new AtomicLong(0);
AtomicBoolean callbackResult = new AtomicBoolean(false);
final Throwable[] throwable = new Throwable[1];
assertTrue(olf.isPresent());
Futures.addCallback(olf.get(), new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
numCallbackInvocationsSuccess.incrementAndGet();
callbackResult.set(true);
}
@Override
public void onFailure(Throwable t) {
throwable[0] = t;
numCallbackInvocationsFailure.incrementAndGet();
callbackResult.set(true);
}
}, MoreExecutors.directExecutor());
while (!callbackResult.get()) {
// Wait for the callback
Thread.sleep(DISK_CHECK_TIMEOUT);
}
lock.unlock();
assertThat(numCallbackInvocationsFailure.get(), is(1L));
assertThat(numCallbackInvocationsSuccess.get(), is(0L));
assertTrue(throwable[0] instanceof TimeoutException);
}
@Test
public void testDiskCheckTimeoutInvokesOneCallbackOnly() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final DummyCheckable target = new DummyCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, 0, DISK_CHECK_TIMEOUT,
getExecutorService());
FutureCallback<Boolean> futureCallback = mock(FutureCallback.class);
// Acquire lock to halt disk checker. Release after timeout occurs.
lock.lock();
final Optional<ListenableFuture<Boolean>> olf1 = checker
.schedule(target, true);
assertTrue(olf1.isPresent());
Futures.addCallback(olf1.get(), futureCallback,
MoreExecutors.directExecutor());
// Verify that timeout results in only 1 onFailure call and 0 onSuccess
// calls.
verify(futureCallback, timeout((int) DISK_CHECK_TIMEOUT*10).times(1))
.onFailure(any());
verify(futureCallback, timeout((int) DISK_CHECK_TIMEOUT*10).times(0))
.onSuccess(any());
// Release lock so that target can acquire it.
lock.unlock();
final Optional<ListenableFuture<Boolean>> olf2 = checker
.schedule(target, true);
assertTrue(olf2.isPresent());
Futures.addCallback(olf2.get(), futureCallback,
MoreExecutors.directExecutor());
// Verify that normal check (dummy) results in only 1 onSuccess call.
// Number of times onFailure is invoked should remain the same i.e. 1.
verify(futureCallback, timeout((int) DISK_CHECK_TIMEOUT*10).times(1))
.onFailure(any());
verify(futureCallback, timeout((int) DISK_CHECK_TIMEOUT*10).times(1))
.onSuccess(any());
}
@Test
public void testTimeoutExceptionIsNotThrownForGoodDisk() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final DummyCheckable target = new DummyCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, 0, DISK_CHECK_TIMEOUT,
getExecutorService());
final Optional<ListenableFuture<Boolean>> olf = checker
.schedule(target, true);
AtomicBoolean callbackResult = new AtomicBoolean(false);
final Throwable[] throwable = new Throwable[1];
assertTrue(olf.isPresent());
Futures.addCallback(olf.get(), new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
callbackResult.set(true);
}
@Override
public void onFailure(Throwable t) {
throwable[0] = t;
callbackResult.set(true);
}
}, MoreExecutors.directExecutor());
while (!callbackResult.get()) {
// Wait for the callback
Thread.sleep(DISK_CHECK_TIMEOUT);
}
assertTrue(throwable[0] == null);
}
/**
* A dummy Checkable that just returns true after acquiring lock.
*/
protected class DummyCheckable implements Checkable<Boolean,Boolean> {
@Override
public Boolean check(Boolean context) throws Exception {
// Wait to acquire lock
lock.lock();
lock.unlock();
return true;
}
}
}