TestPreemptionForQueueWithPriorities.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TestPreemptionForQueueWithPriorities
    extends ProportionalCapacityPreemptionPolicyMockFramework {
  @BeforeEach
  public void setup() {
    resourceCalculator = new DefaultResourceCalculator();
    super.setup();
    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
  }

  @Test
  public void testPreemptionForHighestPriorityUnderutilizedQueue()
      throws IOException {
    /**
     * The simplest test of queue with priorities, Queue structure is:
     *
     * <pre>
     *        root
     *       / |  \
     *      a  b   c
     * </pre>
     *
     * For priorities
     * - a=1
     * - b/c=2
     *
     * So c will preempt more resource from a, till a reaches guaranteed
     * resource.
     */
    String labelsConfig = "=100,true"; // default partition
    String nodesConfig = "n1="; // only one node
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100 100 100 100]);" + //root
            "-a(=[30 100 40 50]){priority=1};" + // a
            "-b(=[30 100 59 50]){priority=2};" + // b
            "-c(=[40 100 1 25]){priority=2}";    // c
    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "a\t(1,1,n1,,40,false);" + // app1 in a
        "b\t(1,1,n1,,59,false);" + // app2 in b
        "c\t(1,1,n1,,1,false);";   // app3 in c

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    policy.editSchedule();

    // 10 preempted from app1, 15 preempted from app2, and nothing preempted
    // from app3
    verify(eventHandler, times(10)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
    verify(eventHandler, times(15)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(3))));
  }

  @Test
  public void testPreemptionForLowestPriorityUnderutilizedQueue()
      throws IOException {
    /**
     * Similar to above, make sure we can still make sure less utilized queue
     * can get resource first regardless of priority.
     *
     * Queue structure is:
     *
     * <pre>
     *        root
     *       / |  \
     *      a  b   c
     * </pre>
     *
     * For priorities
     * - a=1
     * - b=2
     * - c=0
     *
     * So c will preempt more resource from a, till a reaches guaranteed
     * resource.
     */
    String labelsConfig = "=100,true"; // default partition
    String nodesConfig = "n1="; // only one node
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100 100 100 100]);" + //root
            "-a(=[30 100 40 50]){priority=1};" + // a
            "-b(=[30 100 59 50]){priority=2};" + // b
            "-c(=[40 100 1 25]){priority=0}";    // c
    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
            "a\t(1,1,n1,,40,false);" + // app1 in a
            "b\t(1,1,n1,,59,false);" + // app2 in b
            "c\t(1,1,n1,,1,false);";   // app3 in c

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    policy.editSchedule();

    // 10 preempted from app1, 15 preempted from app2, and nothing preempted
    // from app3
    verify(eventHandler, times(10)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
    verify(eventHandler, times(15)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(3))));
  }

  @Test
  public void testPreemptionWontHappenBetweenSatisfiedQueues()
      throws IOException {
    /**
     * No preemption happen if a queue is already satisfied, regardless of
     * priority
     *
     * Queue structure is:
     *
     * <pre>
     *        root
     *       / |  \
     *      a  b   c
     * </pre>
     *
     * For priorities
     * - a=1
     * - b=1
     * - c=2
     *
     * When c is satisfied, it will not preempt any resource from other queues
     */
    String labelsConfig = "=100,true"; // default partition
    String nodesConfig = "n1="; // only one node
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100 100 100 100]);" + //root
            "-a(=[30 100 0 0]){priority=1};" + // a
            "-b(=[30 100 40 50]){priority=1};" + // b
            "-c(=[40 100 60 25]){priority=2}";   // c
    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "b\t(1,1,n1,,40,false);" + // app1 in b
        "c\t(1,1,n1,,60,false)"; // app2 in c

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    policy.editSchedule();

    // Nothing preempted
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));
  }

  @Test
  public void testPreemptionForMultipleQueuesInTheSamePriorityBuckets()
      throws IOException {
    /**
     * When a cluster has different priorities, each priority has multiple
     * queues, preemption policy should try to balance resource between queues
     * with same priority by ratio of their capacities
     *
     * Queue structure is:
     *
     * <pre>
     * root
     * - a (capacity=10), p=1
     * - b (capacity=15), p=1
     * - c (capacity=20), p=2
     * - d (capacity=25), p=2
     * - e (capacity=30), p=2
     * </pre>
     */
    String labelsConfig = "=100,true"; // default partition
    String nodesConfig = "n1="; // only one node
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100 100 100 100]);" + //root
            "-a(=[10 100 35 50]){priority=1};" + // a
            "-b(=[15 100 25 50]){priority=1};" + // b
            "-c(=[20 100 39 50]){priority=2};" + // c
            "-d(=[25 100 0 0]){priority=2};" + // d
            "-e(=[30 100 1 99]){priority=2}";   // e
    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "a\t(1,1,n1,,35,false);" + // app1 in a
        "b\t(1,1,n1,,25,false);" + // app2 in b
            "c\t(1,1,n1,,39,false);" + // app3 in c
            "e\t(1,1,n1,,1,false)"; // app4 in e

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    policy.editSchedule();

    // 23 preempted from app1, 6 preempted from app2, and nothing preempted
    // from app3/app4
    // (After preemption, a has 35 - 23 = 12, b has 25 - 6 = 19, so a:b after
    //  preemption is 1.58, close to 1.50)
    verify(eventHandler, times(23)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
    verify(eventHandler, times(6)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(3))));
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(4))));
  }

  @Test
  public void testPreemptionForPriorityAndDisablePreemption()
      throws IOException {
    /**
     * When a cluster has different priorities, each priority has multiple
     * queues, preemption policy should try to balance resource between queues
     * with same priority by ratio of their capacities.
     *
     * But also we need to make sure preemption disable will be honered
     * regardless of priority.
     *
     * Queue structure is:
     *
     * <pre>
     * root
     * - a (capacity=10), p=1
     * - b (capacity=15), p=1
     * - c (capacity=20), p=2
     * - d (capacity=25), p=2
     * - e (capacity=30), p=2
     * </pre>
     */
    String labelsConfig = "=100,true"; // default partition
    String nodesConfig = "n1="; // only one node
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100 100 100 100]);" + //root
            "-a(=[10 100 35 50]){priority=1,disable_preemption=true};" + // a
            "-b(=[15 100 25 50]){priority=1};" + // b
            "-c(=[20 100 39 50]){priority=2};" + // c
            "-d(=[25 100 0 0]){priority=2};" + // d
            "-e(=[30 100 1 99]){priority=2}";   // e
    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "a\t(1,1,n1,,35,false);" + // app1 in a
            "b\t(1,1,n1,,25,false);" + // app2 in b
            "c\t(1,1,n1,,39,false);" + // app3 in c
            "e\t(1,1,n1,,1,false)"; // app4 in e

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    policy.editSchedule();

    // We suppose to preempt some resource from A, but now since queueA
    // disables preemption, so we need to preempt some resource from B and
    // some from C even if C has higher priority than A
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
    verify(eventHandler, times(9)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));
    verify(eventHandler, times(19)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(3))));
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(4))));
  }

  @Test
  public void testPriorityPreemptionForHierarchicalOfQueues()
      throws IOException {
    /**
     * When a queue has multiple hierarchy and different priorities:
     *
     * <pre>
     * root
     * - a (capacity=30), p=1
     *   - a1 (capacity=40), p=1
     *   - a2 (capacity=60), p=1
     * - b (capacity=30), p=1
     *   - b1 (capacity=50), p=1
     *   - b2 (capacity=50), p=2
     * - c (capacity=40), p=1
     * </pre>
     */
    String labelsConfig = "=100,true"; // default partition
    String nodesConfig = "n1="; // only one node
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100 100 100 100]);" + //root
            "-a(=[29 100 40 50]){priority=1};" + // a
            "--a1(=[12 100 20 50]){priority=1};" + // a1
            "--a2(=[17 100 20 50]){priority=1};" + // a2
            "-b(=[31 100 59 50]){priority=1};" + // b
            "--b1(=[16 100 30 50]){priority=1};" + // b1
            "--b2(=[15 100 29 50]){priority=2};" + // b2
            "-c(=[40 100 1 30]){priority=1}";   // c
    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "a1\t(1,1,n1,,20,false);" + // app1 in a1
            "a2\t(1,1,n1,,20,false);" + // app2 in a2
            "b1\t(1,1,n1,,30,false);" + // app3 in b1
            "b2\t(1,1,n1,,29,false);" + // app4 in b2
            "c\t(1,1,n1,,1,false)"; // app5 in c


    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    policy.editSchedule();

    // Preemption should first divide capacities between a / b, and b2 should
    // get less preemption than b1 (because b2 has higher priority)
    verify(eventHandler, times(6)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
    verify(eventHandler, times(1)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));
    verify(eventHandler, times(13)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(3))));
    verify(eventHandler, times(10)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(4))));
  }

  @Test
  public void testPriorityPreemptionWithMandatoryResourceForHierarchicalOfQueues()
      throws Exception {
    /**
     * Queue structure is:
     *
     * <pre>
     *           root
     *           /  \
     *          a    b
     *        /  \  /  \
     *       a1  a2 b1  b2
     * </pre>
     *
     * a2 is underserved and need more resource. b2 will be preemptable.
     */

    String labelsConfig = "=100:200,true"; // default partition
    String nodesConfig = "n1="; // only one node
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100:200 100:200 100:200 100:200]);" + //root
            "-a(=[50:100 100:200 20:40 60:100]){priority=1};" + // a
            "--a1(=[10:20 100:200 10:30 30:20]){priority=1};" + // a1
            "--a2(=[40:80 100:200 10:10 30:80]){priority=1};" + // a2
            "-b(=[50:100 100:200 80:160 40:100]){priority=1};" + // b
            "--b1(=[20:40 100:200 20:40 20:70]){priority=2};" + // b1
            "--b2(=[30:60 100:200 60:120 20:30]){priority=1}";// b2

    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "a1\t(1,1:4,n1,,10,false);" + // app1 in a1
            "a2\t(1,1:1,n1,,10,false);" + // app2 in a2
            "b1\t(1,3:4,n1,,10,false);" + // app3 in b1
            "b2\t(1,20:40,n1,,3,false)";  // app4 in b2


    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
    policy.editSchedule();

    // Preemption should first divide capacities between a / b, and b1 should
    // get less preemption than b2 (because b1 has higher priority)
    verify(eventHandler, times(3)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(3))));
    verify(eventHandler, times(2)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(4))));
  }

  @Test
  public void testPriorityPreemptionWithMultipleResource()
      throws Exception {
    String RESOURCE_1 = "res1";

    riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
        ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));

    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);

    /**
     * Queue structure is:
     *
     * <pre>
     *           root
     *           /  \
     *          a    b
     *        /  \
     *       a1  a2
     * </pre>
     *
     * a1 and a2 are using most of resources.
     * b needs more resources which is under served.
     */
    String labelsConfig =
        "=100:100:10,true;";
    String nodesConfig =
        "n1=;"; // n1 is default partition
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100:100:10 100:100:10 100:100:10 100:100:10]);" + //root
            "-a(=[50:60:3 100:100:10 80:90:10 30:20:4]){priority=1};" + // a
            "--a1(=[20:15:3 100:50:10 60:50:10 0]){priority=1};" + // a1
            "--a2(=[30:45 100:50:10 20:40 30:20:4]){priority=2};" + // a2
            "-b(=[50:40:7 100:100:10 20:10 30:10:2]){priority=1}"; // b

    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "a1\t" // app1 in a1
            + "(1,6:5:1,n1,,10,false);" +
            "a2\t" // app2 in a2
            + "(1,2:4,n1,,10,false);" +
            "b\t" // app3 in b
            + "(1,2:1,n1,,10,false)";

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
    policy.editSchedule();

    // Preemption should first divide capacities between a / b, and a2 should
    // get less preemption than a1 (because a2 has higher priority). More
    // specifically, a2 will not get preempted since the resource preempted
    // from a1 can satisfy b already.
    verify(eventHandler, times(7)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));

    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));

    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(3))));
  }

  @Test
  public void test3ResourceTypesInterQueuePreemption() throws IOException {
    resourceCalculator = new DominantResourceCalculator();
    when(cs.getResourceCalculator()).thenReturn(resourceCalculator);

    // Initialize resource map
    String RESOURCE_1 = "res1";
    riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
        ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));

    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);

    /**
     * Queue structure is:
     *
     * <pre>
     *              root
     *           /  \  \
     *          a    b  c
     * </pre>
     *  A / B / C have 33.3 / 33.3 / 33.4 resources
     *  Total cluster resource have mem=30, cpu=18, GPU=6
     *  A uses mem=6, cpu=3, GPU=3
     *  B uses mem=6, cpu=3, GPU=3
     *  C is asking mem=1,cpu=1,GPU=1
     *
     *  We expect it can preempt from one of the jobs
     */
    String labelsConfig =
        "=30:18:6,true;";
    String nodesConfig =
        "n1= res=30:18:6;"; // n1 is default partition
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[30:18:6 30:18:6 12:12:6 1:1:1]){priority=1};" + //root
            "-a(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // a
            "-b(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // b
            "-c(=[10:6:2 10:6:2 0:0:0 1:1:1]){priority=2}"; // c
    String appsConfig=
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "a\t" // app1 in a1
            + "(1,2:2:1,n1,,3,false);" +
            "b\t" // app2 in b2
            + "(1,2:2:1,n1,,3,false)";

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    policy.editSchedule();

    verify(eventHandler, times(1)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
  }

  @Test
  public void testPriorityPreemptionForBalanceBetweenSatisfiedQueues()
      throws IOException {
    /**
     * All queues are beyond guarantee, c has higher priority than b.
     * c ask for more resource, and there is no idle left, c should preempt
     * some resource from b but won���t let b under its guarantee.
     *
     * Queue structure is:
     *
     * <pre>
     *        root
     *       / |  \
     *      a  b   c
     * </pre>
     *
     * For priorities
     * - a=1
     * - b=1
     * - c=2
     *
     */
    String labelsConfig = "=100,true"; // default partition
    String nodesConfig = "n1="; // only one node
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100 100 100 100]);" + //root
            "-a(=[30 100 0 0]){priority=1};" + // a
            "-b(=[30 100 40 50]){priority=1};" + // b
            "-c(=[40 100 60 25]){priority=2}";   // c
    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "b\t(1,1,n1,,40,false);" + // app1 in b
            "c\t(1,1,n1,,60,false)"; // app2 in c

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    CapacitySchedulerConfiguration newConf =
        new CapacitySchedulerConfiguration(conf);
    boolean isPreemptionToBalanceRequired = true;
    newConf.setBoolean(
        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
        isPreemptionToBalanceRequired);
    when(cs.getConfiguration()).thenReturn(newConf);
    policy.editSchedule();

    // IdealAssigned b: 30 c: 70. initIdealAssigned: b: 30 c: 40, even though
    // b and c has same relativeAssigned=1.0f(idealAssigned / guaranteed),
    // since c has higher priority, c will be put in mostUnderServedQueue and
    // get all remain 30 capacity.
    verify(eventHandler, times(10)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));
  }
}