/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;

/**
 * Tests HDFS persistent memory cache by PmemMappableBlockLoader.
 *
 * Bogus persistent memory volumes are used to cache blocks.
 */
public class TestCacheByPmemMappableBlockLoader {
  protected static final org.slf4j.Logger LOG =
      LoggerFactory.getLogger(TestCacheByPmemMappableBlockLoader.class);

  protected static final long CACHE_CAPACITY = 64 * 1024;
  protected static final long BLOCK_SIZE = 4 * 1024;

  private static Configuration conf;
  private static MiniDFSCluster cluster = null;
  private static DistributedFileSystem fs;
  private static DataNode dn;
  private static FsDatasetCache cacheManager;

  /**
   * Used to pause DN BPServiceActor threads. BPSA threads acquire the
   * shared read lock. The test acquires the write lock for exclusive access.
   */
  private static ReadWriteLock lock = new ReentrantReadWriteLock(true);
  private static CacheManipulator prevCacheManipulator;
  private static DataNodeFaultInjector oldInjector;

  private static final String PMEM_DIR_0 =
      MiniDFSCluster.getBaseDirectory() + "pmem0";
  private static final String PMEM_DIR_1 =
      MiniDFSCluster.getBaseDirectory() + "pmem1";

  static {
    // Emit FsDatasetCache debug logs to make test failures easier to trace.
    GenericTestUtils.setLogLevel(
        LoggerFactory.getLogger(FsDatasetCache.class), Level.DEBUG);
  }
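
  /**
   * Illustrative sketch, not part of the original test: how a test case
   * could use the fair read-write lock above to pause every BPServiceActor
   * thread while running an exclusive section. The method name and the
   * Runnable parameter are ours; the tests below do not call it.
   */
  private static void runWithPausedBPSA(Runnable action) {
    // BPSA threads hold the read lock (see setUpClass), so taking the
    // write lock blocks until all of them are paused at the lock.
    lock.writeLock().lock();
    try {
      action.run();
    } finally {
      lock.writeLock().unlock();
    }
  }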

  @BeforeClass
  public static void setUpClass() throws Exception {
    assumeTrue("Requires PMDK", NativeIO.POSIX.isPmdkAvailable());

    oldInjector = DataNodeFaultInjector.get();
    // Each BPServiceActor holds the shared read lock while offering service,
    // so a test can pause all of them by taking the write lock.
    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
      @Override
      public void startOfferService() throws Exception {
        lock.readLock().lock();
      }

      @Override
      public void endOfferService() throws Exception {
        lock.readLock().unlock();
      }
    });
  }

  @AfterClass
  public static void tearDownClass() throws Exception {
    DataNodeFaultInjector.set(oldInjector);
  }

  @Before
  public void setUp() throws Exception {
    conf = new HdfsConfiguration();
    conf.setLong(
        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
    conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);

    // Configure two bogus pmem volumes for the cache.
    new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
    new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
    conf.set(DFS_DATANODE_PMEM_CACHE_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
    // Cap each volume at half of the total capacity, so the two volumes
    // together provide exactly CACHE_CAPACITY bytes of cache space.
    PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5));

    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());

    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    dn = cluster.getDataNodes().get(0);
    cacheManager = ((FsDatasetImpl) dn.getFSDataset()).cacheManager;
  }
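
  /**
   * Illustrative sketch, not part of the original test: logs the effective
   * pmem cache directories. The method name is ours, and the property name
   * is read from the DFS_DATANODE_PMEM_CACHE_DIRS_KEY constant rather than
   * hard-coded, since only the constant is guaranteed by this file.
   */
  private static void logPmemConfig(Configuration c) {
    LOG.info("pmem cache dirs ({}): {}", DFS_DATANODE_PMEM_CACHE_DIRS_KEY,
        c.get(DFS_DATANODE_PMEM_CACHE_DIRS_KEY));
  }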

  @After
  public void tearDown() throws Exception {
    if (fs != null) {
      fs.close();
      fs = null;
    }
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
  }

  protected static void shutdownCluster() {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  @Test
  public void testPmemVolumeManager() throws IOException {
    PmemVolumeManager pmemVolumeManager = PmemVolumeManager.getInstance();
    assertNotNull(pmemVolumeManager);
    assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity());

    // Test the round-robin selection policy: over an even number of
    // choices, both volumes must be picked equally often.
    long count1 = 0, count2 = 0;
    for (int i = 0; i < 10; i++) {
      Byte index = pmemVolumeManager.chooseVolume(BLOCK_SIZE);
      String volume = pmemVolumeManager.getVolumeByIndex(index);
      if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_0))) {
        count1++;
      } else if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_1))) {
        count2++;
      } else {
        fail("Unexpected persistent storage location: " + volume);
      }
    }
    assertEquals(count1, count2);
  }

  /**
   * Returns the {@link ExtendedBlockId} of every block backing the given
   * file.
   */
  public List<ExtendedBlockId> getExtendedBlockId(Path filePath, long fileLen)
      throws IOException {
    List<ExtendedBlockId> keys = new ArrayList<>();
    HdfsBlockLocation[] locs =
        (HdfsBlockLocation[]) fs.getFileBlockLocations(filePath, 0, fileLen);
    for (HdfsBlockLocation loc : locs) {
      long bkid = loc.getLocatedBlock().getBlock().getBlockId();
      String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
      keys.add(new ExtendedBlockId(bkid, bpid));
    }
    return keys;
  }
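
  /**
   * Illustrative helper, not part of the original test: counts how many
   * cached replicas live on the pmem volume with the given index, using
   * only the blockKeyToVolume map exposed by PmemVolumeManager. The method
   * name is ours.
   */
  private static long countBlocksOnVolume(
      Map<ExtendedBlockId, Byte> blockKeyToVolume, byte volumeIndex) {
    long count = 0;
    for (Byte index : blockKeyToVolume.values()) {
      if (index == volumeIndex) {
        count++;
      }
    }
    return count;
  }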

  @Test(timeout = 60000)
  public void testCacheAndUncache() throws Exception {
    final int maxCacheBlocksNum =
        Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
    BlockReaderTestUtil.enableHdfsCachingTracing();
    // CACHE_CAPACITY must be a multiple of BLOCK_SIZE so that the test file
    // exactly fills the cache.
    Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
    assertEquals(CACHE_CAPACITY, cacheManager.getCacheCapacity());
    // DRAM cache is expected to be disabled.
    assertEquals(0L, cacheManager.getMemCacheCapacity());

    final Path testFile = new Path("/testFile");
    final long testFileLen = maxCacheBlocksNum * BLOCK_SIZE;
    DFSTestUtil.createFile(fs, testFile,
        testFileLen, (short) 1, 0xbeef);
    List<ExtendedBlockId> blockKeys =
        getExtendedBlockId(testFile, testFileLen);
    fs.addCachePool(new CachePoolInfo("testPool"));
    final long cacheDirectiveId = fs.addCacheDirective(
        new CacheDirectiveInfo.Builder().setPool("testPool").
            setPath(testFile).setReplication((short) 1).build());

    // Wait for the blocks to be cached.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
        long blocksCached =
            MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
        if (blocksCached != maxCacheBlocksNum) {
          LOG.info("Waiting for " + maxCacheBlocksNum + " blocks to be " +
              "cached. Right now " + blocksCached + " blocks are cached.");
          return false;
        }
        LOG.info(maxCacheBlocksNum + " blocks are now cached.");
        return true;
      }
    }, 1000, 30000);

    // The pmem cache space is expected to have been used up.
    assertEquals(CACHE_CAPACITY, cacheManager.getCacheUsed());
    // There should be no cache used on DRAM.
    assertEquals(0L, cacheManager.getMemCacheUsed());

    Map<ExtendedBlockId, Byte> blockKeyToVolume =
        PmemVolumeManager.getInstance().getBlockKeyToVolume();
    // All block keys should be kept in blockKeyToVolume.
    assertEquals(maxCacheBlocksNum, blockKeyToVolume.size());
    assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));

    // Test each replica's cache file path. The cache file is laid out as
    // <real pmem dir>/<block pool id>/<block id>.
    for (ExtendedBlockId key : blockKeys) {
      String cachePath = cacheManager.
          getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
      // The cachePath shouldn't be null if the replica has been cached
      // to pmem.
      assertNotNull(cachePath);
      Path path = new Path(cachePath);
      String fileName = path.getName();
      if (cachePath.startsWith(PMEM_DIR_0)) {
        String expectPath = PmemVolumeManager.
            getRealPmemDir(PMEM_DIR_0) + "/" + key.getBlockPoolId();
        assertTrue(path.toString().startsWith(expectPath));
        assertEquals(key.getBlockId(), Long.parseLong(fileName));
      } else if (cachePath.startsWith(PMEM_DIR_1)) {
        String expectPath = PmemVolumeManager.
            getRealPmemDir(PMEM_DIR_1) + "/" + key.getBlockPoolId();
        assertTrue(path.toString().startsWith(expectPath));
        assertEquals(key.getBlockId(), Long.parseLong(fileName));
      } else {
        fail("The cache path is not the expected one: " + cachePath);
      }
    }

    // Try to cache another file. Caching this file should fail
    // due to lack of available cache space.
    final Path smallTestFile = new Path("/smallTestFile");
    final long smallTestFileLen = BLOCK_SIZE;
    DFSTestUtil.createFile(fs, smallTestFile,
        smallTestFileLen, (short) 1, 0xbeef);
    // Try to cache more blocks when no cache space is available.
    final long smallFileCacheDirectiveId = fs.addCacheDirective(
        new CacheDirectiveInfo.Builder().setPool("testPool").
            setPath(smallTestFile).setReplication((short) 1).build());

    // Wait long enough to verify that smallTestFile could not be cached.
    Thread.sleep(10000);
    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
    long blocksCached =
        MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
    // The number of cached blocks should not have increased.
    assertEquals(maxCacheBlocksNum, blocksCached);
    // The blockKeyToVolume map should still hold only the block keys
    // of testFile.
    assertEquals(maxCacheBlocksNum, blockKeyToVolume.size());
    assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
    // Stop trying to cache smallTestFile to avoid interfering with the
    // verification of the uncache functionality.
    fs.removeCacheDirective(smallFileCacheDirectiveId);

    // Uncache the test file.
    fs.removeCacheDirective(cacheDirectiveId);
    // Wait for the blocks to be uncached.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
        long blocksUncached =
            MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
        if (blocksUncached != maxCacheBlocksNum) {
          LOG.info("Waiting for " + maxCacheBlocksNum + " blocks to be " +
              "uncached. Right now " + blocksUncached +
              " blocks are uncached.");
          return false;
        }
        LOG.info(maxCacheBlocksNum + " blocks have been uncached.");
        return true;
      }
    }, 1000, 30000);

    // It is expected that no pmem cache space is used.
    assertEquals(0, cacheManager.getCacheUsed());
    // No record should be kept by blockKeyToVolume after testFile is
    // uncached.
    assertEquals(0, blockKeyToVolume.size());
  }
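
  /**
   * Illustrative sketch, not part of the original suite: with an even
   * maxCacheBlocksNum and round-robin volume selection, cached replicas
   * should split evenly between the two volumes. It relies on the
   * hypothetical countBlocksOnVolume() helper above and assumes the two
   * volumes get indices 0 and 1, which holds only for this two-volume setup.
   */
  private static void assertEvenPmemDistribution(
      Map<ExtendedBlockId, Byte> blockKeyToVolume) {
    assertEquals(countBlocksOnVolume(blockKeyToVolume, (byte) 0),
        countBlocksOnVolume(blockKeyToVolume, (byte) 1));
  }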
}