TestObserverReadProxyProvider.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.util.StopWatch;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.event.Level;

import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
/**
 * Tests for {@link ObserverReadProxyProvider} under various configurations of
 * NameNode states. Mainly testing that the proxy provider picks the correct
 * NameNode to communicate with.
 */
public class TestObserverReadProxyProvider {

  /** How long a mocked "slow" NameNode sleeps before answering any RPC. */
  private final static long SLOW_RESPONSE_SLEEP_TIME = TimeUnit.SECONDS.toMillis(5);
  /** HA-state probe timeout that is shorter than SLOW_RESPONSE_SLEEP_TIME. */
  private final static long NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT = TimeUnit.SECONDS.toMillis(2);
  /** HA-state probe timeout that is longer than SLOW_RESPONSE_SLEEP_TIME. */
  private final static long NAMENODE_HA_STATE_PROBE_TIMEOUT_LONG = TimeUnit.SECONDS.toMillis(25);

  // Captures ObserverReadProxyProvider log output so tests can assert on
  // specific messages. Released in tearDown().
  private final GenericTestUtils.LogCapturer proxyLog =
      GenericTestUtils.LogCapturer.captureLogs(ObserverReadProxyProvider.LOG);

  private static final LocatedBlock[] EMPTY_BLOCKS = new LocatedBlock[0];
  private String ns;
  private URI nnURI;

  private ObserverReadProxyProvider<ClientProtocol> proxyProvider;
  private NameNodeAnswer[] namenodeAnswers;
  private String[] namenodeAddrs;

  @BeforeClass
  public static void setLogLevel() {
    GenericTestUtils.setLogLevel(ObserverReadProxyProvider.LOG, Level.DEBUG);
  }

  @Before
  public void setup() throws Exception {
    ns = "testcluster";
    nnURI = URI.create("hdfs://" + ns);
  }

  /**
   * Stop capturing log output. {@code proxyLog} is an instance field, so a
   * new capturer is created for every test instance on the shared static
   * {@code ObserverReadProxyProvider.LOG}; stopping it here keeps capturers
   * from piling up across tests.
   */
  @After
  public void tearDown() {
    proxyLog.stopCapturing();
  }

  /**
   * Set up {@link #proxyProvider} for {@code namenodeCount} mocked NameNodes
   * using a default {@link Configuration}.
   */
  private void setupProxyProvider(int namenodeCount) throws Exception {
    setupProxyProvider(namenodeCount, new Configuration());
  }

  /**
   * Set up {@link #proxyProvider} with the given HA-state probe timeout.
   *
   * @param namenodeCount number of mocked NameNodes to create
   * @param nnHAStateProbeTimeout value stored under
   *     {@code NAMENODE_HA_STATE_PROBE_TIMEOUT}; callers in this class pass
   *     milliseconds, and 0 is used to disable the timeout
   */
  private void setupProxyProvider(int namenodeCount, long nnHAStateProbeTimeout) throws Exception {
    Configuration conf = new Configuration();
    conf.setLong(NAMENODE_HA_STATE_PROBE_TIMEOUT, nnHAStateProbeTimeout);
    setupProxyProvider(namenodeCount, conf);
  }

