/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminBackoffMonitor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminMonitorInterface;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.junit.Assert.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
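/**
 * Tests dynamic reconfiguration of NameNode properties without restarting
 * the NameNode or the cluster.
 */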
public class TestNameNodeReconfigure {
public static final Logger LOG = LoggerFactory
.getLogger(TestNameNodeReconfigure.class);
private MiniDFSCluster cluster;
private final int customizedBlockInvalidateLimit = 500;
@Before
public void setUp() throws IOException {
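    // Start a MiniDFSCluster with a non-default block invalidate limit;
    // several tests below verify how this value interacts with
    // reconfiguration.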
Configuration conf = new HdfsConfiguration();
conf.setInt(DFS_BLOCK_INVALIDATE_LIMIT_KEY,
customizedBlockInvalidateLimit);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
}
@Test
public void testReconfigureCallerContextEnabled()
throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final FSNamesystem nameSystem = nameNode.getNamesystem();
    // try an invalid value; a non-boolean string falls back to the default
    // (false)
nameNode.reconfigureProperty(HADOOP_CALLER_CONTEXT_ENABLED_KEY, "text");
verifyReconfigureCallerContextEnabled(nameNode, nameSystem, false);
// enable CallerContext
nameNode.reconfigureProperty(HADOOP_CALLER_CONTEXT_ENABLED_KEY, "true");
verifyReconfigureCallerContextEnabled(nameNode, nameSystem, true);
// disable CallerContext
nameNode.reconfigureProperty(HADOOP_CALLER_CONTEXT_ENABLED_KEY, "false");
verifyReconfigureCallerContextEnabled(nameNode, nameSystem, false);
// revert to default
nameNode.reconfigureProperty(HADOOP_CALLER_CONTEXT_ENABLED_KEY, null);
// verify default
assertEquals(HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value", false,
nameSystem.getCallerContextEnabled());
assertEquals(HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value", null,
nameNode.getConf().get(HADOOP_CALLER_CONTEXT_ENABLED_KEY));
}
void verifyReconfigureCallerContextEnabled(final NameNode nameNode,
final FSNamesystem nameSystem, boolean expected) {
assertEquals(HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value",
expected, nameNode.getNamesystem().getCallerContextEnabled());
assertEquals(
HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value",
expected,
nameNode.getConf().getBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY,
HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT));
}
  /**
   * Test reconfiguring IPC backoff on and off.
   */
@Test
public void testReconfigureIPCBackoff() throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
NameNodeRpcServer nnrs = (NameNodeRpcServer) nameNode.getRpcServer();
String ipcClientRPCBackoffEnable = NameNode.buildBackoffEnableKey(nnrs
.getClientRpcServer().getPort());
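    // The backoff enable key is port-specific, of the form
    // "ipc.<port>.backoff.enable".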
// try invalid values
verifyReconfigureIPCBackoff(nameNode, nnrs, ipcClientRPCBackoffEnable,
false);
// enable IPC_CLIENT_RPC_BACKOFF
nameNode.reconfigureProperty(ipcClientRPCBackoffEnable, "true");
verifyReconfigureIPCBackoff(nameNode, nnrs, ipcClientRPCBackoffEnable,
true);
// disable IPC_CLIENT_RPC_BACKOFF
nameNode.reconfigureProperty(ipcClientRPCBackoffEnable, "false");
verifyReconfigureIPCBackoff(nameNode, nnrs, ipcClientRPCBackoffEnable,
false);
// revert to default
nameNode.reconfigureProperty(ipcClientRPCBackoffEnable, null);
assertEquals(ipcClientRPCBackoffEnable + " has wrong value", false,
nnrs.getClientRpcServer().isClientBackoffEnabled());
assertEquals(ipcClientRPCBackoffEnable + " has wrong value", null,
nameNode.getConf().get(ipcClientRPCBackoffEnable));
}
void verifyReconfigureIPCBackoff(final NameNode nameNode,
final NameNodeRpcServer nnrs, String property, boolean expected) {
assertEquals(property + " has wrong value", expected, nnrs
.getClientRpcServer().isClientBackoffEnabled());
assertEquals(property + " has wrong value", expected, nameNode.getConf()
.getBoolean(property, IPC_BACKOFF_ENABLE_DEFAULT));
}
  /**
   * Test reconfiguring the heartbeat interval and the heartbeat recheck
   * interval.
   */
@Test
  public void testReconfigureHeartbeatCheck() throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem
.getBlockManager().getDatanodeManager();
// change properties
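    // Note: the heartbeat interval is in seconds, while the recheck
    // interval is in milliseconds.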
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY, "" + 6);
nameNode.reconfigureProperty(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
"" + (10 * 60 * 1000));
// try invalid values
try {
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue(expected.getCause() instanceof NumberFormatException);
}
try {
nameNode.reconfigureProperty(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
"text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue(expected.getCause() instanceof NumberFormatException);
}
// verify change
assertEquals(
DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value",
6,
nameNode.getConf().getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value", 6,
datanodeManager.getHeartbeatInterval());
assertEquals(
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY + " has wrong value",
10 * 60 * 1000,
nameNode.getConf().getInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY
+ " has wrong value", 10 * 60 * 1000,
datanodeManager.getHeartbeatRecheckInterval());
// change to a value with time unit
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY, "1m");
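    // The time-unit suffix is accepted: "1m" is parsed as 60 seconds.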
assertEquals(
DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value",
60,
nameNode.getConf().getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value", 60,
datanodeManager.getHeartbeatInterval());
// revert to defaults
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY, null);
nameNode.reconfigureProperty(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
null);
// verify defaults
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value", null,
nameNode.getConf().get(DFS_HEARTBEAT_INTERVAL_KEY));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value",
DFS_HEARTBEAT_INTERVAL_DEFAULT, datanodeManager.getHeartbeatInterval());
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY
+ " has wrong value", null,
nameNode.getConf().get(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY
+ " has wrong value", DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT,
datanodeManager.getHeartbeatRecheckInterval());
}
/**
* Tests enable/disable Storage Policy Satisfier dynamically when
* "dfs.storage.policy.enabled" feature is disabled.
*
* @throws ReconfigurationException
* @throws IOException
*/
@Test(timeout = 30000)
public void testReconfigureSPSWithStoragePolicyDisabled()
throws ReconfigurationException, IOException {
// shutdown cluster
cluster.shutdown();
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
final NameNode nameNode = cluster.getNameNode();
verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE, false);
    // try to enable external SPS while DFS_STORAGE_POLICY_ENABLED_KEY remains
    // disabled
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString());
// Since DFS_STORAGE_POLICY_ENABLED_KEY is disabled, SPS can't be enabled.
assertNull("SPS shouldn't start as "
+ DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled",
nameNode.getNamesystem().getBlockManager().getSPSManager());
verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL, false);
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
StoragePolicySatisfierMode.EXTERNAL.toString(), nameNode.getConf()
.get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT));
}
/**
* Tests enable/disable Storage Policy Satisfier dynamically.
*/
@Test(timeout = 30000)
public void testReconfigureStoragePolicySatisfierEnabled()
throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE, false);
// try invalid values
try {
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
"text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException e) {
GenericTestUtils.assertExceptionContains(
"For enabling or disabling storage policy satisfier, must "
+ "pass either internal/external/none string value only",
e.getCause());
}
// disable SPS
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE, false);
// enable external SPS
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
false, nameNode.getNamesystem().getBlockManager().getSPSManager()
.isSatisfierRunning());
assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
StoragePolicySatisfierMode.EXTERNAL.toString(),
nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT));
assertNotNull("SPS Manager should be created",
nameNode.getNamesystem().getBlockManager().getSPSManager());
}
  /**
   * Test that satisfying a storage policy is rejected after the storage
   * policy satisfier has been disabled.
   */
@Test(timeout = 30000)
public void testSatisfyStoragePolicyAfterSatisfierDisabled()
throws ReconfigurationException, IOException {
final NameNode nameNode = cluster.getNameNode();
// disable SPS
nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE, false);
assertNull("SPS Manager should be null",
nameNode.getNamesystem().getBlockManager().getSPSManager());
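    // With SPS disabled, a satisfyStoragePolicy call must be rejected.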
Path filePath = new Path("/testSPS");
DistributedFileSystem fileSystem = cluster.getFileSystem();
fileSystem.create(filePath);
fileSystem.setStoragePolicy(filePath, "COLD");
try {
fileSystem.satisfyStoragePolicy(filePath);
fail("Expected to fail, as storage policy feature has disabled.");
} catch (RemoteException e) {
GenericTestUtils
.assertExceptionContains("Cannot request to satisfy storage policy "
+ "when storage policy satisfier feature has been disabled"
+ " by admin. Seek for an admin help to enable it "
+ "or use Mover tool.", e);
}
}
void verifySPSEnabled(final NameNode nameNode, String property,
StoragePolicySatisfierMode expected, boolean isSatisfierRunning) {
    StoragePolicySatisfyManager spsMgr =
        nameNode.getNamesystem().getBlockManager().getSPSManager();
    boolean isSPSRunning = spsMgr != null && spsMgr.isSatisfierRunning();
    assertEquals(property + " has wrong value", isSatisfierRunning,
        isSPSRunning);
String actual = nameNode.getConf().get(property,
DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
assertEquals(property + " has wrong value", expected,
StoragePolicySatisfierMode.fromString(actual));
}
@Test
public void testBlockInvalidateLimitAfterReconfigured()
throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem
.getBlockManager().getDatanodeManager();
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY + " is not correctly set",
customizedBlockInvalidateLimit,
datanodeManager.getBlockInvalidateLimit());
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY,
Integer.toString(6));
    // The effective limit is max(20 * heartbeatInterval, configured limit):
    // 20 * 6 = 120 < 500, so the block invalidate limit stays at 500.
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ " is not honored after reconfiguration",
customizedBlockInvalidateLimit,
datanodeManager.getBlockInvalidateLimit());
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY,
Integer.toString(50));
    // 20 * 50 = 1000 > 500, so the block invalidate limit is reset to 1000.
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ " is not reconfigured correctly",
1000,
datanodeManager.getBlockInvalidateLimit());
}
@Test
public void testEnableParallelLoadAfterReconfigured()
throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
// By default, enableParallelLoad is false
assertEquals(false, FSImageFormatProtobuf.getEnableParallelLoad());
nameNode.reconfigureProperty(DFS_IMAGE_PARALLEL_LOAD_KEY,
Boolean.toString(true));
// After reconfigured, enableParallelLoad is true
assertEquals(true, FSImageFormatProtobuf.getEnableParallelLoad());
}
@Test
public void testEnableSlowNodesParametersAfterReconfigured()
throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final BlockManager blockManager = nameNode.namesystem.getBlockManager();
final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
// By default, avoidSlowDataNodesForRead is false.
assertEquals(false, datanodeManager.getEnableAvoidSlowDataNodesForRead());
nameNode.reconfigureProperty(
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, Boolean.toString(true));
// After reconfigured, avoidSlowDataNodesForRead is true.
assertEquals(true, datanodeManager.getEnableAvoidSlowDataNodesForRead());
// By default, excludeSlowNodesEnabled is false.
assertEquals(false, blockManager.
getExcludeSlowNodesEnabled(BlockType.CONTIGUOUS));
assertEquals(false, blockManager.
getExcludeSlowNodesEnabled(BlockType.STRIPED));
nameNode.reconfigureProperty(
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, Boolean.toString(true));
// After reconfigured, excludeSlowNodesEnabled is true.
assertEquals(true, blockManager.
getExcludeSlowNodesEnabled(BlockType.CONTIGUOUS));
assertEquals(true, blockManager.
getExcludeSlowNodesEnabled(BlockType.STRIPED));
}
@Test
public void testReconfigureMaxSlowpeerCollectNodes()
throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem
.getBlockManager().getDatanodeManager();
// By default, DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY is 5.
assertEquals(5, datanodeManager.getMaxSlowpeerCollectNodes());
// Reconfigure.
nameNode.reconfigureProperty(
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, Integer.toString(10));
// Assert DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY is 10.
assertEquals(10, datanodeManager.getMaxSlowpeerCollectNodes());
}
@Test
public void testBlockInvalidateLimit() throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem
.getBlockManager().getDatanodeManager();
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY + " is not correctly set",
customizedBlockInvalidateLimit, datanodeManager.getBlockInvalidateLimit());
try {
nameNode.reconfigureProperty(DFS_BLOCK_INVALIDATE_LIMIT_KEY, "non-numeric");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals(
"Could not change property dfs.block.invalidate.limit from '500' to 'non-numeric'",
e.getMessage());
}
nameNode.reconfigureProperty(DFS_BLOCK_INVALIDATE_LIMIT_KEY, "2500");
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY + " is not honored after reconfiguration", 2500,
datanodeManager.getBlockInvalidateLimit());
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY, "500");
    // 20 * 500 = 10000 > 2500, so the block invalidate limit is reset
    // to 10000.
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY + " is not reconfigured correctly", 10000,
datanodeManager.getBlockInvalidateLimit());
}
@Test
public void testSlowPeerTrackerEnabled() throws Exception {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem.getBlockManager()
.getDatanodeManager();
assertFalse("SlowNode tracker is already enabled. It should be disabled by default",
datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
assertTrue(datanodeManager.isSlowPeerCollectorInitialized());
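    // Note: isSlowPeerCollectorInitialized() returns true when the collector
    // daemon is null, i.e. not running.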
try {
nameNode.reconfigurePropertyImpl(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "non-boolean");
fail("should not reach here");
} catch (ReconfigurationException e) {
assertEquals(
"Could not change property dfs.datanode.peer.stats.enabled from 'false' to 'non-boolean'",
e.getMessage());
}
nameNode.reconfigurePropertyImpl(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "True");
assertTrue("SlowNode tracker is still disabled. Reconfiguration could not be successful",
datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
assertFalse(datanodeManager.isSlowPeerCollectorInitialized());
nameNode.reconfigurePropertyImpl(DFS_DATANODE_PEER_STATS_ENABLED_KEY, null);
assertFalse("SlowNode tracker is still enabled. Reconfiguration could not be successful",
datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
}
@Test
public void testSlowPeerMaxNodesToReportReconf() throws Exception {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem.getBlockManager()
.getDatanodeManager();
nameNode.reconfigurePropertyImpl(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
assertTrue("SlowNode tracker is still disabled. Reconfiguration could not be successful",
datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
SlowPeerTracker tracker = datanodeManager.getSlowPeerTracker();
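    // Six slow-peer reports are added below; by default the tracker reports
    // at most five nodes, so one report is expected to be dropped.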
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.1);
tracker.addReport("node1", "node70", outlierMetrics1);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 1.23);
tracker.addReport("node2", "node71", outlierMetrics2);
OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 2.13);
tracker.addReport("node3", "node72", outlierMetrics3);
OutlierMetrics outlierMetrics4 = new OutlierMetrics(0.0, 0.0, 0.0, 1.244);
tracker.addReport("node4", "node73", outlierMetrics4);
    OutlierMetrics outlierMetrics5 = new OutlierMetrics(0.0, 0.0, 0.0, 0.2);
    tracker.addReport("node5", "node74", outlierMetrics5);
    OutlierMetrics outlierMetrics6 = new OutlierMetrics(0.0, 0.0, 0.0, 1.244);
    tracker.addReport("node6", "node75", outlierMetrics6);
String jsonReport = tracker.getJson();
LOG.info("Retrieved slow peer json report: {}", jsonReport);
List<Boolean> containReport = validatePeerReport(jsonReport);
assertEquals(1, containReport.stream().filter(reportVal -> !reportVal).count());
nameNode.reconfigurePropertyImpl(DFS_DATANODE_MAX_NODES_TO_REPORT_KEY, "2");
jsonReport = tracker.getJson();
LOG.info("Retrieved slow peer json report: {}", jsonReport);
containReport = validatePeerReport(jsonReport);
assertEquals(4, containReport.stream().filter(reportVal -> !reportVal).count());
}
private List<Boolean> validatePeerReport(String jsonReport) {
List<Boolean> containReport = new ArrayList<>();
containReport.add(jsonReport.contains("node1"));
containReport.add(jsonReport.contains("node2"));
containReport.add(jsonReport.contains("node3"));
containReport.add(jsonReport.contains("node4"));
containReport.add(jsonReport.contains("node5"));
containReport.add(jsonReport.contains("node6"));
return containReport;
}
@Test
public void testReconfigureDecommissionBackoffMonitorParameters()
throws ReconfigurationException, IOException {
Configuration conf = new HdfsConfiguration();
conf.setClass(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MONITOR_CLASS,
DatanodeAdminBackoffMonitor.class, DatanodeAdminMonitorInterface.class);
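    // Use the backoff decommission monitor; its pending replication limit
    // and blocks-per-lock settings are reconfigurable.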
int defaultPendingRepLimit = 1000;
conf.setInt(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, defaultPendingRepLimit);
int defaultBlocksPerLock = 1000;
conf.setInt(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
defaultBlocksPerLock);
try (MiniDFSCluster newCluster = new MiniDFSCluster.Builder(conf).build()) {
newCluster.waitActive();
final NameNode nameNode = newCluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem
.getBlockManager().getDatanodeManager();
// verify defaultPendingRepLimit.
      assertEquals(defaultPendingRepLimit,
          datanodeManager.getDatanodeAdminManager().getPendingRepLimit());
// try invalid pendingRepLimit.
try {
nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
"non-numeric");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"dfs.namenode.decommission.backoff.monitor.pending.limit from '" +
defaultPendingRepLimit + "' to 'non-numeric'", e.getMessage());
}
try {
nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
"-1");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"dfs.namenode.decommission.backoff.monitor.pending.limit from '" +
defaultPendingRepLimit + "' to '-1'", e.getMessage());
}
// try correct pendingRepLimit.
nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
"20000");
      assertEquals(20000,
          datanodeManager.getDatanodeAdminManager().getPendingRepLimit());
// verify defaultBlocksPerLock.
      assertEquals(defaultBlocksPerLock,
          datanodeManager.getDatanodeAdminManager().getBlocksPerLock());
// try invalid blocksPerLock.
try {
nameNode.reconfigureProperty(
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
"non-numeric");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock from '" +
defaultBlocksPerLock + "' to 'non-numeric'", e.getMessage());
}
try {
nameNode.reconfigureProperty(
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, "-1");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock from '" +
defaultBlocksPerLock + "' to '-1'", e.getMessage());
}
// try correct blocksPerLock.
nameNode.reconfigureProperty(
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, "10000");
      assertEquals(10000,
          datanodeManager.getDatanodeAdminManager().getBlocksPerLock());
}
}
@Test
public void testReconfigureMinBlocksForWrite() throws Exception {
final NameNode nameNode = cluster.getNameNode(0);
final BlockManager bm = nameNode.getNamesystem().getBlockManager();
String key = DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY;
int defaultVal = DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_DEFAULT;
// Ensure we cannot set any of the parameters negative
ReconfigurationException reconfigurationException =
LambdaTestUtils.intercept(ReconfigurationException.class,
() -> nameNode.reconfigurePropertyImpl(key, "-20"));
assertTrue(reconfigurationException.getCause() instanceof IllegalArgumentException);
assertEquals(key + " = '-20' is invalid. It should be a "
+"positive, non-zero integer value.", reconfigurationException.getCause().getMessage());
// Ensure none of the values were updated from the defaults
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.CONTIGUOUS));
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.STRIPED));
reconfigurationException = LambdaTestUtils.intercept(ReconfigurationException.class,
() -> nameNode.reconfigurePropertyImpl(key, "0"));
assertTrue(reconfigurationException.getCause() instanceof IllegalArgumentException);
assertEquals(key + " = '0' is invalid. It should be a "
+"positive, non-zero integer value.", reconfigurationException.getCause().getMessage());
// Ensure none of the values were updated from the defaults
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.CONTIGUOUS));
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.STRIPED));
// Ensure none of the parameters can be set to a string value
reconfigurationException = LambdaTestUtils.intercept(ReconfigurationException.class,
() -> nameNode.reconfigurePropertyImpl(key, "str"));
assertTrue(reconfigurationException.getCause() instanceof NumberFormatException);
// Ensure none of the values were updated from the defaults
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.CONTIGUOUS));
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.STRIPED));
nameNode.reconfigurePropertyImpl(key, "3");
    // Ensure both values were updated to the new value.
assertEquals(3, bm.getMinBlocksForWrite(BlockType.CONTIGUOUS));
assertEquals(3, bm.getMinBlocksForWrite(BlockType.STRIPED));
}
@Test
public void testReconfigureLogSlowRPC() throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final NameNodeRpcServer nnrs = (NameNodeRpcServer) nameNode.getRpcServer();
// verify default value.
assertFalse(nnrs.getClientRpcServer().isLogSlowRPC());
assertEquals(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT,
nnrs.getClientRpcServer().getLogSlowRPCThresholdTime());
// try invalid logSlowRPC.
try {
nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, "non-boolean");
fail("should not reach here");
} catch (ReconfigurationException e) {
assertEquals(
"Could not change property ipc.server.log.slow.rpc from 'false' to 'non-boolean'",
e.getMessage());
}
// try correct logSlowRPC.
nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, "True");
assertTrue(nnrs.getClientRpcServer().isLogSlowRPC());
// revert to defaults.
nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, null);
assertFalse(nnrs.getClientRpcServer().isLogSlowRPC());
// try invalid logSlowRPCThresholdTime.
try {
nameNode.reconfigureProperty(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
"non-numeric");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"ipc.server.log.slow.rpc.threshold.ms from '0' to 'non-numeric'", e.getMessage());
}
// try correct logSlowRPCThresholdTime.
nameNode.reconfigureProperty(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
"20000");
    assertEquals(20000,
        nnrs.getClientRpcServer().getLogSlowRPCThresholdTime());
}
@Test
public void testReconfigureFSNamesystemLockMetricsParameters()
throws ReconfigurationException, IOException {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY, false);
long defaultReadLockMS = 1000L;
conf.setLong(DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY, defaultReadLockMS);
long defaultWriteLockMS = 1000L;
conf.setLong(DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY, defaultWriteLockMS);
try (MiniDFSCluster newCluster = new MiniDFSCluster.Builder(conf).build()) {
newCluster.waitActive();
final NameNode nameNode = newCluster.getNameNode();
final FSNamesystem fsNamesystem = nameNode.getNamesystem();
// verify default value.
assertFalse(fsNamesystem.isMetricsEnabled());
assertEquals(defaultReadLockMS, fsNamesystem.getReadLockReportingThresholdMs());
assertEquals(defaultWriteLockMS, fsNamesystem.getWriteLockReportingThresholdMs());
// try invalid metricsEnabled.
try {
nameNode.reconfigurePropertyImpl(DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY,
"non-boolean");
fail("should not reach here");
} catch (ReconfigurationException e) {
assertEquals(
"Could not change property dfs.namenode.lock.detailed-metrics.enabled from " +
"'false' to 'non-boolean'", e.getMessage());
}
// try correct metricsEnabled.
nameNode.reconfigurePropertyImpl(DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY, "true");
assertTrue(fsNamesystem.isMetricsEnabled());
nameNode.reconfigurePropertyImpl(DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY, null);
assertFalse(fsNamesystem.isMetricsEnabled());
// try invalid readLockMS.
try {
nameNode.reconfigureProperty(DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
"non-numeric");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"dfs.namenode.read-lock-reporting-threshold-ms from '" +
defaultReadLockMS + "' to 'non-numeric'", e.getMessage());
}
// try correct readLockMS.
nameNode.reconfigureProperty(DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
"20000");
      assertEquals(20000, fsNamesystem.getReadLockReportingThresholdMs());
// try invalid writeLockMS.
try {
nameNode.reconfigureProperty(
DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY, "non-numeric");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"dfs.namenode.write-lock-reporting-threshold-ms from '" +
defaultWriteLockMS + "' to 'non-numeric'", e.getMessage());
}
// try correct writeLockMS.
nameNode.reconfigureProperty(
DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY, "100000");
      assertEquals(100000, fsNamesystem.getWriteLockReportingThresholdMs());
}
}
@Test
public void testReconfigureSlowPeerCollectInterval() throws Exception {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager =
nameNode.namesystem.getBlockManager().getDatanodeManager();
assertFalse("SlowNode tracker is already enabled. It should be disabled by default",
datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
assertTrue(datanodeManager.isSlowPeerCollectorInitialized());
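    // The collector daemon is not running while peer stats are disabled, so
    // changing the collect interval cannot restart it.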
try {
nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "10m");
} catch (NullPointerException e) {
assertEquals("slowPeerCollectorDaemon thread is null, not support restart", e.getMessage());
}
nameNode.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "True");
assertTrue("SlowNode tracker is still disabled. Reconfiguration could not be successful",
datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
assertFalse(datanodeManager.isSlowPeerCollectorInitialized());
assertEquals(1800000, datanodeManager.getSlowPeerCollectionInterval());
    try {
      nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "non-numeric");
      fail("Should not reach here");
    } catch (ReconfigurationException e) {
      assertEquals("Could not change property dfs.namenode.slowpeer.collect.interval from "
          + "'30m' to 'non-numeric'", e.getMessage());
    }
nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "10m");
assertFalse(datanodeManager.isSlowPeerCollectorInitialized());
assertEquals(600000, datanodeManager.getSlowPeerCollectionInterval());
nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, null);
assertFalse(datanodeManager.isSlowPeerCollectorInitialized());
    // Reverting to the default does not restart the collector, so the
    // interval keeps its current in-memory value.
assertEquals(600000, datanodeManager.getSlowPeerCollectionInterval());
}
@After
public void shutDown() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
}