TestRefreshBlockPlacementPolicy.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.net.Node;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * Test refresh block placement policy.
 *
 * <p>The mini cluster is started with {@link MockBlockPlacementPolicy}
 * configured for both the replication and the erasure-coding placement keys.
 * Each test writes a file and checks that the mock was consulted, then
 * reconfigures the corresponding key on the NameNode with a {@code null}
 * value, rewrites the file and checks that the mock was not consulted again.
 */
public class TestRefreshBlockPlacementPolicy {
  private MiniDFSCluster cluster;
  private Configuration config;

  /**
   * Number of {@code chooseTarget} calls seen by
   * {@link MockBlockPlacementPolicy}. It is static and never reset, so the
   * tests only ever compare it against a snapshot taken just beforehand.
   */
  // NOTE(review): incremented on whichever thread invokes chooseTarget and
  // read on the test thread; confirm visibility if those threads differ.
  private static int counter = 0;

  /**
   * Default placement policy that additionally counts every invocation of
   * {@code chooseTarget}, letting the tests detect whether it is still the
   * active policy.
   */
  static class MockBlockPlacementPolicy extends BlockPlacementPolicyDefault {
    @Override
    public DatanodeStorageInfo[] chooseTarget(String srcPath,
        int numOfReplicas,
        Node writer,
        List<DatanodeStorageInfo> chosen,
        boolean returnChosenNodes,
        Set<Node> excludedNodes,
        long blocksize,
        BlockStoragePolicy storagePolicy,
        EnumSet<AddBlockFlag> flags) {
      counter++;
      return super.chooseTarget(srcPath, numOfReplicas, writer, chosen,
          returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags);
    }
  }

  /**
   * Starts a mini cluster whose replication and EC placement policies are
   * both set to {@link MockBlockPlacementPolicy}.
   *
   * @throws IOException if the cluster cannot be built or does not become
   *     active
   */
  @Before
  public void setup() throws IOException {
    config = new Configuration();
    config.setClass(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        MockBlockPlacementPolicy.class, BlockPlacementPolicy.class);
    config.setClass(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
        MockBlockPlacementPolicy.class, BlockPlacementPolicy.class);
    // NOTE(review): 9 datanodes is presumably what the EC policy used by
    // testRefreshEcPolicy requires; confirm before lowering.
    cluster = new MiniDFSCluster.Builder(config).numDataNodes(9).build();
    cluster.waitActive();
  }

  /**
   * Shuts the mini cluster down.
   *
   * @throws IOException declared for compatibility with the test lifecycle
   */
  @After
  public void cleanup() throws IOException {
    // setup() may have failed before the cluster was assigned; do not let a
    // NullPointerException here hide the original failure.
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  /**
   * Refreshes the replication placement policy and verifies that the mock
   * policy stops being used for a replicated file.
   */
  @Test
  public void testRefreshReplicationPolicy() throws Exception {
    Path file = new Path("/test-file");
    DistributedFileSystem dfs = cluster.getFileSystem();
    verifyRefreshPolicy(dfs, file, () -> cluster.getNameNode()
        .reconfigurePropertyImpl(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, null));
  }

  /**
   * Refreshes the erasure-coding placement policy and verifies that the mock
   * policy stops being used for a file under an EC directory.
   */
  @Test
  public void testRefreshEcPolicy() throws Exception {
    Path ecDir = new Path("/ec");
    Path file = new Path("/ec/test-file");
    DistributedFileSystem dfs = cluster.getFileSystem();
    // The mode must be an octal literal: decimal 755 is octal 01363, which is
    // not rwxr-xr-x.
    dfs.mkdir(ecDir, FsPermission.createImmutable((short) 0755));
    // NOTE(review): a null policy name presumably selects the cluster's
    // default EC policy; confirm against DistributedFileSystem.
    dfs.setErasureCodingPolicy(ecDir, null);
    verifyRefreshPolicy(dfs, file, () -> cluster.getNameNode()
        .reconfigurePropertyImpl(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, null));
  }

  /** A policy-refresh action that may fail with a reconfiguration error. */
  @FunctionalInterface
  private interface Refresh {
    void refresh() throws ReconfigurationException;
  }

  /**
   * Writes {@code file} once under the mock policy, runs {@code func}, then
   * deletes and rewrites the file, asserting that the mock was consulted for
   * the first write and not for the second.
   *
   * @param dfs file system of the running mini cluster
   * @param file path of the file to write, delete and write again
   * @param func action that refreshes the placement policy under test
   * @throws IOException if a file system operation fails
   * @throws ReconfigurationException if {@code func} fails
   */
  private void verifyRefreshPolicy(DistributedFileSystem dfs, Path file,
      Refresh func) throws IOException, ReconfigurationException {
    // Choose datanode using the mock policy.
    int lastCounter = counter;
    writeTestFile(dfs, file);
    // JUnit assertion rather than the `assert` keyword, so the check cannot
    // be silently skipped when the JVM runs without -ea.
    assertTrue("Mock placement policy was not used before the refresh",
        counter > lastCounter);

    // Refresh to the default policy.
    func.refresh();
    lastCounter = counter;
    dfs.delete(file, true);
    writeTestFile(dfs, file);
    assertEquals("Mock placement policy was still used after the refresh",
        lastCounter, counter);
  }

  /** Creates (overwriting) {@code file} with a small fixed payload. */
  private static void writeTestFile(DistributedFileSystem dfs, Path file)
      throws IOException {
    // try-with-resources closes the stream even if the write throws.
    try (OutputStream out = dfs.create(file, true)) {
      out.write("test".getBytes(StandardCharsets.UTF_8));
    }
  }
}