TestBlockInfoStriped.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
/**
* Test {@link BlockInfoStriped}.
*/
@RunWith(Parameterized.class)
public class TestBlockInfoStriped {
private static final long BASE_ID = -1600;
private final Block baseBlock = new Block(BASE_ID);
private final ErasureCodingPolicy testECPolicy;
private final int totalBlocks;
private final BlockInfoStriped info;
public TestBlockInfoStriped(ErasureCodingPolicy policy) {
testECPolicy = policy;
totalBlocks = testECPolicy.getNumDataUnits()
+ testECPolicy.getNumParityUnits();
info = new BlockInfoStriped(baseBlock, testECPolicy);
}
@Parameterized.Parameters(name = "{index}: {0}")
public static Collection<Object[]> policies() {
return StripedFileTestUtil.getECPolicies();
}
private Block[] createReportedBlocks(int num) {
Block[] blocks = new Block[num];
for (int i = 0; i < num; i++) {
blocks[i] = new Block(BASE_ID + i);
}
return blocks;
}
@Rule
public Timeout globalTimeout = new Timeout(300000);
/**
* Test adding storage and reported block.
*/
@Test
public void testAddStorage() {
// first add NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS storages, i.e., a complete
// group of blocks/storages
DatanodeStorageInfo[] storageInfos = DFSTestUtil.createDatanodeStorageInfos(
totalBlocks);
Block[] blocks = createReportedBlocks(totalBlocks);
int i = 0;
for (; i < storageInfos.length; i += 2) {
info.addStorage(storageInfos[i], blocks[i]);
Assert.assertEquals(i/2 + 1, info.numNodes());
}
i /= 2;
for (int j = 1; j < storageInfos.length; j += 2) {
Assert.assertTrue(info.addStorage(storageInfos[j], blocks[j]));
Assert.assertEquals(i + (j+1)/2, info.numNodes());
}
// check
byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
Assert.assertEquals(totalBlocks, info.getCapacity());
Assert.assertEquals(totalBlocks, indices.length);
i = 0;
for (DatanodeStorageInfo storage : storageInfos) {
int index = info.findStorageInfo(storage);
Assert.assertEquals(i++, index);
Assert.assertEquals(index, indices[index]);
}
// the same block is reported from the same storage twice
i = 0;
for (DatanodeStorageInfo storage : storageInfos) {
Assert.assertTrue(info.addStorage(storage, blocks[i++]));
}
Assert.assertEquals(totalBlocks, info.getCapacity());
Assert.assertEquals(totalBlocks, info.numNodes());
Assert.assertEquals(totalBlocks, indices.length);
i = 0;
for (DatanodeStorageInfo storage : storageInfos) {
int index = info.findStorageInfo(storage);
Assert.assertEquals(i++, index);
Assert.assertEquals(index, indices[index]);
}
// the same block is reported from another storage
DatanodeStorageInfo[] storageInfos2 =
DFSTestUtil.createDatanodeStorageInfos(totalBlocks * 2);
// only add the second half of info2
for (i = totalBlocks; i < storageInfos2.length; i++) {
info.addStorage(storageInfos2[i], blocks[i % totalBlocks]);
Assert.assertEquals(i + 1, info.getCapacity());
Assert.assertEquals(i + 1, info.numNodes());
indices = (byte[]) Whitebox.getInternalState(info, "indices");
Assert.assertEquals(i + 1, indices.length);
}
for (i = totalBlocks; i < storageInfos2.length; i++) {
int index = info.findStorageInfo(storageInfos2[i]);
Assert.assertEquals(i++, index);
Assert.assertEquals(index - totalBlocks, indices[index]);
}
}
@Test
public void testRemoveStorage() {
// first add TOTAL_NUM_BLOCKS into the BlockInfoStriped
DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
totalBlocks);
Block[] blocks = createReportedBlocks(totalBlocks);
for (int i = 0; i < storages.length; i++) {
info.addStorage(storages[i], blocks[i]);
}
// remove two storages
info.removeStorage(storages[0]);
info.removeStorage(storages[2]);
// check
Assert.assertEquals(totalBlocks, info.getCapacity());
Assert.assertEquals(totalBlocks - 2, info.numNodes());
byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
for (int i = 0; i < storages.length; i++) {
int index = info.findStorageInfo(storages[i]);
if (i != 0 && i != 2) {
Assert.assertEquals(i, index);
Assert.assertEquals(index, indices[index]);
} else {
Assert.assertEquals(-1, index);
Assert.assertEquals(-1, indices[i]);
}
}
// the same block is reported from another storage
DatanodeStorageInfo[] storages2 = DFSTestUtil.createDatanodeStorageInfos(
totalBlocks * 2);
for (int i = totalBlocks; i < storages2.length; i++) {
info.addStorage(storages2[i], blocks[i % totalBlocks]);
}
// now we should have 8 storages
Assert.assertEquals(totalBlocks * 2 - 2, info.numNodes());
Assert.assertEquals(totalBlocks * 2 - 2, info.getCapacity());
indices = (byte[]) Whitebox.getInternalState(info, "indices");
Assert.assertEquals(totalBlocks * 2 - 2, indices.length);
int j = totalBlocks;
for (int i = totalBlocks; i < storages2.length; i++) {
int index = info.findStorageInfo(storages2[i]);
if (i == totalBlocks || i == totalBlocks + 2) {
Assert.assertEquals(i - totalBlocks, index);
} else {
Assert.assertEquals(j++, index);
}
}
// remove the storages from storages2
for (int i = 0; i < totalBlocks; i++) {
info.removeStorage(storages2[i + totalBlocks]);
}
// now we should have 3 storages
Assert.assertEquals(totalBlocks - 2, info.numNodes());
Assert.assertEquals(totalBlocks * 2 - 2, info.getCapacity());
indices = (byte[]) Whitebox.getInternalState(info, "indices");
Assert.assertEquals(totalBlocks * 2 - 2, indices.length);
for (int i = 0; i < totalBlocks; i++) {
if (i == 0 || i == 2) {
int index = info.findStorageInfo(storages2[i + totalBlocks]);
Assert.assertEquals(-1, index);
} else {
int index = info.findStorageInfo(storages[i]);
Assert.assertEquals(i, index);
}
}
for (int i = totalBlocks; i < totalBlocks * 2 - 2; i++) {
Assert.assertEquals(-1, indices[i]);
Assert.assertNull(info.getDatanode(i));
}
}
@Test
public void testGetBlockInfo() throws IllegalArgumentException, Exception {
int dataBlocks = testECPolicy.getNumDataUnits();
int parityBlocks = testECPolicy.getNumParityUnits();
int totalSize = dataBlocks + parityBlocks;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
Configuration conf = new Configuration();
try (MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf, builderBaseDir).numDataNodes(totalSize)
.build()) {
DistributedFileSystem fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
fs.enableErasureCodingPolicy(testECPolicy.getName());
fs.mkdirs(new Path("/ecDir"));
fs.setErasureCodingPolicy(new Path("/ecDir"), testECPolicy.getName());
DFSTestUtil.createFile(fs, new Path("/ecDir/ecFile"),
fs.getDefaultBlockSize() * dataBlocks, (short) 1, 1024);
ExtendedBlock blk = DFSTestUtil
.getAllBlocks(fs, new Path("/ecDir/ecFile")).get(0).getBlock();
String id = "blk_" + Long.toString(blk.getBlockId());
BlockInfo bInfo = cluster.getNameNode().getNamesystem().getBlockManager()
.getStoredBlock(blk.getLocalBlock());
DatanodeStorageInfo[] dnStorageInfo = cluster.getNameNode()
.getNamesystem().getBlockManager().getStorages(bInfo);
bInfo.removeStorage(dnStorageInfo[1]);
ByteArrayOutputStream bStream = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bStream, true);
assertEquals(0, ToolRunner.run(new DFSck(conf, out), new String[] {
new Path("/ecDir/ecFile").toString(), "-blockId", id }));
assertFalse(out.toString().contains("null"));
}
}
@Test
public void testWrite() {
long blkID = 1;
long numBytes = 1;
long generationStamp = 1;
ByteBuffer byteBuffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE * 3);
byteBuffer.putLong(blkID).putLong(numBytes).putLong(generationStamp);
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(byteStream);
BlockInfoStriped blk = new BlockInfoStriped(new Block(blkID, numBytes,
generationStamp), testECPolicy);
try {
blk.write(out);
} catch(Exception ex) {
fail("testWrite error:" + ex.getMessage());
}
assertEquals(byteBuffer.array().length, byteStream.toByteArray().length);
assertArrayEquals(byteBuffer.array(), byteStream.toByteArray());
}
@Test(expected=IllegalArgumentException.class)
public void testAddStorageWithReplicatedBlock() {
DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo(
"storageID", "127.0.0.1");
BlockInfo replica = new BlockInfoContiguous(new Block(1000L), (short) 3);
info.addStorage(storage, replica);
}
@Test(expected=IllegalArgumentException.class)
public void testAddStorageWithDifferentBlockGroup() {
DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo(
"storageID", "127.0.0.1");
BlockInfo diffGroup = new BlockInfoStriped(new Block(BASE_ID + 100),
testECPolicy);
info.addStorage(storage, diffGroup);
}
}