// TestBlockReaderLocal.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client.impl;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.ReadStatistics;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
public class TestBlockReaderLocal {
// Temporary directory created in init() and closed in shutdown();
// runBlockReaderLocalTest places its shared-memory backing file here.
private static TemporarySocketDirectory sockDir;
/**
 * Creates the class-wide temporary socket directory and disables
 * DomainSocket bind-path validation before any test runs.
 */
@BeforeAll
public static void init() {
  TestBlockReaderLocal.sockDir = new TemporarySocketDirectory();
  DomainSocket.disableBindPathValidation();
}
/**
 * Closes the class-wide temporary socket directory.
 *
 * @throws IOException if closing the directory fails
 */
@AfterAll
public static void shutdown() throws IOException {
  TestBlockReaderLocal.sockDir.close();
}
/**
 * Fails the current test if two array regions of equal length differ.
 *
 * @param buf1 first array
 * @param off1 start of the region in {@code buf1}
 * @param buf2 second array
 * @param off2 start of the region in {@code buf2}
 * @param len  number of bytes to compare
 */
public static void assertArrayRegionsEqual(byte[] buf1, int off1, byte[] buf2,
    int off2, int len) {
  int idx = 0;
  while (idx < len) {
    final byte first = buf1[off1 + idx];
    final byte second = buf2[off2 + idx];
    if (first != second) {
      // The reported index is relative to the start of the regions.
      fail("arrays differ at byte " + idx + ". " +
          "The first array has " + (int) first +
          ", but the second array has " + (int) second);
    }
    idx++;
  }
}
/**
 * Similar to IOUtils#readFully(). Reads bytes in a loop until exactly
 * {@code len} bytes have been stored in {@code buf[off .. off + len)}.
 *
 * On return the buffer's position and limit are both {@code off + len}.
 *
 * @param reader The BlockReaderLocal to read bytes from
 * @param buf    The ByteBuffer to read into
 * @param off    The offset in the buffer to read into
 * @param len    The number of bytes to read.
 *
 * @throws IOException If it could not read the requested number of bytes
 */
private static void readFully(BlockReaderLocal reader,
    ByteBuffer buf, int off, int len) throws IOException {
  int remaining = len;
  while (remaining > 0) {
    // The window must shrink as "off" advances: using off + len here would
    // let a partial read push the limit past the requested region (or past
    // the buffer's capacity) on the next iteration.
    buf.limit(off + remaining);
    buf.position(off);
    int ret = reader.read(buf);
    if (ret < 0) {
      throw new EOFException("Premature EOF from BlockReaderLocal " +
          "after reading " + (len - remaining) + " byte(s).");
    }
    remaining -= ret;
    off += ret;
  }
}
/**
 * Base class for the scenarios in this file. Every hook is a no-op by
 * default; a scenario overrides the ones it needs.
 */
private static class BlockReaderLocalTest {
  /** Length in bytes of the test file and of the expected-contents array. */
  static final int TEST_LENGTH = 1234567;
  /** Checksum chunk size in bytes used by the tests. */
  static final int BYTES_PER_CHECKSUM = 512;

  /**
   * Hook for adjusting the configuration.
   *
   * @param conf configuration to modify in place
   */
  public void setConfiguration(HdfsConfiguration conf) {
    // default: no-op
  }

  /**
   * Hook for preparing (for example, corrupting) a block file.
   *
   * @param blockFile      the block's data file
   * @param usingChecksums whether the run uses checksums
   * @throws IOException if the preparation fails
   */
  public void setup(File blockFile, boolean usingChecksums)
      throws IOException {
    // default: no-op
  }

  /**
   * Hook that exercises a reader against the expected contents.
   *
   * @param reader   reader under test
   * @param original expected file contents
   * @throws IOException if a read fails
   */
  public void doTest(BlockReaderLocal reader, byte[] original)
      throws IOException {
    // default: no-op
  }

