// TestSubClusterCleaner.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.cleaner;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * Unit tests for {@link SubClusterCleaner}.
 *
 * <p>Before each test, {@code NUM_SUBCLUSTERS} subClusters named {@code SC-0},
 * {@code SC-1}, ... are registered in the {@code SC_RUNNING} state in a fresh
 * {@link MemoryFederationStateStore}. The tests then backdate heartbeats, run the
 * cleaner once, and verify the resulting subCluster states through the
 * {@link FederationStateStoreFacade}.
 */
public class TestSubClusterCleaner {

  /** Number of subClusters registered by {@link #setup()}. */
  private static final int NUM_SUBCLUSTERS = 4;

  /**
   * Backdated heartbeat timestamp ({@code Time.now() - 5000}, evaluated once at class
   * initialization) used to make a subCluster look expired. {@link #setup()} configures an
   * expiration of 1000; both values are presumably milliseconds - confirm against
   * YarnConfiguration / Time.
   */
  private static final long EXPIRATION_TIME = Time.now() - 5000;

  ////////////////////////////////
  // Test fixtures (rebuilt before every test)
  ////////////////////////////////
  private Configuration conf;
  private MemoryFederationStateStore stateStore;
  private FederationStateStoreFacade facade;
  private SubClusterCleaner cleaner;

  /**
   * Builds a fresh state store, re-points the facade at it and registers
   * {@code NUM_SUBCLUSTERS} subClusters in the {@code SC_RUNNING} state.
   *
   * @throws YarnException if the state store rejects a registration.
   */
  @BeforeEach
  public void setup() throws YarnException {
    conf = new YarnConfiguration();
    conf.setLong(YarnConfiguration.ROUTER_SUBCLUSTER_EXPIRATION_TIME, 1000);
    // A cache TTL of 0 is set so the facade is expected to read through to the
    // store on every call - NOTE(review): confirm 0 disables the facade cache.
    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);

    stateStore = new MemoryFederationStateStore();
    stateStore.init(conf);

    facade = FederationStateStoreFacade.getInstance(conf);
    facade.reinitialize(stateStore, conf);

    cleaner = new SubClusterCleaner(conf);

    for (int i = 0; i < NUM_SUBCLUSTERS; i++) {
      // Create the subCluster id and info.
      SubClusterId subClusterId = SubClusterId.newInstance("SC-" + i);
      SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
          "127.0.0.1:1", "127.0.0.1:2", "127.0.0.1:3", "127.0.0.1:4",
          SubClusterState.SC_RUNNING, Time.now(), "");
      // Register the subCluster.
      stateStore.registerSubCluster(
          SubClusterRegisterRequest.newInstance(subClusterInfo));
    }
  }

  /**
   * All subClusters start as {@code SC_RUNNING}; after every heartbeat is backdated and
   * the cleaner has run, no subCluster may be active and every one must be
   * {@code SC_LOST}.
   */
  @Test
  public void testSubClustersWithOutHeartBeat()
      throws InterruptedException, TimeoutException, YarnException {
    Map<SubClusterId, SubClusterInfo> registered = facade.getSubClusters(false);
    // Guard against a vacuous pass: the loops below assert nothing on an empty map.
    assertEquals(NUM_SUBCLUSTERS, registered.size());

    // Step1. Manually set subCluster heartbeat expiration.
    // No subCluster heartbeats afterwards, so all of them will expire.
    registered.keySet().forEach(subClusterId ->
        stateStore.setExpiredHeartbeat(subClusterId, EXPIRATION_TIME));

    // Step2. Run the Cleaner to change the status of the expired SubCluster to SC_LOST.
    cleaner.run();

    // Step3. All clusters have expired,
    // so the current Federation has no active subClusters.
    assertEquals(0, facade.getActiveSubClustersCount());

    // Step4. Re-read the store so the check does not depend on the pre-clean
    // snapshot sharing object references with the store's internal records.
    Map<SubClusterId, SubClusterInfo> afterClean = facade.getSubClusters(false);
    assertEquals(NUM_SUBCLUSTERS, afterClean.size());
    afterClean.values().forEach(subClusterInfo ->
        assertEquals(SubClusterState.SC_LOST, subClusterInfo.getState()));
  }

  /**
   * After all subClusters are expired and cleaned, SC-0 and SC-1 resume heartbeating:
   * they must be {@code SC_RUNNING} again while SC-2 and SC-3 stay {@code SC_LOST}.
   */
  @Test
  public void testSubClustersPartWithHeartBeat() throws YarnException, InterruptedException {
    // Step1. Manually set subCluster heartbeat expiration.
    for (int i = 0; i < NUM_SUBCLUSTERS; i++) {
      expiredSubcluster("SC-" + i);
    }

    // Step2. Run the Cleaner to change the status of the expired SubCluster to SC_LOST.
    cleaner.run();

    // Step3. Let SC-0, SC-1 resume heartbeat.
    resumeSubClusterHeartbeat("SC-0");
    resumeSubClusterHeartbeat("SC-1");

    // Step4. At this point exactly 2 subClusters should be active.
    assertEquals(2, facade.getActiveSubClustersCount());

    // Step5. The result we expect is that SC-0 and SC-1 are in the RUNNING state,
    // and SC-2 and SC-3 are in the SC_LOST state.
    checkSubClusterState("SC-0", SubClusterState.SC_RUNNING);
    checkSubClusterState("SC-1", SubClusterState.SC_RUNNING);
    checkSubClusterState("SC-2", SubClusterState.SC_LOST);
    checkSubClusterState("SC-3", SubClusterState.SC_LOST);
  }

  /**
   * Sends a heartbeat with the current time and state {@code SC_RUNNING} for the given
   * subCluster and asserts that the store returns a response.
   *
   * @param pSubClusterId subCluster id, e.g. {@code "SC-0"}.
   * @throws YarnException if the state store rejects the heartbeat.
   */
  private void resumeSubClusterHeartbeat(String pSubClusterId)
      throws YarnException {
    SubClusterId subClusterId = SubClusterId.newInstance(pSubClusterId);
    SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest.newInstance(
        subClusterId, Time.now(), SubClusterState.SC_RUNNING, "test");
    SubClusterHeartbeatResponse response = stateStore.subClusterHeartbeat(request);
    assertNotNull(response);
  }

  /**
   * Backdates the heartbeat of the given subCluster to {@code EXPIRATION_TIME}.
   *
   * @param pSubClusterId subCluster id, e.g. {@code "SC-0"}.
   */
  private void expiredSubcluster(String pSubClusterId) {
    SubClusterId subClusterId = SubClusterId.newInstance(pSubClusterId);
    stateStore.setExpiredHeartbeat(subClusterId, EXPIRATION_TIME);
  }

  /**
   * Asserts that the given subCluster is known to the facade and is in the expected state.
   *
   * @param pSubClusterId subCluster id, e.g. {@code "SC-0"}.
   * @param expectState state the subCluster must currently be in.
   * @throws YarnException if the facade cannot read the subClusters.
   */
  private void checkSubClusterState(String pSubClusterId, SubClusterState expectState)
      throws YarnException {
    Map<SubClusterId, SubClusterInfo> subClustersMap = facade.getSubClusters(false);
    SubClusterId subClusterId = SubClusterId.newInstance(pSubClusterId);
    SubClusterInfo subClusterInfo = subClustersMap.get(subClusterId);
    // Report a missing subCluster as an assertion failure rather than a test error.
    assertNotNull(subClusterInfo, "subClusterId=" + pSubClusterId + " does not exist.");
    assertEquals(expectState, subClusterInfo.getState());
  }
}