TestObserverNode.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.tools.GetGroups;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test main functionality of ObserverNode.
*/
public class TestObserverNode {
public static final Logger LOG =
LoggerFactory.getLogger(TestObserverNode.class.getName());
private static Configuration conf;
private static MiniQJMHACluster qjmhaCluster;
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
private final Path testPath= new Path("/TestObserverNode");
@BeforeClass
public static void startUpCluster() throws Exception {
conf = new Configuration();
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
// Set observer probe retry period to 0. Required by the tests that restart
// Observer and immediately try to read from it.
conf.setTimeDuration(
OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true);
dfsCluster = qjmhaCluster.getDfsCluster();
}
@Before
public void setUp() throws Exception {
setObserverRead(true);
}
@After
public void cleanUp() throws IOException {
dfs.delete(testPath, true);
assertEquals("NN[0] should be active", HAServiceState.ACTIVE,
getServiceState(dfsCluster.getNameNode(0)));
assertEquals("NN[1] should be standby", HAServiceState.STANDBY,
getServiceState(dfsCluster.getNameNode(1)));
assertEquals("NN[2] should be observer", HAServiceState.OBSERVER,
getServiceState(dfsCluster.getNameNode(2)));
}
@AfterClass
public static void shutDownCluster() throws IOException {
if (qjmhaCluster != null) {
qjmhaCluster.shutdown();
}
}
@Test
public void testObserverRequeue() throws Exception {
ScheduledExecutorService interruptor =
Executors.newScheduledThreadPool(1);
FSNamesystem observerFsNS = dfsCluster.getNamesystem(2);
RpcMetrics obRpcMetrics = ((NameNodeRpcServer)dfsCluster
.getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics();
try {
// Stop EditlogTailer of Observer NameNode.
observerFsNS.getEditLogTailer().stop();
long oldRequeueNum = obRpcMetrics.getRpcRequeueCalls();
ScheduledFuture<FileStatus> scheduledFuture = interruptor.schedule(
() -> {
Path tmpTestPath = new Path("/TestObserverRequeue");
dfs.create(tmpTestPath, (short)1).close();
assertSentTo(0);
// This operation will be blocked in ObserverNameNode
// until EditlogTailer tailed edits from journalNode.
FileStatus fileStatus = dfs.getFileStatus(tmpTestPath);
assertSentTo(2);
return fileStatus;
}, 0, TimeUnit.SECONDS);
GenericTestUtils.waitFor(() -> obRpcMetrics.getRpcRequeueCalls() > oldRequeueNum,
50, 10000);
observerFsNS.getEditLogTailer().doTailEdits();
FileStatus fileStatus = scheduledFuture.get(10000, TimeUnit.MILLISECONDS);
assertNotNull(fileStatus);
} finally {
EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf);
observerFsNS.setEditLogTailerForTests(editLogTailer);
editLogTailer.start();
}
}
@Test
public void testNoActiveToObserver() throws Exception {
try {
dfsCluster.transitionToObserver(0);
} catch (ServiceFailedException e) {
return;
}
fail("active cannot be transitioned to observer");
}
/**
* Test that non-ClientProtocol proxies such as
* {@link org.apache.hadoop.tools.GetUserMappingsProtocol} still work
* when run in an environment with observers.
*/
@Test
public void testGetGroups() throws Exception {
GetGroups getGroups = new GetGroups(conf);
assertEquals(0, getGroups.run(new String[0]));
}
@Test
public void testNoObserverToActive() throws Exception {
try {
dfsCluster.transitionToActive(2);
} catch (ServiceFailedException e) {
return;
}
fail("observer cannot be transitioned to active");
}
@Test
public void testSimpleRead() throws Exception {
Path testPath2 = new Path(testPath, "test2");
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
dfs.mkdir(testPath2, FsPermission.getDefault());
assertSentTo(0);
}
@Test
public void testConfigStartup() throws Exception {
int nnIdx = dfsCluster.getNumNameNodes() - 1;
// Transition all current observers to standby
for (int i = 0; i < dfsCluster.getNumNameNodes(); i++) {
if (dfsCluster.getNameNode(i).isObserverState()) {
dfsCluster.transitionToStandby(i);
}
}
// Confirm that the namenode at nnIdx is standby
assertTrue("The NameNode is observer despite being transitioned to standby",
dfsCluster.getNameNode(nnIdx).isStandbyState());
// Restart the NameNode with observer startup option as false
dfsCluster.getConfiguration(nnIdx)
.setBoolean(DFS_NAMENODE_OBSERVER_ENABLED_KEY, false);
dfsCluster.restartNameNode(nnIdx);
// Verify that the NameNode is not in Observer state
dfsCluster.waitNameNodeUp(nnIdx);
assertTrue("The NameNode started as Observer despite "
+ DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being false",
dfsCluster.getNameNode(nnIdx).isStandbyState());
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
// The first request goes to the active because it has not refreshed yet;
// the second would go to the observer if it was not in standby
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
dfs.getFileStatus(testPath);
assertSentTo(0);
Path testPath2 = new Path(testPath, "test2");
// Restart the NameNode with the observer startup option as true
dfsCluster.getConfiguration(nnIdx)
.setBoolean(DFS_NAMENODE_OBSERVER_ENABLED_KEY, true);
dfsCluster.restartNameNode(nnIdx);
// Check that the NameNode is in Observer state
dfsCluster.waitNameNodeUp(nnIdx);
assertTrue("The NameNode did not start as Observer despite "
+ DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being true",
dfsCluster.getNameNode(nnIdx).isObserverState());
dfs.mkdir(testPath2, FsPermission.getDefault());
assertSentTo(0);
// The first request goes to the active because it has not refreshed yet;
// the second will properly go to the observer
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath2);
dfs.getFileStatus(testPath2);
assertSentTo(nnIdx);
}
@Test
public void testFailover() throws Exception {
Path testPath2 = new Path(testPath, "test2");
setObserverRead(false);
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
dfs.getFileStatus(testPath);
assertSentTo(0);
dfsCluster.transitionToStandby(0);
dfsCluster.transitionToActive(1);
dfsCluster.waitActive(1);
dfs.mkdir(testPath2, FsPermission.getDefault());
assertSentTo(1);
dfs.getFileStatus(testPath);
assertSentTo(1);
dfsCluster.transitionToStandby(1);
dfsCluster.transitionToActive(0);
dfsCluster.waitActive(0);
}
@Test
public void testDoubleFailover() throws Exception {
Path testPath2 = new Path(testPath, "test2");
Path testPath3 = new Path(testPath, "test3");
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
dfs.mkdir(testPath2, FsPermission.getDefault());
assertSentTo(0);
dfsCluster.transitionToStandby(0);
dfsCluster.transitionToActive(1);
dfsCluster.waitActive(1);
dfsCluster.rollEditLogAndTail(1);
dfs.getFileStatus(testPath2);
assertSentTo(2);
dfs.mkdir(testPath3, FsPermission.getDefault());
assertSentTo(1);
dfsCluster.transitionToStandby(1);
dfsCluster.transitionToActive(0);
dfsCluster.waitActive(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath3);
assertSentTo(2);
dfs.delete(testPath3, false);
assertSentTo(0);
}
@Test
public void testObserverShutdown() throws Exception {
dfs.mkdir(testPath, FsPermission.getDefault());
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
// Shutdown the observer - requests should go to active
dfsCluster.shutdownNameNode(2);
dfs.getFileStatus(testPath);
assertSentTo(0);
// Start the observer again - requests should go to observer
dfsCluster.restartNameNode(2);
dfsCluster.transitionToObserver(2);
// The first request goes to the active because it has not refreshed yet;
// the second will properly go to the observer
dfs.getFileStatus(testPath);
dfs.getFileStatus(testPath);
assertSentTo(2);
}
@Test
public void testObserverFailOverAndShutdown() throws Exception {
dfs.mkdir(testPath, FsPermission.getDefault());
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
dfsCluster.transitionToStandby(0);
dfsCluster.transitionToActive(1);
dfsCluster.waitActive(1);
// Shutdown the observer - requests should go to active
dfsCluster.shutdownNameNode(2);
dfs.getFileStatus(testPath);
assertSentTo(1);
// Start the observer again - requests should go to observer
dfsCluster.restartNameNode(2);
dfs.getFileStatus(testPath);
assertSentTo(1);
dfsCluster.transitionToObserver(2);
dfs.getFileStatus(testPath);
// The first request goes to the active because it has not refreshed yet;
// the second will properly go to the observer
dfs.getFileStatus(testPath);
assertSentTo(2);
dfsCluster.transitionToStandby(1);
dfsCluster.transitionToActive(0);
dfsCluster.waitActive(0);
}
@Test
public void testBootstrap() throws Exception {
for (URI u : dfsCluster.getNameDirs(2)) {
File dir = new File(u.getPath());
assertTrue(FileUtil.fullyDelete(dir));
}
int rc = BootstrapStandby.run(
new String[]{"-nonInteractive"},
dfsCluster.getConfiguration(2)
);
assertEquals(0, rc);
}
/**
* Test the case where Observer should throw RetriableException, just like
* active NN, for certain open() calls where block locations are not
* available. See HDFS-13898 for details.
*/
@Test
public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
// Create a new file - the request should go to active.
dfs.create(testPath, (short)1).close();
assertSentTo(0);
dfsCluster.rollEditLogAndTail(0);
dfs.open(testPath).close();
assertSentTo(2);
// Set observer to safe mode.
dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.ENTER);
// Mock block manager for observer to generate some fake blocks which
// will trigger the (retriable) safe mode exception.
BlockManager bmSpy =
NameNodeAdapterMockitoUtil.spyOnBlockManager(dfsCluster.getNameNode(2));
doAnswer((invocation) -> {
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY);
List<LocatedBlock> fakeBlocks = new ArrayList<>();
fakeBlocks.add(fakeBlock);
return new LocatedBlocks(0, false, fakeBlocks, null, true, null, null);
}).when(bmSpy).createLocatedBlocks(any(), anyLong(), anyBoolean(),
anyLong(), anyLong(), anyBoolean(), anyBoolean(), any(), any());
// Open the file again - it should throw retriable exception and then
// failover to active.
dfs.open(testPath).close();
assertSentTo(0);
Mockito.reset(bmSpy);
// Remove safe mode on observer, request should still go to it.
dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.LEAVE);
dfs.open(testPath).close();
assertSentTo(2);
}
@Test
public void testObserverNodeBlockMissingRetry() throws Exception {
setObserverRead(true);
dfs.create(testPath, (short)1).close();
assertSentTo(0);
dfsCluster.rollEditLogAndTail(0);
// Mock block manager for observer to generate some fake blocks which
// will trigger the block missing exception.
BlockManager bmSpy = NameNodeAdapterMockitoUtil
.spyOnBlockManager(dfsCluster.getNameNode(2));
doAnswer((invocation) -> {
List<LocatedBlock> fakeBlocks = new ArrayList<>();
// Remove the datanode info for the only block so it will throw
// BlockMissingException and retry.
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY);
fakeBlocks.add(fakeBlock);
return new LocatedBlocks(0, false, fakeBlocks, null, true, null, null);
}).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
Mockito.any(), Mockito.any());
dfs.open(testPath);
assertSentTo(0);
dfs.getClient().listPaths("/", new byte[0], true);
assertSentTo(0);
dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
assertSentTo(0);
Mockito.reset(bmSpy);
}
@Test
public void testFsckWithObserver() throws Exception {
setObserverRead(true);
dfs.create(testPath, (short)1).close();
assertSentTo(0);
final String result = TestFsck.runFsck(conf, 0, true, "/");
LOG.info("result=" + result);
assertTrue(result.contains("Status: HEALTHY"));
}
/**
* Test that, if a write happens happens to go to Observer,
* Observer would throw {@link ObserverRetryOnActiveException},
* to inform client to retry on Active
*
* @throws Exception
*/
@Test
public void testObserverRetryActiveException() throws Exception {
boolean thrownRetryException = false;
try {
// Force a write on Observer, which should throw
// retry on active exception.
dfsCluster.getNameNode(2)
.getRpcServer()
.mkdirs("/testActiveRetryException",
FsPermission.createImmutable((short) 0755), true);
} catch (ObserverRetryOnActiveException orae) {
thrownRetryException = true;
}
assertTrue(thrownRetryException);
}
/**
* Test that for open call, if access time update is required,
* the open call should go to active, instead of observer.
*
* @throws Exception
*/
@Test
public void testAccessTimeUpdateRedirectToActive() throws Exception {
// Create a new pat to not mess up other tests
Path tmpTestPath = new Path("/TestObserverNodeAccessTime");
dfs.create(tmpTestPath, (short)1).close();
assertSentTo(0);
dfs.open(tmpTestPath).close();
assertSentTo(2);
// Set access time to a time in the far past.
// So that next read call is guaranteed to
// have passed access time period.
dfs.setTimes(tmpTestPath, 0, 0);
// Verify that aTime update redirects on Active
dfs.open(tmpTestPath).close();
assertSentTo(0);
}
/**
* Test that if client connects to Active it does not try to find Observer
* on next calls during some period of time.
*/
@Test
public void testStickyActive() throws Exception {
Path testFile = new Path(testPath, "testStickyActive");
Configuration newConf = new Configuration(conf);
// Observer probe retry period set to 5 sec
newConf.setLong(OBSERVER_PROBE_RETRY_PERIOD_KEY, 5000);
// Disable cache, so that a new client actually gets created with new conf.
newConf.setBoolean("fs.hdfs.impl.disable.cache", true);
DistributedFileSystem newFs = (DistributedFileSystem) FileSystem.get(newConf);
newFs.create(testFile, (short)1).close();
assertSentTo(0);
dfsCluster.rollEditLogAndTail(0);
// No Observers present, should still go to Active
dfsCluster.transitionToStandby(2);
assertEquals("NN[2] should be standby", HAServiceState.STANDBY,
getServiceState(dfsCluster.getNameNode(2)));
newFs.open(testFile).close();
assertSentTo(0);
// Restore Observer
int newObserver = 1;
dfsCluster.transitionToObserver(newObserver);
assertEquals("NN[" + newObserver + "] should be observer",
HAServiceState.OBSERVER,
getServiceState(dfsCluster.getNameNode(newObserver)));
long startTime = Time.monotonicNow();
try {
while(Time.monotonicNow() - startTime <= 5000) {
newFs.open(testFile).close();
// Client should still talk to Active
assertSentTo(0);
Thread.sleep(200);
}
} catch(AssertionError ae) {
if(Time.monotonicNow() - startTime <= 5000) {
throw ae;
}
assertSentTo(newObserver);
} finally {
dfsCluster.transitionToStandby(1);
dfsCluster.transitionToObserver(2);
}
}
@Test
public void testFsckDelete() throws Exception {
setObserverRead(true);
DFSTestUtil.createFile(dfs, testPath, 512, (short) 1, 0);
DFSTestUtil.waitForReplication(dfs, testPath, (short) 1, 5000);
ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, testPath);
int dnToCorrupt = DFSTestUtil.firstDnWithBlock(dfsCluster, block);
FSNamesystem ns = dfsCluster.getNameNode(0).getNamesystem();
// corrupt Replicas are detected on restarting datanode
dfsCluster.corruptReplica(dnToCorrupt, block);
dfsCluster.restartDataNode(dnToCorrupt);
DFSTestUtil.waitCorruptReplicas(dfs, ns, testPath, block, 1);
final String result = TestFsck.runFsck(conf, 1, true, "/", "-delete");
// filesystem should be in corrupt state
LOG.info("result=" + result);
assertTrue(result.contains("The filesystem under path '/' is CORRUPT"));
}
/**
* The test models the race of two mkdirs RPC calls on the same path to
* Active NameNode. The first arrived call will journal a mkdirs transaction.
* The subsequent call hitting the NameNode before the mkdirs transaction is
* synced will see that the directory already exists, but will obtain
* lastSeenStateId smaller than the txId of the mkdirs transaction
* since the latter hasn't been synced yet.
* This causes stale read from Observer for the second client.
* See HDFS-15915.
*/
@Test
public void testMkdirsRaceWithObserverRead() throws Exception {
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
// Create a spy on FSEditLog, which delays MkdirOp transaction by 100 mec
FSEditLog spyEditLog = NameNodeAdapterMockitoUtil.spyDelayMkDirTransaction(
dfsCluster.getNameNode(0), 100);
final int numThreads = 4;
ClientState[] clientStates = new ClientState[numThreads];
final ExecutorService threadPool =
HadoopExecutors.newFixedThreadPool(numThreads);
final Future<?>[] futures = new Future<?>[numThreads];
Configuration conf2 = new Configuration(conf);
// Disable FS cache so that different DFS clients are used
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
for (int i = 0; i < numThreads; i++) {
clientStates[i] = new ClientState();
futures[i] = threadPool.submit(new MkDirRunner(conf2, clientStates[i]));
}
Thread.sleep(150); // wait until mkdir is logged
long activStateId =
dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId();
dfsCluster.rollEditLogAndTail(0);
boolean finished = true;
// wait for all dispatcher threads to finish
for (Future<?> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
finished = false;
LOG.warn("MkDirRunner thread failed", e.getCause());
}
}
assertTrue("Not all threads finished", finished);
threadPool.shutdown();
assertEquals("Active and Observer stateIds don't match",
dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId());
for (int i = 0; i < numThreads; i++) {
assertTrue("Client #" + i
+ " lastSeenStateId=" + clientStates[i].lastSeenStateId
+ " activStateId=" + activStateId
+ "\n" + clientStates[i].fnfe,
clientStates[i].lastSeenStateId >= activStateId &&
clientStates[i].fnfe == null);
}
// Restore edit log
Mockito.reset(spyEditLog);
}
static class ClientState {
private long lastSeenStateId = -7;
private FileNotFoundException fnfe;
}
static class MkDirRunner implements Runnable {
private static final Path DIR_PATH =
new Path("/TestObserverNode/testMkdirsRaceWithObserverRead");
private DistributedFileSystem fs;
private ClientState clientState;
MkDirRunner(Configuration conf, ClientState cs) throws IOException {
super();
fs = (DistributedFileSystem) FileSystem.get(conf);
clientState = cs;
}
@Override
public void run() {
try {
fs.mkdirs(DIR_PATH);
clientState.lastSeenStateId = HATestUtil.getLastSeenStateId(fs);
assertSentTo(fs, 0);
FileStatus stat = fs.getFileStatus(DIR_PATH);
assertSentTo(fs, 2);
assertTrue("Should be a directory", stat.isDirectory());
} catch (FileNotFoundException ioe) {
clientState.fnfe = ioe;
} catch (Exception e) {
fail("Unexpected exception: " + e);
}
}
}
@Test
public void testGetListingForDeletedDir() throws Exception {
Path path = new Path("/dir1/dir2/testFile");
dfs.create(path).close();
assertTrue(dfs.delete(new Path("/dir1/dir2"), true));
LambdaTestUtils.intercept(FileNotFoundException.class,
() -> dfs.listLocatedStatus(new Path("/dir1/dir2")));
}
@Test
public void testSimpleReadEmptyDirOrFile() throws IOException {
// read empty dir
dfs.mkdirs(new Path("/emptyDir"));
assertSentTo(0);
dfs.getClient().listPaths("/", new byte[0], true);
assertSentTo(2);
dfs.getClient().getLocatedFileInfo("/emptyDir", true);
assertSentTo(2);
// read empty file
dfs.create(new Path("/emptyFile"), (short)1);
assertSentTo(0);
dfs.getClient().getLocatedFileInfo("/emptyFile", true);
assertSentTo(2);
dfs.getClient().getBlockLocations("/emptyFile", 0, 1);
assertSentTo(2);
}
private static void assertSentTo(DistributedFileSystem fs, int nnIdx)
throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx));
}
private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
}
private static void setObserverRead(boolean flag) throws Exception {
dfs = HATestUtil.configureObserverReadFs(
dfsCluster, conf, ObserverReadProxyProvider.class, flag);
}
}