TestAppManagerWithFairScheduler.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException.InvalidResourceType.GREATER_THEN_MAX_ALLOCATION;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;


import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
    .allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
    .allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Testing RMAppManager application submission with fair scheduler.
 *
 * <p>Each test rewrites the allocation file, reinitializes the scheduler and
 * then submits applications through a {@code TestRMAppManager}. The queue an
 * application lands in is dictated by a mocked {@link PlacementManager}, see
 * {@link #placeInQueue(String)}.
 */
public class TestAppManagerWithFairScheduler extends AppManagerTestBase {

  private static final String TEST_FOLDER = "test-queues";

  // Static and never reset: a setting applied by one test (for example
  // enabling ACLs) stays set for every test that runs after it.
  private static YarnConfiguration conf = new YarnConfiguration();
  private PlacementManager placementMgr;
  private TestRMAppManager rmAppManager;
  private RMContext rmContext;
  // Path the allocation file is written to and the scheduler reads from.
  private static String allocFileName =
      GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath();

  /**
   * Writes a default allocation file, configures the fair scheduler to use
   * it and builds the app manager under test around a mocked placement
   * manager.
   *
   * @throws IOException declared for the fixture set up; not thrown directly
   *                     by this method's own code
   */
  @BeforeEach
  public void setup() throws IOException {
    // Basic config with one queue (override in test if needed)
    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("test").build())
        .writeToFile(allocFileName);

    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
        ResourceScheduler.class);

    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFileName);
    placementMgr = mock(PlacementManager.class);

    MockRM mockRM = new MockRM(conf);
    rmContext = mockRM.getRMContext();
    // Replace the placement manager so each test decides the target queue.
    rmContext.setQueuePlacementManager(placementMgr);
    ApplicationMasterService masterService = new ApplicationMasterService(
        rmContext, rmContext.getScheduler());

    rmAppManager = new TestRMAppManager(rmContext,
        new ClientToAMTokenSecretManagerInRM(), rmContext.getScheduler(),
        masterService, new ApplicationACLsManager(conf), conf);
  }

  /**
   * Removes the allocation file written by the test.
   */
  @AfterEach
  public void teardown(){
    // Same path as allocFileName; the result of delete() is not checked.
    File allocFile = GenericTestUtils.getTestDir(TEST_FOLDER);
    allocFile.delete();
  }

  /**
   * Submits an application whose AM resource request is one MB larger than
   * the max container allocation of the "limited" queue. The submission must
   * be rejected with {@code GREATER_THEN_MAX_ALLOCATION} for that queue and
   * accepted when the same application is placed in "root.unlimited".
   *
   * @throws YarnException if a submission fails unexpectedly
   * @throws IOException if reinitializing the scheduler fails
   */
  @Test
  public void testQueueSubmitWithHighQueueContainerSize()
      throws YarnException, IOException {
    // The default minimum allocation doubles as the cap of the limited
    // queue, so a request of maxAlloc + 1 exceeds that cap.
    int maxAlloc =
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB;

    // scheduler config with a limited queue
    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("root")
            .subQueue(new AllocationFileQueue.Builder("limited")
                .maxContainerAllocation(maxAlloc + " mb 1 vcores")
                .build())
            .subQueue(new AllocationFileQueue.Builder("unlimited")
                .build())
            .build())
        .writeToFile(allocFileName);
    rmContext.getScheduler().reinitialize(conf, rmContext);

    ApplicationId appId = MockApps.newAppID(1);
    Resource res = Resources.createResource(maxAlloc + 1);
    ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);

    // Submit to limited queue
    placeInQueue("limited");
    try {
      rmAppManager.submitApplication(asContext, "test");
      fail("Test should fail on too high allocation!");
    } catch (InvalidResourceRequestException e) {
      assertEquals(GREATER_THEN_MAX_ALLOCATION,
          e.getInvalidResourceType());
    }

    // submit same app but now place it in the unlimited queue
    placeInQueue("root.unlimited");
    rmAppManager.submitApplication(asContext, "test");
  }

  /**
   * Enables ACLs and submits as user "test" to three queues below a root
   * queue whose submit and administer ACLs are blank: "noaccess" (no ACLs of
   * its own) must be denied, while "submitonly" and "adminonly" (each naming
   * "test" in one of its ACLs) must accept the submission.
   *
   * @throws YarnException if a submission fails unexpectedly
   * @throws IOException if reinitializing the scheduler fails
   */
  @Test
  public void testQueueSubmitWithPermissionLimits()
      throws YarnException, IOException {

    conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");

    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("root")
            .aclSubmitApps(" ")
            .aclAdministerApps(" ")
            .subQueue(new AllocationFileQueue.Builder("noaccess")
                .build())
            .subQueue(new AllocationFileQueue.Builder("submitonly")
                .aclSubmitApps("test ")
                .aclAdministerApps(" ")
                .build())
            .subQueue(new AllocationFileQueue.Builder("adminonly")
                .aclSubmitApps(" ")
                .aclAdministerApps("test ")
                .build())
            .build())
        .writeToFile(allocFileName);
    rmContext.getScheduler().reinitialize(conf, rmContext);

    ApplicationId appId = MockApps.newAppID(1);
    Resource res = Resources.createResource(1024, 1);
    ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);

    // Submit to no access queue
    placeInQueue("noaccess");
    try {
      rmAppManager.submitApplication(asContext, "test");
      fail("Test should have failed with access denied");
    } catch (YarnException e) {
      assertTrue(e.getCause() instanceof AccessControlException,
          "Access exception not found");
    }
    // Submit to submit access queue
    placeInQueue("submitonly");
    rmAppManager.submitApplication(asContext, "test");
    // Submit second app to admin access queue
    appId = MockApps.newAppID(2);
    asContext = createAppSubmitCtx(appId, res);
    placeInQueue("adminonly");
    rmAppManager.submitApplication(asContext, "test");
  }

  /**
   * Enables ACLs and submits as user "test" to a "noaccess" queue whose own
   * ACLs are blank while the root queue declares no ACLs at all. The
   * submission is expected to succeed because of the root queue's ACL.
   *
   * @throws YarnException if the submission fails unexpectedly
   * @throws IOException if reinitializing the scheduler fails
   */
  @Test
  public void testQueueSubmitWithRootPermission()
      throws YarnException, IOException {

    conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");

    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("root")
            .subQueue(new AllocationFileQueue.Builder("noaccess")
                .aclSubmitApps(" ")
                .aclAdministerApps(" ")
                .build())
            .build())
        .writeToFile(allocFileName);
    rmContext.getScheduler().reinitialize(conf, rmContext);

    ApplicationId appId = MockApps.newAppID(1);
    Resource res = Resources.createResource(1024, 1);
    ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);

    // Submit to noaccess queue should be allowed by root ACL.
    // The stub must use the same three-argument placeApplication call as
    // the other tests, otherwise the mock is not configured for it.
    placeInQueue("noaccess");
    rmAppManager.submitApplication(asContext, "test");
  }

  /**
   * Enables ACLs and submits as user "test" to child queues that are not
   * declared in the allocation file, below two parent queues: below
   * "noaccess" the submission must be denied, below "submitonly" (which
   * names "test" in its submit ACL) it must be accepted.
   *
   * @throws YarnException if a submission fails unexpectedly
   * @throws IOException if reinitializing the scheduler fails
   */
  @Test
  public void testQueueSubmitWithAutoCreateQueue()
      throws YarnException, IOException {

    conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");

    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("root")
            .aclSubmitApps(" ")
            .aclAdministerApps(" ")
            .subQueue(new AllocationFileQueue.Builder("noaccess")
                .parent(true)
                .build())
            .subQueue(new AllocationFileQueue.Builder("submitonly")
                .parent(true)
                .aclSubmitApps("test ")
                .build())
            .build())
        .writeToFile(allocFileName);
    rmContext.getScheduler().reinitialize(conf, rmContext);

    ApplicationId appId = MockApps.newAppID(1);
    Resource res = Resources.createResource(1024, 1);
    ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);

    // Submit to noaccess parent with non existent child queue
    placeInQueue("root.noaccess.child");
    try {
      rmAppManager.submitApplication(asContext, "test");
      fail("Test should have failed with access denied");
    } catch (YarnException e) {
      assertTrue(e.getCause() instanceof AccessControlException,
          "Access exception not found");
    }
    // Submit to submitonly parent with non existent child queue
    placeInQueue("root.submitonly.child");
    rmAppManager.submitApplication(asContext, "test");
  }

  /**
   * Stubs the mocked placement manager so that any subsequent
   * {@code placeApplication(context, user, recovery)} call returns a
   * placement context for the given queue. A later call to this method
   * replaces the earlier stubbing.
   *
   * @param queueName queue name handed to the returned
   *                  {@link ApplicationPlacementContext}
   * @throws YarnException declared by the stubbed method; not expected to be
   *                       thrown while stubbing
   */
  private void placeInQueue(String queueName) throws YarnException {
    when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
        .thenReturn(new ApplicationPlacementContext(queueName));
  }

  /**
   * Builds a submission context for the given application with a single AM
   * container request (priority 0, any location, one container) of the given
   * size, a mocked launch context and the queue set to "default".
   *
   * @param appId id to assign to the submission context
   * @param res resource capability of the AM container request
   * @return the populated submission context
   */
  private ApplicationSubmissionContext createAppSubmitCtx(ApplicationId appId,
                                                          Resource res) {
    ApplicationSubmissionContext asContext =
        Records.newRecord(ApplicationSubmissionContext.class);
    asContext.setApplicationId(appId);
    ResourceRequest resReg =
        ResourceRequest.newInstance(Priority.newInstance(0),
            ResourceRequest.ANY, res, 1);
    asContext.setAMContainerResourceRequests(
        Collections.singletonList(resReg));
    asContext.setAMContainerSpec(mock(ContainerLaunchContext.class));
    asContext.setQueue("default");
    return asContext;
  }
}