CapacitySchedulerQueueCalculationTestBase.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.GB;
public class CapacitySchedulerQueueCalculationTestBase {
protected static final QueuePath ROOT = new QueuePath("root");
protected static final QueuePath A = new QueuePath("root.a");
protected static final QueuePath A1 = new QueuePath("root.a.a1");
protected static final QueuePath A11 = new QueuePath("root.a.a1.a11");
protected static final QueuePath A12 = new QueuePath("root.a.a1.a12");
protected static final QueuePath A2 = new QueuePath("root.a.a2");
protected static final QueuePath B = new QueuePath("root.b");
protected static final QueuePath B1 = new QueuePath("root.b.b1");
protected static final QueuePath C = new QueuePath("root.c");
private static final String CAPACITY_VECTOR_TEMPLATE = "[memory=%s, vcores=%s]";
protected ResourceCalculator resourceCalculator;
protected MockRM mockRM;
protected CapacityScheduler cs;
protected CapacitySchedulerConfiguration csConf;
protected NullRMNodeLabelsManager mgr;
@BeforeEach
public void setUp() throws Exception {
csConf = new CapacitySchedulerConfiguration();
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
csConf.setQueues(ROOT, new String[]{"a", "b"});
csConf.setCapacity(A, 50f);
csConf.setCapacity(B, 50f);
csConf.setQueues(A, new String[]{"a1", "a2"});
csConf.setCapacity(A1, 100f);
csConf.setQueues(A1, new String[]{"a11", "a12"});
csConf.setCapacity(A11, 50f);
csConf.setCapacity(A12, 50f);
mgr = new NullRMNodeLabelsManager();
mgr.init(csConf);
mockRM = new MockRM(csConf) {
protected RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
cs = (CapacityScheduler) mockRM.getResourceScheduler();
cs.updatePlacementRules();
// Policy for new auto created queue's auto deletion when expired
mockRM.start();
cs.start();
mockRM.registerNode("h1:1234", 10 * GB); // label = x
resourceCalculator = cs.getResourceCalculator();
}
protected QueueCapacityUpdateContext update(
QueueAssertionBuilder assertions, Resource clusterResource)
throws IOException {
return update(assertions, clusterResource, clusterResource);
}
protected QueueCapacityUpdateContext update(
QueueAssertionBuilder assertions, Resource clusterResource, Resource emptyLabelResource)
throws IOException {
cs.reinitialize(csConf, mockRM.getRMContext());
CapacitySchedulerQueueCapacityHandler queueController =
new CapacitySchedulerQueueCapacityHandler(mgr, csConf);
mgr.setResourceForLabel(CommonNodeLabelsManager.NO_LABEL, emptyLabelResource);
queueController.updateRoot(cs.getQueue("root"), clusterResource);
QueueCapacityUpdateContext updateContext =
queueController.updateChildren(clusterResource, cs.getQueue("root"));
assertions.finishAssertion();
return updateContext;
}
protected QueueAssertionBuilder createAssertionBuilder() {
return new QueueAssertionBuilder(cs);
}
protected static String createCapacityVector(Object memory, Object vcores) {
return String.format(CAPACITY_VECTOR_TEMPLATE, memory, vcores);
}
protected static String absolute(double value) {
return String.valueOf((long) value);
}
protected static String weight(float value) {
return value + "w";
}
protected static String percentage(float value) {
return value + "%";
}
protected static Resource createResource(double memory, double vcores) {
return Resource.newInstance((int) memory, (int) vcores);
}
}