TestDiskBalancerRPC.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership.  The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.hdfs.server.diskbalancer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException.Result;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS;
import static org.junit.Assert.assertEquals;

/**
 * Test DiskBalancer RPC.
 */
public class TestDiskBalancerRPC {
  @Rule
  public ExpectedException thrown = ExpectedException.none();

  private static final String PLAN_FILE = "/system/current.plan.json";
  private MiniDFSCluster cluster;
  private Configuration conf;

  @Before
  public void setUp() throws Exception {
    conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
    cluster.waitActive();
  }

  @After
  public void tearDown() throws Exception {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  @Test
  public void testSubmitPlan() throws Exception {
    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
    DataNode dataNode = rpcTestHelper.getDataNode();
    String planHash = rpcTestHelper.getPlanHash();
    int planVersion = rpcTestHelper.getPlanVersion();
    NodePlan plan = rpcTestHelper.getPlan();
    dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
        plan.toJson(), false);
  }

  @Test
  public void testSubmitPlanWithInvalidHash() throws Exception {
    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
    DataNode dataNode = rpcTestHelper.getDataNode();
    String planHash = rpcTestHelper.getPlanHash();
    char[] hashArray = planHash.toCharArray();
    hashArray[0]++;
    planHash = String.valueOf(hashArray);
    int planVersion = rpcTestHelper.getPlanVersion();
    NodePlan plan = rpcTestHelper.getPlan();
    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_HASH));
    dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
        plan.toJson(), false);
  }

  @Test
  public void testSubmitPlanWithInvalidVersion() throws Exception {
    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
    DataNode dataNode = rpcTestHelper.getDataNode();
    String planHash = rpcTestHelper.getPlanHash();
    int planVersion = rpcTestHelper.getPlanVersion();
    planVersion++;
    NodePlan plan = rpcTestHelper.getPlan();
    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_VERSION));
    dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
        plan.toJson(), false);
  }

  @Test
  public void testSubmitPlanWithInvalidPlan() throws Exception {
    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
    DataNode dataNode = rpcTestHelper.getDataNode();
    String planHash = rpcTestHelper.getPlanHash();
    int planVersion = rpcTestHelper.getPlanVersion();
    NodePlan plan = rpcTestHelper.getPlan();
    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN));
    dataNode.submitDiskBalancerPlan(planHash, planVersion, "", "",
        false);
  }

  @Test
  public void testCancelPlan() throws Exception {
    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
    DataNode dataNode = rpcTestHelper.getDataNode();
    String planHash = rpcTestHelper.getPlanHash();
    int planVersion = rpcTestHelper.getPlanVersion();
    NodePlan plan = rpcTestHelper.getPlan();
    dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
        plan.toJson(), false);
    dataNode.cancelDiskBalancePlan(planHash);
  }

  @Test
  public void testCancelNonExistentPlan() throws Exception {
    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
    DataNode dataNode = rpcTestHelper.getDataNode();
    String planHash = rpcTestHelper.getPlanHash();
    char[] hashArray= planHash.toCharArray();
    hashArray[0]++;
    planHash = String.valueOf(hashArray);
    NodePlan plan = rpcTestHelper.getPlan();
    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(Result.NO_SUCH_PLAN));
    dataNode.cancelDiskBalancePlan(planHash);
  }

  @Test
  public void testCancelEmptyPlan() throws Exception {
    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
    DataNode dataNode = rpcTestHelper.getDataNode();
    String planHash = "";
    NodePlan plan = rpcTestHelper.getPlan();
    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(Result.NO_SUCH_PLAN));
    dataNode.cancelDiskBalancePlan(planHash);
  }

  @Test
  public void testGetDiskBalancerVolumeMapping() throws Exception {
    final int dnIndex = 0;
    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
    String volumeNameJson = dataNode.getDiskBalancerSetting(
        DiskBalancerConstants.DISKBALANCER_VOLUME_NAME);
    Assert.assertNotNull(volumeNameJson);
    ObjectMapper mapper = new ObjectMapper();

    @SuppressWarnings("unchecked")
    Map<String, String> volumemap =
        mapper.readValue(volumeNameJson, HashMap.class);

    Assert.assertEquals(2, volumemap.size());
  }

  @Test
  public void testGetDiskBalancerInvalidSetting() throws Exception {
    final int dnIndex = 0;
    final String invalidSetting = "invalidSetting";
    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(Result.UNKNOWN_KEY));
    dataNode.getDiskBalancerSetting(invalidSetting);
  }

  @Test
  public void testGetDiskBalancerBandwidth() throws Exception {
    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
    DataNode dataNode = rpcTestHelper.getDataNode();
    String planHash = rpcTestHelper.getPlanHash();
    int planVersion = rpcTestHelper.getPlanVersion();
    NodePlan plan = rpcTestHelper.getPlan();

    dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
        plan.toJson(), false);
    String bandwidthString = dataNode.getDiskBalancerSetting(
        DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
    long value = Long.decode(bandwidthString);
    Assert.assertEquals(10L, value);
  }

  @Test
  public void testQueryPlan() throws Exception {
    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
    DataNode dataNode = rpcTestHelper.getDataNode();
    String planHash = rpcTestHelper.getPlanHash();
    int planVersion = rpcTestHelper.getPlanVersion();
    NodePlan plan = rpcTestHelper.getPlan();

    dataNode.submitDiskBalancerPlan(planHash, planVersion, PLAN_FILE,
        plan.toJson(), false);
    DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
    Assert.assertTrue(status.getResult() == PLAN_UNDER_PROGRESS ||
        status.getResult() == PLAN_DONE);
  }

  @Test
  public void testQueryPlanWithoutSubmit() throws Exception {
    RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
    DataNode dataNode = rpcTestHelper.getDataNode();

    DiskBalancerWorkStatus status = dataNode.queryDiskBalancerPlan();
    Assert.assertTrue(status.getResult() == NO_PLAN);
  }

  @Test
  public void testMoveBlockAcrossVolume() throws Exception {
    Configuration conf = new HdfsConfiguration();
    final int defaultBlockSize = 100;
    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
    String fileName = "/tmp.txt";
    Path filePath = new Path(fileName);
    final int numDatanodes = 1;
    final int dnIndex = 0;
    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(numDatanodes).build();
    FsVolumeImpl source = null;
    FsVolumeImpl dest = null;
    try {
      cluster.waitActive();
      Random r = new Random();
      FileSystem fs = cluster.getFileSystem(dnIndex);
      DFSTestUtil.createFile(fs, filePath, 10 * 1024,
          (short) 1, r.nextLong());
      DataNode dnNode = cluster.getDataNodes().get(dnIndex);
      FsDatasetSpi.FsVolumeReferences refs =
          dnNode.getFSDataset().getFsVolumeReferences();
      try {
        source = (FsVolumeImpl) refs.get(0);
        dest = (FsVolumeImpl) refs.get(1);
        DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
            source, dest);
        assertEquals(0, DiskBalancerTestUtil.getBlockCount(source, false));
      } finally {
        refs.close();
      }
    } finally {
      cluster.shutdown();
    }
  }


  private class RpcTestHelper {
    private NodePlan plan;
    private int planVersion;
    private DataNode dataNode;
    private String planHash;

    public NodePlan getPlan() {
      return plan;
    }

    public int getPlanVersion() {
      return planVersion;
    }

    public DataNode getDataNode() {
      return dataNode;
    }

    public String getPlanHash() {
      return planHash;
    }

    public RpcTestHelper invoke() throws Exception {
      final int dnIndex = 0;
      cluster.restartDataNode(dnIndex);
      cluster.waitActive();
      ClusterConnector nameNodeConnector =
          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);

      DiskBalancerCluster diskBalancerCluster =
          new DiskBalancerCluster(nameNodeConnector);
      diskBalancerCluster.readClusterInfo();
      Assert.assertEquals(cluster.getDataNodes().size(),
          diskBalancerCluster.getNodes().size());
      diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
      dataNode = cluster.getDataNodes().get(dnIndex);
      DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
          dataNode.getDatanodeUuid());
      GreedyPlanner planner = new GreedyPlanner(10.0f, node);
      plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort());
      planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
      planVersion = 1;
      planHash = DigestUtils.sha1Hex(plan.toJson());
      return this;
    }
  }
}