TestExternalStoragePolicySatisfier.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.sps;

import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.sps.metrics.ExternalSPSBeanMetrics;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.ExitUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.util.function.Supplier;

/**
 * Tests the external sps service plugins.
 */
public class TestExternalStoragePolicySatisfier {
  // Storage policy names handed to setStoragePolicy() by the tests below.
  private static final String ONE_SSD = "ONE_SSD";
  private static final String COLD = "COLD";
  // Storage layout used for the initial cluster: one row per datanode and one
  // entry per storage of that datanode (all DISK).
  private StorageType[][] allDiskTypes =
      new StorageType[][]{{StorageType.DISK, StorageType.DISK},
          {StorageType.DISK, StorageType.DISK},
          {StorageType.DISK, StorageType.DISK}};
  // Kerberos fixtures; they are only assigned by initSecureConf().
  private File keytabFile;
  private String principal;
  private MiniKdc kdc;
  private File baseDir;
  // Connector used to build the external SPS context; assigned in
  // startCluster() when the SPS is requested.
  private NameNodeConnector nnc;
  // External satisfier and its context; replaced whenever the SPS is
  // (re)started.
  private StoragePolicySatisfier externalSps;
  private ExternalSPSContext externalCtxt;
  // File system of the current mini cluster; refreshed by getFS().
  private DistributedFileSystem dfs = null;
  // Mini cluster under test; assigned via setCluster() and shut down in
  // destroy().
  private MiniDFSCluster hdfsCluster = null;
  // Per-test configuration; re-created in setUp().
  private Configuration config = null;
  // Shape of the initial cluster.
  private static final int NUM_OF_DATANODES = 3;
  private static final int STORAGES_PER_DATANODE = 2;
  // Capacity passed to startCluster() and applied to every storage.
  private static final long CAPACITY = 2 * 256 * 1024 * 1024;
  // File written by the createCluster*() helpers and moved by most tests.
  private static final String FILE = "/testMoveToSatisfyStoragePolicy";
  // Used as "dfs.block.size" and, in the secure setup, as bytes-per-checksum.
  private static final int DEFAULT_BLOCK_SIZE = 1024;
  private static final Logger LOG =
      LoggerFactory.getLogger(TestExternalStoragePolicySatisfier.class);
  /**
   * Fault injector that throws an {@link IOException} for every attempt whose
   * retry index is below the default maximum number of move-task retries and
   * does nothing for later attempts.
   */
  private final ExternalSPSFaultInjector injector =
      new ExternalSPSFaultInjector() {
        @Override
        public void mockAnException(int retry) throws IOException {
          final boolean retryBudgetUsedUp = retry
              >= DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT;
          if (retryBudgetUsedUp) {
            return;
          }
          throw new IOException("IO exception");
        }
      };

