TestDiskBalancerWithMockMover.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdfs.server.diskbalancer;
import org.apache.hadoop.util.Preconditions;
import java.util.function.Supplier;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Tests diskbalancer with a mock mover.
*/
public class TestDiskBalancerWithMockMover {
// Logger shared by the test methods and the nested TestMover.
static final Logger LOG =
LoggerFactory.getLogger(TestDiskBalancerWithMockMover.class);
// JUnit rule through which the negative tests declare the exception they
// expect the test body to throw.
@Rule
public ExpectedException thrown = ExpectedException.none();
// Plan file path passed to DiskBalancer#submitPlan by executeSubmitPlan.
private static final String PLAN_FILE = "/system/current.plan.json";
// Mini cluster built in setUp() and shut down in tearDown().
private MiniDFSCluster cluster;
// Path component of the base URI of the first volume of dataNode (see setUp).
private String sourceName;
// Path component of the base URI of the second volume of dataNode (see setUp).
private String destName;
// Storage ID of the first volume of dataNode (see setUp).
private String sourceUUID;
// Storage ID of the second volume of dataNode (see setUp).
private String destUUID;
// Datanode UUID of dataNode, captured in setUp.
private String nodeID;
// First datanode of the mini cluster as returned by getDataNodes() in setUp.
private DataNode dataNode;
/**
 * Verifies that a balancer built from a configuration in which the disk
 * balancer is disabled rejects a status query with a
 * {@code DiskBalancerException} carrying DISK_BALANCER_NOT_ENABLED.
 */
@Test
public void testDiskBalancerDisabled() throws Exception {
  final Configuration disabledConf = new HdfsConfiguration();
  disabledConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, false);
  restartDataNode();

  final FsDatasetSpi<?> dataset =
      cluster.getDataNodes().get(0).getFSDataset();
  final TestMover mover = new TestMover(dataset);
  final DiskBalancerBuilder builder =
      new DiskBalancerBuilder(disabledConf).setMover(mover);
  final DiskBalancer disabledBalancer = builder.build();

  // The query below is the statement expected to throw.
  thrown.expect(DiskBalancerException.class);
  thrown.expect(new DiskBalancerResultVerifier(
      DiskBalancerException.Result.DISK_BALANCER_NOT_ENABLED));
  disabledBalancer.queryWorkStatus();
}
/**
 * Verifies that a balancer built from a configuration in which the disk
 * balancer is enabled answers a status query, reporting NO_PLAN when
 * nothing has been submitted.
 *
 * @throws DiskBalancerException if the status query fails
 */
@Test
public void testDiskBalancerEnabled() throws DiskBalancerException {
  final Configuration enabledConf = new HdfsConfiguration();
  enabledConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);

  final FsDatasetSpi<?> dataset =
      cluster.getDataNodes().get(0).getFSDataset();
  final TestMover mover = new TestMover(dataset);
  final DiskBalancerBuilder builder =
      new DiskBalancerBuilder(enabledConf).setMover(mover);
  final DiskBalancer enabledBalancer = builder.build();

  assertEquals(NO_PLAN, enabledBalancer.queryWorkStatus().getResult());
}
/**
 * Serializes the plan to JSON, uses the SHA-1 hex digest of that JSON as the
 * plan ID and submits it to the balancer with the given plan version.
 *
 * @param plan     plan to submit
 * @param balancer balancer receiving the plan
 * @param version  plan version passed to submitPlan
 * @throws IOException if serialization or submission fails
 */
private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer,
    int version) throws IOException {
  final String json = plan.toJson();
  final String digest = DigestUtils.sha1Hex(json);
  balancer.submitPlan(digest, version, PLAN_FILE, json, false);
}

/**
 * Submits the plan using plan version 1.
 *
 * @param plan     plan to submit
 * @param balancer balancer receiving the plan
 * @throws IOException if serialization or submission fails
 */
