TestCapacitySchedulerWorkflowPriorityMapping.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ROOT;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;

public class TestCapacitySchedulerWorkflowPriorityMapping {
  private MockRM mockRM = null;

  @AfterEach
  public void tearDown() {
    if (mockRM != null) {
      mockRM.stop();
    }
  }

  private static void setWorkFlowPriorityMappings(
      CapacitySchedulerConfiguration conf) {
    // Define top-level queues
    conf.setQueues(ROOT, new String[] {"a", "b"});

    conf.setCapacity(A, A_CAPACITY);
    conf.setCapacity(B, B_CAPACITY);

    // Define 2nd-level queues
    conf.setQueues(A, new String[] {"a1", "a2"});
    conf.setCapacity(A1, A1_CAPACITY);
    conf.setCapacity(A2, A2_CAPACITY);

    conf.setQueues(B, new String[] {"b1", "b2", "b3"});
    conf.setCapacity(B1, B1_CAPACITY);
    conf.setCapacity(B2, B2_CAPACITY);
    conf.setCapacity(B3, B3_CAPACITY);

    List<WorkflowPriorityMapping> mappings = Arrays.asList(
        new WorkflowPriorityMapping("workflow1", B.getFullPath(), Priority.newInstance(2)),
        new WorkflowPriorityMapping("workflow2", A1.getFullPath(), Priority.newInstance(3)),
        new WorkflowPriorityMapping("Workflow3", A.getFullPath(), Priority.newInstance(4)));
    conf.setWorkflowPriorityMappings(mappings);
  }

  @Test
  public void testWorkflowPriorityMappings() throws Exception {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setBoolean(CapacitySchedulerConfiguration
        .ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, true);
    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);

    // Initialize workflow priority mappings.
    setWorkFlowPriorityMappings(conf);

    mockRM = new MockRM(conf);
    CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
    mockRM.start();
    cs.start();

    Map<String, Object> expected = ImmutableMap.of(
        A.getFullPath(), ImmutableMap.of("workflow3", Priority.newInstance(4)),
        B.getFullPath(), ImmutableMap.of("workflow1", Priority.newInstance(2)),
        A1.getFullPath(), ImmutableMap.of("workflow2", Priority.newInstance(3)));
    assertEquals(expected, cs.getWorkflowPriorityMappingsManager()
        .getWorkflowPriorityMappings());

    // Maps to rule corresponding to parent queue "a" for workflow3.
    MockRMAppSubmitter.submit(mockRM,
        MockRMAppSubmissionData.Builder.createWithMemory(1, mockRM)
            .withQueue("a2")
            .withApplicationId(ApplicationId.newInstance(0, 1))
            .withAppPriority(Priority.newInstance(0))
            .withApplicationTags(ImmutableSet.of(
                YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
                    + "workflow3"))
            .build());
    RMApp app =
        mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,1));
    assertEquals(4, app.getApplicationSubmissionContext().getPriority()
        .getPriority());

    // Does not match any rule as rule for queue + workflow does not exist.
    // Priority passed in the app is taken up.
    MockRMAppSubmitter.submit(mockRM,
        MockRMAppSubmissionData.Builder.createWithMemory(1, mockRM)
            .withQueue("a1")
            .withApplicationId(ApplicationId.newInstance(0, 2))
            .withAppPriority(Priority.newInstance(6))
            .withApplicationTags(ImmutableSet.of(
                YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
                    + "workflow1"))
            .build());
    app =
        mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,2));
    assertEquals(6, app.getApplicationSubmissionContext().getPriority()
        .getPriority());

    // Maps to rule corresponding to parent queue "a1" for workflow2.
    MockRMAppSubmitter.submit(mockRM,
        MockRMAppSubmissionData.Builder.createWithMemory(1, mockRM)
            .withQueue("a1")
            .withApplicationId(ApplicationId.newInstance(0, 3))
            .withAppPriority(Priority.newInstance(0))
            .withApplicationTags(ImmutableSet.of(
                YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
                    + "workflow2"))
            .build());
    app =
        mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,3));
    assertEquals(3, app.getApplicationSubmissionContext().getPriority()
        .getPriority());

    // Maps to rule corresponding to parent queue "b" for workflow1.
    MockRMAppSubmitter.submit(mockRM,
        MockRMAppSubmissionData.Builder.createWithMemory(1, mockRM)
            .withQueue("b3")
            .withApplicationId(ApplicationId.newInstance(0, 4))
            .withAppPriority(Priority.newInstance(0))
            .withApplicationTags(ImmutableSet.of(
                YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
                    + "workflow1"))
            .build());
    app = mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,4));
    assertEquals(2, app.getApplicationSubmissionContext().getPriority()
        .getPriority());

    // Disable workflow priority mappings override and reinitialize scheduler.
    conf.setBoolean(CapacitySchedulerConfiguration
        .ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, false);
    cs.reinitialize(conf, mockRM.getRMContext());

    MockRMAppSubmitter.submit(mockRM,
        MockRMAppSubmissionData.Builder.createWithMemory(1, mockRM)
            .withQueue("a2")
            .withApplicationId(ApplicationId.newInstance(0, 5))
            .withAppPriority(Priority.newInstance(0))
            .withApplicationTags(ImmutableSet.of(
                YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX
                    + "workflow3"))
            .build());
    app = mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,5));
    assertEquals(0, app.getApplicationSubmissionContext().getPriority()
        .getPriority());
  }
}