  /**
   * Hook that exercises a reader against the expected contents starting at
   * a given offset.
   *
   * @param reader   reader under test
   * @param original expected file contents
   * @param shift    offset into {@code original}
   * @throws IOException if a read fails
   */
  public void doTest(BlockReaderLocal reader, byte[] original, int shift)
      throws IOException {
    // default: no-op
  }
}
/**
 * Runs a {@link BlockReaderLocalTest} scenario against block files written
 * by a single-DataNode {@link MiniDFSCluster}.
 *
 * A file of {@code TEST_LENGTH} bytes is created with a 10 KB block size and
 * read back through the file system as the expected contents. For each of
 * the first {@code shortCircuitCachesNum} blocks, a {@link BlockReaderLocal}
 * is built directly on the block's data and metadata files and handed to
 * the scenario. The test is skipped when DomainSocket reports a loading
 * failure.
 *
 * @param test                  scenario to run
 * @param checksum              whether the reader verifies checksums
 * @param readahead             readahead in bytes, applied to the client
 *                              configuration and the caching strategy
 * @param shortCircuitCachesNum number of short-circuit caches to configure;
 *                              also the number of blocks exercised
 * @throws IOException if the cluster, the file or a reader fails
 */
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
    boolean checksum, long readahead, int shortCircuitCachesNum)
    throws IOException {
  assumeTrue(DomainSocket.getLoadingFailureReason() == null);
  MiniDFSCluster cluster = null;
  HdfsConfiguration conf = new HdfsConfiguration();
  conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
      !checksum);
  conf.setLong(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
      BlockReaderLocalTest.BYTES_PER_CHECKSUM);
  conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
  conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
  conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
      shortCircuitCachesNum);
  // Applied last so a scenario can override any of the settings above.
  test.setConfiguration(conf);
  FileInputStream dataIn = null, metaIn = null;
  final Path TEST_PATH = new Path("/a");
  final long RANDOM_SEED = 4567L;
  final int blockSize = 10 * 1024;
  BlockReaderLocal blockReaderLocal = null;
  FSDataInputStream fsIn = null;
  byte[] original = new byte[BlockReaderLocalTest.TEST_LENGTH];
  FileSystem fs = null;
  ShortCircuitShm shm = null;
  RandomAccessFile raf = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, TEST_PATH, 1024,
        BlockReaderLocalTest.TEST_LENGTH, blockSize, (short)1, RANDOM_SEED);
    try {
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
    } catch (InterruptedException e) {
      fail("unexpected InterruptedException during " +
          "waitReplication: " + e);
    } catch (TimeoutException e) {
      fail("unexpected TimeoutException during " +
          "waitReplication: " + e);
    }
    // Capture the expected contents before any scenario touches the
    // block files.
    fsIn = fs.open(TEST_PATH);
    IOUtils.readFully(fsIn, original, 0,
        BlockReaderLocalTest.TEST_LENGTH);
    fsIn.close();
    fsIn = null;
    // NOTE(review): blockReaderLocal, shm and raf are reassigned on every
    // iteration and only the last ones are released in the finally block.
    for (int i = 0; i < shortCircuitCachesNum; i++) {
      ExtendedBlock block = DFSTestUtil.getAllBlocks(
          fs, TEST_PATH).get(i).getBlock();
      File dataFile = cluster.getBlockFile(0, block);
      File metaFile = cluster.getBlockMetadataFile(0, block);
      ShortCircuitCache shortCircuitCache =
          ClientContext.getFromConf(conf).getShortCircuitCache(
              block.getBlockId());
      test.setup(dataFile, checksum);
      FileInputStream[] streams = {
          new FileInputStream(dataFile),
          new FileInputStream(metaFile)
      };
      dataIn = streams[0];
      metaIn = streams[1];
      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
          block.getBlockPoolId());
      raf = new RandomAccessFile(
          new File(sockDir.getDir().getAbsolutePath(),
              UUID.randomUUID().toString()), "rw");
      raf.setLength(8192);
      FileInputStream shmStream = new FileInputStream(raf.getFD());
      shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
      ShortCircuitReplica replica =
          new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
              Time.now(), shm.allocAndRegisterSlot(
                  ExtendedBlockId.fromExtendedBlock(block)));
      blockReaderLocal = new BlockReaderLocal.Builder(
          new DfsClientConf.ShortCircuitConf(conf)).
          setFilename(TEST_PATH.getName()).
          setBlock(block).
          setShortCircuitReplica(replica).
          setCachingStrategy(new CachingStrategy(false, readahead)).
          setVerifyChecksum(checksum).
          build();
      // Cleared so the finally block does not close these streams itself;
      // presumably the replica owns them from here on (confirm).
      dataIn = null;
      metaIn = null;
      // Single-cache scenarios implement the two-argument hook. Calling the
      // three-argument hook unconditionally would run the base class no-op
      // and leave those scenarios without any assertions.
      if (shortCircuitCachesNum == 1) {
        test.doTest(blockReaderLocal, original);
      } else {
        test.doTest(blockReaderLocal, original, i * blockSize);
      }
      // BlockReaderLocal should not alter the file position.
      assertEquals(0, streams[0].getChannel().position());
      assertEquals(0, streams[1].getChannel().position());
    }
    cluster.shutdown();
    cluster = null;
  } finally {
    if (fsIn != null) fsIn.close();
    if (fs != null) fs.close();
    if (cluster != null) cluster.shutdown();
    if (dataIn != null) dataIn.close();
    if (metaIn != null) metaIn.close();
    if (blockReaderLocal != null) blockReaderLocal.close();
    if (shm != null) shm.free();
    if (raf != null) raf.close();
  }
}
/**
 * Runs a scenario with a single short-circuit cache.
 *
 * @param test      scenario to run
 * @param checksum  whether the reader verifies checksums
 * @param readahead readahead in bytes
 * @throws IOException if the underlying run fails
 */
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
    boolean checksum, long readahead) throws IOException {
  final int singleCache = 1;
  runBlockReaderLocalTest(test, checksum, readahead, singleCache);
}
/**
 * Scenario with an empty body: it inherits every no-op hook from
 * BlockReaderLocalTest and therefore performs no reads of its own.
 */
