TestContainerSchedulerOppContainersByResources.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;

import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerSchedulerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Tests the behavior of {@link ContainerScheduler} when its queueing policy
 * is set to {@link OpportunisticContainersQueuePolicy#BY_RESOURCES}
 * such that the NM only queues containers if there's enough resources
 * on the node to start all queued containers.
 */
public class TestContainerSchedulerOppContainersByResources
    extends BaseContainerSchedulerTest {
  public TestContainerSchedulerOppContainersByResources()
      throws UnsupportedFileSystemException {
  }

  @BeforeEach
  @Override
  public void setup() throws IOException {
    conf.set(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY,
        OpportunisticContainersQueuePolicy.BY_RESOURCES.name());
    super.setup();
    containerManager.start();
  }

  /**
   * Checks if a container is in a running or successfully run state.
   * @param containerStatus the container status
   * @return true if the container is running or completed
   * with a successful state, false if the container has not started or failed
   */
  private static boolean isContainerInSuccessfulState(
      final ContainerStatus containerStatus) {
    final org.apache.hadoop.yarn.api.records.ContainerState state =
        containerStatus.getState();
    final ContainerSubState subState = containerStatus.getContainerSubState();
    switch (subState) {
    case RUNNING:
    case COMPLETING:
      return true;
    case DONE:
      // If the state is not COMPLETE, then the
      // container is a failed container
      return state ==
          org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
    default:
      return false;
    }
  }

  private void verifyRunAndKilledContainers(
      final List<ContainerId> statList,
      final int numExpectedContainers, final Set<ContainerId> runContainers,
      final Set<ContainerId> killedContainers)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> {
          GetContainerStatusesRequest statRequest =
              GetContainerStatusesRequest.newInstance(statList);
          final List<ContainerStatus> containerStatuses;
          try {
            containerStatuses = containerManager
                .getContainerStatuses(statRequest).getContainerStatuses();
          } catch (final Exception e) {
            return false;
          }

          if (numExpectedContainers != containerStatuses.size()) {
            return false;
          }

          for (final ContainerStatus status : containerStatuses) {
            if (runContainers.contains(status.getContainerId())) {
              if (!isContainerInSuccessfulState(status)) {
                return false;
              }
            } else if (killedContainers.contains(status.getContainerId())) {
              if (!status.getDiagnostics()
                  .contains("Opportunistic container queue is full")) {
                return false;
              }
            } else {
              return false;
            }
          }

          return true;
        }, 1000, 10000);
  }

  /**
   * Verifies that nothing is queued at the container scheduler.
   */
  private void verifyNothingQueued() {
    // Check that nothing is queued
    ContainerScheduler containerScheduler =
        containerManager.getContainerScheduler();
    assertEquals(0,
        containerScheduler.getNumQueuedContainers());
    assertEquals(0,
        containerScheduler.getNumQueuedGuaranteedContainers());
    assertEquals(0,
        containerScheduler.getNumQueuedOpportunisticContainers());
    assertEquals(0,
        metrics.getQueuedOpportunisticContainers());
    assertEquals(0, metrics.getQueuedGuaranteedContainers());
  }

  /**
   * Tests that newly arrived containers after the resources are filled up
   * get killed and never gets run.
   */
  @Test
  public void testKillOpportunisticWhenNoResourcesAvailable() throws Exception {
    List<StartContainerRequest> startContainerRequests = new ArrayList<>();

    // GContainer that takes up the whole node
    startContainerRequests.add(StartContainerRequest.newInstance(
        recordFactory.newRecordInstance(ContainerLaunchContext.class),
        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
            context.getNodeId(),
            user, Resources.createResource(2048),
            context.getContainerTokenSecretManager(), null,
            ExecutionType.GUARANTEED)));

    // OContainer that should be killed
    startContainerRequests.add(StartContainerRequest.newInstance(
        recordFactory.newRecordInstance(ContainerLaunchContext.class),
        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
            context.getNodeId(),
            user, Resources.createResource(2048),
            context.getContainerTokenSecretManager(), null,
            ExecutionType.OPPORTUNISTIC)));

    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(startContainerRequests);
    containerManager.startContainers(allRequests);

    BaseContainerManagerTest.waitForNMContainerState(containerManager,
        createContainerId(0), ContainerState.RUNNING, 40);

    // Wait for the OContainer to get killed
    BaseContainerManagerTest.waitForNMContainerState(containerManager,
        createContainerId(1), ContainerState.DONE, 40);

    // Get container statuses.
    // Container 0 should be running and container 1 should be killed
    List<ContainerId> statList = ImmutableList.of(createContainerId(0),
        createContainerId(1));

    verifyRunAndKilledContainers(
        statList, 2,
        Collections.singleton(createContainerId(0)),
        Collections.singleton(createContainerId(1))
    );

    verifyNothingQueued();
  }

  /**
   * Tests that newly arrived containers after the resources are filled up
   * get killed and never gets run.
   * This scenario is more granular and runs more small container compared to
   * {@link #testKillOpportunisticWhenNoResourcesAvailable()}.
   */
  @Test
  public void testOpportunisticRunsWhenResourcesAvailable() throws Exception {
    List<StartContainerRequest> startContainerRequests = new ArrayList<>();
    final int numContainers = 8;
    final int numContainersQueued = 4;
    final Set<ContainerId> runContainers = new HashSet<>();
    final Set<ContainerId> killedContainers = new HashSet<>();

    for (int i = 0; i < numContainers; i++) {
      // OContainers that should be run
      startContainerRequests.add(StartContainerRequest.newInstance(
          recordFactory.newRecordInstance(ContainerLaunchContext.class),
          createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
              context.getNodeId(),
              user, Resources.createResource(512),
              context.getContainerTokenSecretManager(), null,
              ExecutionType.OPPORTUNISTIC)));
    }

    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(startContainerRequests);
    containerManager.startContainers(allRequests);

    // Wait for containers to start
    for (int i = 0; i < numContainersQueued; i++) {
      final ContainerId containerId = createContainerId(i);
      BaseContainerManagerTest
          .waitForNMContainerState(containerManager, containerId,
              ContainerState.RUNNING, 40);
      runContainers.add(containerId);
    }

    // Wait for containers to be killed
    for (int i = numContainersQueued; i < numContainers; i++) {
      final ContainerId containerId = createContainerId(i);
      BaseContainerManagerTest
          .waitForNMContainerState(containerManager, createContainerId(i),
              ContainerState.DONE, 40);
      killedContainers.add(containerId);
    }

    Thread.sleep(5000);

    // Get container statuses.
    List<ContainerId> statList = new ArrayList<>();
    for (int i = 0; i < numContainers; i++) {
      statList.add(createContainerId(i));
    }


    verifyRunAndKilledContainers(
        statList, numContainers, runContainers, killedContainers);

    verifyNothingQueued();
  }
}