// TestCSMaxRunningAppsEnforcer.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestCSMaxRunningAppsEnforcer {
  private CapacitySchedulerQueueManager queueManager;
  private CSMaxRunningAppsEnforcer maxAppsEnforcer;
  private int appNum;
  private ControlledClock clock;
  private RMContext rmContext;
  private CapacityScheduler scheduler;
  private ActivitiesManager activitiesManager;
  private CapacitySchedulerConfiguration csConfig;

  @BeforeEach
  public void setup() throws IOException {
    csConfig = new CapacitySchedulerConfiguration();
    rmContext = mock(RMContext.class);
    when(rmContext.getYarnConfiguration()).thenReturn(csConfig);
    when(rmContext.getRMApps()).thenReturn(new ConcurrentHashMap<>());
    clock = new ControlledClock();
    scheduler = mock(CapacityScheduler.class);
    when(rmContext.getScheduler()).thenReturn(scheduler);
    when(scheduler.getConf()).thenReturn(csConfig);
    when(scheduler.getConfig()).thenReturn(csConfig);
    when(scheduler.getConfiguration()).thenReturn(csConfig);
    when(scheduler.getResourceCalculator()).thenReturn(
        new DefaultResourceCalculator());
    when(scheduler.getRMContext()).thenReturn(rmContext);
    Resource clusterResource = Resource.newInstance(16384, 8);
    when(scheduler.getClusterResource())
        .thenReturn(clusterResource);
    when(scheduler.getMinimumAllocation())
        .thenReturn(Resource.newInstance(1024, 1));
    when(scheduler.getMinimumResourceCapability())
        .thenReturn(Resource.newInstance(1024, 1));
    activitiesManager = mock(ActivitiesManager.class);
    maxAppsEnforcer = new CSMaxRunningAppsEnforcer(scheduler);
    appNum = 0;
    setupQueues(csConfig);
    RMNodeLabelsManager labelManager = mock(RMNodeLabelsManager.class);
    AppPriorityACLsManager appPriorityACLManager =
        mock(AppPriorityACLsManager.class);
    when(rmContext.getNodeLabelManager()).thenReturn(labelManager);
    when(labelManager.getResourceByLabel(any(), any(Resource.class)))
        .thenReturn(clusterResource);
    PreemptionManager preemptionManager = mock(PreemptionManager.class);
    when(preemptionManager.getKillableResource(any(), anyString()))
        .thenReturn(Resource.newInstance(0, 0));
    when(scheduler.getPreemptionManager()).thenReturn(preemptionManager);
    when(scheduler.getActivitiesManager()).thenReturn(activitiesManager);
    queueManager = new CapacitySchedulerQueueManager(csConfig, labelManager,
        appPriorityACLManager);
    queueManager.setCapacitySchedulerContext(scheduler);
    when(scheduler.getCapacitySchedulerQueueManager()).thenReturn(queueManager);
    CapacitySchedulerQueueContext queueContext = new CapacitySchedulerQueueContext(scheduler);
    when(scheduler.getQueueContext()).thenReturn(queueContext);
    queueManager.initializeQueues(csConfig);
  }

  private void setupQueues(CapacitySchedulerConfiguration config) {
    QueuePath root = new QueuePath(CapacitySchedulerConfiguration.ROOT);
    QueuePath queue1 = new QueuePath("root.queue1");
    QueuePath subqueue1 = new QueuePath("root.queue1.subqueue1");
    QueuePath subqueue2 = new QueuePath("root.queue1.subqueue2");

    config.setQueues(root, new String[] {"queue1", "queue2"});
    config.setQueues(queue1, new String[] {"subqueue1", "subqueue2"});
    config.setQueues(subqueue1, new String[] {"leaf1"});
    config.setQueues(subqueue2, new String[] {"leaf2"});
    config.setFloat(PREFIX + "root.capacity", 100.0f);
    config.setFloat(PREFIX + "root.queue1.capacity", 50.0f);
    config.setFloat(PREFIX + "root.queue2.capacity", 50.0f);
    config.setFloat(PREFIX + "root.queue1.subqueue1.capacity", 50.0f);
    config.setFloat(PREFIX + "root.queue1.subqueue2.capacity", 50.0f);
    config.setFloat(PREFIX + "root.queue1.subqueue1.leaf1.capacity", 100.0f);
    config.setFloat(PREFIX + "root.queue1.subqueue2.leaf2.capacity", 100.0f);
  }

  private FiCaSchedulerApp addApp(LeafQueue queue, String user) {
    ApplicationId appId = ApplicationId.newInstance(0, appNum++);
    ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);

    FiCaSchedulerApp attempt = new FiCaSchedulerApp(attId,
        user, queue, queue.getAbstractUsersManager(),
        rmContext, Priority.newInstance(0), false,
        activitiesManager) {

      private final long startTime = clock.getTime();

      @Override
      public long getStartTime() {
        return startTime;
      }
    };

    maxAppsEnforcer.checkRunnabilityWithUpdate(attempt);
    maxAppsEnforcer.trackApp(attempt);

    queue.submitApplicationAttempt(attempt, attempt.getUser());

    return attempt;
  }

  private void removeApp(FiCaSchedulerApp attempt) {
    AbstractLeafQueue queue = attempt.getCSLeafQueue();
    queue.finishApplicationAttempt(attempt, queue.getQueuePath());
    maxAppsEnforcer.untrackApp(attempt);
    maxAppsEnforcer.updateRunnabilityOnAppRemoval(attempt);
  }

  @Test
  public void testRemoveDoesNotEnableAnyApp() {
    ParentQueue root =
        (ParentQueue) queueManager.getRootQueue();
    LeafQueue leaf1 = (LeafQueue) queueManager
        .getQueueByFullName("root.queue1.subqueue1.leaf1");
    LeafQueue leaf2 = (LeafQueue) queueManager
        .getQueueByFullName("root.queue1.subqueue2.leaf2");
    root.setMaxParallelApps(2);
    leaf1.setMaxParallelApps(1);
    leaf2.setMaxParallelApps(1);

    FiCaSchedulerApp app1 = addApp(leaf1, "user");
    addApp(leaf2, "user");
    addApp(leaf2, "user");
    assertEquals(1, leaf1.getNumRunnableApps());
    assertEquals(1, leaf2.getNumRunnableApps());
    assertEquals(1, leaf2.getNumNonRunnableApps());

    removeApp(app1);
    assertEquals(0, leaf1.getNumRunnableApps());
    assertEquals(1, leaf2.getNumRunnableApps());
    assertEquals(1, leaf2.getNumNonRunnableApps());
  }

  @Test
  public void testRemoveEnablesAppOnCousinQueue() {
    LeafQueue leaf1 = (LeafQueue) queueManager
        .getQueueByFullName("root.queue1.subqueue1.leaf1");
    LeafQueue leaf2 = (LeafQueue) queueManager
        .getQueueByFullName("root.queue1.subqueue2.leaf2");
    ParentQueue queue1 = (ParentQueue) queueManager
        .getQueueByFullName("root.queue1");
    queue1.setMaxParallelApps(2);

    FiCaSchedulerApp app1 = addApp(leaf1, "user");
    addApp(leaf2, "user");
    addApp(leaf2, "user");
    assertEquals(1, leaf1.getNumRunnableApps());
    assertEquals(1, leaf2.getNumRunnableApps());
    assertEquals(1, leaf2.getNumNonRunnableApps());

    removeApp(app1);
    assertEquals(0, leaf1.getNumRunnableApps());
    assertEquals(2, leaf2.getNumRunnableApps());
    assertEquals(0, leaf2.getNumNonRunnableApps());
  }

  @Test
  public void testRemoveEnablesOneByQueueOneByUser() {
    LeafQueue leaf1 = (LeafQueue) queueManager
        .getQueueByFullName("root.queue1.subqueue1.leaf1");
    LeafQueue leaf2 = (LeafQueue) queueManager
        .getQueueByFullName("root.queue1.subqueue2.leaf2");
    leaf1.setMaxParallelApps(2);
    //userMaxApps.put("user1", 1);
    csConfig.setInt(PREFIX + "user.user1.max-parallel-apps", 1);

    FiCaSchedulerApp app1 = addApp(leaf1, "user1");
    addApp(leaf1, "user2");
    addApp(leaf1, "user3");
    addApp(leaf2, "user1");
    assertEquals(2, leaf1.getNumRunnableApps());
    assertEquals(1, leaf1.getNumNonRunnableApps());
    assertEquals(1, leaf2.getNumNonRunnableApps());

    removeApp(app1);
    assertEquals(2, leaf1.getNumRunnableApps());
    assertEquals(1, leaf2.getNumRunnableApps());
    assertEquals(0, leaf1.getNumNonRunnableApps());
    assertEquals(0, leaf2.getNumNonRunnableApps());
  }

  @Test
  public void testRemoveEnablingOrderedByStartTime() {
    LeafQueue leaf1 = (LeafQueue) queueManager
        .getQueueByFullName("root.queue1.subqueue1.leaf1");
    LeafQueue leaf2 = (LeafQueue) queueManager
        .getQueueByFullName("root.queue1.subqueue2.leaf2");
    ParentQueue queue1 = (ParentQueue) queueManager
        .getQueueByFullName("root.queue1");
    queue1.setMaxParallelApps(2);
    FiCaSchedulerApp app1 = addApp(leaf1, "user");
    addApp(leaf2, "user");
    addApp(leaf2, "user");
    clock.tickSec(20);
    addApp(leaf1, "user");
    assertEquals(1, leaf1.getNumRunnableApps());
    assertEquals(1, leaf2.getNumRunnableApps());
    assertEquals(1, leaf1.getNumNonRunnableApps());
    assertEquals(1, leaf2.getNumNonRunnableApps());
    removeApp(app1);
    assertEquals(0, leaf1.getNumRunnableApps());
    assertEquals(2, leaf2.getNumRunnableApps());
    assertEquals(0, leaf2.getNumNonRunnableApps());
  }

  @Test
  public void testMultipleAppsWaitingOnCousinQueue() {
    LeafQueue leaf1 = (LeafQueue) queueManager
        .getQueueByFullName("root.queue1.subqueue1.leaf1");
    LeafQueue leaf2 = (LeafQueue) queueManager
        .getQueueByFullName("root.queue1.subqueue2.leaf2");
    ParentQueue queue1 = (ParentQueue) queueManager
        .getQueueByFullName("root.queue1");
    queue1.setMaxParallelApps(2);
    FiCaSchedulerApp app1 = addApp(leaf1, "user");
    addApp(leaf2, "user");
    addApp(leaf2, "user");
    addApp(leaf2, "user");
    assertEquals(1, leaf1.getNumRunnableApps());
    assertEquals(1, leaf2.getNumRunnableApps());
    assertEquals(2, leaf2.getNumNonRunnableApps());
    removeApp(app1);
    assertEquals(0, leaf1.getNumRunnableApps());
    assertEquals(2, leaf2.getNumRunnableApps());
    assertEquals(1, leaf2.getNumNonRunnableApps());
  }

  @Test
  public void testMultiListStartTimeIteratorEmptyAppLists() {
    List<List<FiCaSchedulerApp>> lists =
        new ArrayList<List<FiCaSchedulerApp>>();
    lists.add(Arrays.asList(mockAppAttempt(1)));
    lists.add(Arrays.asList(mockAppAttempt(2)));
    Iterator<FiCaSchedulerApp> iter =
        new CSMaxRunningAppsEnforcer.MultiListStartTimeIterator(lists);
    assertEquals(1, iter.next().getStartTime());
    assertEquals(2, iter.next().getStartTime());
  }

  private FiCaSchedulerApp mockAppAttempt(long startTime) {
    FiCaSchedulerApp schedApp = mock(FiCaSchedulerApp.class);
    when(schedApp.getStartTime()).thenReturn(startTime);
    return schedApp;
  }
}