TestDataNodeReconfiguration.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test to reconfigure some parameters for DataNode without restart
*/
public class TestDataNodeReconfiguration {
private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()
+ "data";
private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
"localhost", 5020);
private final int NUM_NAME_NODE = 1;
private final int NUM_DATA_NODE = 10;
private MiniDFSCluster cluster;
private static long counter = 0;
@Before
public void Setup() throws IOException {
startDFSCluster(NUM_NAME_NODE, NUM_DATA_NODE);
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
File dir = new File(DATA_DIR);
if (dir.exists())
Assert.assertTrue("Cannot delete data-node dirs",
FileUtil.fullyDelete(dir));
}
private void startDFSCluster(int numNameNodes, int numDataNodes)
throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
MiniDFSNNTopology nnTopology = MiniDFSNNTopology
.simpleFederatedTopology(numNameNodes);
cluster = new MiniDFSCluster.Builder(conf).nnTopology(nnTopology)
.numDataNodes(numDataNodes).build();
cluster.waitActive();
}
/**
* Starts an instance of DataNode
*
* @throws IOException
*/
public DataNode[] createDNsForTest(int numDateNode) throws IOException {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
DataNode[] result = new DataNode[numDateNode];
for (int i = 0; i < numDateNode; i++) {
result[i] = InternalDataNodeTestUtils.startDNWithMockNN(conf, NN_ADDR, DATA_DIR);
}
return result;
}
@Test
public void testMaxConcurrentMoversReconfiguration()
throws ReconfigurationException, IOException {
int maxConcurrentMovers = 10;
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
// try invalid values
try {
dn.reconfigureProperty(
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
try {
dn.reconfigureProperty(
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
String.valueOf(0));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
// change properties
dn.reconfigureProperty(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
String.valueOf(maxConcurrentMovers));
// verify change
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
maxConcurrentMovers, dn.xserver.balanceThrottler.getMaxConcurrentMovers());
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
maxConcurrentMovers, Integer.parseInt(dn.getConf().get(
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)));
// revert to default
dn.reconfigureProperty(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
null);
// verify default
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT,
dn.xserver.balanceThrottler.getMaxConcurrentMovers());
assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY), null, dn
.getConf().get(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
}
}
@Test
public void testAcquireWithMaxConcurrentMoversGreaterThanDefault()
throws IOException, ReconfigurationException {
final DataNode[] dns = createDNsForTest(1);
try {
testAcquireOnMaxConcurrentMoversReconfiguration(dns[0], 10);
} finally {
dns[0].shutdown();
}
}
@Test
public void testAcquireWithMaxConcurrentMoversLessThanDefault()
throws IOException, ReconfigurationException {
final DataNode[] dns = createDNsForTest(1);
try {
testAcquireOnMaxConcurrentMoversReconfiguration(dns[0], 3);
} finally {
dns[0].shutdown();
}
}
/**
* Simulates a scenario where the DataNode has been reconfigured with fewer
* mover threads, but all of the current treads are busy and therefore the
* DataNode is unable to honor this request within a reasonable amount of
* time. The DataNode eventually gives up and returns a flag indicating that
* the request was not honored.
*/
@Test
public void testFailedDecreaseConcurrentMovers()
throws IOException, ReconfigurationException {
final DataNode[] dns = createDNsForTest(1);
final DataNode dataNode = dns[0];
try {
// Set the current max to 2
dataNode.xserver.updateBalancerMaxConcurrentMovers(2);
// Simulate grabbing 2 threads
dataNode.xserver.balanceThrottler.acquire();
dataNode.xserver.balanceThrottler.acquire();
dataNode.xserver.setMaxReconfigureWaitTime(1);
// Attempt to set new maximum to 1
final boolean success =
dataNode.xserver.updateBalancerMaxConcurrentMovers(1);
Assert.assertFalse(success);
} finally {
dataNode.shutdown();
}
}
/**
* Test with invalid configuration.
*/
@Test(expected = ReconfigurationException.class)
public void testFailedDecreaseConcurrentMoversReconfiguration()
throws IOException, ReconfigurationException {
final DataNode[] dns = createDNsForTest(1);
final DataNode dataNode = dns[0];
try {
// Set the current max to 2
dataNode.xserver.updateBalancerMaxConcurrentMovers(2);
// Simulate grabbing 2 threads
dataNode.xserver.balanceThrottler.acquire();
dataNode.xserver.balanceThrottler.acquire();
dataNode.xserver.setMaxReconfigureWaitTime(1);
// Now try reconfigure maximum downwards with threads released
dataNode.reconfigurePropertyImpl(
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, "1");
} catch (ReconfigurationException e) {
Assert.assertEquals(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
e.getProperty());
Assert.assertEquals("1", e.getNewValue());
throw e;
} finally {
dataNode.shutdown();
}
}
private void testAcquireOnMaxConcurrentMoversReconfiguration(
DataNode dataNode, int maxConcurrentMovers) throws IOException,
ReconfigurationException {
final int defaultMaxThreads = dataNode.getConf().getInt(
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
/** Test that the default setup is working */
for (int i = 0; i < defaultMaxThreads; i++) {
assertEquals("should be able to get thread quota", true,
dataNode.xserver.balanceThrottler.acquire());
}
assertEquals("should not be able to get thread quota", false,
dataNode.xserver.balanceThrottler.acquire());
// Give back the threads
for (int i = 0; i < defaultMaxThreads; i++) {
dataNode.xserver.balanceThrottler.release();
}
/** Test that the change is applied correctly */
// change properties
dataNode.reconfigureProperty(
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
String.valueOf(maxConcurrentMovers));
assertEquals("thread quota is wrong", maxConcurrentMovers,
dataNode.xserver.balanceThrottler.getMaxConcurrentMovers());
for (int i = 0; i < maxConcurrentMovers; i++) {
assertEquals("should be able to get thread quota", true,
dataNode.xserver.balanceThrottler.acquire());
}
assertEquals("should not be able to get thread quota", false,
dataNode.xserver.balanceThrottler.acquire());
}
@Test
public void testBlockReportIntervalReconfiguration()
throws ReconfigurationException {
int blockReportInterval = 300 * 1000;
String[] blockReportParameters = {
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_KEY};
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
BlockPoolManager blockPoolManager = dn.getBlockPoolManager();
// Try invalid values.
for (String blockReportParameter : blockReportParameters) {
try {
dn.reconfigureProperty(blockReportParameter, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
}
try {
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
try {
dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, String.valueOf(-1));
assertEquals(0, dn.getDnConf().initialBlockReportDelayMs);
// Change properties and verify the change.
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
String.valueOf(blockReportInterval));
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
assertEquals(String.format("%s has wrong value",
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
blockReportInterval,
actor.getScheduler().getBlockReportIntervalMs());
}
}
}
dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(123));
assertEquals(123, dn.getDnConf().blockReportSplitThreshold);
dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "123");
assertEquals(123000, dn.getDnConf().initialBlockReportDelayMs);
// Revert to default and verify default.
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, null);
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
assertEquals(String.format("%s has wrong value",
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT,
actor.getScheduler().getBlockReportIntervalMs());
}
}
}
assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
dn.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, null);
assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY),
dn.getConf().get(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY));
dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, null);
assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INITIAL_DELAY_KEY),
dn.getConf().get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY));
}
}
@Test
public void testDataXceiverReconfiguration()
throws ReconfigurationException {
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
// Try invalid values.
try {
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
try {
dn.reconfigureProperty(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
// Change properties and verify change.
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(123));
assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
123, dn.getXferServer().getMaxXceiverCount());
dn.reconfigureProperty(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
String.valueOf(1000));
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY),
1000, dn.getXferServer().getTransferThrottler().getBandwidth());
dn.reconfigureProperty(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
String.valueOf(1000));
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY),
1000, dn.getXferServer().getWriteThrottler().getBandwidth());
dn.reconfigureProperty(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY,
String.valueOf(1000));
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY),
1000, dn.getXferServer().getReadThrottler().getBandwidth());
// Revert to default.
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, null);
assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT, dn.getXferServer().getMaxXceiverCount());
assertNull(String.format("expect %s is not configured",
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
dn.getConf().get(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY));
dn.reconfigureProperty(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, null);
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY),
null, dn.getXferServer().getTransferThrottler());
assertNull(String.format("expect %s is not configured",
DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY),
dn.getConf().get(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY));
dn.reconfigureProperty(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY, null);
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY),
null, dn.getXferServer().getWriteThrottler());
assertNull(String.format("expect %s is not configured",
DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY),
dn.getConf().get(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY));
dn.reconfigureProperty(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY, null);
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY),
null, dn.getXferServer().getReadThrottler());
assertNull(String.format("expect %s is not configured",
DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY),
dn.getConf().get(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY));
}
}
@Test
public void testCacheReportReconfiguration()
throws ReconfigurationException {
int cacheReportInterval = 300 * 1000;
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
// Try invalid values.
try {
dn.reconfigureProperty(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
// Change properties.
dn.reconfigureProperty(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
String.valueOf(cacheReportInterval));
// Verify change.
assertEquals(String.format("%s has wrong value", DFS_CACHEREPORT_INTERVAL_MSEC_KEY),
cacheReportInterval, dn.getDnConf().getCacheReportInterval());
// Revert to default.
dn.reconfigureProperty(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, null);
assertEquals(String.format("%s has wrong value", DFS_CACHEREPORT_INTERVAL_MSEC_KEY),
DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT, dn.getDnConf().getCacheReportInterval());
assertNull(String.format("expect %s is not configured", DFS_CACHEREPORT_INTERVAL_MSEC_KEY),
dn.getConf().get(DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
}
}
@Test
public void testSlowPeerParameters() throws Exception {
String[] slowPeersParameters = {
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY};
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
// Try invalid values.
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property dfs.datanode.peer.stats.enabled from 'true' to 'text'",
() -> dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "text"));
for (String parameter : slowPeersParameters) {
try {
dn.reconfigureProperty(parameter, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(parameter, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
}
// Change and verify properties.
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "false");
assertFalse(dn.getDnConf().peerStatsEnabled);
// Reset DFS_DATANODE_PEER_STATS_ENABLED_KEY to true.
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
for (String parameter : slowPeersParameters) {
dn.reconfigureProperty(parameter, "123");
}
assertEquals(123, dn.getPeerMetrics().getMinOutlierDetectionNodes());
assertEquals(123, dn.getPeerMetrics().getLowThresholdMs());
assertEquals(123, dn.getPeerMetrics().getMinOutlierDetectionSamples());
assertEquals(123,
dn.getPeerMetrics().getSlowNodeDetector().getMinOutlierDetectionNodes());
assertEquals(123,
dn.getPeerMetrics().getSlowNodeDetector().getLowThresholdMs());
// Revert to default and verify.
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, null);
assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_PEER_STATS_ENABLED_KEY), null,
dn.getConf().get(DFS_DATANODE_PEER_STATS_ENABLED_KEY));
// Reset DFS_DATANODE_PEER_STATS_ENABLED_KEY to true.
dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
for (String parameter : slowPeersParameters) {
dn.reconfigureProperty(parameter, null);
}
assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY), null,
dn.getConf().get(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY));
assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY), null,
dn.getConf().get(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY));
assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY), null,
dn.getConf().get(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY));
assertEquals(dn.getPeerMetrics().getSlowNodeDetector().getMinOutlierDetectionNodes(),
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
assertEquals(dn.getPeerMetrics().getSlowNodeDetector().getLowThresholdMs(),
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
}
}
@Test
public void testSlowDiskParameters() throws ReconfigurationException, IOException {
String[] slowDisksParameters1 = {
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY};
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
// Try invalid values.
try {
dn.reconfigureProperty(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "text");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, "text");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
// Enable disk stats, make DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY > 0.
dn.reconfigureProperty(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, "1");
for (String parameter : slowDisksParameters1) {
try {
dn.reconfigureProperty(parameter, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(parameter, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
}
// Change and verify properties.
dn.reconfigureProperty(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1ms");
assertEquals(1, dn.getDnConf().outliersReportIntervalMs);
BlockPoolManager blockPoolManager = new BlockPoolManager(dn);
blockPoolManager.refreshNamenodes(dn.getConf());
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY),
1, actor.getScheduler().getOutliersReportIntervalMs());
}
}
}
String[] slowDisksParameters2 = {
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY};
for (String parameter : slowDisksParameters2) {
dn.reconfigureProperty(parameter, "99");
}
// Assert diskMetrics.
assertEquals(99, dn.getDiskMetrics().getMinOutlierDetectionDisks());
assertEquals(99, dn.getDiskMetrics().getLowThresholdMs());
assertEquals(99, dn.getDiskMetrics().getMaxSlowDisksToExclude());
// Assert dnConf.
assertTrue(dn.getDnConf().diskStatsEnabled);
// Assert profilingEventHook.
assertTrue(dn.getFileIoProvider().getProfilingEventHook().getDiskStatsEnabled());
assertEquals((int) ((double) 99 / 100 * Integer.MAX_VALUE),
dn.getFileIoProvider().getProfilingEventHook().getSampleRangeMax());
// Assert slowDiskDetector.
assertEquals(99,
dn.getDiskMetrics().getSlowDiskDetector().getMinOutlierDetectionNodes());
assertEquals(99,
dn.getDiskMetrics().getSlowDiskDetector().getLowThresholdMs());
// Revert to default and verify.
dn.reconfigureProperty(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, null);
assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY), null,
dn.getConf().get(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY));
dn.reconfigureProperty(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, null);
assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY), null,
dn.getConf().get(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY));
assertFalse(dn.getFileIoProvider().getProfilingEventHook().getDiskStatsEnabled());
assertEquals(0,
dn.getFileIoProvider().getProfilingEventHook().getSampleRangeMax());
// Enable disk stats, make DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY > 0.
dn.reconfigureProperty(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, "1");
dn.reconfigureProperty(DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY, null);
dn.reconfigureProperty(DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY, null);
dn.reconfigureProperty(DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, null);
assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY), null,
dn.getConf().get(DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY));
assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY), null,
dn.getConf().get(DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY));
assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY), null,
dn.getConf().get(DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY));
assertEquals(DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT,
dn.getDiskMetrics().getSlowDiskDetector().getMinOutlierDetectionNodes());
assertEquals(DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT,
dn.getDiskMetrics().getSlowDiskDetector().getLowThresholdMs());
}
}
@Test
public void testDfsUsageParameters() throws ReconfigurationException {
String[] dfsUsageParameters = {
FS_DU_INTERVAL_KEY,
FS_GETSPACEUSED_JITTER_KEY};
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
// Try invalid values.
for (String parameter : dfsUsageParameters) {
try {
dn.reconfigureProperty(parameter, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(parameter, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
}
// Change and verify properties.
for (String parameter : dfsUsageParameters) {
dn.reconfigureProperty(parameter, "99");
}
List<FsVolumeImpl> volumeList = dn.data.getVolumeList();
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
if (dfsUsage instanceof CachingGetSpaceUsed) {
assertEquals(99,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
assertEquals(99,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
}
}
}
// Revert to default and verify.
for (String parameter : dfsUsageParameters) {
dn.reconfigureProperty(parameter, null);
}
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
if (dfsUsage instanceof CachingGetSpaceUsed) {
assertEquals(String.format("expect %s is not configured",
FS_DU_INTERVAL_KEY), FS_DU_INTERVAL_DEFAULT,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
assertEquals(String.format("expect %s is not configured",
FS_GETSPACEUSED_JITTER_KEY), FS_GETSPACEUSED_JITTER_DEFAULT,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
}
assertEquals(String.format("expect %s is not configured",
FS_DU_INTERVAL_KEY), null,
dn.getConf().get(FS_DU_INTERVAL_KEY));
assertEquals(String.format("expect %s is not configured",
FS_GETSPACEUSED_JITTER_KEY), null,
dn.getConf().get(FS_GETSPACEUSED_JITTER_KEY));
}
}
}
}
public static class DummyCachingGetSpaceUsed extends CachingGetSpaceUsed {
public DummyCachingGetSpaceUsed(Builder builder) throws IOException {
super(builder.setInterval(1000).setJitter(0L));
}
@Override
protected void refresh() {
counter++;
}
}
@Test
public void testDfsUsageKlass() throws ReconfigurationException, InterruptedException {
long lastCounter = counter;
Thread.sleep(5000);
assertEquals(lastCounter, counter);
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
dn.reconfigurePropertyImpl(FS_GETSPACEUSED_CLASSNAME,
DummyCachingGetSpaceUsed.class.getName());
}
lastCounter = counter;
Thread.sleep(5000);
assertTrue(counter > lastCounter);
}
@Test
public void testDiskBalancerParameters() throws Exception {
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
// Verify DFS_DISK_BALANCER_ENABLED.
// Try invalid values.
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property dfs.disk.balancer.enabled from 'true' to 'text'",
() -> dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "text"));
// Set default value.
dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, null);
assertEquals(dn.getConf().getBoolean(DFS_DISK_BALANCER_ENABLED,
DFS_DISK_BALANCER_ENABLED_DEFAULT), dn.getDiskBalancer().isDiskBalancerEnabled());
// Set DFS_DISK_BALANCER_ENABLED to false.
dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "false");
assertFalse(dn.getDiskBalancer().isDiskBalancerEnabled());
// Set DFS_DISK_BALANCER_ENABLED to true.
dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "true");
assertTrue(dn.getDiskBalancer().isDiskBalancerEnabled());
// Verify DFS_DISK_BALANCER_PLAN_VALID_INTERVAL.
// Try invalid values.
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property dfs.disk.balancer.plan.valid.interval from " +
"'1d' to 'text'",
() -> dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "text"));
// Set default value.
dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, null);
assertEquals(dn.getConf().getTimeDuration(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS),
dn.getDiskBalancer().getPlanValidityInterval());
assertEquals(dn.getConf().getTimeDuration(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS),
dn.getDiskBalancer().getPlanValidityIntervalInConfig());
// Set value is 6 then 6 milliseconds.
dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "" + 6);
assertEquals(6, dn.getDiskBalancer().getPlanValidityInterval());
assertEquals(6, dn.getDiskBalancer().getPlanValidityIntervalInConfig());
// Set value with time unit.
dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "1m");
assertEquals(60000, dn.getDiskBalancer().getPlanValidityInterval());
assertEquals(60000, dn.getDiskBalancer().getPlanValidityIntervalInConfig());
}
}
@Test
public void testSlowIoWarningThresholdReconfiguration() throws Exception {
int slowIoWarningThreshold = 500;
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
// Verify DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY.
// Try invalid values.
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property dfs.datanode.slow.io.warning.threshold.ms from "
+ "'300' to 'text'",
() -> dn.reconfigureProperty(DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, "text"));
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property dfs.datanode.slow.io.warning.threshold.ms from "
+ "'300' to '-1'",
() -> dn.reconfigureProperty(DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, "-1"));
// Set value is 500.
dn.reconfigureProperty(DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
String.valueOf(slowIoWarningThreshold));
assertEquals(slowIoWarningThreshold, dn.getDnConf().getSlowIoWarningThresholdMs());
// Set default value.
dn.reconfigureProperty(DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, null);
assertEquals(DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT,
dn.getDnConf().getSlowIoWarningThresholdMs());
}
}
}