private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer)
    throws IOException {
  final int defaultVersion = 1;
  executeSubmitPlan(plan, balancer, defaultVersion);
}
/**
 * Verifies that submitting a plan while a previous one is still executing
 * fails with PLAN_ALREADY_IN_PROGRESS.
 *
 * @throws Exception on any unexpected failure; the expected
 *                   DiskBalancerException is consumed by the {@code thrown}
 *                   rule
 */
@Test
public void testResubmitDiskBalancerPlan() throws Exception {
  MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
  NodePlan plan = mockMoverHelper.getPlan();
  DiskBalancer balancer = mockMoverHelper.getBalancer();
  TestMover mover = mockMoverHelper.getBlockMover();

  // Ask the block mover to get stuck in copyBlocks so the first plan is
  // still in progress when the second submit arrives.
  mover.setSleep();
  try {
    executeSubmitPlan(plan, balancer);

    thrown.expect(DiskBalancerException.class);
    thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
        .Result.PLAN_ALREADY_IN_PROGRESS));
    executeSubmitPlan(plan, balancer);
  } finally {
    // Cleanup must live in finally: the statement above throws, so code
    // placed after it would never run and the mover would keep looping.
    mover.clearSleep();
  }
}
/**
 * Verifies that a submitted plan is executed in the background: waits until
 * the balancer reports PLAN_DONE and then checks that the mock mover ran
 * exactly once.
 *
 * @throws Exception if submission fails or the wait times out
 */
