/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership.  The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.hdfs.server.diskbalancer.command;

import static java.lang.Thread.sleep;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.CANCEL;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.EXECUTE;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.HELP;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.NODE;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.OUTFILE;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.PLAN;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.QUERY;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.REPORT;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.SKIPDATECHECK;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Scanner;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerTestUtil;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Tests various CLI commands of DiskBalancer.
 */
public class TestDiskBalancerCommand {

  @Rule
  public ExpectedException thrown = ExpectedException.none();
  private MiniDFSCluster cluster;
  private URI clusterJson;
  private Configuration conf = new HdfsConfiguration();

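  // Block size, file length and per-volume capacity used when building the
  // small imbalanced mini clusters in the plan/execute tests.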
  private final static int DEFAULT_BLOCK_SIZE = 1024;
  private final static int FILE_LEN = 200 * 1024;
  private final static long CAPACITY = 300 * 1024;
  private final static long[] CAPACITIES = new long[] {CAPACITY, CAPACITY};

  @Before
  public void setUp() throws Exception {
    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
        .storagesPerDatanode(2).build();
    cluster.waitActive();

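    // Static cluster description (64 datanodes, 3 disks each) used by the
    // report tests that run against a JSON model instead of a live cluster.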
    clusterJson = getClass().getResource(
        "/diskBalancer/data-cluster-64node-3disk.json").toURI();
  }

  @After
  public void tearDown() throws Exception {
    if (cluster != null) {
      // Just make sure we can shutdown datanodes.
      for (int i = 0; i < cluster.getDataNodes().size(); i++) {
        cluster.getDataNodes().get(i).shutdown();
      }
      cluster.shutdown();
    }
  }

  /**
   * Tests submitting and executing a plan while the Datanode is in a status
   * other than REGULAR (started with the ROLLBACK option); execution is
   * expected to be rejected with a DiskBalancerException.
   */
  @Test(timeout = 60000)
  public void testSubmitPlanInNonRegularStatus() throws Exception {
    final int numDatanodes = 1;
    MiniDFSCluster miniCluster = null;
    final Configuration hdfsConf = new HdfsConfiguration();

    try {
      /* new cluster with imbalanced capacity */
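      // The ROLLBACK startup option leaves the datanode in a non-REGULAR
      // state, so executing the plan below is expected to be rejected.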
      miniCluster = DiskBalancerTestUtil.newImbalancedCluster(
          hdfsConf,
          numDatanodes,
          CAPACITIES,
          DEFAULT_BLOCK_SIZE,
          FILE_LEN,
          StartupOption.ROLLBACK);

      /* get full path of plan */
      final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);

      try {
        /* run execute command */
        final String cmdLine = String.format(
            "hdfs diskbalancer -%s %s",
            EXECUTE,
            planFileFullName);
        runCommand(cmdLine, hdfsConf, miniCluster);
      } catch (RemoteException e) {
        assertThat(e.getClassName(), containsString("DiskBalancerException"));
        assertThat(e.toString(),
            is(allOf(
                containsString("Datanode is in special state"),
                containsString("Disk balancing not permitted."))));
      }
    } finally {
      if (miniCluster != null) {
        miniCluster.shutdown();
      }
    }
  }

  /**
   * Tests running multiple commands under one setup. This mainly covers
   * {@link org.apache.hadoop.hdfs.server.diskbalancer.command.Command#close}
   */
  @Test(timeout = 120000)
  public void testRunMultipleCommandsUnderOneSetup() throws Exception {

    final int numDatanodes = 1;
    MiniDFSCluster miniCluster = null;
    final Configuration hdfsConf = new HdfsConfiguration();

    try {
      /* new cluster with imbalanced capacity */
      miniCluster = DiskBalancerTestUtil.newImbalancedCluster(
          hdfsConf,
          numDatanodes,
          CAPACITIES,
          DEFAULT_BLOCK_SIZE,
          FILE_LEN);

      /* get full path of plan */
      final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);

      /* run execute command */
      final String cmdLine = String.format(
          "hdfs diskbalancer -%s %s",
          EXECUTE,
          planFileFullName);
      runCommand(cmdLine, hdfsConf, miniCluster);
    } finally {
      if (miniCluster != null) {
        miniCluster.shutdown();
      }
    }
  }

  @Test(timeout = 600000)
  public void testDiskBalancerExecuteOptionPlanValidityWithException()
      throws Exception {
    final int numDatanodes = 1;

    final Configuration hdfsConf = new HdfsConfiguration();
    hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
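    // A plan validity interval of "0d" makes every generated plan expire
    // immediately, so executing it should fail the date check.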
    hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "0d");

    /* new cluster with imbalanced capacity */
    final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
        newImbalancedCluster(
            hdfsConf,
            numDatanodes,
            CAPACITIES,
            DEFAULT_BLOCK_SIZE,
            FILE_LEN);

    try {
      /* get full path of plan */
      final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);

      /* run execute command */
      final String cmdLine = String.format(
          "hdfs diskbalancer -%s %s",
          EXECUTE,
          planFileFullName);

      LambdaTestUtils.intercept(
          RemoteException.class,
          "DiskBalancerException",
          "Plan was generated more than 0d ago",
          () -> {
            runCommand(cmdLine, hdfsConf, miniCluster);
          });
    } finally {
      if (miniCluster != null) {
        miniCluster.shutdown();
      }
    }
  }

  @Test(timeout = 600000)
  public void testDiskBalancerExecutePlanValidityWithoutUnitException()
      throws Exception {
    final int numDatanodes = 1;

    final Configuration hdfsConf = new HdfsConfiguration();
    hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
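    // A plain "0" (no unit) is interpreted as 0 ms, so the plan is already
    // expired by the time it is executed.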
    hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "0");

    /* new cluster with imbalanced capacity */
    final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
        newImbalancedCluster(
            hdfsConf,
            numDatanodes,
            CAPACITIES,
            DEFAULT_BLOCK_SIZE,
            FILE_LEN);

    try {
      /* get full path of plan */
      final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);

      /* run execute command */
      final String cmdLine = String.format(
          "hdfs diskbalancer -%s %s",
          EXECUTE,
          planFileFullName);

      LambdaTestUtils.intercept(
          RemoteException.class,
          "DiskBalancerException",
          "Plan was generated more than 0ms ago",
          () -> {
            runCommand(cmdLine, hdfsConf, miniCluster);
          });
    } finally {
      if (miniCluster != null) {
        miniCluster.shutdown();
      }
    }
  }

  @Test(timeout = 600000)
  public void testDiskBalancerForceExecute() throws Exception {
    final int numDatanodes = 1;

    final Configuration hdfsConf = new HdfsConfiguration();
    hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
    hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "0d");

    /* new cluster with imbalanced capacity */
    final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
        newImbalancedCluster(
            hdfsConf,
            numDatanodes,
            CAPACITIES,
            DEFAULT_BLOCK_SIZE,
            FILE_LEN);

    try {
      /* get full path of plan */
      final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);

      /* run execute command */
      final String cmdLine = String.format(
          "hdfs diskbalancer -%s %s -%s",
          EXECUTE,
          planFileFullName,
          SKIPDATECHECK);

      // DiskBalancer should execute the plan even though it has expired,
      // because the skipDateCheck option is specified.
      runCommand(cmdLine, hdfsConf, miniCluster);
    } finally {
      if (miniCluster != null) {
        miniCluster.shutdown();
      }
    }
  }

  @Test(timeout = 600000)
  public void testDiskBalancerExecuteOptionPlanValidity() throws Exception {
    final int numDatanodes = 1;

    final Configuration hdfsConf = new HdfsConfiguration();
    hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
    hdfsConf.set(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "600s");

    /* new cluster with imbalanced capacity */
    final MiniDFSCluster miniCluster = DiskBalancerTestUtil.
        newImbalancedCluster(
            hdfsConf,
            numDatanodes,
            CAPACITIES,
            DEFAULT_BLOCK_SIZE,
            FILE_LEN);

    try {
      /* get full path of plan */
      final String planFileFullName = runAndVerifyPlan(miniCluster, hdfsConf);

      /* run execute command */
      final String cmdLine = String.format(
          "hdfs diskbalancer -%s %s",
          EXECUTE,
          planFileFullName);

      // The plan is valid for 600 seconds; after sleeping for 10 seconds it
      // is still within the validity window, so DiskBalancer should execute
      // the plan.
      sleep(10000);
      runCommand(cmdLine, hdfsConf, miniCluster);
    } finally {
      if (miniCluster != null) {
        miniCluster.shutdown();
      }
    }
  }

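  /**
   * Runs the plan command against the first datanode of the given cluster,
   * verifies the two-line output and returns the full path of the generated
   * plan file.
   */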
  private String runAndVerifyPlan(
      final MiniDFSCluster miniCluster,
      final Configuration hdfsConf) throws Exception {
    String cmdLine = "";
    List<String> outputs = null;
    final DataNode dn = miniCluster.getDataNodes().get(0);

    /* run plan command */
    cmdLine = String.format(
        "hdfs diskbalancer -%s %s",
        PLAN,
        dn.getDatanodeUuid());
    outputs = runCommand(cmdLine, hdfsConf, miniCluster);

    /* the plan file is named after the datanode UUID */
    final String planFileName = dn.getDatanodeUuid();

    /* verify plan command */
    assertEquals(
        "There must be two lines: the 1st is writing plan to...,"
            + " the 2nd is actual full path of plan file.",
        2, outputs.size());
    assertThat(outputs.get(1), containsString(planFileName));

    /* get full path of plan file */
    final String planFileFullName = outputs.get(1);
    return planFileFullName;
  }

  /* test exception on invalid arguments */
  @Test(timeout = 60000)
  public void testExceptionOnInvalidArguments() throws Exception {
    final String cmdLine = "hdfs diskbalancer random1 -report random2 random3";
    thrown.expect(HadoopIllegalArgumentException.class);
    thrown.expectMessage(
        "Invalid or extra Arguments: [random1, random2, random3]");
    runCommand(cmdLine);
  }

  /* test basic report */
  @Test(timeout = 60000)
  public void testReportSimple() throws Exception {
    final String cmdLine = "hdfs diskbalancer -report";
    final List<String> outputs = runCommand(cmdLine);

    assertThat(
        outputs.get(0),
        containsString("Processing report command"));
    assertThat(
        outputs.get(1),
        is(allOf(containsString("No top limit specified"),
            containsString("using default top value"), containsString("100"))));
    assertThat(
        outputs.get(2),
        is(allOf(
            containsString("Reporting top"),
            containsString("64"),
            containsString(
                "DataNode(s) benefiting from running DiskBalancer"))));
    assertThat(
        outputs.get(32),
        is(allOf(containsString("30/64 null[null:0]"),
            containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"),
            containsString("9 volumes with node data density 1.97"))));

  }

  /* test basic report with negative top limit */
  @Test(timeout = 60000)
  public void testReportWithNegativeTopLimit()
      throws Exception {
    final String cmdLine = "hdfs diskbalancer -report -top -32";
    thrown.expect(java.lang.IllegalArgumentException.class);
    thrown.expectMessage("Top limit input should be a positive numeric value");
    runCommand(cmdLine);
  }

  /* test less than 64 DataNode(s) in total, e.g., -report -top 32 */
  @Test(timeout = 60000)
  public void testReportLessThanTotal() throws Exception {
    final String cmdLine = "hdfs diskbalancer -report -top 32";
    final List<String> outputs = runCommand(cmdLine);

    assertThat(
        outputs.get(0),
        containsString("Processing report command"));
    assertThat(
        outputs.get(1),
        is(allOf(
            containsString("Reporting top"),
            containsString("32"),
            containsString(
                "DataNode(s) benefiting from running DiskBalancer"))));
    assertThat(
        outputs.get(31),
        is(allOf(containsString("30/32 null[null:0]"),
            containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"),
            containsString("9 volumes with node data density 1.97"))));
  }

  /**
   * Simulates running the DiskBalancerCLI report command from a shell with
   * the generic option '-fs'.
   * @throws Exception
   */
  @Test(timeout = 60000)
  public void testReportWithGenericOptionFS() throws Exception {
    final String topReportArg = "5";
    final String reportArgs = String.format("-%s file:%s -%s -%s %s",
        "fs", clusterJson.getPath(),
        REPORT, "top", topReportArg);
    final String cmdLine = String.format("%s", reportArgs);
    final List<String> outputs = runCommand(cmdLine);

    assertThat(outputs.get(0), containsString("Processing report command"));
    assertThat(outputs.get(1),
        is(allOf(containsString("Reporting top"), containsString(topReportArg),
            containsString(
                "DataNode(s) benefiting from running DiskBalancer"))));
  }

  /* test more than 64 DataNode(s) in total, e.g., -report -top 128 */
  @Test(timeout = 60000)
  public void testReportMoreThanTotal() throws Exception {
    final String cmdLine = "hdfs diskbalancer -report -top 128";
    final List<String> outputs = runCommand(cmdLine);

    assertThat(
        outputs.get(0),
        containsString("Processing report command"));
    assertThat(
        outputs.get(1),
        is(allOf(
            containsString("Reporting top"),
            containsString("64"),
            containsString(
                "DataNode(s) benefiting from running DiskBalancer"))));
    assertThat(
        outputs.get(31),
        is(allOf(containsString("30/64 null[null:0]"),
            containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"),
            containsString("9 volumes with node data density 1.97"))));

  }

  /* test invalid top limit, e.g., -report -top xx */
  @Test(timeout = 60000)
  public void testReportInvalidTopLimit() throws Exception {
    final String cmdLine = "hdfs diskbalancer -report -top xx";
    final List<String> outputs = runCommand(cmdLine);

    assertThat(
        outputs.get(0),
        containsString("Processing report command"));
    assertThat(
        outputs.get(1),
        is(allOf(containsString("Top limit input is not numeric"),
            containsString("using default top value"), containsString("100"))));
    assertThat(
        outputs.get(2),
        is(allOf(
            containsString("Reporting top"),
            containsString("64"),
            containsString(
                "DataNode(s) benefiting from running DiskBalancer"))));
    assertThat(
        outputs.get(32),
        is(allOf(containsString("30/64 null[null:0]"),
            containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"),
            containsString("9 volumes with node data density 1.97"))));
  }

  @Test(timeout = 60000)
  public void testReportNode() throws Exception {
    final String cmdLine =
        "hdfs diskbalancer -report -node " +
            "a87654a9-54c7-4693-8dd9-c9c7021dc340";
    final List<String> outputs = runCommand(cmdLine);

    assertThat(
        outputs.get(0),
        containsString("Processing report command"));
    assertThat(
        outputs.get(1),
        is(allOf(containsString("Reporting volume information for DataNode"),
            containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"))));
    assertThat(
        outputs.get(2),
        is(allOf(containsString("null[null:0]"),
            containsString("a87654a9-54c7-4693-8dd9-c9c7021dc340"),
            containsString("9 volumes with node data density 1.97"))));
    assertThat(
        outputs.get(3),
        is(allOf(containsString("DISK"),
            containsString("/tmp/disk/KmHefYNURo"),
            containsString("0.20 used: 39160240782/200000000000"),
            containsString("0.80 free: 160839759218/200000000000"))));
    assertThat(
        outputs.get(4),
        is(allOf(containsString("DISK"),
            containsString("/tmp/disk/Mxfcfmb24Y"),
            containsString("0.92 used: 733099315216/800000000000"),
            containsString("0.08 free: 66900684784/800000000000"))));
    assertThat(
        outputs.get(5),
        is(allOf(containsString("DISK"),
            containsString("/tmp/disk/xx3j3ph3zd"),
            containsString("0.72 used: 289544224916/400000000000"),
            containsString("0.28 free: 110455775084/400000000000"))));
    assertThat(
        outputs.get(6),
        is(allOf(containsString("RAM_DISK"),
            containsString("/tmp/disk/BoBlQFxhfw"),
            containsString("0.60 used: 477590453390/800000000000"),
            containsString("0.40 free: 322409546610/800000000000"))));
    assertThat(
        outputs.get(7),
        is(allOf(containsString("RAM_DISK"),
            containsString("/tmp/disk/DtmAygEU6f"),
            containsString("0.34 used: 134602910470/400000000000"),
            containsString("0.66 free: 265397089530/400000000000"))));
    assertThat(
        outputs.get(8),
        is(allOf(containsString("RAM_DISK"),
            containsString("/tmp/disk/MXRyYsCz3U"),
            containsString("0.55 used: 438102096853/800000000000"),
            containsString("0.45 free: 361897903147/800000000000"))));
    assertThat(
        outputs.get(9),
        is(allOf(containsString("SSD"),
            containsString("/tmp/disk/BGe09Y77dI"),
            containsString("0.89 used: 890446265501/1000000000000"),
            containsString("0.11 free: 109553734499/1000000000000"))));
    assertThat(
        outputs.get(10),
        is(allOf(containsString("SSD"),
            containsString("/tmp/disk/JX3H8iHggM"),
            containsString("0.31 used: 2782614512957/9000000000000"),
            containsString("0.69 free: 6217385487043/9000000000000"))));
    assertThat(
        outputs.get(11),
        is(allOf(containsString("SSD"),
            containsString("/tmp/disk/uLOYmVZfWV"),
            containsString("0.75 used: 1509592146007/2000000000000"),
            containsString("0.25 free: 490407853993/2000000000000"))));
  }

  @Test(timeout = 60000)
  public void testReportNodeWithoutJson() throws Exception {
    String dataNodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid();
    final String planArg = String.format("-%s -%s %s",
        REPORT, NODE, dataNodeUuid);
    final String cmdLine = String.format("hdfs diskbalancer %s", planArg);
    List<String> outputs = runCommand(cmdLine, cluster);

    assertThat(
        outputs.get(0),
        containsString("Processing report command"));
    assertThat(
        outputs.get(1),
        is(allOf(containsString("Reporting volume information for DataNode"),
            containsString(dataNodeUuid))));
    assertThat(
        outputs.get(2),
        is(allOf(containsString(dataNodeUuid),
            containsString("2 volumes with node data density 0.00"))));
    assertThat(
        outputs.get(3),
        is(allOf(containsString("DISK"),
            containsString(new Path(cluster.getInstanceStorageDir(0, 0)
                .getAbsolutePath()).toString()),
            containsString("0.00"),
            containsString("1.00"))));
    assertThat(
        outputs.get(4),
        is(allOf(containsString("DISK"),
            containsString(new Path(cluster.getInstanceStorageDir(0, 1)
                .getAbsolutePath()).toString()),
            containsString("0.00"),
            containsString("1.00"))));
  }

  @Test(timeout = 60000)
  public void testReadClusterFromJson() throws Exception {
    ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
        conf);
    DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(
        jsonConnector);
    diskBalancerCluster.readClusterInfo();
    assertEquals(64, diskBalancerCluster.getNodes().size());
  }

  /* test -plan DataNodeID */
  @Test(timeout = 60000)
  public void testPlanNode() throws Exception {
    final String planArg = String.format("-%s %s", PLAN,
        cluster.getDataNodes().get(0).getDatanodeUuid());

    final String cmdLine = String.format("hdfs diskbalancer %s", planArg);
    runCommand(cmdLine, cluster);
  }

  /* test -plan DataNodeID against the JSON cluster, with an -out directory */
  @Test(timeout = 60000)
  public void testPlanJsonNode() throws Exception {
    final String planArg = String.format("-%s %s", PLAN,
        "a87654a9-54c7-4693-8dd9-c9c7021dc340");
    final Path testPath = new Path(
        PathUtils.getTestPath(getClass()),
        GenericTestUtils.getMethodName());
    final String cmdLine = String.format(
        "hdfs diskbalancer -out %s %s", testPath, planArg);
    runCommand(cmdLine);
  }

  /* Test that illegal arguments are handled correctly */
  @Test(timeout = 60000)
  public void testIllegalArgument() throws Exception {
    final String planArg = String.format("-%s %s", PLAN,
        "a87654a9-54c7-4693-8dd9-c9c7021dc340");

    final String cmdLine = String.format(
        "hdfs diskbalancer %s -report", planArg);
    // -plan and -report cannot be used together.
    // tests the validate command line arguments function.
    thrown.expect(java.lang.IllegalArgumentException.class);
    runCommand(cmdLine);
  }

  @Test(timeout = 60000)
  public void testCancelCommand() throws Exception {
    final String cancelArg = String.format("-%s %s", CANCEL, "nosuchplan");
    final String nodeArg = String.format("-%s %s", NODE,
        cluster.getDataNodes().get(0).getDatanodeUuid());

    // The -node argument must be a host:port address, so passing a datanode
    // UUID makes the cancel command throw.
    thrown.expect(java.lang.IllegalArgumentException.class);
    final String cmdLine = String.format(
        "hdfs diskbalancer %s %s", cancelArg, nodeArg);
    runCommand(cmdLine);
  }

  /*
   Makes an invalid query attempt by passing a datanode UUID where a host
   name is expected, so address resolution fails with UnknownHostException.
   */
  @Test(timeout = 60000)
  public void testQueryCommand() throws Exception {
    final String queryArg = String.format("-%s %s", QUERY,
        cluster.getDataNodes().get(0).getDatanodeUuid());
    thrown.expect(java.net.UnknownHostException.class);
    final String cmdLine = String.format("hdfs diskbalancer %s", queryArg);
    runCommand(cmdLine);
  }

  @Test(timeout = 60000)
  public void testHelpCommand() throws Exception {
    final String helpArg = String.format("-%s", HELP);
    final String cmdLine = String.format("hdfs diskbalancer %s", helpArg);
    runCommand(cmdLine);
  }

  @Test
  public void testPrintFullPathOfPlan()
      throws Exception {
    String parent = GenericTestUtils.getRandomizedTempPath();

    MiniDFSCluster miniCluster = null;
    try {
      Configuration hdfsConf = new HdfsConfiguration();
      List<String> outputs = null;

      /* new cluster with imbalanced capacity */
      miniCluster = DiskBalancerTestUtil.newImbalancedCluster(
          hdfsConf,
          1,
          CAPACITIES,
          DEFAULT_BLOCK_SIZE,
          FILE_LEN);

      /* run plan command */
      final String cmdLine = String.format(
          "hdfs diskbalancer -%s %s -%s %s",
          PLAN,
          miniCluster.getDataNodes().get(0).getDatanodeUuid(),
          OUTFILE,
          parent);
      outputs = runCommand(cmdLine, hdfsConf, miniCluster);

      /* get full path */
      final String planFileFullName = new Path(
          parent,
          miniCluster.getDataNodes().get(0).getDatanodeUuid()).toString();

      /* verify the path of plan */
      assertEquals(
          "There must be two lines: the 1st is writing plan to,"
              + " the 2nd is actual full path of plan file.",
          2, outputs.size());
      assertThat(outputs.get(0), containsString("Writing plan to"));
      assertThat(outputs.get(1), containsString(planFileFullName));
    } finally {
      if (miniCluster != null) {
        miniCluster.shutdown();
      }
    }
  }

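  /**
   * Runs the DiskBalancer CLI with its output redirected to an in-memory
   * buffer and returns the captured output as a list of lines.
   */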
  private List<String> runCommandInternal(
      final String cmdLine,
      final Configuration clusterConf) throws Exception {
    String[] cmds = StringUtils.split(cmdLine, ' ');
    ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
    PrintStream out = new PrintStream(bufOut);

    Tool diskBalancerTool = new DiskBalancerCLI(clusterConf, out);
    ToolRunner.run(clusterConf, diskBalancerTool, cmds);

    Scanner scanner = new Scanner(bufOut.toString());
    List<String> outputs = Lists.newArrayList();
    while (scanner.hasNextLine()) {
      outputs.add(scanner.nextLine());
    }
    return outputs;
  }

  private List<String> runCommandInternal(final String cmdLine)
      throws Exception {
    return runCommandInternal(cmdLine, conf);
  }

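  /** Runs the command against the static JSON cluster model. */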
  private List<String> runCommand(final String cmdLine) throws Exception {
    FileSystem.setDefaultUri(conf, clusterJson);
    return runCommandInternal(cmdLine);
  }

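  /** Runs the command against the given mini cluster using the shared conf. */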
  private List<String> runCommand(final String cmdLine,
                                  MiniDFSCluster miniCluster) throws Exception {
    FileSystem.setDefaultUri(conf, miniCluster.getURI());
    return runCommandInternal(cmdLine);
  }

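  /** Runs the command against the mini cluster with the supplied conf. */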
  private List<String> runCommand(
      final String cmdLine,
      Configuration clusterConf,
      MiniDFSCluster miniCluster) throws Exception {
    FileSystem.setDefaultUri(clusterConf, miniCluster.getURI());
    return runCommandInternal(cmdLine, clusterConf);
  }

  /**
   * Verifies that multiple nodes can be queried without a plan having been
   * submitted.
   * @throws Exception
   */
  @Test
  public void testDiskBalancerQueryWithoutSubmitAndMultipleNodes()
      throws Exception {
    Configuration hdfsConf = new HdfsConfiguration();
    hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
    final int numDatanodes = 2;
    File basedir = new File(GenericTestUtils.getRandomizedTempPath());
    MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir)
        .numDataNodes(numDatanodes).build();
    try {
      miniDFSCluster.waitActive();
      DataNode dataNode1 = miniDFSCluster.getDataNodes().get(0);
      DataNode dataNode2 = miniDFSCluster.getDataNodes().get(1);
      final String queryArg = String.format("-query localhost:%d,localhost:%d",
          dataNode1.getIpcPort(), dataNode2.getIpcPort());
      final String cmdLine = String.format("hdfs diskbalancer %s", queryArg);
      List<String> outputs = runCommand(cmdLine);
      assertEquals(12, outputs.size());
      assertTrue("Expected outputs: " + outputs,
          outputs.get(1).contains("localhost:" + dataNode1.getIpcPort()) ||
              outputs.get(6).contains("localhost:" + dataNode1.getIpcPort()));
      assertTrue("Expected outputs: " + outputs,
          outputs.get(1).contains("localhost:" + dataNode2.getIpcPort()) ||
              outputs.get(6).contains("localhost:" + dataNode2.getIpcPort()));
    } finally {
      miniDFSCluster.shutdown();
    }
  }

  /**
   * Verifies that a single node can be queried without a plan having been
   * submitted.
   * @throws Exception
   */
  @Test
  public void testDiskBalancerQueryWithoutSubmit() throws Exception {
    Configuration hdfsConf = new HdfsConfiguration();
    hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
    final int numDatanodes = 2;
    File basedir = new File(GenericTestUtils.getRandomizedTempPath());
    MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir)
        .numDataNodes(numDatanodes).build();
    try {
      miniDFSCluster.waitActive();
      DataNode dataNode = miniDFSCluster.getDataNodes().get(0);
      final String queryArg = String.format(
          "-query localhost:%d", dataNode.getIpcPort());
      final String cmdLine = String.format("hdfs diskbalancer %s", queryArg);
      runCommand(cmdLine);
    } finally {
      miniDFSCluster.shutdown();
    }
  }

  @Test(timeout = 60000)
  public void testGetNodeList() throws Exception {
    ClusterConnector jsonConnector =
        ConnectorFactory.getCluster(clusterJson, conf);
    DiskBalancerCluster diskBalancerCluster =
        new DiskBalancerCluster(jsonConnector);
    diskBalancerCluster.readClusterInfo();

    int nodeNum = 5;
    StringBuilder listArg = new StringBuilder();
    for (int i = 0; i < nodeNum; i++) {
      listArg.append(diskBalancerCluster.getNodes().get(i).getDataNodeUUID())
          .append(",");
    }

    ReportCommand command = new ReportCommand(conf, null);
    command.setCluster(diskBalancerCluster);
    List<DiskBalancerDataNode> nodeList = command.getNodes(listArg.toString());
    assertEquals(nodeNum, nodeList.size());
  }

  @Test(timeout = 60000)
  public void testReportCommandWithMultipleNodes() throws Exception {
    String dataNodeUuid1 = cluster.getDataNodes().get(0).getDatanodeUuid();
    String dataNodeUuid2 = cluster.getDataNodes().get(1).getDatanodeUuid();
    final String planArg = String.format("-%s -%s %s,%s",
        REPORT, NODE, dataNodeUuid1, dataNodeUuid2);
    final String cmdLine = String.format("hdfs diskbalancer %s", planArg);
    List<String> outputs = runCommand(cmdLine, cluster);
    verifyOutputsOfReportCommand(outputs, dataNodeUuid1, dataNodeUuid2, true);
  }

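  /**
   * Checks the report command output for the two given datanodes; when
   * inputNodesStr is true the header line must also echo both UUIDs.
   */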
  private void verifyOutputsOfReportCommand(List<String> outputs,
      String dataNodeUuid1, String dataNodeUuid2, boolean inputNodesStr) {
    assertThat(outputs.get(0), containsString("Processing report command"));
    if (inputNodesStr) {
      assertThat(outputs.get(1),
          is(allOf(containsString("Reporting volume information for DataNode"),
              containsString(dataNodeUuid1), containsString(dataNodeUuid2))));
    }

    // Since the order of the input nodes may change when the node string is
    // parsed, compare each UUID against both output lines.
    assertTrue(outputs.get(2).contains(dataNodeUuid1)
        || outputs.get(6).contains(dataNodeUuid1));
    assertTrue(outputs.get(2).contains(dataNodeUuid2)
        || outputs.get(6).contains(dataNodeUuid2));
  }

  @Test(timeout = 60000)
  public void testReportCommandWithInvalidNode() throws Exception {
    String dataNodeUuid1 = cluster.getDataNodes().get(0).getDatanodeUuid();
    String invalidNode = "invalidNode";
    final String planArg = String.format("-%s -%s %s,%s",
        REPORT, NODE, dataNodeUuid1, invalidNode);
    final String cmdLine = String.format("hdfs diskbalancer %s", planArg);
    List<String> outputs = runCommand(cmdLine, cluster);

    assertThat(
        outputs.get(0),
        containsString("Processing report command"));
    assertThat(
        outputs.get(1),
        is(allOf(containsString("Reporting volume information for DataNode"),
            containsString(dataNodeUuid1), containsString(invalidNode))));

    String invalidNodeInfo =
        String.format("The node(s) '%s' not found. "
            + "Please make sure that '%s' exists in the cluster."
            , invalidNode, invalidNode);
    assertTrue(outputs.get(2).contains(invalidNodeInfo));
  }

  @Test(timeout = 60000)
  public void testReportCommandWithNullNodes() throws Exception {
    // don't input nodes
    final String planArg = String.format("-%s -%s ,", REPORT, NODE);
    final String cmdLine = String.format("hdfs diskbalancer %s", planArg);
    List<String> outputs = runCommand(cmdLine, cluster);

    String invalidNodeInfo = "The number of input nodes is 0. "
        + "Please input the valid nodes.";
    assertTrue(outputs.get(2).contains(invalidNodeInfo));
  }

  @Test(timeout = 60000)
  public void testReportCommandWithReadingHostFile() throws Exception {
    final String testDir = GenericTestUtils.getTestDir().getAbsolutePath();
    File includeFile = new File(testDir, "diskbalancer.include");
    String filePath = includeFile.getAbsolutePath();

    String dataNodeUuid1 = cluster.getDataNodes().get(0).getDatanodeUuid();
    String dataNodeUuid2 = cluster.getDataNodes().get(1).getDatanodeUuid();

    try (FileWriter fw = new FileWriter(filePath)) {
      fw.write("#This-is-comment\n");
      fw.write(dataNodeUuid1 + "\n");
      fw.write(dataNodeUuid2 + "\n");
    }

    final String planArg = String.format("-%s -%s file://%s",
        REPORT, NODE, filePath);
    final String cmdLine = String.format("hdfs diskbalancer %s", planArg);
    List<String> outputs = runCommand(cmdLine, cluster);

    verifyOutputsOfReportCommand(outputs, dataNodeUuid1, dataNodeUuid2, false);
    includeFile.delete();
  }

  @Test(timeout = 60000)
  public void testReportCommandWithInvalidHostFilePath() throws Exception {
    final String testDir = GenericTestUtils.getTestDir().getAbsolutePath();
    String invalidFilePath = testDir + "/diskbalancer-invalid.include";

    final String planArg = String.format("-%s -%s file://%s",
        REPORT, NODE, invalidFilePath);
    final String cmdLine = String.format("hdfs diskbalancer %s", planArg);
    List<String> outputs = runCommand(cmdLine, cluster);

    String invalidNodeInfo = String.format(
        "The input host file path 'file://%s' is not a valid path.", invalidFilePath);
    assertTrue(outputs.get(2).contains(invalidNodeInfo));
  }
}