TestCapacitySchedulerMaxParallelApps.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestCapacitySchedulerMaxParallelApps {
  private CapacitySchedulerConfiguration conf;
  private MockRM rm;
  private MockNM nm1;

  private RMApp app1;
  private MockAM am1;
  private RMApp app2;
  private MockAM am2;
  private RMApp app3;
  private RMAppAttempt attempt3;
  private RMApp app4;
  private RMAppAttempt attempt4;

  private ParentQueue rootQueue;
  private LeafQueue defaultQueue;

  @BeforeEach
  public void setUp() {
    CapacitySchedulerConfiguration config =
        new CapacitySchedulerConfiguration();
    config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
        DominantResourceCalculator.class.getName());

    conf = new CapacitySchedulerConfiguration(config);
  }

  @AfterEach
  public void after() {
    if (rm != null) {
      rm.stop();
    }
  }

  @Test
  @Timeout(value = 30)
  public void testMaxParallelAppsExceedsQueueSetting() throws Exception {
    conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
    executeCommonStepsAndChecks();
    testWhenSettingsExceeded();
  }

  @Test
  @Timeout(value = 30)
  public void testMaxParallelAppsExceedsDefaultQueueSetting()
      throws Exception {
    conf.setInt("yarn.scheduler.capacity.max-parallel-apps", 2);
    executeCommonStepsAndChecks();
    testWhenSettingsExceeded();
  }

  @Test
  @Timeout(value = 30)
  public void testMaxParallelAppsExceedsUserSetting() throws Exception {
    conf.setInt("yarn.scheduler.capacity.user.testuser.max-parallel-apps", 2);
    executeCommonStepsAndChecks();
    testWhenSettingsExceeded();
  }

  @Test
  @Timeout(value = 30)
  public void testMaxParallelAppsExceedsDefaultUserSetting() throws Exception {
    conf.setInt("yarn.scheduler.capacity.user.max-parallel-apps", 2);
    executeCommonStepsAndChecks();
    testWhenSettingsExceeded();
  }

  @Test
  @Timeout(value = 30)
  public void testMaxParallelAppsWhenReloadingConfig() throws Exception {
    conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);

    executeCommonStepsAndChecks();

    RMContext rmContext = rm.getRMContext();
    // Disable parallel apps setting + max out AM percent
    conf.unset("yarn.scheduler.capacity.root.default.max-parallel-apps");
    conf.setFloat(PREFIX + "maximum-am-resource-percent", 1.0f);
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    cs.reinitialize(conf, rmContext);

    // Both app #3 and app #4 should transition to RUNNABLE
    launchAMandWaitForRunning(app3, attempt3, nm1);
    launchAMandWaitForRunning(app4, attempt4, nm1);
    verifyRunningAndAcceptedApps(4, 0);
  }

  @Test
  @Timeout(value = 30)
  public void testMaxAppsReachedWithNonRunnableApps() throws Exception {
    conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
    conf.setInt("yarn.scheduler.capacity.root.default.maximum-applications", 4);
    executeCommonStepsAndChecks();

    RMApp app5 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
        .withAppName("app5")
        .withUser("testuser")
        .withQueue("default")
        .withWaitForAppAcceptedState(false)
        .build());

    rm.waitForState(app5.getApplicationId(), RMAppState.FAILED);
  }

  private void executeCommonStepsAndChecks() throws Exception {
    rm = new MockRM(conf);
    rm.start();

    nm1 = rm.registerNode("h1:1234", 4096, 8);
    rm.registerNode("h2:1234", 4096, 8);
    rm.registerNode("h3:1234", 4096, 8);

    rm.drainEvents();

    app1 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
        .withAppName("app1")
        .withUser("testuser")
        .withQueue("default")
        .build());

    am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

    app2 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
        .withAppName("app2")
        .withUser("testuser")
        .withQueue("default")
        .build());
    am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);

    app3 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
        .withAppName("app3")
        .withUser("testuser")
        .withQueue("default")
        .build());
    attempt3 = MockRM.waitForAttemptScheduled(app3, rm);

    app4 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
        .withAppName("app4")
        .withUser("testuser")
        .withQueue("default")
        .build());
    attempt4 = MockRM.waitForAttemptScheduled(app4, rm);

    // Check that app attempt #3 and #4 are non-runnable
    rootQueue = getRootQueue();
    defaultQueue = getDefaultQueue();
    Set<ApplicationAttemptId> nonRunnables =
        Sets.newHashSet(
            attempt3.getAppAttemptId(),
            attempt4.getAppAttemptId());
    verifyRunnableAppsInParent(rootQueue, 2);
    verifyRunnableAppsInLeaf(defaultQueue, 2, nonRunnables);
    verifyRunningAndAcceptedApps(2, 2);
  }

  private void testWhenSettingsExceeded() throws Exception {
    // Stop app #1
    unregisterAMandWaitForFinish(app1, am1, nm1);

    // Launch app #3
    launchAMandWaitForRunning(app3, attempt3, nm1);

    // Check that attempt #4 is still non-runnable
    verifyRunnableAppsInParent(rootQueue, 2);
    verifyRunnableAppsInLeaf(defaultQueue, 2,
        Collections.singleton(attempt4.getAppAttemptId()));
    verifyRunningAndAcceptedApps(2, 1);

    // Stop app #2
    unregisterAMandWaitForFinish(app2, am2, nm1);

    // Launch app #4
    launchAMandWaitForRunning(app4, attempt4, nm1);
    verifyRunnableAppsInParent(rootQueue, 2);
    verifyRunnableAppsInLeaf(defaultQueue, 2,
        Collections.emptySet());
    verifyRunningAndAcceptedApps(2, 0);
  }

  @SuppressWarnings("checkstyle:hiddenfield")
  private LeafQueue getDefaultQueue() {
    CSQueue defaultQueue =
        ((CapacityScheduler) rm.getResourceScheduler()).getQueue("default");

    return (LeafQueue) defaultQueue;
  }

  private ParentQueue getRootQueue() {
    CSQueue root =
        ((CapacityScheduler) rm.getResourceScheduler()).getQueue("root");

    return (ParentQueue) root;
  }

  private void verifyRunnableAppsInParent(ParentQueue queue,
      int expectedRunnable) {
    assertEquals(expectedRunnable, queue.getNumRunnableApps(),
        "Num of runnable apps");
  }

  private void verifyRunnableAppsInLeaf(LeafQueue queue, int expectedRunnable,
      Set<ApplicationAttemptId> nonRunnableIds) {
    assertEquals(expectedRunnable, queue.getNumRunnableApps(),
        "Num of runnable apps");

    queue.getCopyOfNonRunnableAppSchedulables()
        .stream()
        .map(fca -> fca.getApplicationAttemptId())
        .forEach(id -> assertTrue(nonRunnableIds.contains(id),
        id + " not found as non-runnable"));
  }

  private void verifyRunningAndAcceptedApps(int expectedRunning,
      int expectedAccepted) throws YarnException {
    GetApplicationsRequest request = GetApplicationsRequest.newInstance();

    GetApplicationsResponse resp =
        rm.getClientRMService().getApplications(request);

    List<ApplicationReport> apps = resp.getApplicationList();

    long runningCount = apps
        .stream()
        .filter(report ->
          report.getYarnApplicationState() == YarnApplicationState.RUNNING)
        .count();

    long acceptedCount = apps
        .stream()
        .filter(report ->
          report.getYarnApplicationState() == YarnApplicationState.ACCEPTED)
        .count();

    assertEquals(expectedRunning, runningCount, "Running apps count");
    assertEquals(expectedAccepted, acceptedCount, "Accepted apps count");
  }

  private void unregisterAMandWaitForFinish(RMApp app, MockAM am, MockNM nm)
      throws Exception {
    am.unregisterAppAttempt();
    nm.nodeHeartbeat(app.getCurrentAppAttempt().getAppAttemptId(), 1,
        ContainerState.COMPLETE);
    rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
        RMAppAttemptState.FINISHED);
  }

  @SuppressWarnings("rawtypes")
  private MockAM launchAMandWaitForRunning(RMApp app, RMAppAttempt attempt,
      MockNM nm) throws Exception {
    nm.nodeHeartbeat(true);
    ((AbstractYarnScheduler)rm.getResourceScheduler()).update();
    rm.drainEvents();
    nm.nodeHeartbeat(true);
    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
    am.registerAppAttempt();
    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);

    return am;
  }
}