@Test
public void testSubmitDiskBalancerPlan() throws Exception {
  MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
  NodePlan plan = mockMoverHelper.getPlan();
  final DiskBalancer balancer = mockMoverHelper.getBalancer();

  executeSubmitPlan(plan, balancer);

  // Poll every second, for up to 100 seconds, until the plan is done.
  GenericTestUtils.waitFor(() -> {
    try {
      return balancer.queryWorkStatus().getResult() ==
          DiskBalancerWorkStatus.Result.PLAN_DONE;
    } catch (IOException ex) {
      // Treat a failed query as "not done yet" and keep polling.
      return false;
    }
  }, 1000, 100000);

  // Asserts that submit plan caused an execution in the background.
  // assertEquals reports the actual count on failure, unlike assertTrue.
  assertEquals(1, mockMoverHelper.getBlockMover().getRunCount());
}
/**
 * Verifies that a plan whose timestamp lies 32 hours in the past is
 * rejected with OLD_PLAN_SUBMITTED.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testSubmitWithOlderPlan() throws Exception {
  final MockMoverHelper helper = new MockMoverHelper().invoke();
  final NodePlan stalePlan = helper.getPlan();
  final DiskBalancer target = helper.getBalancer();

  // 32 hours expressed in milliseconds.
  final long planAgeMillis = 32L * 60 * 60 * 1000;
  stalePlan.setTimeStamp(Time.now() - planAgeMillis);

  thrown.expect(DiskBalancerException.class);
  thrown.expect(new DiskBalancerResultVerifier(
      DiskBalancerException.Result.OLD_PLAN_SUBMITTED));
  executeSubmitPlan(stalePlan, target);
}
/**
 * Verifies that submitting a plan with plan version 0 is rejected with
 * INVALID_PLAN_VERSION.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testSubmitWithOldInvalidVersion() throws Exception {
  final MockMoverHelper helper = new MockMoverHelper().invoke();
  final NodePlan somePlan = helper.getPlan();
  final DiskBalancer target = helper.getBalancer();

  // Plan version is invalid -- there is no version 0.
  final int invalidVersion = 0;

  thrown.expect(DiskBalancerException.class);
  thrown.expect(new DiskBalancerResultVerifier(
      DiskBalancerException.Result.INVALID_PLAN_VERSION));
  executeSubmitPlan(somePlan, target, invalidVersion);
}
/**
 * Verifies that submitting a null plan body, together with an otherwise
 * well-formed plan ID, is rejected with INVALID_PLAN.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testSubmitWithNullPlan() throws Exception {
  final MockMoverHelper helper = new MockMoverHelper().invoke();
  final DiskBalancer target = helper.getBalancer();

  // The ID is derived from a real plan; only the plan body is missing.
  final String validId = DigestUtils.sha1Hex(helper.getPlan().toJson());

  thrown.expect(DiskBalancerException.class);
  thrown.expect(new DiskBalancerResultVerifier(
      DiskBalancerException.Result.INVALID_PLAN));
  target.submitPlan(validId, 1, "no-plan-file.json", null, false);
}
/**
 * Verifies that submitting a plan together with an ID that is not the
 * SHA-1 hex digest of the plan JSON is rejected with INVALID_PLAN_HASH.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testSubmitWithInvalidHash() throws Exception {
  final MockMoverHelper helper = new MockMoverHelper().invoke();
  final DiskBalancer target = helper.getBalancer();
  final String json = helper.getPlan().toJson();
  final String realId = DigestUtils.sha1Hex(json);

  // Corrupt the ID: every occurrence of its first character is replaced by
  // the next character in code-point order.
  final char leading = realId.charAt(0);
  final char substitute = (char) (leading + 1);
  final String corruptedId = realId.replace(leading, substitute);

  thrown.expect(DiskBalancerException.class);
  thrown.expect(new DiskBalancerResultVerifier(
      DiskBalancerException.Result.INVALID_PLAN_HASH));
  target.submitPlan(corruptedId, 1, PLAN_FILE, json, false);
}
/**
 * Tests plan cancellation: cancelling a running plan reports
 * PLAN_CANCELLED, cancelling with an unknown plan ID fails with
 * NO_SUCH_PLAN, and the running plan can still be cancelled afterwards.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testCancelDiskBalancerPlan() throws Exception {
  MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
  NodePlan plan = mockMoverHelper.getPlan();
  DiskBalancer balancer = mockMoverHelper.getBalancer();
  TestMover mover = mockMoverHelper.getBlockMover();

  // ask block mover to delay execution
  mover.setSleep();
  try {
    executeSubmitPlan(plan, balancer);

    String planJson = plan.toJson();
    String planID = DigestUtils.sha1Hex(planJson);
    balancer.cancelPlan(planID);

    DiskBalancerWorkStatus status = balancer.queryWorkStatus();
    assertEquals(DiskBalancerWorkStatus.Result.PLAN_CANCELLED,
        status.getResult());

    executeSubmitPlan(plan, balancer);

    // Send a wrong cancellation request. The failure is checked with an
    // explicit try/catch instead of the ExpectedException rule so that the
    // statements following it are actually executed.
    char first = planID.charAt(0);
    String wrongPlanID = planID.replace(first, (char) (first + 1));
    try {
      balancer.cancelPlan(wrongPlanID);
      throw new AssertionError(
          "Expected cancelPlan to fail for unknown plan ID " + wrongPlanID);
    } catch (DiskBalancerException ex) {
      assertEquals(DiskBalancerException.Result.NO_SUCH_PLAN,
          ex.getResult());
    }

    // Now cancel the real one
    balancer.cancelPlan(planID);
    mover.clearSleep(); // unblock mover.

    status = balancer.queryWorkStatus();
    assertEquals(DiskBalancerWorkStatus.Result.PLAN_CANCELLED,
        status.getResult());
  } finally {
    // Release the mover even when an assertion above fails.
    mover.clearSleep();
  }
}
/**
 * Verifies that a bandwidth set on the plan's move steps is visible on the
 * work item reported by the balancer after the plan is submitted.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testCustomBandwidth() throws Exception {
  final MockMoverHelper helper = new MockMoverHelper().invoke();
  final NodePlan customPlan = helper.getPlan();
  final DiskBalancer target = helper.getBalancer();

  // Stamp every step of the plan with a bandwidth of 100.
  for (Step volumeStep : customPlan.getVolumeSetPlans()) {
    ((MoveStep) volumeStep).setBandwidth(100);
  }

  executeSubmitPlan(customPlan, target);

  final DiskBalancerWorkStatus firstStatus = target.queryWorkStatus();
  assertNotNull(firstStatus);

  // Query again and inspect the first reported work entry.
  final DiskBalancerWorkStatus secondStatus = target.queryWorkStatus();
  final DiskBalancerWorkStatus.DiskBalancerWorkEntry firstEntry =
      secondStatus.getCurrentState().get(0);
  assertEquals(100L, firstEntry.getWorkItem().getBandwidth());
}
/**
 * Starts a three-datanode mini cluster with two storages per datanode and
 * records, for the first datanode, its UUID and the path and storage ID of
 * its first two volumes.
 *
 * @throws Exception if the cluster cannot be started or the volume
 *                   references cannot be read or closed
 */
