PriorityUtilizationQueueOrderingPolicy.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
/**
* For two queues with the same priority:
* - The queue with less relative used-capacity goes first - today's behavior.
* - The default priority for all queues is 0 and equal. So, we get today's
* behavior at every level - the queue with the lowest used-capacity
* percentage gets the resources.
*
* For two queues with different priorities:
* - Both the queues are under their guaranteed capacities: The queue with
* the higher priority gets resources
* - Both the queues are over or meeting their guaranteed capacities:
* The queue with the higher priority gets resources
* - One of the queues is over or meeting their guaranteed capacities and the
* other is under: The queue that is under its capacity guarantee gets the
* resources.
*/
public class PriorityUtilizationQueueOrderingPolicy
    implements QueueOrderingPolicy {

  // Queues ordered by this policy; the reference is replaced by setQueues.
  private List<CSQueue> queues;

  // When false, the comparator treats every queue as having priority 0.
  private final boolean respectPriority;

  /**
   * Compare two queues with possibly different priority and assigned capacity,
   * Will be used by preemption policy as well.
   *
   * <p>With equal priorities the lower relative assignment sorts first. With
   * different priorities, the higher priority sorts first when both queues are
   * on the same side of 1.0 (both below, or both at/above); otherwise the
   * lower relative assignment sorts first.
   *
   * @param relativeAssigned1 relative assigned capacity of the first queue
   * @param relativeAssigned2 relative assigned capacity of the second queue
   * @param priority1 priority of the first queue
   * @param priority2 priority of the second queue
   * @return a negative value when the first queue should go first, a positive
   *         value when the second queue should go first, 0 when they tie
   */
  public static int compare(double relativeAssigned1, double relativeAssigned2,
      int priority1, int priority2) {
    if (priority1 == priority2) {
      // The queue with less relative used-capacity goes first
      return Double.compare(relativeAssigned1, relativeAssigned2);
    } else {
      // When priority is different:
      if ((relativeAssigned1 < 1.0f && relativeAssigned2 < 1.0f) || (
          relativeAssigned1 >= 1.0f && relativeAssigned2 >= 1.0f)) {
        // When both the queues are under their guaranteed capacities,
        // Or both the queues are over or meeting their guaranteed capacities
        // queue with higher priority goes first (arguments are swapped so
        // that the larger priority value yields a negative result)
        return Integer.compare(priority2, priority1);
      } else {
        // Otherwise, when one of the queues is over or meeting their
        // guaranteed capacities and the other is under: The queue that is
        // under its capacity guarantee gets the resources.
        return Double.compare(relativeAssigned1, relativeAssigned2);
      }
    }
  }

  /**
   * Comparator that both looks at priority and utilization.
   *
   * <p>This is an inner (non-static) class because it reads
   * {@code respectPriority} from the enclosing policy instance.
   */
  public final class PriorityQueueComparator
      implements Comparator<PriorityQueueResourcesForSorting> {

    // Partition (node label) the queues are being ordered for.
    private final String partition;

    /**
     * @param partition partition the compared queues are ordered for
     */
    public PriorityQueueComparator(String partition) {
      this.partition = partition;
    }

    /**
     * Orders two queue snapshots: first by accessibility to the partition,
     * then by whether the absolute capacity is positive or zero, then by
     * priority and utilization.
     *
     * @param q1Sort snapshot of the first queue
     * @param q2Sort snapshot of the second queue
     * @return a negative value when q1 goes first, a positive value when q2
     *         goes first, 0 when they tie
     */
    @Override
    public int compare(PriorityQueueResourcesForSorting q1Sort,
        PriorityQueueResourcesForSorting q2Sort) {
      int rc = compareQueueAccessToPartition(
          q1Sort.nodeLabelAccessible,
          q2Sort.nodeLabelAccessible);
      if (0 != rc) {
        return rc;
      }

      float q1AbsCapacity = q1Sort.absoluteCapacity;
      float q2AbsCapacity = q2Sort.absoluteCapacity;

      //If q1's abs capacity > 0 and q2 is 0, then prioritize q1
      if (Float.compare(q1AbsCapacity, 0f) > 0 && Float.compare(q2AbsCapacity,
          0f) == 0) {
        return -1;
        //If q2's abs capacity > 0 and q1 is 0, then prioritize q2
      } else if (Float.compare(q2AbsCapacity, 0f) > 0 && Float.compare(
          q1AbsCapacity, 0f) == 0) {
        return 1;
      } else if (Float.compare(q1AbsCapacity, 0f) == 0 && Float.compare(
          q2AbsCapacity, 0f) == 0) {
        // both q1 has 0 and q2 has 0 capacity, then fall back to using
        // priority, abs used capacity to prioritize
        float used1 = q1Sort.absoluteUsedCapacity;
        float used2 = q2Sort.absoluteUsedCapacity;

        return compare(q1Sort, q2Sort, used1, used2,
            q1Sort.priority.getPriority(),
            q2Sort.priority.getPriority());
      } else {
        // both q1 has positive abs capacity and q2 has positive abs
        // capacity
        float used1 = q1Sort.usedCapacity;
        float used2 = q2Sort.usedCapacity;

        return compare(q1Sort, q2Sort, used1, used2,
            q1Sort.priority.getPriority(),
            q2Sort.priority.getPriority());
      }
    }

    /**
     * Compares by priority/utilization and breaks ties on configured
     * capacity.
     *
     * @param q1Sort snapshot of the first queue
     * @param q2Sort snapshot of the second queue
     * @param q1Used utilization figure to use for the first queue
     * @param q2Used utilization figure to use for the second queue
     * @param q1Prior priority of the first queue
     * @param q2Prior priority of the second queue
     * @return comparison result, negative when q1 goes first
     */
    private int compare(PriorityQueueResourcesForSorting q1Sort,
        PriorityQueueResourcesForSorting q2Sort, float q1Used,
        float q2Used, int q1Prior, int q2Prior) {

      // Priorities stay at 0 (i.e. equal) unless the policy respects them.
      int p1 = 0;
      int p2 = 0;
      if (respectPriority) {
        p1 = q1Prior;
        p2 = q2Prior;
      }

      int rc = PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used,
          p1, p2);

      // For queue with same used ratio / priority, queue with higher configured
      // capacity goes first
      if (0 == rc) {
        Resource minEffRes1 =
            q1Sort.configuredMinResource;
        Resource minEffRes2 =
            q2Sort.configuredMinResource;
        // Use the configured min resources when at least one of them is set;
        // operands are swapped so the larger resource sorts first.
        if (!minEffRes1.equals(Resources.none()) || !minEffRes2.equals(
            Resources.none())) {
          return minEffRes2.compareTo(minEffRes1);
        }

        float abs1 = q1Sort.absoluteCapacity;
        float abs2 = q2Sort.absoluteCapacity;
        return Float.compare(abs2, abs1);
      }

      return rc;
    }

    /**
     * Orders two queues by whether they can access the comparator's
     * partition.
     *
     * @param q1Accessible whether the first queue can access the partition
     * @param q2Accessible whether the second queue can access the partition
     * @return -1 when only q1 has access, 1 when only q2 has access,
     *         0 otherwise (including for the default partition)
     */
    private int compareQueueAccessToPartition(boolean q1Accessible,
        boolean q2Accessible) {
      // Everybody has access to default partition
      if (StringUtils.equals(partition, RMNodeLabelsManager.NO_LABEL)) {
        return 0;
      }

      /*
       * Check accessible to given partition, if one queue accessible and
       * the other not, accessible queue goes first.
       */
      if (q1Accessible && !q2Accessible) {
        return -1;
      } else if (!q1Accessible && q2Accessible) {
        return 1;
      }

      return 0;
    }
  }

  /**
   * A simple storage class to represent a snapshot of a queue.
   *
   * <p>All values are read from the queue once, in the constructor, so that
   * the comparator works on values that do not change during a sort.
   */
  public static class PriorityQueueResourcesForSorting {
    private final float absoluteUsedCapacity;
    private final float usedCapacity;
    private final Resource configuredMinResource;
    private final float absoluteCapacity;
    private final Priority priority;
    private final boolean nodeLabelAccessible;
    private final CSQueue queue;

    /**
     * Captures the capacity, priority and label-accessibility values of the
     * queue for the given partition.
     *
     * @param queue queue to take the snapshot of
     * @param partition partition the capacity values are read for
     */
    PriorityQueueResourcesForSorting(CSQueue queue, String partition) {
      this.queue = queue;
      this.absoluteUsedCapacity =
          queue.getQueueCapacities().
              getAbsoluteUsedCapacity(partition);
      this.usedCapacity =
          queue.getQueueCapacities().
              getUsedCapacity(partition);
      this.absoluteCapacity =
          queue.getQueueCapacities().
              getAbsoluteCapacity(partition);
      this.configuredMinResource =
          queue.getQueueResourceQuotas().
              getConfiguredMinResource(partition);
      this.priority = queue.getPriority();
      // The parentheses matter: '&&' binds tighter than '||', so without them
      // the ANY lookup would run (and throw NullPointerException) when the
      // queue reports no accessible labels. A null label set now simply
      // means "not accessible".
      this.nodeLabelAccessible = queue.getAccessibleNodeLabels() != null
          && (queue.getAccessibleNodeLabels().contains(partition)
          || queue.getAccessibleNodeLabels().contains(
              RMNodeLabelsManager.ANY));
    }

    /**
     * Factory equivalent of the constructor.
     *
     * @param queue queue to take the snapshot of
     * @param partition partition the capacity values are read for
     * @return a new snapshot of the queue
     */
    static PriorityQueueResourcesForSorting create(CSQueue queue,
        String partition) {
      return new PriorityQueueResourcesForSorting(queue, partition);
    }

    /**
     * @return the queue this snapshot was taken from
     */
    public CSQueue getQueue() {
      return queue;
    }
  }

  /**
   * @param respectPriority whether queue priorities take part in the
   *                        ordering; when false all queues are compared as
   *                        if they had the same priority
   */
  public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) {
    this.respectPriority = respectPriority;
  }

  /**
   * Stores the given list reference (no copy is made here).
   *
   * @param queues queues to be ordered by this policy
   */
  @Override
  public void setQueues(List<CSQueue> queues) {
    this.queues = queues;
  }

  /**
   * Returns an iterator over the queues, sorted for the given partition.
   *
   * @param partition partition to order the queues for
   * @return iterator over a freshly sorted list of the queues
   */
  @Override
  public Iterator<CSQueue> getAssignmentIterator(String partition) {
    // Copy (for thread safety) and sort the snapshot of the queues in order
    // to avoid breaking the prerequisites of TimSort. See YARN-10178 for
    // details.
    return new ArrayList<>(queues).stream()
        .map(queue -> PriorityQueueResourcesForSorting.create(queue, partition))
        .sorted(new PriorityQueueComparator(partition))
        .map(PriorityQueueResourcesForSorting::getQueue)
        .collect(Collectors.toList()).iterator();
  }

  /**
   * @return the configuration name of this policy, which depends on whether
   *         priorities are respected
   */
  @Override
  public String getConfigName() {
    if (respectPriority) {
      return CapacitySchedulerConfiguration.
          QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY;
    } else {
      return CapacitySchedulerConfiguration.
          QUEUE_UTILIZATION_ORDERING_POLICY;
    }
  }

  /**
   * @return the list reference last passed to {@link #setQueues(List)}
   *         (not a copy)
   */
  @VisibleForTesting
  public List<CSQueue> getQueues() {
    return queues;
  }
}