MockQueueHierarchy.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework.parseResourceFromString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class MockQueueHierarchy {
private static final Logger LOG = LoggerFactory.getLogger(MockQueueHierarchy.class);
private final String ROOT = CapacitySchedulerConfiguration.ROOT;
private final ParentQueue rootQueue;
private String config;
private final CapacityScheduler cs;
private CapacitySchedulerConfiguration conf;
private final ResourceCalculator resourceCalculator;
private final Map<String, CSQueue> nameToCSQueues;
private final Map<String, Resource> partitionToResource;
MockQueueHierarchy(String config,
CapacityScheduler cs,
CapacitySchedulerConfiguration conf,
ResourceCalculator resourceCalculator,
Map<String, Resource> partitionToResource) {
this.config = config;
this.cs = cs;
this.conf = conf;
this.resourceCalculator = resourceCalculator;
this.nameToCSQueues = new HashMap<>();
this.partitionToResource = partitionToResource;
this.rootQueue = init();
}
public ParentQueue getRootQueue() {
return rootQueue;
}
Map<String, CSQueue> getNameToCSQueues() {
return nameToCSQueues;
}
/**
* Format is:
* <pre>
* root (<partition-name-1>=[guaranteed max used pending (reserved)],<partition-name-2>=..);
* -A(...);
* --A1(...);
* --A2(...);
* -B...
* </pre>
* ";" splits queues, and there should no empty lines, no extra spaces
*
* For each queue, it has configurations to specify capacities (to each
* partition), format is:
* <pre>
* -<queueName> (<labelName1>=[guaranteed max used pending], \
* <labelName2>=[guaranteed max used pending])
* {key1=value1,key2=value2}; // Additional configs
* </pre>
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
private ParentQueue init() {
String[] queueExprArray = config.split(";");
ParentQueue rootQueue = null;
for (int idx = 0; idx < queueExprArray.length; idx++) {
String q = queueExprArray[idx];
CSQueue queue;
// Initialize queue
if (isParent(queueExprArray, idx)) {
ParentQueue parentQueue = mock(ParentQueue.class);
queue = parentQueue;
List<CSQueue> children = new ArrayList<>();
when(parentQueue.getChildQueues()).thenReturn(children);
QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
when(policy.getConfigName()).thenReturn(
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
when(parentQueue.getQueueOrderingPolicy()).thenReturn(policy);
} else {
LeafQueue leafQueue = mock(LeafQueue.class);
final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
if (a1.getPriority() != null
&& !a1.getPriority().equals(a2.getPriority())) {
return a1.getPriority().compareTo(a2.getPriority());
}
return a1.getApplicationId()
.compareTo(a2.getApplicationId());
}
});
when(leafQueue.getApplications()).thenReturn(apps);
when(leafQueue.getAllApplications()).thenReturn(apps);
OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
String opName = conf.get(CapacitySchedulerConfiguration.PREFIX
+ CapacitySchedulerConfiguration.ROOT + "." + getQueueName(q)
+ ".ordering-policy", "fifo");
if (opName.equals("fair")) {
so = Mockito.spy(new FairOrderingPolicy<>());
}
when(so.getPreemptionIterator()).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
return apps.descendingIterator();
}
});
when(leafQueue.getOrderingPolicy()).thenReturn(so);
Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
new HashMap<>();
when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
ignorePartitionContainers);
queue = leafQueue;
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(queue.getReadLock()).thenReturn(lock.readLock());
setupQueue(queue, q, queueExprArray, idx);
if (queue.getQueuePath().equals(ROOT)) {
rootQueue = (ParentQueue) queue;
}
}
return rootQueue;
}
private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
int idx) {
LOG.debug("*** Setup queue, source=" + q);
String queuePath = null;
int myLevel = getLevel(q);
if (0 == myLevel) {
// It's root
when(queue.getQueuePath()).thenReturn(ROOT);
queuePath = ROOT;
}
String queueName = getQueueName(q);
when(queue.getQueueName()).thenReturn(queueName);
// Setup parent queue, and add myself to parentQueue.children-list
ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
if (null != parentQueue) {
when(queue.getParent()).thenReturn(parentQueue);
parentQueue.getChildQueues().add(queue);
// Setup my path
queuePath = parentQueue.getQueuePath() + "." + queueName;
}
when(queue.getQueuePath()).thenReturn(queuePath);
QueueCapacities qc = new QueueCapacities(0 == myLevel);
ResourceUsage ru = new ResourceUsage();
QueueResourceQuotas qr = new QueueResourceQuotas();
when(queue.getQueueCapacities()).thenReturn(qc);
when(queue.getQueueResourceUsage()).thenReturn(ru);
when(queue.getQueueResourceQuotas()).thenReturn(qr);
LOG.debug("Setup queue, short name=" + queue.getQueueName() + " path="
+ queue.getQueuePath());
LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
.getQueuePath()));
// Setup other fields like used resource, guaranteed resource, etc.
String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
for (String s : capacitySettingStr.split(",")) {
String partitionName = s.substring(0, s.indexOf("="));
String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
// Add a small epsilon to capacities to avoid truncate when doing
// Resources.multiply
float epsilon = 1e-6f;
Resource toResourcePerPartition = partitionToResource.get(partitionName);
float absGuaranteed = Resources.divide(resourceCalculator, toResourcePerPartition,
parseResourceFromString(values[0].trim()), toResourcePerPartition)
+ epsilon;
float absMax = Resources.divide(resourceCalculator, toResourcePerPartition,
parseResourceFromString(values[1].trim()), toResourcePerPartition)
+ epsilon;
float absUsed = Resources.divide(resourceCalculator, toResourcePerPartition,
parseResourceFromString(values[2].trim()), toResourcePerPartition)
+ epsilon;
float used = Resources.divide(resourceCalculator, toResourcePerPartition,
parseResourceFromString(values[2].trim()),
parseResourceFromString(values[0].trim())) + epsilon;
Resource pending = parseResourceFromString(values[3].trim());
qc.setAbsoluteCapacity(partitionName, absGuaranteed);
qc.setAbsoluteMaximumCapacity(partitionName, absMax);
qc.setAbsoluteUsedCapacity(partitionName, absUsed);
qc.setUsedCapacity(partitionName, used);
qr.setEffectiveMaxResource(parseResourceFromString(values[1].trim()));
qr.setEffectiveMinResource(parseResourceFromString(values[0].trim()));
qr.setEffectiveMaxResource(partitionName,
parseResourceFromString(values[1].trim()));
qr.setEffectiveMinResource(partitionName,
parseResourceFromString(values[0].trim()));
when(queue.getUsedCapacity()).thenReturn(used);
when(queue.getEffectiveCapacity(partitionName))
.thenReturn(parseResourceFromString(values[0].trim()));
when(queue.getEffectiveMaxCapacity(partitionName))
.thenReturn(parseResourceFromString(values[1].trim()));
ru.setPending(partitionName, pending);
// Setup reserved resource if it contained by input config
Resource reserved = Resources.none();
if(values.length == 5) {
reserved = parseResourceFromString(values[4].trim());
ru.setReserved(partitionName, reserved);
}
if (!isParent(queueExprArray, idx)) {
LeafQueue lq = (LeafQueue) queue;
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(false))).thenReturn(pending);
when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
isA(String.class), eq(true))).thenReturn(
Resources.subtract(pending, reserved));
}
ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+ " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+ ",abs_used" + absUsed + ",pending_resource=" + pending
+ ", reserved_resource=" + reserved + "]");
}
// Setup preemption disabled
when(queue.getPreemptionDisabled()).thenReturn(
conf.getPreemptionDisabled(new QueuePath(queuePath), false));
// Setup other queue configurations
Map<String, String> otherConfigs = getOtherConfigurations(
queueExprArray[idx]);
if (otherConfigs.containsKey("priority")) {
when(queue.getPriority()).thenReturn(
Priority.newInstance(Integer.valueOf(otherConfigs.get("priority"))));
} else {
// set queue's priority to 0 by default
when(queue.getPriority()).thenReturn(Priority.newInstance(0));
}
// Setup disable preemption of queues
if (otherConfigs.containsKey("disable_preemption")) {
when(queue.getPreemptionDisabled()).thenReturn(
Boolean.valueOf(otherConfigs.get("disable_preemption")));
}
//TODO: Refactor this test class to use queue path internally like CS
// does from now on
nameToCSQueues.put(queuePath, queue);
nameToCSQueues.put(queueName, queue);
when(cs.getQueue(eq(queuePath))).thenReturn(queue);
when(cs.getQueue(eq(queueName))).thenReturn(queue);
when(cs.normalizeQueueName(eq(queuePath))).thenReturn(queuePath);
when(cs.normalizeQueueName(eq(queueName))).thenReturn(queuePath);
}
/**
* Get additional queue's configurations
* @param queueExpr queue expr
* @return maps of configs
*/
private Map<String, String> getOtherConfigurations(String queueExpr) {
if (queueExpr.contains("{")) {
int left = queueExpr.indexOf('{');
int right = queueExpr.indexOf('}');
if (right > left) {
Map<String, String> configs = new HashMap<>();
String subStr = queueExpr.substring(left + 1, right);
for (String kv : subStr.split(",")) {
if (kv.contains("=")) {
String key = kv.substring(0, kv.indexOf("="));
String value = kv.substring(kv.indexOf("=") + 1);
configs.put(key, value);
}
}
return configs;
}
}
return Collections.emptyMap();
}
private String getQueueName(String q) {
int idx = 0;
// find first != '-' char
while (idx < q.length() && q.charAt(idx) == '-') {
idx++;
}
if (idx == q.length()) {
throw new IllegalArgumentException("illegal input:" + q);
}
// name = after '-' and before '('
String name = q.substring(idx, q.indexOf('('));
if (name.isEmpty()) {
throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
}
if (name.contains(".")) {
throw new IllegalArgumentException("queue name shouldn't contain '.':"
+ name);
}
return name;
}
private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
idx--;
while (idx >= 0) {
int level = getLevel(queueExprArray[idx]);
if (level < myLevel) {
String parentQueueName = getQueueName(queueExprArray[idx]);
return (ParentQueue) nameToCSQueues.get(parentQueueName);
}
idx--;
}
return null;
}
/**
* Get if a queue is ParentQueue
*/
private boolean isParent(String[] queues, int idx) {
int myLevel = getLevel(queues[idx]);
idx++;
while (idx < queues.length && getLevel(queues[idx]) == myLevel) {
idx++;
}
if (idx >= queues.length || getLevel(queues[idx]) < myLevel) {
// It's a LeafQueue
return false;
} else {
return true;
}
}
/**
* Level of a queue is how many "-" at beginning, root's level is 0
*/
private int getLevel(String q) {
int level = 0; // level = how many "-" at beginning
while (level < q.length() && q.charAt(level) == '-') {
level++;
}
return level;
}
}