
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.hadoop.hdfs.server.diskbalancer;

import org.apache.hadoop.util.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

/**
 * Tests the disk balancer with a mock block mover.
 */
public class TestDiskBalancerWithMockMover {
  static final Logger LOG =
      LoggerFactory.getLogger(TestDiskBalancerWithMockMover.class);

  @Rule
  public ExpectedException thrown = ExpectedException.none();

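  // Plan file path recorded with each submitted plan; the tests never read
  // the file itself.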
  private static final String PLAN_FILE = "/system/current.plan.json";
  private MiniDFSCluster cluster;
  private String sourceName;
  private String destName;
  private String sourceUUID;
  private String destUUID;
  private String nodeID;
  private DataNode dataNode;

  /**
   * Checks that we return the right error if the disk balancer is not
   * enabled.
   */
  @Test
  public void testDiskBalancerDisabled() throws Exception {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, false);
    restartDataNode();

    TestMover blockMover = new TestMover(cluster.getDataNodes()
        .get(0).getFSDataset());

    DiskBalancer balancer = new DiskBalancerBuilder(conf)
        .setMover(blockMover)
        .build();

    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
        .Result.DISK_BALANCER_NOT_ENABLED));

    balancer.queryWorkStatus();
  }

  /**
   * Checks that the enabled flag works correctly.
   *
   * @throws DiskBalancerException if querying the work status fails.
   */
  @Test
  public void testDiskBalancerEnabled() throws DiskBalancerException {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);

    TestMover blockMover = new TestMover(cluster.getDataNodes()
        .get(0).getFSDataset());

    DiskBalancer balancer = new DiskBalancerBuilder(conf)
        .setMover(blockMover)
        .build();

    DiskBalancerWorkStatus status = balancer.queryWorkStatus();
    assertEquals(NO_PLAN, status.getResult());
  }

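  /**
   * Submits the plan with its plan ID computed as the SHA-1 hex digest of
   * the plan JSON, which is the hash the DataNode verifies on submit.
   */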
  private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer,
                                 int version) throws IOException {
    String planJson = plan.toJson();
    String planID = DigestUtils.sha1Hex(planJson);
    balancer.submitPlan(planID, version, PLAN_FILE, planJson, false);
  }

  private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer)
      throws IOException {
    executeSubmitPlan(plan, balancer, 1);
  }

  /**
   * Tests that submitting a second plan while one is in progress fails.
   *
   * @throws Exception on test failure.
   */
  @Test
  public void testResubmitDiskBalancerPlan() throws Exception {
    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
    NodePlan plan = mockMoverHelper.getPlan();
    DiskBalancer balancer = mockMoverHelper.getBalancer();

    // Ask the block mover to get stuck in copyBlocks.
    mockMoverHelper.getBlockMover().setSleep();
    executeSubmitPlan(plan, balancer);

    // Resubmitting while the first plan is still running must fail. Catch
    // the exception explicitly so the cleanup below still executes; code
    // placed after thrown.expect() would never run.
    try {
      executeSubmitPlan(plan, balancer);
      fail("Resubmitting a plan in progress should have failed.");
    } catch (DiskBalancerException ex) {
      assertEquals(DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS,
          ex.getResult());
    } finally {
      // Unblock the mover so the in-flight plan can finish.
      mockMoverHelper.getBlockMover().clearSleep();
    }
  }

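  /**
   * Tests that a submitted plan executes in the background until it
   * reaches PLAN_DONE.
   */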
  @Test
  public void testSubmitDiskBalancerPlan() throws Exception {
    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
    NodePlan plan = mockMoverHelper.getPlan();
    final DiskBalancer balancer = mockMoverHelper.getBalancer();

    executeSubmitPlan(plan, balancer);
    GenericTestUtils.waitFor(() -> {
      try {
        return balancer.queryWorkStatus().getResult() ==
            DiskBalancerWorkStatus.Result.PLAN_DONE;
      } catch (IOException ex) {
        return false;
      }
    }, 1000, 100000);

    // Asserts that submitting the plan caused an execution in the background.
    assertEquals(1, mockMoverHelper.getBlockMover().getRunCount());
  }

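  /**
   * Tests that a plan older than the validity window is rejected.
   */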
  @Test
  public void testSubmitWithOlderPlan() throws Exception {
    final long millisecondsInAnHour = 1000 * 60 * 60L;
    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
    NodePlan plan = mockMoverHelper.getPlan();
    DiskBalancer balancer = mockMoverHelper.getBalancer();

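    // Back-date the plan beyond the plan validity interval (one day by
    // default) so it is rejected as OLD_PLAN_SUBMITTED.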
    plan.setTimeStamp(Time.now() - (32 * millisecondsInAnHour));
    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
        .Result.OLD_PLAN_SUBMITTED));
    executeSubmitPlan(plan, balancer);
  }

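  /**
   * Tests that an unsupported plan version is rejected.
   */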
  @Test
  public void testSubmitWithOldInvalidVersion() throws Exception {
    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
    NodePlan plan = mockMoverHelper.getPlan();
    DiskBalancer balancer = mockMoverHelper.getBalancer();

    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
        .Result.INVALID_PLAN_VERSION));

    // Plan version is invalid -- there is no version 0.
    executeSubmitPlan(plan, balancer, 0);
  }

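  /**
   * Tests that a null plan is rejected as invalid.
   */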
  @Test
  public void testSubmitWithNullPlan() throws Exception {
    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
    NodePlan plan = mockMoverHelper.getPlan();
    DiskBalancer balancer = mockMoverHelper.getBalancer();
    String planJson = plan.toJson();
    String planID = DigestUtils.sha1Hex(planJson);

    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
        .Result.INVALID_PLAN));

    balancer.submitPlan(planID, 1, "no-plan-file.json", null, false);
  }

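  /**
   * Tests that a plan ID that does not match the SHA-1 digest of the plan
   * is rejected.
   */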
  @Test
  public void testSubmitWithInvalidHash() throws Exception {
    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
    NodePlan plan = mockMoverHelper.getPlan();
    DiskBalancer balancer = mockMoverHelper.getBalancer();

    String planJson = plan.toJson();
    String planID = DigestUtils.sha1Hex(planJson);
    // Corrupt the plan ID by bumping its first character.
    char repChar = planID.charAt(0);
    repChar++;

    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
        .Result.INVALID_PLAN_HASH));
    balancer.submitPlan(planID.replace(planID.charAt(0), repChar),
        1, PLAN_FILE, planJson, false);
  }

  /**
   * Tests plan cancellation, including a cancel request with a bad plan ID.
   *
   * @throws Exception on test failure.
   */
  @Test
  public void testCancelDiskBalancerPlan() throws Exception {
    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
    NodePlan plan = mockMoverHelper.getPlan();
    DiskBalancer balancer = mockMoverHelper.getBalancer();

    // Ask the block mover to block in copyBlocks so the plan stays in
    // progress.
    mockMoverHelper.getBlockMover().setSleep();
    executeSubmitPlan(plan, balancer);

    String planJson = plan.toJson();
    String planID = DigestUtils.sha1Hex(planJson);
    balancer.cancelPlan(planID);

    DiskBalancerWorkStatus status = balancer.queryWorkStatus();
    assertEquals(DiskBalancerWorkStatus.Result.PLAN_CANCELLED,
        status.getResult());

    executeSubmitPlan(plan, balancer);

    // Send a wrong cancellation request. Catch the exception explicitly so
    // the real cancellation below still executes; code placed after
    // thrown.expect() would never run.
    char first = planID.charAt(0);
    first++;
    try {
      balancer.cancelPlan(planID.replace(planID.charAt(0), first));
      fail("Cancelling a non-existent plan should have failed.");
    } catch (DiskBalancerException ex) {
      assertEquals(DiskBalancerException.Result.NO_SUCH_PLAN,
          ex.getResult());
    }

    // Now cancel the real one.
    balancer.cancelPlan(planID);
    mockMoverHelper.getBlockMover().clearSleep(); // unblock mover.

    status = balancer.queryWorkStatus();
    assertEquals(DiskBalancerWorkStatus.Result.PLAN_CANCELLED,
        status.getResult());
  }

  /**
   * Tests that a custom bandwidth set on each move step is reflected in the
   * reported work status.
   *
   * @throws Exception on test failure.
   */
  @Test
  public void testCustomBandwidth() throws Exception {
    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
    NodePlan plan = mockMoverHelper.getPlan();
    DiskBalancer balancer = mockMoverHelper.getBalancer();

    for (Step step : plan.getVolumeSetPlans()) {
      MoveStep tempStep = (MoveStep) step;
      tempStep.setBandwidth(100);
    }
    executeSubmitPlan(plan, balancer);
    DiskBalancerWorkStatus status = balancer.queryWorkStatus();
    assertNotNull(status);

    DiskBalancerWorkStatus.DiskBalancerWorkEntry entry =
        status.getCurrentState().get(0);
    assertEquals(100L, entry.getWorkItem().getBandwidth());
  }

  @Before
  public void setUp() throws Exception {
    Configuration conf = new HdfsConfiguration();
    final int numStoragesPerDn = 2;
    cluster = new MiniDFSCluster
        .Builder(conf).numDataNodes(3)
        .storagesPerDatanode(numStoragesPerDn)
        .build();
    cluster.waitActive();
    dataNode = cluster.getDataNodes().get(0);
    FsDatasetSpi.FsVolumeReferences references = dataNode.getFSDataset()
        .getFsVolumeReferences();

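    // Use the first two volumes of DataNode 0 as the source/destination
    // pair for the plans built by the tests.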
    nodeID = dataNode.getDatanodeUuid();
    sourceName = references.get(0).getBaseURI().getPath();
    destName = references.get(1).getBaseURI().getPath();
    sourceUUID = references.get(0).getStorageID();
    destUUID = references.get(1).getStorageID();
    references.close();
  }

  @After
  public void tearDown() throws Exception {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  private void restartDataNode() throws IOException {
    if (cluster != null) {
      cluster.restartDataNode(0);
    }
  }

  /**
   * Allows us to control mover class for test purposes.
   */
  public static class TestMover implements DiskBalancer.BlockMover {

    private final AtomicBoolean shouldRun;
    private final FsDatasetSpi dataset;
    private int runCount;
    private volatile boolean sleepInCopyBlocks;
    private volatile long delay;

    public TestMover(FsDatasetSpi dataset) {
      this.dataset = dataset;
      this.shouldRun = new AtomicBoolean(false);
    }

    public void setSleep() {
      sleepInCopyBlocks = true;
    }

    public void clearSleep() {
      sleepInCopyBlocks = false;
    }

    public void setDelay(long milliseconds) {
      this.delay = milliseconds;
    }

    /**
     * Copies blocks from a set of volumes.
     *
     * @param pair - Source and Destination Volumes.
     * @param item - Work item describing the bytes to move between volumes.
     */
    @Override
    public void copyBlocks(DiskBalancer.VolumePair pair,
                           DiskBalancerWorkItem item) {

      try {
        // get stuck if we are asked to sleep.
        while (sleepInCopyBlocks) {
          if (!this.shouldRun()) {
            return;
          }
          Thread.sleep(10);
        }
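        // Simulate a slow copy if a fixed delay was configured.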
        if (delay > 0) {
          Thread.sleep(delay);
        }
        synchronized (this) {
          if (shouldRun()) {
            runCount++;
          }
        }
      } catch (InterruptedException ex) {
        // Restore the interrupt flag; beyond that, the interruption has no
        // impact on these tests.
        Thread.currentThread().interrupt();
        LOG.error(ex.toString());
      }
    }

    /**
     * Sets copyBlocks into a runnable state.
     */
    @Override
    public void setRunnable() {
      this.shouldRun.set(true);
    }

    /**
     * Signals copy block to exit.
     */
    @Override
    public void setExitFlag() {
      this.shouldRun.set(false);
    }

    /**
     * Returns the shouldRun boolean flag.
     */
    public boolean shouldRun() {
      return this.shouldRun.get();
    }

    @Override
    public FsDatasetSpi getDataset() {
      return this.dataset;
    }

    /**
     * Returns time when this plan started executing.
     *
     * @return Start time in milliseconds.
     */
    @Override
    public long getStartTime() {
      return 0;
    }

    /**
     * Number of seconds elapsed.
     *
     * @return time in seconds
     */
    @Override
    public long getElapsedSeconds() {
      return 0;
    }

    public int getRunCount() {
      synchronized (this) {
        LOG.info("Run count : {}", runCount);
        return runCount;
      }
    }
  }

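  /**
   * Wires together a TestMover, a DiskBalancer and a matching NodePlan.
   * Typical use (sketch):
   *
   * <pre>
   *   MockMoverHelper helper = new MockMoverHelper().invoke();
   *   NodePlan plan = helper.getPlan();
   *   DiskBalancer balancer = helper.getBalancer();
   * </pre>
   */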
  private class MockMoverHelper {
    private DiskBalancer balancer;
    private NodePlan plan;
    private TestMover blockMover;

    public DiskBalancer getBalancer() {
      return balancer;
    }

    public NodePlan getPlan() {
      return plan;
    }

    public TestMover getBlockMover() {
      return blockMover;
    }

    public MockMoverHelper invoke() throws Exception {
      Configuration conf = new HdfsConfiguration();
      conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
      restartDataNode();

      blockMover = new TestMover(dataNode.getFSDataset());
      blockMover.setRunnable();

      balancer = new DiskBalancerBuilder(conf)
          .setMover(blockMover)
          .setNodeID(nodeID)
          .build();

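      // Build a plan from the static cluster description, then point its
      // volume paths and UUIDs at the live DataNode (see PlanBuilder).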
      DiskBalancerCluster diskBalancerCluster = new DiskBalancerClusterBuilder()
          .setClusterSource("/diskBalancer/data-cluster-3node-3disk.json")
          .build();

      plan = new PlanBuilder(diskBalancerCluster, nodeID)
          .setPathMap(sourceName, destName)
          .setUUIDMap(sourceUUID, destUUID)
          .build();
      return this;
    }
  }

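  /**
   * Fluent builder that wraps a test mover in a DiskBalancer.
   */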
  private static class DiskBalancerBuilder {
    private TestMover blockMover;
    private Configuration conf;
    private String nodeID;

    public DiskBalancerBuilder(Configuration conf) {
      this.conf = conf;
    }

    public DiskBalancerBuilder setNodeID(String nodeID) {
      this.nodeID = nodeID;
      return this;
    }

    public DiskBalancerBuilder setConf(Configuration conf) {
      this.conf = conf;
      return this;
    }

    public DiskBalancerBuilder setMover(TestMover mover) {
      this.blockMover = mover;
      return this;
    }

    public DiskBalancerBuilder setRunnable() {
      blockMover.setRunnable();
      return this;
    }

    public DiskBalancer build() {
      Preconditions.checkNotNull(blockMover);
      return new DiskBalancer(nodeID, conf,
          blockMover);
    }
  }

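  /**
   * Builds a DiskBalancerCluster from a JSON cluster description on the
   * test classpath.
   */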
  private static class DiskBalancerClusterBuilder {
    private String jsonFilePath;
    private Configuration conf;

    public DiskBalancerClusterBuilder setConf(Configuration conf) {
      this.conf = conf;
      return this;
    }

    public DiskBalancerClusterBuilder setClusterSource(String jsonFilePath)
        throws Exception {
      this.jsonFilePath = jsonFilePath;
      return this;
    }

    public DiskBalancerCluster build() throws Exception {
      URI clusterJson = getClass().getResource(jsonFilePath).toURI();
      ClusterConnector jsonConnector =
          ConnectorFactory.getCluster(clusterJson, conf);
      DiskBalancerCluster diskBalancerCluster =
          new DiskBalancerCluster(jsonConnector);
      diskBalancerCluster.readClusterInfo();
      diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
      return diskBalancerCluster;
    }
  }

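  /**
   * Builds a NodePlan for one node of the cluster description and rewrites
   * its volume paths and UUIDs to match a live DataNode.
   */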
  private static class PlanBuilder {
    private String sourcePath;
    private String destPath;
    private String sourceUUID;
    private String destUUID;
    private DiskBalancerCluster balancerCluster;
    private String nodeID;

    public PlanBuilder(DiskBalancerCluster balancerCluster, String nodeID) {
      this.balancerCluster = balancerCluster;
      this.nodeID = nodeID;
    }

    public PlanBuilder setPathMap(String sourcePath, String destPath) {
      this.sourcePath = sourcePath;
      this.destPath = destPath;
      return this;
    }

    public PlanBuilder setUUIDMap(String sourceUUID, String destUUID) {
      this.sourceUUID = sourceUUID;
      this.destUUID = destUUID;
      return this;
    }

    public NodePlan build() throws Exception {
      final int dnIndex = 0;
      Preconditions.checkNotNull(balancerCluster);
      Preconditions.checkState(nodeID != null && !nodeID.isEmpty());

      DiskBalancerDataNode node = balancerCluster.getNodes().get(dnIndex);
      node.setDataNodeUUID(nodeID);
      // Balance the DISK tier using a 10 percent threshold.
      GreedyPlanner planner = new GreedyPlanner(10.0f, node);
      NodePlan plan = new NodePlan(node.getDataNodeName(),
          node.getDataNodePort());
      planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
      setVolumeNames(plan);
      return plan;
    }

    private void setVolumeNames(NodePlan plan) {
      for (Step step : plan.getVolumeSetPlans()) {
        MoveStep nextStep = (MoveStep) step;
        nextStep.getSourceVolume().setPath(sourcePath);
        nextStep.getSourceVolume().setUuid(sourceUUID);
        nextStep.getDestinationVolume().setPath(destPath);
        nextStep.getDestinationVolume().setUuid(destUUID);
      }
    }
  }
}