TestWorkPreservingRMRestartForNodeLabel.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Verifies that a work-preserving restart of a {@link MockRM} running the
 * {@link CapacityScheduler} recovers containers on labeled and unlabeled
 * nodes together with their node label expressions and the per-partition
 * resource usage of apps and queues.
 */
public class TestWorkPreservingRMRestartForNodeLabel {
  private Configuration conf;
  private static final int GB = 1024; // 1024 MB

  /**
   * Upper bound on how long {@link #waitForNumContainersToRecover} polls, so
   * that a broken recovery fails the test instead of hanging the build.
   */
  private static final long RECOVERY_TIMEOUT_MS = 60_000L;

  /** Sleep between two polls in {@link #waitForNumContainersToRecover}. */
  private static final long POLL_INTERVAL_MS = 200L;

  // Label manager handed to the MockRM instances through
  // createNodeLabelManager(); replaced by a fresh instance before restart.
  RMNodeLabelsManager mgr;

  /**
   * Builds a configuration using the CapacityScheduler with RM recovery and
   * work-preserving recovery enabled and an in-memory state store, and
   * creates a fresh, initialized {@link NullRMNodeLabelsManager}.
   */
  @BeforeEach
  public void setUp() throws Exception {
    conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
    mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
  }

  /** Returns a new mutable set holding {@code elements}. */
  @SafeVarargs
  private static <E> Set<E> toSet(E... elements) {
    return Sets.newHashSet(elements);
  }

  /**
   * Adds the cluster labels "x" and "y" (default exclusivity) to
   * {@code labelsManager} and assigns host h1 to "x" and host h2 to "y".
   * No other host is given a label.
   */
  private static void addNodeLabels(RMNodeLabelsManager labelsManager)
      throws Exception {
    labelsManager.addToCluserNodeLabelsWithDefaultExclusivity(
        ImmutableSet.of("x", "y"));
    labelsManager.addLabelsToNode(ImmutableMap.of(
        NodeId.newInstance("h1", 0), toSet("x"),
        NodeId.newInstance("h2", 0), toSet("y")));
  }

  /**
   * Asserts that {@code rm}'s scheduler knows {@code containerId} and that
   * the container's node label expression equals {@code labelExpression}.
   */
  private void checkRMContainerLabelExpression(ContainerId containerId,
      MockRM rm, String labelExpression) {
    RMContainer container =
        rm.getRMContext().getScheduler().getRMContainer(containerId);
    assertNotNull(container, "Cannot find RMContainer=" + containerId);
    assertEquals(labelExpression,
        container.getNodeLabelExpression());
  }

  /**
   * Polls every {@link #POLL_INTERVAL_MS} ms until {@code rm}'s scheduler has
   * an attempt for {@code attemptId} and that attempt has at least
   * {@code num} live containers.
   *
   * @throws AssertionError if the condition is not reached within
   *     {@link #RECOVERY_TIMEOUT_MS} ms
   * @throws InterruptedException if interrupted while sleeping
   */
  @SuppressWarnings("rawtypes")
  public static void waitForNumContainersToRecover(int num, MockRM rm,
      ApplicationAttemptId attemptId) throws Exception {
    AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm.getResourceScheduler();
    long startNanos = System.nanoTime();
    SchedulerApplicationAttempt attempt =
        scheduler.getApplicationAttempt(attemptId);
    while (attempt == null) {
      failIfTimedOut(startNanos,
          "scheduler attempt " + attemptId + " to be created");
      System.out.println("Wait for scheduler attempt " + attemptId
          + " to be created");
      Thread.sleep(POLL_INTERVAL_MS);
      attempt = scheduler.getApplicationAttempt(attemptId);
    }
    while (attempt.getLiveContainers().size() < num) {
      failIfTimedOut(startNanos,
          num + " containers of " + attemptId + " to recover");
      System.out.println("Wait for " + num
          + " containers to recover. currently: "
          + attempt.getLiveContainers().size());
      Thread.sleep(POLL_INTERVAL_MS);
    }
  }

  /**
   * Throws an {@link AssertionError} naming {@code waitingFor} once more
   * than {@link #RECOVERY_TIMEOUT_MS} ms have elapsed since
   * {@code startNanos} (a {@link System#nanoTime()} reading).
   */
  private static void failIfTimedOut(long startNanos, String waitingFor) {
    long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000L;
    if (elapsedMs >= RECOVERY_TIMEOUT_MS) {
      throw new AssertionError("Timed out after " + elapsedMs
          + " ms waiting for " + waitingFor);
    }
  }

  /**
   * Asserts the memory that the current attempt of {@code appId} uses in
   * {@code partition}, as seen by {@code rm}'s CapacityScheduler.
   */
  private void checkAppResourceUsage(String partition, ApplicationId appId,
      MockRM rm, int expectedMemUsage) {
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    FiCaSchedulerApp app =
        cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
    assertEquals(expectedMemUsage, app.getAppAttemptResourceUsage()
        .getUsed(partition).getMemorySize());
  }

  /**
   * Asserts the memory that queue {@code queueName} uses in
   * {@code partition}, as seen by {@code rm}'s CapacityScheduler.
   */
  private void checkQueueResourceUsage(String partition, String queueName,
      MockRM rm, int expectedMemUsage) {
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    CSQueue queue = cs.getQueue(queueName);
    assertEquals(expectedMemUsage, queue.getQueueResourceUsage()
        .getUsed(partition).getMemorySize());
  }

