TestMover.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.mover;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_KEYTAB_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Supplier;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
public class TestMover {
private static final Logger LOG = LoggerFactory.getLogger(TestMover.class);
private static final int DEFAULT_BLOCK_SIZE = 100;
private File keytabFile;
private String principal;
static {
TestBalancer.initTestSetup();
}
static void initConf(Configuration conf) {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
1L);
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
}
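/**
* Build a Mover for the single namenode configured in the given conf,
* using the standard NameNodeConnector setup.
*/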
static Mover newMover(Configuration conf) throws IOException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
Map<URI, List<Path>> nnMap = Maps.newHashMap();
for (URI nn : namenodes) {
nnMap.put(nn, null);
}
final List<NameNodeConnector> nncs = NameNodeConnector.
newNameNodeConnectors(nnMap, Mover.class.getSimpleName(),
HdfsServerConstants.MOVER_ID_PATH, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return new Mover(nncs.get(0), conf, new AtomicInteger(0), new HashMap<>());
}
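/**
* Verify that a replica of a block can be scheduled for movement only once:
* a second scheduleMoveReplica call for the same block and location must
* return false.
*/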
@Test
public void testScheduleSameBlock() throws IOException {
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(4).build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testScheduleSameBlock/file";
{
final FSDataOutputStream out = dfs.create(new Path(file));
out.writeChars("testScheduleSameBlock");
out.close();
}
final Mover mover = newMover(conf);
mover.init();
final Mover.Processor processor = mover.new Processor();
final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
final List<MLocation> locations = MLocation.toLocations(lb);
final MLocation ml = locations.get(0);
final DBlock db = mover.newDBlock(lb, locations, null);
final List<StorageType> storageTypes = new ArrayList<StorageType>(
Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT));
Assert.assertTrue(processor.scheduleMoveReplica(db, ml, storageTypes));
Assert.assertFalse(processor.scheduleMoveReplica(db, ml, storageTypes));
} finally {
cluster.shutdown();
}
}
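/**
* Move a file from DISK to ARCHIVE and check the ReplaceBlockOp locality
* counters on one datanode. With sameNode set, every datanode carries both
* storage types, so the replica is expected to move within the same host
* (and within the same mount when same-disk tiering is enabled); otherwise
* the replica must be copied to the other host.
*/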
private void testMovementWithLocalityOption(Configuration conf,
boolean sameNode) throws Exception {
final MiniDFSCluster cluster;
if (sameNode) {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
.build();
} else {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2)
.storageTypes(
new StorageType[][] {{StorageType.DISK}, {StorageType.ARCHIVE}})
.build();
}
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testScheduleWithinSameNode/file";
Path dir = new Path("/testScheduleWithinSameNode");
dfs.mkdirs(dir);
// write to DISK
dfs.setStoragePolicy(dir, "HOT");
final FSDataOutputStream out = dfs.create(new Path(file));
out.writeChars("testScheduleWithinSameNode");
out.close();
// verify before movement
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
StorageType[] storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.DISK == storageType);
}
// move to ARCHIVE
dfs.setStoragePolicy(dir, "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", dir.toString()});
Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
// Wait till the namenode is notified about the block location details
waitForLocatedBlockWithArchiveStorageType(dfs, file, sameNode ? 3 : 1);
MetricsRecordBuilder rb =
getMetrics(cluster.getDataNodes().get(1).getMetrics().name());
if (!sameNode) {
testReplaceBlockOpLocalityMetrics(0, 0, 1, rb);
} else if (conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, false)) {
testReplaceBlockOpLocalityMetrics(1, 1, 0, rb);
} else {
testReplaceBlockOpLocalityMetrics(1, 0, 0, rb);
}
} finally {
cluster.shutdown();
}
}
private void testReplaceBlockOpLocalityMetrics(
long sameHost,
long sameMount,
long otherHost,
MetricsRecordBuilder rb) {
assertCounter("ReplaceBlockOpOnSameHost",
sameHost, rb);
assertCounter("ReplaceBlockOpOnSameMount",
sameMount, rb);
assertCounter("ReplaceBlockOpToOtherHost",
otherHost, rb);
}
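/**
* Create the same directory and file in two namespaces, HOT (DISK) on the
* first namenode and COLD (ARCHIVE) on the second, and verify the initial
* storage types before any movement is triggered.
*/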
private void setupStoragePoliciesAndPaths(DistributedFileSystem dfs1,
DistributedFileSystem dfs2,
Path dir, String file)
throws Exception {
dfs1.mkdirs(dir);
dfs2.mkdirs(dir);
//Write to DISK on nn1
dfs1.setStoragePolicy(dir, "HOT");
FSDataOutputStream out = dfs1.create(new Path(file));
out.writeChars("testScheduleWithinSameNode");
out.close();
//Write to Archive on nn2
dfs2.setStoragePolicy(dir, "COLD");
out = dfs2.create(new Path(file));
out.writeChars("testScheduleWithinSameNode");
out.close();
//verify before movement
LocatedBlock lb = dfs1.getClient().getLocatedBlocks(file, 0).get(0);
StorageType[] storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.DISK == storageType);
}
//verify before movement
lb = dfs2.getClient().getLocatedBlocks(file, 0).get(0);
storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.ARCHIVE == storageType);
}
}
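/**
* Wait until the first block of the given file reports the expected number
* of DISK replicas.
*/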
private void waitForLocatedBlockWithDiskStorageType(
final DistributedFileSystem dfs, final String file,
int expectedDiskCount) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LocatedBlock lb = null;
try {
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
} catch (IOException e) {
LOG.error("Exception while getting located blocks", e);
return false;
}
int diskCount = 0;
for (StorageType storageType : lb.getStorageTypes()) {
if (StorageType.DISK == storageType) {
diskCount++;
}
}
LOG.info("Archive replica count, expected={} and actual={}",
expectedDiskCount, diskCount);
return expectedDiskCount == diskCount;
}
}, 100, 3000);
}
@Test(timeout = 300000)
public void testWithFederateClusterWithinSameNode() throws
Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(4).storageTypes(new StorageType[] {StorageType.DISK,
StorageType.ARCHIVE}).nnTopology(MiniDFSNNTopology
.simpleFederatedTopology(2)).build();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
try {
cluster.waitActive();
final String file = "/test/file";
Path dir = new Path("/test");
final DistributedFileSystem dfs1 = cluster.getFileSystem(0);
final DistributedFileSystem dfs2 = cluster.getFileSystem(1);
URI nn1 = dfs1.getUri();
URI nn2 = dfs2.getUri();
setupStoragePoliciesAndPaths(dfs1, dfs2, dir, file);
// move to ARCHIVE
dfs1.setStoragePolicy(dir, "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", nn1 + dir.toString()});
Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
//move to DISK
dfs2.setStoragePolicy(dir, "HOT");
rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", nn2 + dir.toString()});
Assert.assertEquals("Movement to DISK should be successful", 0, rc);
// Wait till the namenode is notified about the block location details
waitForLocatedBlockWithArchiveStorageType(dfs1, file, 3);
waitForLocatedBlockWithDiskStorageType(dfs2, file, 3);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 300000)
public void testWithFederatedCluster() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf)
.storageTypes(new StorageType[]{StorageType.DISK,
StorageType.ARCHIVE})
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.numDataNodes(4).build();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
try {
cluster.waitActive();
final String file = "/test/file";
Path dir = new Path("/test");
final DistributedFileSystem dfs1 = cluster.getFileSystem(0);
final DistributedFileSystem dfs2 = cluster.getFileSystem(1);
URI nn1 = dfs1.getUri();
URI nn2 = dfs2.getUri();
setupStoragePoliciesAndPaths(dfs1, dfs2, dir, file);
//Changing storage policies
dfs1.setStoragePolicy(dir, "COLD");
dfs2.setStoragePolicy(dir, "HOT");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", nn1 + dir.toString(), nn2 + dir.toString()});
Assert.assertEquals("Movement to DISK should be successful", 0, rc);
waitForLocatedBlockWithArchiveStorageType(dfs1, file, 3);
waitForLocatedBlockWithDiskStorageType(dfs2, file, 3);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 300000)
public void testWithFederatedHACluster() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf)
.storageTypes(new StorageType[]{StorageType.DISK,
StorageType.ARCHIVE})
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
.numDataNodes(4).build();
DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
try {
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Iterator<URI> iter = namenodes.iterator();
URI nn1 = iter.next();
URI nn2 = iter.next();
cluster.transitionToActive(0);
cluster.transitionToActive(2);
final String file = "/test/file";
Path dir = new Path("/test");
final DistributedFileSystem dfs1 = (DistributedFileSystem) FileSystem
.get(nn1, conf);
final DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem
.get(nn2, conf);
setupStoragePoliciesAndPaths(dfs1, dfs2, dir, file);
//Changing Storage Policies
dfs1.setStoragePolicy(dir, "COLD");
dfs2.setStoragePolicy(dir, "HOT");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", nn1 + dir.toString(), nn2 + dir.toString()});
Assert.assertEquals("Movement to DISK should be successful", 0, rc);
waitForLocatedBlockWithArchiveStorageType(dfs1, file, 3);
waitForLocatedBlockWithDiskStorageType(dfs2, file, 3);
} finally {
cluster.shutdown();
}
}
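/**
* Wait until the first block of the given file reports the expected number
* of ARCHIVE replicas.
*/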
private void waitForLocatedBlockWithArchiveStorageType(
final DistributedFileSystem dfs, final String file,
int expectedArchiveCount) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LocatedBlock lb = null;
try {
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
} catch (IOException e) {
LOG.error("Exception while getting located blocks", e);
return false;
}
int archiveCount = 0;
for (StorageType storageType : lb.getStorageTypes()) {
if (StorageType.ARCHIVE == storageType) {
archiveCount++;
}
}
LOG.info("Archive replica count, expected={} and actual={}",
expectedArchiveCount, archiveCount);
return expectedArchiveCount == archiveCount;
}
}, 100, 3000);
}
/**
* Test block movement with different block locality scenarios.
* 1) Block will be copied to local host,
* if there is target storage type on same datanode.
* 2) Block will be moved within local mount with hardlink,
* if disk/archive are on same mount with same-disk-tiering feature on.
* 3) Block will be moved to another datanode,
* if there is no available target storage type on local datanode.
*/
@Test
public void testScheduleBlockLocality() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
testMovementWithLocalityOption(conf, true);
// Test movement with hardlink, when same disk tiering is enabled.
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
conf.setDouble(DFSConfigKeys
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5);
testMovementWithLocalityOption(conf, true);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, false);
testMovementWithLocalityOption(conf, false);
}
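/** Assert that the parsed move paths contain exactly the expected paths. */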
private void checkMovePaths(List<Path> actual, Path... expected) {
Assert.assertEquals(expected.length, actual.size());
for (Path p : expected) {
Assert.assertTrue(actual.contains(p));
}
}
/**
* Test Mover Cli by specifying a list of files/directories using option "-p".
* There is only one namenode (and hence name service) specified in the conf.
*/
@Test
public void testMoverCli() throws Exception {
final Configuration clusterConf = new HdfsConfiguration();
clusterConf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(clusterConf).numDataNodes(0).build();
try {
final Configuration conf = cluster.getConfiguration(0);
try {
Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "bar");
Assert.fail("Expected exception for illegal path bar");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("bar is not absolute", e);
}
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
Assert.assertEquals(1, movePaths.size());
URI nn = namenodes.iterator().next();
Assert.assertTrue(movePaths.containsKey(nn));
Assert.assertNull(movePaths.get(nn));
movePaths = Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo", "/bar");
namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, movePaths.size());
nn = namenodes.iterator().next();
Assert.assertTrue(movePaths.containsKey(nn));
checkMovePaths(movePaths.get(nn), new Path("/foo"), new Path("/bar"));
} finally {
cluster.shutdown();
}
}
@Test
public void testMoverCliWithHAConf() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0).build();
HATestUtil.setFailoverConfigurations(cluster, conf, "MyCluster");
try {
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf,
"-p", "/foo", "/bar");
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
Assert.assertEquals(1, movePaths.size());
URI nn = namenodes.iterator().next();
Assert.assertEquals(new URI("hdfs://MyCluster"), nn);
Assert.assertTrue(movePaths.containsKey(nn));
checkMovePaths(movePaths.get(nn), new Path("/foo"), new Path("/bar"));
} finally {
cluster.shutdown();
}
}
@Test
public void testMoverCliWithFederation() throws Exception {
final Configuration clusterConf = new HdfsConfiguration();
clusterConf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(clusterConf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
.numDataNodes(0).build();
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
DFSTestUtil.setFederatedConfiguration(cluster, conf);
try {
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(3, namenodes.size());
try {
Mover.Cli.getNameNodePathsToMove(conf, "-p", "/foo");
Assert.fail("Expect exception for missing authority information");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains(
"does not contain scheme and authority", e);
}
try {
Mover.Cli.getNameNodePathsToMove(conf, "-p", "hdfs:///foo");
Assert.fail("Expect exception for missing authority information");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains(
"does not contain scheme and authority", e);
}
try {
Mover.Cli.getNameNodePathsToMove(conf, "-p", "wrong-hdfs://ns1/foo");
Assert.fail("Expect exception for wrong scheme");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("Cannot resolve the path", e);
}
Iterator<URI> iter = namenodes.iterator();
URI nn1 = iter.next();
URI nn2 = iter.next();
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf,
"-p", nn1 + "/foo", nn1 + "/bar", nn2 + "/foo/bar");
Assert.assertEquals(2, movePaths.size());
checkMovePaths(movePaths.get(nn1), new Path("/foo"), new Path("/bar"));
checkMovePaths(movePaths.get(nn2), new Path("/foo/bar"));
} finally {
cluster.shutdown();
}
}
@Test
public void testMoverCliWithFederationHA() throws Exception {
final Configuration clusterConf = new HdfsConfiguration();
clusterConf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(clusterConf)
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(3))
.numDataNodes(0).build();
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
try {
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(3, namenodes.size());
Iterator<URI> iter = namenodes.iterator();
URI nn1 = iter.next();
URI nn2 = iter.next();
URI nn3 = iter.next();
Map<URI, List<Path>> movePaths = Mover.Cli.getNameNodePathsToMove(conf,
"-p", nn1 + "/foo", nn1 + "/bar", nn2 + "/foo/bar", nn3 + "/foobar");
Assert.assertEquals(3, movePaths.size());
checkMovePaths(movePaths.get(nn1), new Path("/foo"), new Path("/bar"));
checkMovePaths(movePaths.get(nn2), new Path("/foo/bar"));
checkMovePaths(movePaths.get(nn3), new Path("/foobar"));
} finally {
cluster.shutdown();
}
}
@Test(timeout = 300000)
public void testTwoReplicaSameStorageTypeShouldNotSelect() throws Exception {
// HDFS-8147
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
new StorageType[][] { { StorageType.DISK, StorageType.ARCHIVE },
{ StorageType.DISK, StorageType.DISK },
{ StorageType.DISK, StorageType.ARCHIVE } }).build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testForTwoReplicaSameStorageTypeShouldNotSelect";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
out.writeChars("testForTwoReplicaSameStorageTypeShouldNotSelect");
out.close();
// verify before movement
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
StorageType[] storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.DISK == storageType);
}
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] { "-p", file.toString() });
Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
// Wait till the namenode is notified about the block location details
waitForLocatedBlockWithArchiveStorageType(dfs, file, 2);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 300000)
public void testMoveWhenStoragePolicyNotSatisfying() throws Exception {
// HDFS-8147
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
new StorageType[][] { { StorageType.DISK }, { StorageType.DISK },
{ StorageType.DISK } }).build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoveWhenStoragePolicyNotSatisfying";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file));
out.writeChars("testMoveWhenStoragePolicyNotSatisfying");
out.close();
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] { "-p", file.toString() });
int exitcode = ExitStatus.NO_MOVE_BLOCK.getExitCode();
Assert.assertEquals("Exit code should be " + exitcode, exitcode, rc);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 300000)
public void testMoveWhenStoragePolicySatisfierIsRunning() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString());
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
new StorageType[][] {{StorageType.DISK}, {StorageType.DISK},
{StorageType.DISK}}).build();
try {
cluster.waitActive();
// Simulate an external SPS by creating a NameNodeConnector via
// DFSTestUtil#getNameNodeConnector.
DFSTestUtil.getNameNodeConnector(conf, HdfsServerConstants.MOVER_ID_PATH,
1, true);
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoveWhenStoragePolicySatisfierIsRunning";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file));
out.writeChars("testMoveWhenStoragePolicySatisfierIsRunning");
out.close();
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", file.toString()});
int exitcode = ExitStatus.IO_EXCEPTION.getExitCode();
Assert.assertEquals("Exit code should be " + exitcode, exitcode, rc);
} finally {
cluster.shutdown();
}
}
@Test
public void testMoverFailedRetry() throws Exception {
// HDFS-8147
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}}).build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoverFailedRetry";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
out.writeChars("testMoverFailedRetry");
out.close();
// Delete the block file so the block move will fail with FileNotFoundException
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock());
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", file.toString()});
Assert.assertEquals("Movement should fail after some retry",
ExitStatus.NO_MOVE_PROGRESS.getExitCode(), rc);
} finally {
cluster.shutdown();
}
}
@Test(timeout=100000)
public void testBalancerMaxIterationTimeNotAffectMover() throws Exception {
long blockSize = 10*1024*1024;
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1);
conf.setInt(
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 1);
// set a fairly large block size to run into the limitation
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
// set a max iteration time somewhat greater than zero so that the move
// time will surely exceed it
conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 200L);
conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 1);
// set client socket timeout to have an IN_PROGRESS notification back from
// the DataNode about the copy in every second.
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000L);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
.build();
try {
cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();
final String file = "/testMaxIterationTime.dat";
final Path path = new Path(file);
short repFactor = 1;
int seed = 0xFAFAFA;
// write to DISK
DFSTestUtil.createFile(fs, path, 4L * blockSize, repFactor, seed);
// move to ARCHIVE
fs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", file});
Assert.assertEquals("Retcode expected to be ExitStatus.SUCCESS (0).",
ExitStatus.SUCCESS.getExitCode(), rc);
} finally {
cluster.shutdown();
}
}
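// Erasure coding parameters shared by the striped-file tests below; the
// block size is derived from the default EC policy's cell size.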
private final ErasureCodingPolicy ecPolicy =
StripedFileTestUtil.getDefaultECPolicy();
private final int dataBlocks = ecPolicy.getNumDataUnits();
private final int parityBlocks = ecPolicy.getNumParityUnits();
private final int cellSize = ecPolicy.getCellSize();
private final int stripesPerBlock = 4;
private final int defaultBlockSize = cellSize * stripesPerBlock;
void initConfWithStripe(Configuration conf) {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
1L);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
false);
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.NONE.toString());
}
@Test(timeout = 300000)
public void testMoverWithStripedFile() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConfWithStripe(conf);
// start 10 datanodes
int numOfDatanodes = 10;
int storagesPerDatanode = 2;
long capacity = 10 * defaultBlockSize;
long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
for (int i = 0; i < numOfDatanodes; i++) {
for (int j = 0; j < storagesPerDatanode; j++) {
capacities[i][j] = capacity;
}
}
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.storagesPerDatanode(storagesPerDatanode)
.storageTypes(new StorageType[][]{
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}})
.storageCapacities(capacities)
.build();
try {
cluster.waitActive();
cluster.getFileSystem().enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
// set "/bar" directory with HOT storage policy.
ClientProtocol client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
String barDir = "/bar";
client.mkdirs(barDir, new FsPermission((short) 777), true);
client.setStoragePolicy(barDir,
HdfsConstants.HOT_STORAGE_POLICY_NAME);
// set an EC policy on "/bar" directory
client.setErasureCodingPolicy(barDir,
StripedFileTestUtil.getDefaultECPolicy().getName());
// write file to barDir
final String fooFile = "/bar/foo";
long fileLen = 20 * defaultBlockSize;
DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
fileLen, (short) 3, 0);
// verify storage types and locations
LocatedBlocks locatedBlocks =
client.getBlockLocations(fooFile, 0, fileLen);
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
for (StorageType type : lb.getStorageTypes()) {
Assert.assertEquals(StorageType.DISK, type);
}
}
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
dataBlocks + parityBlocks);
// start 5 more datanodes
numOfDatanodes += 5;
capacities = new long[5][storagesPerDatanode];
for (int i = 0; i < 5; i++) {
for (int j = 0; j < storagesPerDatanode; j++) {
capacities[i][j] = capacity;
}
}
cluster.startDataNodes(conf, 5,
new StorageType[][]{
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}},
true, null, null, null, capacities, null, false, false, false, null, null, null);
cluster.triggerHeartbeats();
// move file to ARCHIVE
client.setStoragePolicy(barDir, "COLD");
// run Mover
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] { "-p", barDir });
Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
// Verify storage types and locations.
// Wait until Namenode confirms ARCHIVE storage type for all blocks of
// fooFile.
waitForUpdatedStorageType(client, fooFile, fileLen, StorageType.ARCHIVE);
locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen);
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
dataBlocks + parityBlocks);
// start 5 more datanodes
numOfDatanodes += 5;
capacities = new long[5][storagesPerDatanode];
for (int i = 0; i < 5; i++) {
for (int j = 0; j < storagesPerDatanode; j++) {
capacities[i][j] = capacity;
}
}
cluster.startDataNodes(conf, 5,
new StorageType[][] { { StorageType.SSD, StorageType.DISK },
{ StorageType.SSD, StorageType.DISK },
{ StorageType.SSD, StorageType.DISK },
{ StorageType.SSD, StorageType.DISK },
{ StorageType.SSD, StorageType.DISK } },
true, null, null, null, capacities, null, false, false, false, null, null, null);
cluster.triggerHeartbeats();
// move file blocks to ONE_SSD policy
client.setStoragePolicy(barDir, "ONE_SSD");
// run Mover
rc = ToolRunner.run(conf, new Mover.Cli(), new String[] { "-p", barDir });
// verify storage types and locations
// Movements should have been ignored for the unsupported policy on
// striped file
locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen);
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
for (StorageType type : lb.getStorageTypes()) {
Assert.assertEquals(StorageType.ARCHIVE, type);
}
}
} finally {
cluster.shutdown();
}
}
@Test(timeout = 300000)
public void testMoverWithStripedFileMaintenance() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConfWithStripe(conf);
// Start 9 datanodes
int numOfDatanodes = 9;
int storagesPerDatanode = 2;
long capacity = 9 * defaultBlockSize;
long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
for (int i = 0; i < numOfDatanodes; i++) {
for (int j = 0; j < storagesPerDatanode; j++) {
capacities[i][j] = capacity;
}
}
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.storagesPerDatanode(storagesPerDatanode)
.storageTypes(new StorageType[][]{
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD},
{StorageType.SSD, StorageType.SSD}})
.storageCapacities(capacities)
.build();
try {
cluster.waitActive();
cluster.getFileSystem().enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
ClientProtocol client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
String barDir = "/bar";
client.mkdirs(barDir, new FsPermission((short) 777), true);
// Set "/bar" directory with ALL_SSD storage policy.
client.setStoragePolicy(barDir, "ALL_SSD");
// Set an EC policy on "/bar" directory
client.setErasureCodingPolicy(barDir,
StripedFileTestUtil.getDefaultECPolicy().getName());
// Write file to barDir
final String fooFile = "/bar/foo";
long fileLen = 6 * defaultBlockSize;
DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
fileLen, (short) 3, 0);
// Verify storage types and locations
LocatedBlocks locatedBlocks =
client.getBlockLocations(fooFile, 0, fileLen);
DatanodeInfoWithStorage location = null;
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
location = lb.getLocations()[8];
for (StorageType type : lb.getStorageTypes()) {
Assert.assertEquals(StorageType.SSD, type);
}
}
// Remember the last datanode in the location list; it is put into
// maintenance later.
FSNamesystem ns = cluster.getNamesystem(0);
DatanodeManager datanodeManager = ns.getBlockManager().getDatanodeManager();
DatanodeDescriptor dn = datanodeManager.getDatanode(location.getDatanodeUuid());
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
dataBlocks + parityBlocks);
// Start 5 more datanodes for mover
capacities = new long[5][storagesPerDatanode];
for (int i = 0; i < 5; i++) {
for (int j = 0; j < storagesPerDatanode; j++) {
capacities[i][j] = capacity;
}
}
cluster.startDataNodes(conf, 5,
new StorageType[][]{
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK}},
true, null, null, null, capacities,
null, false, false, false, null, null, null);
cluster.triggerHeartbeats();
// Move blocks to DISK
client.setStoragePolicy(barDir, "HOT");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[]{"-p", barDir});
// Verify the number of DISK storage types
waitForLocatedBlockWithDiskStorageType(cluster.getFileSystem(), fooFile, 5);
// Put the remembered datanode into maintenance so that one node in the
// block's location list is in ENTERING_MAINTENANCE state.
datanodeManager.getDatanode(dn.getDatanodeUuid()).startMaintenance();
waitNodeState(dn, DatanodeInfo.AdminStates.ENTERING_MAINTENANCE);
// Move blocks back to SSD.
// Without HDFS-17599, locations and indices lengths might not match,
// resulting in getting the wrong blockId in DBlockStriped#getInternalBlock,
// and mover will fail to run.
client.setStoragePolicy(barDir, "ALL_SSD");
rc = ToolRunner.run(conf, new Mover.Cli(),
new String[]{"-p", barDir});
Assert.assertEquals("Movement to HOT should be successful", 0, rc);
} finally {
cluster.shutdown();
}
}
/**
* Wait till DataNode is transitioned to the expected state.
*/
protected void waitNodeState(DatanodeInfo node, DatanodeInfo.AdminStates state) {
waitNodeState(Lists.newArrayList(node), state);
}
/**
* Wait till all DataNodes are transitioned to the expected state.
*/
protected void waitNodeState(List<DatanodeInfo> nodes, DatanodeInfo.AdminStates state) {
for (DatanodeInfo node : nodes) {
boolean done = (state == node.getAdminState());
while (!done) {
LOG.info("Waiting for node " + node + " to change state to "
+ state + " current state: " + node.getAdminState());
try {
Thread.sleep(DFS_HEARTBEAT_INTERVAL_DEFAULT * 10);
} catch (InterruptedException e) {
// nothing
}
done = (state == node.getAdminState());
}
LOG.info("node " + node + " reached the state " + state);
}
}
/**
* Wait until Namenode reports expected storage type for all blocks of
* given file.
*
* @param client handles all RPC calls to the Namenode.
* @param file file whose located blocks are all expected to report the
* same storage type.
* @param fileLen length of the file.
* @param expectedStorageType storage type to expect for all blocks of the
* given file.
* @throws TimeoutException if the wait timed out.
* @throws InterruptedException if interrupted while waiting for the response.
*/
private void waitForUpdatedStorageType(ClientProtocol client, String file,
long fileLen, StorageType expectedStorageType)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
LocatedBlocks blocks;
try {
blocks = client.getBlockLocations(file, 0, fileLen);
} catch (IOException e) {
throw new RuntimeException(e);
}
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
for (StorageType type : lb.getStorageTypes()) {
if (!expectedStorageType.equals(type)) {
LOG.info("Block {} has StorageType: {}. It might not have been "
+ "updated yet, awaiting the latest update.",
lb.getBlock().toString(), type);
return false;
}
}
}
return true;
}, 500, 5000, "Block storage types must be " + expectedStorageType);
}
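/**
* Set up a Kerberized configuration backed by a MiniKdc: create a keytab
* for the mover principal, enable block access tokens and SASL data
* transfer protection, and switch HTTP to HTTPS with test SSL material.
*/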
private void initSecureConf(Configuration conf) throws Exception {
String username = "mover";
File baseDir = GenericTestUtils.getTestDir(TestMover.class.getSimpleName());
FileUtil.fullyDelete(baseDir);
Assert.assertTrue(baseDir.mkdirs());
Properties kdcConf = MiniKdc.createConf();
MiniKdc kdc = new MiniKdc(kdcConf, baseDir);
kdc.start();
SecurityUtil.setAuthenticationMethod(
UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
UserGroupInformation.setConfiguration(conf);
KerberosName.resetDefaultRealm();
Assert.assertTrue("Expected configuration to enable security",
UserGroupInformation.isSecurityEnabled());
keytabFile = new File(baseDir, username + ".keytab");
String keytab = keytabFile.getAbsolutePath();
// Windows will not reverse name lookup "127.0.0.1" to "localhost".
String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost";
principal = username + "/" + krbInstance + "@" + kdc.getRealm();
String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm();
kdc.createPrincipal(keytabFile, username, username + "/" + krbInstance,
"HTTP/" + krbInstance);
conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, principal);
conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, principal);
conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
conf.setBoolean(DFS_MOVER_KEYTAB_ENABLED_KEY, true);
conf.set(DFS_MOVER_ADDRESS_KEY, "localhost:0");
conf.set(DFS_MOVER_KEYTAB_FILE_KEY, keytab);
conf.set(DFS_MOVER_KERBEROS_PRINCIPAL_KEY, principal);
String keystoresDir = baseDir.getAbsolutePath();
String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestMover.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
KeyStoreTestUtil.getClientSSLConfigFileName());
conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
KeyStoreTestUtil.getServerSSLConfigFileName());
initConf(conf);
}
/**
* Test Mover runs fine when logging in with a keytab in kerberized env.
* Reusing testMovementWithLocalityOption
* here for basic functionality testing.
*/
@Test(timeout = 300000)
public void testMoverWithKeytabs() throws Exception {
final Configuration conf = new HdfsConfiguration();
try {
initSecureConf(conf);
final UserGroupInformation ugi = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(principal,
keytabFile.getAbsolutePath());
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
// verify that mover runs Ok.
testMovementWithLocalityOption(conf, true);
// verify that UGI was logged in using keytab.
Assert.assertTrue(UserGroupInformation.isLoginKeytabBased());
return null;
}
});
} finally {
// Reset UGI so that other tests are not affected.
UserGroupInformation.reset();
UserGroupInformation.setConfiguration(new Configuration());
}
}
/**
* Test to verify that mover can't move pinned blocks.
*/
@Test(timeout = 90000)
public void testMoverWithPinnedBlocks() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
// Set a bigger retry max attempts value so that the test case will time
// out if block pinning errors are not handled properly during block movement.
conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 10000);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoverWithPinnedBlocks/file";
Path dir = new Path("/testMoverWithPinnedBlocks");
dfs.mkdirs(dir);
// write to DISK
dfs.setStoragePolicy(dir, "HOT");
final FSDataOutputStream out = dfs.create(new Path(file));
byte[] fileData = StripedFileTestUtil
.generateBytes(DEFAULT_BLOCK_SIZE * 3);
out.write(fileData);
out.close();
// verify before movement
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
StorageType[] storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.DISK == storageType);
}
// Adding one SSD based data node to the cluster.
StorageType[][] newtypes = new StorageType[][] {{StorageType.SSD}};
startAdditionalDNs(conf, 1, newtypes, cluster);
// Mock FsDatasetSpi#getPinning to show that the block is pinned.
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
DataNode dn = cluster.getDataNodes().get(i);
LOG.info("Simulate block pinning in datanode {}", dn);
InternalDataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
}
// move file blocks to ONE_SSD policy
dfs.setStoragePolicy(dir, "ONE_SSD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", dir.toString()});
int exitcode = ExitStatus.NO_MOVE_BLOCK.getExitCode();
Assert.assertEquals("Movement should fail", exitcode, rc);
} finally {
cluster.shutdown();
}
}
/**
* Test to verify that mover should work well with pinned blocks as well as
* failed blocks. Mover should continue retrying the failed blocks only.
*/
@Test(timeout = 90000)
public void testMoverFailedRetryWithPinnedBlocks() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}}).build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String parenDir = "/parent";
dfs.mkdirs(new Path(parenDir));
final String file1 = "/parent/testMoverFailedRetryWithPinnedBlocks1";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file1), (short) 2);
byte[] fileData = StripedFileTestUtil
.generateBytes(DEFAULT_BLOCK_SIZE * 2);
out.write(fileData);
out.close();
// Adding pinned blocks.
createFileWithFavoredDatanodes(conf, cluster, dfs);
// Delete the block file so the block move will fail with FileNotFoundException
LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0);
Assert.assertEquals("Wrong block count", 2,
locatedBlocks.locatedBlockCount());
LocatedBlock lb = locatedBlocks.get(0);
cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock());
// move to ARCHIVE
dfs.setStoragePolicy(new Path(parenDir), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", parenDir.toString()});
Assert.assertEquals("Movement should fail after some retry",
ExitStatus.NO_MOVE_PROGRESS.getExitCode(), rc);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 300000)
public void testMoverWhenStoragePolicyUnset() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}})
.build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoverWhenStoragePolicyUnset";
// write to DISK
DFSTestUtil.createFile(dfs, new Path(file), 1L, (short) 1, 0L);
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", file.toString()});
Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
// Wait till the namenode is notified about the block location details
waitForLocatedBlockWithArchiveStorageType(dfs, file, 1);
// verify before unset policy
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
Assert.assertTrue(StorageType.ARCHIVE == (lb.getStorageTypes())[0]);
// unset storage policy
dfs.unsetStoragePolicy(new Path(file));
rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", file.toString()});
Assert.assertEquals("Movement to DISK should be successful", 0, rc);
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
Assert.assertTrue(StorageType.DISK == (lb.getStorageTypes())[0]);
} finally {
cluster.shutdown();
}
}
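/**
* Run the Mover programmatically and verify its per-blockpool metrics:
* blocks scheduled, files processed, bytes and blocks moved, and failures.
*/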
@Test(timeout=100000)
public void testMoverMetrics() throws Exception {
long blockSize = 10*1024*1024;
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
.build();
cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();
final String file = "/testMaxIterationTime.dat";
final Path path = new Path(file);
short repFactor = 1;
int seed = 0xFAFAFA;
// write to DISK
DFSTestUtil.createFile(fs, path, 4L * blockSize, repFactor, seed);
// move to ARCHIVE
fs.setStoragePolicy(new Path(file), "COLD");
Map<URI, List<Path>> nnWithPath = new HashMap<>();
List<Path> paths = new ArrayList<>();
paths.add(path);
nnWithPath
.put(DFSUtil.getInternalNsRpcUris(conf).iterator().next(), paths);
Mover.run(nnWithPath, conf);
final String moverMetricsName = "Mover-"
+ cluster.getNameNode(0).getNamesystem().getBlockPoolId();
MetricsSource moverMetrics =
DefaultMetricsSystem.instance().getSource(moverMetricsName);
assertNotNull(moverMetrics);
MetricsRecordBuilder rb = MetricsAsserts.getMetrics(moverMetricsName);
// Check metrics
assertEquals(4, MetricsAsserts.getLongCounter("BlocksScheduled", rb));
assertEquals(1, MetricsAsserts.getLongCounter("FilesProcessed", rb));
assertEquals(41943040, MetricsAsserts.getLongGauge("BytesMoved", rb));
assertEquals(4, MetricsAsserts.getLongGauge("BlocksMoved", rb));
assertEquals(0, MetricsAsserts.getLongGauge("BlocksFailed", rb));
}
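/**
* Add two DISK-only datanodes, write a second file with those nodes as
* favored targets, and mock block pinning on the datanode holding the
* first replica so that the Mover cannot move it.
*/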
private void createFileWithFavoredDatanodes(final Configuration conf,
final MiniDFSCluster cluster, final DistributedFileSystem dfs)
throws IOException {
// Adding two DISK based data nodes to the cluster.
// Also, ensure that blocks are pinned in these new data nodes.
StorageType[][] newtypes =
new StorageType[][] {{StorageType.DISK}, {StorageType.DISK}};
startAdditionalDNs(conf, 2, newtypes, cluster);
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
InetSocketAddress[] favoredNodes = new InetSocketAddress[2];
int j = 0;
for (int i = dataNodes.size() - 1; i >= 2; i--) {
favoredNodes[j++] = dataNodes.get(i).getXferAddress();
}
final String file = "/parent/testMoverFailedRetryWithPinnedBlocks2";
final FSDataOutputStream out = dfs.create(new Path(file),
FsPermission.getDefault(), true, DEFAULT_BLOCK_SIZE, (short) 2,
DEFAULT_BLOCK_SIZE, null, favoredNodes);
byte[] fileData = StripedFileTestUtil.generateBytes(DEFAULT_BLOCK_SIZE * 2);
out.write(fileData);
out.close();
// Mock FsDatasetSpi#getPinning to show that the block is pinned.
LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file, 0);
Assert.assertEquals("Wrong block count", 2,
locatedBlocks.locatedBlockCount());
LocatedBlock lb = locatedBlocks.get(0);
DatanodeInfo datanodeInfo = lb.getLocations()[0];
for (DataNode dn : cluster.getDataNodes()) {
if (dn.getDatanodeId().getDatanodeUuid()
.equals(datanodeInfo.getDatanodeUuid())) {
LOG.info("Simulate block pinning in datanode {}", datanodeInfo);
InternalDataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
break;
}
}
}
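/** Start the requested number of extra datanodes with the given storage types. */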
private void startAdditionalDNs(final Configuration conf,
int newNodesRequired, StorageType[][] newTypes,
final MiniDFSCluster cluster) throws IOException {
cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
null, null, null, false, false, false, null, null, null);
cluster.triggerHeartbeats();
}
}