TestNoNamenodesAvailableLongTime.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Lists;
import org.junit.After;
import org.junit.Test;

import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
import static org.apache.hadoop.fs.permission.AclEntryType.USER;
import static org.apache.hadoop.fs.permission.FsAction.ALL;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * When failover occurs, the router may record that a nameservice has no active
 * namenode even though one of its namenodes is actually active.
 * The in-memory state is only corrected the next time the router reloads its cache,
 * so until then the router may keep reporting NoNamenodesAvailableException for a long time.
 *
 * @see org.apache.hadoop.hdfs.server.federation.router.NoNamenodesAvailableException
 */
public class TestNoNamenodesAvailableLongTime {

  // Router load cache interval: 10s.
  private static final long CACHE_FLUSH_INTERVAL_MS = 10000;

  private StateStoreDFSCluster cluster;
  private FileSystem fileSystem;
  private RouterContext routerContext;
  private FederationRPCMetrics rpcMetrics;

  /**
   * Close the client file system and shut down the cluster after each test.
   * The cluster is shut down even if closing the file system fails, so that a
   * failing close does not leak a running cluster into subsequent tests.
   *
   * @throws IOException if closing the file system fails.
   */
  @After
  public void cleanup() throws IOException {
    rpcMetrics = null;
    routerContext = null;
    try {
      if (fileSystem != null) {
        fileSystem.close();
      }
    } finally {
      fileSystem = null;
      if (cluster != null) {
        cluster.shutdown();
        cluster = null;
      }
    }
  }

  /**
   * Set up and start a state store cluster with routers. In HA mode the first
   * namenode of every nameservice is made active, the second standby, and any
   * remaining ones observers.
   *
   * @param numNameservices number of name services.
   * @param numberOfObserver number of observer namenodes per name service;
   *                         forced to 0 when {@code useObserver} is false.
   * @param useObserver whether to enable router observer reads.
   * @throws Exception if the cluster fails to start.
   */
  private void setupCluster(int numNameservices, int numberOfObserver, boolean useObserver)
      throws Exception {
    if (!useObserver) {
      numberOfObserver = 0;
    }
    // One active + one standby, plus the observers.
    int numberOfNamenode = 2 + numberOfObserver;
    cluster = new StateStoreDFSCluster(true, numNameservices, numberOfNamenode,
        DEFAULT_HEARTBEAT_INTERVAL_MS, CACHE_FLUSH_INTERVAL_MS);
    Configuration routerConf = new RouterConfigBuilder()
        .stateStore()
        .metrics()
        .admin()
        .rpc()
        .heartbeat()
        .build();

    // Set router observer related configs
    if (useObserver) {
      routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
      routerConf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
      routerConf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
    }

    // Reduce the number of RPC client threads so the Router is easier to overload.
    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);

    // No need for datanodes
    cluster.setNumDatanodesPerNameservice(0);
    cluster.addRouterOverrides(routerConf);

    cluster.startCluster();

    // Make one namenode active per nameservice; the rest are standby/observers.
    if (cluster.isHighAvailability()) {
      for (String ns : cluster.getNameservices()) {
        List<MiniRouterDFSCluster.NamenodeContext> nnList = cluster.getNamenodes(ns);
        cluster.switchToActive(ns, nnList.get(0).getNamenodeId());
        cluster.switchToStandby(ns, nnList.get(1).getNamenodeId());
        for (int i = 2; i < numberOfNamenode; i++) {
          cluster.switchToObserver(ns, nnList.get(i).getNamenodeId());
        }
      }
    }