private static class TestBlockReaderLocalImmediateClose
extends BlockReaderLocalTest {
}
/** Runs the no-read scenario with and without checksums, no readahead. */
@Test
public void testBlockReaderLocalImmediateClose() throws IOException {
  for (boolean checksum : new boolean[] {true, false}) {
    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(),
        checksum, 0);
  }
}
/**
 * Scenario performing four consecutive array reads from offset 0 and
 * comparing each region with the expected contents.
 */
private static class TestBlockReaderSimpleReads
    extends BlockReaderLocalTest {
  @Override
  public void doTest(BlockReaderLocal reader, byte[] original)
      throws IOException {
    final byte[] actual = new byte[TEST_LENGTH];
    // {offset, length} of each consecutive read.
    final int[][] reads = {{0, 512}, {512, 512}, {1024, 513}, {1537, 514}};
    for (int[] read : reads) {
      final int off = read[0];
      final int len = read[1];
      reader.readFully(actual, off, len);
      assertArrayRegionsEqual(original, off, actual, off, len);
    }
    // Readahead is always at least the size of one chunk in this test.
    assertTrue(BlockReaderLocalTest.BYTES_PER_CHECKSUM <=
        reader.getMaxReadaheadLength());
  }
}
/** Simple reads with checksums and the default readahead. */
@Test
public void testBlockReaderSimpleReads() throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderSimpleReads();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, true, readahead);
}
/** Simple reads with checksums and a readahead one byte short of a chunk. */
@Test
public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderSimpleReads();
  final long readahead = BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1;
  runBlockReaderLocalTest(scenario, true, readahead);
}
/** Simple reads without checksums and with the default readahead. */
@Test
public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderSimpleReads();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, false, readahead);
}
/** Simple reads with checksums and zero readahead. */
@Test
public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderSimpleReads();
  runBlockReaderLocalTest(scenario, true, 0);
}
/** Simple reads without checksums and with zero readahead. */
@Test
public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderSimpleReads();
  runBlockReaderLocalTest(scenario, false, 0);
}
/**
 * Scenario performing a sequence of differently sized array reads; most
 * regions are verified, two are only consumed to advance the reader.
 */
private static class TestBlockReaderLocalArrayReads2
    extends BlockReaderLocalTest {
  @Override
  public void doTest(BlockReaderLocal reader, byte[] original)
      throws IOException {
    final byte[] actual = new byte[TEST_LENGTH];
    readAndCheck(reader, original, actual, 0, 10);
    readAndCheck(reader, original, actual, 10, 100);
    readAndCheck(reader, original, actual, 110, 700);
    // Consume the byte at offset 810 without verifying it.
    reader.readFully(actual, 810, 1);
    readAndCheck(reader, original, actual, 811, 5);
    // Consume offsets 816 through 1715 without verifying them.
    reader.readFully(actual, 816, 900);
    readAndCheck(reader, original, actual, 1716, 5);
  }

  /** Reads {@code len} bytes at {@code off} and compares them. */
  private static void readAndCheck(BlockReaderLocal reader, byte[] expected,
      byte[] actual, int off, int len) throws IOException {
    reader.readFully(actual, off, len);
    assertArrayRegionsEqual(expected, off, actual, off, len);
  }
}
/** Array reads with checksums and the default readahead. */
@Test
public void testBlockReaderLocalArrayReads2() throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderLocalArrayReads2();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, true, readahead);
}
/** Array reads without checksums and with the default readahead. */
@Test
public void testBlockReaderLocalArrayReads2NoChecksum()
    throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderLocalArrayReads2();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, false, readahead);
}
/** Array reads with checksums and zero readahead. */
@Test
public void testBlockReaderLocalArrayReads2NoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderLocalArrayReads2();
  runBlockReaderLocalTest(scenario, true, 0);
}
/** Array reads without checksums and with zero readahead. */
@Test
public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderLocalArrayReads2();
  runBlockReaderLocalTest(scenario, false, 0);
}
/**
 * Scenario reading into an array-backed ByteBuffer, with a one-byte skip
 * before the last read.
 */