@Before
public void setUp() throws Exception {
  Configuration conf = new HdfsConfiguration();
  final int numStoragesPerDn = 2;
  cluster = new MiniDFSCluster
      .Builder(conf).numDataNodes(3)
      .storagesPerDatanode(numStoragesPerDn)
      .build();
  cluster.waitActive();
  dataNode = cluster.getDataNodes().get(0);
  // try-with-resources guarantees the volume references are closed even
  // when one of the accessors below throws.
  try (FsDatasetSpi.FsVolumeReferences references =
      dataNode.getFSDataset().getFsVolumeReferences()) {
    nodeID = dataNode.getDatanodeUuid();
    sourceName = references.get(0).getBaseURI().getPath();
    destName = references.get(1).getBaseURI().getPath();
    sourceUUID = references.get(0).getStorageID();
    destUUID = references.get(1).getStorageID();
  }
}
/**
 * Shuts the mini cluster down if one was created.
 *
 * @throws Exception declared for the JUnit lifecycle signature
 */
@After
public void tearDown() throws Exception {
  if (cluster == null) {
    return;
  }
  cluster.shutdown();
}
/**
 * Restarts the datanode at index 0 of the mini cluster; does nothing when
 * no cluster exists.
 *
 * @throws IOException if the restart fails
 */
private void restartDataNode() throws IOException {
  if (cluster == null) {
    return;
  }
  cluster.restartDataNode(0);
}
/**
* Allows us to control mover class for test purposes.
*/
public static class TestMover implements DiskBalancer.BlockMover {
private AtomicBoolean shouldRun;
private FsDatasetSpi dataset;
private int runCount;
private volatile boolean sleepInCopyBlocks;
private long delay;
public TestMover(FsDatasetSpi dataset) {
this.dataset = dataset;
this.shouldRun = new AtomicBoolean(false);
}
public void setSleep() {
sleepInCopyBlocks = true;
}
public void clearSleep() {
sleepInCopyBlocks = false;
}
public void setDelay(long milliseconds) {
this.delay = milliseconds;
}
/**
* Copies blocks from a set of volumes.
*
* @param pair - Source and Destination Volumes.
* @param item - Number of bytes to move from volumes.
*/
@Override
public void copyBlocks(DiskBalancer.VolumePair pair,
DiskBalancerWorkItem item) {
try {
// get stuck if we are asked to sleep.
while (sleepInCopyBlocks) {
if (!this.shouldRun()) {
return;
}
Thread.sleep(10);
}
if (delay > 0) {
Thread.sleep(delay);
}
synchronized (this) {
if (shouldRun()) {
runCount++;
}
}
} catch (InterruptedException ex) {
// A failure here can be safely ignored with no impact for tests.
LOG.error(ex.toString());
}
}
/**
* Sets copyblocks into runnable state.
*/
@Override
public void setRunnable() {
this.shouldRun.set(true);
}
/**
* Signals copy block to exit.
*/
@Override
public void setExitFlag() {
this.shouldRun.set(false);
}
/**
* Returns the shouldRun boolean flag.
*/
public boolean shouldRun() {
return this.shouldRun.get();
}
@Override
public FsDatasetSpi getDataset() {
return this.dataset;
}
/**
* Returns time when this plan started executing.
*
* @return Start time in milliseconds.
*/
@Override
public long getStartTime() {
return 0;
}
/**
* Number of seconds elapsed.
*
* @return time in seconds
*/
@Override
public long getElapsedSeconds() {
return 0;
}
public int getRunCount() {
synchronized (this) {
LOG.info("Run count : " + runCount);
return runCount;
}
}
}
/**
 * Per-test fixture that wires a {@link TestMover} into a
 * {@link DiskBalancer} and builds a plan whose steps point at the volumes
 * recorded by setUp(). Populated by {@link #invoke()}.
 */
