TestDFSAdmin.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.tools;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;

import org.apache.commons.io.FileUtils;
import org.apache.commons.text.TextStringBuilder;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationUtil;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.TestRefreshUserMappings;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.ToolRunner;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsAdmin.TRASH_PERMISSION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * set/clrSpaceQuote are tested in {@link org.apache.hadoop.hdfs.TestQuota}.
 */
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class TestDFSAdmin {
  private static final Logger LOG = LoggerFactory.getLogger(TestDFSAdmin.class);
  private Configuration conf = null;
  private MiniDFSCluster cluster;
  private DFSAdmin admin;
  private DataNode datanode;
  private NameNode namenode;
  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
  private final ByteArrayOutputStream err = new ByteArrayOutputStream();
  private static final PrintStream OLD_OUT = System.out;
  private static final PrintStream OLD_ERR = System.err;
  private String tempResource = null;
  private static final int NUM_DATANODES = 2;

  @BeforeEach
  public void setUp() throws Exception {
    conf = new Configuration();
    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 3);
    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
        GenericTestUtils.getRandomizedTempPath());
    conf.setInt(DFSConfigKeys.FS_TRASH_INTERVAL_KEY, 60);
    conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
    restartCluster();

    admin = new DFSAdmin(conf);
  }

  private void redirectStream() {
    System.setOut(new PrintStream(out));
    System.setErr(new PrintStream(err));
  }

  private void resetStream() {
    out.reset();
    err.reset();
  }

  @AfterEach
  public void tearDown() throws Exception {
    try {
      System.out.flush();
      System.err.flush();
    } finally {
      System.setOut(OLD_OUT);
      System.setErr(OLD_ERR);
    }

    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }

    resetStream();
    if (tempResource != null) {
      File f = new File(tempResource);
      FileUtils.deleteQuietly(f);
      tempResource = null;
    }
  }

  private void restartCluster() throws IOException {
    if (cluster != null) {
      cluster.shutdown();
    }
    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(NUM_DATANODES).build();
    cluster.waitActive();
    datanode = cluster.getDataNodes().get(0);
    namenode = cluster.getNameNode();
  }

  private void getReconfigurableProperties(String nodeType, String address,
      final List<String> outs, final List<String> errs) throws IOException, InterruptedException {
    reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType,
        address, outs, errs);
  }

  private void getReconfigurationStatus(String nodeType, String address,
      final List<String> outs, final List<String> errs) throws IOException, InterruptedException {
    reconfigurationOutErrFormatter("getReconfigurationStatus", nodeType,
        address, outs, errs);
  }

  private void reconfigurationOutErrFormatter(String methodName,
      String nodeType, String address, final List<String> outs,
      final List<String> errs) throws IOException, InterruptedException {
    ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
    PrintStream outStream = new PrintStream(bufOut);
    ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
    PrintStream errStream = new PrintStream(bufErr);

    if (methodName.equals("getReconfigurableProperties")) {
      admin.getReconfigurableProperties(
          nodeType,
          address,
          outStream,
          errStream);
    } else if (methodName.equals("getReconfigurationStatus")) {
      admin.getReconfigurationStatusUtil(nodeType, address, outStream, errStream);
    } else if (methodName.equals("startReconfiguration")) {
      admin.startReconfigurationUtil(nodeType, address, outStream, errStream);
    }

    scanIntoList(bufOut, outs);
    scanIntoList(bufErr, errs);
  }

  private static void scanIntoList(
      final ByteArrayOutputStream baos,
      final List<String> list) {
    final Scanner scanner = new Scanner(baos.toString());
    while (scanner.hasNextLine()) {
      list.add(scanner.nextLine());
    }
    scanner.close();
  }

  @Test
  @Timeout(value = 30)
  public void testGetDatanodeInfo() throws Exception {
    redirectStream();
    final DFSAdmin dfsAdmin = new DFSAdmin(conf);

    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
      resetStream();
      final DataNode dn = cluster.getDataNodes().get(i);
      final String addr = String.format(
          "%s:%d",
          dn.getXferAddress().getHostString(),
          dn.getIpcPort());
      final int ret = ToolRunner.run(dfsAdmin,
          new String[]{"-getDatanodeInfo", addr});
      assertEquals(0, ret);

      /* collect outputs */
      final List<String> outs = Lists.newArrayList();
      scanIntoList(out, outs);
      /* verify results */
      assertEquals(1, outs.size(),
          "One line per DataNode like: Uptime: XXX, Software version: x.y.z,"
              + " Config version: core-x.y.z,hdfs-x");
      assertThat(outs.get(0))
          .contains("Uptime:")
          .contains("Software version")
          .contains("Config version");
    }
  }

  @Test
  @Timeout(value = 30)
  public void testTriggerBlockReport() throws Exception {
    redirectStream();
    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
    final DataNode dn = cluster.getDataNodes().get(0);
    final NameNode nn = cluster.getNameNode();

    final String dnAddr = String.format(
        "%s:%d",
        dn.getXferAddress().getHostString(),
        dn.getIpcPort());
    final String nnAddr = nn.getHostAndPort();
    resetStream();
    final List<String> outs = Lists.newArrayList();
    final int ret = ToolRunner.run(dfsAdmin,
        new String[]{"-triggerBlockReport", dnAddr, "-incremental", "-namenode", nnAddr});
    assertEquals(0, ret);

    scanIntoList(out, outs);
    assertEquals(1, outs.size());
    assertThat(outs.get(0)).
        contains("Triggering an incremental block report on ").
        contains(" to namenode ");
  }

  @Test
  @Timeout(value = 30)
  public void testGetVolumeReport() throws Exception {
    redirectStream();
    final DFSAdmin dfsAdmin = new DFSAdmin(conf);

    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
      resetStream();
      final DataNode dn = cluster.getDataNodes().get(i);
      final String addr = String.format("%s:%d", dn.getXferAddress()
          .getHostString(), dn.getIpcPort());
      final int ret = ToolRunner.run(dfsAdmin, new String[] {
          "-getVolumeReport", addr });
      assertEquals(0, ret);

      /* collect outputs */
      final List<String> outs = Lists.newArrayList();
      scanIntoList(out, outs);
      assertEquals(outs.get(0), "Active Volumes : 2");
    }
  }
  /**
   * Test that if datanode is not reachable, some DFSAdmin commands will fail
   * elegantly with non-zero ret error code along with exception error message.
   */
  @Test
  @Timeout(value = 60)
  public void testDFSAdminUnreachableDatanode() throws Exception {
    redirectStream();
    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
    for (String command : new String[]{"-getDatanodeInfo",
        "-evictWriters", "-getBalancerBandwidth"}) {
      // Connecting to Xfer port instead of IPC port will get
      // Datanode unreachable. java.io.EOFException
      final String dnDataAddr = datanode.getXferAddress().getHostString() + ":"
          + datanode.getXferPort();
      resetStream();
      final List<String> outs = Lists.newArrayList();
      final int ret = ToolRunner.run(dfsAdmin,
          new String[]{command, dnDataAddr});
      assertEquals(-1, ret);

      scanIntoList(out, outs);
      assertTrue(outs.isEmpty(), "Unexpected " + command + " stdout: " + out);
      assertTrue(err.toString().contains("Exception"),
          "Unexpected " + command + " stderr: " + err);
    }
  }

  @Test
  @Timeout(value = 30)
  public void testDataNodeGetReconfigurableProperties() throws IOException, InterruptedException {
    final int port = datanode.getIpcPort();
    final String address = "localhost:" + port;
    final List<String> outs = Lists.newArrayList();
    final List<String> errs = Lists.newArrayList();
    getReconfigurableProperties("datanode", address, outs, errs);
    assertEquals(26, outs.size());
    assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
  }

  /**
   * Test reconfiguration and check the status outputs.
   * @param expectedSuccuss set true if the reconfiguration task should success.
   * @throws IOException
   * @throws InterruptedException
   * @throws TimeoutException
   */
  private void testDataNodeGetReconfigurationStatus(boolean expectedSuccuss)
      throws IOException, InterruptedException, TimeoutException {
    ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
    datanode.setReconfigurationUtil(ru);

    List<ReconfigurationUtil.PropertyChange> changes =
        new ArrayList<>();
    File newDir = new File(cluster.getDataDirectory(), "data_new");
    if (expectedSuccuss) {
      newDir.mkdirs();
    } else {
      // Inject failure.
      newDir.createNewFile();
    }
    changes.add(new ReconfigurationUtil.PropertyChange(
        DFS_DATANODE_DATA_DIR_KEY, newDir.toString(),
        datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
    changes.add(new ReconfigurationUtil.PropertyChange(
        "randomKey", "new123", "old456"));
    when(ru.parseChangedProperties(any(Configuration.class),
        any(Configuration.class))).thenReturn(changes);

    final int port = datanode.getIpcPort();
    final String address = "localhost:" + port;

    assertThat(admin.startReconfiguration("datanode", address)).isEqualTo(0);

    final List<String> outs = Lists.newArrayList();
    final List<String> errs = Lists.newArrayList();
    awaitReconfigurationFinished("datanode", address, outs, errs);

    if (expectedSuccuss) {
      assertThat(outs.size()).isEqualTo(4);
    } else {
      assertThat(outs.size()).isEqualTo(6);
    }

    List<StorageLocation> locations = DataNode.getStorageLocations(
        datanode.getConf());
    if (expectedSuccuss) {
      assertThat(locations.size()).isEqualTo(1);
      assertThat(new File(locations.get(0).getUri())).isEqualTo(newDir);
      // Verify the directory is appropriately formatted.
      assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
    } else {
      assertTrue(locations.isEmpty());
    }

    int offset = 1;
    if (expectedSuccuss) {
      assertThat(outs.get(offset)).
          contains("SUCCESS: Changed property " +
              DFS_DATANODE_DATA_DIR_KEY);
    } else {
      assertThat(outs.get(offset)).
          contains("FAILED: Change property " +
              DFS_DATANODE_DATA_DIR_KEY);
    }
    File dnDir0 = cluster.getInstanceStorageDir(0, 0);
    File dnDir1 = cluster.getInstanceStorageDir(0, 1);
    assertThat(outs.get(offset + 1)).
        contains("From:").
        contains(dnDir0.getName())
        .contains(dnDir1.getName());
    assertThat(outs.get(offset + 2))
        .doesNotContain(dnDir0.getName())
        .doesNotContain(dnDir1.getName());
    assertThat(outs.get(offset + 2)).contains("To").contains("data_new");
  }

  @Test
  @Timeout(value = 30)
  public void testDataNodeGetReconfigurationStatus() throws IOException,
      InterruptedException, TimeoutException {
    testDataNodeGetReconfigurationStatus(true);
    restartCluster();
    testDataNodeGetReconfigurationStatus(false);
  }

  @Test
  @Timeout(value = 30)
  public void testNameNodeGetReconfigurableProperties() throws IOException, InterruptedException {
    final String address = namenode.getHostAndPort();
    final List<String> outs = Lists.newArrayList();
    final List<String> errs = Lists.newArrayList();
    getReconfigurableProperties("namenode", address, outs, errs);
    assertEquals(29, outs.size());
    assertTrue(outs.get(0).contains("Reconfigurable properties:"));
    assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
    assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
    assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(3));
    assertEquals(DFS_DATANODE_MAX_NODES_TO_REPORT_KEY, outs.get(4));
    assertEquals(DFS_DATANODE_PEER_STATS_ENABLED_KEY, outs.get(5));
    assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(6));
    assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(7));
    assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(8));
    assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(9));
    assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY, outs.get(10));
    assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, outs.get(11));
    assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, outs.get(12));
    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(13));
    assertEquals(DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY, outs.get(14));
    assertEquals(errs.size(), 0);
  }

  void awaitReconfigurationFinished(final String nodeType,
      final String address, final List<String> outs, final List<String> errs)
      throws TimeoutException, IOException, InterruptedException {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        outs.clear();
        errs.clear();
        try {
          getReconfigurationStatus(nodeType, address, outs, errs);
        } catch (IOException | InterruptedException e) {
          LOG.error(String.format(
              "call getReconfigurationStatus on %s[%s] failed.", nodeType,
              address), e);
        }
        return !outs.isEmpty() && outs.get(0).contains("finished");

      }
    }, 100, 100 * 100);
  }

  @Test
  @Timeout(value = 30)
  public void testPrintTopology() throws Exception {
    redirectStream();

    /* init conf */
    final Configuration dfsConf = new HdfsConfiguration();
    final File baseDir = new File(
        PathUtils.getTestDir(getClass()),
        GenericTestUtils.getMethodName());
    dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());

    final int numDn = 4;
    final String[] racks = {
        "/d1/r1", "/d1/r2",
        "/d2/r1", "/d2/r2"};

    /* init cluster using topology */
    try (MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(dfsConf)
        .numDataNodes(numDn).racks(racks).build()) {

      miniCluster.waitActive();
      assertEquals(numDn, miniCluster.getDataNodes().size());
      final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf);

      resetStream();
      final int ret = ToolRunner.run(dfsAdmin, new String[] {"-printTopology"});

      /* collect outputs */
      final List<String> outs = Lists.newArrayList();
      scanIntoList(out, outs);

      /* verify results */
      assertEquals(0, ret);
      assertEquals(12, outs.size(),
          "There should be three lines per Datanode: the 1st line is"
              + " rack info, 2nd node info, 3rd empty line. The total"
              + " should be as a result of 3 * numDn.");
      assertThat(outs.get(0)).
          contains("Rack:").contains("/d1/r1");
      assertThat(outs.get(3)).
          contains("Rack:").contains("/d1/r2");
      assertThat(outs.get(6)).
          contains("Rack:").contains("/d2/r1");
      assertThat(outs.get(9)).
          contains("Rack:").contains("/d2/r2");
    }
  }

  @Test
  @Timeout(value = 30)
  public void testPrintTopologyWithStatus() throws Exception {
    redirectStream();
    final Configuration dfsConf = new HdfsConfiguration();
    final File baseDir = new File(
            PathUtils.getTestDir(getClass()),
            GenericTestUtils.getMethodName());
    dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());

    final int numDn = 4;
    final String[] racks = {
        "/d1/r1", "/d1/r2",
        "/d2/r1", "/d2/r2"};

    try (MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(dfsConf)
            .numDataNodes(numDn).racks(racks).build()) {
      miniCluster.waitActive();
      assertEquals(numDn, miniCluster.getDataNodes().size());

      DatanodeManager dm = miniCluster.getNameNode().getNamesystem().
          getBlockManager().getDatanodeManager();
      DatanodeDescriptor maintenanceNode = dm.getDatanode(
          miniCluster.getDataNodes().get(1).getDatanodeId());
      maintenanceNode.setInMaintenance();
      DatanodeDescriptor demissionNode = dm.getDatanode(
          miniCluster.getDataNodes().get(2).getDatanodeId());
      demissionNode.setDecommissioned();

      final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf);

      resetStream();
      final int ret = ToolRunner.run(dfsAdmin, new String[] {"-printTopology"});

      /* collect outputs */
      final List<String> outs = Lists.newArrayList();
      scanIntoList(out, outs);

      /* verify results */
      assertEquals(0, ret);
      assertTrue(outs.get(1).contains(DatanodeInfo.AdminStates.NORMAL.toString()));
      assertTrue(outs.get(4).contains(DatanodeInfo.AdminStates.IN_MAINTENANCE.toString()));
      assertTrue(outs.get(7).contains(DatanodeInfo.AdminStates.DECOMMISSIONED.toString()));
      assertTrue(outs.get(10).contains(DatanodeInfo.AdminStates.NORMAL.toString()));
    }
  }

  @Test
  @Timeout(value = 30)
  public void testNameNodeGetReconfigurationStatus() throws IOException,
      InterruptedException, TimeoutException {
    ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
    namenode.setReconfigurationUtil(ru);
    final String address = namenode.getHostAndPort();

    List<ReconfigurationUtil.PropertyChange> changes =
        new ArrayList<>();
    changes.add(new ReconfigurationUtil.PropertyChange(
        DFS_HEARTBEAT_INTERVAL_KEY, String.valueOf(6),
        namenode.getConf().get(DFS_HEARTBEAT_INTERVAL_KEY)));
    changes.add(new ReconfigurationUtil.PropertyChange(
        "randomKey", "new123", "old456"));
    when(ru.parseChangedProperties(any(Configuration.class),
        any(Configuration.class))).thenReturn(changes);
    assertThat(admin.startReconfiguration("namenode", address)).isEqualTo(0);

    final List<String> outs = Lists.newArrayList();
    final List<String> errs = Lists.newArrayList();
    awaitReconfigurationFinished("namenode", address, outs, errs);

    // verify change
    assertEquals(6, namenode
        .getConf()
        .getLong(DFS_HEARTBEAT_INTERVAL_KEY,
            DFS_HEARTBEAT_INTERVAL_DEFAULT), DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value");
    assertEquals(6, namenode
        .getNamesystem()
        .getBlockManager()
        .getDatanodeManager()
        .getHeartbeatInterval(), DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value");

    int offset = 1;
    assertThat(outs.get(offset)).contains("SUCCESS: Changed property "
        + DFS_HEARTBEAT_INTERVAL_KEY);
    assertThat(outs.get(offset + 1)).contains("From:").contains("3");
    assertThat(outs.get(offset + 2)).contains("To:").contains("6");
  }

  private static String scanIntoString(final ByteArrayOutputStream baos) {
    final TextStringBuilder sb = new TextStringBuilder();
    final Scanner scanner = new Scanner(baos.toString());
    while (scanner.hasNextLine()) {
      sb.appendln(scanner.nextLine());
    }
    scanner.close();
    return sb.toString();
  }

  // get block details and check if the block is corrupt
  private void waitForCorruptBlock(MiniDFSCluster miniCluster,
      DFSClient client, Path file)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        LocatedBlocks blocks = null;
        try {
          miniCluster.triggerBlockReports();
          blocks = client.getNamenode().getBlockLocations(file.toString(), 0,
              Long.MAX_VALUE);
        } catch (IOException e) {
          return false;
        }
        return blocks != null && blocks.get(0).isCorrupt();
      }
    }, 1000, 60000);
  }

  @Test
  @Timeout(value = 180)
  public void testReportCommand() throws Exception {
    tearDown();
    redirectStream();

    // init conf
    final Configuration dfsConf = new HdfsConfiguration();
    ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByID(
        SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
    dfsConf.setInt(
        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
    dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
    final Path baseDir = new Path(
        PathUtils.getTestDir(getClass()).getAbsolutePath(),
        GenericTestUtils.getMethodName());
    dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString());
    final int numDn =
        ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();

    try(MiniDFSCluster miniCluster = new MiniDFSCluster
        .Builder(dfsConf)
        .numDataNodes(numDn).build()) {

      miniCluster.waitActive();
      assertEquals(numDn, miniCluster.getDataNodes().size());

      final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf);
      final DFSClient client = miniCluster.getFileSystem().getClient();

      // Verify report command for all counts to be zero
      resetStream();
      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
      verifyNodesAndCorruptBlocks(numDn, numDn, 0, 0, client, 0L, 0L);

      final short replFactor = 1;
      final long fileLength = 512L;
      final DistributedFileSystem fs = miniCluster.getFileSystem();
      final Path file = new Path(baseDir, "/corrupted");
      fs.enableErasureCodingPolicy(ecPolicy.getName());
      DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
      DFSTestUtil.waitReplication(fs, file, replFactor);
      final ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file);
      LocatedBlocks lbs = miniCluster.getFileSystem().getClient().
          getNamenode().getBlockLocations(
          file.toString(), 0, fileLength);
      assertTrue(lbs.get(0) instanceof LocatedBlock, "Unexpected block type: " + lbs.get(0));
      LocatedBlock locatedBlock = lbs.get(0);
      DatanodeInfo locatedDataNode = locatedBlock.getLocations()[0];
      LOG.info("Replica block located on: " + locatedDataNode);

      Path ecDir = new Path(baseDir, "ec");
      fs.mkdirs(ecDir);
      fs.getClient().setErasureCodingPolicy(ecDir.toString(),
          ecPolicy.getName());
      Path ecFile = new Path(ecDir, "ec-file");
      int stripesPerBlock = 2;
      int cellSize = ecPolicy.getCellSize();
      int blockSize = stripesPerBlock * cellSize;
      int blockGroupSize =  ecPolicy.getNumDataUnits() * blockSize;
      int totalBlockGroups = 1;
      DFSTestUtil.createStripedFile(miniCluster, ecFile, ecDir,
          totalBlockGroups, stripesPerBlock, false, ecPolicy);

      // Verify report command for all counts to be zero
      resetStream();
      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
      verifyNodesAndCorruptBlocks(numDn, numDn, 0, 0, client, 0L, 0L);
      // Choose a DataNode to shutdown
      final List<DataNode> datanodes = miniCluster.getDataNodes();
      DataNode dataNodeToShutdown = null;
      for (DataNode dn : datanodes) {
        if (!dn.getDatanodeId().getDatanodeUuid().equals(
            locatedDataNode.getDatanodeUuid())) {
          dataNodeToShutdown = dn;
          break;
        }
      }
      assertTrue(dataNodeToShutdown != null, "Unable to choose a DataNode to shutdown!");
      // Shut down the DataNode not hosting the replicated block
      LOG.info("Shutting down: " + dataNodeToShutdown);
      dataNodeToShutdown.shutdown();
      miniCluster.setDataNodeDead(dataNodeToShutdown.getDatanodeId());

      // Verify report command to show dead DataNode
      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
      verifyNodesAndCorruptBlocks(numDn, numDn - 1, 0, 0, client, 0L, 1L);

      // Corrupt the replicated block
      final int blockFilesCorrupted = miniCluster
          .corruptBlockOnDataNodes(block);
      assertEquals(replFactor, blockFilesCorrupted,
          "Fail to corrupt all replicas for block " + block);

      try {
        IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
            conf, true);
        fail("Should have failed to read the file with corrupted blocks.");
      } catch (ChecksumException ignored) {
        // expected exception reading corrupt blocks
      }

      // Increase replication factor, this should invoke transfer request.
      // Receiving datanode fails on checksum and reports it to namenode
      fs.setReplication(file, (short) (replFactor + 1));

      // get block details and check if the block is corrupt
      BlockManagerTestUtil.updateState(
          miniCluster.getNameNode().getNamesystem().getBlockManager());
      waitForCorruptBlock(miniCluster, client, file);

      // verify report command for corrupt replicated block
      resetStream();
      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
      verifyNodesAndCorruptBlocks(numDn, numDn - 1, 1, 0, client, 0L, 1L);

      lbs = miniCluster.getFileSystem().getClient().
          getNamenode().getBlockLocations(
          ecFile.toString(), 0, blockGroupSize);
      assertTrue(lbs.get(0) instanceof LocatedStripedBlock, "Unexpected block type: " + lbs.get(0));
      LocatedStripedBlock bg =
          (LocatedStripedBlock)(lbs.get(0));

      miniCluster.getNamesystem().writeLock(RwLockMode.BM);
      try {
        BlockManager bm = miniCluster.getNamesystem().getBlockManager();
        bm.findAndMarkBlockAsCorrupt(bg.getBlock(), bg.getLocations()[0],
            "STORAGE_ID", "TEST");
        BlockManagerTestUtil.updateState(bm);
      } finally {
        miniCluster.getNamesystem().writeUnlock(RwLockMode.BM, "testReportCommand");
      }
      waitForCorruptBlock(miniCluster, client, file);

      // verify report command for corrupt replicated block
      // and EC block group
      resetStream();
      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
      verifyNodesAndCorruptBlocks(numDn, numDn - 1, 1, 1, client, 0L, 0L);

      // verify report command for list all DN types
      resetStream();
      String[] reportWithArg = new String[DFSAdmin.DFS_REPORT_ARGS.length + 1];
      reportWithArg[0] = "-report";
      System.arraycopy(DFSAdmin.DFS_REPORT_ARGS, 0, reportWithArg, 1,
          DFSAdmin.DFS_REPORT_ARGS.length);
      assertEquals(0, ToolRunner.run(dfsAdmin, reportWithArg));
    }
  }

  @Test
  @Timeout(300)
  public void testListOpenFiles() throws Exception {
    redirectStream();

    final Configuration dfsConf = new HdfsConfiguration();
    dfsConf.setInt(
        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
    dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
    dfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 5);
    final Path baseDir = new Path(
        PathUtils.getTestDir(getClass()).getAbsolutePath(),
        GenericTestUtils.getMethodName());
    dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString());

    final int numDataNodes = 3;
    final int numClosedFiles = 25;
    final int numOpenFiles = 15;

    try(MiniDFSCluster miniCluster = new MiniDFSCluster
        .Builder(dfsConf)
        .numDataNodes(numDataNodes).build()) {
      final short replFactor = 1;
      final long fileLength = 512L;
      final FileSystem fs = miniCluster.getFileSystem();
      final Path parentDir = new Path("/tmp/files/");

      fs.mkdirs(parentDir);
      HashSet<Path> closedFileSet = new HashSet<>();
      for (int i = 0; i < numClosedFiles; i++) {
        Path file = new Path(parentDir, "closed-file-" + i);
        DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
        closedFileSet.add(file);
      }

      HashMap<Path, FSDataOutputStream> openFilesMap = new HashMap<>();
      for (int i = 0; i < numOpenFiles; i++) {
        Path file = new Path(parentDir, "open-file-" + i);
        DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
        FSDataOutputStream outputStream = fs.append(file);
        openFilesMap.put(file, outputStream);
      }

      final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf);
      assertEquals(0, ToolRunner.run(dfsAdmin,
          new String[]{"-listOpenFiles"}));
      verifyOpenFilesListing(closedFileSet, openFilesMap);

      for (int count = 0; count < numOpenFiles; count++) {
        closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFilesMap, 1));
        resetStream();
        assertEquals(0, ToolRunner.run(dfsAdmin,
            new String[]{"-listOpenFiles"}));
        verifyOpenFilesListing(closedFileSet, openFilesMap);
      }

      // test -listOpenFiles command with option <path>
      openFilesMap.clear();
      Path file;
      HashMap<Path, FSDataOutputStream> openFiles1 = new HashMap<>();
      HashMap<Path, FSDataOutputStream> openFiles2 = new HashMap<>();
      for (int i = 0; i < numOpenFiles; i++) {
        if (i % 2 == 0) {
          file = new Path(new Path("/tmp/files/a"), "open-file-" + i);
        } else {
          file = new Path(new Path("/tmp/files/b"), "open-file-" + i);
        }

        DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
        FSDataOutputStream outputStream = fs.append(file);

        if (i % 2 == 0) {
          openFiles1.put(file, outputStream);
        } else {
          openFiles2.put(file, outputStream);
        }
        openFilesMap.put(file, outputStream);
      }

      resetStream();
      // list all open files
      assertEquals(0,
          ToolRunner.run(dfsAdmin, new String[] {"-listOpenFiles"}));
      verifyOpenFilesListing(null, openFilesMap);

      resetStream();
      // list open files under directory path /tmp/files/a
      assertEquals(0, ToolRunner.run(dfsAdmin,
          new String[] {"-listOpenFiles", "-path", "/tmp/files/a"}));
      verifyOpenFilesListing(null, openFiles1);

      resetStream();
      // list open files without input path
      assertEquals(-1, ToolRunner.run(dfsAdmin,
          new String[] {"-listOpenFiles", "-path"}));
      // verify the error
      String outStr = scanIntoString(err);
      assertTrue(outStr.contains("listOpenFiles: option"
          + " -path requires 1 argument"));

      resetStream();
      // list open files with empty path
      assertEquals(0, ToolRunner.run(dfsAdmin,
          new String[] {"-listOpenFiles", "-path", ""}));
      // all the open files will be listed
      verifyOpenFilesListing(null, openFilesMap);

      resetStream();
      // list invalid path file
      assertEquals(0, ToolRunner.run(dfsAdmin,
          new String[] {"-listOpenFiles", "-path", "/invalid_path"}));
      outStr = scanIntoString(out);
      for (Path openFilePath : openFilesMap.keySet()) {
        assertThat(outStr).doesNotContain(openFilePath.toString());
      }
      DFSTestUtil.closeOpenFiles(openFilesMap, openFilesMap.size());
    }
  }

  private void verifyOpenFilesListing(HashSet<Path> closedFileSet,
      HashMap<Path, FSDataOutputStream> openFilesMap) {
    final String outStr = scanIntoString(out);
    LOG.info("dfsadmin -listOpenFiles output: \n" + out);
    if (closedFileSet != null) {
      for (Path closedFilePath : closedFileSet) {
        assertThat(outStr).doesNotContain(closedFilePath.toString() +
            System.lineSeparator());
      }
    }

    for (Path openFilePath : openFilesMap.keySet()) {
      assertThat(outStr).contains(openFilePath.toString() +
          System.lineSeparator());
    }
  }

  private void verifyNodesAndCorruptBlocks(
      final int numDn,
      final int numLiveDn,
      final int numCorruptBlocks,
      final int numCorruptECBlockGroups,
      final DFSClient client,
      final Long highestPriorityLowRedundancyReplicatedBlocks,
      final Long highestPriorityLowRedundancyECBlocks)
      throws IOException {

    /* init vars */
    final String outStr = scanIntoString(out);
    final String expectedLiveNodesStr = String.format(
        "Live datanodes (%d)",
        numLiveDn);
    final String expectedCorruptedBlocksStr = String.format(
        "Blocks with corrupt replicas: %d",
        numCorruptBlocks);
    final String expectedCorruptedECBlockGroupsStr = String.format(
        "Block groups with corrupt internal blocks: %d",
        numCorruptECBlockGroups);
    final String highestPriorityLowRedundancyReplicatedBlocksStr
        = String.format(
        "\tLow redundancy blocks with highest priority " +
            "to recover: %d",
        highestPriorityLowRedundancyReplicatedBlocks);
    final String highestPriorityLowRedundancyECBlocksStr = String.format(
        "\tLow redundancy blocks with highest priority " +
            "to recover: %d",
        highestPriorityLowRedundancyReplicatedBlocks);

    // verify nodes and corrupt blocks
    assertThat(outStr).
        contains(expectedLiveNodesStr).
        contains(expectedCorruptedBlocksStr).
        contains(expectedCorruptedECBlockGroupsStr).
        contains(highestPriorityLowRedundancyReplicatedBlocksStr).
        contains(highestPriorityLowRedundancyECBlocksStr);

    assertEquals(
        numDn,
        client.getDatanodeStorageReport(DatanodeReportType.ALL).length);
    assertEquals(
        numLiveDn,
        client.getDatanodeStorageReport(DatanodeReportType.LIVE).length);
    assertEquals(
        numDn - numLiveDn,
        client.getDatanodeStorageReport(DatanodeReportType.DEAD).length);
    assertEquals(numCorruptBlocks + numCorruptECBlockGroups,
        client.getCorruptBlocksCount());
    assertEquals(numCorruptBlocks, client.getNamenode()
        .getReplicatedBlockStats().getCorruptBlocks());
    assertEquals(highestPriorityLowRedundancyReplicatedBlocks, client.getNamenode()
        .getReplicatedBlockStats().getHighestPriorityLowRedundancyBlocks());
    assertEquals(numCorruptECBlockGroups, client.getNamenode()
        .getECBlockGroupStats().getCorruptBlockGroups());
    assertEquals(highestPriorityLowRedundancyECBlocks, client.getNamenode()
        .getECBlockGroupStats().getHighestPriorityLowRedundancyBlocks());
  }

  @Test
  public void testAllowSnapshotWhenTrashExists() throws Exception {
    final Path dirPath = new Path("/ssdir3");
    final Path trashRoot = new Path(dirPath, ".Trash");
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final DFSAdmin dfsAdmin = new DFSAdmin(conf);

    // Case 1: trash directory exists and permission matches
    dfs.mkdirs(trashRoot);
    dfs.setPermission(trashRoot, TRASH_PERMISSION);
    // allowSnapshot should still succeed even when trash exists
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-allowSnapshot", dirPath.toString()}));
    // Clean up. disallowSnapshot should remove the empty trash
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-disallowSnapshot", dirPath.toString()}));
    assertFalse(dfs.exists(trashRoot));

    // Case 2: trash directory exists and but permission doesn't match
    dfs.mkdirs(trashRoot);
    dfs.setPermission(trashRoot, new FsPermission((short)0755));
    // allowSnapshot should fail here
    assertEquals(-1, ToolRunner.run(dfsAdmin,
        new String[]{"-allowSnapshot", dirPath.toString()}));
    // Correct trash permission and retry
    dfs.setPermission(trashRoot, TRASH_PERMISSION);
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-allowSnapshot", dirPath.toString()}));
    // Clean up
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-disallowSnapshot", dirPath.toString()}));
    assertFalse(dfs.exists(trashRoot));

    // Case 3: trash directory path is taken by a file
    dfs.create(trashRoot).close();
    // allowSnapshot should fail here
    assertEquals(-1, ToolRunner.run(dfsAdmin,
        new String[]{"-allowSnapshot", dirPath.toString()}));
    // Remove the file and retry
    dfs.delete(trashRoot, false);
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-allowSnapshot", dirPath.toString()}));
    // Clean up
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-disallowSnapshot", dirPath.toString()}));
    assertFalse(dfs.exists(trashRoot));

    // Cleanup
    dfs.delete(dirPath, true);
  }

  @Test
  public void testAllowDisallowSnapshot() throws Exception {
    final Path dirPath = new Path("/ssdir1");
    final Path trashRoot = new Path(dirPath, ".Trash");
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final DFSAdmin dfsAdmin = new DFSAdmin(conf);

    dfs.mkdirs(dirPath);
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-allowSnapshot", dirPath.toString()}));

    // Verify .Trash creation after -allowSnapshot command
    assertTrue(dfs.exists(trashRoot));
    assertEquals(TRASH_PERMISSION,
        dfs.getFileStatus(trashRoot).getPermission());

    // Move a file to trash
    final Path file1 = new Path(dirPath, "file1");
    try (FSDataOutputStream s = dfs.create(file1)) {
      s.write(0);
    }
    FsShell fsShell = new FsShell(dfs.getConf());
    assertEquals(0, ToolRunner.run(fsShell,
        new String[]{"-rm", file1.toString()}));

    // User directory inside snapshottable directory trash should have 700
    final String username =
        UserGroupInformation.getLoginUser().getShortUserName();
    final Path trashRootUserSubdir = new Path(trashRoot, username);
    assertTrue(dfs.exists(trashRootUserSubdir));
    final FsPermission trashUserdirPermission = new FsPermission(
        FsAction.ALL, FsAction.NONE, FsAction.NONE, false);
    assertEquals(trashUserdirPermission,
        dfs.getFileStatus(trashRootUserSubdir).getPermission());

    // disallowSnapshot should fail when .Trash is not empty
    assertNotEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-disallowSnapshot", dirPath.toString()}));

    dfs.delete(trashRootUserSubdir, true);
    // disallowSnapshot should succeed now that we have an empty .Trash
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-disallowSnapshot", dirPath.toString()}));

    // Cleanup
    dfs.delete(dirPath, true);
  }

  @Test
  public void testSetBalancerBandwidth() throws Exception {
    redirectStream();

    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
    String outStr;

    // Test basic case: 10000
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-setBalancerBandwidth", "10000"}));
    outStr = scanIntoString(out);
    assertTrue(outStr.contains("Balancer " +
        "bandwidth is set to 10000"), "Did not set bandwidth!");

    // Test parsing with units
    resetStream();
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-setBalancerBandwidth", "10m"}));
    outStr = scanIntoString(out);
    assertTrue(outStr.contains("Balancer " +
        "bandwidth is set to 10485760"), "Did not set bandwidth!");

    resetStream();
    assertEquals(0, ToolRunner.run(dfsAdmin,
        new String[]{"-setBalancerBandwidth", "10k"}));
    outStr = scanIntoString(out);
    assertTrue(outStr.contains("Balancer " +
        "bandwidth is set to 10240"), "Did not set bandwidth!");

    // Test negative numbers
    assertEquals(-1, ToolRunner.run(dfsAdmin,
        new String[]{"-setBalancerBandwidth", "-10000"}));
    assertEquals(-1, ToolRunner.run(dfsAdmin,
        new String[]{"-setBalancerBandwidth", "-10m"}));
  }

  @Test
  @Timeout(300)
  public void testCheckNumOfBlocksInReportCommand() throws Exception {
    DistributedFileSystem dfs = cluster.getFileSystem();
    Path path = new Path("/tmp.txt");

    DatanodeInfo[] dn = dfs.getDataNodeStats();
    assertEquals(dn.length, NUM_DATANODES);
    // Block count should be 0, as no files are created
    int actualBlockCount = 0;
    for (DatanodeInfo d : dn) {
      actualBlockCount += d.getNumBlocks();
    }
    assertEquals(0, actualBlockCount);

    // Create a file with 2 blocks
    DFSTestUtil.createFile(dfs, path, 1024, (short) 1, 0);
    int expectedBlockCount = 2;

    // Wait for One Heartbeat
    Thread.sleep(3 * 1000);

    dn = dfs.getDataNodeStats();
    assertEquals(dn.length, NUM_DATANODES);

    // Block count should be 2, as file is created with block count 2
    actualBlockCount = 0;
    for (DatanodeInfo d : dn) {
      actualBlockCount += d.getNumBlocks();
    }
    assertEquals(expectedBlockCount, actualBlockCount);
  }

  @Test
  public void testRefreshProxyUser() throws Exception {
    Path dirPath = new Path("/testdir1");
    Path subDirPath = new Path("/testdir1/subdir1");
    UserGroupInformation loginUserUgi =  UserGroupInformation.getLoginUser();
    String proxyUser = "fakeuser";
    String realUser = loginUserUgi.getShortUserName();

    UserGroupInformation proxyUgi =
        UserGroupInformation.createProxyUserForTesting(proxyUser,
            loginUserUgi, loginUserUgi.getGroupNames());

    // create a directory as login user and re-assign it to proxy user
    loginUserUgi.doAs(new PrivilegedExceptionAction<Integer>() {
      @Override
      public Integer run() throws Exception {
        cluster.getFileSystem().mkdirs(dirPath);
        cluster.getFileSystem().setOwner(dirPath, proxyUser,
            proxyUgi.getPrimaryGroupName());
        return 0;
      }
    });

    // try creating subdirectory inside the directory as proxy user,
    // This should fail because of the current user hasn't still been proxied
    try {
      proxyUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override public Integer run() throws Exception {
          cluster.getFileSystem().mkdirs(subDirPath);
          return 0;
        }
      });
    } catch (RemoteException re) {
      assertTrue(re.unwrapRemoteException()
          instanceof AccessControlException);
      assertTrue(re.unwrapRemoteException().getMessage()
          .equals("User: " + realUser +
              " is not allowed to impersonate " + proxyUser));
    }

    // refresh will look at configuration on the server side
    // add additional resource with the new value
    // so the server side will pick it up
    String userKeyGroups = DefaultImpersonationProvider.getTestProvider().
        getProxySuperuserGroupConfKey(realUser);
    String userKeyHosts = DefaultImpersonationProvider.getTestProvider().
        getProxySuperuserIpConfKey(realUser);
    String rsrc = "testGroupMappingRefresh_rsrc.xml";
    tempResource = TestRefreshUserMappings.addNewConfigResource(rsrc,
        userKeyGroups, "*", userKeyHosts, "*");

    String[] args = new String[]{"-refreshSuperUserGroupsConfiguration"};
    admin.run(args);

    // After proxying the fakeuser, the mkdir should work
    proxyUgi.doAs(new PrivilegedExceptionAction<Integer>() {
      @Override
      public Integer run() throws Exception {
        cluster.getFileSystem().mkdirs(dirPath);
        return 0;
      }
    });
  }

  @Test
  @Order(1)
  public void testAllDatanodesReconfig()
      throws IOException, InterruptedException, TimeoutException {
    ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class);
    cluster.getDataNodes().get(0).setReconfigurationUtil(reconfigurationUtil);
    cluster.getDataNodes().get(1).setReconfigurationUtil(reconfigurationUtil);

    List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>();
    changes.add(new ReconfigurationUtil.PropertyChange(
        DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true",
        datanode.getConf().get(DFS_DATANODE_PEER_STATS_ENABLED_KEY)));
    when(reconfigurationUtil.parseChangedProperties(any(Configuration.class),
        any(Configuration.class))).thenReturn(changes);

    int result = admin.startReconfiguration("datanode", "livenodes");
    assertThat(result).isEqualTo(0);
    final List<String> outsForStartReconf = new ArrayList<>();
    final List<String> errsForStartReconf = new ArrayList<>();
    reconfigurationOutErrFormatter("startReconfiguration", "datanode",
        "livenodes", outsForStartReconf, errsForStartReconf);
    String started = "Started reconfiguration task on node";
    String starting =
        "Starting of reconfiguration task successful on 2 nodes, failed on 0 nodes.";
    assertThat(outsForStartReconf).hasSize(3);
    assertThat(errsForStartReconf).hasSize(0);
    assertThat(outsForStartReconf.get(0)).startsWith(started);
    assertThat(outsForStartReconf.get(1)).startsWith(started);
    assertThat(outsForStartReconf.get(2)).startsWith(starting);

    Thread.sleep(1000);
    final List<String> outs = new ArrayList<>();
    final List<String> errs = new ArrayList<>();
    awaitReconfigurationFinished("datanode", "livenodes", outs, errs);
    assertThat(outs).hasSize(9);
    assertThat(errs).hasSize(0);
    LOG.info("dfsadmin -status -livenodes output:");
    outs.forEach(s -> LOG.info("{}", s));
    assertThat(outs.get(0)).startsWith("Reconfiguring status for node");

    String success = "SUCCESS: Changed property dfs.datanode.peer.stats.enabled";
    String from = "\tFrom: \"false\"";
    String to = "\tTo: \"true\"";
    String retrieval =
        "Retrieval of reconfiguration status successful on 2 nodes, failed on 0 nodes.";

    assertThat(outs.subList(1, 5)).containsSubsequence(success, from, to);
    assertThat(outs.subList(5, 9)).containsSubsequence(success, from, to, retrieval);
  }

  @Test
  public void testDecommissionDataNodesReconfig()
      throws IOException, InterruptedException, TimeoutException {
    redirectStream();
    final Configuration dfsConf = new HdfsConfiguration();
    try (MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(dfsConf)
        .numDataNodes(3).build()) {
      ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class);
      miniCluster.getDataNodes().forEach(node -> node.setReconfigurationUtil(reconfigurationUtil));
      List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>();
      changes.add(new ReconfigurationUtil.PropertyChange(
          DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, "1000",
          datanode.getConf().get(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY)));
      when(reconfigurationUtil.parseChangedProperties(any(Configuration.class),
          any(Configuration.class))).thenReturn(changes);

      DFSAdmin dfsAdmin = Mockito.spy(new DFSAdmin(dfsConf));
      DistributedFileSystem dfs = Mockito.spy(miniCluster.getFileSystem());
      DatanodeInfo decommissioningNode1 = dfs.getDataNodeStats()[0];
      DatanodeInfo decommissioningNode2 = dfs.getDataNodeStats()[1];
      DatanodeInfo[] dataNodeStats = new DatanodeInfo[]{decommissioningNode1, decommissioningNode2};
      when(dfsAdmin.getDFS()).thenReturn(dfs);
      when(dfs.getDataNodeStats(DatanodeReportType.DECOMMISSIONING)).thenReturn(dataNodeStats);

      int ret = dfsAdmin.startReconfiguration("datanode", "decomnodes");

      // collect outputs
      final List<String> outsForStartReconf = Lists.newArrayList();
      final List<String> errsForStartReconf = Lists.newArrayList();
      scanIntoList(out, outsForStartReconf);
      scanIntoList(err, errsForStartReconf);

      // verify startReconfiguration results is as expected
      assertEquals(0, ret);
      String started = "Started reconfiguration task on node";
      String starting =
          "Starting of reconfiguration task successful on 2 nodes, failed on 0 nodes.";
      assertThat(outsForStartReconf).hasSize(3);
      assertThat(errsForStartReconf).hasSize(0);
      assertThat(outsForStartReconf.get(0)).startsWith(started);
      assertThat(outsForStartReconf.get(1)).startsWith(started);
      assertThat(outsForStartReconf.get(2)).startsWith(starting);

      // verify getReconfigurationStatus results is as expected
      Thread.sleep(1000);
      resetStream();
      final List<String> outsForFinishReconf = Lists.newArrayList();
      final List<String> errsForFinishReconf = Lists.newArrayList();
      waitForReconfigurationDecommissionNode("datanode", "decomnodes",
          dfsAdmin, outsForFinishReconf, errsForFinishReconf);
      String success = "SUCCESS: Changed property " +
          DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
      String from = "\tFrom: \"0\"";
      String to = "\tTo: \"1000\"";
      String retrieval =
          "Retrieval of reconfiguration status successful on 2 nodes, failed on 0 nodes.";

      assertThat(outsForFinishReconf.subList(1, 5)).
          containsSubsequence(success, from, to);
      assertThat(outsForFinishReconf.subList(5, 9)).
          containsSubsequence(success, from, to, retrieval);

      // verify refreshed decommissioningNode is as expected
      String node1Addr = decommissioningNode1.getIpAddr() + ":" +
          decommissioningNode1.getIpcPort();
      String node2Addr = decommissioningNode2.getIpAddr() + ":" +
          decommissioningNode2.getIpcPort();
      int finishedReconfCount = 0;
      for (String outMessage : outsForFinishReconf) {
        finishedReconfCount = outMessage.contains(node1Addr) ?
            finishedReconfCount + 1 : finishedReconfCount + 0;
        finishedReconfCount = outMessage.contains(node2Addr) ?
            finishedReconfCount + 1 : finishedReconfCount + 0;
      }
      assertTrue(finishedReconfCount == 2);
    }
  }

  private void waitForReconfigurationDecommissionNode(final String nodeType, final String address,
      DFSAdmin dfsAdmin, List<String> outs, List<String> errs)
      throws TimeoutException, InterruptedException {
    PrintStream outStream = new PrintStream(out);
    PrintStream errStream = new PrintStream(err);
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        LocatedBlocks blocks = null;
        try {
          dfsAdmin.getReconfigurationStatusUtil("datanode", "decomnodes",
              outStream, errStream);
        } catch (IOException | InterruptedException e) {
          LOG.error(String.format(
              "call getReconfigurationStatus on %s[%s] failed.", nodeType,
              address), e);
        }
        scanIntoList(out, outs);
        scanIntoList(err, errs);
        return !outs.isEmpty() && outs.get(0).contains("finished");
      }
    }, 100, 100 * 100);
  }
}