TestCapacitySchedulerApps.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.appHelper;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkApplicationResourceUsage;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkNodeResourceUsage;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createMockRMContext;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setUpMove;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setUpMoveAmbiguousQueue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestCapacitySchedulerApps {

  public static final int MAX_PARALLEL_APPS = 5;
  public static final String USER_0 = "user_0";
  private ResourceManager resourceManager = null;
  private RMContext mockContext;

  @BeforeEach
  public void setUp() throws Exception {
    resourceManager = createResourceManager();
    mockContext = createMockRMContext();
  }

  @AfterEach
  public void tearDown() throws Exception {
    stopResourceManager(resourceManager);
  }

  @Test
  public void testGetAppsInQueue() throws Exception {
    Application application0 = new Application("user_0", "a1", resourceManager);
    application0.submit();

    Application application1 = new Application("user_0", "a2", resourceManager);
    application1.submit();

    Application application2 = new Application("user_0", "b2", resourceManager);
    application2.submit();

    ResourceScheduler scheduler = resourceManager.getResourceScheduler();

    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
    assertEquals(1, appsInA1.size());

    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
    assertTrue(appsInA.contains(application0.getApplicationAttemptId()));
    assertTrue(appsInA.contains(application1.getApplicationAttemptId()));
    assertEquals(2, appsInA.size());

    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
    assertTrue(appsInRoot.contains(application0.getApplicationAttemptId()));
    assertTrue(appsInRoot.contains(application1.getApplicationAttemptId()));
    assertTrue(appsInRoot.contains(application2.getApplicationAttemptId()));
    assertEquals(3, appsInRoot.size());

    assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
  }

  @Test
  public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    setupQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    MockRM rm = new MockRM(conf);
    @SuppressWarnings("unchecked")
    AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs =
        (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
            .getResourceScheduler();
    SchedulerApplication<SchedulerApplicationAttempt> app =
        TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
            cs.getSchedulerApplications(), cs, "a1");
    assertEquals("a1", app.getQueue().getQueueName());
  }

  @Test
  public void testKillAllAppsInQueue() throws Exception {
    MockRM rm = setUpMove();
    AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm.getResourceScheduler();

    // submit an app
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-move-1")
            .withUser("user_0")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);
    ApplicationAttemptId appAttemptId =
        rm.getApplicationReport(app.getApplicationId())
            .getCurrentApplicationAttemptId();

    // check preconditions
    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
    assertEquals(1, appsInA1.size());

    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
    assertTrue(appsInA.contains(appAttemptId));
    assertEquals(1, appsInA.size());
    String queue =
        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
            .getQueueName();
    assertEquals("a1", queue);

    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
    assertTrue(appsInRoot.contains(appAttemptId));
    assertEquals(1, appsInRoot.size());

    // now kill the app
    scheduler.killAllAppsInQueue("a1");

    // check postconditions
    rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
    rm.waitForAppRemovedFromScheduler(app.getApplicationId());
    appsInRoot = scheduler.getAppsInQueue("root");
    assertTrue(appsInRoot.isEmpty());

    appsInA1 = scheduler.getAppsInQueue("a1");
    assertTrue(appsInA1.isEmpty());

    appsInA = scheduler.getAppsInQueue("a");
    assertTrue(appsInA.isEmpty());

    rm.stop();
  }

  @Test
  public void testKillAllAppsInvalidSource() throws Exception {
    MockRM rm = setUpMove();
    YarnScheduler scheduler = rm.getResourceScheduler();

    // submit an app
    ApplicationAttemptId appAttemptId = submitApp(rm);

    // check preconditions
    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
    assertEquals(1, appsInA1.size());

    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
    assertTrue(appsInA.contains(appAttemptId));
    assertEquals(1, appsInA.size());

    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
    assertTrue(appsInRoot.contains(appAttemptId));
    assertEquals(1, appsInRoot.size());

    // now kill the app
    try {
      scheduler.killAllAppsInQueue("DOES_NOT_EXIST");
      fail();
    } catch (YarnException e) {
      // expected
    }

    // check postconditions, app should still be in a1
    appsInA1 = scheduler.getAppsInQueue("a1");
    assertEquals(1, appsInA1.size());

    appsInA = scheduler.getAppsInQueue("a");
    assertTrue(appsInA.contains(appAttemptId));
    assertEquals(1, appsInA.size());

    appsInRoot = scheduler.getAppsInQueue("root");
    assertTrue(appsInRoot.contains(appAttemptId));
    assertEquals(1, appsInRoot.size());

    rm.stop();
  }

  // Test to ensure that we don't carry out reservation on nodes
  // that have no CPU available when using the DominantResourceCalculator
  @Test
  @Timeout(value = 30)
  public void testAppReservationWithDominantResourceCalculator() throws Exception {
    CapacitySchedulerConfiguration csconf =
        new CapacitySchedulerConfiguration();
    csconf.setResourceComparator(DominantResourceCalculator.class);

    YarnConfiguration conf = new YarnConfiguration(csconf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);

    MockRM rm = new MockRM(conf);
    rm.start();

    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1);

    // register extra nodes to bump up cluster resource
    MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4);
    rm.registerNode("127.0.0.1:1236", 10 * GB, 4);

    RMApp app1 = MockRMAppSubmitter.submitWithMemory(1024, rm);
    // kick the scheduling
    nm1.nodeHeartbeat(true);
    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
    am1.registerAppAttempt();
    SchedulerNodeReport reportNm1 =
        rm.getResourceScheduler().getNodeReport(nm1.getNodeId());

    // check node report
    assertEquals(1 * GB, reportNm1.getUsedResource().getMemorySize());
    assertEquals(9 * GB, reportNm1.getAvailableResource().getMemorySize());

    // add request for containers
    am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 1 * GB, 1, 1);
    am1.schedule(); // send the request

    // kick the scheduler, container reservation should not happen
    nm1.nodeHeartbeat(true);
    Thread.sleep(1000);
    AllocateResponse allocResponse = am1.schedule();
    ApplicationResourceUsageReport report =
        rm.getResourceScheduler().getAppResourceUsageReport(
            attempt1.getAppAttemptId());
    assertEquals(0, allocResponse.getAllocatedContainers().size());
    assertEquals(0, report.getNumReservedContainers());

    // container should get allocated on this node
    nm2.nodeHeartbeat(true);

    while (allocResponse.getAllocatedContainers().size() == 0) {
      Thread.sleep(100);
      allocResponse = am1.schedule();
    }
    report =
        rm.getResourceScheduler().getAppResourceUsageReport(
            attempt1.getAppAttemptId());
    assertEquals(1, allocResponse.getAllocatedContainers().size());
    assertEquals(0, report.getNumReservedContainers());
    rm.stop();
  }

  @Test
  public void testMoveAppAmbiguousQueue() throws Exception {
    MockRM rm = setUpMoveAmbiguousQueue();
    AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm.getResourceScheduler();
    QueueMetrics metrics = scheduler.getRootQueueMetrics();
    assertEquals(0, metrics.getAppsPending());
    // submit an app
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-move-1")
            .withUser("user_0")
            .withAcls(null)
            .withQueue("root.a.a")
            .withUnmanagedAM(false)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);
    // check preconditions
    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("root.a.a");
    assertEquals(1, appsInA.size());
    String queue =
        scheduler.getApplicationAttempt(appsInA.get(0)).getQueue()
            .getQueueName();
    assertEquals("a", queue);

    // now move the app
    scheduler.moveApplication(app.getApplicationId(), "a1");
    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("root.a.a1");
    assertEquals(1, appsInA1.size());
    queue =
        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
            .getQueueName();
    assertEquals("a1", queue);

    appsInA = scheduler.getAppsInQueue("root.a.a");
    assertTrue(appsInA.isEmpty());

    rm.stop();
  }

  @Test
  public void testMoveAppBasic() throws Exception {
    MockRM rm = setUpMove();
    AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm.getResourceScheduler();
    QueueMetrics metrics = scheduler.getRootQueueMetrics();
    assertEquals(0, metrics.getAppsPending());
    // submit an app
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-move-1")
            .withUser("user_0")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);
    ApplicationAttemptId appAttemptId =
        rm.getApplicationReport(app.getApplicationId())
            .getCurrentApplicationAttemptId();
    // check preconditions
    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
    assertEquals(1, appsInA1.size());
    String queue =
        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
            .getQueueName();
    assertEquals("a1", queue);

    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
    assertTrue(appsInA.contains(appAttemptId));
    assertEquals(1, appsInA.size());

    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
    assertTrue(appsInRoot.contains(appAttemptId));
    assertEquals(1, appsInRoot.size());

    assertEquals(1, metrics.getAppsPending());

    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
    assertTrue(appsInB1.isEmpty());

    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
    assertTrue(appsInB.isEmpty());

    // now move the app
    scheduler.moveApplication(app.getApplicationId(), "b1");

    // check postconditions
    appsInB1 = scheduler.getAppsInQueue("b1");
    assertEquals(1, appsInB1.size());
    queue =
        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
            .getQueueName();
    assertEquals("b1", queue);

    appsInB = scheduler.getAppsInQueue("b");
    assertTrue(appsInB.contains(appAttemptId));
    assertEquals(1, appsInB.size());

    appsInRoot = scheduler.getAppsInQueue("root");
    assertTrue(appsInRoot.contains(appAttemptId));
    assertEquals(1, appsInRoot.size());

    assertEquals(1, metrics.getAppsPending());

    appsInA1 = scheduler.getAppsInQueue("a1");
    assertTrue(appsInA1.isEmpty());

    appsInA = scheduler.getAppsInQueue("a");
    assertTrue(appsInA.isEmpty());

    rm.stop();
  }

  @Test
  public void testMoveAppPendingMetrics() throws Exception {
    MockRM rm = setUpMove();
    ResourceScheduler scheduler = rm.getResourceScheduler();
    assertApps(scheduler, 0, 0, 0);

    // submit two apps in a1
    RMApp app1 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-move-1")
            .withUser("user_0")
            .withAcls(null)
            .withQueue("a1")
            .build());
    RMApp app2 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-move-2")
            .withUser("user_0")
            .withAcls(null)
            .withQueue("a1")
            .build());
    assertApps(scheduler, 2, 0, 2);

    // submit one app in b1
    RMApp app3 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-move-2")
            .withUser("user_0")
            .withAcls(null)
            .withQueue("b1")
            .build());
    assertApps(scheduler, 2, 1, 3);

    // now move the app1 from a1 to b1
    scheduler.moveApplication(app1.getApplicationId(), "b1");
    assertApps(scheduler, 1, 2, 3);

    // now move the app2 from a1 to b1
    scheduler.moveApplication(app2.getApplicationId(), "b1");
    assertApps(scheduler, 0, 3, 3);

    // now move the app3 from b1 to a1
    scheduler.moveApplication(app3.getApplicationId(), "a1");
    assertApps(scheduler, 1, 2, 3);
    rm.stop();
  }

  private void assertApps(ResourceScheduler scheduler,
                          int a1Size,
                          int b1Size,
                          int appsPending) {
    assertAppsSize(scheduler, "a1", a1Size);
    assertAppsSize(scheduler, "b1", b1Size);
    assertEquals(appsPending, scheduler.getRootQueueMetrics().getAppsPending());
  }

  private void assertAppsSize(ResourceScheduler scheduler, String queueName, int size) {
    assertEquals(size, scheduler.getAppsInQueue(queueName).size());
  }

  @Test
  public void testMoveAppSameParent() throws Exception {
    MockRM rm = setUpMove();
    AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm.getResourceScheduler();

    // submit an app
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-move-1")
            .withUser("user_0")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);
    ApplicationAttemptId appAttemptId =
        rm.getApplicationReport(app.getApplicationId())
            .getCurrentApplicationAttemptId();

    // check preconditions
    assertOneAppInQueue(scheduler, "a1");
    assertApps(scheduler, "root", appAttemptId);
    assertApps(scheduler, "a", appAttemptId);
    assertApps(scheduler, "a2");

    // now move the app
    scheduler.moveApplication(app.getApplicationId(), "a2");

    // check postconditions
    assertApps(scheduler, "root", appAttemptId);
    assertApps(scheduler, "a", appAttemptId);
    assertApps(scheduler, "a1");
    assertOneAppInQueue(scheduler, "a2");

    rm.stop();
  }

  private void assertApps(ResourceScheduler scheduler,
                          String queueName,
                          ApplicationAttemptId... apps) {
    assertEquals(Lists.newArrayList(apps), scheduler.getAppsInQueue(queueName));
  }

  private void assertOneAppInQueue(AbstractYarnScheduler scheduler, String queueName) {
    List<ApplicationAttemptId> apps = scheduler.getAppsInQueue(queueName);
    assertEquals(1, apps.size());
    assertEquals(queueName,
        scheduler.getApplicationAttempt(apps.get(0)).getQueue().getQueueName());
  }

  @Test
  public void testMoveAppForMoveToQueueWithFreeCap() throws Exception {

    ResourceScheduler scheduler = resourceManager.getResourceScheduler();

    NodeStatus mockNodeStatus = createMockNodeStatus();

    // Register node1
    String host0 = "host_0";
    NodeManager nm0 =
        registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
            Resources.createResource(4 * GB, 1), mockNodeStatus);

    // Register node2
    String host1 = "host_1";
    NodeManager nm1 =
        registerNode(resourceManager, host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
            Resources.createResource(2 * GB, 1), mockNodeStatus);

    // ResourceRequest priorities
    Priority priority0 = Priority.newInstance(0);
    Priority priority1 = Priority.newInstance(1);

    // Submit application_0
    Application application0 =
        new Application("user_0", "a1", resourceManager);
    application0.submit(); // app + app attempt event sent to scheduler

    application0.addNodeManager(host0, 1234, nm0);
    application0.addNodeManager(host1, 1234, nm1);

    Resource capability00 = Resources.createResource(1 * GB, 1);
    application0.addResourceRequestSpec(priority1, capability00);

    Resource capability01 = Resources.createResource(2 * GB, 1);
    application0.addResourceRequestSpec(priority0, capability01);

    Task task00 =
        new Task(application0, priority1, new String[]{host0, host1});
    application0.addTask(task00);

    // Submit application_1
    Application application1 =
        new Application("user_1", "b2", resourceManager);
    application1.submit(); // app + app attempt event sent to scheduler

    application1.addNodeManager(host0, 1234, nm0);
    application1.addNodeManager(host1, 1234, nm1);

    Resource capability10 = Resources.createResource(1 * GB, 1);
    application1.addResourceRequestSpec(priority1, capability10);

    Resource capability11 = Resources.createResource(2 * GB, 1);
    application1.addResourceRequestSpec(priority0, capability11);

    Task task10 =
        new Task(application1, priority1, new String[]{host0, host1});
    application1.addTask(task10);

    // Send resource requests to the scheduler
    application0.schedule(); // allocate
    application1.schedule(); // allocate

    // task_0_0 task_1_0 allocated, used=2G
    nodeUpdate(resourceManager, nm0);

    // nothing allocated
    nodeUpdate(resourceManager, nm1);

    // Get allocations from the scheduler
    application0.schedule(); // task_0_0
    checkApplicationResourceUsage(1 * GB, application0);

    application1.schedule(); // task_1_0
    checkApplicationResourceUsage(1 * GB, application1);

    checkNodeResourceUsage(2 * GB, nm0); // task_0_0 (1G) and task_1_0 (1G) 2G
    // available
    checkNodeResourceUsage(0 * GB, nm1); // no tasks, 2G available

    // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5%
    // total cap)
    scheduler.moveApplication(application0.getApplicationId(), "b1");

    // 2GB 1C
    Task task11 =
        new Task(application1, priority0,
            new String[]{ResourceRequest.ANY});
    application1.addTask(task11);

    application1.schedule();

    // 2GB 1C
    Task task01 =
        new Task(application0, priority0, new String[]{host0, host1});
    application0.addTask(task01);

    application0.schedule();

    // prev 2G used free 2G
    nodeUpdate(resourceManager, nm0);

    // prev 0G used free 2G
    nodeUpdate(resourceManager, nm1);

    // Get allocations from the scheduler
    application1.schedule();
    checkApplicationResourceUsage(3 * GB, application1);

    // Get allocations from the scheduler
    application0.schedule();
    checkApplicationResourceUsage(3 * GB, application0);

    checkNodeResourceUsage(4 * GB, nm0);
    checkNodeResourceUsage(2 * GB, nm1);
  }

  @Test
  public void testMoveAppSuccess() throws Exception {

    ResourceScheduler scheduler = resourceManager.getResourceScheduler();

    NodeStatus mockNodeStatus = createMockNodeStatus();

    // Register node1
    String host0 = "host_0";
    NodeManager nm0 =
        registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
            Resources.createResource(5 * GB, 1), mockNodeStatus);

    // Register node2
    String host1 = "host_1";
    NodeManager nm1 =
        registerNode(resourceManager, host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
            Resources.createResource(5 * GB, 1), mockNodeStatus);

    // ResourceRequest priorities
    Priority priority0 = Priority.newInstance(0);
    Priority priority1 = Priority.newInstance(1);

    // Submit application_0
    Application application0 =
        new Application("user_0", "a1", resourceManager);
    application0.submit(); // app + app attempt event sent to scheduler

    application0.addNodeManager(host0, 1234, nm0);
    application0.addNodeManager(host1, 1234, nm1);

    Resource capability00 = Resources.createResource(3 * GB, 1);
    application0.addResourceRequestSpec(priority1, capability00);

    Resource capability01 = Resources.createResource(2 * GB, 1);
    application0.addResourceRequestSpec(priority0, capability01);

    Task task00 =
        new Task(application0, priority1, new String[]{host0, host1});
    application0.addTask(task00);

    // Submit application_1
    Application application1 =
        new Application("user_1", "b2", resourceManager);
    application1.submit(); // app + app attempt event sent to scheduler

    application1.addNodeManager(host0, 1234, nm0);
    application1.addNodeManager(host1, 1234, nm1);

    Resource capability10 = Resources.createResource(1 * GB, 1);
    application1.addResourceRequestSpec(priority1, capability10);

    Resource capability11 = Resources.createResource(2 * GB, 1);
    application1.addResourceRequestSpec(priority0, capability11);

    Task task10 =
        new Task(application1, priority1, new String[]{host0, host1});
    application1.addTask(task10);

    // Send resource requests to the scheduler
    application0.schedule(); // allocate
    application1.schedule(); // allocate

    // b2 can only run 1 app at a time
    scheduler.moveApplication(application0.getApplicationId(), "b2");

    nodeUpdate(resourceManager, nm0);

    nodeUpdate(resourceManager, nm1);

    // Get allocations from the scheduler
    application0.schedule(); // task_0_0
    checkApplicationResourceUsage(0 * GB, application0);

    application1.schedule(); // task_1_0
    checkApplicationResourceUsage(1 * GB, application1);

    // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
    // not scheduled
    checkNodeResourceUsage(1 * GB, nm0);
    checkNodeResourceUsage(0 * GB, nm1);

    // lets move application_0 to a queue where it can run
    scheduler.moveApplication(application0.getApplicationId(), "a2");
    application0.schedule();

    nodeUpdate(resourceManager, nm1);

    // Get allocations from the scheduler
    application0.schedule(); // task_0_0
    checkApplicationResourceUsage(3 * GB, application0);

    checkNodeResourceUsage(1 * GB, nm0);
    checkNodeResourceUsage(3 * GB, nm1);

  }

  @Test
  public void testMoveAppViolateQueueState() throws Exception {
    assertThrows(YarnException.class, () -> {
      resourceManager = new ResourceManager() {
        @Override
        protected RMNodeLabelsManager createNodeLabelManager() {
          RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
          mgr.init(getConfig());
          return mgr;
        }
      };
      CapacitySchedulerConfiguration csConf =
          new CapacitySchedulerConfiguration();
      setupQueueConfiguration(csConf);
      StringBuilder qState = new StringBuilder();
      qState.append(CapacitySchedulerConfiguration.PREFIX).append(B)
          .append(CapacitySchedulerConfiguration.DOT)
          .append(CapacitySchedulerConfiguration.STATE);
      csConf.set(qState.toString(), QueueState.STOPPED.name());
      YarnConfiguration conf = new YarnConfiguration(csConf);
      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
          ResourceScheduler.class);
      resourceManager.init(conf);
      resourceManager.getRMContext().getContainerTokenSecretManager()
          .rollMasterKey();
      resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
      ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
      mockContext = mock(RMContext.class);
      when(mockContext.getConfigurationProvider()).thenReturn(
          new LocalConfigurationProvider());

      ResourceScheduler scheduler = resourceManager.getResourceScheduler();

      NodeStatus mockNodeStatus = createMockNodeStatus();

      // Register node1
      String host0 = "host_0";
      NodeManager nm0 =
          registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
          Resources.createResource(6 * GB, 1), mockNodeStatus);

      // ResourceRequest priorities
      Priority priority0 = Priority.newInstance(0);
      Priority priority1 = Priority.newInstance(1);

      // Submit application_0
      Application application0 =
          new Application("user_0", "a1", resourceManager);
      application0.submit(); // app + app attempt event sent to scheduler

      application0.addNodeManager(host0, 1234, nm0);

      Resource capability00 = Resources.createResource(3 * GB, 1);
      application0.addResourceRequestSpec(priority1, capability00);

      Resource capability01 = Resources.createResource(2 * GB, 1);
      application0.addResourceRequestSpec(priority0, capability01);

      Task task00 =
           new Task(application0, priority1, new String[]{host0});
      application0.addTask(task00);

      // Send resource requests to the scheduler
      application0.schedule(); // allocate

      // task_0_0 allocated
      nodeUpdate(resourceManager, nm0);

      // Get allocations from the scheduler
      application0.schedule(); // task_0_0
      checkApplicationResourceUsage(3 * GB, application0);

      checkNodeResourceUsage(3 * GB, nm0);
      // b2 queue contains 3GB consumption app,
      // add another 3GB will hit max capacity limit on queue b
      scheduler.moveApplication(application0.getApplicationId(), "b1");
    });
  }

  @Test
  public void testMoveAppQueueMetricsCheck() throws Exception {
    ResourceScheduler scheduler = resourceManager.getResourceScheduler();

    NodeStatus mockNodeStatus = createMockNodeStatus();

    // Register node1
    String host0 = "host_0";
    NodeManager nm0 =
        registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
            Resources.createResource(5 * GB, 1), mockNodeStatus);

    // Register node2
    String host1 = "host_1";
    NodeManager nm1 =
        registerNode(resourceManager, host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
            Resources.createResource(5 * GB, 1), mockNodeStatus);

    // ResourceRequest priorities
    Priority priority0 = Priority.newInstance(0);
    Priority priority1 = Priority.newInstance(1);

    // Submit application_0
    Application application0 =
        new Application("user_0", "a1", resourceManager);
    application0.submit(); // app + app attempt event sent to scheduler

    application0.addNodeManager(host0, 1234, nm0);
    application0.addNodeManager(host1, 1234, nm1);

    Resource capability00 = Resources.createResource(3 * GB, 1);
    application0.addResourceRequestSpec(priority1, capability00);

    Resource capability01 = Resources.createResource(2 * GB, 1);
    application0.addResourceRequestSpec(priority0, capability01);

    Task task00 =
        new Task(application0, priority1, new String[]{host0, host1});
    application0.addTask(task00);

    // Submit application_1
    Application application1 =
        new Application("user_1", "b2", resourceManager);
    application1.submit(); // app + app attempt event sent to scheduler

    application1.addNodeManager(host0, 1234, nm0);
    application1.addNodeManager(host1, 1234, nm1);

    Resource capability10 = Resources.createResource(1 * GB, 1);
    application1.addResourceRequestSpec(priority1, capability10);

    Resource capability11 = Resources.createResource(2 * GB, 1);
    application1.addResourceRequestSpec(priority0, capability11);

    Task task10 =
        new Task(application1, priority1, new String[]{host0, host1});
    application1.addTask(task10);

    // Send resource requests to the scheduler
    application0.schedule(); // allocate
    application1.schedule(); // allocate

    nodeUpdate(resourceManager, nm0);

    nodeUpdate(resourceManager, nm1);

    CapacityScheduler cs =
        (CapacityScheduler) resourceManager.getResourceScheduler();
    CSQueue origRootQ = cs.getRootQueue();
    CapacitySchedulerInfo oldInfo =
        new CapacitySchedulerInfo(origRootQ, cs);
    int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
    int origNumAppsRoot = origRootQ.getNumApplications();

    scheduler.moveApplication(application0.getApplicationId(), "a2");

    CSQueue newRootQ = cs.getRootQueue();
    int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
    int newNumAppsRoot = newRootQ.getNumApplications();
    CapacitySchedulerInfo newInfo =
        new CapacitySchedulerInfo(newRootQ, cs);
    CapacitySchedulerLeafQueueInfo origOldA1 =
        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
    CapacitySchedulerLeafQueueInfo origNewA1 =
        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues());
    CapacitySchedulerLeafQueueInfo targetOldA2 =
        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues());
    CapacitySchedulerLeafQueueInfo targetNewA2 =
        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues());
    // originally submitted here
    assertEquals(1, origOldA1.getNumApplications());
    assertEquals(1, origNumAppsA);
    assertEquals(2, origNumAppsRoot);
    // after the move
    assertEquals(0, origNewA1.getNumApplications());
    assertEquals(1, newNumAppsA);
    assertEquals(2, newNumAppsRoot);
    // original consumption on a1
    assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemorySize());
    assertEquals(1, origOldA1.getResourcesUsed().getvCores());
    assertEquals(0, origNewA1.getResourcesUsed().getMemorySize()); // after the move
    assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move
    // app moved here with live containers
    assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemorySize());
    assertEquals(1, targetNewA2.getResourcesUsed().getvCores());
    // it was empty before the move
    assertEquals(0, targetOldA2.getNumApplications());
    assertEquals(0, targetOldA2.getResourcesUsed().getMemorySize());
    assertEquals(0, targetOldA2.getResourcesUsed().getvCores());
    // after the app moved here
    assertEquals(1, targetNewA2.getNumApplications());
    // 1 container on original queue before move
    assertEquals(1, origOldA1.getNumContainers());
    // after the move the resource released
    assertEquals(0, origNewA1.getNumContainers());
    // and moved to the new queue
    assertEquals(1, targetNewA2.getNumContainers());
    // which originally didn't have any
    assertEquals(0, targetOldA2.getNumContainers());
    // 1 user with 3GB
    assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0)
        .getResourcesUsed().getMemorySize());
    // 1 user with 1 core
    assertEquals(1, origOldA1.getUsers().getUsersList().get(0)
        .getResourcesUsed().getvCores());
    // user ha no more running app in the orig queue
    assertEquals(0, origNewA1.getUsers().getUsersList().size());
    // 1 user with 3GB
    assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0)
        .getResourcesUsed().getMemorySize());
    // 1 user with 1 core
    assertEquals(1, targetNewA2.getUsers().getUsersList().get(0)
        .getResourcesUsed().getvCores());

    // Get allocations from the scheduler
    application0.schedule(); // task_0_0
    checkApplicationResourceUsage(3 * GB, application0);

    application1.schedule(); // task_1_0
    checkApplicationResourceUsage(1 * GB, application1);

    // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
    // not scheduled
    checkNodeResourceUsage(4 * GB, nm0);
    checkNodeResourceUsage(0 * GB, nm1);

  }

  @Test
  public void testMoveAllApps() throws Exception {
    MockRM rm = setUpMove();
    AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm.getResourceScheduler();

    // submit an app
    ApplicationAttemptId appAttemptId = submitApp(rm);

    // check preconditions
    assertOneAppInQueue(scheduler, "a1");
    assertApps(scheduler, "root", appAttemptId);
    assertApps(scheduler, "a", appAttemptId);
    assertApps(scheduler, "a1", appAttemptId);
    assertApps(scheduler, "b1");
    assertApps(scheduler, "b");

    // now move the app
    scheduler.moveAllApps("a1", "b1");

    // check post conditions
    Thread.sleep(1000);
    assertOneAppInQueue(scheduler, "b1");
    assertApps(scheduler, "root", appAttemptId);
    assertApps(scheduler, "b", appAttemptId);
    assertApps(scheduler, "b1", appAttemptId);
    assertApps(scheduler, "a1");
    assertApps(scheduler, "a");

    rm.stop();
  }

  @Test
  public void testMaxParallelAppsPendingQueueMetrics() throws Exception {
    MockRM rm = setUpMove();
    ResourceScheduler scheduler = rm.getResourceScheduler();
    CapacityScheduler cs = (CapacityScheduler) scheduler;
    cs.getQueueContext().getConfiguration().setInt(QueuePrefixes.getQueuePrefix(A1)
        + CapacitySchedulerConfiguration.MAX_PARALLEL_APPLICATIONS, MAX_PARALLEL_APPS);
    cs.reinitialize(cs.getQueueContext().getConfiguration(), mockContext);
    List<ApplicationAttemptId> attemptIds = new ArrayList<>();

    for (int i = 0; i < 2 * MAX_PARALLEL_APPS; i++) {
      attemptIds.add(submitApp(rm));
    }

    // Finish first batch to allow the other batch to run
    for (int i = 0; i < MAX_PARALLEL_APPS; i++) {
      cs.handle(new AppAttemptRemovedSchedulerEvent(attemptIds.get(i),
          RMAppAttemptState.FINISHED, true));
    }

    // Finish the remaining apps
    for (int i = MAX_PARALLEL_APPS; i < 2 * MAX_PARALLEL_APPS; i++) {
      cs.handle(new AppAttemptRemovedSchedulerEvent(attemptIds.get(i),
          RMAppAttemptState.FINISHED, true));
    }

    assertEquals(0, cs.getRootQueueMetrics().getAppsPending(),
        "No pending app should remain for root queue");
    assertEquals(0, cs.getRootQueueMetrics().getAppsRunning(),
        "No running application should remain for root queue");

    rm.stop();
  }

  private ApplicationAttemptId submitApp(MockRM rm) throws Exception {
    // submit an app
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-move-1")
            .withUser(USER_0)
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);
    return rm.getApplicationReport(app.getApplicationId())
        .getCurrentApplicationAttemptId();
  }

  @Test
  public void testMoveAllAppsInvalidDestination() throws Exception {
    MockRM rm = setUpMove();
    ResourceScheduler scheduler = rm.getResourceScheduler();

    // submit an app
    ApplicationAttemptId appAttemptId = submitApp(rm);

    // check preconditions
    assertApps(scheduler, "root", appAttemptId);
    assertApps(scheduler, "a", appAttemptId);
    assertApps(scheduler, "a1", appAttemptId);
    assertApps(scheduler, "b");
    assertApps(scheduler, "b1");

    // now move the app
    try {
      scheduler.moveAllApps("a1", "DOES_NOT_EXIST");
      fail();
    } catch (YarnException e) {
      // expected
    }

    // check post conditions, app should still be in a1
    assertApps(scheduler, "root", appAttemptId);
    assertApps(scheduler, "a", appAttemptId);
    assertApps(scheduler, "a1", appAttemptId);
    assertApps(scheduler, "b");
    assertApps(scheduler, "b1");

    rm.stop();
  }

  @Test
  public void testMoveAllAppsInvalidSource() throws Exception {
    MockRM rm = setUpMove();
    ResourceScheduler scheduler = rm.getResourceScheduler();

    // submit an app
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-move-1")
            .withUser("user_0")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);
    ApplicationAttemptId appAttemptId =
        rm.getApplicationReport(app.getApplicationId())
            .getCurrentApplicationAttemptId();

    // check preconditions
    assertApps(scheduler, "root", appAttemptId);
    assertApps(scheduler, "a", appAttemptId);
    assertApps(scheduler, "a1", appAttemptId);
    assertApps(scheduler, "b");
    assertApps(scheduler, "b1");

    // now move the app
    try {
      scheduler.moveAllApps("DOES_NOT_EXIST", "b1");
      fail();
    } catch (YarnException e) {
      // expected
    }

    // check post conditions, app should still be in a1
    assertApps(scheduler, "root", appAttemptId);
    assertApps(scheduler, "a", appAttemptId);
    assertApps(scheduler, "a1", appAttemptId);
    assertApps(scheduler, "b");
    assertApps(scheduler, "b1");

    rm.stop();
  }

  @Test
  public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);

    CapacitySchedulerConfiguration newConf =
        new CapacitySchedulerConfiguration(conf);

    // Define top-level queues
    newConf.setQueues(ROOT,
        new String[]{"a", "b"});

    newConf.setCapacity(A, 50);
    newConf.setCapacity(B, 50);

    // Define 2nd-level queues
    newConf.setQueues(A, new String[]{"a1"});
    newConf.setCapacity(A1, 100);
    newConf.setUserLimitFactor(A1, 2.0f);
    newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f);

    newConf.setQueues(B, new String[]{"b1"});
    newConf.setCapacity(B1, 100);
    newConf.setUserLimitFactor(B1, 2.0f);

    MockRM rm = new MockRM(newConf);
    rm.start();

    CapacityScheduler scheduler =
        (CapacityScheduler) rm.getResourceScheduler();

    MockNM nm1 = rm.registerNode("h1:1234", 16 * GB);

    // submit an app
    MockRMAppSubmissionData data3 =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-move-1")
            .withUser("u1")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data3);
    MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);

    ApplicationAttemptId appAttemptId =
        rm.getApplicationReport(app.getApplicationId())
            .getCurrentApplicationAttemptId();

    MockRMAppSubmissionData data2 =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
            .withAppName("app")
            .withUser("u2")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm, data2);
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);

    MockRMAppSubmissionData data1 =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
            .withAppName("app")
            .withUser("u3")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app3 = MockRMAppSubmitter.submit(rm, data1);

    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
            .withAppName("app")
            .withUser("u4")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app4 = MockRMAppSubmitter.submit(rm, data);

    // Each application asks 50 * 1GB containers
    am1.allocate("*", 1 * GB, 50, null);
    am2.allocate("*", 1 * GB, 50, null);

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());

    // check preconditions
    assertApps(scheduler, "root",
        app3.getCurrentAppAttempt().getAppAttemptId(),
        app4.getCurrentAppAttempt().getAppAttemptId(),
        appAttemptId,
        app2.getCurrentAppAttempt().getAppAttemptId());
    assertApps(scheduler, "a",
        app3.getCurrentAppAttempt().getAppAttemptId(),
        app4.getCurrentAppAttempt().getAppAttemptId(),
        appAttemptId,
        app2.getCurrentAppAttempt().getAppAttemptId());
    assertApps(scheduler, "a1",
        app3.getCurrentAppAttempt().getAppAttemptId(),
        app4.getCurrentAppAttempt().getAppAttemptId(),
        appAttemptId,
        app2.getCurrentAppAttempt().getAppAttemptId());
    assertApps(scheduler, "b");
    assertApps(scheduler, "b1");

    UsersManager um =
        (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager();

    assertEquals(4, um.getNumActiveUsers());
    assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());

    // now move the app
    scheduler.moveAllApps("a1", "b1");

    //Triggering this event so that user limit computation can
    //happen again
    for (int i = 0; i < 10; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      Thread.sleep(500);
    }

    // check post conditions
    assertApps(scheduler, "root",
        appAttemptId,
        app2.getCurrentAppAttempt().getAppAttemptId(),
        app3.getCurrentAppAttempt().getAppAttemptId(),
        app4.getCurrentAppAttempt().getAppAttemptId());
    assertApps(scheduler, "a");
    assertApps(scheduler, "a1");
    assertApps(scheduler, "b",
        appAttemptId,
        app2.getCurrentAppAttempt().getAppAttemptId(),
        app3.getCurrentAppAttempt().getAppAttemptId(),
        app4.getCurrentAppAttempt().getAppAttemptId());
    assertApps(scheduler, "b1",
        appAttemptId,
        app2.getCurrentAppAttempt().getAppAttemptId(),
        app3.getCurrentAppAttempt().getAppAttemptId(),
        app4.getCurrentAppAttempt().getAppAttemptId());

    UsersManager umB1 =
        (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager();

    assertEquals(2, umB1.getNumActiveUsers());
    assertEquals(2, umB1.getNumActiveUsersWithOnlyPendingApps());

    rm.close();
  }

  @Test
  @Timeout(value = 60)
  public void testMoveAttemptNotAdded() throws Exception {
    Configuration conf = new Configuration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    MockRM rm = new MockRM(getCapacityConfiguration(conf));
    rm.start();
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
    ApplicationAttemptId appAttemptId =
        BuilderUtils.newApplicationAttemptId(appId, 1);

    RMAppAttemptMetrics attemptMetric =
        new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
    RMAppImpl app = mock(RMAppImpl.class);
    when(app.getApplicationId()).thenReturn(appId);
    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
    Container container = mock(Container.class);
    when(attempt.getMasterContainer()).thenReturn(container);
    ApplicationSubmissionContext submissionContext =
        mock(ApplicationSubmissionContext.class);
    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
    when(app.getCurrentAppAttempt()).thenReturn(attempt);

    rm.getRMContext().getRMApps().put(appId, app);

    SchedulerEvent addAppEvent =
        new AppAddedSchedulerEvent(appId, "a1", "user");
    try {
      cs.moveApplication(appId, "b1");
      fail("Move should throw exception app not available");
    } catch (YarnException e) {
      assertEquals("App to be moved application_100_0001 not found.",
          e.getMessage());
    }
    cs.handle(addAppEvent);
    cs.moveApplication(appId, "b1");
    SchedulerEvent addAttemptEvent =
        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
    cs.handle(addAttemptEvent);
    CSQueue rootQ = cs.getRootQueue();
    CSQueue queueB = cs.getQueue("b");
    CSQueue queueA = cs.getQueue("a");
    CSQueue queueA1 = cs.getQueue("a1");
    CSQueue queueB1 = cs.getQueue("b1");
    assertEquals(1, rootQ.getNumApplications());
    assertEquals(0, queueA.getNumApplications());
    assertEquals(1, queueB.getNumApplications());
    assertEquals(0, queueA1.getNumApplications());
    assertEquals(1, queueB1.getNumApplications());

    rm.close();
  }

  @Test
  public void testRemoveAttemptMoveAdded() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        CapacityScheduler.class);
    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
    // Create Mock RM
    MockRM rm = new MockRM(getCapacityConfiguration(conf));
    CapacityScheduler sch = (CapacityScheduler) rm.getResourceScheduler();
    // add node
    Resource newResource = Resource.newInstance(4 * GB, 1);
    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
    SchedulerEvent addNode = new NodeAddedSchedulerEvent(node);
    sch.handle(addNode);

    ApplicationAttemptId appAttemptId = appHelper(rm, sch, 100, 1, "a1", "user");

    // get Queues
    CSQueue queueA1 = sch.getQueue("a1");
    CSQueue queueB = sch.getQueue("b");
    CSQueue queueB1 = sch.getQueue("b1");

    // add Running rm container and simulate live containers to a1
    ContainerId newContainerId = ContainerId.newContainerId(appAttemptId, 2);
    RMContainerImpl rmContainer = mock(RMContainerImpl.class);
    when(rmContainer.getState()).thenReturn(RMContainerState.RUNNING);
    Container container2 = mock(Container.class);
    when(rmContainer.getContainer()).thenReturn(container2);
    Resource resource = Resource.newInstance(1024, 1);
    when(container2.getResource()).thenReturn(resource);
    when(rmContainer.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
    when(container2.getNodeId()).thenReturn(node.getNodeID());
    when(container2.getId()).thenReturn(newContainerId);
    when(rmContainer.getNodeLabelExpression())
        .thenReturn(RMNodeLabelsManager.NO_LABEL);
    when(rmContainer.getContainerId()).thenReturn(newContainerId);
    sch.getApplicationAttempt(appAttemptId).getLiveContainersMap()
        .put(newContainerId, rmContainer);
    QueueMetrics queueA1M = queueA1.getMetrics();
    queueA1M.incrPendingResources(rmContainer.getNodeLabelExpression(),
        "user1", 1, resource);
    queueA1M.allocateResources(rmContainer.getNodeLabelExpression(),
        "user1", resource);
    // remove attempt
    sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId,
        RMAppAttemptState.KILLED, true));
    // Move application to queue b1
    sch.moveApplication(appAttemptId.getApplicationId(), "b1");
    // Check queue metrics after move
    assertEquals(0, queueA1.getNumApplications());
    assertEquals(1, queueB.getNumApplications());
    assertEquals(0, queueB1.getNumApplications());

    // Release attempt add event
    ApplicationAttemptId appAttemptId2 =
        BuilderUtils.newApplicationAttemptId(appAttemptId.getApplicationId(), 2);
    SchedulerEvent addAttemptEvent2 =
        new AppAttemptAddedSchedulerEvent(appAttemptId2, true);
    sch.handle(addAttemptEvent2);

    // Check metrics after attempt added
    assertEquals(0, queueA1.getNumApplications());
    assertEquals(1, queueB.getNumApplications());
    assertEquals(1, queueB1.getNumApplications());


    QueueMetrics queueB1M = queueB1.getMetrics();
    QueueMetrics queueBM = queueB.getMetrics();
    // Verify allocation MB of current state
    assertEquals(0, queueA1M.getAllocatedMB());
    assertEquals(0, queueA1M.getAllocatedVirtualCores());
    assertEquals(1024, queueB1M.getAllocatedMB());
    assertEquals(1, queueB1M.getAllocatedVirtualCores());

    // remove attempt
    sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId2,
        RMAppAttemptState.FINISHED, false));

    assertEquals(0, queueA1M.getAllocatedMB());
    assertEquals(0, queueA1M.getAllocatedVirtualCores());
    assertEquals(0, queueB1M.getAllocatedMB());
    assertEquals(0, queueB1M.getAllocatedVirtualCores());

    verifyQueueMetrics(queueB1M);
    verifyQueueMetrics(queueBM);
    // Verify queue A1 metrics
    verifyQueueMetrics(queueA1M);
    rm.close();
  }

  @Test
  public void testAppSubmission() throws Exception {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    setupQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setQueues(A, new String[]{"a1", "a2", "b"});
    conf.setCapacity(A1, 20);
    conf.setCapacity(new QueuePath("root.a.b"), 10);
    MockRM rm = new MockRM(conf);
    rm.start();

    RMApp noParentQueueApp = submitAppAndWaitForState(rm, "q", RMAppState.FAILED);
    assertEquals(RMAppState.FAILED, noParentQueueApp.getState());

    RMApp ambiguousQueueApp = submitAppAndWaitForState(rm, "b", RMAppState.FAILED);
    assertEquals(RMAppState.FAILED, ambiguousQueueApp.getState());

    RMApp emptyPartQueueApp = submitAppAndWaitForState(rm, "root..a1", RMAppState.FAILED);
    assertEquals(RMAppState.FAILED, emptyPartQueueApp.getState());

    RMApp failedAutoQueue = submitAppAndWaitForState(rm, "root.a.b.c.d", RMAppState.FAILED);
    assertEquals(RMAppState.FAILED, failedAutoQueue.getState());
  }

  private RMApp submitAppAndWaitForState(MockRM rm, String b, RMAppState state) throws Exception {
    MockRMAppSubmissionData ambiguousQueueAppData =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withWaitForAppAcceptedState(false)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue(b)
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm, ambiguousQueueAppData);
    rm.waitForState(app1.getApplicationId(), state);
    return app1;
  }

  private int getNumAppsInQueue(String name, List<CSQueue> queues) {
    for (CSQueue queue : queues) {
      if (queue.getQueueShortName().equals(name)) {
        return queue.getNumApplications();
      }
    }
    return -1;
  }

  private CapacitySchedulerQueueInfo getQueueInfo(String name,
                                                  CapacitySchedulerQueueInfoList info) {
    if (info != null) {
      for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) {
        if (queueInfo.getQueueName().equals(name)) {
          return queueInfo;
        } else {
          CapacitySchedulerQueueInfo result =
              getQueueInfo(name, queueInfo.getQueues());
          if (result == null) {
            continue;
          }
          return result;
        }
      }
    }
    return null;
  }

  private void verifyQueueMetrics(QueueMetrics queue) {
    assertEquals(0, queue.getPendingMB());
    assertEquals(0, queue.getActiveUsers());
    assertEquals(0, queue.getActiveApps());
    assertEquals(0, queue.getAppsPending());
    assertEquals(0, queue.getAppsRunning());
    assertEquals(0, queue.getAllocatedMB());
    assertEquals(0, queue.getAllocatedVirtualCores());
  }

  private Configuration getCapacityConfiguration(Configuration config) {
    CapacitySchedulerConfiguration conf =
        new CapacitySchedulerConfiguration(config);

    // Define top-level queues
    conf.setQueues(ROOT,
        new String[]{"a", "b"});
    conf.setCapacity(A, 50);
    conf.setCapacity(B, 50);
    conf.setQueues(A, new String[]{"a1", "a2"});
    conf.setCapacity(A1, 50);
    conf.setCapacity(A2, 50);
    conf.setQueues(B, new String[]{"b1"});
    conf.setCapacity(B1, 100);
    return conf;
  }

}