private class MockMoverHelper {
  private DiskBalancer balancer;
  private NodePlan plan;
  private TestMover blockMover;

  /**
   * Restarts the datanode, then creates the runnable mover, a balancer
   * with the disk balancer enabled, and a plan built from the bundled
   * cluster description.
   *
   * @return this helper, for chaining
   * @throws Exception if any of the fixture pieces cannot be built
   */
  public MockMoverHelper invoke() throws Exception {
    final Configuration enabledConf = new HdfsConfiguration();
    enabledConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
    restartDataNode();

    this.blockMover = new TestMover(dataNode.getFSDataset());
    this.blockMover.setRunnable();

    final DiskBalancerBuilder balancerBuilder =
        new DiskBalancerBuilder(enabledConf)
            .setMover(this.blockMover)
            .setNodeID(nodeID);
    this.balancer = balancerBuilder.build();

    final DiskBalancerClusterBuilder clusterBuilder =
        new DiskBalancerClusterBuilder()
            .setClusterSource("/diskBalancer/data-cluster-3node-3disk.json");
    final DiskBalancerCluster modelCluster = clusterBuilder.build();

    final PlanBuilder planBuilder = new PlanBuilder(modelCluster, nodeID)
        .setPathMap(sourceName, destName)
        .setUUIDMap(sourceUUID, destUUID);
    this.plan = planBuilder.build();
    return this;
  }

  /** @return the balancer created by {@link #invoke()}. */
  public DiskBalancer getBalancer() {
    return this.balancer;
  }

  /** @return the plan created by {@link #invoke()}. */
  public NodePlan getPlan() {
    return this.plan;
  }

  /** @return the mock mover created by {@link #invoke()}. */
  public TestMover getBlockMover() {
    return this.blockMover;
  }
}
/**
 * Fluent builder for a {@link DiskBalancer} backed by a {@link TestMover}.
 */
private static class DiskBalancerBuilder {
  private TestMover blockMover;
  private Configuration conf;
  private String nodeID;

  /**
   * @param conf configuration handed to the balancer
   */
  public DiskBalancerBuilder(Configuration conf) {
    this.conf = conf;
  }

  /** Replaces the configuration supplied to the constructor. */
  public DiskBalancerBuilder setConf(Configuration conf) {
    this.conf = conf;
    return this;
  }

  /** Sets the node ID handed to the balancer. */
  public DiskBalancerBuilder setNodeID(String nodeID) {
    this.nodeID = nodeID;
    return this;
  }

  /** Sets the mover handed to the balancer; required before build(). */
  public DiskBalancerBuilder setMover(TestMover mover) {
    this.blockMover = mover;
    return this;
  }

  /** Marks the previously set mover as runnable. */
  public DiskBalancerBuilder setRunnable() {
    this.blockMover.setRunnable();
    return this;
  }

  /**
   * Creates the balancer.
   *
   * @return a new DiskBalancer built from the node ID, configuration and
   *         mover
   * @throws NullPointerException if no mover has been set
   */
  public DiskBalancer build() {
    final TestMover mover = Preconditions.checkNotNull(this.blockMover);
    return new DiskBalancer(this.nodeID, this.conf, mover);
  }
}
/**
 * Fluent builder that loads a {@link DiskBalancerCluster} from a JSON
 * cluster description located on the classpath.
 */
