TestActivitiesManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import org.apache.hadoop.yarn.util.resource.Resources;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

/**
 * Test class for {@link ActivitiesManager}.
 */
public class TestActivitiesManager {

  /** Number of mock scheduler nodes created in {@link #setup()}. */
  private static final int NUM_NODES = 5;

  /** Number of scheduler applications created in {@link #setup()}. */
  private static final int NUM_APPS = 5;

  /** Core and maximum size of the thread pool used by the tests. */
  private static final int NUM_THREADS = 5;

  /** Mocked RM context handed to every activities manager under test. */
  private RMContext rmContext;

  /** Activities manager that also keeps a history of node allocations. */
  private TestingActivitiesManager activitiesManager;

  /** Scheduler application attempts available to the tests. */
  private List<SchedulerApplicationAttempt> apps;

  /** Scheduler nodes available to the tests. */
  private List<SchedulerNode> nodes;

  /** Pool on which the multi-threaded tests run their recording tasks. */
  private ThreadPoolExecutor threadPoolExecutor;

  /**
   * Builds the fixtures shared by all tests: a mocked {@link RMContext}
   * backed by a fresh {@link Configuration}, {@link #NUM_APPS} scheduler
   * applications registered in the context's RM app map,
   * {@link #NUM_NODES} mock nodes, the testing activities manager and the
   * thread pool used by the multi-threaded tests.
   */
  @BeforeEach
  public void setup() {
    rmContext = mock(RMContext.class);
    Configuration conf = new Configuration();
    when(rmContext.getYarnConfiguration()).thenReturn(conf);
    ResourceScheduler scheduler = mock(ResourceScheduler.class);
    when(scheduler.getMinimumResourceCapability())
        .thenReturn(Resources.none());
    when(rmContext.getScheduler()).thenReturn(scheduler);
    LeafQueue mockQueue = mock(LeafQueue.class);
    Map<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
    doReturn(rmApps).when(rmContext).getRMApps();
    apps = new ArrayList<>();
    for (int i = 0; i < NUM_APPS; i++) {
      ApplicationAttemptId appAttemptId =
          TestUtils.getMockApplicationAttemptId(i, 0);
      // Each app is registered as a still-running RMApp (final status
      // UNDEFINED) under its application id.
      RMApp mockApp = mock(RMApp.class);
      doReturn(appAttemptId.getApplicationId()).when(mockApp)
          .getApplicationId();
      doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp)
          .getFinalApplicationStatus();
      rmApps.put(appAttemptId.getApplicationId(), mockApp);
      FiCaSchedulerApp app =
          new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
              mock(ActiveUsersManager.class), rmContext);
      apps.add(app);
    }
    nodes = new ArrayList<>();
    for (int i = 0; i < NUM_NODES; i++) {
      nodes.add(TestUtils.getMockNode("host" + i, "rack", 1, 10240));
    }
    activitiesManager = new TestingActivitiesManager(rmContext);
    threadPoolExecutor =
        new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 3L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    // Core size equals max size, so without this the keep-alive above is
    // never applied and the idle workers of every test's pool stay alive,
    // because the pool is never shut down. Letting core threads time out
    // releases them once a test has finished.
    threadPoolExecutor.allowCoreThreadTimeOut(true);
  }

  /**
   * Submits one recording task per node to the thread pool; each task
   * records a single rejected app activity for its own node. Afterwards
   * every node must have exactly one recorded batch containing exactly one
   * allocation, i.e. the concurrent recordings did not interfere.
   */
  @Test
  public void testRecordingDifferentNodeActivitiesInMultiThreads()
      throws Exception {
    final Random random = new Random();
    final List<Future<Void>> pendingTasks = new ArrayList<>();
    for (final SchedulerNode schedulerNode : nodes) {
      final Callable<Void> recordForNode = () -> {
        final SchedulerApplicationAttempt pickedApp =
            apps.get(random.nextInt(NUM_APPS));
        final NodeId nodeId = schedulerNode.getNodeID();
        // request recording of the next update of this node
        activitiesManager.recordNextNodeUpdateActivities(nodeId.toString());
        // produce one app activity between start and finish of the update
        ActivitiesLogger.NODE
            .startNodeUpdateRecording(activitiesManager, nodeId);
        final SchedulerRequestKey requestKey =
            new SchedulerRequestKey(Priority.newInstance(0), 0, null);
        ActivitiesLogger.APP.recordAppActivityWithoutAllocation(
            activitiesManager, schedulerNode, pickedApp, requestKey,
            ActivityDiagnosticConstant.NODE_IS_BLACKLISTED,
            ActivityState.REJECTED, ActivityLevel.NODE);
        ActivitiesLogger.NODE
            .finishNodeUpdateRecording(activitiesManager, nodeId, "");
        return null;
      };
      pendingTasks.add(threadPoolExecutor.submit(recordForNode));
    }
    // wait for every task; a failure inside a task is rethrown here
    for (final Future<Void> pending : pendingTasks) {
      pending.get();
    }
    // one history entry per node, each holding a single batch with a
    // single allocation
    final Map<NodeId, List<List<NodeAllocation>>> history =
        activitiesManager.historyNodeAllocations;
    assertEquals(NUM_NODES, history.size());
    history.values().forEach(recordedForNode -> {
      assertEquals(1, recordedForNode.size());
      assertEquals(1, recordedForNode.get(0).size());
    });
  }

  /**
   * Turns on recording for the multi-node key
   * ({@link ActivitiesManager#EMPTY_NODE_ID}) once, then lets one task per
   * node record an activity under that key concurrently. Only a single
   * entry must end up in the node allocation history.
   */
  @Test
  public void testRecordingSchedulerActivitiesForMultiNodesInMultiThreads()
      throws Exception {
    final Random random = new Random();
    final NodeId multiNodeKey = ActivitiesManager.EMPTY_NODE_ID;
    // request recording for the multi-node key before any task runs
    activitiesManager.recordNextNodeUpdateActivities(multiNodeKey.toString());
    final List<Future<Void>> pendingTasks = new ArrayList<>();
    for (final SchedulerNode schedulerNode : nodes) {
      final Callable<Void> recordForMultiNodes = () -> {
        final SchedulerApplicationAttempt pickedApp =
            apps.get(random.nextInt(NUM_APPS));
        ActivitiesLogger.NODE
            .startNodeUpdateRecording(activitiesManager, multiNodeKey);
        final SchedulerRequestKey requestKey =
            new SchedulerRequestKey(Priority.newInstance(0), 0, null);
        ActivitiesLogger.APP.recordAppActivityWithoutAllocation(
            activitiesManager, schedulerNode, pickedApp, requestKey,
            ActivityDiagnosticConstant.NODE_IS_BLACKLISTED,
            ActivityState.REJECTED, ActivityLevel.NODE);
        ActivitiesLogger.NODE
            .finishNodeUpdateRecording(activitiesManager, multiNodeKey, "");
        return null;
      };
      pendingTasks.add(threadPoolExecutor.submit(recordForMultiNodes));
    }
    // wait for every task; a failure inside a task is rethrown here
    for (final Future<Void> pending : pendingTasks) {
      pending.get();
    }
    // the multi-node activities must have been recorded under one key only
    assertEquals(1, activitiesManager.historyNodeAllocations.size());
  }


  /**
   * Records app activities for one randomly chosen application from many
   * concurrent tasks. Each task records one allocation with one rejected
   * activity per node; afterwards the completed allocations of the app must
   * contain one entry per task, each with {@link #NUM_NODES} attempts.
   */
  @Test
  public void testRecordingAppActivitiesInMultiThreads()
      throws Exception {
    final Random random = new Random();
    // turn on app activities recording for a random app
    final SchedulerApplicationAttempt targetApp =
        apps.get(random.nextInt(NUM_APPS));
    final ApplicationId targetAppId = targetApp.getApplicationId();
    activitiesManager.turnOnAppActivitiesRecording(targetAppId, 3);

    // The task captures no per-iteration state, so a single instance is
    // submitted repeatedly.
    final Callable<Void> recordOneAllocation = () -> {
      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
          (FiCaSchedulerNode) nodes.get(0),
          SystemClock.getInstance().getTime(), targetApp);
      for (SchedulerNode schedulerNode : nodes) {
        ActivitiesLogger.APP.recordAppActivityWithoutAllocation(
            activitiesManager, schedulerNode, targetApp,
            new SchedulerRequestKey(Priority.newInstance(0), 0, null),
            ActivityDiagnosticConstant.NODE_IS_BLACKLISTED,
            ActivityState.REJECTED, ActivityLevel.NODE);
      }
      ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
          activitiesManager, targetAppId, ActivityState.SKIPPED,
          ActivityDiagnosticConstant.EMPTY);
      return null;
    };

    final int taskCount = 20;
    final List<Future<Void>> pendingTasks = new ArrayList<>();
    for (int submitted = 0; submitted < taskCount; submitted++) {
      pendingTasks.add(threadPoolExecutor.submit(recordOneAllocation));
    }
    // wait for every task; a failure inside a task is rethrown here
    for (final Future<Void> pending : pendingTasks) {
      pending.get();
    }

    // every task must have produced one completed allocation that holds
    // one attempt per node
    final Queue<AppAllocation> recorded =
        activitiesManager.completedAppAllocations.get(targetAppId);
    assertEquals(taskCount, recorded.size());
    for (final AppAllocation allocation : recorded) {
      assertEquals(NUM_NODES, allocation.getAllocationAttempts().size());
    }
  }

  /**
   * Verifies that app activities recorded by a started
   * {@link ActivitiesManager} are returned right after recording and are no
   * longer returned once the configured TTL (1000 ms here, with a 100 ms
   * cleanup interval) has passed.
   *
   * @throws Exception if starting the manager fails or the activities have
   *     not expired within the polling timeout
   */
  @Test
  @Timeout(value = 30)
  public void testAppActivitiesTTL() throws Exception {
    long cleanupIntervalMs = 100;
    long appActivitiesTTL = 1000;
    rmContext.getYarnConfiguration()
        .setLong(YarnConfiguration.RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS,
            cleanupIntervalMs);
    rmContext.getYarnConfiguration()
        .setLong(YarnConfiguration.RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS,
            appActivitiesTTL);
    ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext);
    newActivitiesManager.serviceStart();
    try {
      // start recording activities for first app and first node
      SchedulerApplicationAttempt app = apps.get(0);
      FiCaSchedulerNode node = (FiCaSchedulerNode) nodes.get(0);
      newActivitiesManager
          .turnOnAppActivitiesRecording(app.getApplicationId(), 3);
      int numActivities = 10;
      for (int i = 0; i < numActivities; i++) {
        ActivitiesLogger.APP
            .startAppAllocationRecording(newActivitiesManager, node,
                SystemClock.getInstance().getTime(), app);
        ActivitiesLogger.APP
            .recordAppActivityWithoutAllocation(newActivitiesManager, node,
                app,
                new SchedulerRequestKey(Priority.newInstance(0), 0, null),
                ActivityDiagnosticConstant.NODE_IS_BLACKLISTED,
                ActivityState.REJECTED, ActivityLevel.NODE);
        ActivitiesLogger.APP
            .finishSkippedAppAllocationRecording(newActivitiesManager,
                app.getApplicationId(), ActivityState.SKIPPED,
                ActivityDiagnosticConstant.EMPTY);
      }
      AppActivitiesInfo appActivitiesInfo = newActivitiesManager
          .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1,
              false, 3);
      assertEquals(numActivities,
          appActivitiesInfo.getAllocations().size());
      // Poll until all app activities have expired. A single fixed sleep of
      // exactly cleanupIntervalMs + appActivitiesTTL leaves no slack for
      // scheduling delays; polling with a generous upper bound keeps the
      // same check without depending on exact timing.
      GenericTestUtils.waitFor(
          () -> newActivitiesManager
              .getAppActivitiesInfo(app.getApplicationId(), null, null, null,
                  -1, false, 3)
              .getAllocations().isEmpty(), 100, 10000);
      // there should be no remaining app activities
      appActivitiesInfo = newActivitiesManager
          .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1,
              false, 3);
      assertEquals(0,
          appActivitiesInfo.getAllocations().size());
    } finally {
      // Stop the manager so that whatever serviceStart() launched does not
      // outlive this test.
      newActivitiesManager.stop();
    }
  }

  /**
   * Records 100 app allocations with 10000 node-level activities each for
   * the first app, then queries them ten times each in normal, aggregated
   * (grouped by diagnostic) and summarized form. Every query's result shape
   * is asserted and the time taken per query kind is printed.
   */
  @Test
  @Timeout(value = 30)
  public void testAppActivitiesPerformance() {
    final int numActivities = 100;
    final int numNodes = 10000;
    final int testingTimes = 10;

    // start recording activities for first app
    final SchedulerApplicationAttempt app = apps.get(0);
    final ApplicationId appId = app.getApplicationId();
    final FiCaSchedulerNode node = (FiCaSchedulerNode) nodes.get(0);
    activitiesManager.turnOnAppActivitiesRecording(appId, 100);

    for (int allocationNo = 0; allocationNo < numActivities;
         allocationNo++) {
      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
          node, SystemClock.getInstance().getTime(), app);
      for (int hostNo = 0; hostNo < numNodes; hostNo++) {
        final NodeId nodeId = NodeId.newInstance("host" + hostNo, 0);
        activitiesManager.addSchedulingActivityForApp(appId, null, 0,
            ActivityState.SKIPPED,
            ActivityDiagnosticConstant.NODE_IS_BLACKLISTED,
            ActivityLevel.NODE, nodeId, 0L);
      }
      ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
          activitiesManager, appId, ActivityState.SKIPPED,
          ActivityDiagnosticConstant.EMPTY);
    }

    // The first query often takes longer; run it once up front so that it
    // does not distort the measurements below.
    activitiesManager.getAppActivitiesInfo(appId, null, null, null, -1,
        true, 100);

    // Normal query: one allocation per recording, each listing every node.
    testManyTimes("Getting normal app activities", () -> {
      final AppActivitiesInfo info = activitiesManager
          .getAppActivitiesInfo(appId, null, null, null, -1, false, 100);
      assertEquals(numActivities, info.getAllocations().size());
      assertEquals(1,
          info.getAllocations().get(0).getChildren().size());
      assertEquals(numNodes,
          info.getAllocations().get(0).getChildren().get(0)
              .getChildren().size());
      return null;
    }, testingTimes);

    // Aggregated query: nodes are grouped under a single diagnostic entry.
    testManyTimes("Getting aggregated app activities", () -> {
      final AppActivitiesInfo info = activitiesManager
          .getAppActivitiesInfo(appId, null, null,
              RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, false, 100);
      assertEquals(numActivities, info.getAllocations().size());
      assertEquals(1,
          info.getAllocations().get(0).getChildren().size());
      assertEquals(1,
          info.getAllocations().get(0).getChildren().get(0)
              .getChildren().size());
      assertEquals(numNodes,
          info.getAllocations().get(0).getChildren().get(0)
              .getChildren().get(0).getNodeIds().size());
      return null;
    }, testingTimes);

    // Summarized query: all recordings collapse into a single allocation.
    testManyTimes("Getting summarized app activities", () -> {
      final AppActivitiesInfo info = activitiesManager
          .getAppActivitiesInfo(appId, null, null,
              RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, true, 100);
      assertEquals(1, info.getAllocations().size());
      assertEquals(1,
          info.getAllocations().get(0).getChildren().size());
      assertEquals(1,
          info.getAllocations().get(0).getChildren().get(0)
              .getChildren().size());
      assertEquals(numNodes,
          info.getAllocations().get(0).getChildren().get(0)
              .getChildren().get(0).getNodeIds().size());
      return null;
    }, testingTimes);
  }

  /**
   * Verifies that a started {@link ActivitiesManager} updates its app
   * activities max queue length as the mocked scheduler changes mode:
   * 15 with 5 nodes and 3 async scheduler threads, 6 with 5 nodes and no
   * async threads, and the configured value (1) once multi-node placement
   * is enabled.
   *
   * @throws TimeoutException if an expected queue length is not reached
   *     within 3 seconds
   * @throws InterruptedException if interrupted while waiting
   */
  @Test
  @Timeout(value = 10)
  public void testAppActivitiesMaxQueueLengthUpdate()
      throws TimeoutException, InterruptedException {
    Configuration conf = new Configuration();
    int configuredAppActivitiesMaxQueueLength = 1;
    conf.setInt(YarnConfiguration.
            RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH,
        configuredAppActivitiesMaxQueueLength);
    conf.setInt(YarnConfiguration.RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS,
        500);
    ConcurrentMap<NodeId, RMNode> mockNodes = new ConcurrentHashMap<>();
    int numNodes = 5;
    for (int i = 0; i < numNodes; i++) {
      mockNodes.put(NodeId.newInstance("node" + i, 0), mock(RMNode.class));
    }
    CapacityScheduler cs = mock(CapacityScheduler.class);
    RMContext mockRMContext = mock(RMContext.class);
    when(mockRMContext.getRMNodes()).thenReturn(mockNodes);
    when(mockRMContext.getYarnConfiguration()).thenReturn(conf);
    when(mockRMContext.getScheduler()).thenReturn(cs);
    /*
     * Test for async-scheduling with multi-node placement disabled
     */
    when(cs.isMultiNodePlacementEnabled()).thenReturn(false);
    int numAsyncSchedulerThreads = 3;
    when(cs.getNumAsyncSchedulerThreads())
        .thenReturn(numAsyncSchedulerThreads);
    ActivitiesManager newActivitiesManager =
        new ActivitiesManager(mockRMContext);
    assertEquals(1,
        newActivitiesManager.getAppActivitiesMaxQueueLength());
    newActivitiesManager.init(conf);
    newActivitiesManager.start();
    try {
      GenericTestUtils.waitFor(
          () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
              == numNodes * numAsyncSchedulerThreads, 100, 3000);
      assertEquals(15,
          newActivitiesManager.getAppActivitiesMaxQueueLength());
      /*
       * Test for HB-driven scheduling with multi-node placement disabled
       */
      when(cs.getNumAsyncSchedulerThreads()).thenReturn(0);
      // Compare against an int so the wait condition does not depend on
      // comparing an int with a floating-point product (5 * 1.2 -> 6).
      int expectedHbDrivenLength = (int) (numNodes * 1.2);
      GenericTestUtils.waitFor(
          () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
              == expectedHbDrivenLength, 100, 3000);
      assertEquals(6,
          newActivitiesManager.getAppActivitiesMaxQueueLength());
      /*
       * Test for scheduling with multi-node placement enabled
       */
      when(cs.isMultiNodePlacementEnabled()).thenReturn(true);
      GenericTestUtils.waitFor(
          () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
              == configuredAppActivitiesMaxQueueLength, 100, 3000);
      assertEquals(1,
          newActivitiesManager.getAppActivitiesMaxQueueLength());
    } finally {
      // Stop the started service so that it does not keep running after
      // this test, whether it passed or failed.
      newActivitiesManager.stop();
    }
  }

  /**
   * Runs {@code supplier} {@code testingTimes} times and prints the total
   * and average elapsed time in milliseconds.
   *
   * @param testingName label printed in front of the measurements
   * @param supplier the operation to time; its result is ignored
   * @param testingTimes how many times to run the operation
   */
  private void testManyTimes(String testingName,
      Supplier<Void> supplier, int testingTimes) {
    // System.nanoTime() is monotonic and finer-grained than the wall
    // clock, so short runs are not rounded to whole milliseconds and the
    // result is unaffected by system clock adjustments.
    long totalNanos = 0;
    for (int i = 0; i < testingTimes; i++) {
      long startNanos = System.nanoTime();
      supplier.get();
      totalNanos += System.nanoTime() - startNanos;
    }
    long totalTime = TimeUnit.NANOSECONDS.toMillis(totalNanos);
    // Report an average of 0 when nothing was run, rather than NaN.
    float averageTime = testingTimes > 0
        ? (float) (totalNanos / 1_000_000.0 / testingTimes) : 0f;
    System.out.println("#" + testingName + ", testing times : " + testingTimes
        + ", total cost time : " + totalTime + " ms, average cost time : "
        + averageTime + " ms.");
  }

  /**
   * Activities manager for tests that replaces
   * {@code completedNodeAllocations} with a spied map whose
   * {@code put(NodeId, List)} calls are intercepted: instead of being
   * stored, each list of node allocations is appended to a per-node history
   * so tests can inspect everything that was ever recorded.
   */
  public class TestingActivitiesManager extends ActivitiesManager {

    /** Every list of allocations passed to {@code put}, grouped by node. */
    private final Map<NodeId, List<List<NodeAllocation>>>
        historyNodeAllocations = new ConcurrentHashMap<>();

    public TestingActivitiesManager(RMContext rmContext) {
      super(rmContext);
      super.completedNodeAllocations = spy(new ConcurrentHashMap<>());
      doAnswer((invocationOnMock) -> {
        NodeId nodeId = (NodeId) invocationOnMock.getArguments()[0];
        @SuppressWarnings("unchecked")
        List<NodeAllocation> nodeAllocations =
            (List<NodeAllocation>) invocationOnMock.getArguments()[1];
        // The stubbed put() can be invoked from several pool threads for
        // the same node id. ConcurrentHashMap.compute runs the remapping
        // function atomically per key, so creating the per-node list and
        // appending to it cannot race the way a separate get / put / add
        // sequence on a plain ArrayList could.
        historyNodeAllocations.compute(nodeId, (key, history) -> {
          List<List<NodeAllocation>> updated =
              history == null ? new ArrayList<>() : history;
          updated.add(nodeAllocations);
          return updated;
        });
        return null;
      }).when(completedNodeAllocations).put(any(NodeId.class),
          any(List.class));
    }
  }
}