// TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import java.io.IOException;

import org.junit.Before;
import org.junit.Test;

import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
    extends ProportionalCapacityPreemptionPolicyMockFramework {

  @Before
  public void setup() {
    super.setup();
    resourceCalculator = new DominantResourceCalculator();
    when(cs.getResourceCalculator()).thenReturn(resourceCalculator);
    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
  }

  @Test
  public void testInterQueuePreemptionWithMultipleResource() throws Exception {
    /**
     * Queue structure is:
     *
     * <pre>
     *           root
     *           /  \
     *          a    b
     * </pre>
     *
     */

    String labelsConfig = "=100:200,true"; // default partition
    String nodesConfig = "n1="; // only one node
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100:200 100:200 100:200 100:200]);" + //root
            "-a(=[50:100 100:200 40:80 30:70]);" + // a
            "-b(=[50:100 100:200 60:120 40:50])";   // b

    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "a\t(1,2:4,n1,,20,false);" + // app1 in a
            "b\t(1,2:4,n1,,30,false)"; // app2 in b

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
    policy.editSchedule();

    // Preemption should happen in Queue b, preempt <10,20> to Queue a
    verify(eventHandler, never()).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
    verify(eventHandler, times(5)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));
  }

  @Test
  public void testInterQueuePreemptionWithNaturalTerminationFactor()
      throws Exception {
    /**
     * Queue structure is:
     *
     * <pre>
     *       root
     *      /   \
     *     a     b
     * </pre>
     *
     * Guaranteed resource of a/b are 50:50 Total cluster resource = 100
     * Scenario: All resources are allocated to Queue A.
     * Even though Queue B needs few resources like 1 VCore, some resources
     * must be preempted from the app which is running in Queue A.
     */

    conf.setFloat(
        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
        (float) 0.2);

    String labelsConfig = "=100:50,true;";
    String nodesConfig = // n1 has no label
        "n1= res=100:50";
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[100:50 100:50 50:50 0:0]);" + // root
            "-a(=[50:25 100:50 50:50 0:0]);" + // a
            "-b(=[50:25 50:25 0:0 2:1]);"; // b

    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "a\t(1,2:1,n1,,50,false);"; // app1 in a

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    policy.editSchedule();

    verify(eventHandler, times(1)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
  }

  @Test
  public void test3ResourceTypesInterQueuePreemption() throws IOException {
    // Initialize resource map
    String RESOURCE_1 = "res1";
    riMap.put(RESOURCE_1, ResourceInformation
        .newInstance(RESOURCE_1, "", 0, ResourceTypes.COUNTABLE, 0,
            Integer.MAX_VALUE));

    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);

    /*
     *              root
     *           /  \  \
     *          a    b  c
     *
     *  A / B / C have 33.3 / 33.3 / 33.4 resources
     *  Total cluster resource have mem=30, cpu=18, GPU=6
     *  A uses mem=6, cpu=3, GPU=3
     *  B uses mem=6, cpu=3, GPU=3
     *  C is asking mem=1,cpu=1,GPU=1
     *
     *  We expect it can preempt from one of the jobs
     */
    String labelsConfig = "=30:18:6,true;";
    String nodesConfig = "n1= res=30:18:6;"; // n1 is default partition
    String queuesConfig =
        // guaranteed,max,used,pending
        "root(=[30:18:6 30:18:6 12:12:6 1:1:1]);" + //root
            "-a(=[10:7:2 10:6:3 6:6:3 0:0:0]);" + // a
            "-b(=[10:6:2 10:6:3 6:6:3 0:0:0]);" + // b
            "-c(=[10:5:2 10:6:2 0:0:0 1:1:1])"; // c
    String appsConfig =
        //queueName\t(priority,resource,host,expression,#repeat,reserved)
        "a\t" // app1 in a1
            + "(1,2:2:1,n1,,3,false);" + "b\t" // app2 in b2
            + "(1,2:2:1,n1,,3,false)";

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    policy.editSchedule();

    verify(eventHandler, times(0)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
    verify(eventHandler, times(1)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(2))));
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testInterQueuePreemptionWithStrictAndRelaxedDRF()
      throws IOException {

    /*
     *              root
     *           /  \  \
     *          a    b  c
     *
     *  A / B / C have 33.3 / 33.3 / 33.4 resources
     *  Total cluster resource have mem=61440, cpu=600
     *
     *  +=================+========================+
     *  | used in queue a | user limit for queue a |
     *  +=================+========================+
     *  |    61440:60     |       20480:200        |
     *  +=================+========================+
     *  In this case, the used memory is over the user limit but the used vCores
     *  is not. If conservative DRF is true, preemptions will not occur.
     *  If conservative DRF is false (default) preemptions will occur.
     */
    String labelsConfig = "=61440:600,true;";
    String nodesConfig = "n1= res=61440:600"; // n1 is default partition
    String queuesConfig =
        // guaranteed,max,used,pending,reserved
        "root(=[61440:600 61440:600 61440:600 20480:20 0]);" + // root
            "-a(=[20480:200 61440:600 61440:60 0:0 0]);" + // b
            "-b(=[20480:200 61440:600 0:0 20480:20 0]);" + // a
            "-c(=[20480:200 61440:600 0:0 0:0 0])"; // c
    String appsConfig =
        // queueName\t(priority,resource,host,expression,#repeat,reserved,pending,user)
        "a\t" + "(1,1024:1,n1,,60,false,0:0,user1);" + // app1 in a
        "b\t" + "(1,0:0,n1,,0,false,20480:20,user2);"; // app2 in b

    conf.setBoolean(
        CapacitySchedulerConfiguration.CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
        true);

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    Resource ul = Resource.newInstance(20480, 20);
    when(((LeafQueue)(cs.getQueue("root.a")))
        .getResourceLimitForAllUsers(any(), any(), any(), any())
    ).thenReturn(ul);
    policy.editSchedule();

    verify(eventHandler, times(0)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));

    reset(eventHandler);

    conf.setBoolean(
        CapacitySchedulerConfiguration.CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF,
        false);

    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
    ul = Resource.newInstance(20480, 20);
    when(((LeafQueue)(cs.getQueue("root.a")))
        .getResourceLimitForAllUsers(any(), any(), any(), any())
    ).thenReturn(ul);
    policy.editSchedule();

    verify(eventHandler, times(20)).handle(argThat(
        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
            getAppAttemptId(1))));
  }
}