TestCapacitySchedulerQueueMappingFactory.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
import org.apache.hadoop.yarn.util.Records;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.getQueueMapping;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.setupQueueConfiguration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

public class TestCapacitySchedulerQueueMappingFactory {

  private static final String QUEUE_MAPPING_NAME = "app-name";
  private static final String QUEUE_MAPPING_RULE =
      CSMappingPlacementRule.class.getCanonicalName();
  public static final String USER = "user_";
  public static final String PARENT_QUEUE = "c";

  public static CapacitySchedulerConfiguration setupQueueMappingsForRules(
      CapacitySchedulerConfiguration conf, String parentQueue,
      boolean overrideWithQueueMappings, int[] sourceIds) {

    List<String> queuePlacementRules = new ArrayList<>();

    queuePlacementRules.add(QUEUE_MAPPING_RULE);

    conf.setQueuePlacementRules(queuePlacementRules);

    List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();

    //set queue mapping
    List<QueueMapping> queueMappingsForUG = new ArrayList<>();
    for (int i = 0; i < sourceIds.length; i++) {
      //Set C as parent queue name for auto queue creation
      QueueMapping userQueueMapping = QueueMappingBuilder.create()
                                          .type(MappingType.USER)
                                          .source(USER + sourceIds[i])
                                          .queue(
                                              getQueueMapping(parentQueue,
                                                  USER + sourceIds[i]))
                                          .build();
      queueMappingsForUG.add(userQueueMapping);
    }

    existingMappingsForUG.addAll(queueMappingsForUG);
    conf.setQueueMappings(existingMappingsForUG);

    List<QueueMapping> existingMappingsForAN =
        conf.getQueueMappingEntity(QUEUE_MAPPING_NAME);

    //set queue mapping
    List<QueueMapping> queueMappingsForAN =
        new ArrayList<>();
    for (int i = 0; i < sourceIds.length; i++) {
      //Set C as parent queue name for auto queue creation
      QueueMapping queueMapping = QueueMapping.QueueMappingBuilder.create()
          .type(MappingType.APPLICATION)
          .source(USER + sourceIds[i])
          .queue(getQueueMapping(parentQueue, USER + sourceIds[i]))
          .build();

      queueMappingsForAN.add(queueMapping);
    }

    existingMappingsForAN.addAll(queueMappingsForAN);
    conf.setQueueMappingEntities(existingMappingsForAN, QUEUE_MAPPING_NAME);
    //override with queue mappings
    conf.setOverrideWithQueueMappings(overrideWithQueueMappings);
    return conf;
  }

  @Test
  public void testUpdatePlacementRulesFactory() throws Exception {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    setupQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);

    // init queue mapping for UserGroupMappingRule and AppNameMappingRule
    setupQueueMappingsForRules(conf, PARENT_QUEUE, true, new int[] {1, 2, 3});