private static class DiskBalancerClusterBuilder {
  private String jsonFilePath;
  private Configuration conf;

  /** Sets the configuration passed to the connector factory. */
  public DiskBalancerClusterBuilder setConf(Configuration conf) {
    this.conf = conf;
    return this;
  }

  /**
   * Sets the classpath resource name of the JSON cluster description.
   *
   * @param jsonFilePath resource name resolved with Class#getResource
   */
  public DiskBalancerClusterBuilder setClusterSource(String jsonFilePath)
      throws Exception {
    this.jsonFilePath = jsonFilePath;
    return this;
  }

  /**
   * Resolves the cluster description, reads it through a connector and
   * marks all of its nodes for processing.
   *
   * @return the populated cluster model
   * @throws NullPointerException if the resource cannot be found on the
   *                              classpath
   * @throws Exception            if the description cannot be read
   */
  public DiskBalancerCluster build() throws Exception {
    DiskBalancerCluster diskBalancerCluster;
    // getResource returns null for a missing resource; fail with a message
    // naming the resource rather than a bare NPE on toURI().
    URI clusterJson = Preconditions.checkNotNull(
        getClass().getResource(jsonFilePath),
        "Cluster description not found on classpath: " + jsonFilePath)
        .toURI();
    ClusterConnector jsonConnector =
        ConnectorFactory.getCluster(clusterJson, conf);
    diskBalancerCluster = new DiskBalancerCluster(jsonConnector);
    diskBalancerCluster.readClusterInfo();
    diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
    return diskBalancerCluster;
  }
}
/**
 * Fluent builder that produces a {@link NodePlan} for the first node of a
 * cluster model and rewrites every step to use the configured source and
 * destination volume path and UUID.
 */
private static class PlanBuilder {
  private String sourcePath;
  private String destPath;
  private String sourceUUID;
  private String destUUID;
  private DiskBalancerCluster balancerCluster;
  private String nodeID;

  /**
   * @param balancerCluster cluster model to plan against
   * @param nodeID          UUID assigned to the planned node
   */
  public PlanBuilder(DiskBalancerCluster balancerCluster, String nodeID) {
    this.balancerCluster = balancerCluster;
    this.nodeID = nodeID;
  }

  /** Sets the volume paths written into every step of the plan. */
  public PlanBuilder setPathMap(String sourcePath, String destPath) {
    this.sourcePath = sourcePath;
    this.destPath = destPath;
    return this;
  }

  /** Sets the volume UUIDs written into every step of the plan. */
  public PlanBuilder setUUIDMap(String sourceUUID, String destUUID) {
    this.sourceUUID = sourceUUID;
    this.destUUID = destUUID;
    return this;
  }

  /**
   * Runs the greedy planner over the "DISK" volume set of the first node
   * in the cluster model and patches the resulting steps.
   *
   * @return the generated plan
   * @throws Exception if planning fails
   */
  public NodePlan build() throws Exception {
    Preconditions.checkNotNull(balancerCluster);
    Preconditions.checkState(!nodeID.isEmpty());

    // Always plan for the first node of the model.
    final DiskBalancerDataNode target = balancerCluster.getNodes().get(0);
    target.setDataNodeUUID(nodeID);

    final GreedyPlanner greedy = new GreedyPlanner(10.0f, target);
    final NodePlan result =
        new NodePlan(target.getDataNodeName(), target.getDataNodePort());
    greedy.balanceVolumeSet(target, target.getVolumeSets().get("DISK"),
        result);
    setVolumeNames(result);
    return result;
  }

  /**
   * Overwrites source and destination path and UUID on every step.
   */
  private void setVolumeNames(NodePlan plan) {
    for (Step step : plan.getVolumeSetPlans()) {
      final MoveStep move = (MoveStep) step;
      move.getSourceVolume().setPath(sourcePath);
      move.getSourceVolume().setUuid(sourceUUID);
      move.getDestinationVolume().setPath(destPath);
      move.getDestinationVolume().setUuid(destUUID);
    }
  }
}
}