private static class TestBlockReaderLocalByteBufferReads
    extends BlockReaderLocalTest {
  @Override
  public void doTest(BlockReaderLocal reader, byte[] original)
      throws IOException {
    final byte[] backing = new byte[TEST_LENGTH];
    final ByteBuffer buf = ByteBuffer.wrap(backing);
    // {offset, length} of the three back-to-back reads.
    final int[][] reads = {{0, 10}, {10, 100}, {110, 700}};
    for (int[] read : reads) {
      readFully(reader, buf, read[0], read[1]);
      assertArrayRegionsEqual(original, read[0], backing, read[0], read[1]);
    }
    reader.skip(1); // skip from offset 810 to offset 811
    readFully(reader, buf, 811, 5);
    assertArrayRegionsEqual(original, 811, backing, 811, 5);
  }
}
/** ByteBuffer reads with checksums and the default readahead. */
@Test
public void testBlockReaderLocalByteBufferReads()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalByteBufferReads();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, true, readahead);
}
/** ByteBuffer reads without checksums and with the default readahead. */
@Test
public void testBlockReaderLocalByteBufferReadsNoChecksum()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalByteBufferReads();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, false, readahead);
}
/** ByteBuffer reads with checksums and zero readahead. */
@Test
public void testBlockReaderLocalByteBufferReadsNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalByteBufferReads();
  runBlockReaderLocalTest(scenario, true, 0);
}
/** ByteBuffer reads without checksums and with zero readahead. */
@Test
public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalByteBufferReads();
  runBlockReaderLocalTest(scenario, false, 0);
}
/**
 * Test reads that bypass the bounce buffer (because they are aligned
 * and bigger than the readahead).
 *
 * Every read goes to the start of a direct ByteBuffer; the expected file
 * offset is tracked by hand across the interleaved skips.
 */
private static class TestBlockReaderLocalByteBufferFastLaneReads
    extends BlockReaderLocalTest {
  @Override
  public void doTest(BlockReaderLocal reader, byte[] original)
      throws IOException {
    final ByteBuffer direct = ByteBuffer.allocateDirect(TEST_LENGTH);
    readAndCheck(reader, direct, original, 0, 5120);
    reader.skip(1537);
    // 5120 + 1537 = 6657
    readAndCheck(reader, direct, original, 6657, 1);
    reader.forceAnchorable();
    readAndCheck(reader, direct, original, 6658, 5120);
    reader.forceUnanchorable();
    // 6658 + 5120 = 11778
    readAndCheck(reader, direct, original, 11778, 513);
    reader.skip(3);
    // 11778 + 513 + 3 = 12294
    readAndCheck(reader, direct, original, 12294, 50);
  }

  /**
   * Reads {@code len} bytes to the start of {@code buf}, flips it and
   * compares the result with {@code expected} at {@code fileOffset}.
   */
  private static void readAndCheck(BlockReaderLocal reader, ByteBuffer buf,
      byte[] expected, int fileOffset, int len) throws IOException {
    readFully(reader, buf, 0, len);
    buf.flip();
    assertArrayRegionsEqual(expected, fileOffset,
        DFSTestUtil.asArray(buf), 0, len);
  }
}
/** Fast-lane reads with checksums and a readahead of two chunks. */
@Test
public void testBlockReaderLocalByteBufferFastLaneReads()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalByteBufferFastLaneReads();
  final long readahead = 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM;
  runBlockReaderLocalTest(scenario, true, readahead);
}
/** Fast-lane reads without checksums and a readahead of two chunks. */
@Test
public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalByteBufferFastLaneReads();
  final long readahead = 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM;
  runBlockReaderLocalTest(scenario, false, readahead);
}
/** Fast-lane reads with checksums and zero readahead. */
@Test
public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalByteBufferFastLaneReads();
  runBlockReaderLocalTest(scenario, true, 0);
}
/** Fast-lane reads without checksums and with zero readahead. */
@Test
public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalByteBufferFastLaneReads();
  runBlockReaderLocalTest(scenario, false, 0);
}
/**
 * Scenario that zeroes the first bytes of the block file and then reads
 * from the start of the block. With checksums the read must fail with an
 * IOException; without checksums it must succeed.
 */
private static class TestBlockReaderLocalReadCorruptStart
    extends BlockReaderLocalTest {
  /** Recorded in setup() so doTest() knows which outcome to expect. */
  private boolean usingChecksums = false;

  /**
   * Overwrites the beginning of the block file with zero bytes.
   *
   * @param blockFile      block data file to corrupt
   * @param usingChecksums whether the run uses checksums
   * @throws IOException if the file cannot be written
   */
  @Override
  public void setup(File blockFile, boolean usingChecksums)
      throws IOException {
    this.usingChecksums = usingChecksums;
    try (RandomAccessFile bf = new RandomAccessFile(blockFile, "rw")) {
      bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
    }
  }

  /**
   * Reads the first ten bytes and checks whether the corruption is
   * reported.
   *
   * @param reader   reader under test
   * @param original expected file contents (not consulted here)
   * @throws IOException if the read fails when no checksums are in use
   */
  @Override
  public void doTest(BlockReaderLocal reader, byte[] original)
      throws IOException {
    byte[] buf = new byte[TEST_LENGTH];
    if (usingChecksums) {
      try {
        reader.readFully(buf, 0, 10);
        fail("did not detect corruption");
      } catch (IOException e) {
        // expected
      }
    } else {
      reader.readFully(buf, 0, 10);
    }
  }
}
/** Corrupt-start scenario with checksums and the default readahead. */
@Test
public void testBlockReaderLocalReadCorruptStart()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalReadCorruptStart();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, true, readahead);
}
/**
 * Scenario that zeroes bytes at offset 1539 of the block file. Reads before
 * that offset must match the expected contents; the read that covers the
 * corrupt region must raise a ChecksumException exactly when checksums are
 * in use.
 */
