/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset.SimulatedStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
public class TestBPOfferService {
private static final String FAKE_BPID = "fake bpid";
private static final String FAKE_CLUSTERID = "fake cluster";
protected static final Logger LOG = LoggerFactory.getLogger(
TestBPOfferService.class);
private static final ExtendedBlock FAKE_BLOCK =
new ExtendedBlock(FAKE_BPID, 12345L);
private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
private long firstCallTime = 0;
private long secondCallTime = 0;
private long firstLeaseId = 0;
private long secondLeaseId = 0;
private long nextFullBlockReportLeaseId = 1L;
static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.TRACE);
}
@Rule
public TemporaryFolder baseDir = new TemporaryFolder();
private DatanodeProtocolClientSideTranslatorPB mockNN1;
private DatanodeProtocolClientSideTranslatorPB mockNN2;
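// Per-NN state indexed by nnIdx. Sized for three NNs because
// testRefreshNameNodes adds a third mocked NN via setupNNMock(2).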
private final NNHAStatusHeartbeat[] mockHaStatuses =
new NNHAStatusHeartbeat[3];
private final DatanodeCommand[][] datanodeCommands =
new DatanodeCommand[3][0];
private final int[] heartbeatCounts = new int[3];
private DataNode mockDn;
private FsDatasetSpi<?> mockFSDataset;
private DataSetLockManager dataSetLockManager = new DataSetLockManager();
private boolean isSlownode;
private String mockStorageID;
@Before
public void setupMocks() throws Exception {
mockNN1 = setupNNMock(0);
mockNN2 = setupNNMock(1);
// Set up a mock DN with the bare-bones configuration
// objects, etc. Set as stubOnly to save memory and avoid Mockito holding
// references to each invocation. This can cause OOM in some runs.
mockDn = Mockito.mock(DataNode.class, Mockito.withSettings().stubOnly());
Mockito.doReturn(true).when(mockDn).shouldRun();
Configuration conf = new Configuration();
File dnDataDir = new File(new File(TEST_BUILD_DATA, "dfs"), "data");
conf.set(DFS_DATANODE_DATA_DIR_KEY, dnDataDir.toURI().toString());
Mockito.doReturn(conf).when(mockDn).getConf();
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
.when(mockDn).getMetrics();
// Set up a simulated dataset with our fake BP
mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
mockFSDataset.addBlockPool(FAKE_BPID, conf);
mockStorageID = ((SimulatedFSDataset) mockFSDataset).getStorages().get(0).getStorageUuid();
// Wire the dataset to the DN.
Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager();
}
@After
public void checkDataSetLockManager() {
dataSetLockManager.lockLeakCheck();
// Make sure there is no lock leak.
assertNull(dataSetLockManager.getLastException());
}
/**
* Set up a mock NN with the bare minimum for a DN to register to it.
*/
private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
throws Exception {
DatanodeProtocolClientSideTranslatorPB mock =
Mockito.mock(DatanodeProtocolClientSideTranslatorPB.class);
Mockito.doReturn(new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0))
.when(mock).versionRequest();
Mockito.doReturn(DFSTestUtil.getLocalDatanodeRegistration())
.when(mock).registerDatanode(Mockito.any());
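// sendHeartbeat is answered by HeartbeatAnswer, which counts heartbeats
// per NN, returns any commands queued in datanodeCommands[nnIdx] along
// with that NN's HA status, and grants a full block report lease id when
// the DN requests one.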
Mockito.doAnswer(new HeartbeatAnswer(nnIdx))
.when(mock).sendHeartbeat(
Mockito.any(DatanodeRegistration.class),
Mockito.any(StorageReport[].class),
Mockito.anyLong(),
Mockito.anyLong(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class));
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return mock;
}
/**
* Mock answer for heartbeats which returns the commands currently queued
* for the chosen NN (clearing them afterwards) and its HA status from the
* {@link TestBPOfferService#mockHaStatuses} array.
*/
private class HeartbeatAnswer implements Answer<HeartbeatResponse> {
private final int nnIdx;
HeartbeatAnswer(int nnIdx) {
this.nnIdx = nnIdx;
}
@Override
public HeartbeatResponse answer(InvocationOnMock invocation)
throws Throwable {
heartbeatCounts[nnIdx]++;
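// Argument 8 of sendHeartbeat is the requestFullBlockReportLease flag;
// hand out a fresh lease id whenever the DN asks for one.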
Boolean requestFullBlockReportLease =
(Boolean) invocation.getArguments()[8];
long fullBlockReportLeaseId = 0;
if (requestFullBlockReportLease) {
fullBlockReportLeaseId = nextFullBlockReportLeaseId++;
}
LOG.info("fullBlockReportLeaseId=" + fullBlockReportLeaseId);
HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
fullBlockReportLeaseId);
// Reset the commands so they are only delivered once.
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return heartbeatResponse;
}
}
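/**
* Mock answer for heartbeats which reports the current value of the
* isSlownode flag back to the DN.
*/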
private class HeartbeatIsSlownodeAnswer implements Answer<HeartbeatResponse> {
private final int nnIdx;
HeartbeatIsSlownodeAnswer(int nnIdx) {
this.nnIdx = nnIdx;
}
@Override
public HeartbeatResponse answer(InvocationOnMock invocation)
throws Throwable {
HeartbeatResponse heartbeatResponse = new HeartbeatResponse(
datanodeCommands[nnIdx], mockHaStatuses[nnIdx], null,
0, isSlownode);
return heartbeatResponse;
}
}
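/**
* Mock answer for heartbeats which replies with a single RegisterCommand,
* forcing the BPServiceActor to re-register with the NN.
*/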
private class HeartbeatRegisterAnswer implements Answer<HeartbeatResponse> {
private final int nnIdx;
HeartbeatRegisterAnswer(int nnIdx) {
this.nnIdx = nnIdx;
}
@Override
public HeartbeatResponse answer(InvocationOnMock invocation)
throws Throwable {
heartbeatCounts[nnIdx]++;
DatanodeCommand[] cmds = new DatanodeCommand[1];
cmds[0] = new RegisterCommand();
return new HeartbeatResponse(cmds, mockHaStatuses[nnIdx],
null, 0L);
}
}
/**
* Test that the BPOS can register to talk to two different NNs,
* sends block reports to both, etc.
*/
@Test
public void testBasicFunctionality() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
try {
waitForBothActors(bpos);
// The DN should have registered to both NNs.
Mockito.verify(mockNN1).registerDatanode(Mockito.any());
Mockito.verify(mockNN2).registerDatanode(Mockito.any());
// Should get block reports from both NNs
waitForBlockReport(mockNN1);
waitForBlockReport(mockNN2);
// When we receive a block, it should report it to both NNs
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
assertEquals(1, ret.length);
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
ret = waitForBlockReceived(FAKE_BLOCK, mockNN2);
assertEquals(1, ret.length);
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
} finally {
bpos.stop();
bpos.join();
}
}
/**
* HDFS-15113: Test and verify that blocks are not missed when the DN re-registers.
*/
@Test
public void testMissBlocksWhenReregister() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
int totalTestBlocks = 4000;
Thread addNewBlockThread = null;
final AtomicInteger count = new AtomicInteger(0);
DataNodeFaultInjector prevDNFaultInjector = null;
Set<Long> blocks = new TreeSet<>();
try {
waitForBothActors(bpos);
waitForInitialization(bpos);
prevDNFaultInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
public void blockUtilSendFullBlockReport() {
try {
GenericTestUtils.waitFor(() -> count.get() > 2000,
100, 1000);
} catch (Exception e) {
LOG.error("error DataNodeFaultInjector", e);
}
}
});
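// Record every block id seen in full block reports and IBRs on mockNN1 so
// the test can later verify that no generated block was missed.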
countBlockReportItems(FAKE_BLOCK, mockNN1, blocks);
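// Continuously create, announce, and finalize new blocks so that IBRs keep
// arriving while the fault injector delays the full block report.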
addNewBlockThread = new Thread(() -> {
for (int i = 0; i < totalTestBlocks; i++) {
SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
String storageId = simulatedStorage.getStorageUuid();
ExtendedBlock b = new ExtendedBlock(bpos.getBlockPoolId(), i, 0, i);
try {
fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
bpos.notifyNamenodeReceivingBlock(b, storageId);
fsDataset.finalizeBlock(b, false);
count.addAndGet(1);
Thread.sleep(1);
} catch (Exception e) {
LOG.error("error addNewBlockThread", e);
}
}
});
addNewBlockThread.start();
// Make sure the DataNode has started generating blocks and the IBR queue is not empty.
GenericTestUtils.waitFor(() -> count.get() > 0, 100, 1000);
// Trigger re-register using DataNode Command.
datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
bpos.triggerHeartbeatForTests();
addNewBlockThread.join();
addNewBlockThread = null;
// Verify that the FBR/IBR block count equals the number of generated blocks.
try {
GenericTestUtils.waitFor(() -> blocks.size() == totalTestBlocks,
1000, 15000);
} catch (Exception e) {
fail(String.format("Timed out waiting for blocks count. "
+ "reported = %d, expected = %d. Exception: %s",
blocks.size(), totalTestBlocks, e.getMessage()));
}
} finally {
if (addNewBlockThread != null) {
addNewBlockThread.interrupt();
}
bpos.stop();
bpos.join();
DataNodeFaultInjector.set(prevDNFaultInjector);
}
}
@Test
public void testLocklessBlockPoolId() throws Exception {
BPOfferService bpos = Mockito.spy(setupBPOSForNNs(mockNN1));
// bpNSInfo is not set, should take lock to check nsInfo.
assertNull(bpos.getBlockPoolId());
Mockito.verify(bpos).readLock();
// setting the bpNSInfo should cache the bp id, thus no locking.
Mockito.reset(bpos);
NamespaceInfo nsInfo = new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0);
assertNull(bpos.setNamespaceInfo(nsInfo));
assertEquals(FAKE_BPID, bpos.getBlockPoolId());
Mockito.verify(bpos, Mockito.never()).readLock();
// clearing the bpNSInfo should clear the cached bp id, thus requiring
// locking to check the bpNSInfo.
Mockito.reset(bpos);
assertEquals(nsInfo, bpos.setNamespaceInfo(null));
assertNull(bpos.getBlockPoolId());
Mockito.verify(bpos).readLock();
// test setting it again.
Mockito.reset(bpos);
assertNull(bpos.setNamespaceInfo(nsInfo));
assertEquals(FAKE_BPID, bpos.getBlockPoolId());
Mockito.verify(bpos, Mockito.never()).readLock();
}
/**
* Test that DNA_INVALIDATE commands from the standby are ignored.
*/
@Test
public void testIgnoreDeletionsFromNonActive() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
// Ask to invalidate FAKE_BLOCK when block report hits the
// standby
Mockito.doReturn(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
FAKE_BPID, new Block[] { FAKE_BLOCK.getLocalBlock() }))
.when(mockNN2).blockReport(
Mockito.any(),
Mockito.eq(FAKE_BPID),
Mockito.any(),
Mockito.any());
bpos.start();
try {
waitForInitialization(bpos);
// Should get block reports from both NNs
waitForBlockReport(mockNN1);
waitForBlockReport(mockNN2);
} finally {
bpos.stop();
bpos.join();
}
// Should ignore the delete command from the standby
Mockito.verify(mockFSDataset, Mockito.never())
.invalidate(Mockito.eq(FAKE_BPID), Mockito.any());
}
/**
* Ensure that, if the two NNs configured for a block pool
* have different block pool IDs, they will refuse to both
* register.
*/
@Test
public void testNNsFromDifferentClusters() throws Exception {
Mockito
.doReturn(new NamespaceInfo(1, "fake foreign cluster", FAKE_BPID, 0))
.when(mockNN1).versionRequest();
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
try {
waitForOneToFail(bpos);
} finally {
bpos.stop();
bpos.join();
}
}
/**
* Test that the DataNode determines the active NameNode correctly
* based on the HA-related information in heartbeat responses.
* See HDFS-2627.
*/
@Test
public void testPickActiveNameNode() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
try {
waitForInitialization(bpos);
// Should start with neither NN as active.
assertNull(bpos.getActiveNN());
// Have NN1 claim active at txid 1
mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
bpos.triggerHeartbeatForTests();
assertSame(mockNN1, bpos.getActiveNN());
// NN2 claims active at a higher txid
mockHaStatuses[1] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 2);
bpos.triggerHeartbeatForTests();
assertSame(mockNN2, bpos.getActiveNN());
// Even after another heartbeat from the first NN, it should
// think NN2 is active, since it claimed a higher txid
bpos.triggerHeartbeatForTests();
assertSame(mockNN2, bpos.getActiveNN());
// Even if NN2 goes to standby, DN shouldn't reset to talking to NN1,
// because NN1's txid is lower than the last active txid. Instead,
// it should consider neither active.
mockHaStatuses[1] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 2);
bpos.triggerHeartbeatForTests();
assertNull(bpos.getActiveNN());
// Now if NN1 goes back to a higher txid, it should be considered active
mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 3);
bpos.triggerHeartbeatForTests();
assertSame(mockNN1, bpos.getActiveNN());
} finally {
bpos.stop();
bpos.join();
}
}
/**
* Test datanode block pool initialization error handling.
* Failure in initializing a block pool should not cause NPE.
*/
@Test
public void testBPInitErrorHandling() throws Exception {
final DataNode mockDn = Mockito.mock(DataNode.class);
Mockito.doReturn(true).when(mockDn).shouldRun();
Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager();
Configuration conf = new Configuration();
File dnDataDir = new File(
new File(TEST_BUILD_DATA, "testBPInitErrorHandling"), "data");
conf.set(DFS_DATANODE_DATA_DIR_KEY, dnDataDir.toURI().toString());
Mockito.doReturn(conf).when(mockDn).getConf();
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
when(mockDn).getMetrics();
final AtomicInteger count = new AtomicInteger();
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
if (count.getAndIncrement() == 0) {
throw new IOException("faked initBlockPool exception");
}
// The initBlockPool is called again. Now mock init is done.
Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
return null;
}
}).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
List<BPServiceActor> actors = bpos.getBPServiceActors();
assertEquals(2, actors.size());
bpos.start();
try {
waitForInitialization(bpos);
// Even if one of the actors fails to initialize, the other one should
// still finish the block report.
waitForBlockReport(mockNN1, mockNN2);
} finally {
bpos.stop();
bpos.join();
}
}
private void waitForOneToFail(final BPOfferService bpos)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
List<BPServiceActor> actors = bpos.getBPServiceActors();
int failedcount = 0;
for (BPServiceActor actor : actors) {
if (!actor.isAlive()) {
failedcount++;
}
}
return failedcount == 1;
}
}, 100, 10000);
}
/**
* Create a BPOfferService which registers with and heartbeats with the
* specified namenode proxy objects.
* @throws IOException
*/
private BPOfferService setupBPOSForNNs(
DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
return setupBPOSForNNs(mockDn, nns);
}
private BPOfferService setupBPOSForNNs(DataNode mockDn,
DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
// Set up some fake InetAddresses, then override the connectToNN
// function to return the corresponding proxies.
final Map<InetSocketAddress, DatanodeProtocolClientSideTranslatorPB> nnMap = Maps.newLinkedHashMap();
List<String> nnIds = Lists.newArrayListWithCapacity(nns.length);
for (int port = 0; port < nns.length; port++) {
nnMap.put(new InetSocketAddress(port), nns[port]);
Mockito.doReturn(nns[port]).when(mockDn).connectToNN(
Mockito.eq(new InetSocketAddress(port)));
nnIds.add("nn" + port);
}
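// The list of nulls passed below stands in for the lifeline NN addresses,
// which these tests do not exercise.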
return new BPOfferService("test_ns", nnIds,
Lists.newArrayList(nnMap.keySet()),
Collections.<InetSocketAddress>nCopies(nnMap.size(), null), mockDn);
}
private void waitForInitialization(final BPOfferService bpos)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return bpos.isAlive() && bpos.isInitialized();
}
}, 100, 10000);
}
private void waitForBothActors(final BPOfferService bpos)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
List<BPServiceActor> actors = bpos.getBPServiceActors();
return bpos.isAlive() && getRegisteredActors(actors) == 2;
}
private int getRegisteredActors(List<BPServiceActor> actors) {
int regActors = 0;
for (BPServiceActor actor : actors) {
if (actor.getBpRegistration() != null) {
regActors++;
}
}
return regActors;
}
}, 100, 10000);
}
private void waitForBlockReport(final DatanodeProtocolClientSideTranslatorPB mockNN)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
Mockito.verify(mockNN).blockReport(
Mockito.any(),
Mockito.eq(FAKE_BPID),
Mockito.any(),
Mockito.any());
return true;
} catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage());
return false;
}
}
}, 500, 10000);
}
private void waitForBlockReport(
final DatanodeProtocolClientSideTranslatorPB mockNN1,
final DatanodeProtocolClientSideTranslatorPB mockNN2)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return get(mockNN1) || get(mockNN2);
}
private Boolean get(DatanodeProtocolClientSideTranslatorPB mockNN) {
try {
Mockito.verify(mockNN).blockReport(
Mockito.any(),
Mockito.eq(FAKE_BPID),
Mockito.any(),
Mockito.any());
return true;
} catch (Throwable t) {
LOG.info("waiting on block report: " + t.getMessage());
return false;
}
}
}, 500, 10000);
}
private void waitForRegistration(
final DatanodeProtocolClientSideTranslatorPB mockNN, int times)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
// The DN should have registered with the NN the given number of times:
// first via connectToNNAndHandshake, then via reRegister.
Mockito.verify(mockNN, Mockito.times(times))
.registerDatanode(Mockito.any());
return true;
} catch (Throwable t) {
LOG.info("waiting on block registerDatanode: " + t.getMessage());
return false;
}
}
}, 500, 10000);
}
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
final ExtendedBlock fakeBlock,
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
final ArgumentCaptor<StorageReceivedDeletedBlocks[]> captor =
ArgumentCaptor.forClass(StorageReceivedDeletedBlocks[].class);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
Mockito.verify(mockNN).blockReceivedAndDeleted(
Mockito.any(),
Mockito.eq(fakeBlockPoolId),
captor.capture());
return true;
} catch (Throwable t) {
return false;
}
}
}, 100, 10000);
return captor.getValue()[0].getBlocks();
}
private void setTimeForSynchronousBPOSCalls() {
if (firstCallTime == 0) {
firstCallTime = Time.now();
} else {
secondCallTime = Time.now();
}
}
/**
* Record the block ids reported via full block reports and the blocks added
* via IBRs, assuming no blocks are deleted here.
*/
private void countBlockReportItems(final ExtendedBlock fakeBlock,
final DatanodeProtocolClientSideTranslatorPB mockNN,
final Set<Long> blocks) throws Exception {
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
final ArgumentCaptor<StorageBlockReport[]> captor =
ArgumentCaptor.forClass(StorageBlockReport[].class);
// Record the block ids from the most recent full block report.
Mockito.doAnswer((Answer<Object>) invocation -> {
Object[] arguments = invocation.getArguments();
StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
for (BlockReportReplica brr : list[0].getBlocks()) {
blocks.add(brr.getBlockId());
}
return null;
}).when(mockNN).blockReport(
Mockito.any(),
Mockito.eq(fakeBlockPoolId),
captor.capture(),
Mockito.any()
);
// Record all blocks added via IBRs, assuming no blocks are deleted here.
Mockito.doAnswer((Answer<Object>) invocation -> {
Object[] arguments = invocation.getArguments();
StorageReceivedDeletedBlocks[] list =
(StorageReceivedDeletedBlocks[])arguments[2];
for (ReceivedDeletedBlockInfo rdbi : list[0].getBlocks()) {
blocks.add(rdbi.getBlock().getBlockId());
}
return null;
}).when(mockNN).blockReceivedAndDeleted(
Mockito.any(),
Mockito.eq(fakeBlockPoolId),
Mockito.any());
}
private class BPOfferServiceSynchronousCallAnswer implements Answer<Void> {
private final int nnIdx;
public BPOfferServiceSynchronousCallAnswer(int nnIdx) {
this.nnIdx = nnIdx;
}
// For the active namenode we record the call time, and for the standby
// namenode we sleep for 5 seconds (this simulates the situation where the
// standby namenode is down).
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
if (nnIdx == 0) {
setTimeForSynchronousBPOSCalls();
} else {
Thread.sleep(5000);
}
return null;
}
}
/**
* This test case tests the {@link BPOfferService#reportBadBlocks} method
* such that, if the call to the standby namenode times out, it should not
* affect the active namenode heartbeat processing, since this function is
* called under the writeLock.
* @throws Exception
*/
@Test
public void testReportBadBlockWhenStandbyNNTimesOut() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
try {
waitForInitialization(bpos);
// Should start with neither NN as active.
assertNull(bpos.getActiveNN());
// Have NN1 claim active at txid 1
mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
bpos.triggerHeartbeatForTests();
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
assertSame(mockNN1, bpos.getActiveNN());
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
.when(mockNN1).reportBadBlocks(Mockito.any());
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
.when(mockNN2).reportBadBlocks(Mockito.any());
bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
.getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
.getStorageType());
bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
.getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
.getStorageType());
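// Both reportBadBlocks calls are recorded on the active NN via
// firstCallTime/secondCallTime, while the standby's stubbed answer sleeps
// for 5 seconds. If active-NN processing were blocked behind the standby,
// the two timestamps would differ by at least 5 seconds.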
Thread.sleep(10000);
long difference = secondCallTime - firstCallTime;
assertTrue("Active namenode reportBadBlock processing should be "
+ "independent of standby namenode reportBadBlock processing ",
difference < 5000);
} finally {
bpos.stop();
bpos.join();
}
}
/**
* This test case tests the {@link BPOfferService#trySendErrorReport} method
* such that, if the call to the standby namenode times out, it should not
* affect the active namenode heartbeat processing, since this function is
* called under the writeLock.
* @throws Exception
*/
@Test
public void testTrySendErrorReportWhenStandbyNNTimesOut() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
try {
waitForInitialization(bpos);
// Should start with neither NN as active.
assertNull(bpos.getActiveNN());
// Have NN1 claim active at txid 1
mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
bpos.triggerHeartbeatForTests();
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
assertSame(mockNN1, bpos.getActiveNN());
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
.when(mockNN1).errorReport(Mockito.any(),
Mockito.anyInt(), Mockito.anyString());
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
.when(mockNN2).errorReport(Mockito.any(),
Mockito.anyInt(), Mockito.anyString());
String errorString = "Can't send invalid block " + FAKE_BLOCK;
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
Thread.sleep(10000);
long difference = secondCallTime - firstCallTime;
assertTrue("Active namenode trySendErrorReport processing "
+ "should be independent of standby namenode trySendErrorReport"
+ " processing ", difference < 5000);
} finally {
bpos.stop();
bpos.join();
}
}
/**
* This test case tests whether {@link BPServiceActor#processQueueMessages}
* adds the error report back to the queue when
* {@link BPServiceActorAction#reportTo} throws an IOException.
* @throws Exception
*/
@Test
public void testTrySendErrorReportWhenNNThrowsIOException()
throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
try {
waitForInitialization(bpos);
// Should start with neither NN as active.
assertNull(bpos.getActiveNN());
// Have NN1 claim active at txid 1
mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
bpos.triggerHeartbeatForTests();
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
assertSame(mockNN1, bpos.getActiveNN());
// Throw an IOException the first time errorReport is called, which will
// in turn add that errorReport back to the bpThreadQueue so it is
// processed on the next pass.
Mockito.doThrow(new IOException("Throw IOException in the first call."))
.doAnswer((Answer<Void>) invocation -> {
secondCallTime = Time.now();
return null;
}).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
Mockito.anyInt(), Mockito.anyString());
String errorString = "Can't send invalid block " + FAKE_BLOCK;
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
GenericTestUtils.waitFor(() -> secondCallTime != 0, 100, 20000);
assertTrue("Active namenode didn't add the report back to the queue "
+ "when errorReport threw IOException", secondCallTime != 0);
} finally {
bpos.stop();
bpos.join();
}
}
/**
* This test case verifies that the reportBadBlock request is not re-added
* to {@link BPServiceActor#bpThreadEnqueue} when the Standby namenode
* throws a {@link StandbyException}.
* @throws Exception
*/
@Test
public void testReportBadBlocksWhenNNThrowsStandbyException()
throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
try {
waitForInitialization(bpos);
// Should start with neither NN as active.
assertNull(bpos.getActiveNN());
// Have NN1 claim active at txid 1
mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
bpos.triggerHeartbeatForTests();
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
assertSame(mockNN1, bpos.getActiveNN());
// Do nothing when reportBadBlocks is called on the active namenode.
Mockito.doNothing().when(mockNN1).reportBadBlocks
(Mockito.any(LocatedBlock[].class));
RemoteException re = new RemoteException(StandbyException.class.
getName(), "Operation category WRITE is not supported in state "
+ "standby", RpcErrorCodeProto.ERROR_APPLICATION);
// Return StandbyException wrapped in RemoteException when Standby NN
// calls reportBadBlocks
Mockito.doThrow(re).when(mockNN2).reportBadBlocks
(Mockito.any(LocatedBlock[].class));
bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
.getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
.getStorageType());
// Send heartbeat so that the BpServiceActor can report bad block to
// namenode
bpos.triggerHeartbeatForTests();
Mockito.verify(mockNN2, Mockito.times(1))
.reportBadBlocks(Mockito.any(LocatedBlock[].class));
// Trigger another heartbeat, this will send reportBadBlock again if it
// is present in the queue.
bpos.triggerHeartbeatForTests();
Mockito.verify(mockNN2, Mockito.times(1))
.reportBadBlocks(Mockito.any(LocatedBlock[].class));
} finally {
bpos.stop();
bpos.join();
}
}
/*
* HDFS-9917: IBRs accumulated for the Standby while it was down should be
* cleared when the DN re-registers.
*/
@Test
public void testIBRClearanceForStandbyOnReRegister() throws Exception {
final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
bpos.start();
try {
waitForInitialization(bpos);
// Should start with neither NN as active.
assertNull(bpos.getActiveNN());
// Have NN1 claim active at txid 1
mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
bpos.triggerHeartbeatForTests();
// Now mockNN1 is acting like active namenode and mockNN2 as Standby
assertSame(mockNN1, bpos.getActiveNN());
// Do nothing when the active namenode receives IBRs.
Mockito.doNothing().when(mockNN1).blockReceivedAndDeleted(
Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
.any(StorageReceivedDeletedBlocks[].class));
final IOException re = new IOException(
"Standby NN is currently not able to process IBR");
final AtomicBoolean ibrReported = new AtomicBoolean(false);
// Throw an exception from the standby when the first IBR is received.
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ibrReported.set(true);
throw re;
}
}).when(mockNN2).blockReceivedAndDeleted(
Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
.any(StorageReceivedDeletedBlocks[].class));
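// notifyNamenodeReceivedBlock resolves the storage uuid through the
// dataset, so stub getStorage for the fake "storage0" id used below.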
DatanodeStorage storage = Mockito.mock(DatanodeStorage.class);
Mockito.doReturn(storage).when(mockFSDataset).getStorage("storage0");
// Add IBRs
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0", false);
// Send heartbeat so that the BpServiceActor can send IBR to
// namenode
bpos.triggerHeartbeatForTests();
// Wait till the first IBR is received at the standby NN, just for confirmation.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return ibrReported.get();
}
}, 100, 1000);
// Send a register command back to the DataNode to trigger reRegister().
// After reRegister, the pending IBRs should be cleared.
datanodeCommands[1] = new DatanodeCommand[] { new RegisterCommand() };
assertEquals(
"IBR size before reRegister should be non-0", 1, getStandbyIBRSize(
bpos));
bpos.triggerHeartbeatForTests();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return getStandbyIBRSize(bpos) == 0;
}
}, 100, 1000);
} finally {
bpos.stop();
bpos.join();
}
}
private int getStandbyIBRSize(BPOfferService bpos) {
List<BPServiceActor> bpServiceActors = bpos.getBPServiceActors();
for (BPServiceActor bpServiceActor : bpServiceActors) {
if (bpServiceActor.state == HAServiceState.STANDBY) {
return bpServiceActor.getIbrManager().getPendingIBRSize();
}
}
return -1;
}
/*
* Test that the DN picks up the NN's HA state from the versionRequest()
* response once it includes an HAServiceState.
*/
@Test
public void testNNHAStateUpdateFromVersionRequest() throws Exception {
final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
Mockito.doReturn(true).when(mockDn).areHeartbeatsDisabledForTests();
BPServiceActor actor = bpos.getBPServiceActors().get(0);
bpos.start();
waitForInitialization(bpos);
// Should start with neither NN as active.
assertNull(bpos.getActiveNN());
// getNamespaceInfo() will not include HAServiceState
NamespaceInfo nsInfo = mockNN1.versionRequest();
bpos.verifyAndSetNamespaceInfo(actor, nsInfo);
assertNull(bpos.getActiveNN());
// Change mock so getNamespaceInfo() will include HAServiceState
Mockito.doReturn(new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0,
HAServiceState.ACTIVE)).when(mockNN1).versionRequest();
// Update the bpos NamespaceInfo
nsInfo = mockNN1.versionRequest();
bpos.verifyAndSetNamespaceInfo(actor, nsInfo);
assertNotNull(bpos.getActiveNN());
}
@Test(timeout = 30000)
public void testRefreshNameNodes() throws Exception {
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
bpos.start();
try {
waitForBothActors(bpos);
// The DN should have registered to both NNs.
Mockito.verify(mockNN1)
.registerDatanode(Mockito.any());
Mockito.verify(mockNN2)
.registerDatanode(Mockito.any());
// Should get block reports from both NNs
waitForBlockReport(mockNN1);
waitForBlockReport(mockNN2);
// When we receive a block, it should report it to both NNs
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK,
mockNN1);
assertEquals(1, ret.length);
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
ret = waitForBlockReceived(FAKE_BLOCK, mockNN2);
assertEquals(1, ret.length);
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
// add new standby
DatanodeProtocolClientSideTranslatorPB mockNN3 = setupNNMock(2);
Mockito.doReturn(mockNN3).when(mockDn)
.connectToNN(Mockito.eq(new InetSocketAddress(2)));
ArrayList<InetSocketAddress> addrs = new ArrayList<>();
ArrayList<InetSocketAddress> lifelineAddrs = new ArrayList<>(
addrs.size());
// mockNN1
addrs.add(new InetSocketAddress(0));
lifelineAddrs.add(null);
// mockNN3
addrs.add(new InetSocketAddress(2));
lifelineAddrs.add(null);
ArrayList<String> nnIds = new ArrayList<>(addrs.size());
for (int i = 0; i < addrs.size(); i++) {
nnIds.add("nn" + i);
}
bpos.refreshNNList("serviceId", nnIds, addrs, lifelineAddrs);
assertEquals(2, bpos.getBPServiceActors().size());
// wait for handshake to run
Thread.sleep(1000);
// verify new NN registered
Mockito.verify(mockNN3).registerDatanode(Mockito.any());
// When we receive a block, it should report it to both NNs
bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, mockStorageID, false);
// Verify the new NN received the blockReceived report.
ret = waitForBlockReceived(FAKE_BLOCK, mockNN3);
assertEquals(1, ret.length);
assertEquals(FAKE_BLOCK.getLocalBlock(), ret[0].getBlock());
} finally {
bpos.stop();
bpos.join();
}
}
@Test(timeout = 15000)
public void testRefreshLeaseId() throws Exception {
Mockito.when(mockNN1.sendHeartbeat(
Mockito.any(DatanodeRegistration.class),
Mockito.any(StorageReport[].class),
Mockito.anyLong(),
Mockito.anyLong(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class)))
//heartbeat to old NN instance
.thenAnswer(new HeartbeatAnswer(0))
//heartbeat to new NN instance with Register Command
.thenAnswer(new HeartbeatRegisterAnswer(0))
.thenAnswer(new HeartbeatAnswer(0));
Mockito.when(mockNN1.blockReport(
Mockito.any(DatanodeRegistration.class),
Mockito.anyString(),
Mockito.any(StorageBlockReport[].class),
Mockito.any(BlockReportContext.class)))
.thenAnswer(
new Answer() {
@Override
public Object answer(InvocationOnMock invocation)
throws Throwable {
BlockReportContext context =
(BlockReportContext) invocation.getArguments()[3];
long leaseId = context.getLeaseId();
LOG.info("leaseId = "+leaseId);
// leaseId == 1 means the DN sent the block report with the old leaseId;
// reject it and wait until the DN requests a new leaseId.
if(leaseId == 1) {
firstLeaseId = leaseId;
InvalidBlockReportLeaseException e =
new InvalidBlockReportLeaseException(context.getReportId(), 1);
throw new RemoteException(e.getClass().getName(), e.getMessage());
} else {
secondLeaseId = leaseId;
return null;
}
}
});
BPOfferService bpos = setupBPOSForNNs(mockNN1);
bpos.start();
try {
waitForInitialization(bpos);
// Should call registration 2 times
waitForRegistration(mockNN1, 2);
assertEquals(1L, firstLeaseId);
while(secondLeaseId != 2L) {
Thread.sleep(1000);
}
} finally {
bpos.stop();
}
}
@Test(timeout = 15000)
public void testSetIsSlownode() throws Exception {
assertFalse(mockDn.isSlownode());
Mockito.when(mockNN1.sendHeartbeat(
Mockito.any(DatanodeRegistration.class),
Mockito.any(StorageReport[].class),
Mockito.anyLong(),
Mockito.anyLong(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class)))
.thenAnswer(new HeartbeatIsSlownodeAnswer(0));
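// HeartbeatIsSlownodeAnswer echoes the isSlownode field in every heartbeat
// response, so flipping the field below changes what the next heartbeat
// reports.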
BPOfferService bpos = setupBPOSForNNs(mockNN1);
bpos.start();
try {
waitForInitialization(bpos);
bpos.triggerHeartbeatForTests();
assertFalse(bpos.isSlownode());
isSlownode = true;
bpos.triggerHeartbeatForTests();
assertTrue(bpos.isSlownode());
isSlownode = false;
bpos.triggerHeartbeatForTests();
assertFalse(bpos.isSlownode());
} finally {
bpos.stop();
}
}
@Test(timeout = 15000)
public void testCommandProcessingThread() throws Exception {
Configuration conf = new HdfsConfiguration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, baseDir.getRoot()).build()) {
List<DataNode> datanodes = cluster.getDataNodes();
assertEquals(1, datanodes.size());
DataNode datanode = datanodes.get(0);
// Try to write file and trigger NN send back command to DataNode.
FileSystem fs = cluster.getFileSystem();
Path file = new Path("/test");
DFSTestUtil.createFile(fs, file, 10240L, (short)1, 0L);
MetricsRecordBuilder mrb = getMetrics(datanode.getMetrics().name());
assertTrue("Process command nums is not expected.",
getLongCounter("NumProcessedCommands", mrb) > 0);
assertEquals(0, getLongCounter("SumOfActorCommandQueueLength", mrb));
// Check the new metric result for processedCommandsOp.
// The one command sent back to the DataNode here is the FinalizeCommand.
assertCounter("ProcessedCommandsOpNumOps", 1L, mrb);
}
}
@Test(timeout = 5000)
public void testCommandProcessingThreadExit() throws Exception {
Configuration conf = new HdfsConfiguration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf, baseDir.getRoot()).
numDataNodes(1).build()) {
List<DataNode> datanodes = cluster.getDataNodes();
DataNode dataNode = datanodes.get(0);
List<BPOfferService> allBpOs = dataNode.getAllBpOs();
BPOfferService bpos = allBpOs.get(0);
waitForInitialization(bpos);
BPServiceActor actor = bpos.getBPServiceActors().get(0);
// Stop the command processing thread and wait until the actor exits.
actor.stopCommandProcessingThread();
GenericTestUtils.waitFor(() -> !actor.isAlive(), 100, 3000);
}
}
}