  /**
   * Create {@code namenodeCount} mocked {@link ClientProtocol} proxies, each
   * driven by its own {@link NameNodeAnswer}, register their RPC addresses in
   * {@code conf}, and build an {@link ObserverReadProxyProvider} over them
   * with observer reads enabled, deterministic (non-random) proxy order, and
   * an observer probe retry period of 0.
   */
  private void setupProxyProvider(int namenodeCount, Configuration conf) throws Exception {
    String[] namenodeIDs = new String[namenodeCount];
    namenodeAddrs = new String[namenodeCount];
    namenodeAnswers = new NameNodeAnswer[namenodeCount];
    ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
    Map<String, ClientProtocol> proxyMap = new HashMap<>();
    for (int i = 0; i < namenodeCount; i++) {
      namenodeIDs[i] = "nn" + i;
      namenodeAddrs[i] = "namenode" + i + ".test:8020";
      conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns +
          "." + namenodeIDs[i], namenodeAddrs[i]);
      namenodeAnswers[i] = new NameNodeAnswer();
      proxies[i] = mock(ClientProtocol.class);
      // Route the write call, the read call and getHAServiceState through
      // this NameNode's answer so its simulated state decides the outcome.
      doWrite(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
          .when(proxies[i]));
      doRead(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
          .when(proxies[i]));
      Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
          .when(proxies[i]).getHAServiceState();
      proxyMap.put(namenodeAddrs[i], proxies[i]);
    }
    conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
        Joiner.on(",").join(namenodeIDs));
    conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
    // Set observer probe retry period to 0. Required by the tests that
    // transition observer back and forth
    conf.setTimeDuration(
        OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
    conf.setBoolean(HdfsClientConfigKeys.Failover.RANDOM_ORDER, false);
    proxyProvider = new ObserverReadProxyProvider<ClientProtocol>(conf, nnURI,
        ClientProtocol.class,
        new ClientHAProxyFactory<ClientProtocol>() {
          @Override
          public ClientProtocol createProxy(Configuration config,
              InetSocketAddress nnAddr, Class<ClientProtocol> xface,
              UserGroupInformation ugi, boolean withRetries,
              AtomicBoolean fallbackToSimpleAuth) {
            return proxyMap.get(nnAddr.toString());
          }
        }) {
      @Override
      protected List<NNProxyInfo<ClientProtocol>> getProxyAddresses(
          URI uri, String addressKey) {
        List<NNProxyInfo<ClientProtocol>> nnProxies =
            super.getProxyAddresses(uri, addressKey);
        return nnProxies;
      }
    };
    proxyProvider.setObserverReadEnabled(true);
  }

  @Test
  public void testWithNonClientProxy() throws Exception {
    setupProxyProvider(2); // This will initialize all of the instance fields
    final String fakeUser = "fakeUser";
    final String[] fakeGroups = {"fakeGroup"};
    HAProxyFactory<GetUserMappingsProtocol> proxyFactory =
        new NameNodeHAProxyFactory<GetUserMappingsProtocol>() {
          @Override
          public GetUserMappingsProtocol createProxy(Configuration config,
              InetSocketAddress addr, Class<GetUserMappingsProtocol> xface,
              UserGroupInformation ugi, boolean withRetries,
              AtomicBoolean fallbackToSimpleAuth) throws IOException {
            GetUserMappingsProtocol proxy =
                mock(GetUserMappingsProtocol.class);
            when(proxy.getGroupsForUser(fakeUser)).thenReturn(fakeGroups);
            return proxy;
          }
        };
    ObserverReadProxyProvider<GetUserMappingsProtocol> userProxyProvider =
        new ObserverReadProxyProvider<>(proxyProvider.conf, nnURI,
            GetUserMappingsProtocol.class, proxyFactory);
    assertArrayEquals(fakeGroups,
        userProxyProvider.getProxy().proxy.getGroupsForUser(fakeUser));
  }

  @Test
  public void testReadOperationOnObserver() throws Exception {
    setupProxyProvider(3);
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[2].setObserverState();

    doRead();
    assertHandledBy(2);
  }

  @Test
  public void testWriteOperationOnActive() throws Exception {
    setupProxyProvider(3);
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[2].setObserverState();

    doWrite();
    assertHandledBy(0);
  }

  @Test
  public void testUnreachableObserverWithNoBackup() throws Exception {
    setupProxyProvider(2);
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[1].setObserverState();

    namenodeAnswers[1].setUnreachable(true);
    // Confirm that read still succeeds even though observer is not available
    doRead();
    assertHandledBy(0);
  }

  @Test
  public void testUnreachableObserverWithMultiple() throws Exception {
    setupProxyProvider(4);
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[2].setObserverState();
    namenodeAnswers[3].setObserverState();

    doRead();
    assertHandledBy(2);

    namenodeAnswers[2].setUnreachable(true);
    doRead();
    // Fall back to the second observer node
    assertHandledBy(3);

    namenodeAnswers[2].setUnreachable(false);
    doRead();
    // Current index has changed, so although the first observer is back,
    // it should continue requesting from the second observer
    assertHandledBy(3);

    namenodeAnswers[3].setUnreachable(true);
    doRead();
    // Now that second is unavailable, go back to using the first observer
    assertHandledBy(2);

    namenodeAnswers[2].setUnreachable(true);
    doRead();
    // Both observers are now unavailable, so it should fall back to active
    assertHandledBy(0);
  }

  @Test
  public void testObserverToActive() throws Exception {
    setupProxyProvider(3);
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[1].setObserverState();
    namenodeAnswers[2].setObserverState();

    doWrite();
    assertHandledBy(0);

    // Transition an observer to active
    namenodeAnswers[0].setStandbyState();
    namenodeAnswers[1].setActiveState();
    try {
      doWrite();
      fail("Write should fail; failover required");
    } catch (RemoteException re) {
      // Expected value first, actual second, so failure messages are accurate.
      assertEquals(StandbyException.class.getCanonicalName(),
          re.getClassName());
    }
    proxyProvider.performFailover(proxyProvider.getProxy().proxy);
    doWrite();
    // After failover, previous observer is now active
    assertHandledBy(1);
    doRead();
    assertHandledBy(2);

    // Transition back to original state but second observer not available
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[1].setObserverState();
    namenodeAnswers[2].setUnreachable(true);
    for (int i = 0; i < 2; i++) {
      try {
        doWrite();
        fail("Should have failed");
      } catch (IOException ioe) {
        proxyProvider.performFailover(proxyProvider.getProxy().proxy);
      }
    }
    doWrite();
    assertHandledBy(0);

    doRead();
    assertHandledBy(1);
  }

  @Test
  public void testObserverToStandby() throws Exception {
    setupProxyProvider(3);
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[1].setObserverState();
    namenodeAnswers[2].setObserverState();

    doRead();
    assertHandledBy(1);

    namenodeAnswers[1].setStandbyState();
    doRead();
    assertHandledBy(2);

    namenodeAnswers[2].setStandbyState();
    doRead();
    assertHandledBy(0);

    namenodeAnswers[1].setObserverState();
    doRead();
    assertHandledBy(1);
  }

  @Test
  public void testSingleObserverToStandby() throws Exception {
    setupProxyProvider(2);
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[1].setObserverState();

    doRead();
    assertHandledBy(1);

    namenodeAnswers[1].setStandbyState();
    doRead();
    assertHandledBy(0);

    namenodeAnswers[1].setObserverState();
    // The proxy provider still thinks the second NN is in observer state,
    // so it will take a second call for it to notice the new observer
    doRead();
    doRead();
    assertHandledBy(1);
  }

  @Test
  public void testObserverRetriableException() throws Exception {
    setupProxyProvider(3);
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[1].setObserverState();
    namenodeAnswers[2].setObserverState();

    // Make the first observer throw ObserverRetryOnActiveException so that
    // the request skips the second observer and is served by the active.
    namenodeAnswers[1].setRetryActive(true);

    doRead();
    assertHandledBy(0);

    namenodeAnswers[1].setRetryActive(false);

    doRead();
    assertHandledBy(1);
  }

  /**
   * Happy case for GetHAServiceStateWithTimeout.
   */
  @Test
  public void testGetHAServiceStateWithTimeout() throws Exception {
    proxyLog.clearOutput();

    setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT);
    final HAServiceState state = HAServiceState.STANDBY;
    @SuppressWarnings("unchecked")
    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
        (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class);
    @SuppressWarnings("unchecked")
    Future<HAServiceState> task = mock(Future.class);
    when(task.get(anyLong(), any(TimeUnit.class))).thenReturn(state);

    HAServiceState state2 = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
    assertEquals(state, state2);
    verify(task).get(anyLong(), any(TimeUnit.class));
    verifyNoMoreInteractions(task);
    assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(),
        "HA State for " + dummyNNProxyInfo.proxyInfo + " is " + state));
    proxyLog.clearOutput();
  }

  /**
   * Test TimeoutException for GetHAServiceStateWithTimeout.
   */
  @Test
  public void testTimeoutExceptionGetHAServiceStateWithTimeout() throws Exception {
    proxyLog.clearOutput();

    setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT);
    @SuppressWarnings("unchecked")
    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
        (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class);
    @SuppressWarnings("unchecked")
    Future<HAServiceState> task = mock(Future.class);
    TimeoutException e = new TimeoutException("Timeout");
    when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e);

    HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
    assertNull(state);
    verify(task).get(anyLong(), any(TimeUnit.class));
    verify(task).cancel(true);
    verifyNoMoreInteractions(task);
    assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(),
        "Cancel NN probe task due to timeout for " + dummyNNProxyInfo.proxyInfo));
    proxyLog.clearOutput();
  }

  /**
   * Test InterruptedException for GetHAServiceStateWithTimeout.
   */
  @Test
  public void testInterruptedExceptionGetHAServiceStateWithTimeout() throws Exception {
    proxyLog.clearOutput();

    setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT);
    @SuppressWarnings("unchecked")
    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
        (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class);
    @SuppressWarnings("unchecked")
    Future<HAServiceState> task = mock(Future.class);
    InterruptedException e = new InterruptedException("Interrupted");
    when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e);

    HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
    assertNull(state);
    verify(task).get(anyLong(), any(TimeUnit.class));
    verifyNoMoreInteractions(task);
    assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(),
        "Exception in NN probe task for " + dummyNNProxyInfo.proxyInfo));
    proxyLog.clearOutput();
  }

  /**
   * Test ExecutionException for GetHAServiceStateWithTimeout.
   */
  @Test
  public void testExecutionExceptionGetHAServiceStateWithTimeout() throws Exception {
    proxyLog.clearOutput();

    setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT);
    @SuppressWarnings("unchecked")
    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
        (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class);
    @SuppressWarnings("unchecked")
    Future<HAServiceState> task = mock(Future.class);
    Exception e = new ExecutionException(new InterruptedException("Interrupted"));
    when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e);

    HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
    assertNull(state);
    verify(task).get(anyLong(), any(TimeUnit.class));
    verifyNoMoreInteractions(task);
    assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(),
        "Exception in NN probe task for " + dummyNNProxyInfo.proxyInfo));
    proxyLog.clearOutput();
  }

  /**
   * Test GetHAServiceState when timeout is disabled (test the else { task.get() } code path).
   */
  @Test
  public void testGetHAServiceStateWithoutTimeout() throws Exception {
    proxyLog.clearOutput();
    setupProxyProvider(1, 0);

    final HAServiceState state = HAServiceState.STANDBY;
    @SuppressWarnings("unchecked")
    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
        (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class);
    @SuppressWarnings("unchecked")
    Future<HAServiceState> task = mock(Future.class);
    when(task.get()).thenReturn(state);

    HAServiceState state2 = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
    assertEquals(state, state2);
    verify(task).get();
    verifyNoMoreInteractions(task);
    assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(),
        "HA State for " + dummyNNProxyInfo.proxyInfo + " is " + state));
    proxyLog.clearOutput();
  }

  /**
   * Test getHAServiceState when we have a slow NN, using a 25s timeout.
   * This is to verify the old behavior without being able to fast-fail (we can also set
   * namenodeHAStateProbeTimeoutMs to 0 or a negative value and the rest of the test can stay
   * the same).
   *
   * 5-second (SLOW_RESPONSE_SLEEP_TIME) latency is introduced and we expect that latency is added
   * to the READ operation.
   */
  @Test
  public void testStandbyGetHAServiceStateLongTimeout() throws Exception {
    setupProxyProvider(4, NAMENODE_HA_STATE_PROBE_TIMEOUT_LONG);
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[1].setSlowNode(true);
    namenodeAnswers[3].setObserverState();

    StopWatch watch = new StopWatch();
    watch.start();
    doRead();
    long runtime = watch.now(TimeUnit.MILLISECONDS);
    // Thread.sleep guarantees *at least* the requested delay and the elapsed
    // time is truncated to whole milliseconds, so exactly
    // SLOW_RESPONSE_SLEEP_TIME is a valid outcome; use >= to avoid flakiness.
    assertTrue("Read operation finished earlier than we expected",
        runtime >= SLOW_RESPONSE_SLEEP_TIME);
  }

  /**
   * Test getHAServiceState using a 2s timeout with a slow standby.
   * Fail the test if we don't complete it in 4s.
   */
  @Test(timeout = 4000)
  public void testStandbyGetHAServiceStateTimeout() throws Exception {
    setupProxyProvider(4, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT);
    namenodeAnswers[0].setActiveState();
    namenodeAnswers[1].setSlowNode(true);
    namenodeAnswers[3].setObserverState();

    doRead();
  }

  /** Issue a read (checkAccess) through the provider's current proxy. */
  private void doRead() throws Exception {
    doRead(proxyProvider.getProxy().proxy);
  }

  /** Issue a write (reportBadBlocks) through the provider's current proxy. */
  private void doWrite() throws Exception {
    doWrite(proxyProvider.getProxy().proxy);
  }

  /**
   * Assert that the most recent call was served by the NameNode at index
   * {@code namenodeIdx}, judged by the provider's last proxy address.
   */
  private void assertHandledBy(int namenodeIdx) {
    assertEquals(namenodeAddrs[namenodeIdx],
        proxyProvider.getLastProxy().proxyInfo);
  }

  /** The operation these tests treat as a write: reportBadBlocks. */
  private static void doWrite(ClientProtocol client) throws Exception {
    client.reportBadBlocks(EMPTY_BLOCKS);
  }

  /** The operation these tests treat as a read: checkAccess on "/". */
  private static void doRead(ClientProtocol client) throws Exception {
    client.checkAccess("/", FsAction.READ);
  }

  /**
   * An {@link Answer} used for mocking of {@link ClientProtocol}.
   * Setting the state or unreachability of this
   * Answer will make the linked ClientProtocol respond as if it was
   * communicating with a NameNode of the corresponding state. It is in Standby
   * state by default.
   */
  private static class NameNodeAnswer {

    private volatile boolean unreachable = false;
    private volatile boolean retryActive = false;

    // Whether the node should be slow in rpc response.
    private volatile boolean slowNode = false;

    // Standby state by default
    private volatile boolean allowWrites = false;
    private volatile boolean allowReads = false;

    private final ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer();

    /**
     * Answers calls on the mocked ClientProtocol according to the enclosing
     * NameNodeAnswer's current flags.
     */
    private class ClientProtocolAnswer implements Answer<Object> {
      @Override
      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
        if (unreachable) {
          throw new IOException("Unavailable");
        }

        // sleep to simulate slow rpc responses.
        if (slowNode) {
          Thread.sleep(SLOW_RESPONSE_SLEEP_TIME);
        }

        // A real NameNode would check retryActive before answering
        // getHAServiceState. It is checked afterwards here only because, in
        // these tests, a read call relies on getHAServiceState having already
        // succeeded. May revisit in the future.
        if (invocationOnMock.getMethod()
            .getName().equals("getHAServiceState")) {
          HAServiceState status;
          if (allowReads && allowWrites) {
            status = HAServiceState.ACTIVE;
          } else if (allowReads) {
            status = HAServiceState.OBSERVER;
          } else {
            status = HAServiceState.STANDBY;
          }
          return status;
        }

        if (retryActive) {
          throw new RemoteException(
              ObserverRetryOnActiveException.class.getCanonicalName(),
              "Try active!"
          );
        }

        switch (invocationOnMock.getMethod().getName()) {
        case "reportBadBlocks":
          if (!allowWrites) {
            throw new RemoteException(
                StandbyException.class.getCanonicalName(), "No writes!");
          }
          return null;
        case "checkAccess":
          if (!allowReads) {
            throw new RemoteException(
                StandbyException.class.getCanonicalName(), "No reads!");
          }
          return null;
        default:
          throw new IllegalArgumentException(
              "Only reportBadBlocks and checkAccess supported!");
        }
      }
    }

    void setUnreachable(boolean unreachable) {
      this.unreachable = unreachable;
    }

    // Whether this node should be slow in rpc response.
    void setSlowNode(boolean slowNode) {
      this.slowNode = slowNode;
    }

    void setActiveState() {
      allowReads = true;
      allowWrites = true;
    }

    void setStandbyState() {
      allowReads = false;
      allowWrites = false;
    }

    void setObserverState() {
      allowReads = true;
      allowWrites = false;
    }

    void setRetryActive(boolean shouldRetryActive) {
      retryActive = shouldRetryActive;
    }
  }
}