private static class TestBlockReaderLocalReadCorrupt
    extends BlockReaderLocalTest {
  /** Recorded in setup() so doTest() knows which outcome to expect. */
  private boolean usingChecksums = false;

  /**
   * Overwrites bytes of the block file starting at offset 1539 with zeros.
   *
   * @param blockFile      block data file to corrupt
   * @param usingChecksums whether the run uses checksums
   * @throws IOException if the file cannot be written
   */
  @Override
  public void setup(File blockFile, boolean usingChecksums)
      throws IOException {
    this.usingChecksums = usingChecksums;
    try (RandomAccessFile bf = new RandomAccessFile(blockFile, "rw")) {
      bf.seek(1539);
      bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
    }
  }

  /**
   * Reads up to and across the corrupt region.
   *
   * @param reader   reader under test
   * @param original expected file contents
   * @throws IOException if a read fails for a reason other than a checksum
   *                     mismatch
   */
  @Override
  public void doTest(BlockReaderLocal reader, byte[] original)
      throws IOException {
    byte[] buf = new byte[TEST_LENGTH];
    try {
      reader.readFully(buf, 0, 10);
      assertArrayRegionsEqual(original, 0, buf, 0, 10);
      reader.readFully(buf, 10, 100);
      assertArrayRegionsEqual(original, 10, buf, 10, 100);
      reader.readFully(buf, 110, 700);
      assertArrayRegionsEqual(original, 110, buf, 110, 700);
      reader.skip(1); // skip from offset 810 to offset 811
      reader.readFully(buf, 811, 5);
      assertArrayRegionsEqual(original, 811, buf, 811, 5);
      // Covers offsets 816..1715, which include the zeroed bytes.
      reader.readFully(buf, 816, 900);
      if (usingChecksums) {
        // We should detect the corruption when using a checksum file.
        fail("did not detect corruption");
      }
    } catch (ChecksumException e) {
      if (!usingChecksums) {
        fail("didn't expect to get ChecksumException: not " +
            "using checksums.");
      }
    }
  }
}
/** Corrupt-read scenario with checksums and the default readahead. */
@Test
public void testBlockReaderLocalReadCorrupt()
    throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderLocalReadCorrupt();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, true, readahead);
}
/** Corrupt-read scenario without checksums, default readahead. */
@Test
public void testBlockReaderLocalReadCorruptNoChecksum()
    throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderLocalReadCorrupt();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, false, readahead);
}
/** Corrupt-read scenario with checksums and zero readahead. */
@Test
public void testBlockReaderLocalReadCorruptNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderLocalReadCorrupt();
  runBlockReaderLocalTest(scenario, true, 0);
}
/** Corrupt-read scenario without checksums and with zero readahead. */
@Test
public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario = new TestBlockReaderLocalReadCorrupt();
  runBlockReaderLocalTest(scenario, false, 0);
}
/**
 * Scenario reading into an array-backed ByteBuffer while calling
 * forceAnchorable() and forceUnanchorable() on the reader between reads.
 */
private static class TestBlockReaderLocalWithMlockChanges
    extends BlockReaderLocalTest {
  @Override
  public void setup(File blockFile, boolean usingChecksums)
      throws IOException {
    // Nothing to prepare for this scenario.
  }

  @Override
  public void doTest(BlockReaderLocal reader, byte[] original)
      throws IOException {
    final ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
    reader.skip(1);
    readAndCheck(reader, buf, original, 1, 9);
    readAndCheck(reader, buf, original, 10, 100);
    reader.forceAnchorable();
    readAndCheck(reader, buf, original, 110, 700);
    reader.forceUnanchorable();
    reader.skip(1); // skip from offset 810 to offset 811
    readAndCheck(reader, buf, original, 811, 5);
  }

  /** Reads {@code len} bytes at {@code off} and compares them. */
  private static void readAndCheck(BlockReaderLocal reader, ByteBuffer buf,
      byte[] expected, int off, int len) throws IOException {
    readFully(reader, buf, off, len);
    assertArrayRegionsEqual(expected, off, buf.array(), off, len);
  }
}
/** Mlock-change scenario with checksums and the default readahead. */
@Test
public void testBlockReaderLocalWithMlockChanges()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalWithMlockChanges();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, true, readahead);
}
/** Mlock-change scenario without checksums, default readahead. */
@Test
public void testBlockReaderLocalWithMlockChangesNoChecksum()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalWithMlockChanges();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, false, readahead);
}
/** Mlock-change scenario with checksums and zero readahead. */
@Test
public void testBlockReaderLocalWithMlockChangesNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalWithMlockChanges();
  runBlockReaderLocalTest(scenario, true, 0);
}
/** Mlock-change scenario without checksums and with zero readahead. */
@Test
public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalWithMlockChanges();
  runBlockReaderLocalTest(scenario, false, 0);
}
/**
 * Scenario that sets the checksum type to "NULL" and asserts that the
 * reader reports checksum verification as disabled before doing a series
 * of ByteBuffer reads.
 */
