/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test consistency of reads while accessing an ObserverNode.
* The tests are based on traditional (non-fast-path) edit log tailing.
*/
public class TestConsistentReadsObserver {
public static final Logger LOG =
LoggerFactory.getLogger(TestConsistentReadsObserver.class.getName());
private static Configuration conf;
private static MiniQJMHACluster qjmhaCluster;
private static MiniDFSCluster dfsCluster;
private DistributedFileSystem dfs;
private final Path testPath = new Path("/TestConsistentReadsObserver");
@BeforeClass
public static void startUpCluster() throws Exception {
conf = new Configuration();
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
// Disable fast tailing here because this test's assertions are based on the
// timing of explicit calls to rollEditLogAndTail. This makes the test slower
// to run, but keeps its timing deterministic.
// TODO: revisit if there is a better way.
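// setUpObserverCluster with a single observer yields three NameNodes:
// index 0 active, index 1 standby, index 2 observer, which is why the
// tests below address the observer as index 2.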
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, false);
dfsCluster = qjmhaCluster.getDfsCluster();
}
@Before
public void setUp() throws Exception {
dfs = setObserverRead(true);
}
@After
public void cleanUp() throws IOException {
dfs.delete(testPath, true);
}
@AfterClass
public static void shutDownCluster() throws IOException {
if (qjmhaCluster != null) {
qjmhaCluster.shutdown();
}
}
@Test
public void testRequeueCall() throws Exception {
// Update the configuration just for the observer, by enabling
// IPC backoff and using the test scheduler class, which starts to back off
// after a certain number of calls.
final int observerIdx = 2;
NameNode nn = dfsCluster.getNameNode(observerIdx);
int port = nn.getNameNodeAddress().getPort();
Configuration originalConf = dfsCluster.getConfiguration(observerIdx);
Configuration configuration = new Configuration(originalConf);
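// Server-specific IPC settings are scoped by port: keys under the
// "ipc.<port>." prefix select the scheduler and backoff behavior for
// this NameNode's RPC server only.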
String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
TestRpcScheduler.class.getName());
configuration.setBoolean(prefix
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(0);
dfs.create(testPath, (short)1).close();
assertSentTo(0);
// Since we haven't tailed edit logs on the observer, it will fall behind
// and keep re-queueing the incoming request. Eventually, RPC backoff will
// be triggered and the client should retry against the active NN.
dfs.getFileStatus(testPath);
assertSentTo(0);
assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(1);
// reset the original call queue
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
}
@Test
public void testMsyncSimple() throws Exception {
// 0 == not completed, 1 == succeeded, -1 == failed
AtomicInteger readStatus = new AtomicInteger(0);
// Making an uncoordinated call, which initializes the proxy
// to the Observer node.
dfs.getClient().getHAServiceState();
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
Thread reader = new Thread(() -> {
try {
// this read will block until roll and tail edits happen.
dfs.getFileStatus(testPath);
readStatus.set(1);
} catch (IOException e) {
e.printStackTrace();
readStatus.set(-1);
}
});
reader.start();
// The reader should still be blocked and not have succeeded yet.
assertEquals(0, readStatus.get());
dfsCluster.rollEditLogAndTail(0);
// Wait a while for all the changes to complete.
GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
// The reader should have succeeded.
assertEquals(1, readStatus.get());
}
private void testMsync(boolean autoMsync, long autoMsyncPeriodMs)
throws Exception {
// 0 == not completed, 1 == succeeded, -1 == failed
AtomicInteger readStatus = new AtomicInteger(0);
Configuration conf2 = new Configuration(conf);
// Disable FS cache so two different DFS clients will be used.
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
if (autoMsync) {
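// The auto-msync period is a per-nameservice client-side setting; the
// key is AUTO_MSYNC_PERIOD_KEY_PREFIX suffixed with the client's URI
// host (the logical nameservice here).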
conf2.setTimeDuration(
ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX
+ "." + dfs.getUri().getHost(),
autoMsyncPeriodMs, TimeUnit.MILLISECONDS);
}
DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);
// Initialize the proxies for the Observer node.
dfs.getClient().getHAServiceState();
// This initialization will perform the msync-on-startup, so that another
// form of msync is required later
dfs2.getClient().getHAServiceState();
// Advance the Observer's state ID so it is ahead of the client's.
dfs.mkdir(new Path("/test"), FsPermission.getDefault());
dfsCluster.rollEditLogAndTail(0);
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
Thread reader = new Thread(() -> {
try {
// After msync, client should have the latest state ID from active.
// Therefore, the subsequent getFileStatus call should succeed.
if (!autoMsync) {
// If not testing auto-msync, perform an explicit one here
dfs2.msync();
} else if (autoMsyncPeriodMs > 0) {
Thread.sleep(autoMsyncPeriodMs);
}
dfs2.getFileStatus(testPath);
if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
readStatus.set(1);
} else {
readStatus.set(-1);
}
} catch (Exception e) {
e.printStackTrace();
readStatus.set(-1);
}
});
reader.start();
Thread.sleep(100);
assertEquals(0, readStatus.get());
dfsCluster.rollEditLogAndTail(0);
GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 3000);
assertEquals(1, readStatus.get());
}
@Test
public void testExplicitMsync() throws Exception {
testMsync(false, -1);
}
@Test
public void testAutoMsyncPeriod0() throws Exception {
testMsync(true, 0);
}
@Test
public void testAutoMsyncPeriod5() throws Exception {
testMsync(true, 5);
}
@Test(expected = TimeoutException.class)
public void testAutoMsyncLongPeriod() throws Exception {
// This should fail since the auto-msync is never activated
testMsync(true, Long.MAX_VALUE);
}
// A new client should first contact the active, before using an observer,
// to ensure that it is up-to-date with the current state
@Test
public void testCallFromNewClient() throws Exception {
// Set the order of nodes: Observer, Standby, Active.
// This is to ensure that the test doesn't pass trivially because the active
// is the first node contacted.
dfsCluster.transitionToStandby(0);
dfsCluster.transitionToObserver(0);
dfsCluster.transitionToStandby(2);
dfsCluster.transitionToActive(2);
try {
// 0 == not completed, 1 == succeeded, -1 == failed
AtomicInteger readStatus = new AtomicInteger(0);
// Initialize the proxies for the Observer node.
dfs.getClient().getHAServiceState();
// Advance the Observer's state ID so it is ahead of the client's.
dfs.mkdir(new Path("/test"), FsPermission.getDefault());
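// Manually roll the edit log on the active (now index 2) and tail it on
// the observer (now index 0), mimicking the background tailer threads.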
dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
dfsCluster.getNameNode(0)
.getNamesystem().getEditLogTailer().doTailEdits();
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(2);
Configuration conf2 = new Configuration(conf);
// Disable FS cache so two different DFS clients will be used.
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
DistributedFileSystem dfs2 =
(DistributedFileSystem) FileSystem.get(conf2);
dfs2.getClient().getHAServiceState();
Thread reader = new Thread(() -> {
try {
dfs2.getFileStatus(testPath);
readStatus.set(1);
} catch (Exception e) {
e.printStackTrace();
readStatus.set(-1);
}
});
reader.start();
Thread.sleep(100);
assertEquals(0, readStatus.get());
dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
dfsCluster.getNameNode(0)
.getNamesystem().getEditLogTailer().doTailEdits();
GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
assertEquals(1, readStatus.get());
} finally {
// Put the cluster back the way it was when the test started
dfsCluster.transitionToStandby(2);
dfsCluster.transitionToObserver(2);
dfsCluster.transitionToStandby(0);
dfsCluster.transitionToActive(0);
}
}
@Test
public void testUncoordinatedCall() throws Exception {
// Make a write call so that the client's state ID is ahead of
// the observer's for now.
dfs.mkdir(testPath, FsPermission.getDefault());
// A status flag, initialized to 0; set to 1 after the reader finishes,
// or -1 on error.
AtomicInteger readStatus = new AtomicInteger(0);
// create a separate thread to make a blocking read.
Thread reader = new Thread(() -> {
try {
// This read call will block until the server state catches up, which,
// given the test configuration, will take a very long time.
dfs.getClient().getFileInfo("/");
readStatus.set(1);
fail("Should have been interrupted before getting here.");
} catch (IOException e) {
e.printStackTrace();
readStatus.set(-1);
}
});
reader.start();
long before = Time.now();
dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
long after = Time.now();
// should succeed immediately, because datanodeReport is marked as an
// uncoordinated call and will not wait for the server to catch up.
assertTrue(after - before < 200);
// By this time the reader thread should still be blocked, so its status
// has not been updated.
assertEquals(0, readStatus.get());
Thread.sleep(5000);
// The reader thread's status should still be unchanged after 5 seconds...
assertEquals(0, readStatus.get());
// and the reader thread is not dead, so it must still be waiting.
assertEquals(Thread.State.WAITING, reader.getState());
reader.interrupt();
}
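// Whether a read is coordinated is declared on ClientProtocol via the
// @ReadOnly annotation; a sketch of the pattern (see ClientProtocol for the
// authoritative declarations):
//
//   @Idempotent
//   @ReadOnly(isCoordinated = true) // blocks until the observer catches up
//   HdfsFileStatus getFileInfo(String src) throws IOException;
//
// datanodeReport, by contrast, is an uncoordinated read (isCoordinated
// defaults to false), so it is answered immediately.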
@Test
public void testRequestFromNonObserverProxyProvider() throws Exception {
// Create another HDFS client using ConfiguredFailoverProvider
Configuration conf2 = new Configuration(conf);
// Populate the above configuration with only a single observer in the
// namenode list. Also reduce retries to make the test finish faster.
HATestUtil.setFailoverConfigurations(
conf2,
HATestUtil.getLogicalHostname(dfsCluster),
Collections.singletonList(
dfsCluster.getNameNode(2).getNameNodeAddress()),
ConfiguredFailoverProxyProvider.class);
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
conf2.setInt(HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, 1);
conf2.setInt(HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 1);
FileSystem dfs2 = FileSystem.get(conf2);
dfs.mkdir(testPath, FsPermission.getDefault());
dfsCluster.rollEditLogAndTail(0);
try {
// Request should be rejected by observer and throw StandbyException
dfs2.listStatus(testPath);
fail("listStatus should have thrown exception");
} catch (RemoteException re) {
IOException e = re.unwrapRemoteException();
assertTrue("should have thrown StandbyException but got "
+ e.getClass().getSimpleName(),
e instanceof StandbyException);
}
}
@Test(timeout=10000)
public void testMsyncFileContext() throws Exception {
NameNode nn0 = dfsCluster.getNameNode(0);
NameNode nn2 = dfsCluster.getNameNode(2);
HAServiceStatus st = nn0.getRpcServer().getServiceStatus();
assertEquals("nn0 is not active", HAServiceState.ACTIVE, st.getState());
st = nn2.getRpcServer().getServiceStatus();
assertEquals("nn2 is not observer", HAServiceState.OBSERVER, st.getState());
FileContext fc = FileContext.getFileContext(conf);
// initialize observer proxy for FileContext
fc.getFsStatus(testPath);
Path p = new Path(testPath, "testMsyncFileContext");
fc.mkdir(p, FsPermission.getDefault(), true);
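// msync fetches the latest state ID from the active, so the subsequent
// read against the observer waits until that transaction is applied there.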
fc.msync();
dfsCluster.rollEditLogAndTail(0);
LOG.info("State id active = {}, Stat id observer = {}",
nn0.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId(),
nn2.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId());
try {
// If getFileStatus takes too long due to server-side requeueing,
// the test will time out.
fc.getFileStatus(p);
} catch (FileNotFoundException e) {
fail("File should exist on Observer after msync");
}
}
@Test
public void testRpcQueueTimeNumOpsMetrics() throws Exception {
// 0 == not completed, 1 == succeeded, -1 == failed
AtomicInteger readStatus = new AtomicInteger(0);
// Making an uncoordinated call, which initializes the proxy
// to the Observer node.
dfs.getClient().getHAServiceState();
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
Thread reader = new Thread(new Runnable() {
@Override
public void run() {
try {
// this read will block until roll and tail edits happen.
dfs.getFileStatus(testPath);
readStatus.set(1);
} catch (IOException e) {
e.printStackTrace();
readStatus.set(-1);
}
}
});
reader.start();
// The reader should still be blocked and not have succeeded yet.
assertEquals(0, readStatus.get());
dfsCluster.rollEditLogAndTail(0);
// Wait a while for all the changes to complete.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return readStatus.get() != 0;
}
}, 100, 10000);
// The reader should have succeeded.
assertEquals(1, readStatus.get());
final int observerIdx = 2;
NameNode observerNN = dfsCluster.getNameNode(observerIdx);
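// Every processed call should contribute exactly one sample to each
// counter; a call re-queued by the observer must not inflate the
// queue-time count relative to the processing-time count.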
MetricsRecordBuilder rpcMetrics =
getMetrics("RpcActivityForPort"
+ observerNN.getNameNodeAddress().getPort());
long rpcQueueTimeNumOps = getLongCounter("RpcQueueTimeNumOps", rpcMetrics);
long rpcProcessingTimeNumOps = getLongCounter("RpcProcessingTimeNumOps",
rpcMetrics);
assertEquals(rpcQueueTimeNumOps, rpcProcessingTimeNumOps);
}
private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
}
private DistributedFileSystem setObserverRead(boolean flag) throws Exception {
return HATestUtil.configureObserverReadFs(
dfsCluster, conf, ObserverReadProxyProvider.class, flag);
}
/**
* A dummy test scheduler that starts backing off after a fixed number
* of requests.
*/
public static class TestRpcScheduler implements RpcScheduler {
// Allow a number of RPCs to pass in order for the NN restart to succeed.
private int allowed = 10;
public TestRpcScheduler() {}
@Override
public int getPriorityLevel(Schedulable obj) {
return 0;
}
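// Permit the first 'allowed' calls, then signal backoff for every
// subsequent call; the server then rejects the call so the client
// retries against another NameNode.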
@Override
public boolean shouldBackOff(Schedulable obj) {
return --allowed < 0;
}
@Override
public void stop() {
}
}
}