    cluster.startRouters();
    cluster.waitClusterUp();
  }

  /**
   * Initialize the test environment and start the cluster so that
   * there is no active namenode record in the router cache,
   * but the second non-observer namenode in the router cache is actually active.
   *
   * @param numberOfObserver number of observer namenodes.
   * @param useObserver whether the client uses observer reads.
   * @throws Exception if the cluster cannot be set up.
   */
  private void initEnv(int numberOfObserver, boolean useObserver) throws Exception {
    setupCluster(1, numberOfObserver, useObserver);
    // Transition all active namenodes in the cluster to standby.
    transitionActiveToStandby();
    // Let every router heartbeat the new states and reload its cache,
    // so the router caches record no active namenode.
    allRoutersHeartbeat();
    allRoutersLoadCache();

    List<MiniRouterDFSCluster.NamenodeContext> namenodes = cluster.getNamenodes();

    // Make sure no namenode is active.
    for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) {
      assertNotEquals(ACTIVE.ordinal(), namenodeContext.getNamenode().getNameNodeState());
    }

    routerContext = cluster.getRandomRouter();

    // Get the second namenode in the router cache and make it active.
    setSecondNonObserverNamenodeInTheRouterCacheActive(numberOfObserver, false);
    // Heartbeat without reloading the router caches, so the caches still record
    // no active namenode (asserted below).
    allRoutersHeartbeat();

    // Get router metrics
    rpcMetrics = routerContext.getRouter().getRpcServer().getRPCMetrics();

    assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", useObserver));

    // Retries is 2 (see FailoverOnNetworkExceptionRetry#shouldRetry, will fail
    // when retries > max.attempts), so total access is 3.
    routerContext.getConf().setInt("dfs.client.retry.max.attempts", 1);

    if (useObserver) {
      fileSystem = routerContext.getFileSystemWithObserverReadProxyProvider();
    } else {
      fileSystem = routerContext.getFileSystemWithConfiguredFailoverProxyProvider();
    }
  }

  /**
   * If NoNamenodesAvailableException occurs due to an
   * {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception},
   * the cache should be rotated.
   */
  @Test
  public void testShouldRotatedCache() throws Exception {
    // 2 namenodes: 1 active, 1 standby.
    // But there is no active namenode in router cache.
    initEnv(0, false);
    // At this time, the router has recorded 2 standby namenodes in memory.
    assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", false));

    Path path = new Path("/test.file");
    // The first create operation will cause NoNamenodesAvailableException and RotatedCache.
    // After retrying, the create operation will be executed successfully.
    fileSystem.create(path);
    assertEquals(1, rpcMetrics.getProxyOpNoNamenodes());

    // The router still records 2 standby namenodes in memory:
    // the operation succeeded without waiting for the router to load its cache.
    assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", false));
  }

  /**
   * If a request still fails even when it is sent to the active namenode,
   * the operation itself is illegal, and the cache should not be rotated
   * because of illegal operations.
   */
  @Test
  public void testShouldNotBeRotatedCache() throws Exception {
    testShouldRotatedCache();
    long proxyOpNoNamenodes = rpcMetrics.getProxyOpNoNamenodes();
    Path path = new Path("/test.file");
    /*
     * The actually active namenode has been moved to the front of the cache by
     * rotating the cache. Therefore, the setPermission operation does not cause
     * NoNamenodesAvailableException.
     */
    fileSystem.setPermission(path, FsPermission.createImmutable((short) 0640));
    assertEquals(proxyOpNoNamenodes, rpcMetrics.getProxyOpNoNamenodes());

    // At this time, the router has recorded 2 standby namenodes in memory.
    assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", false));

    /*
     * Even if the router forwards the illegal request to the active namenode,
     * NoNamenodesAvailableException will still be generated.
     * Therefore, rotating the cache is not needed.
     */
    List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(DEFAULT, USER, "foo", ALL));
    try {
      fileSystem.setAcl(path, aclSpec);
      // Without this, a successful setAcl would silently skip the assertions below.
      fail("Setting a default ACL on a file should have failed");
    } catch (RemoteException e) {
      assertTrue(e.getMessage().contains(
          "org.apache.hadoop.hdfs.server.federation.router.NoNamenodesAvailableException: " +
          "No namenodes available under nameservice ns0"));
      assertTrue(e.getMessage().contains(
          "org.apache.hadoop.hdfs.protocol.AclException: Invalid ACL: " +
          "only directories may have a default ACL. Path: /test.file"));
    }
    // Retries is 2 (see FailoverOnNetworkExceptionRetry#shouldRetry, will fail
    // when retries > max.attempts), so total access is 3.
    assertEquals(proxyOpNoNamenodes + 3, rpcMetrics.getProxyOpNoNamenodes());
    proxyOpNoNamenodes = rpcMetrics.getProxyOpNoNamenodes();

    // Legal operations can still be served without reporting NoNamenodesAvailableException.
    assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", false));
    fileSystem.getFileStatus(path);
    assertEquals(proxyOpNoNamenodes, rpcMetrics.getProxyOpNoNamenodes());

    // The router still records 2 standby namenodes in memory:
    // the operation succeeded without waiting for the router to load its cache.
    assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", false));
  }

  /**
   * In the observer scenario, when NoNamenodesAvailableException occurs,
   * the operation can succeed without waiting for the router to load its cache.
   */
  @Test
  public void testUseObserver() throws Exception {
    // 4 namenodes: 2 observers, 1 active, 1 standby.
    // But there is no active namenode in router cache.
    initEnv(2, true);

    Path path = new Path("/");
    // At this time, the router has recorded 2 standby namenodes in memory.
    assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", true));

    // The first msync operation will cause NoNamenodesAvailableException and RotatedCache.
    // After retrying, msync and getFileInfo operation will be executed successfully.
    fileSystem.getFileStatus(path);
    assertEquals(1, rpcMetrics.getObserverProxyOps());
    assertEquals(1, rpcMetrics.getProxyOpNoNamenodes());

    // The router still records 2 standby namenodes in memory:
    // the operation succeeded without waiting for the router to load its cache.
    assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", true));
  }

  /**
   * In a multi-observer environment, if at least one observer is healthy,
   * read requests can still succeed even if NoNamenodesAvailableException occurs.
   */
  @Test
  public void testAtLeastOneObserverNormal() throws Exception {
    // 4 namenodes: 2 observers, 1 active, 1 standby.
    // But there is no active namenode in router cache.
    initEnv(2, true);
    // Shutdown one observer.
    stopObserver(1);

    /*
     * The first msync operation will cause NoNamenodesAvailableException and RotatedCache.
     * After retrying, msync operation will be executed successfully.
     * Each read request will shuffle the observers;
     * if the getFileInfo operation is sent to the downed observer,
     * it will cause NoNamenodesAvailableException.
     * The request is then retried on the healthy observer,
     * no NoNamenodesAvailableException is generated and the operation succeeds.
     */
    fileSystem.getFileStatus(new Path("/"));
    assertEquals(1, rpcMetrics.getProxyOpNoNamenodes());
    assertEquals(1, rpcMetrics.getObserverProxyOps());

    // The router still records 2 standby namenodes in memory:
    // the operation succeeded without waiting for the router to load its cache.
    assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", true));
  }

  /**
   * If all observers are down, read requests can still succeed,
   * even if a NoNamenodesAvailableException occurs.
   */
  @Test
  public void testAllObserverAbnormality() throws Exception {
    // 4 namenodes: 2 observers, 1 active, 1 standby.
    // But there is no active namenode in router cache.
    initEnv(2, true);
    // Shutdown all observers.
    stopObserver(2);

    /*
     * The first msync operation will cause NoNamenodesAvailableException and RotatedCache.
     * After retrying, msync operation will be executed successfully.
     * The getFileInfo operation is tried on the 2 observers, both causing
     * UnavailableException, and is then retried on the standby namenode,
     * causing NoNamenodesAvailableException and RotatedCache;
     * the execution succeeds after retrying.
     */
    fileSystem.getFileStatus(new Path("/"));
    assertEquals(2, rpcMetrics.getProxyOpFailureCommunicate());
    assertEquals(2, rpcMetrics.getProxyOpNoNamenodes());

    // The router still records 2 standby namenodes in memory:
    // the operation succeeded without waiting for the router to load its cache.
    assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", true));
  }

  /**
   * Determine whether the cache of the router has an active namenode.
   *
   * @param context router whose namenode resolver cache is inspected.
   * @param nsId nameservice to look up.
   * @param useObserver passed through to the resolver lookup.
   * @return true if no active namenode, otherwise false.
   * @throws IOException if the resolver lookup fails.
   */
  private boolean routerCacheNoActiveNamenode(
      RouterContext context, String nsId, boolean useObserver) throws IOException {
    List<? extends FederationNamenodeContext> namenodes =
        context.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(nsId, useObserver);
    for (FederationNamenodeContext namenode : namenodes) {
      if (namenode.getState() == FederationNamenodeServiceState.ACTIVE) {
        return false;
      }
    }
    return true;
  }

  /**
   * Force every router in the cluster to reload its state store caches.
   */
  private void allRoutersLoadCache() {
    for (MiniRouterDFSCluster.RouterContext context : cluster.getRouters()) {
      // Update service cache
      context.getRouter().getStateStore().refreshCaches(true);
    }
  }

  /**
   * Set the second non-observer namenode of ns0 in the router cache to active.
   *
   * @param numberOfObserver number of observer namenodes; used to compute the
   *                         index of the target namenode in the cached list.
   * @param useObserver passed through to the resolver lookup.
   * @throws IOException if the resolver lookup fails.
   */
  private void setSecondNonObserverNamenodeInTheRouterCacheActive(
      int numberOfObserver, boolean useObserver) throws IOException {
    List<? extends FederationNamenodeContext> ns0 = routerContext.getRouter()
        .getNamenodeResolver()
        .getNamenodesForNameserviceId("ns0", useObserver);

    // This is a namenode ID (not a nameservice ID).
    String nnId = ns0.get(numberOfObserver + 1).getNamenodeId();
    cluster.switchToActive("ns0", nnId);
    assertEquals(ACTIVE.ordinal(),
        cluster.getNamenode("ns0", nnId).getNamenode().getNameNodeState());
  }

  /**
   * Force every router in the cluster to run its namenode heartbeats once.
   *
   * @throws IOException propagated from the heartbeat services.
   */
  private void allRoutersHeartbeat() throws IOException {
    for (RouterContext context : cluster.getRouters()) {
      // Manually trigger the heartbeat, but the router does not manually load the cache
      Collection<NamenodeHeartbeatService> heartbeatServices = context
          .getRouter().getNamenodeHeartbeatServices();
      for (NamenodeHeartbeatService service : heartbeatServices) {
        service.periodicInvoke();
      }
    }
  }

  /**
   * Transition every active namenode in the cluster to standby.
   * Does nothing if the cluster is not highly available.
   */
  private void transitionActiveToStandby() {
    if (cluster.isHighAvailability()) {
      for (String ns : cluster.getNameservices()) {
        List<MiniRouterDFSCluster.NamenodeContext> nnList = cluster.getNamenodes(ns);
        for (MiniRouterDFSCluster.NamenodeContext namenodeContext : nnList) {
          if (namenodeContext.getNamenode().isActiveState()) {
            cluster.switchToStandby(ns, namenodeContext.getNamenodeId());
          }
        }
      }
    }
  }

  /**
   * Shut down up to {@code num} observer namenodes in the cluster.
   *
   * @param num The number of observers to shut down.
   */
  private void stopObserver(int num) {
    int numNns = cluster.getNamenodes().size();
    for (int nnIndex = 0; nnIndex < numNns && num > 0; nnIndex++) {
      NameNode nameNode = cluster.getCluster().getNameNode(nnIndex);
      if (nameNode != null && nameNode.isObserverState()) {
        cluster.getCluster().shutdownNameNode(nnIndex);
        num--;
      }
    }
  }
}