private static class TestBlockReaderLocalOnFileWithoutChecksum
    extends BlockReaderLocalTest {
  @Override
  public void setConfiguration(HdfsConfiguration conf) {
    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
  }

  @Override
  public void doTest(BlockReaderLocal reader, byte[] original)
      throws IOException {
    assertTrue(!reader.getVerifyChecksum());
    final ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
    reader.skip(1);
    readAndCheck(reader, buf, original, 1, 9);
    readAndCheck(reader, buf, original, 10, 100);
    reader.forceAnchorable();
    readAndCheck(reader, buf, original, 110, 700);
    reader.forceUnanchorable();
    reader.skip(1); // skip from offset 810 to offset 811
    readAndCheck(reader, buf, original, 811, 5);
  }

  /** Reads {@code len} bytes at {@code off} and compares them. */
  private static void readAndCheck(BlockReaderLocal reader, ByteBuffer buf,
      byte[] expected, int off, int len) throws IOException {
    readFully(reader, buf, off, len);
    assertArrayRegionsEqual(expected, off, buf.array(), off, len);
  }
}
/**
 * Scenario issuing zero-length reads through both the array and the
 * ByteBuffer API: they must return 0 at the start and after a one-byte
 * skip, and -1 after skipping a further TEST_LENGTH - 1 bytes.
 */
private static class TestBlockReaderLocalReadZeroBytes
    extends BlockReaderLocalTest {
  @Override
  public void doTest(BlockReaderLocal reader, byte[] original)
      throws IOException {
    final byte[] noBytes = new byte[0];
    final ByteBuffer noBuf = ByteBuffer.wrap(noBytes);
    expectZeroLengthReads(reader, noBytes, noBuf, 0);
    reader.skip(1);
    expectZeroLengthReads(reader, noBytes, noBuf, 0);
    reader.skip(BlockReaderLocalTest.TEST_LENGTH - 1);
    expectZeroLengthReads(reader, noBytes, noBuf, -1);
  }

  /** Asserts the result of a zero-length array read, then a buffer read. */
  private static void expectZeroLengthReads(BlockReaderLocal reader,
      byte[] arr, ByteBuffer buf, int expected) throws IOException {
    assertEquals(expected, reader.read(arr, 0, 0));
    assertEquals(expected, reader.read(buf));
  }
}
/** NULL-checksum file, verification requested, default readahead. */
@Test
public void testBlockReaderLocalOnFileWithoutChecksum()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalOnFileWithoutChecksum();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, true, readahead);
}
/** NULL-checksum file, verification not requested, default readahead. */
@Test
public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalOnFileWithoutChecksum();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, false, readahead);
}
/** NULL-checksum file, verification requested, zero readahead. */
@Test
public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalOnFileWithoutChecksum();
  runBlockReaderLocalTest(scenario, true, 0);
}
/** NULL-checksum file, verification not requested, zero readahead. */
@Test
public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalOnFileWithoutChecksum();
  runBlockReaderLocalTest(scenario, false, 0);
}
/** Zero-length reads with checksums and the default readahead. */
@Test
public void testBlockReaderLocalReadZeroBytes()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalReadZeroBytes();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, true, readahead);
}
/** Zero-length reads without checksums, default readahead. */
@Test
public void testBlockReaderLocalReadZeroBytesNoChecksum()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalReadZeroBytes();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, false, readahead);
}
/** Zero-length reads with checksums and zero readahead. */
@Test
public void testBlockReaderLocalReadZeroBytesNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalReadZeroBytes();
  runBlockReaderLocalTest(scenario, true, 0);
}
/** Zero-length reads without checksums and with zero readahead. */
@Test
public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderLocalReadZeroBytes();
  runBlockReaderLocalTest(scenario, false, 0);
}
/** Checks read statistics with short-circuit reads enabled. */
@Test
@Timeout(value = 60)
public void TestStatisticsForShortCircuitLocalRead() throws Exception {
  final boolean shortCircuit = true;
  testStatistics(shortCircuit);
}
/** Checks read statistics with short-circuit reads disabled. */
@Test
@Timeout(value = 60)
public void TestStatisticsForLocalRead() throws Exception {
  final boolean shortCircuit = false;
  testStatistics(shortCircuit);
}
/**
 * Writes a file of {@code TEST_LENGTH} bytes to a MiniDFSCluster, reads it
 * back in full and checks the stream's read statistics: total and local
 * bytes must equal the file length, and short-circuit bytes must equal the
 * file length or zero depending on {@code isShortCircuit}.
 *
 * The test is skipped when DomainSocket reports a loading failure.
 *
 * @param isShortCircuit whether short-circuit reads are enabled; when true,
 *                       TCP reads are also disabled for the duration
 * @throws Exception if the cluster or any file operation fails
 */
