TestLoadBasedRouterPolicy.java
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
/**
* Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load
* is properly considered for allocation.
*/
public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
@BeforeEach
public void setUp() throws Exception {
setPolicy(new LoadBasedRouterPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
long now = Time.now();
// simulate 20 active subclusters
for (int i = 0; i < 20; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", i));
SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
float weight = getRand().nextInt(2);
if (i == 5) {
weight = 1.0f;
}
// 5% chance we omit one of the weights
if (i <= 5 || getRand().nextFloat() > 0.05f) {
routerWeights.put(sc, weight);
amrmWeights.put(sc, weight);
}
}
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
// initialize policy with context
setupContext();
}
public String generateClusterMetricsInfo(int id) {
long mem = 1024 * getRand().nextInt(277 * 100 - 1);
// plant a best cluster
if (id == 5) {
mem = 1024 * 277 * 100;
}
String clusterMetrics =
"{\"clusterMetrics\":{\"appsSubmitted\":65," + "\"appsCompleted\":64,"
+ "\"appsPending\":0,\"appsRunning\":0,\"appsFailed\":0,"
+ "\"appsKilled\":1,\"reservedMB\":0,\"availableMB\":" + mem + ","
+ "\"allocatedMB\":0,\"reservedVirtualCores\":0,"
+ "\"availableVirtualCores\":2216,\"allocatedVirtualCores\":0,"
+ "\"containersAllocated\":0,\"containersReserved\":0,"
+ "\"containersPending\":0,\"totalMB\":28364800,"
+ "\"totalVirtualCores\":2216,\"totalNodes\":278,\"lostNodes\":1,"
+ "\"unhealthyNodes\":0,\"decommissionedNodes\":0,"
+ "\"rebootedNodes\":0,\"activeNodes\":277}}\n";
return clusterMetrics;
}
@Test
public void testLoadIsRespected() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext(), null);
// check the "planted" best cluster is chosen
assertEquals("sc05", chosen.getId());
}
@Test
public void testIfNoSubclustersWithWeightOne() throws Exception {
setPolicy(new LoadBasedRouterPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
// update subcluster with weight 0
SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", 0));
SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
sc.toId(), null, null, null, null, -1, SubClusterState.SC_RUNNING, -1,
generateClusterMetricsInfo(0));
getActiveSubclusters().clear();
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
routerWeights.put(sc, 0.0f);
amrmWeights.put(sc, 0.0f);
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
ConfigurableFederationPolicy policy = getPolicy();
FederationPoliciesTestUtil.initializePolicyContext(policy,
getPolicyInfo(), getActiveSubclusters());
LambdaTestUtils.intercept(YarnException.class, "Zero Active Subcluster with weight 1.",
() -> ((FederationRouterPolicy) policy).
getHomeSubcluster(getApplicationSubmissionContext(), null));
}
@Test
public void testUpdateReservation() throws YarnException {
long now = Time.now();
ReservationSubmissionRequest resReq = getReservationSubmissionRequest();
when(resReq.getQueue()).thenReturn("queue1");
when(resReq.getReservationId()).thenReturn(ReservationId.newInstance(now, 1));
// first we invoke a reservation placement
FederationRouterPolicy routerPolicy = (FederationRouterPolicy) getPolicy();
SubClusterId chosen = routerPolicy.getReservationHomeSubcluster(resReq);
// add this to the store
FederationStateStoreFacade facade =
getFederationPolicyContext().getFederationStateStoreFacade();
ReservationHomeSubCluster subCluster =
ReservationHomeSubCluster.newInstance(resReq.getReservationId(), chosen);
facade.addReservationHomeSubCluster(subCluster);
// get all activeSubClusters
Map<SubClusterId, SubClusterInfo> activeSubClusters = getActiveSubclusters();
// Update ReservationHomeSubCluster
// Cannot be randomly selected, SubCluster with Weight >= 1.0 needs to be selected
WeightedPolicyInfo weightedPolicyInfo = this.getPolicyInfo();
Map<SubClusterIdInfo, Float> routerPolicyWeights = weightedPolicyInfo.getRouterPolicyWeights();
List<SubClusterId> subClusterIds = new ArrayList<>();
for (Map.Entry<SubClusterIdInfo, Float> entry : routerPolicyWeights.entrySet()) {
SubClusterIdInfo subClusterIdInfo = entry.getKey();
Float subClusterWeight = entry.getValue();
if (subClusterWeight >= 1.0) {
subClusterIds.add(subClusterIdInfo.toId());
}
}
SubClusterId chosen2 = subClusterIds.get(this.getRand().nextInt(subClusterIds.size()));
ReservationHomeSubCluster subCluster2 =
ReservationHomeSubCluster.newInstance(resReq.getReservationId(), chosen2);
facade.updateReservationHomeSubCluster(subCluster2);
// route an application that uses this app
ApplicationSubmissionContext applicationSubmissionContext =
ApplicationSubmissionContext.newInstance(
ApplicationId.newInstance(now, 1), "app1", "queue1", Priority.newInstance(1),
null, false, false, 1, null, null, false);
applicationSubmissionContext.setReservationID(resReq.getReservationId());
SubClusterId chosen3 = routerPolicy.getHomeSubcluster(
applicationSubmissionContext, new ArrayList<>());
assertEquals(chosen2, chosen3);
}
}