  /**
   * Builds a fresh configuration for every test with the storage policy
   * satisfier in EXTERNAL mode and a short datanode cache refresh interval.
   */
  @Before
  public void setUp() {
    config = new HdfsConfiguration();
    config.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
        StoragePolicySatisfierMode.EXTERNAL.toString());
    // Most of the tests are restarting DNs and NN. So, reduced refresh cycle to
    // update latest datanodes.
    config.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
        3000);
  }

  /**
   * Tears down the per-test fixtures: stops the MiniKdc (and removes its base
   * directory) when one was started, and shuts down the mini cluster.
   *
   * @throws Exception if stopping the KDC or the cluster fails.
   */
  @After
  public void destroy() throws Exception {
    try {
      if (kdc != null) {
        kdc.stop();
        FileUtil.fullyDelete(baseDir);
      }
    } finally {
      // Always release the cluster, even when the KDC teardown above fails;
      // otherwise its daemons would leak into the following tests.
      if (hdfsCluster != null) {
        hdfsCluster.shutdown();
      }
    }
  }

  /**
   * Remembers the given mini cluster as the cluster under test.
   *
   * @param cluster cluster to use from now on.
   */
  private void setCluster(MiniDFSCluster cluster) {
    hdfsCluster = cluster;
  }

  /**
   * Returns the configuration shared by the current test.
   *
   * @return the configuration created in {@link #setUp()}.
   */
  private Configuration getConf() {
    return config;
  }

  /**
   * Returns the mini cluster under test.
   *
   * @return the cluster last passed to {@code setCluster}, or null if none.
   */
  private MiniDFSCluster getCluster() {
    return this.hdfsCluster;
  }

  /**
   * Fetches the file system of the current cluster, caches it in the
   * {@code dfs} field and returns it.
   *
   * @return the cluster's distributed file system.
   * @throws IOException if the file system cannot be obtained.
   */
  private DistributedFileSystem getFS() throws IOException {
    final DistributedFileSystem clusterFs = hdfsCluster.getFileSystem();
    this.dfs = clusterFs;
    return clusterFs;
  }

  /**
   * Stops the external SPS if one is running. Despite its name this helper
   * does not shut the mini cluster down; that happens in {@link #destroy()}.
   */
  private void shutdownCluster() {
    stopExternalSps();
  }

  /**
   * Gracefully stops the external SPS; does nothing when none was created.
   */
  private void stopExternalSps() {
    if (externalSps == null) {
      return;
    }
    externalSps.stopGracefully();
  }

  /**
   * Creates a new satisfier plus external context on top of the existing
   * NameNode connector and starts the satisfier in EXTERNAL mode.
   */
  private void startExternalSps() {
    final StoragePolicySatisfier satisfier =
        new StoragePolicySatisfier(getConf());
    externalSps = satisfier;
    externalCtxt = new ExternalSPSContext(satisfier, nnc);

    satisfier.init(externalCtxt);
    satisfier.start(StoragePolicySatisfierMode.EXTERNAL);
  }

  /**
   * Starts the default all-DISK cluster together with the external SPS and
   * writes the test file.
   *
   * @throws IOException if the cluster cannot be started or written to.
   */
  private void createCluster() throws IOException {
    final Configuration conf = getConf();
    conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
    final MiniDFSCluster started = startCluster(conf, allDiskTypes,
        NUM_OF_DATANODES, STORAGES_PER_DATANODE, CAPACITY);
    setCluster(started);
    getFS();
    writeContent(FILE);
  }

  /**
   * Starts the default all-DISK cluster together with the external SPS and
   * writes the test file.
   *
   * @param createMoverPath forwarded to the NameNode connector creation.
   * @throws IOException if the cluster cannot be started or written to.
   */
  private void createCluster(boolean createMoverPath) throws IOException {
    final Configuration conf = getConf();
    conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
    final MiniDFSCluster started = startCluster(conf, allDiskTypes,
        NUM_OF_DATANODES, STORAGES_PER_DATANODE, CAPACITY, createMoverPath,
        true);
    setCluster(started);
    getFS();
    writeContent(FILE);
  }

  /**
   * Starts the default all-DISK cluster without starting the external SPS
   * and writes the test file.
   *
   * @throws IOException if the cluster cannot be started or written to.
   */
  private void createClusterDoNotStartSPS() throws IOException {
    final Configuration conf = getConf();
    conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
    final MiniDFSCluster started = startCluster(conf, allDiskTypes,
        NUM_OF_DATANODES, STORAGES_PER_DATANODE, CAPACITY, true, false);
    setCluster(started);
    getFS();
    writeContent(FILE);
  }

  /**
   * Convenience overload that starts the cluster with the external SPS
   * enabled and without creating the mover path.
   *
   * @param conf cluster configuration.
   * @param storageTypes storage types per datanode.
   * @param numberOfDatanodes number of datanodes to start.
   * @param storagesPerDn number of storages per datanode.
   * @param nodeCapacity capacity applied to every storage.
   * @return the started cluster.
   * @throws IOException if the cluster cannot be started.
   */
  private MiniDFSCluster startCluster(final Configuration conf,
      StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
      long nodeCapacity) throws IOException {
    final boolean createMoverPath = false;
    final boolean startSPS = true;
    return startCluster(conf, storageTypes, numberOfDatanodes, storagesPerDn,
        nodeCapacity, createMoverPath, startSPS);
  }

  /**
   * Builds and starts a mini cluster and, optionally, the external SPS.
   *
   * <p>If anything fails after the cluster has been built, the cluster is
   * shut down before the exception is rethrown; the caller never receives a
   * reference to it and therefore could not release it.
   *
   * @param conf configuration used to build the cluster.
   * @param storageTypes storage types per datanode.
   * @param numberOfDatanodes number of datanodes to start.
   * @param storagesPerDn number of storages per datanode.
   * @param nodeCapacity capacity applied to every storage.
   * @param createMoverPath forwarded to the NameNode connector creation.
   * @param startSPS whether to create the connector and start the SPS.
   * @return the started cluster.
   * @throws IOException if the cluster or the SPS cannot be started.
   */
  private MiniDFSCluster startCluster(final Configuration conf,
      StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
      long nodeCapacity, boolean createMoverPath, boolean startSPS) throws IOException {
    long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
    for (int i = 0; i < numberOfDatanodes; i++) {
      for (int j = 0; j < storagesPerDn; j++) {
        capacities[i][j] = nodeCapacity;
      }
    }
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
        .storageTypes(storageTypes).storageCapacities(capacities).build();
    try {
      cluster.waitActive();

      if (startSPS) {
        // The connector and the satisfier use the test-wide configuration
        // (getConf()), not the conf parameter.
        nnc = DFSTestUtil.getNameNodeConnector(getConf(),
            HdfsServerConstants.MOVER_ID_PATH, 1, createMoverPath);

        externalSps = new StoragePolicySatisfier(getConf());
        externalCtxt = new ExternalSPSContext(externalSps, nnc);

        externalSps.init(externalCtxt);
        externalSps.start(StoragePolicySatisfierMode.EXTERNAL);
      }
    } catch (IOException | RuntimeException e) {
      // Do not leak a running cluster that nobody holds a reference to.
      cluster.shutdown();
      throw e;
    }
    return cluster;
  }

  /**
   * Stops the external SPS, restarts all NameNodes, waits for the cluster to
   * become active and then starts a fresh external SPS on the existing
   * connector.
   *
   * @throws IOException if the NameNodes cannot be restarted.
   */
  private void restartNamenode() throws IOException {
    stopExternalSps();

    final MiniDFSCluster cluster = getCluster();
    cluster.restartNameNodes();
    cluster.waitActive();

    startExternalSps();
  }

  /**
   * Prepares {@code conf} for a Kerberos-secured cluster: starts a MiniKdc in
   * a fresh test directory, switches authentication to Kerberos, creates the
   * service and HTTP principals with a shared keytab, and sets the NameNode,
   * DataNode, SPS, SPNEGO and HTTPS related keys.
   *
   * @param conf configuration to populate.
   * @throws Exception if the KDC, the keytab or the SSL setup fails.
   */
  private void initSecureConf(Configuration conf) throws Exception {
    final String user = "externalSPS";
    final Class<?> testClass = TestExternalStoragePolicySatisfier.class;

    // Start from an empty base directory for the KDC and the key stores.
    baseDir = GenericTestUtils.getTestDir(testClass.getSimpleName());
    FileUtil.fullyDelete(baseDir);
    Assert.assertTrue(baseDir.mkdirs());

    kdc = new MiniKdc(MiniKdc.createConf(), baseDir);
    kdc.start();

    SecurityUtil.setAuthenticationMethod(
        UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
    UserGroupInformation.setConfiguration(conf);
    KerberosName.resetDefaultRealm();
    Assert.assertTrue("Expected configuration to enable security",
        UserGroupInformation.isSecurityEnabled());

    keytabFile = new File(baseDir, user + ".keytab");
    final String keytabPath = keytabFile.getAbsolutePath();
    // Windows will not reverse name lookup "127.0.0.1" to "localhost".
    final String host;
    if (Path.WINDOWS) {
      host = "127.0.0.1";
    } else {
      host = "localhost";
    }
    final String realmSuffix = "@" + kdc.getRealm();
    final String serviceName = user + "/" + host;
    final String httpName = "HTTP/" + host;
    principal = serviceName + realmSuffix;
    final String spnegoPrincipal = httpName + realmSuffix;
    kdc.createPrincipal(keytabFile, user, serviceName, httpName);

    // NameNode and DataNode share the same principal and keytab.
    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, principal);
    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytabPath);
    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, principal);
    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytabPath);
    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
    conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);

    // The external SPS logs in with the same keytab and principal.
    conf.set(DFS_SPS_ADDRESS_KEY, "localhost:0");
    conf.set(DFS_SPS_KEYTAB_FILE_KEY, keytabPath);
    conf.set(DFS_SPS_KERBEROS_PRINCIPAL_KEY, principal);

    final String keystoresDir = baseDir.getAbsolutePath();
    final String sslConfDir = KeyStoreTestUtil.getClasspathDir(testClass);
    KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);

    conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
        KeyStoreTestUtil.getClientSSLConfigFileName());
    conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
        KeyStoreTestUtil.getServerSSLConfigFileName());

    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
  }

  /**
   * Checks that the SPS works in a kerberized environment when the user is
   * logged in from a keytab. The ALL_SSD scenario is reused as the basic
   * functional check.
   */
  @Test(timeout = 300000)
  public void testWithKeytabs() throws Exception {
    try {
      initSecureConf(getConf());
      final String keytabPath = keytabFile.getAbsolutePath();
      final UserGroupInformation keytabUser = UserGroupInformation
          .loginUserFromKeytabAndReturnUGI(principal, keytabPath);
      final PrivilegedExceptionAction<Void> verification = () -> {
        // The SPS must be able to move the blocks.
        testWhenStoragePolicySetToALLSSD();
        // The login must have been keytab based.
        Assert.assertTrue(UserGroupInformation.isLoginKeytabBased());
        return null;
      };
      keytabUser.doAs(verification);
    } finally {
      // Restore the default UGI state so that other tests are not affected.
      UserGroupInformation.reset();
      UserGroupInformation.setConfiguration(new Configuration());
    }
  }

  /**
   * Verifies that a satisfy-storage-policy call is rejected once the number
   * of outstanding paths reaches the configured queue limit.
   *
   * @throws Exception on unexpected cluster or file system failures.
   */
  @Test(timeout = 300000)
  public void testOutstandingQueueLimitExceeds() throws Exception {
    final int queueLimit = 3;
    try {
      getConf().setInt(DFS_SPS_MAX_OUTSTANDING_PATHS_KEY, queueLimit);
      createCluster();
      DistributedFileSystem fs = getFS();

      // Stop the SPS so that submitted paths pile up in the satisfier queue.
      externalSps.stopGracefully();
      // Create as many files as the queue can hold and submit each of them.
      for (int i = 0; i < queueLimit; i++) {
        String file1 = "/testOutstandingQueueLimitExceeds_" + i;
        writeContent(file1);
        fs.satisfyStoragePolicy(new Path(file1));
      }
      // One more submission must be rejected.
      String fileExceeds = "/testOutstandingQueueLimitExceeds_" + 4;
      writeContent(fileExceeds);
      try {
        fs.satisfyStoragePolicy(new Path(fileExceeds));
        Assert.fail("Should throw exception as it exceeds "
            + "outstanding SPS call Q limit");
      } catch (IOException ioe) {
        GenericTestUtils.assertExceptionContains(
            "Outstanding satisfier queue limit: " + queueLimit
                + " exceeded, try later!", ioe);
      }
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Verifies that the SPS still starts when a Mover ID file exists but no
   * Mover holds it, as happens when a Mover exits ungracefully without
   * deleting its ID file from HDFS.
   */
  @Test(timeout = 300000)
  public void testWhenMoverExitsWithoutDeleteMoverIDFile()
      throws IOException {
    try {
      createCluster();
      // Leave a stale MOVER_ID file behind, as a crashed Mover would.
      final DistributedFileSystem clusterFs = getCluster().getFileSystem();
      DFSTestUtil.createFile(clusterFs, HdfsServerConstants.MOVER_ID_PATH, 0,
          (short) 1, 0);
      restartNamenode();
      Assert.assertTrue("SPS should be running as no Mover really running",
          externalCtxt.isRunning());
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Intentionally empty placeholder: batch processing is not exercised here
   * because the external scan is not batch based right now.
   *
   * NOTE(review): the method carries {@code @Ignore} but no {@code @Test};
   * presumably a leftover from an earlier class layout - confirm whether the
   * stub is still needed.
   */
  @Ignore("ExternalFileIdCollector is not batch based right now."
      + " So, ignoring it.")
  public void testBatchProcessingForSPSDirectory() throws Exception {
  }

  /**
   * Intentionally empty placeholder: this scenario is specific to the
   * internal SPS and is therefore skipped for the external one.
   *
   * NOTE(review): the method carries {@code @Ignore} but no {@code @Test};
   * presumably a leftover from an earlier class layout - confirm whether the
   * stub is still needed.
   */
  @Ignore("This test is specific to internal, so skipping here.")
  public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
      throws Exception {
  }

  /**
   * Intentionally empty placeholder: this scenario is specific to the
   * internal SPS, so it is ignored here.
   *
   * NOTE(review): the method carries {@code @Ignore} but no {@code @Test};
   * presumably a leftover from an earlier class layout - confirm whether the
   * stub is still needed.
   */
  @Ignore("This test is specific to internal SPS. So, ignoring it.")
  public void testTraverseWhenParentDeleted() throws Exception {
  }

  /**
   * Intentionally empty placeholder: this scenario is specific to the
   * internal SPS, so it is ignored here.
   *
   * NOTE(review): the method carries {@code @Ignore} but no {@code @Test};
   * presumably a leftover from an earlier class layout - confirm whether the
   * stub is still needed.
   */
  @Ignore("This test is specific to internal SPS. So, ignoring it.")
  public void testTraverseWhenRootParentDeleted() throws Exception {
  }


  /**
   * Runs the COLD policy scenario against a freshly created cluster.
   */
  @Test(timeout = 300000)
  public void testWhenStoragePolicySetToCOLD() throws Exception {
    try {
      createCluster();
      doTestWhenStoragePolicySetToCOLD();
    } finally {
      // Stop the external SPS regardless of the outcome.
      shutdownCluster();
    }
  }

  /**
   * Verifies that obtaining a second NameNode connector while an external SPS
   * already holds the mover path terminates immediately with an
   * {@link ExitUtil.ExitException}.
   */
  @Test(timeout = 300000)
  public void testInfiniteStartWhenAnotherSPSRunning()
      throws Exception {
    final String expectedMessage =
        "Exit immediately because another ExternalStoragePolicySatisfier is running";
    try {
      // The first connector is created together with the mover path.
      createCluster(true);

      // Turn System.exit into an exception so that it can be asserted on.
      ExitUtil.disableSystemExit();

      // A second connector simulates another SPS process starting up; it
      // must give up right away.
      LambdaTestUtils.intercept(ExitUtil.ExitException.class, expectedMessage,
          () -> ExternalStoragePolicySatisfier.getNameNodeConnector(config));
    } finally {
      // Reset first exit exception to avoid AssertionError in
      // MiniDFSCluster#shutdown. This has no effect on functionality.
      ExitUtil.resetFirstExitException();
      shutdownCluster();
    }
  }

  /**
   * Runs the COLD policy scenario while the fault injector makes the first
   * move attempts fail with an IOException; the moves must still succeed in
   * the end.
   *
   * <p>The injector is process-wide static state, so the previously installed
   * instance is restored afterwards to keep other tests unaffected.
   */
  @Test(timeout = 300000)
  public void testWhenStoragePolicySetToCOLDWithException()
      throws Exception {
    final ExternalSPSFaultInjector previousInjector =
        ExternalSPSFaultInjector.getInstance();
    try {
      createCluster();
      // Fail the attempts below the default max retry count; moving tasks
      // should succeed finally.
      ExternalSPSFaultInjector.setInstance(injector);
      doTestWhenStoragePolicySetToCOLD();
    } finally {
      try {
        shutdownCluster();
      } finally {
        ExternalSPSFaultInjector.setInstance(previousInjector);
      }
    }
  }

  /**
   * Switches the test file to the COLD policy, adds three ARCHIVE-only
   * datanodes, requests satisfaction and waits until all three replicas are
   * reported on ARCHIVE storage.
   */
  private void doTestWhenStoragePolicySetToCOLD() throws Exception {
    final Path target = new Path(FILE);
    dfs.setStoragePolicy(target, COLD);

    // Three new datanodes with two ARCHIVE storages each.
    final StorageType[][] archiveNodes = new StorageType[3][];
    for (int i = 0; i < archiveNodes.length; i++) {
      archiveNodes[i] =
          new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
    }
    startAdditionalDNs(config, 3, NUM_OF_DATANODES, archiveNodes,
        STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);

    hdfsCluster.triggerHeartbeats();
    dfs.satisfyStoragePolicy(target);
    // Wait until the namenode has learned about the new block locations.
    DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 35000,
        dfs);
  }

  /**
   * Verifies that all three replicas move to NVDIMM storage once the file is
   * switched to the ALL_NVDIMM policy and NVDIMM datanodes join the cluster.
   */
  @Test(timeout = 300000)
  public void testWhenStoragePolicySetToALLNVDIMM()
          throws Exception {
    try {
      createCluster();
      final Path target = new Path(FILE);
      // Switch the file to the ALL_NVDIMM policy.
      dfs.setStoragePolicy(target, "ALL_NVDIMM");

      // Three new datanodes with one NVDIMM and one DISK storage each.
      final StorageType[][] nvdimmNodes = new StorageType[3][];
      for (int i = 0; i < nvdimmNodes.length; i++) {
        nvdimmNodes[i] =
            new StorageType[]{StorageType.NVDIMM, StorageType.DISK};
      }
      startAdditionalDNs(config, 3, NUM_OF_DATANODES, nvdimmNodes,
              STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);

      dfs.satisfyStoragePolicy(target);
      hdfsCluster.triggerHeartbeats();
      // Wait until the StoragePolicySatisfier has moved the blocks to the
      // NVDIMM storages.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.NVDIMM, 3,
          30000, dfs);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Verifies that all three replicas move to SSD storage once the file is
   * switched to the ALL_SSD policy and SSD datanodes join the cluster.
   */
  @Test(timeout = 300000)
  public void testWhenStoragePolicySetToALLSSD()
      throws Exception {
    try {
      createCluster();
      final Path target = new Path(FILE);
      // Switch the file to the ALL_SSD policy.
      dfs.setStoragePolicy(target, "ALL_SSD");

      // Add three SSD based datanodes (one SSD and one DISK storage each) so
      // that the policy can be satisfied.
      final StorageType[][] ssdNodes = new StorageType[3][];
      for (int i = 0; i < ssdNodes.length; i++) {
        ssdNodes[i] = new StorageType[]{StorageType.SSD, StorageType.DISK};
      }
      startAdditionalDNs(config, 3, NUM_OF_DATANODES, ssdNodes,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
      dfs.satisfyStoragePolicy(target);
      hdfsCluster.triggerHeartbeats();
      // Wait until the StoragePolicySatisfier has moved the blocks to the
      // SSD storages.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 3, 30000, dfs);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Verifies that exactly one replica moves to SSD and two stay on DISK once
   * the file is switched to the ONE_SSD policy and an SSD datanode joins.
   */
  @Test(timeout = 300000)
  public void testWhenStoragePolicySetToONESSD()
      throws Exception {
    try {
      createCluster();
      final Path target = new Path(FILE);
      // Switch the file to the ONE_SSD policy.
      dfs.setStoragePolicy(target, ONE_SSD);

      // Add a single SSD based datanode (one SSD and one DISK storage).
      final StorageType[] ssdAndDisk = {StorageType.SSD, StorageType.DISK};
      final StorageType[][] ssdNode = {ssdAndDisk};
      startAdditionalDNs(config, 1, NUM_OF_DATANODES, ssdNode,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
      dfs.satisfyStoragePolicy(target);
      hdfsCluster.triggerHeartbeats();
      // Wait until the StoragePolicySatisfier has moved one replica to SSD
      // while the other two remain on DISK.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
          dfs);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Verifies that, after a ONE_SSD move has completed, the block storage
   * movement attempt report for the file is observed.
   */
  @Test(timeout = 300000)
  public void testBlksStorageMovementAttemptFinishedReport() throws Exception {
    try {
      createCluster();
      final Path target = new Path(FILE);
      // Switch the file to the ONE_SSD policy.
      dfs.setStoragePolicy(target, ONE_SSD);

      // Add a single SSD based datanode (one SSD and one DISK storage).
      final StorageType[] ssdAndDisk = {StorageType.SSD, StorageType.DISK};
      final StorageType[][] ssdNode = {ssdAndDisk};
      startAdditionalDNs(config, 1, NUM_OF_DATANODES, ssdNode,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
      dfs.satisfyStoragePolicy(target);
      hdfsCluster.triggerHeartbeats();

      // One replica has to end up on SSD, the other two stay on DISK.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
          dfs);

      // Exactly one movement attempt report is expected.
      waitForBlocksMovementAttemptReport(1, 30000);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Verifies that several files submitted together for storage policy
   * satisfaction are all moved as expected.
   */
  @Test(timeout = 300000)
  public void testMultipleFilesForSatisfyStoragePolicy() throws Exception {
    try {
      createCluster();
      final List<String> paths = new ArrayList<>();
      paths.add(FILE);

      // Four more files, in addition to the one written by createCluster().
      int index = 0;
      while (index < 4) {
        final String extraFile =
            "/testMoveWhenStoragePolicyNotSatisfying_" + index;
        paths.add(extraFile);
        writeContent(extraFile);
        index++;
      }
      // Switch every file to ONE_SSD and submit it right away.
      for (String path : paths) {
        final Path target = new Path(path);
        dfs.setStoragePolicy(target, ONE_SSD);
        dfs.satisfyStoragePolicy(target);
      }

      // Add a single SSD based datanode (one SSD and one DISK storage).
      final StorageType[] ssdAndDisk = {StorageType.SSD, StorageType.DISK};
      final StorageType[][] ssdNode = {ssdAndDisk};
      startAdditionalDNs(config, 1, NUM_OF_DATANODES, ssdNode,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
      hdfsCluster.triggerHeartbeats();

      // Every file must end up with one SSD replica and two DISK replicas.
      for (String path : paths) {
        DFSTestUtil.waitExpectedStorageType(
            path, StorageType.SSD, 1, 30000, dfs);
        DFSTestUtil.waitExpectedStorageType(
            path, StorageType.DISK, 2, 30000, dfs);
      }

      waitForBlocksMovementAttemptReport(paths.size(), 30000);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Verifies that {@code HdfsAdmin#satisfyStoragePolicy} works for a file.
   *
   * @throws Exception on unexpected cluster or file system failures.
   */
  @Test(timeout = 300000)
  public void testSatisfyFileWithHdfsAdmin() throws Exception {
    try {
      createCluster();
      final HdfsAdmin admin =
          new HdfsAdmin(FileSystem.getDefaultUri(config), config);
      final Path target = new Path(FILE);
      // Switch the file to the COLD policy.
      dfs.setStoragePolicy(target, COLD);

      // Three new datanodes with one DISK and one ARCHIVE storage each.
      final StorageType[][] archiveNodes = new StorageType[3][];
      for (int i = 0; i < archiveNodes.length; i++) {
        archiveNodes[i] =
            new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
      }
      startAdditionalDNs(config, 3, NUM_OF_DATANODES, archiveNodes,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);

      admin.satisfyStoragePolicy(target);

      hdfsCluster.triggerHeartbeats();
      // Wait until the namenode has learned about the new block locations.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 30000,
          dfs);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Verifies that {@code HdfsAdmin#satisfyStoragePolicy} works for a
   * directory: both the file directly inside it and the file in its
   * sub-directory are expected to be moved.
   *
   * @throws Exception on unexpected cluster or file system failures.
   */
  @Test(timeout = 300000)
  public void testSatisfyDirWithHdfsAdmin() throws Exception {
    try {
      createCluster();
      final HdfsAdmin admin =
          new HdfsAdmin(FileSystem.getDefaultUri(config), config);
      final String parentDir = "/subDir";
      final String directFile = parentDir + "/subFile1";
      final String nestedDir = parentDir + "/subDir2";
      final String nestedFile = nestedDir + "/subFile2";
      final Path parentPath = new Path(parentDir);
      dfs.mkdirs(parentPath);
      writeContent(directFile);
      dfs.mkdirs(new Path(nestedDir));
      writeContent(nestedFile);

      // Switch the parent directory to the ONE_SSD policy.
      dfs.setStoragePolicy(parentPath, ONE_SSD);

      // Add a single SSD based datanode (one SSD and one DISK storage).
      final StorageType[] ssdAndDisk = {StorageType.SSD, StorageType.DISK};
      final StorageType[][] ssdNode = {ssdAndDisk};
      startAdditionalDNs(config, 1, NUM_OF_DATANODES, ssdNode,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);

      admin.satisfyStoragePolicy(parentPath);

      hdfsCluster.triggerHeartbeats();

      // The file directly inside the directory is moved.
      DFSTestUtil.waitExpectedStorageType(
          directFile, StorageType.SSD, 1, 30000, dfs);
      DFSTestUtil.waitExpectedStorageType(
          directFile, StorageType.DISK, 2, 30000, dfs);

      // The file inside the sub-directory is expected to be moved as well.
      DFSTestUtil.waitExpectedStorageType(
          nestedFile, StorageType.SSD, 1, 30000, dfs);
      DFSTestUtil.waitExpectedStorageType(
          nestedFile, StorageType.DISK, 2, 30000, dfs);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Verifies the failure modes of {@code HdfsAdmin#satisfyStoragePolicy}:
   * it is rejected while storage policies are disabled, it throws
   * {@link FileNotFoundException} for a missing path, and repeated calls for
   * the same path are accepted.
   *
   * @throws Exception on unexpected cluster or file system failures.
   */
  @Test(timeout = 300000)
  public void testSatisfyWithExceptions() throws Exception {
    try {
      createCluster();
      final String nonExistingFile = "/noneExistingFile";
      // Restart the namenode with storage policies disabled.
      hdfsCluster.getConfiguration(0).
          setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
      restartNamenode();
      HdfsAdmin hdfsAdmin =
          new HdfsAdmin(FileSystem.getDefaultUri(config), config);

      try {
        hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
        Assert.fail(String.format(
            "Should failed to satisfy storage policy "
                + "for %s since %s is set to false.",
            FILE, DFS_STORAGE_POLICY_ENABLED_KEY));
      } catch (IOException e) {
        GenericTestUtils.assertExceptionContains(String.format(
            "Failed to satisfy storage policy since %s is set to false.",
            DFS_STORAGE_POLICY_ENABLED_KEY), e);
      }

      // Enable storage policies again for the remaining checks.
      hdfsCluster.getConfiguration(0).
          setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true);
      restartNamenode();

      hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(config), config);
      try {
        hdfsAdmin.satisfyStoragePolicy(new Path(nonExistingFile));
        Assert.fail("Should throw FileNotFoundException for " +
            nonExistingFile);
      } catch (FileNotFoundException expected) {
        // Expected: the path does not exist.
      }

      try {
        hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
        hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
      } catch (Exception e) {
        // Fail with the original exception attached so that the root cause
        // shows up in the test report.
        throw new AssertionError(String.format("Allow to invoke mutlipe times "
            + "#satisfyStoragePolicy() api for a path %s , internally just "
            + "skipping addtion to satisfy movement queue.", FILE), e);
      }
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Verifies that only part of the replicas under a path are moved when too
   * few target datanodes with the required storage type are available.
   *
   * Example with a file of two blocks:
   *
   * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
   * blk_2[locations=A(DISK),B(DISK),C(DISK)]. The storage policy is set to
   * COLD while only one datanode, say D, offers ARCHIVE storage.
   *
   * The SPS then schedules blk_1[move A(DISK) -> D(ARCHIVE)] and
   * blk_2[move A(DISK) -> D(ARCHIVE)] with the coordinator node.
   */
  @Test(timeout = 300000)
  public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy()
      throws Exception {
    try {
      createCluster();
      final Path target = new Path(FILE);
      // Switch the file to the COLD policy.
      dfs.setStoragePolicy(target, COLD);

      // Add a single datanode with two ARCHIVE storages.
      final StorageType[] archiveOnly =
          {StorageType.ARCHIVE, StorageType.ARCHIVE};
      final StorageType[][] archiveNode = {archiveOnly};
      startAdditionalDNs(config, 1, NUM_OF_DATANODES, archiveNode,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);

      dfs.satisfyStoragePolicy(target);
      hdfsCluster.triggerHeartbeats();
      // Wait until the StoragePolicySatisfier has moved one replica to the
      // ARCHIVE storage; the other two stay on DISK.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000,
          dfs);
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
          dfs);

      waitForBlocksMovementAttemptReport(1, 30000);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Tests to verify that for the given path, no blocks or block src
   * locations(src nodes) under the given path will be scheduled for block
   * movement as there are no available datanode with required storage type.
   *
   * For example, there are two block for a file:
   *
   * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
   * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
   * No datanode is available with storage type ARCHIVE.
   *
   * SPS won't schedule any block movement for this path.
   */
  @Test(timeout = 300000)
  public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
      throws Exception {
    try {
      createCluster();
      final Path filePath = new Path(FILE);
      // Switch the file over to the COLD policy.
      dfs.setStoragePolicy(filePath, COLD);

      // The extra datanode only offers DISK storages, so there is still no
      // ARCHIVE storage among the nodes added here.
      final StorageType[][] diskOnly = {
          {StorageType.DISK, StorageType.DISK}};
      startAdditionalDNs(config, 1, NUM_OF_DATANODES, diskOnly,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);

      dfs.satisfyStoragePolicy(filePath);
      hdfsCluster.triggerHeartbeats();

      // Without a target node of the required storage type no block movement
      // is scheduled; all three replicas are expected to stay on DISK.
      waitForAttemptedItems(1, 30000);
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
          dfs);
      // The item is expected to time out and be attempted again, so the
      // attempted-items count is checked a second time.
      waitForAttemptedItems(1, 30000);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Test to verify that satisfy worker can't move blocks. If the given block is
   * pinned it shouldn't be considered for retries.
   */
  @Test(timeout = 120000)
  public void testMoveWithBlockPinning() throws Exception {
    try {
      config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
      hdfsCluster = startCluster(config, allDiskTypes, 3, 2, CAPACITY);

      hdfsCluster.waitActive();
      dfs = hdfsCluster.getFileSystem();

      // File with replication factor 3 for which 2 block locations are
      // marked as pinned.
      final String pinnedFile = createFileAndSimulateFavoredNodes(2);
      final Path pinnedPath = new Path(pinnedFile);

      // Switch the file over to the COLD policy.
      dfs.setStoragePolicy(pinnedPath, COLD);

      // Add three datanodes whose storages are all ARCHIVE.
      final StorageType[][] archiveOnly = {
          {StorageType.ARCHIVE, StorageType.ARCHIVE},
          {StorageType.ARCHIVE, StorageType.ARCHIVE},
          {StorageType.ARCHIVE, StorageType.ARCHIVE}};
      startAdditionalDNs(config, 3, NUM_OF_DATANODES, archiveOnly,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);

      dfs.satisfyStoragePolicy(pinnedPath);
      hdfsCluster.triggerHeartbeats();

      // Only one replica is expected to reach ARCHIVE; the other two are
      // expected to remain on DISK.
      waitForAttemptedItems(1, 30000);
      waitForBlocksMovementAttemptReport(1, 30000);
      DFSTestUtil.waitExpectedStorageType(
          pinnedFile, StorageType.ARCHIVE, 1, 30000, dfs);
      DFSTestUtil.waitExpectedStorageType(
          pinnedFile, StorageType.DISK, 2, 30000, dfs);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Tests to verify that for the given path, only few of the blocks or block
   * src locations(src nodes) under the given path will be scheduled for block
   * movement.
   *
   * For example, there are two block for a file:
   *
   * File1 => two blocks and default storage policy(HOT).
   * blk_1[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)],
   * blk_2[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)].
   *
   * Now, set storage policy to COLD.
   * Only two Dns are available with expected storage type ARCHIVE, say A, E.
   *
   * SPS will schedule block movement to the coordinator node with the details,
   * blk_1[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)],
   * blk_2[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)].
   */
  @Test(timeout = 300000)
  public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes()
      throws Exception {
    try {
      final int datanodeCount = 5;
      config.setLong("dfs.block.size", 1024);
      // Only the first and the last datanode offer an ARCHIVE storage.
      allDiskTypes = new StorageType[][] {
          {StorageType.DISK, StorageType.ARCHIVE},
          {StorageType.DISK, StorageType.DISK},
          {StorageType.DISK, StorageType.DISK},
          {StorageType.DISK, StorageType.DISK},
          {StorageType.DISK, StorageType.ARCHIVE}};
      hdfsCluster = startCluster(config, allDiskTypes, datanodeCount,
          STORAGES_PER_DATANODE, CAPACITY);
      dfs = hdfsCluster.getFileSystem();
      writeContent(FILE, (short) 5);

      // Switch the file over to the COLD policy and ask SPS to satisfy it.
      final Path filePath = new Path(FILE);
      dfs.setStoragePolicy(filePath, COLD);
      dfs.satisfyStoragePolicy(filePath);
      hdfsCluster.triggerHeartbeats();

      // Two replicas are expected on ARCHIVE, the remaining three on DISK.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000,
          dfs);
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
          dfs);

      waitForBlocksMovementAttemptReport(1, 30000);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Tests that moving block storage with in the same datanode. Let's say we
   * have DN1[DISK,ARCHIVE], DN2[DISK, SSD], DN3[DISK,RAM_DISK] when
   * storagepolicy set to ONE_SSD and request satisfyStoragePolicy, then block
   * should move to DN2[SSD] successfully.
   */
  @Test(timeout = 300000)
  public void testBlockMoveInSameDatanodeWithONESSD() throws Exception {
    // Every datanode has a DISK storage; only the second also has an SSD.
    final StorageType[][] storageLayout = {
        {StorageType.DISK, StorageType.ARCHIVE},
        {StorageType.DISK, StorageType.SSD},
        {StorageType.DISK, StorageType.RAM_DISK}};
    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
    try {
      hdfsCluster = startCluster(config, storageLayout, NUM_OF_DATANODES,
          STORAGES_PER_DATANODE, CAPACITY);
      dfs = hdfsCluster.getFileSystem();
      writeContent(FILE);

      // Switch the file over to ONE_SSD and ask SPS to satisfy it.
      final Path filePath = new Path(FILE);
      dfs.setStoragePolicy(filePath, ONE_SSD);
      dfs.satisfyStoragePolicy(filePath);
      hdfsCluster.triggerHeartbeats();

      // One replica is expected on SSD, the other two on DISK.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
          dfs);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Tests that moving block storage with in the same datanode and remote node.
   * Let's say we have DN1[DISK,ARCHIVE], DN2[ARCHIVE, SSD], DN3[DISK,DISK],
   * DN4[DISK,DISK] when storagepolicy set to WARM and request
   * satisfyStoragePolicy, then block should move to DN1[ARCHIVE] and
   * DN2[ARCHIVE] successfully.
   */
  @Test(timeout = 300000)
  public void testBlockMoveInSameAndRemoteDatanodesWithWARM() throws Exception {
    // Two of the four datanodes offer an ARCHIVE storage.
    final StorageType[][] storageLayout = {
        {StorageType.DISK, StorageType.ARCHIVE},
        {StorageType.ARCHIVE, StorageType.SSD},
        {StorageType.DISK, StorageType.DISK},
        {StorageType.DISK, StorageType.DISK}};

    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
    try {
      hdfsCluster = startCluster(config, storageLayout, storageLayout.length,
          STORAGES_PER_DATANODE, CAPACITY);
      dfs = hdfsCluster.getFileSystem();
      writeContent(FILE);

      // Switch the file over to WARM and ask SPS to satisfy it.
      final Path filePath = new Path(FILE);
      dfs.setStoragePolicy(filePath, "WARM");
      dfs.satisfyStoragePolicy(filePath);
      hdfsCluster.triggerHeartbeats();

      // One replica is expected on DISK and two on ARCHIVE.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 1, 30000,
          dfs);
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000,
          dfs);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * If replica with expected storage type already exist in source DN then that
   * DN should be skipped.
   */
  @Test(timeout = 300000)
  public void testSPSWhenReplicaWithExpectedStorageAlreadyAvailableInSource()
      throws Exception {
    final StorageType[][] storageLayout = {
        {StorageType.DISK, StorageType.ARCHIVE},
        {StorageType.DISK, StorageType.ARCHIVE},
        {StorageType.DISK, StorageType.ARCHIVE}};

    try {
      hdfsCluster = startCluster(config, storageLayout, storageLayout.length,
          STORAGES_PER_DATANODE, CAPACITY);
      dfs = hdfsCluster.getFileSystem();
      final Path filePath = new Path(FILE);

      // 1. Write the file with two replicas.
      DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE, (short) 2, 0);

      // 2. Change policy to COLD, so the third replica is written to ARCHIVE.
      dfs.setStoragePolicy(filePath, "COLD");

      // 3. Raise the replication factor to 3.
      dfs.setReplication(filePath, (short) 3);

      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
          dfs);
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000,
          dfs);

      // 4. Change policy to HOT, so all replicas can be moved to DISK.
      dfs.setStoragePolicy(filePath, "HOT");

      // 5. Satisfy the policy.
      dfs.satisfyStoragePolicy(filePath);

      // 6. All three replicas should end up on DISK.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
          dfs);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Tests that movements should not be assigned when there is no space in
   * target DN.
   */
  @Test(timeout = 300000)
  public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace()
      throws Exception {
    // Only the second storage-type row contains an SSD storage.
    StorageType[][] diskTypes =
        new StorageType[][]{{StorageType.DISK, StorageType.DISK},
            {StorageType.DISK, StorageType.SSD},
            {StorageType.DISK, StorageType.DISK}};
    config.setLong("dfs.block.size", 2 * DEFAULT_BLOCK_SIZE);
    config.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
        false);
    // NOTE(review): capacity value handed to startCluster below; presumably
    // applied per storage -- confirm against startCluster.
    long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1);
    try {
      hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
          STORAGES_PER_DATANODE, dnCapacity);
      dfs = hdfsCluster.getFileSystem();
      writeContent(FILE);

      // Change policy to ONE_SSD
      dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
      // Second file (replication 1, ONE_SSD) used only to consume space.
      Path filePath = new Path("/testChooseInSameDatanode");
      final FSDataOutputStream out =
          dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
      try {
        dfs.setStoragePolicy(filePath, ONE_SSD);
        // Try to fill up SSD part by writing content
        // NOTE(review): the divisor 3 * 2 presumably stands for
        // datanodes * storages per datanode -- confirm.
        long remaining = dfs.getStatus().getRemaining() / (3 * 2);
        // One byte is written per iteration.
        for (int i = 0; i < remaining; i++) {
          out.write(i);
        }
      } finally {
        out.close();
      }
      hdfsCluster.triggerHeartbeats();
      ArrayList<DataNode> dataNodes = hdfsCluster.getDataNodes();
      // Temporarily disable heart beats, so that we can assert whether any
      // items schedules for DNs even though DN's does not have space to write.
      // Disabling heart beats can keep scheduled items on DatanodeDescriptor
      // itself.
      for (DataNode dataNode : dataNodes) {
        DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
      }
      dfs.satisfyStoragePolicy(new Path(FILE));

      // Wait for items to be processed
      waitForAttemptedItems(1, 30000);

      // Enable heart beats now
      for (DataNode dataNode : dataNodes) {
        DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, false);
      }
      hdfsCluster.triggerHeartbeats();

      // All three replicas of FILE are expected to stay on DISK and none on
      // SSD.
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
          dfs);
      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 0, 30000, dfs);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Tests that Xattrs should be cleaned if satisfy storage policy called on EC
   * file with unsuitable storage policy set.
   *
   * @throws Exception
   */
  @Test(timeout = 300000)
  public void testSPSShouldNotLeakXattrIfSatisfyStoragePolicyCallOnECFiles()
      throws Exception {
    StorageType[][] diskTypes =
        new StorageType[][]{{StorageType.SSD, StorageType.DISK},
            {StorageType.SSD, StorageType.DISK},
            {StorageType.SSD, StorageType.DISK},
            {StorageType.SSD, StorageType.DISK},
            {StorageType.SSD, StorageType.DISK},
            {StorageType.DISK, StorageType.SSD},
            {StorageType.DISK, StorageType.SSD},
            {StorageType.DISK, StorageType.SSD},
            {StorageType.DISK, StorageType.SSD},
            {StorageType.DISK, StorageType.SSD}};

    int defaultStripedBlockSize =
        StripedFileTestUtil.getDefaultECPolicy().getCellSize() * 4;
    config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultStripedBlockSize);
    config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
    config.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
        1L);
    config.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
        false);
    try {
      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
          STORAGES_PER_DATANODE, CAPACITY);
      dfs = hdfsCluster.getFileSystem();
      dfs.enableErasureCodingPolicy(
          StripedFileTestUtil.getDefaultECPolicy().getName());

      // Create the "/foo" directory through a ClientProtocol proxy.
      ClientProtocol client = NameNodeProxies.createProxy(config,
          hdfsCluster.getFileSystem(0).getUri(), ClientProtocol.class)
          .getProxy();
      String fooDir = "/foo";
      // The mode must be an octal literal: 0777 is rwxrwxrwx, whereas the
      // decimal literal 777 is the unrelated mode 01411.
      client.mkdirs(fooDir, new FsPermission((short) 0777), true);
      // set an EC policy on "/foo" directory
      client.setErasureCodingPolicy(fooDir,
          StripedFileTestUtil.getDefaultECPolicy().getName());

      // write file to fooDir
      final String testFile = "/foo/bar";
      long fileLen = 20 * defaultStripedBlockSize;
      DFSTestUtil.createFile(dfs, new Path(testFile), fileLen, (short) 3, 0);

      // ONESSD is unsuitable storage policy on EC files
      client.setStoragePolicy(fooDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
      dfs.satisfyStoragePolicy(new Path(testFile));

      // Every reported replica is expected to be on DISK.
      LocatedBlocks locatedBlocks =
          client.getBlockLocations(testFile, 0, fileLen);
      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
        for (StorageType type : lb.getStorageTypes()) {
          Assert.assertEquals(StorageType.DISK, type);
        }
      }

      // Make sure satisfy xattr has been removed.
      DFSTestUtil.waitForXattrRemoved(testFile, XATTR_SATISFY_STORAGE_POLICY,
          hdfsCluster.getNamesystem(), 30000);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Test SPS with empty file.
   * 1. Create one empty file.
   * 2. Call satisfyStoragePolicy for empty file.
   * 3. SPS should skip this file and xattr should not be added for empty file.
   */
  @Test(timeout = 300000)
  public void testSPSWhenFileLengthIsZero() throws Exception {
    try {
      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
          STORAGES_PER_DATANODE, CAPACITY);
      hdfsCluster.waitActive();
      final DistributedFileSystem fs = hdfsCluster.getFileSystem();
      final Path emptyFile = new Path("/zeroSizeFile");
      DFSTestUtil.createFile(fs, emptyFile, 0, (short) 1, 0);

      // Remember the last edit-log transaction before the request so that
      // any new edit caused by it can be detected.
      final FSEditLog editLog =
          hdfsCluster.getNameNode().getNamesystem().getEditLog();
      final long txIdBefore = editLog.getLastWrittenTxId();
      fs.satisfyStoragePolicy(emptyFile);
      final long txIdAfter = editLog.getLastWrittenTxId();
      Assert.assertEquals("Xattr should not be added for the file",
          txIdBefore, txIdAfter);

      // The inode itself must not carry any xattr feature either.
      final INode inode = hdfsCluster.getNameNode().getNamesystem()
          .getFSDirectory().getINode(emptyFile.toString());
      final boolean hasNoXAttrFeature = inode.getXAttrFeature() == null;
      Assert.assertTrue("XAttrFeature should be null for file",
          hasNoXAttrFeature);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Test SPS for low redundant file blocks.
   * 1. Create cluster with 3 datanodes.
   * 2. Create one file with 3 replicas and set its policy to COLD.
   * 3. Stop all datanodes and restart the NameNode.
   * 4. Restart 2 datanodes, call satisfyStoragePolicy for the file and wait
   * until two replicas are reported on ARCHIVE.
   * 5. Restart the third datanode.
   * 6. The third replica should also be moved to the storage required by
   * the policy.
   */
  @Test(timeout = 300000)
  public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
    try {
      config.set(DFSConfigKeys
          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
          "3000");
      config.set(DFSConfigKeys
          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
          "5000");
      final StorageType[][] archiveAndDisk = {
          {StorageType.ARCHIVE, StorageType.DISK},
          {StorageType.ARCHIVE, StorageType.DISK},
          {StorageType.ARCHIVE, StorageType.DISK}};
      hdfsCluster = startCluster(config, archiveAndDisk, 3, 2, CAPACITY);
      hdfsCluster.waitActive();
      final DistributedFileSystem fs = hdfsCluster.getFileSystem();
      final Path filePath = new Path("/zeroSizeFile");
      final String fileName = filePath.toString();
      DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0);
      fs.setStoragePolicy(filePath, "COLD");

      // Stop all three datanodes; index 0 is stopped three times in a row.
      final List<DataNodeProperties> stopped = new ArrayList<>();
      for (int i = 0; i < 3; i++) {
        stopped.add(hdfsCluster.stopDataNode(0));
      }
      restartNamenode();

      // Bring back only two of the datanodes before asking SPS to act.
      hdfsCluster.restartDataNode(stopped.get(0), false);
      hdfsCluster.restartDataNode(stopped.get(1), false);
      hdfsCluster.waitActive();
      fs.satisfyStoragePolicy(filePath);
      DFSTestUtil.waitExpectedStorageType(fileName,
          StorageType.ARCHIVE, 2, 30000, hdfsCluster.getFileSystem());

      // Once the third datanode is back its replica must be moved as well.
      hdfsCluster.restartDataNode(stopped.get(2), false);
      DFSTestUtil.waitExpectedStorageType(fileName,
          StorageType.ARCHIVE, 3, 30000, hdfsCluster.getFileSystem());
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Test SPS for extra redundant file blocks.
   * 1. Create cluster with 5 datanode.
   * 2. Create one file with 5 replica.
   * 3. Set file replication to 3.
   * 4. Set policy and call satisfyStoragePolicy for file.
   * 5. Block should be moved successfully.
   */
  @Test(timeout = 600000)
  public void testSPSWhenFileHasExcessRedundancyBlocks() throws Exception {
    // Declared outside the try block so the capturer can be stopped in
    // finally even when the test body fails.
    LogCapturer logs = null;
    try {
      config.set(DFSConfigKeys
          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
          "3000");
      config.set(DFSConfigKeys
          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
          "5000");
      StorageType[][] newtypes = new StorageType[][] {
          {StorageType.ARCHIVE, StorageType.DISK},
          {StorageType.ARCHIVE, StorageType.DISK},
          {StorageType.ARCHIVE, StorageType.DISK},
          {StorageType.ARCHIVE, StorageType.DISK},
          {StorageType.ARCHIVE, StorageType.DISK}};
      hdfsCluster = startCluster(config, newtypes, 5, 2, CAPACITY);
      hdfsCluster.waitActive();
      DistributedFileSystem fs = hdfsCluster.getFileSystem();
      Path filePath = new Path("/zeroSizeFile");
      // Write with 5 replicas, then lower the replication factor to 3 so the
      // file has excess replicas.
      DFSTestUtil.createFile(fs, filePath, 1024, (short) 5, 0);
      fs.setReplication(filePath, (short) 3);
      logs = GenericTestUtils.LogCapturer.captureLogs(
          LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class));
      fs.setStoragePolicy(filePath, "COLD");
      fs.satisfyStoragePolicy(filePath);
      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
          StorageType.ARCHIVE, 3, 60000, hdfsCluster.getFileSystem());
      // The message is shown when the assertion fails, i.e. when the low
      // redundancy log line was found although it must not appear.
      assertFalse("Log output should not contain the low redundant blocks "
          + "message for a file with excess replicas",
          logs.getOutput().contains("some of the blocks are low redundant"));
    } finally {
      try {
        if (logs != null) {
          // Stop capturing so the capturer does not outlive this test.
          logs.stopCapturing();
        }
      } finally {
        shutdownCluster();
      }
    }
  }

  /**
   * Test SPS for empty directory, xAttr should be removed.
   */
  @Test(timeout = 300000)
  public void testSPSForEmptyDirectory() throws IOException, TimeoutException,
      InterruptedException {
    try {
      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
          STORAGES_PER_DATANODE, CAPACITY);
      hdfsCluster.waitActive();
      final DistributedFileSystem fs = hdfsCluster.getFileSystem();
      final String dirName = "/emptyDir";
      final Path dirPath = new Path(dirName);
      fs.mkdirs(dirPath);
      fs.satisfyStoragePolicy(dirPath);
      // The satisfy xattr must eventually disappear from the directory.
      DFSTestUtil.waitForXattrRemoved(dirName,
          XATTR_SATISFY_STORAGE_POLICY, hdfsCluster.getNamesystem(), 30000);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Test SPS for not exist directory.
   */
  @Test(timeout = 300000)
  public void testSPSForNonExistDirectory() throws Exception {
    try {
      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
          STORAGES_PER_DATANODE, CAPACITY);
      hdfsCluster.waitActive();
      final DistributedFileSystem fs = hdfsCluster.getFileSystem();
      // This path is never created on the cluster.
      final Path missingDir = new Path("/emptyDir");
      try {
        fs.satisfyStoragePolicy(missingDir);
        fail("FileNotFoundException should throw");
      } catch (FileNotFoundException expected) {
        // Expected: the request must be rejected for a missing path.
      }
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Test SPS for directory tree which doesn't have files.
   */
  @Test(timeout = 300000)
  public void testSPSWithDirectoryTreeWithoutFile() throws Exception {
    try {
      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
          STORAGES_PER_DATANODE, CAPACITY);
      hdfsCluster.waitActive();
      // Directory-only tree that gets created below:
      /*
       *                   root
       *                    |
       *           A--------C--------D
       *                    |
       *               G----H----I
       *                    |
       *                    O
       */
      final DistributedFileSystem fs = hdfsCluster.getFileSystem();
      final String[] directories = {
          "/root/C/H/O",
          "/root/A",
          "/root/D",
          "/root/C/G",
          "/root/C/I"};
      for (String directory : directories) {
        fs.mkdirs(new Path(directory));
      }
      final String treeRoot = "/root";
      fs.satisfyStoragePolicy(new Path(treeRoot));
      // The satisfy xattr must eventually disappear from the tree root.
      DFSTestUtil.waitForXattrRemoved(treeRoot,
          XATTR_SATISFY_STORAGE_POLICY, hdfsCluster.getNamesystem(), 30000);
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Test SPS that satisfy the files and then delete the files before start SPS.
   */
  @Test(timeout = 300000)
  public void testSPSSatisfyAndThenDeleteFileBeforeStartSPS() throws Exception {
    try {
      createCluster();
      final HdfsAdmin admin =
          new HdfsAdmin(FileSystem.getDefaultUri(config), config);

      final StorageType[][] diskAndArchive = {
          {StorageType.DISK, StorageType.ARCHIVE},
          {StorageType.DISK, StorageType.ARCHIVE},
          {StorageType.DISK, StorageType.ARCHIVE}};
      startAdditionalDNs(config, 3, NUM_OF_DATANODES, diskAndArchive,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);

      // While the external SPS is stopped, request satisfaction for FILE
      // and delete the file before the SPS is started again.
      stopExternalSps();

      final Path deletedPath = new Path(FILE);
      dfs.setStoragePolicy(deletedPath, COLD);
      admin.satisfyStoragePolicy(deletedPath);
      dfs.delete(deletedPath, true);

      startExternalSps();

      // A second file requested after the restart must still be moved.
      final String file1 = "/testMoveToSatisfyStoragePolicy_1";
      writeContent(file1);
      final Path secondPath = new Path(file1);
      dfs.setStoragePolicy(secondPath, COLD);
      admin.satisfyStoragePolicy(secondPath);

      hdfsCluster.triggerHeartbeats();
      DFSTestUtil.waitExpectedStorageType(file1, StorageType.ARCHIVE, 3, 30000,
          dfs);
    } finally {
      shutdownCluster();
    }
  }


  /**
   * Test SPS for directory which has multilevel directories.
   */
  @Test(timeout = 300000)
  public void testMultipleLevelDirectoryForSatisfyStoragePolicy()
      throws Exception {
    try {
      // Two of the three datanodes offer an ARCHIVE storage.
      final StorageType[][] storageLayout = {
          {StorageType.DISK, StorageType.ARCHIVE},
          {StorageType.ARCHIVE, StorageType.SSD},
          {StorageType.DISK, StorageType.DISK}};
      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
      hdfsCluster = startCluster(config, storageLayout, storageLayout.length,
          STORAGES_PER_DATANODE, CAPACITY);
      dfs = hdfsCluster.getFileSystem();
      createDirectoryTree(dfs);

      final List<String> treeFiles = getDFSListOfTree();
      final Path treeRoot = new Path("/root");
      dfs.setStoragePolicy(treeRoot, COLD);
      dfs.satisfyStoragePolicy(treeRoot);
      // Every file of the tree is expected to get two replicas on ARCHIVE.
      for (String treeFile : treeFiles) {
        DFSTestUtil.waitExpectedStorageType(treeFile, StorageType.ARCHIVE, 2,
            30000, dfs);
      }
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Test storage move blocks while under replication block tasks exist in the
   * system. So, both will share the max transfer streams.
   *
   * 1. Create cluster with 2 datanodes having ARCHIVE and DISK storages.
   * 2. Create 20 files with 2 replicas.
   * 3. Start 2 more DNs with DISK and SSD types.
   * 4. Set replication factor for the first 10 files to 4 to trigger replica
   * tasks.
   * 5. Set policy to ALL_SSD for the second set of files, 11-20.
   * 6. Call SPS for files 11-20 to trigger move block tasks to the new DNs.
   * 7. Wait for the under replica and SPS tasks completion.
   */
  @Test(timeout = 300000)
  public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
    try {
      config.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3);
      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
      config.set(DFSConfigKeys
          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
          "3000");
      config.set(DFSConfigKeys
          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
          "5000");

      final StorageType[][] initialLayout = {
          {StorageType.ARCHIVE, StorageType.DISK},
          {StorageType.ARCHIVE, StorageType.DISK}};

      hdfsCluster = startCluster(config, initialLayout, 2, 2, CAPACITY);
      hdfsCluster.waitActive();
      dfs = hdfsCluster.getFileSystem();

      // Create all 20 files up front with two replicas each.
      for (int fileNo = 1; fileNo <= 20; fileNo++) {
        final Path created = new Path("/file" + fileNo);
        DFSTestUtil.createFile(dfs, created, DEFAULT_BLOCK_SIZE * 5, (short) 2,
            0);
      }

      final StorageType[][] diskAndSsd = {
          {StorageType.DISK, StorageType.SSD},
          {StorageType.DISK, StorageType.SSD}};
      startAdditionalDNs(config, 2, NUM_OF_DATANODES, diskAndSsd,
          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);

      // Files 1-10: raise the replication factor to 4 and thereby initiate
      // replica tasks.
      for (int fileNo = 1; fileNo <= 10; fileNo++) {
        dfs.setReplication(new Path("/file" + fileNo), (short) 4);
      }

      // Files 11-20: switch to ALL_SSD and invoke SPS.
      for (int fileNo = 11; fileNo <= 20; fileNo++) {
        final Path toMove = new Path("/file" + fileNo);
        dfs.setStoragePolicy(toMove, "ALL_SSD");
        dfs.satisfyStoragePolicy(toMove);
      }

      // Files 1-10 are expected to end up with four replicas on DISK.
      for (int fileNo = 1; fileNo <= 10; fileNo++) {
        final Path replicated = new Path("/file" + fileNo);
        DFSTestUtil.waitExpectedStorageType(replicated.toString(),
            StorageType.DISK, 4, 60000, hdfsCluster.getFileSystem());
      }
      // Files 11-20 are expected to end up with two replicas on SSD.
      for (int fileNo = 11; fileNo <= 20; fileNo++) {
        final Path moved = new Path("/file" + fileNo);
        DFSTestUtil.waitExpectedStorageType(moved.toString(),
            StorageType.SSD, 2, 30000, hdfsCluster.getFileSystem());
      }
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Test that a satisfyStoragePolicy request issued on a cluster created via
   * createClusterDoNotStartSPS() is reported as one pending SPS path by the
   * namesystem.
   */
  @Test(timeout = 300000)
  public void testExternalSPSMetrics() throws Exception {
    try {
      createClusterDoNotStartSPS();
      final Path filePath = new Path(FILE);
      dfs.satisfyStoragePolicy(filePath);
      // Exactly one path is expected to be pending.
      assertEquals(1, hdfsCluster.getNamesystem().getPendingSPSPaths());
    } finally {
      shutdownCluster();
    }
  }

  /**
   * Creates the directory tree used by the multi-level directory test.
   * Directories are created with mkdirs, files with 1024 bytes of content,
   * replication factor 3 and seed 0.
   *
   * @param dfs file system on which the tree is created
   * @throws Exception if creating a directory or a file fails
   */
  private static void createDirectoryTree(DistributedFileSystem dfs)
      throws Exception {
    // tree structure
    /*
     *                           root
     *                             |
     *           A--------B--------C--------D--------E
     *                    |                 |
     *          F----G----H----I       J----K----L----M
     *               |                           |
     *          N----O----P                 Q----R----S
     *                    |                 |
     *                    T                 U
     */
    // Entries are processed in order; a trailing "/" marks a directory,
    // every other entry is a file.
    final String[] entries = {
        // root node and its children
        "/root/", "/root/A", "/root/B/", "/root/C", "/root/D/", "/root/E",
        // children of /root/B
        "/root/B/F", "/root/B/G/", "/root/B/H", "/root/B/I",
        // children of /root/D
        "/root/D/J", "/root/D/K", "/root/D/L/", "/root/D/M",
        // children of /root/B/G
        "/root/B/G/N", "/root/B/G/O", "/root/B/G/P/",
        // children of /root/D/L
        "/root/D/L/Q/", "/root/D/L/R", "/root/D/L/S",
        // child of /root/B/G/P
        "/root/B/G/P/T",
        // child of /root/D/L/Q
        "/root/D/L/Q/U"};
    for (String entry : entries) {
      if (entry.endsWith("/")) {
        dfs.mkdirs(new Path(entry.substring(0, entry.length() - 1)));
      } else {
        DFSTestUtil.createFile(dfs, new Path(entry), 1024, (short) 3, 0);
      }
    }
  }

  /**
   * Returns the paths of the files contained in the "/root" test tree.
   *
   * @return a new mutable list with the file paths
   */
  private List<String> getDFSListOfTree() {
    final String[] filePaths = {
        "/root/A",
        "/root/B/F",
        "/root/B/G/N",
        "/root/B/G/O",
        "/root/B/G/P/T",
        "/root/B/H",
        "/root/B/I",
        "/root/C",
        "/root/D/J",
        "/root/D/K",
        "/root/D/L/Q/U",
        "/root/D/L/R",
        "/root/D/L/S",
        "/root/D/M",
        "/root/E"};
    final List<String> treeFiles = new ArrayList<>();
    for (String filePath : filePaths) {
      treeFiles.add(filePath);
    }
    return treeFiles;
  }

  /**
   * Creates the file "/testMoveWithBlockPinning" with replication factor 3,
   * passing the first {@code favoredNodesCount} datanodes of the cluster as
   * favored nodes, and then mocks block pinning on the first
   * {@code favoredNodesCount} reported locations of its single block.
   *
   * @param favoredNodesCount number of favored nodes and of locations to mark
   *          as pinned; must be smaller than the number of block locations
   * @return the path of the created file
   * @throws IOException if creating the file or reading its block locations
   *           fails
   */
  private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
      throws IOException {
    List<DataNode> dns = hdfsCluster.getDataNodes();
    final String file1 = "/testMoveWithBlockPinning";
    // replication factor 3
    InetSocketAddress[] favoredNodes = new InetSocketAddress[favoredNodesCount];
    for (int i = 0; i < favoredNodesCount; i++) {
      favoredNodes[i] = dns.get(i).getXferAddress();
    }
    DFSTestUtil.createFile(dfs, new Path(file1), false, 1024, 100,
        DEFAULT_BLOCK_SIZE, (short) 3, 0, false, favoredNodes);

    LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0);
    Assert.assertEquals("Wrong block count", 1,
        locatedBlocks.locatedBlockCount());

    // verify storage type before movement
    LocatedBlock lb = locatedBlocks.get(0);
    for (StorageType storageType : lb.getStorageTypes()) {
      Assert.assertEquals(StorageType.DISK, storageType);
    }

    // Mock FsDatasetSpi#getPinning to show that the block is pinned.
    DatanodeInfo[] locations = lb.getLocations();
    Assert.assertEquals(3, locations.length);
    Assert.assertTrue(favoredNodesCount < locations.length);
    // Mark exactly favoredNodesCount locations as pinned, without modifying
    // the parameter, and log the datanode that is actually being pinned.
    for (int i = 0; i < favoredNodesCount; i++) {
      DatanodeInfo dnInfo = locations[i];
      LOG.info("Simulate block pinning in datanode {}", dnInfo);
      DataNode dn = hdfsCluster.getDataNode(dnInfo.getIpcPort());
      InternalDataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
    }
    return file1;
  }

  /**
   * Polls the attempted-items monitor of the external SPS until its attempted
   * items count equals the expected value.
   *
   * The count is read once per poll so that the value written to the log is
   * exactly the value compared against the expectation.
   *
   * @param expectedBlkMovAttemptedCount the attempted items count to wait for.
   * @param timeout overall wait limit handed to
   *          {@code GenericTestUtils.waitFor}; the check is repeated with an
   *          interval argument of 100.
   * @throws TimeoutException if the expected count is not observed in time.
   * @throws InterruptedException if the wait is interrupted.
   */
  public void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
      int timeout) throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        int actualCount = ((BlockStorageMovementAttemptedItems) (externalSps
            .getAttemptedItemsMonitor())).getAttemptedItemsCount();
        LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
            expectedBlkMovAttemptedCount, actualCount);
        return actualCount == expectedBlkMovAttemptedCount;
      }
    }, 100, timeout);
  }

  /**
   * Polls the attempted-items monitor of the external SPS until its attempted
   * items count is at least the expected value, logging both values on every
   * poll.
   *
   * @param expectedMovementFinishedBlocksCount the minimum count to wait for.
   * @param timeout overall wait limit handed to
   *          {@code GenericTestUtils.waitFor}; the check is repeated with an
   *          interval argument of 100.
   * @throws TimeoutException if the expected count is not reached in time.
   * @throws InterruptedException if the wait is interrupted.
   */
  public void waitForBlocksMovementAttemptReport(
      long expectedMovementFinishedBlocksCount, int timeout)
          throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(() -> {
      final int attemptedItems =
          externalSps.getAttemptedItemsMonitor().getAttemptedItemsCount();
      LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
          expectedMovementFinishedBlocksCount, attemptedItems);
      return expectedMovementFinishedBlocksCount <= attemptedItems;
    }, 100, timeout);
  }

  /**
   * Writes the test content to the given file using a replication factor
   * of 3.
   *
   * @param fileName path of the file to create.
   * @throws IOException if the underlying write fails.
   */
  public void writeContent(final String fileName) throws IOException {
    final short defaultReplication = 3;
    writeContent(fileName, defaultReplication);
  }

  /**
   * Creates the given file with the requested replication factor and writes
   * 1024 bytes to it (the loop counter 0..1023 is passed to
   * {@code write(int)} one value at a time).
   *
   * The stream is opened in a try-with-resources statement so that it is
   * closed even when a write fails part-way through.
   *
   * @param fileName path of the file to create.
   * @param replicatonFactor replication factor passed to the create call.
   * @throws IOException if creating, writing or closing the file fails.
   */
  private void writeContent(final String fileName, short replicatonFactor)
      throws IOException {
    // write to DISK
    try (FSDataOutputStream out = dfs.create(new Path(fileName),
        replicatonFactor)) {
      for (int i = 0; i < 1024; i++) {
        out.write(i);
      }
    }
  }

  /**
   * Starts additional datanodes on the given cluster, giving every storage of
   * every new datanode the same capacity, and then triggers heartbeats on the
   * cluster.
   *
   * @param conf configuration passed to the datanode start-up call.
   * @param newNodesRequired number of datanodes to start.
   * @param existingNodesNum number of datanodes already running; not used by
   *          this method and kept only so existing callers keep compiling.
   * @param newTypes storage types passed through for the new datanodes.
   * @param storagesPerDn number of storages per new datanode; sizes the
   *          per-datanode capacity array.
   * @param nodeCapacity capacity assigned to each storage.
   * @param cluster the mini cluster to add the datanodes to.
   * @throws IOException if starting the datanodes fails.
   */
  private void startAdditionalDNs(final Configuration conf,
      int newNodesRequired, int existingNodesNum, StorageType[][] newTypes,
      int storagesPerDn, long nodeCapacity, final MiniDFSCluster cluster)
          throws IOException {
    // One row per new datanode, one column per storage on that datanode.
    long[][] capacities = new long[newNodesRequired][storagesPerDn];
    for (int i = 0; i < newNodesRequired; i++) {
      for (int j = 0; j < storagesPerDn; j++) {
        capacities[i][j] = nodeCapacity;
      }
    }

    cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
        null, capacities, null, false, false, false, null, null, null);
    cluster.triggerHeartbeats();
  }

  /**
   * Listener callback implementation that accumulates every block reported
   * through {@link #notifyMovementTriedBlocks(Block[])} so that tests can
   * assert on the blocks for which an SPS move was attempted.
   */
  public static final class ExternalBlockMovementListener
      implements BlockMovementListener {

    /** Blocks reported so far, in the order they were notified. */
    private final List<Block> actualBlockMovements = new ArrayList<>();

    /**
     * Appends the reported blocks to the collected list and logs the whole
     * list collected so far.
     *
     * @param moveAttemptFinishedBlks blocks reported by the caller.
     */
    @Override
    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
      for (int i = 0; i < moveAttemptFinishedBlks.length; i++) {
        actualBlockMovements.add(moveAttemptFinishedBlks[i]);
      }
      LOG.info("Movement attempted blocks:{}", actualBlockMovements);
    }

    /**
     * @return the live internal list of collected blocks (not a copy).
     */
    public List<Block> getActualBlockMovements() {
      return actualBlockMovements;
    }

    /**
     * Discards all blocks collected so far.
     */
    public void clear() {
      actualBlockMovements.clear();
    }
  }

  /**
   * Checks that the external SPS metrics are readable through the platform
   * MBean server: each of the three attributes is expected to be 0 right
   * after the metrics are initialised and 1 after its update method has been
   * called once.
   */
  @Test(timeout = 300000)
  public void testExternalSPSMetricsExposedToJMX() throws Exception {
    // Attribute names, in the order they are asserted.
    final String[] metricAttributes = {
        "AttemptedItemsCount",
        "ProcessingQueueSize",
        "MovementFinishedBlocksCount",
    };
    try {
      createCluster();
      // Start JMX but stop SPS thread to prevent mock data from being consumed.
      externalSps.stop(true);
      externalCtxt.initMetrics(externalSps);

      final ExternalSPSBeanMetrics beanMetrics =
          externalCtxt.getSpsBeanMetrics();
      final MBeanServer mBeanServer =
          ManagementFactory.getPlatformMBeanServer();
      final ObjectName beanName =
          new ObjectName("Hadoop:service=ExternalSPS,name=ExternalSPS");

      // Assert metrics before update.
      for (String attribute : metricAttributes) {
        assertEquals(0, mBeanServer.getAttribute(beanName, attribute));
      }

      // Update metrics.
      beanMetrics.updateAttemptedItemsCount();
      beanMetrics.updateProcessingQueueSize();
      beanMetrics.updateMovementFinishedBlocksCount();

      // Assert metrics after update.
      for (String attribute : metricAttributes) {
        assertEquals(1, mBeanServer.getAttribute(beanName, attribute));
      }
    } finally {
      shutdownCluster();
    }
  }
}