TestObserverWithRouter.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadProxyProvider;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.api.TestInfo;
public class TestObserverWithRouter {
private static final int NUM_NAMESERVICES = 2;
private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup";
private MiniRouterDFSCluster cluster;
private RouterContext routerContext;
private FileSystem fileSystem;
private static final String ROUTER_NS_ID = "router-service";
private static final String AUTO_MSYNC_PERIOD_KEY_PREFIX =
"dfs.client.failover.observer.auto-msync-period";
@BeforeEach
void init(TestInfo info) throws Exception {
if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) {
return;
}
startUpCluster(2, null);
}
@AfterEach
public void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
routerContext = null;
if (fileSystem != null) {
fileSystem.close();
fileSystem = null;
}
}
public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
int numberOfNamenode = 2 + numberOfObserver;
Configuration conf = new Configuration(false);
setConfDefaults(conf);
if (confOverrides != null) {
confOverrides
.iterator()
.forEachRemaining(entry -> conf.set(entry.getKey(), entry.getValue()));
}
cluster = new MiniRouterDFSCluster(true, NUM_NAMESERVICES, numberOfNamenode);
cluster.addNamenodeOverrides(conf);
// Start NNs and DNs and wait until ready
cluster.startCluster();
// Making one Namenode active per nameservice
if (cluster.isHighAvailability()) {
for (String ns : cluster.getNameservices()) {
cluster.switchToActive(ns, NAMENODES[0]);
cluster.switchToStandby(ns, NAMENODES[1]);
for (int i = 2; i < numberOfNamenode; i++) {
cluster.switchToObserver(ns, NAMENODES[i]);
}
}
}
Configuration routerConf = new RouterConfigBuilder()
.metrics()
.rpc()
.build();
cluster.addRouterOverrides(conf);
cluster.addRouterOverrides(routerConf);
// Start routers with only an RPC service
cluster.startRouters();
// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();
// Setup the mount table
cluster.installMockLocations();
cluster.waitActiveNamespaces();
routerContext = cluster.getRandomRouter();
}
private void setConfDefaults(Configuration conf) {
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
}
public enum ConfigSetting {
USE_NAMENODE_PROXY_FLAG,
USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER,
USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER
}
private Configuration getConfToEnableObserverReads(ConfigSetting configSetting) {
Configuration conf = new Configuration();
switch (configSetting) {
case USE_NAMENODE_PROXY_FLAG:
conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true);
break;
case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
"." +
routerContext.getRouter()
.getRpcServerAddress()
.getHostName(), RouterObserverReadProxyProvider.class.getName());
break;
case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER:
// HA configs
conf.set(DFS_NAMESERVICES, ROUTER_NS_ID);
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + "." + ROUTER_NS_ID, "router1");
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ "." + ROUTER_NS_ID + ".router1",
routerContext.getFileSystemURI().toString());
DistributedFileSystem.setDefaultUri(conf, "hdfs://" + ROUTER_NS_ID);
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ROUTER_NS_ID,
RouterObserverReadConfiguredFailoverProxyProvider.class.getName());
break;
default:
Assertions.fail("Unknown config setting: " + configSetting);
}
return conf;
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testObserverRead(ConfigSetting configSetting) throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
internalTestObserverRead();
}
/**
* Tests that without adding config to use ObserverProxyProvider, the client shouldn't
* have reads served by Observers.
* Fixes regression in HDFS-13522.
*/
@Test
public void testReadWithoutObserverClientConfigurations() throws Exception {
fileSystem = routerContext.getFileSystem();
assertThrows(AssertionError.class, this::internalTestObserverRead);
}
public void internalTestObserverRead()
throws Exception {
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals(namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER,
"First namenode should be observer");
Path path = new Path("/testFile");
// Send create call
fileSystem.create(path).close();
// Send read request
fileSystem.open(path).close();
long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// Create and complete calls should be sent to active
assertEquals(2, rpcCountForActive, "Two calls should be sent to active");
long rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
// getBlockLocations should be sent to observer
assertEquals(1, rpcCountForObserver, "One call should be sent to observer");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testObserverReadWithoutFederatedStatePropagation(ConfigSetting configSetting)
throws Exception {
Configuration confOverrides = new Configuration(false);
confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0);
startUpCluster(2, confOverrides);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals(namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER,
"First namenode should be observer");
Path path = new Path("/testFile");
// Send Create call to active
fileSystem.create(path).close();
// Send read request to observer. The router will msync to the active namenode.
fileSystem.open(path).close();
long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// Create, complete and getBlockLocations calls should be sent to active
assertEquals(3, rpcCountForActive, "Three calls should be sent to active");
long rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
assertEquals(0, rpcCountForObserver, "No call should be sent to observer");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testDisablingObserverReadUsingNameserviceOverride(ConfigSetting configSetting)
throws Exception {
// Disable observer reads using per-nameservice override
Configuration confOverrides = new Configuration(false);
confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0");
startUpCluster(2, confOverrides);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
Path path = new Path("/testFile");
fileSystem.create(path).close();
fileSystem.open(path).close();
long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// Create, complete and read calls should be sent to active
assertEquals(3, rpcCountForActive, "Three calls should be sent to active");
long rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
assertEquals(0, rpcCountForObserver, "Zero calls should be sent to observer");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testReadWhenObserverIsDown(ConfigSetting configSetting) throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
Path path = new Path("/testFile1");
// Send Create call to active
fileSystem.create(path).close();
// Stop observer NN
int nnIndex = stopObserver(1);
assertNotEquals(3, nnIndex, "No observer found");
nnIndex = stopObserver(1);
assertNotEquals(4, nnIndex, "No observer found");
// Send read request
fileSystem.open(path).close();
long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// Create, complete and getBlockLocation calls should be sent to active
assertEquals(3, rpcCountForActive, "Three calls should be sent to active");
long rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
assertEquals(0, rpcCountForObserver, "No call should send to observer");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testMultipleObserver(ConfigSetting configSetting) throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
Path path = new Path("/testFile1");
// Send Create call to active
fileSystem.create(path).close();
// Stop one observer NN
stopObserver(1);
// Send read request
fileSystem.open(path).close();
long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
long expectedActiveRpc = 2;
long expectedObserverRpc = 1;
// Create and complete calls should be sent to active
assertEquals(expectedActiveRpc, rpcCountForActive, "Two calls should be sent to active");
long rpcCountForObserver = routerContext.getRouter()
.getRpcServer().getRPCMetrics().getObserverProxyOps();
// getBlockLocation call should send to observer
assertEquals(expectedObserverRpc, rpcCountForObserver,
"Read should be success with another observer");
// Stop one observer NN
stopObserver(1);
// Send read request
fileSystem.open(path).close();
rpcCountForActive = routerContext.getRouter()
.getRpcServer().getRPCMetrics().getActiveProxyOps();
// getBlockLocation call should be sent to active
expectedActiveRpc += 1;
assertEquals(expectedActiveRpc, rpcCountForActive, "One call should be sent to active");
expectedObserverRpc += 0;
rpcCountForObserver = routerContext.getRouter()
.getRpcServer().getRPCMetrics().getObserverProxyOps();
assertEquals(expectedObserverRpc, rpcCountForObserver, "No call should send to observer");
}
private int stopObserver(int num) {
int nnIndex;
for (nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) {
NameNode nameNode = cluster.getCluster().getNameNode(nnIndex);
if (nameNode != null && nameNode.isObserverState()) {
cluster.getCluster().shutdownNameNode(nnIndex);
num--;
if (num == 0) {
break;
}
}
}
return nnIndex;
}
// test router observer with multiple to know which observer NN received
// requests
@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testMultipleObserverRouter() throws Exception {
StateStoreDFSCluster innerCluster;
MembershipNamenodeResolver resolver;
String ns0;
String ns1;
//create 4NN, One Active One Standby and Two Observers
innerCluster = new StateStoreDFSCluster(true, 4, 4, TimeUnit.SECONDS.toMillis(5),
TimeUnit.SECONDS.toMillis(5));
Configuration routerConf =
new RouterConfigBuilder().stateStore().admin().rpc()
.enableLocalHeartbeat(true).heartbeat().build();
StringBuilder sb = new StringBuilder();
ns0 = innerCluster.getNameservices().get(0);
MiniRouterDFSCluster.NamenodeContext context =
innerCluster.getNamenodes(ns0).get(1);
routerConf.set(DFS_NAMESERVICE_ID, ns0);
routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId());
// Specify namenodes (ns1.nn0,ns1.nn1) to monitor
ns1 = innerCluster.getNameservices().get(1);
for (MiniRouterDFSCluster.NamenodeContext ctx : innerCluster.getNamenodes(ns1)) {
String suffix = ctx.getConfSuffix();
if (sb.length() != 0) {
sb.append(",");
}
sb.append(suffix);
}
routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
routerConf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
routerConf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
innerCluster.addNamenodeOverrides(routerConf);
innerCluster.addRouterOverrides(routerConf);
innerCluster.startCluster();
if (innerCluster.isHighAvailability()) {
for (String ns : innerCluster.getNameservices()) {
innerCluster.switchToActive(ns, NAMENODES[0]);
innerCluster.switchToStandby(ns, NAMENODES[1]);
for (int i = 2; i < 4; i++) {
innerCluster.switchToObserver(ns, NAMENODES[i]);
}
}
}
innerCluster.startRouters();
innerCluster.waitClusterUp();
routerContext = innerCluster.getRandomRouter();
resolver = (MembershipNamenodeResolver) routerContext.getRouter()
.getNamenodeResolver();
resolver.loadCache(true);
List<? extends FederationNamenodeContext> namespaceInfo0 =
resolver.getNamenodesForNameserviceId(ns0, true);
List<? extends FederationNamenodeContext> namespaceInfo1 =
resolver.getNamenodesForNameserviceId(ns1, true);
assertEquals(namespaceInfo0.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
assertEquals(namespaceInfo0.get(1).getState(),
FederationNamenodeServiceState.OBSERVER);
assertNotEquals(namespaceInfo0.get(0).getNamenodeId(),
namespaceInfo0.get(1).getNamenodeId());
assertEquals(namespaceInfo1.get(0).getState(),
FederationNamenodeServiceState.OBSERVER);
innerCluster.shutdown();
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testUnavailableObserverNN(ConfigSetting configSetting) throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
stopObserver(2);
Path path = new Path("/testFile");
// Send Create call to active
fileSystem.create(path).close();
// Send read request.
fileSystem.open(path).close();
long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// Create, complete and getBlockLocations
// calls should be sent to active.
assertEquals(3, rpcCountForActive, "Three calls should be send to active");
boolean hasUnavailable = false;
for(String ns : cluster.getNameservices()) {
List<? extends FederationNamenodeContext> nns = routerContext.getRouter()
.getNamenodeResolver().getNamenodesForNameserviceId(ns, false);
for(FederationNamenodeContext nn : nns) {
if(FederationNamenodeServiceState.UNAVAILABLE == nn.getState()) {
hasUnavailable = true;
}
}
}
// After attempting to communicate with unavailable observer namenode,
// its state is updated to unavailable.
assertTrue(hasUnavailable, "There must be unavailable namenodes");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testRouterMsync(ConfigSetting configSetting) throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
Path path = new Path("/testFile");
// Send Create call to active
fileSystem.create(path).close();
long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// Create and complete calls should be sent to active
assertEquals(2, rpcCountForActive, "Two calls should be sent to active");
// Send msync
fileSystem.msync();
rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// 2 msync calls should be sent. One to each active namenode in the two namespaces.
assertEquals(4, rpcCountForActive, "Four calls should be sent to active");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testSingleRead(ConfigSetting configSetting) throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals(namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER,
"First namenode should be observer");
Path path = new Path("/");
long rpcCountForActive;
long rpcCountForObserver;
// Send read request
fileSystem.listFiles(path, false);
fileSystem.close();
rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// getListingCall sent to active.
assertEquals(1, rpcCountForActive, "Only one call should be sent to active");
rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
// getList call should be sent to observer
assertEquals(0, rpcCountForObserver, "No calls should be sent to observer");
}
@Test
public void testSingleReadUsingObserverReadProxyProvider() throws Exception {
fileSystem = routerContext.getFileSystemWithObserverReadProxyProvider();
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals(namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER,
"First namenode should be observer");
Path path = new Path("/");
long rpcCountForActive;
long rpcCountForObserver;
// Send read request
fileSystem.listFiles(path, false);
fileSystem.close();
rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// Two msync calls to the active namenodes.
assertEquals(2, rpcCountForActive, "Two calls should be sent to active");
rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
// getList call should be sent to observer
assertEquals(1, rpcCountForObserver, "One call should be sent to observer");
}
@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testClientReceiveResponseState() {
ClientGSIContext clientGSIContext = new ClientGSIContext();
Map<String, Long> mockMapping = new HashMap<>();
mockMapping.put("ns0", 10L);
RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
mockMapping.forEach(builder::putNamespaceStateIds);
RpcHeaderProtos.RpcResponseHeaderProto header = RpcHeaderProtos.RpcResponseHeaderProto
.newBuilder()
.setCallId(1)
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
.setRouterFederatedState(builder.build().toByteString())
.build();
clientGSIContext.receiveResponseState(header);
Map<String, Long> mockLowerMapping = new HashMap<>();
mockLowerMapping.put("ns0", 8L);
builder = RouterFederatedStateProto.newBuilder();
mockLowerMapping.forEach(builder::putNamespaceStateIds);
header = RpcHeaderProtos.RpcResponseHeaderProto.newBuilder()
.setRouterFederatedState(builder.build().toByteString())
.setCallId(2)
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
.build();
clientGSIContext.receiveResponseState(header);
Map<String, Long> latestFederateState = ClientGSIContext.getRouterFederatedStateMap(
clientGSIContext.getRouterFederatedState());
Assertions.assertEquals(1, latestFederateState.size());
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
}
@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testRouterResponseHeaderState() {
// This conf makes ns1 that is not eligible for observer reads.
Configuration conf = new Configuration();
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
conf.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns1");
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf);
ConcurrentHashMap<String, LongAccumulator> namespaceIdMap =
routerStateIdContext.getNamespaceIdMap();
namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10));
namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100));
namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE));
RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder =
RpcHeaderProtos.RpcResponseHeaderProto
.newBuilder()
.setCallId(1)
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
routerStateIdContext.updateResponseState(responseHeaderBuilder);
Map<String, Long> latestFederateState = RouterStateIdContext.getRouterFederatedStateMap(
responseHeaderBuilder.build().getRouterFederatedState());
// Only ns0 will be in latestFederateState
Assertions.assertEquals(1, latestFederateState.size());
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
}
@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testRouterResponseHeaderStateMaxSizeLimit() {
Configuration conf = new Configuration();
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
conf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 1);
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf);
ConcurrentHashMap<String, LongAccumulator> namespaceIdMap =
routerStateIdContext.getNamespaceIdMap();
namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10));
namespaceIdMap.put("ns1", new LongAccumulator(Math::max, Long.MIN_VALUE));
RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder =
RpcHeaderProtos.RpcResponseHeaderProto
.newBuilder()
.setCallId(1)
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
routerStateIdContext.updateResponseState(responseHeaderBuilder);
Map<String, Long> latestFederateState = RouterStateIdContext.getRouterFederatedStateMap(
responseHeaderBuilder.build().getRouterFederatedState());
// Validate that ns0 is still part of the header
Assertions.assertEquals(1, latestFederateState.size());
namespaceIdMap.put("ns2", new LongAccumulator(Math::max, 20));
// Rebuild header
responseHeaderBuilder =
RpcHeaderProtos.RpcResponseHeaderProto
.newBuilder()
.setCallId(1)
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
routerStateIdContext.updateResponseState(responseHeaderBuilder);
latestFederateState = RouterStateIdContext.getRouterFederatedStateMap(
responseHeaderBuilder.build().getRouterFederatedState());
// Validate that ns0 is still part of the header
Assertions.assertEquals(0, latestFederateState.size());
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception {
Path rootPath = new Path("/");
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
RouterStateIdContext routerStateIdContext = routerContext
.getRouterRpcServer()
.getRouterStateIdContext();
for (int i = 0; i < 10; i++) {
fileSystem.create(new Path(rootPath, "file" + i)).close();
}
// Get object storing state of the namespace in the shared RouterStateIdContext
LongAccumulator namespaceStateId = routerStateIdContext.getNamespaceStateId("ns0");
assertEquals(21, namespaceStateId.get(), "Router's shared should have progressed.");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testSharedStateInRouterStateIdContext(ConfigSetting configSetting) throws Exception {
Path rootPath = new Path("/");
long cleanupPeriodMs = 1000;
Configuration conf = new Configuration(false);
conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, cleanupPeriodMs);
conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, cleanupPeriodMs / 10);
startUpCluster(1, conf);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer()
.getRouterStateIdContext();
// First read goes to active and creates connection pool for this user to active
fileSystem.listStatus(rootPath);
// Second read goes to observer and creates connection pool for this user to observer
fileSystem.listStatus(rootPath);
// Get object storing state of the namespace in the shared RouterStateIdContext
LongAccumulator namespaceStateId1 = routerStateIdContext.getNamespaceStateId("ns0");
// Wait for connection pools to expire and be cleaned up.
Thread.sleep(cleanupPeriodMs * 2);
// Third read goes to observer.
// New connection pool to observer is created since existing one expired.
fileSystem.listStatus(rootPath);
fileSystem.close();
// Get object storing state of the namespace in the shared RouterStateIdContext
LongAccumulator namespaceStateId2 = routerStateIdContext.getNamespaceStateId("ns0");
long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
long rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
// First list status goes to active
assertEquals(1, rpcCountForActive, "One call should be sent to active");
// Last two listStatuses go to observer.
assertEquals(2, rpcCountForObserver, "Two calls should be sent to observer");
Assertions.assertSame(namespaceStateId1, namespaceStateId2,
"The same object should be used in the shared RouterStateIdContext");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testRouterStateIdContextCleanup(ConfigSetting configSetting) throws Exception {
Path rootPath = new Path("/");
long recordExpiry = TimeUnit.SECONDS.toMillis(1);
Configuration confOverride = new Configuration(false);
confOverride.setLong(RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, recordExpiry);
startUpCluster(1, confOverride);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer()
.getRouterStateIdContext();
fileSystem.listStatus(rootPath);
List<String> namespace1 = routerStateIdContext.getNamespaces();
fileSystem.close();
MockResolver mockResolver = (MockResolver) routerContext.getRouter().getNamenodeResolver();
mockResolver.cleanRegistrations();
mockResolver.setDisableRegistration(true);
Thread.sleep(recordExpiry * 2);
List<String> namespace2 = routerStateIdContext.getNamespaces();
assertEquals(1, namespace1.size());
assertEquals("ns0", namespace1.get(0));
assertTrue(namespace2.isEmpty());
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testPeriodicStateRefreshUsingActiveNamenode(ConfigSetting configSetting)
throws Exception {
Path rootPath = new Path("/");
Configuration confOverride = new Configuration(false);
confOverride.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY, "500ms");
confOverride.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "3s");
startUpCluster(1, confOverride);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
fileSystem.listStatus(rootPath);
int initialLengthOfRootListing = fileSystem.listStatus(rootPath).length;
DFSClient activeClient = cluster.getNamenodes("ns0")
.stream()
.filter(nnContext -> nnContext.getNamenode().isActiveState())
.findFirst().orElseThrow(() -> new IllegalStateException("No active namenode."))
.getClient();
for (int i = 0; i < 10; i++) {
activeClient.mkdirs("/dir" + i, null, false);
}
activeClient.close();
// Wait long enough for state in router to be considered stale.
GenericTestUtils.waitFor(
() -> !routerContext
.getRouterRpcClient()
.isNamespaceStateIdFresh("ns0"),
100,
10000,
"Timeout: Namespace state was never considered stale.");
FileStatus[] rootFolderAfterMkdir = fileSystem.listStatus(rootPath);
assertEquals(initialLengthOfRootListing + 10, rootFolderAfterMkdir.length,
"List-status should show newly created directories.");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exception {
Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
String configKeySuffix =
configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ?
ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName();
clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 0);
fileSystem = routerContext.getFileSystem(clientConfiguration);
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals(namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER,
"First namenode should be observer");
Path path = new Path("/");
long rpcCountForActive;
long rpcCountForObserver;
// Send read requests
int numListings = 15;
for (int i = 0; i < numListings; i++) {
fileSystem.listFiles(path, false);
}
fileSystem.close();
rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
switch (configSetting) {
case USE_NAMENODE_PROXY_FLAG:
// First read goes to active.
assertEquals(1, rpcCountForActive, "Calls sent to the active");
// The rest of the reads are sent to the observer.
assertEquals(numListings - 1, rpcCountForObserver, "Reads sent to observer");
break;
case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER:
// An msync is sent to each active namenode for each read.
// Total msyncs will be (numListings * num_of_nameservices).
assertEquals(NUM_NAMESERVICES * numListings, rpcCountForActive,
"Msyncs sent to the active namenodes");
// All reads should be sent of the observer.
assertEquals(numListings, rpcCountForObserver, "Reads sent to observer");
break;
default:
Assertions.fail("Unknown config setting: " + configSetting);
}
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testAutoMsyncNonZero(ConfigSetting configSetting) throws Exception {
Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
String configKeySuffix =
configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ?
ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName();
clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 3000);
fileSystem = routerContext.getFileSystem(clientConfiguration);
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals(namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER,
"First namenode should be observer");
Path path = new Path("/");
long rpcCountForActive;
long rpcCountForObserver;
fileSystem.listFiles(path, false);
fileSystem.listFiles(path, false);
Thread.sleep(5000);
fileSystem.listFiles(path, false);
fileSystem.close();
rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
switch (configSetting) {
case USE_NAMENODE_PROXY_FLAG:
// First read goes to active.
assertEquals(1, rpcCountForActive, "Calls sent to the active");
// The rest of the reads are sent to the observer.
assertEquals(2, rpcCountForObserver, "Reads sent to observer");
break;
case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER:
// 4 msyncs expected. 2 for the first read, and 2 for the third read
// after the auto-msync period has elapsed during the sleep.
assertEquals(4, rpcCountForActive, "Msyncs sent to the active namenodes");
// All three reads should be sent of the observer.
assertEquals(3, rpcCountForObserver, "Reads sent to observer");
break;
default:
Assertions.fail("Unknown config setting: " + configSetting);
}
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testThatWriteDoesntBypassNeedForMsync(ConfigSetting configSetting) throws Exception {
Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
String configKeySuffix =
configSetting == ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ?
ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName();
clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 3000);
fileSystem = routerContext.getFileSystem(clientConfiguration);
List<? extends FederationNamenodeContext> namenodes = routerContext
.getRouter().getNamenodeResolver()
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
assertEquals(namenodes.get(0).getState(), FederationNamenodeServiceState.OBSERVER,
"First namenode should be observer");
Path path = new Path("/");
long rpcCountForActive;
long rpcCountForObserver;
fileSystem.listFiles(path, false);
Thread.sleep(5000);
fileSystem.mkdirs(new Path(path, "mkdirLocation"));
fileSystem.listFiles(path, false);
fileSystem.close();
rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
rpcCountForObserver = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
switch (configSetting) {
case USE_NAMENODE_PROXY_FLAG:
// First listing and mkdir go to the active.
assertEquals(2, rpcCountForActive, "Calls sent to the active namenodes");
// Second listing goes to the observer.
assertEquals(1, rpcCountForObserver, "Read sent to observer");
break;
case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER:
// 5 calls to the active namenodes expected. 4 msync and a mkdir.
// Each of the 2 reads results in an msync to 2 nameservices.
// The mkdir also goes to the active.
assertEquals(5, rpcCountForActive, "Calls sent to the active namenodes");
// Both reads should be sent of the observer.
assertEquals(2, rpcCountForObserver, "Reads sent to observer");
break;
default:
Assertions.fail("Unknown config setting: " + configSetting);
}
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testMsyncOnlyToNamespaceWithObserver(ConfigSetting configSetting) throws Exception {
Configuration confOverride = new Configuration(false);
String namespaceWithObserverReadsDisabled = "ns0";
// Disable observer reads for ns0
confOverride.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES,
namespaceWithObserverReadsDisabled);
startUpCluster(1, confOverride);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
// Send msync request
fileSystem.msync();
long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// There should only be one call to the namespace that has an observer.
assertEquals(1, rpcCountForActive, "Only one call to the namespace with an observer");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testMsyncWithNoNamespacesEligibleForCRS(ConfigSetting configSetting)
throws Exception {
Configuration confOverride = new Configuration(false);
// Disable observer reads for all namespaces.
confOverride.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, false);
startUpCluster(1, confOverride);
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
// Send msync request.
fileSystem.msync();
long rpcCountForActive = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getActiveProxyOps();
// There should no calls to any namespace.
assertEquals(0, rpcCountForActive, "No calls to any namespace");
}
@EnumSource(ConfigSetting.class)
@ParameterizedTest
public void testRestartingNamenodeWithStateIDContextDisabled(ConfigSetting configSetting)
throws Exception {
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
Path path = new Path("/testFile1");
// Send Create call to active
fileSystem.create(path).close();
// Send read request
fileSystem.open(path).close();
long observerCount1 = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
// Restart active namenodes and disable sending state id.
restartActiveWithStateIDContextDisabled();
Configuration conf = getConfToEnableObserverReads(configSetting);
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
FileSystem fileSystem2 = routerContext.getFileSystem(conf);
fileSystem2.msync();
fileSystem2.open(path).close();
long observerCount2 = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
assertEquals(observerCount1, observerCount2, "There should no extra calls to the observer");
fileSystem.open(path).close();
long observerCount3 = routerContext.getRouter().getRpcServer()
.getRPCMetrics().getObserverProxyOps();
assertTrue(observerCount3 > observerCount2, "Old filesystem will send calls to observer");
}
void restartActiveWithStateIDContextDisabled() throws Exception {
for (int nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) {
NameNode nameNode = cluster.getCluster().getNameNode(nnIndex);
if (nameNode != null && nameNode.isActiveState()) {
Configuration conf = new Configuration();
setConfDefaults(conf);
cluster.getCluster().getConfiguration(nnIndex)
.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, false);
cluster.getCluster().restartNameNode(nnIndex, true);
cluster.getCluster().getNameNode(nnIndex).isActiveState();
}
}
for (String ns : cluster.getNameservices()) {
cluster.switchToActive(ns, NAMENODES[0]);
}
}
}