private void testStatistics(boolean isShortCircuit) throws Exception {
  assumeTrue(DomainSocket.getLoadingFailureReason() == null);
  HdfsConfiguration conf = new HdfsConfiguration();
  // Named differently from the class-level sockDir to avoid shadowing it.
  TemporarySocketDirectory statsSockDir = null;
  MiniDFSCluster cluster = null;
  final Path TEST_PATH = new Path("/a");
  final long RANDOM_SEED = 4567L;
  FSDataInputStream fsIn = null;
  byte[] original = new byte[BlockReaderLocalTest.TEST_LENGTH];
  FileSystem fs = null;
  try {
    // The global flag and the socket directory are set up inside the try
    // so the finally block undoes them even if the setup itself throws.
    if (isShortCircuit) {
      DFSInputStream.tcpReadsDisabledForTesting = true;
      statsSockDir = new TemporarySocketDirectory();
      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
          new File(statsSockDir.getDir(),
              "TestStatisticsForLocalRead.%d.sock").getAbsolutePath());
      conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
      DomainSocket.disableBindPathValidation();
    } else {
      conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
    }
    cluster = new MiniDFSCluster.Builder(conf).
        hosts(new String[] {NetUtils.getLocalHostname()}).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, TEST_PATH,
        BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
    try {
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
    } catch (InterruptedException e) {
      fail("unexpected InterruptedException during " +
          "waitReplication: " + e);
    } catch (TimeoutException e) {
      fail("unexpected TimeoutException during " +
          "waitReplication: " + e);
    }
    fsIn = fs.open(TEST_PATH);
    IOUtils.readFully(fsIn, original, 0,
        BlockReaderLocalTest.TEST_LENGTH);
    HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
    assertEquals(BlockReaderLocalTest.TEST_LENGTH,
        dfsIn.getReadStatistics().getTotalBytesRead());
    assertEquals(BlockReaderLocalTest.TEST_LENGTH,
        dfsIn.getReadStatistics().getTotalLocalBytesRead());
    if (isShortCircuit) {
      assertEquals(BlockReaderLocalTest.TEST_LENGTH,
          dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
    } else {
      assertEquals(0,
          dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
    }
    fsIn.close();
    fsIn = null;
  } finally {
    DFSInputStream.tcpReadsDisabledForTesting = false;
    if (fsIn != null) fsIn.close();
    if (fs != null) fs.close();
    if (cluster != null) cluster.shutdown();
    if (statsSockDir != null) statsSockDir.close();
  }
}
/**
 * Reads a replicated file and an erasure-coded file of the same length and
 * checks the block type and byte counters in their read statistics. One
 * DataNode listed for the EC file's first block location is stopped before
 * the EC read, and the decoding time is then expected to be positive.
 */
@Test
@Timeout(value = 60)
public void testStatisticsForErasureCodingRead() throws IOException {
  final HdfsConfiguration conf = new HdfsConfiguration();
  final ErasureCodingPolicy policy =
      StripedFileTestUtil.getDefaultECPolicy();
  final int dataNodeCount =
      policy.getNumDataUnits() + policy.getNumParityUnits();
  // The length of the test file is one full stripe + one partial stripe.
  // And it is not bound to the stripe cell size.
  final int fileLength = policy.getCellSize() * (dataNodeCount + 1) + 123;
  final long seed = 4567L;
  final short replication = 1;
  try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(dataNodeCount).build()) {
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    dfs.enableErasureCodingPolicy(policy.getName());

    final Path ecDir = new Path("/ec");
    dfs.mkdirs(ecDir);
    dfs.setErasureCodingPolicy(ecDir, policy.getName());
    final Path plainDir = new Path("/noEc");
    dfs.mkdirs(plainDir);

    final byte[] sink = new byte[fileLength];

    // Replicated file: contiguous blocks, all bytes read locally.
    final Path plainFile = new Path(plainDir, "file1");
    DFSTestUtil.createFile(dfs, plainFile, fileLength, replication, seed);
    try (HdfsDataInputStream in =
        (HdfsDataInputStream) dfs.open(plainFile)) {
      IOUtils.readFully(in, sink, 0, fileLength);
      final ReadStatistics plainStats = in.getReadStatistics();
      assertEquals(BlockType.CONTIGUOUS, plainStats.getBlockType());
      assertEquals(fileLength, plainStats.getTotalBytesRead());
      assertEquals(fileLength, plainStats.getTotalLocalBytesRead());
    }

    final Path ecFile = new Path(ecDir, "file2");
    DFSTestUtil.createFile(dfs, ecFile, fileLength, replication, seed);
    // Shutdown a DataNode that holds a data block, to trigger EC decoding.
    final BlockLocation[] locations =
        dfs.getFileBlockLocations(ecFile, 0, fileLength);
    final String[] hostNames = locations[0].getNames();
    cluster.stopDataNode(hostNames[0]);

    try (HdfsDataInputStream in = (HdfsDataInputStream) dfs.open(ecFile)) {
      IOUtils.readFully(in, sink, 0, fileLength);
      final ReadStatistics ecStats = in.getReadStatistics();
      assertEquals(BlockType.STRIPED, ecStats.getBlockType());
      assertEquals(fileLength, ecStats.getTotalLocalBytesRead());
      assertEquals(fileLength, ecStats.getTotalBytesRead());
      assertTrue(ecStats.getTotalEcDecodingTimeMillis() > 0);
    }
  }
}
/**
 * Scenario performing four consecutive array reads from the start of a
 * block and comparing them with the expected contents at the same offsets
 * plus {@code shift}.
 */
