TestCapacitySchedulerAmbiguousLeafs.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.Test;

import java.io.IOException;

/**
 * Exercises application submission against a CapacityScheduler queue
 * hierarchy in which the leaf name "ambi" exists under two different parents
 * (root.a.ambi and root.b.ambi), with YARN ACLs enabled.
 */
public class TestCapacitySchedulerAmbiguousLeafs {
  /**
   * Internal counter for incremental application id generation.
   * Incremented on every call to
   * {@link #submitApplication(MockRM, String)}.
   */
  int appId = 0;

  private static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
  private static final QueuePath DEFAULT = new QueuePath(CapacitySchedulerConfiguration.ROOT +
      ".default");
  private static final QueuePath A = new QueuePath(CapacitySchedulerConfiguration.ROOT +
      ".a");
  private static final QueuePath B = new QueuePath(CapacitySchedulerConfiguration.ROOT +
      ".b");
  private static final QueuePath A_UNIQUE = new QueuePath(CapacitySchedulerConfiguration.ROOT +
      ".a.unique");
  private static final QueuePath A_AMBI = new QueuePath(CapacitySchedulerConfiguration.ROOT +
      ".a.ambi");
  private static final QueuePath B_AMBI = new QueuePath(CapacitySchedulerConfiguration.ROOT +
      ".b.ambi");

  /**
   * Helper method to submit an application through the ClientRMService of the
   * given resource manager, so that the submission takes the same path as a
   * real client request (the intent being that it goes through RMAppManager).
   *
   * @param rm The resource manager instance
   * @param queue Name of the queue to submit the application to, either a
   *              full path (e.g. "root.a.unique") or a short leaf name
   * @return ApplicationID of the submitted application
   * @throws IOException if the submission call fails with an I/O error
   * @throws YarnException if the resource manager rejects the submission
   */
  private ApplicationId submitApplication(MockRM rm, String queue)
      throws IOException, YarnException {
    // Generate a fresh, incremental application id for every submission.
    final ApplicationAttemptId appAttemptId = TestUtils
        .getMockApplicationAttemptId(appId++, 1);

    Resource resource = Resources.createResource(1024);
    ContainerLaunchContext amContainerSpec = ContainerLaunchContext
        .newInstance(null, null, null, null, null, null);
    ApplicationSubmissionContext asc = ApplicationSubmissionContext
        .newInstance(appAttemptId.getApplicationId(), "Test application",
            queue, null, amContainerSpec, false, true, 1, resource,
            "applicationType");

    SubmitApplicationRequest req = SubmitApplicationRequest.newInstance(asc);
    rm.getClientRMService().submitApplication(req);
    return appAttemptId.getApplicationId();
  }

  /**
   * Submits applications by full path and by short leaf name to a hierarchy
   * of root.a.{unique, ambi}, root.b.{ambi} and root.default with ACLs
   * enabled, and checks the resulting application states:
   * <ul>
   *   <li>"root.a.unique" and "unique" are expected to be ACCEPTED,</li>
   *   <li>"ambi" (a leaf name present under both a and b) is expected to be
   *       FAILED,</li>
   *   <li>"root.a.ambi" (the same leaf referenced by full path) is expected
   *       to be ACCEPTED.</li>
   * </ul>
   *
   * @throws Exception if the resource manager setup, a submission or a state
   *                   wait fails
   */
  @Test
  public void testAmbiguousSubmissionWithACL() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
    conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);

    MockRM rm = new MockRM(conf);
    CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
    CapacitySchedulerConfiguration schedulerConf = cs.getConfiguration();

    // Top level: three children, with restrictive ACLs set on root.
    schedulerConf.setQueues(ROOT, new String[] {"a", "b", "default"});
    schedulerConf.setAcl(ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
    schedulerConf.setAcl(ROOT, QueueACL.ADMINISTER_QUEUE, "forbidden forbidden");

    // root.a holds one uniquely named leaf and one leaf named "ambi".
    schedulerConf.setQueues(A, new String[] {"unique", "ambi"});
    schedulerConf.setAcl(A, QueueACL.SUBMIT_APPLICATIONS, "forbidden forbidden");
    schedulerConf.setCapacity(A, 45);

    // root.b holds a second leaf named "ambi", making the short name ambiguous.
    schedulerConf.setQueues(B, new String[] {"ambi"});
    schedulerConf.setCapacity(B, 45);
    schedulerConf.setCapacity(DEFAULT, 10);

    // Only the leaves under root.a get an explicit wildcard submit ACL.
    schedulerConf.setCapacity(A_UNIQUE, 50);
    schedulerConf.setAcl(A_UNIQUE, QueueACL.SUBMIT_APPLICATIONS, "* *");
    schedulerConf.setCapacity(A_AMBI, 50);
    schedulerConf.setAcl(A_AMBI, QueueACL.SUBMIT_APPLICATIONS, "* *");
    schedulerConf.setCapacity(B_AMBI, 100);

    schedulerConf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT, "json");
    // Simple %specified mapping rule for all submissions, with skip fallback.
    // The %specified rule is needed to make sure we get an
    // ApplicationPlacementContext, which is required for validating YARN-10787.
    schedulerConf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON,
        "{\"rules\" : [{\"type\": \"user\", \"policy\" : \"specified\", " +
        "\"fallbackResult\" : \"skip\", \"matches\" : \"*\"}]}");
    schedulerConf.setOverrideWithQueueMappings(true);

    rm.start();
    // Everything after start() runs inside try/finally so the resource
    // manager is stopped even when a submission or a state check fails;
    // otherwise the running RM would leak into subsequent tests.
    try {
      cs.reinitialize(schedulerConf, rm.getRMContext());

      // Full path of a uniquely named leaf.
      ApplicationId id = submitApplication(rm, "root.a.unique");
      rm.waitForState(id, RMAppState.ACCEPTED);

      // Short name of a uniquely named leaf.
      id = submitApplication(rm, "unique");
      rm.waitForState(id, RMAppState.ACCEPTED);

      // Short name shared by root.a.ambi and root.b.ambi.
      id = submitApplication(rm, "ambi");
      rm.waitForState(id, RMAppState.FAILED);

      // Full path of one of the two "ambi" leaves.
      id = submitApplication(rm, "root.a.ambi");
      rm.waitForState(id, RMAppState.ACCEPTED);
    } finally {
      rm.stop();
    }
  }
}