CapacitySchedulerQueueHelpers.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public final class CapacitySchedulerQueueHelpers {
public static final String DEFAULT_PATH = CapacitySchedulerConfiguration.ROOT + ".default";
public static final String A_PATH = CapacitySchedulerConfiguration.ROOT + ".a";
public static final String B_PATH = CapacitySchedulerConfiguration.ROOT + ".b";
public static final String A_CHILD_PATH = A_PATH + ".a";
public static final String A1_PATH = A_PATH + ".a1";
public static final String A2_PATH = A_PATH + ".a2";
public static final String A3_PATH = A_PATH + ".a3";
public static final String B1_PATH = B_PATH + ".b1";
public static final String B2_PATH = B_PATH + ".b2";
public static final String B3_PATH = B_PATH + ".b3";
public static final String A1_B1_PATH = A1_PATH + ".b1";
public static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
public static final QueuePath DEFAULT = new QueuePath(DEFAULT_PATH);
public static final QueuePath A = new QueuePath(A_PATH);
public static final QueuePath A_CHILD = new QueuePath(A_CHILD_PATH);
public static final QueuePath A1 = new QueuePath(A1_PATH);
public static final QueuePath A2 = new QueuePath(A2_PATH);
public static final QueuePath A3 = new QueuePath(A3_PATH);
public static final QueuePath B = new QueuePath(B_PATH);
public static final QueuePath B1 = new QueuePath(B1_PATH);
public static final QueuePath B2 = new QueuePath(B2_PATH);
public static final QueuePath B3 = new QueuePath(B3_PATH);
public static final QueuePath A1_B1 = new QueuePath(A1_B1_PATH);
public static final float A_CAPACITY = 10.5f;
public static final float B_CAPACITY = 89.5f;
public static final String P1_PATH = CapacitySchedulerConfiguration.ROOT + ".p1";
public static final String P2_PATH = CapacitySchedulerConfiguration.ROOT + ".p2";
public static final String X1_PATH = P1_PATH + ".x1";
public static final String X2_PATH = P1_PATH + ".x2";
public static final String Y1_PATH = P2_PATH + ".y1";
public static final String Y2_PATH = P2_PATH + ".y2";
public static final QueuePath P1 = new QueuePath(P1_PATH);
public static final QueuePath P2 = new QueuePath(P2_PATH);
public static final QueuePath X1 = new QueuePath(X1_PATH);
public static final QueuePath X2 = new QueuePath(X2_PATH);
public static final QueuePath Y1 = new QueuePath(Y1_PATH);
public static final QueuePath Y2 = new QueuePath(Y2_PATH);
public static final float A1_CAPACITY = 30;
public static final float A2_CAPACITY = 70;
public static final float B1_CAPACITY = 79.2f;
public static final float B2_CAPACITY = 0.8f;
public static final float B3_CAPACITY = 20;
private CapacitySchedulerQueueHelpers() {
throw new IllegalStateException("Utility class");
}
/**
* @param conf, to be modified
* @return
* root
* / \
* a b
* / \ / | \
* a1 a2 b1 b2 b3
*
*/
public static CapacitySchedulerConfiguration setupQueueConfiguration(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(ROOT, new String[]{"a", "b"});
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[]{"a1", "a2"});
conf.setCapacity(A1, A1_CAPACITY);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A2, A2_CAPACITY);
conf.setUserLimitFactor(A2, 100.0f);
conf.setQueues(B, new String[]{"b1", "b2", "b3"});
conf.setCapacity(B1, B1_CAPACITY);
conf.setUserLimitFactor(B1, 100.0f);
conf.setCapacity(B2, B2_CAPACITY);
conf.setUserLimitFactor(B2, 100.0f);
conf.setCapacity(B3, B3_CAPACITY);
conf.setUserLimitFactor(B3, 100.0f);
return conf;
}
public static CapacitySchedulerConfiguration setupAdditionalQueues(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(ROOT, new String[]{"a", "b"});
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[]{"a1", "a2", "a3"});
conf.setCapacity(A1, 30.0f);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A2, 30.0f);
conf.setUserLimitFactor(A2, 100.0f);
conf.setCapacity(A3, 40.0f);
conf.setUserLimitFactor(A3, 100.0f);
conf.setQueues(B, new String[]{"b1", "b2", "b3"});
conf.setCapacity(B1, B1_CAPACITY);
conf.setUserLimitFactor(B1, 100.0f);
conf.setCapacity(B2, B2_CAPACITY);
conf.setUserLimitFactor(B2, 100.0f);
conf.setCapacity(B3, B3_CAPACITY);
conf.setUserLimitFactor(B3, 100.0f);
return conf;
}
/**
* @param conf, to be modified
* @return CS configuration which has deleted all children of queue(b)
* root
* / \
* a b
* / \
* a1 a2
*/
public static CapacitySchedulerConfiguration setupQueueConfAmbiguousQueue(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(ROOT,
new String[]{"a", "b"});
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[]{"a", "a1"});
conf.setCapacity(A_CHILD, A1_CAPACITY);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A1, A2_CAPACITY);
conf.setUserLimitFactor(A2, 100.0f);
return conf;
}
/**
* @param conf, to be modified
* @return CS configuration which has deleted all childred of queue(b)
* root
* / \
* a b
* / \
* a1 a2
*/
public static CapacitySchedulerConfiguration setupQueueConfWithoutChildrenOfB(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(ROOT,
new String[]{"a", "b"});
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[]{"a1", "a2"});
conf.setCapacity(A1, A1_CAPACITY);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A2, A2_CAPACITY);
conf.setUserLimitFactor(A2, 100.0f);
return conf;
}
/**
* @param conf, to be modified
* @return CS configuration which has deleted a queue(b1)
* root
* / \
* a b
* / \ | \
* a1 a2 b2 b3
*/
public static CapacitySchedulerConfiguration setupQueueConfigurationWithoutB1(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(ROOT,
new String[]{"a", "b"});
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[]{"a1", "a2"});
conf.setCapacity(A1, A1_CAPACITY);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A2, A2_CAPACITY);
conf.setUserLimitFactor(A2, 100.0f);
conf.setQueues(B, new String[]{"b2", "b3"});
conf.setCapacity(B2, B2_CAPACITY + B1_CAPACITY); //as B1 is deleted
conf.setUserLimitFactor(B2, 100.0f);
conf.setCapacity(B3, B3_CAPACITY);
conf.setUserLimitFactor(B3, 100.0f);
return conf;
}
/**
* @param conf, to be modified
* @return CS configuration which has converted b1 to parent queue
* root
* / \
* a b
* / \ / | \
* a1 a2 b1 b2 b3
* |
* b11
*/
public static CapacitySchedulerConfiguration setupQueueConfigurationWithB1AsParentQueue(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(ROOT,
new String[]{"a", "b"});
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[]{"a1", "a2"});
conf.setCapacity(A1, A1_CAPACITY);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A2, A2_CAPACITY);
conf.setUserLimitFactor(A2, 100.0f);
conf.setQueues(B, new String[]{"b1", "b2", "b3"});
conf.setCapacity(B1, B1_CAPACITY);
conf.setUserLimitFactor(B1, 100.0f);
conf.setCapacity(B2, B2_CAPACITY);
conf.setUserLimitFactor(B2, 100.0f);
conf.setCapacity(B3, B3_CAPACITY);
conf.setUserLimitFactor(B3, 100.0f);
// Set childQueue for B1
conf.setQueues(B1, new String[]{"b11"});
final String b11Path = B1 + ".b11";
final QueuePath b11 = new QueuePath(b11Path);
conf.setCapacity(b11, 100.0f);
conf.setUserLimitFactor(b11, 100.0f);
return conf;
}
/**
* @param conf, to be modified
* @return CS configuration which has deleted a Parent queue(b)
*/
public static CapacitySchedulerConfiguration setupQueueConfigurationWithoutB(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(ROOT, new String[]{"a"});
conf.setCapacity(A, A_CAPACITY + B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[]{"a1", "a2"});
conf.setCapacity(A1, A1_CAPACITY);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A2, A2_CAPACITY);
conf.setUserLimitFactor(A2, 100.0f);
return conf;
}
public static CapacitySchedulerConfiguration setupBlockedQueueConfiguration(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(ROOT,
new String[]{"a", "b"});
conf.setCapacity(A, 80f);
conf.setCapacity(B, 20f);
conf.setUserLimitFactor(A, 100);
conf.setUserLimitFactor(B, 100);
conf.setMaximumCapacity(A, 100);
conf.setMaximumCapacity(B, 100);
return conf;
}
public static CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(ROOT,
new String[]{"p1", "p2"});
conf.setCapacity(P1, 50f);
conf.setMaximumCapacity(P1, 50f);
conf.setCapacity(P2, 50f);
conf.setMaximumCapacity(P2, 100f);
// Define 2nd-level queues
conf.setQueues(P1, new String[]{"x1", "x2"});
conf.setCapacity(X1, 80f);
conf.setMaximumCapacity(X1, 100f);
conf.setUserLimitFactor(X1, 2f);
conf.setCapacity(X2, 20f);
conf.setMaximumCapacity(X2, 100f);
conf.setUserLimitFactor(X2, 2f);
conf.setQueues(P2, new String[]{"y1", "y2"});
conf.setCapacity(Y1, 80f);
conf.setUserLimitFactor(Y1, 2f);
conf.setCapacity(Y2, 20f);
conf.setUserLimitFactor(Y2, 2f);
return conf;
}
public static class ExpectedCapacities {
private final float capacity;
private final float absCapacity;
public ExpectedCapacities(float capacity, float parentCapacity) {
this.capacity = capacity;
absCapacity = this.capacity * parentCapacity;
}
public float getCapacity() {
return capacity;
}
public float getAbsCapacity() {
return absCapacity;
}
}
public static Map<String, ExpectedCapacities> getDefaultCapacities(float capA, float capB) {
Map<String, ExpectedCapacities> capacities = new HashMap<>();
capacities.put(A.getFullPath(), new ExpectedCapacities(capA, 1.0f));
capacities.put(B.getFullPath(), new ExpectedCapacities(capB, 1.0f));
capacities.put(A1.getFullPath(), new ExpectedCapacities((A1_CAPACITY / 100.0f), capA));
capacities.put(A2.getFullPath(), new ExpectedCapacities((A2_CAPACITY / 100.0f), capA));
capacities.put(B1.getFullPath(), new ExpectedCapacities((B1_CAPACITY / 100.0f), capB));
capacities.put(B2.getFullPath(), new ExpectedCapacities((B2_CAPACITY / 100.0f), capB));
capacities.put(B3.getFullPath(), new ExpectedCapacities((B3_CAPACITY / 100.0f), capB));
return capacities;
}
public static void checkQueueStructureCapacities(CapacityScheduler cs) {
float capA = A_CAPACITY / 100.0f;
float capB = B_CAPACITY / 100.0f;
checkQueueStructureCapacities(cs, getDefaultCapacities(capA, capB));
}
public static void checkQueueStructureCapacities(CapacityScheduler cs,
Map<String, ExpectedCapacities> capacities) {
CSQueue rootQueue = cs.getRootQueue();
for (Map.Entry<String, ExpectedCapacities> entry : capacities.entrySet()) {
CSQueue queue = findQueue(rootQueue, entry.getKey());
assertNotNull(queue);
assertQueueCapacities(queue, entry.getValue());
}
}
public static void assertQueueCapacities(CSQueue q, ExpectedCapacities capacities) {
final float epsilon = 1e-4f;
assertEquals(capacities.getCapacity(), q.getCapacity(), epsilon, "capacity");
assertEquals(capacities.getAbsCapacity(),
q.getAbsoluteCapacity(), epsilon, "absolute capacity");
assertEquals(1.0f, q.getMaximumCapacity(), epsilon, "maximum capacity");
assertEquals(1.0f, q.getAbsoluteMaximumCapacity(), epsilon, "absolute maximum capacity");
}
public static CSQueue findQueue(CSQueue root, String queuePath) {
if (root.getQueuePath().equals(queuePath)) {
return root;
}
List<CSQueue> childQueues = root.getChildQueues();
if (childQueues != null) {
for (CSQueue q : childQueues) {
if (queuePath.startsWith(q.getQueuePath())) {
CSQueue result = findQueue(q, queuePath);
if (result != null) {
return result;
}
}
}
}
return null;
}
}