private static class TestBlockReaderFiveShortCircutCachesReads
    extends BlockReaderLocalTest {
  @Override
  public void doTest(BlockReaderLocal reader, byte[] original, int shift)
      throws IOException {
    final byte[] actual = new byte[TEST_LENGTH];
    // {offset, length} of each consecutive read within the block.
    final int[][] reads = {{0, 512}, {512, 512}, {1024, 513}, {1537, 514}};
    for (int[] read : reads) {
      final int off = read[0];
      final int len = read[1];
      reader.readFully(actual, off, len);
      assertArrayRegionsEqual(original, off + shift, actual, off, len);
    }
    // Readahead is always at least the size of one chunk in this test.
    assertTrue(BlockReaderLocalTest.BYTES_PER_CHECKSUM <=
        reader.getMaxReadaheadLength());
  }
}
/** Five caches, checksums, default readahead. */
@Test
public void testBlockReaderFiveShortCircutCachesReads() throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderFiveShortCircutCachesReads();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, true, readahead, 5);
}
/** Five caches, checksums, readahead one byte short of a chunk. */
@Test
public void testBlockReaderFiveShortCircutCachesReadsShortReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderFiveShortCircutCachesReads();
  final long readahead = BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1;
  runBlockReaderLocalTest(scenario, true, readahead, 5);
}
/** Five caches, no checksums, default readahead. */
@Test
public void testBlockReaderFiveShortCircutCachesReadsNoChecksum()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderFiveShortCircutCachesReads();
  final long readahead =
      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
  runBlockReaderLocalTest(scenario, false, readahead, 5);
}
/** Five caches, checksums, zero readahead. */
@Test
public void testBlockReaderFiveShortCircutCachesReadsNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderFiveShortCircutCachesReads();
  runBlockReaderLocalTest(scenario, true, 0, 5);
}
/** Five caches, no checksums, zero readahead. */
@Test
public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead()
    throws IOException {
  final BlockReaderLocalTest scenario =
      new TestBlockReaderFiveShortCircutCachesReads();
  runBlockReaderLocalTest(scenario, false, 0, 5);
}
/**
 * Expects an IllegalArgumentException when the run is configured with zero
 * short-circuit caches.
 */
@Test
public void testBlockReaderShortCircutCachesOutOfRangeBelow()
    throws IOException {
  // Checked here as well, presumably so that an unmet assumption skips the
  // test instead of surfacing inside assertThrows.
  assumeTrue(DomainSocket.getLoadingFailureReason() == null);
  final int cacheCount = 0;
  assertThrows(IllegalArgumentException.class, () ->
      runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
          true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
          cacheCount));
}
/**
 * Expects an IllegalArgumentException when the run is configured with 555
 * short-circuit caches.
 */
@Test
public void testBlockReaderShortCircutCachesOutOfRangeAbove()
    throws IOException {
  // Checked here as well, presumably so that an unmet assumption skips the
  // test instead of surfacing inside assertThrows.
  assumeTrue(DomainSocket.getLoadingFailureReason() == null);
  final int cacheCount = 555;
  assertThrows(IllegalArgumentException.class, () ->
      runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
          true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
          cacheCount));
}
}