TestNamenodeResolver.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.ROUTERS;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.newStateStore;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
 * Test the basic {@link ActiveNamenodeResolver} functionality.
 *
 * <p>The membership expiration is lowered to 5 seconds in {@link #create()}
 * so that the expiration-related tests can wait past it with short sleeps.
 */
public class TestNamenodeResolver {

  /** State store shared by every test; created once in {@link #create()}. */
  private static StateStoreService stateStore;
  /** Resolver under test, backed by {@link #stateStore}. */
  private static ActiveNamenodeResolver namenodeResolver;

  /**
   * Creates the shared state store and the membership resolver under test.
   *
   * @throws Exception If the state store cannot be created.
   */
  @BeforeAll
  public static void create() throws Exception {
    Configuration conf = getStateStoreConfiguration();

    // Reduce expirations to 5 seconds
    conf.setLong(
        RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
        TimeUnit.SECONDS.toMillis(5));

    stateStore = newStateStore(conf);
    assertNotNull(stateStore);

    namenodeResolver = new MembershipNamenodeResolver(conf, stateStore);
    namenodeResolver.setRouterId(ROUTERS[0]);
  }

  /**
   * Stops and closes the shared state store.
   *
   * @throws Exception If the state store cannot be closed.
   */
  @AfterAll
  public static void destroy() throws Exception {
    // If create() failed before the store was assigned, skip cleanup so the
    // original failure is not masked by a NullPointerException here.
    if (stateStore != null) {
      stateStore.stop();
      stateStore.close();
    }
  }

  /**
   * Reconnects the state store driver and clears all namenode registrations
   * so every test starts from an empty membership table.
   *
   * @throws IOException If the state store cannot be accessed.
   * @throws InterruptedException If interrupted while waiting for the store.
   */
  @BeforeEach
  public void setup() throws IOException, InterruptedException {
    // Wait for state store to connect. testStateStoreDisconnected closes the
    // driver, so it is (re)loaded before every test.
    stateStore.loadDriver();
    waitStateStore(stateStore, 10000);

    // Clear NN registrations
    boolean cleared = clearRecords(stateStore, MembershipState.class);
    assertTrue(cleared);
  }

  /**
   * Asserts that the given namenodes have exactly the expected states, in
   * list order.
   *
   * @param namenodes Namenodes returned by the resolver.
   * @param expected Expected state of each namenode, in list order.
   */
  private static void assertNamenodeStates(
      List<? extends FederationNamenodeContext> namenodes,
      FederationNamenodeServiceState... expected) {
    assertNotNull(namenodes, "The resolver returned no namenodes");
    assertEquals(expected.length, namenodes.size());
    for (int i = 0; i < expected.length; i++) {
      assertEquals(expected[i], namenodes.get(i).getState());
    }
  }

  @Test
  public void testShuffleObserverNNs() throws Exception {
    // Add an active entry to the store
    NamenodeStatusReport activeReport = createNamenodeReport(
        NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
    assertTrue(namenodeResolver.registerNamenode(activeReport));

    // Add a standby entry to the store
    NamenodeStatusReport standbyReport = createNamenodeReport(
        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY);
    assertTrue(namenodeResolver.registerNamenode(standbyReport));

    // Load cache
    stateStore.refreshCaches(true);

    // Get namenodes from state store.
    assertNamenodeStates(
        namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true),
        FederationNamenodeServiceState.ACTIVE,
        FederationNamenodeServiceState.STANDBY);
    // Get namenodes from cache.
    assertNamenodeStates(
        namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true),
        FederationNamenodeServiceState.ACTIVE,
        FederationNamenodeServiceState.STANDBY);

    // Add an observer entry to the store
    NamenodeStatusReport observerReport1 = createNamenodeReport(
        NAMESERVICES[0], NAMENODES[2], HAServiceState.OBSERVER);
    assertTrue(namenodeResolver.registerNamenode(observerReport1));

    // Load cache
    stateStore.refreshCaches(true);

    // Get namenodes from state store: the observer is listed first.
    assertNamenodeStates(
        namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true),
        FederationNamenodeServiceState.OBSERVER,
        FederationNamenodeServiceState.ACTIVE,
        FederationNamenodeServiceState.STANDBY);
    // Get namenodes from cache.
    assertNamenodeStates(
        namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true),
        FederationNamenodeServiceState.OBSERVER,
        FederationNamenodeServiceState.ACTIVE,
        FederationNamenodeServiceState.STANDBY);

    // Add one new observer entry to the store
    NamenodeStatusReport observerReport2 = createNamenodeReport(
        NAMESERVICES[0], NAMENODES[3], HAServiceState.OBSERVER);
    assertTrue(namenodeResolver.registerNamenode(observerReport2));

    // Load cache
    stateStore.refreshCaches(true);

    // Get namenodes from state store.
    assertNamenodeStates(
        namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true),
        FederationNamenodeServiceState.OBSERVER,
        FederationNamenodeServiceState.OBSERVER,
        FederationNamenodeServiceState.ACTIVE,
        FederationNamenodeServiceState.STANDBY);
    // Get namenodes from cache; this ordering is the baseline for the
    // shuffle check below.
    List<? extends FederationNamenodeContext> observerList2 =
        namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
    assertNamenodeStates(observerList2,
        FederationNamenodeServiceState.OBSERVER,
        FederationNamenodeServiceState.OBSERVER,
        FederationNamenodeServiceState.ACTIVE,
        FederationNamenodeServiceState.STANDBY);

    // Test shuffler: observers must always stay at the front, and within at
    // most 1000 lookups the two observers must appear swapped relative to
    // the baseline ordering.
    boolean hit = false;
    for (int i = 0; i < 1000 && !hit; i++) {
      List<? extends FederationNamenodeContext> shuffled =
          namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
      assertEquals(FederationNamenodeServiceState.OBSERVER, shuffled.get(0).getState());
      assertEquals(FederationNamenodeServiceState.OBSERVER, shuffled.get(1).getState());
      hit = shuffled.get(0).getNamenodeId().equals(observerList2.get(1).getNamenodeId())
          && shuffled.get(1).getNamenodeId().equals(observerList2.get(0).getNamenodeId());
    }
    assertTrue(hit, "Observer namenodes were never shuffled in 1000 lookups");
  }

  @Test
  public void testStateStoreDisconnected() throws Exception {

    // Add an entry to the store
    NamenodeStatusReport report = createNamenodeReport(
        NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
    assertTrue(namenodeResolver.registerNamenode(report));

    // Close the data store driver
    stateStore.closeDriver();
    assertFalse(stateStore.isDriverReady());

    // Flush the caches
    stateStore.refreshCaches(true);

    // Verify commands fail due to no cached data and no state store
    // connectivity.
    List<? extends FederationNamenodeContext> nns =
        namenodeResolver.getNamenodesForBlockPoolId(NAMESERVICES[0]);
    assertNull(nns);

    verifyException(namenodeResolver, "registerNamenode",
        StateStoreUnavailableException.class,
        new Class[] {NamenodeStatusReport.class}, new Object[] {report});
  }

  /**
   * Verify the first registration on the resolver.
   *
   * @param nsId Nameservice identifier.
   * @param nnId Namenode identifier within the nameservice.
   * @param resultsCount Number of results expected; 0 means the resolver is
   *     expected to return {@code null}.
   * @param state Expected state for the first one.
   * @throws IOException If we cannot get the namenodes.
   */
  private void verifyFirstRegistration(String nsId, String nnId,
      int resultsCount, FederationNamenodeServiceState state)
      throws IOException {
    List<? extends FederationNamenodeContext> namenodes =
        namenodeResolver.getNamenodesForNameserviceId(nsId, false);
    if (resultsCount == 0) {
      // With no valid registrations this test expects null, not an empty list.
      assertNull(namenodes);
      return;
    }
    // Fail with an assertion rather than a NullPointerException on size().
    assertNotNull(namenodes,
        "Expected " + resultsCount + " namenodes for " + nsId);
    assertEquals(resultsCount, namenodes.size());
    // resultsCount > 0 and the size matched, so index 0 exists.
    FederationNamenodeContext namenode = namenodes.get(0);
    assertEquals(state, namenode.getState());
    assertEquals(nnId, namenode.getNamenodeId());
  }

  @Test
  public void testRegistrationExpired()
      throws InterruptedException, IOException {

    // Populate the state store with a single NN element
    // 1) ns0:nn0 - Active
    // Wait for the entry to expire without heartbeating
    // Verify the NN entry is not accessible once expired.
    NamenodeStatusReport report = createNamenodeReport(
        NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
    assertTrue(namenodeResolver.registerNamenode(report));

    // Load cache
    stateStore.refreshCaches(true);

    // Verify
    verifyFirstRegistration(
        NAMESERVICES[0], NAMENODES[0], 1,
        FederationNamenodeServiceState.ACTIVE);

    // Wait past expiration (set in conf to 5 seconds)
    Thread.sleep(6000);
    // Reload cache
    stateStore.refreshCaches(true);

    // Verify entry is now expired and is no longer in the cache
    verifyFirstRegistration(
        NAMESERVICES[0], NAMENODES[0], 0,
        FederationNamenodeServiceState.ACTIVE);

    // Heartbeat again, updates dateModified
    assertTrue(namenodeResolver.registerNamenode(report));
    // Reload cache
    stateStore.refreshCaches(true);

    // Verify updated entry is marked active again and accessible to RPC server
    verifyFirstRegistration(
        NAMESERVICES[0], NAMENODES[0], 1,
        FederationNamenodeServiceState.ACTIVE);
  }

  @Test
  public void testRegistrationNamenodeSelection()
      throws InterruptedException, IOException {

    // 1) ns0:nn0 - Active
    // 2) ns0:nn1 - Standby (newest)
    // Verify the selected entry is the active entry
    assertTrue(namenodeResolver.registerNamenode(
        createNamenodeReport(
            NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
    Thread.sleep(100);
    assertTrue(namenodeResolver.registerNamenode(
        createNamenodeReport(
            NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));

    stateStore.refreshCaches(true);

    verifyFirstRegistration(
        NAMESERVICES[0], NAMENODES[0], 2,
        FederationNamenodeServiceState.ACTIVE);

    // 1) ns0:nn0 - Expired (stale)
    // 2) ns0:nn1 - Standby (newest)
    // Verify the selected entry is the standby entry as the active entry is
    // stale
    assertTrue(namenodeResolver.registerNamenode(
        createNamenodeReport(
            NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));

    // Expire active registration (expiration is 5 seconds, set in create())
    Thread.sleep(6000);

    // Refresh standby registration
    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));

    // Verify that standby is selected (active is now expired)
    stateStore.refreshCaches(true);
    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 1,
        FederationNamenodeServiceState.STANDBY);

    // 1) ns0:nn0 - Active
    // 2) ns0:nn1 - Unavailable (newest)
    // Verify the selected entry is the active entry
    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
        NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE)));
    Thread.sleep(100);
    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
        NAMESERVICES[0], NAMENODES[1], null)));
    stateStore.refreshCaches(true);
    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[0], 2,
        FederationNamenodeServiceState.ACTIVE);

    // 1) ns0:nn0 - Unavailable (newest)
    // 2) ns0:nn1 - Standby
    // Verify the selected entry is the standby entry
    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
    Thread.sleep(1000);
    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
        NAMESERVICES[0], NAMENODES[0], null)));

    stateStore.refreshCaches(true);
    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 2,
        FederationNamenodeServiceState.STANDBY);

    // 1) ns0:nn0 - Unavailable (oldest)
    // 2) ns0:nn1 - Standby
    // 3) ns0:nn2 - Active (newest)
    // Verify the selected entry is the newest active entry
    assertTrue(namenodeResolver.registerNamenode(
        createNamenodeReport(NAMESERVICES[0], NAMENODES[0], null)));
    Thread.sleep(100);
    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));
    Thread.sleep(100);
    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
        NAMESERVICES[0], NAMENODES[2], HAServiceState.ACTIVE)));

    stateStore.refreshCaches(true);
    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[2], 3,
        FederationNamenodeServiceState.ACTIVE);

    // 1) ns0:nn0 - Standby (oldest)
    // 2) ns0:nn1 - Standby (newest)
    // 3) ns0:nn2 - Standby
    // Verify the selected entry is the newest standby entry
    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
        NAMESERVICES[0], NAMENODES[0], HAServiceState.STANDBY)));
    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
        NAMESERVICES[0], NAMENODES[2], HAServiceState.STANDBY)));
    Thread.sleep(1500);
    assertTrue(namenodeResolver.registerNamenode(createNamenodeReport(
        NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY)));

    stateStore.refreshCaches(true);
    verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 3,
        FederationNamenodeServiceState.STANDBY);
  }

  @Test
  public void testCacheUpdateOnNamenodeStateUpdate() throws IOException {
    // Create a namenode initially registering in standby state.
    assertTrue(namenodeResolver.registerNamenode(
        createNamenodeReport(NAMESERVICES[0], NAMENODES[0],
            HAServiceState.STANDBY)));
    stateStore.refreshCaches(true);
    // Check whether the namenode state is reported correctly as standby.
    FederationNamenodeContext namenode = namenodeResolver
        .getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0);
    assertEquals(FederationNamenodeServiceState.STANDBY, namenode.getState());
    String rpcAddr = namenode.getRpcAddress();
    InetSocketAddress inetAddr = getInetSocketAddress(rpcAddr);

    // If the namenode state changes, and it serves request,
    // RouterRpcClient calls updateActiveNamenode to update the state to active,
    // Check whether correct updated state is returned post update.
    namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr);
    FederationNamenodeContext namenode1 = namenodeResolver
        .getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0);
    assertEquals(FederationNamenodeServiceState.ACTIVE, namenode1.getState(),
        "The namenode state should be ACTIVE post update.");
  }

  @Test
  public void testCacheUpdateOnNamenodeStateUpdateWithIp()
      throws IOException {
    final String rpcAddress = "127.0.0.1:10000";
    assertTrue(namenodeResolver.registerNamenode(
        createNamenodeReport(NAMESERVICES[0], NAMENODES[0], rpcAddress,
            HAServiceState.STANDBY)));
    stateStore.refreshCaches(true);

    InetSocketAddress inetAddr = getInetSocketAddress(rpcAddress);
    namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr);
    FederationNamenodeContext namenode = namenodeResolver
        .getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0);
    assertEquals(FederationNamenodeServiceState.ACTIVE, namenode.getState(),
        "The namenode state should be ACTIVE post update.");
  }

  /**
   * Creates InetSocketAddress from the given RPC address.
   *
   * @param rpcAddr RPC address (host:port).
   * @return InetSocketAddress corresponding to the specified RPC address.
   * @throws IllegalArgumentException If the address has no ':' separator.
   * @throws NumberFormatException If the port is not an integer.
   */
  private static InetSocketAddress getInetSocketAddress(String rpcAddr) {
    // Split on the last colon so hosts that themselves contain colons (for
    // example a bracketed IPv6 literal) keep everything before the port.
    int separator = rpcAddr.lastIndexOf(':');
    if (separator < 0) {
      throw new IllegalArgumentException("RPC address has no port: " + rpcAddr);
    }
    String hostname = rpcAddr.substring(0, separator);
    int port = Integer.parseInt(rpcAddr.substring(separator + 1));
    return new InetSocketAddress(hostname, port);
  }
}