  /**
   * Submits an app ("app", user "user", 200 MB AM request) to {@code queue}
   * and launches and registers its AM on {@code nm}. It then requests one
   * more 1024 MB container and waits until that container (id 2) is
   * ALLOCATED on {@code nm}. Finally it asserts that the AM container (id 1)
   * and container 2 both carry {@code expectedLabel}.
   *
   * @return the registered AM of the new app
   */
  private MockAM launchAppAndAllocateContainer(MockRM rm, MockNM nm,
      String queue, String expectedLabel) throws Exception {
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue(queue)
            .withUnmanagedAM(false)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);
    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
    ApplicationAttemptId attemptId = am.getApplicationAttemptId();

    // request a container; it becomes the attempt's container #2
    am.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    ContainerId containerId = ContainerId.newContainerId(attemptId, 2);
    assertTrue(rm.waitForState(nm, containerId, RMContainerState.ALLOCATED));
    checkRMContainerLabelExpression(
        ContainerId.newContainerId(attemptId, 1), rm, expectedLabel);
    checkRMContainerLabelExpression(containerId, rm, expectedLabel);
    return am;
  }

  /**
   * Registers {@code nm} again, against whatever resource tracker it is
   * currently bound to. The registration reports containers 1 and 2 of
   * {@code am}'s attempt as RUNNING with node label expression {@code label}.
   * The method then waits until {@code rm} has recovered both containers and
   * asserts their label expressions on {@code rm}.
   */
  private void recoverContainers(MockRM rm, MockNM nm, MockAM am,
      String label) throws Exception {
    ApplicationAttemptId attemptId = am.getApplicationAttemptId();
    NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
        attemptId, 1, ContainerState.RUNNING, label);
    NMContainerStatus taskContainer = TestRMRestart.createNMContainerStatus(
        attemptId, 2, ContainerState.RUNNING, label);
    nm.registerNode(Arrays.asList(amContainer, taskContainer), null);
    waitForNumContainersToRecover(2, rm, attemptId);
    checkRMContainerLabelExpression(
        ContainerId.newContainerId(attemptId, 1), rm, label);
    checkRMContainerLabelExpression(
        ContainerId.newContainerId(attemptId, 2), rm, label);
  }

  @Test
  public void testWorkPreservingRestartForNodeLabel() throws Exception {
    // This test is pretty much similar to testContainerAllocateWithLabel.
    // Difference is, this test doesn't specify label expression in ResourceRequest,
    // instead, it uses default queue label expression

    // set node -> label
    addNodeLabels(mgr);

    conf = TestUtils.getConfigurationWithDefaultQueueLabels(conf);

    // inject node label manager
    MockRM rm1 = new MockRM(conf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };
    try {
      MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
      rm1.getRMContext().setNodeLabelManager(mgr);
      rm1.start();
      MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
      MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
      MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>

      // launch an app to each of queue a1 (label = x), b1 (label = y) and
      // c1 (label = ""), and check all their containers are allocated on
      // h1, h2 and h3 respectively
      MockAM am1 = launchAppAndAllocateContainer(rm1, nm1, "a1", "x");
      MockAM am2 = launchAppAndAllocateContainer(rm1, nm2, "b1", "y");
      MockAM am3 = launchAppAndAllocateContainer(rm1, nm3, "c1", "");

      // Re-start RM with a fresh label manager holding the same labels
      mgr = new NullRMNodeLabelsManager();
      mgr.init(conf);
      addNodeLabels(mgr);
      MockRM rm2 = new MockRM(conf, memStore) {
        @Override
        public RMNodeLabelsManager createNodeLabelManager() {
          return mgr;
        }
      };
      try {
        rm2.start();
        nm1.setResourceTrackerService(rm2.getResourceTrackerService());
        nm2.setResourceTrackerService(rm2.getResourceTrackerService());
        nm3.setResourceTrackerService(rm2.getResourceTrackerService());

        // recover apps. All checks from here on target rm2: rm1 is still
        // running with its pre-restart state, so checking it would not
        // verify recovery at all.
        recoverContainers(rm2, nm1, am1, "x");
        recoverContainers(rm2, nm2, am2, "y");
        recoverContainers(rm2, nm3, am3, "");

        // Check recovered resource usage
        ApplicationId app1Id = am1.getApplicationAttemptId().getApplicationId();
        ApplicationId app2Id = am2.getApplicationAttemptId().getApplicationId();
        ApplicationId app3Id = am3.getApplicationAttemptId().getApplicationId();
        checkAppResourceUsage("x", app1Id, rm2, 2 * GB);
        checkAppResourceUsage("y", app2Id, rm2, 2 * GB);
        checkAppResourceUsage("", app3Id, rm2, 2 * GB);
        checkQueueResourceUsage("x", "a1", rm2, 2 * GB);
        checkQueueResourceUsage("y", "b1", rm2, 2 * GB);
        checkQueueResourceUsage("", "c1", rm2, 2 * GB);
        checkQueueResourceUsage("x", "a", rm2, 2 * GB);
        checkQueueResourceUsage("y", "b", rm2, 2 * GB);
        checkQueueResourceUsage("", "c", rm2, 2 * GB);
        checkQueueResourceUsage("x", "root", rm2, 2 * GB);
        checkQueueResourceUsage("y", "root", rm2, 2 * GB);
        checkQueueResourceUsage("", "root", rm2, 2 * GB);
      } finally {
        rm2.close();
      }
    } finally {
      rm1.close();
    }
  }
}