    MockRM mockRM = null;
    try {
      mockRM = new MockRM(conf);
      CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
      cs.updatePlacementRules();
      mockRM.start();
      cs.start();

      List<PlacementRule> rules = cs.getRMContext()
          .getQueuePlacementManager().getPlacementRules();

      List<String> placementRuleNames = new ArrayList<>();
      for (PlacementRule pr : rules) {
        placementRuleNames.add(pr.getName());
      }

      // verify both placement rules were added successfully
      assertThat(placementRuleNames).contains(QUEUE_MAPPING_RULE);
    } finally {
      if(mockRM != null) {
        mockRM.close();
      }
    }
  }

  @Test
  public void testNestedUserQueueWithStaticParentQueue() throws Exception {

    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    setupQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);

    List<String> queuePlacementRules = new ArrayList<>();
    queuePlacementRules.add(QUEUE_MAPPING_RULE);
    conf.setQueuePlacementRules(queuePlacementRules);

    List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();

    // set queue mapping
    List<QueueMapping> queueMappingsForUG = new ArrayList<>();

    // u:user1:b1
    QueueMapping userQueueMapping1 = QueueMappingBuilder.create()
                                        .type(QueueMapping.MappingType.USER)
                                        .source("user1")
                                        .queue("b1")
                                        .build();

    // u:%user:parentqueue.%user
    QueueMapping userQueueMapping2 = QueueMappingBuilder.create()
                                        .type(QueueMapping.MappingType.USER)
                                        .source("%user")
                                        .queue(getQueueMapping("c", "%user"))
                                        .build();

    queueMappingsForUG.add(userQueueMapping1);
    queueMappingsForUG.add(userQueueMapping2);

    existingMappingsForUG.addAll(queueMappingsForUG);
    conf.setQueueMappings(existingMappingsForUG);

    // override with queue mappings
    conf.setOverrideWithQueueMappings(true);

    MockRM mockRM = null;
    try {
      mockRM = new MockRM(conf);
      CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
      cs.updatePlacementRules();
      mockRM.start();
      cs.start();

      ApplicationSubmissionContext asc =
          Records.newRecord(ApplicationSubmissionContext.class);
      asc.setQueue("default");

      List<PlacementRule> rules =
          cs.getRMContext().getQueuePlacementManager().getPlacementRules();

      CSMappingPlacementRule r =
          (CSMappingPlacementRule) rules.get(0);

      ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
      assertEquals("b1", ctx.getQueue(), "Queue");

      ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "user2");
      assertEquals("user2", ctx2.getQueue(), "Queue");
      assertEquals("root.c", ctx2.getParentQueue(), "Queue");
    } finally {
      if(mockRM != null) {
        mockRM.close();
      }
    }
  }

  @Test
  public void testNestedUserQueueWithPrimaryGroupAsDynamicParentQueue()
      throws Exception {

    /**
     * Mapping order: 1. u:%user:%primary_group.%user 2.
     * u:%user:%secondary_group.%user
     *
     * Expected parent queue is primary group of the user
     */

    // set queue mapping
    List<QueueMapping> queueMappingsForUG = new ArrayList<>();

    // u:%user:%primary_group.%user
    QueueMapping userQueueMapping1 = QueueMappingBuilder.create()
                                        .type(QueueMapping.MappingType.USER)
                                        .source("%user")
                                        .queue(
                                            getQueueMapping("%primary_group",
                                                "%user"))
                                        .build();

    // u:%user:%secondary_group.%user
    QueueMapping userQueueMapping2 = QueueMappingBuilder.create()
                                        .type(QueueMapping.MappingType.USER)
                                        .source("%user")
                                        .queue(
                                            getQueueMapping("%secondary_group",
                                                "%user"))
                                        .build();

    // u:b4:%secondary_group
    QueueMapping userQueueMapping3 = QueueMappingBuilder.create()
                                        .type(QueueMapping.MappingType.USER)
                                        .source("b4")
                                        .queue("%secondary_group")
                                        .build();
    queueMappingsForUG.add(userQueueMapping1);
    queueMappingsForUG.add(userQueueMapping2);
    queueMappingsForUG.add(userQueueMapping3);

    testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, true, "f");

    try {
      testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, true, "g");
      fail("Queue 'g' exists, but type is not Leaf Queue");
    } catch (YarnException e) {
      // Exception is expected as there is no such leaf queue
    }

    try {
      testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, true, "a1");
      fail("Actual Parent Queue of Leaf Queue 'a1' is 'a', but as per queue "
          + "mapping it returns primary queue as 'a1group'");
    } catch (YarnException e) {
      // Exception is expected as there is mismatch in expected and actual
      // parent queue
    }
  }

  @Test
  public void testNestedUserQueueWithSecondaryGroupAsDynamicParentQueue()
      throws Exception {

    /**
     * Mapping order: 1. u:%user:%secondary_group.%user 2.
     * u:%user:%primary_group.%user
     *
     * Expected parent queue is secondary group of the user
     */

    // set queue mapping
    List<QueueMapping> queueMappingsForUG = new ArrayList<>();

    // u:%user:%primary_group.%user
    QueueMapping userQueueMapping1 = QueueMappingBuilder.create()
                                          .type(QueueMapping.MappingType.USER)
                                          .source("%user")
                                          .queue(
                                              getQueueMapping("%primary_group",
                                                  "%user"))
                                          .build();

    // u:%user:%secondary_group.%user
    QueueMapping userQueueMapping2 = QueueMappingBuilder.create()
                                          .type(QueueMapping.MappingType.USER)
                                          .source("%user")
                                          .queue(
                                              getQueueMapping(
                                                  "%secondary_group", "%user")
                                              )
                                          .build();

    queueMappingsForUG.add(userQueueMapping2);
    queueMappingsForUG.add(userQueueMapping1);

    testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, false, "e");
  }

  private void testNestedUserQueueWithDynamicParentQueue(
      List<QueueMapping> mapping, boolean primary, String user)
      throws Exception {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    setupQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);

    List<String> queuePlacementRules = new ArrayList<>();
    queuePlacementRules.add(QUEUE_MAPPING_RULE);
    conf.setQueuePlacementRules(queuePlacementRules);

    List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();

    existingMappingsForUG.addAll(mapping);
    conf.setQueueMappings(existingMappingsForUG);

    // override with queue mappings
    conf.setOverrideWithQueueMappings(true);

    MockRM mockRM = null;
    try {
      mockRM = new MockRM(conf);
      CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
      cs.updatePlacementRules();
      mockRM.start();
      cs.start();

      ApplicationSubmissionContext asc =
          Records.newRecord(ApplicationSubmissionContext.class);
      asc.setQueue("default");

      List<PlacementRule> rules =
          cs.getRMContext().getQueuePlacementManager().getPlacementRules();

      CSMappingPlacementRule r =
          (CSMappingPlacementRule) rules.get(0);
      ApplicationPlacementContext ctx = r.getPlacementForApp(asc, user);
      assertEquals(user, ctx.getQueue(), "Queue");

      if (primary) {
        assertEquals("root." + user + "group", ctx.getParentQueue(), "Primary Group");
      } else {
        assertEquals("root." + user + "subgroup1",
            ctx.getParentQueue(), "Secondary Group");
      }
    } finally {
      if (mockRM != null) {
        mockRM.close();
      }
    }
  }

  @Test
  public void testDynamicPrimaryGroupQueue() throws Exception {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    setupQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);

    List<String> queuePlacementRules = new ArrayList<>();
    queuePlacementRules.add(QUEUE_MAPPING_RULE);
    conf.setQueuePlacementRules(queuePlacementRules);

    List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();

    // set queue mapping
    List<QueueMapping> queueMappingsForUG = new ArrayList<>();

    // u:user1:b1
    QueueMapping userQueueMapping1 = QueueMappingBuilder.create()
                                          .type(QueueMapping.MappingType.USER)
                                          .source("user1")
                                          .queue("b1")
                                          .build();

    // u:user2:%primary_group
    QueueMapping userQueueMapping2 = QueueMappingBuilder.create()
                                          .type(QueueMapping.MappingType.USER)
                                          .source("a1")
                                          .queue("%primary_group")
                                          .build();

    queueMappingsForUG.add(userQueueMapping1);
    queueMappingsForUG.add(userQueueMapping2);
    existingMappingsForUG.addAll(queueMappingsForUG);
    conf.setQueueMappings(existingMappingsForUG);

    // override with queue mappings
    conf.setOverrideWithQueueMappings(true);

    MockRM mockRM = null;
    try {
      mockRM = new MockRM(conf);
      CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
      cs.updatePlacementRules();
      mockRM.start();
      cs.start();

      ApplicationSubmissionContext asc =
          Records.newRecord(ApplicationSubmissionContext.class);
      asc.setQueue("default");

      List<PlacementRule> rules =
          cs.getRMContext().getQueuePlacementManager().getPlacementRules();
      CSMappingPlacementRule r =
          (CSMappingPlacementRule) rules.get(0);

      ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
      assertEquals("b1", ctx.getQueue(), "Queue");

      ApplicationPlacementContext ctx1 = r.getPlacementForApp(asc, "a1");
      assertEquals("a1group", ctx1.getQueue(), "Queue");
    } finally {
      if (mockRM != null) {
        mockRM.close();
      }
    }
  }

  @Test
  public void testFixedUserWithDynamicGroupQueue() throws Exception {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    setupQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);

    List<String> queuePlacementRules = new ArrayList<>();
    queuePlacementRules.add(QUEUE_MAPPING_RULE);
    conf.setQueuePlacementRules(queuePlacementRules);

    List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();

    // set queue mapping
    List<QueueMapping> queueMappingsForUG = new ArrayList<>();

    // u:user1:b1
    QueueMapping userQueueMapping1 = QueueMappingBuilder.create()
                                          .type(QueueMapping.MappingType.USER)
                                          .source("user1")
                                          .queue("b1")
                                          .build();

    // u:user2:%primary_group
    QueueMapping userQueueMapping2 = QueueMappingBuilder.create()
                                          .type(QueueMapping.MappingType.USER)
                                          .source("a1")
                                          .queue("%primary_group")
                                          .build();

    // u:b4:c.%secondary_group
    QueueMapping userQueueMapping3 = QueueMappingBuilder.create()
                                          .type(QueueMapping.MappingType.USER)
                                          .source("b4")
                                          .queue("root.b.%secondary_group")
                                          .build();

    queueMappingsForUG.add(userQueueMapping1);
    queueMappingsForUG.add(userQueueMapping2);
    queueMappingsForUG.add(userQueueMapping3);
    existingMappingsForUG.addAll(queueMappingsForUG);
    conf.setQueueMappings(existingMappingsForUG);

    //override with queue mappings
    conf.setOverrideWithQueueMappings(true);

    MockRM mockRM = null;
    try {
      mockRM = new MockRM(conf);
      CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
      cs.updatePlacementRules();
      mockRM.start();
      cs.start();

      ApplicationSubmissionContext asc =
          Records.newRecord(ApplicationSubmissionContext.class);
      asc.setQueue("default");

      List<PlacementRule> rules =
          cs.getRMContext().getQueuePlacementManager().getPlacementRules();
      CSMappingPlacementRule r =
          (CSMappingPlacementRule) rules.get(0);

      ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
      assertEquals("b1", ctx.getQueue(), "Queue");

      ApplicationPlacementContext ctx1 = r.getPlacementForApp(asc, "a1");
      assertEquals("a1group", ctx1.getQueue(), "Queue");

      ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "b4");
      assertEquals("b4subgroup1", ctx2.getQueue(), "Queue");
      assertEquals("root.b", ctx2.getParentQueue(), "Queue");
    } finally {
      if (mockRM != null) {
        mockRM.close();
      }
    }
  }
}