TestCapacitySchedulerPerf.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.event.Level;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestCapacitySchedulerPerf {
  private static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
  private static final QueuePath DEFAULT = new QueuePath(CapacitySchedulerConfiguration.ROOT
          + ".default");
  private final int GB = 1024;

  private String getResourceName(int idx) {
    return "resource-" + idx;
  }

  public static class CapacitySchedulerPerf extends CapacityScheduler {
    volatile boolean enable = false;
    AtomicLong count = new AtomicLong(0);

    public CapacitySchedulerPerf() {
      super();
    }

    @Override
    CSAssignment allocateContainersToNode(
        CandidateNodeSet<FiCaSchedulerNode> candidates,
        boolean withNodeHeartbeat) {
      CSAssignment retVal = super.allocateContainersToNode(candidates,
          withNodeHeartbeat);

      if (enable) {
        count.incrementAndGet();
      }

      return retVal;
    }
  }

  // This test is run only when when -DRunCapacitySchedulerPerfTests=true is set
  // on the command line. In addition, this test has tunables for the following:
  //   Number of queues: -DNumberOfQueues (default=100)
  //   Number of total apps: -DNumberOfApplications (default=200)
  //   Percentage of queues with apps: -DPercentActiveQueues (default=100)
  // E.G.:
  // mvn test -Dtest=TestCapacitySchedulerPerf -Dsurefire.fork.timeout=1800 \
  //    -DRunCapacitySchedulerPerfTests=true -DNumberOfQueues=50 \
  //    -DNumberOfApplications=200 -DPercentActiveQueues=100
  // Note that the surefire.fork.timeout flag is added because these tests could
  // take longer than the surefire timeout.
  private void testUserLimitThroughputWithNumberOfResourceTypes(
      int numOfResourceTypes, int numQueues, int pctActiveQueues, int appCount)
      throws Exception {
    assumeTrue(Boolean.valueOf(
        System.getProperty("RunCapacitySchedulerPerfTests")));
    int numThreads = Integer.valueOf(System.getProperty(
        "CapacitySchedulerPerfTestsNumThreads", "0"));

    if (numOfResourceTypes > 2) {
      // Initialize resource map
      Map<String, ResourceInformation> riMap = new HashMap<>();

      // Initialize mandatory resources
      riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
      riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);

      for (int i = 2; i < numOfResourceTypes; i++) {
        String resourceName = getResourceName(i);
        riMap.put(resourceName, ResourceInformation
            .newInstance(resourceName, "", 0, ResourceTypes.COUNTABLE, 0,
                Integer.MAX_VALUE));
      }

      ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
    }

    final int activeQueues = (int) (numQueues * (pctActiveQueues/100f));
    final int totalApps = appCount + activeQueues;
    // extra apps to get started with user limit

    CapacitySchedulerConfiguration csconf =
        createCSConfWithManyQueues(numQueues);
    if (numThreads > 0) {
      csconf.setScheduleAynschronously(true);
      csconf.setInt(
          CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
          numThreads);
      csconf.setLong(
          CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
              + ".scheduling-interval-ms", 0);
    }

    YarnConfiguration conf = new YarnConfiguration(csconf);
    // Don't reset resource types since we have already configured resource
    // types
    conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);

    if (numThreads > 0) {
      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacitySchedulerPerf.class,
          ResourceScheduler.class);
      // avoid getting skipped (see CapacityScheduler.shouldSkipNodeSchedule)
      conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 600000);
    } else {
      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
              ResourceScheduler.class);
    }

    MockRM rm = new MockRM(conf);
    rm.start();

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    LeafQueue[] lqs = new LeafQueue[numQueues];
    for (int i = 0; i < numQueues; i++) {
      String queueName = String.format("%03d", i);
      LeafQueue qb = (LeafQueue)cs.getQueue(queueName);
      // For now make user limit large so we can activate all applications
      qb.setUserLimitFactor((float)100.0);
      qb.setupConfigurableCapacities();
      lqs[i] = qb;
    }

    SchedulerEvent addAppEvent;
    SchedulerEvent addAttemptEvent;
    Container container = mock(Container.class);
    ApplicationSubmissionContext submissionContext =
        mock(ApplicationSubmissionContext.class);

    ApplicationId[] appids = new ApplicationId[totalApps];
    RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[totalApps];
    ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[totalApps];
    RMAppImpl[] apps = new RMAppImpl[totalApps];
    RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[totalApps];
    for (int i=0; i<totalApps; i++) {
      appids[i] = BuilderUtils.newApplicationId(100, i);
      appAttemptIds[i] =
          BuilderUtils.newApplicationAttemptId(appids[i], 1);

      attemptMetrics[i] =
          new RMAppAttemptMetrics(appAttemptIds[i], rm.getRMContext());
      apps[i] = mock(RMAppImpl.class);
      when(apps[i].getApplicationId()).thenReturn(appids[i]);
      attempts[i] = mock(RMAppAttemptImpl.class);
      when(attempts[i].getMasterContainer()).thenReturn(container);
      when(attempts[i].getSubmissionContext()).thenReturn(submissionContext);
      when(attempts[i].getAppAttemptId()).thenReturn(appAttemptIds[i]);
      when(attempts[i].getRMAppAttemptMetrics()).thenReturn(attemptMetrics[i]);
      when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);

      rm.getRMContext().getRMApps().put(appids[i], apps[i]);
      String queueName = lqs[i % activeQueues].getQueuePath();
      addAppEvent =
          new AppAddedSchedulerEvent(appids[i], queueName, "user1");
      cs.handle(addAppEvent);
      addAttemptEvent =
          new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
      cs.handle(addAttemptEvent);
    }

    // add nodes to cluster with enough resources to satisfy all apps
    Resource newResource = Resource.newInstance(totalApps * GB, totalApps);
    if (numOfResourceTypes > 2) {
      for (int i = 2; i < numOfResourceTypes; i++) {
        newResource.setResourceValue(getResourceName(i), totalApps);
      }
    }
    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
    cs.handle(new NodeAddedSchedulerEvent(node));

    RMNode node2 = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.2");
    cs.handle(new NodeAddedSchedulerEvent(node2));

    Priority u0Priority = TestUtils.createMockPriority(1);
    RecordFactory recordFactory =
        RecordFactoryProvider.getRecordFactory(null);

    if (numThreads > 0) {
      // disable async scheduling threads
      for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
        t.suspendSchedule();
      }
    }

    FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
    for (int i=0;i<totalApps;i++) {
      fiCaApps[i] =
          cs.getSchedulerApplications().get(apps[i].getApplicationId())
              .getCurrentAppAttempt();

      ResourceRequest resourceRequest = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, u0Priority, recordFactory);
      if (numOfResourceTypes > 2) {
        for (int j = 2; j < numOfResourceTypes; j++) {
          resourceRequest.getCapability().setResourceValue(getResourceName(j),
              10);
        }
      }

      // allocate container for app2 with 1GB memory and 1 vcore
      fiCaApps[i].updateResourceRequests(
          Collections.singletonList(resourceRequest));
    }
    // Now force everything to be at user limit
    for (int i = 0; i < numQueues; i++) {
      lqs[i].setUserLimitFactor((float)0.0);
    }

    if (numThreads > 0) {
      // enable async scheduling threads
      for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
        t.beginSchedule();
      }

      // let the threads allocate resources for extra apps
      while (CapacitySchedulerMetrics.getMetrics().commitSuccess.lastStat()
          .numSamples() < activeQueues) {
        Thread.sleep(1000);
      }

      // count the number of apps with allocated containers
      int numNotPending = 0;
      for (int i = 0; i < totalApps; i++) {
        boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
        if (!pending) {
          numNotPending++;
          assertEquals(0,
              fiCaApps[i].getTotalPendingRequestsPerPartition().size());
        } else {
          assertEquals(1*GB,
              fiCaApps[i].getTotalPendingRequestsPerPartition()
                  .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
        }
      }

      // make sure only extra apps have allocated containers
      assertEquals(activeQueues, numNotPending);
    } else {
      // allocate one container for each extra apps since
      //  LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
      cs.handle(new NodeUpdateSchedulerEvent(node));
      cs.handle(new NodeUpdateSchedulerEvent(node2));

      // make sure only the extra apps have allocated containers
      for (int i=0;i<totalApps;i++) {
        boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
        if (i < activeQueues) {
          assertFalse(pending);
          assertEquals(0,
              fiCaApps[i].getTotalPendingRequestsPerPartition().size());
        } else {
          assertTrue(pending);
          assertEquals(1*GB,
              fiCaApps[i].getTotalPendingRequestsPerPartition()
                  .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
        }
      }
    }

    // Quiet the loggers while measuring throughput
    GenericTestUtils.setRootLogLevel(Level.WARN);

    if (numThreads > 0) {
      System.out.println("Starting now");
      ((CapacitySchedulerPerf) cs).enable = true;
      long start = Time.monotonicNow();
      Thread.sleep(60000);
      long end = Time.monotonicNow();
      ((CapacitySchedulerPerf) cs).enable = false;
      long numOps = ((CapacitySchedulerPerf) cs).count.get();
      System.out.println("Number of operations: " + numOps);
      System.out.println("Time taken: " + (end - start) + " ms");
      System.out.println("" + (numOps * 1000 / (end - start))
          + " ops / second");
    } else {
      final int topn = 20;
      final int iterations = 2000000;
      final int printInterval = 20000;
      final float numerator = 1000.0f * printInterval;
      PriorityQueue<Long> queue = new PriorityQueue<>(topn,
          Collections.reverseOrder());

      long n = Time.monotonicNow();
      long timespent = 0;
      for (int i = 0; i < iterations; i+=2) {
        if (i > 0  && i % printInterval == 0){
          long ts = (Time.monotonicNow() - n);
          if (queue.size() < topn) {
            queue.offer(ts);
          } else {
            Long last = queue.peek();
            if (last > ts) {
              queue.poll();
              queue.offer(ts);
            }
          }
          System.out.println(i + " " + (numerator / ts));
          n = Time.monotonicNow();
        }
        cs.handle(new NodeUpdateSchedulerEvent(node));
        cs.handle(new NodeUpdateSchedulerEvent(node2));
      }
      timespent = 0;
      int entries = queue.size();
      while (queue.size() > 0) {
        long l = queue.poll();
        timespent += l;
      }
      System.out.println("#ResourceTypes = " + numOfResourceTypes
          + ". Avg of fastest " + entries
          + ": " + numerator / (timespent / entries) + " ops/sec of "
          + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
          + " queues.");
    }

    if (numThreads > 0) {
      // count the number of apps with allocated containers
      int numNotPending = 0;
      for (int i = 0; i < totalApps; i++) {
        boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
        if (!pending) {
          numNotPending++;
          assertEquals(0,
                  fiCaApps[i].getTotalPendingRequestsPerPartition().size());
        } else {
          assertEquals(1*GB,
                  fiCaApps[i].getTotalPendingRequestsPerPartition()
                          .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
        }
      }

      // make sure only extra apps have allocated containers
      assertEquals(activeQueues, numNotPending);
    } else {
      // make sure only the extra apps have allocated containers
      for (int i = 0; i < totalApps; i++) {
        boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
        if (i < activeQueues) {
          assertFalse(pending);
          assertEquals(0,
                  fiCaApps[i].getTotalPendingRequestsPerPartition().size());
        } else {
          assertTrue(pending);
          assertEquals(1 * GB,
                  fiCaApps[i].getTotalPendingRequestsPerPartition()
                          .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
        }
      }
    }

    rm.close();
    rm.stop();
  }

  @Test
  @Timeout(value = 300)
  public void testUserLimitThroughputForTwoResources() throws Exception {
    testUserLimitThroughputWithNumberOfResourceTypes(2, 1, 100, 100);
  }

  @Test
  @Timeout(value = 300)
  public void testUserLimitThroughputForThreeResources() throws Exception {
    testUserLimitThroughputWithNumberOfResourceTypes(3, 1, 100, 100);
  }

  @Test
  @Timeout(value = 300)
  public void testUserLimitThroughputForFourResources() throws Exception {
    testUserLimitThroughputWithNumberOfResourceTypes(4, 1, 100, 100);
  }

  @Test
  @Timeout(value = 300)
  public void testUserLimitThroughputForFiveResources() throws Exception {
    testUserLimitThroughputWithNumberOfResourceTypes(5, 1, 100, 100);
  }

  @Test
  @Timeout(value = 1800)
  public void testUserLimitThroughputWithManyQueues() throws Exception {

    int numQueues = Integer.getInteger("NumberOfQueues", 40);
    int pctActiveQueues = Integer.getInteger("PercentActiveQueues", 100);
    int appCount = Integer.getInteger("NumberOfApplications", 100);

    testUserLimitThroughputWithNumberOfResourceTypes(
        2, numQueues, pctActiveQueues, appCount);
  }

  CapacitySchedulerConfiguration createCSConfWithManyQueues(int numQueues)
      throws Exception {
    CapacitySchedulerConfiguration csconf =
        new CapacitySchedulerConfiguration();
    csconf.setResourceComparator(DominantResourceCalculator.class);
    csconf.setMaximumApplicationMasterResourcePerQueuePercent(ROOT, 100.0f);
    csconf.setMaximumAMResourcePercentPerPartition(ROOT, "", 100.0f);
    csconf.setCapacity(DEFAULT, 0.0f);
    csconf.setOffSwitchPerHeartbeatLimit(numQueues);

    float capacity = 100.0f / numQueues;
    String[] subQueues = new String[numQueues];
    for (int i = 0; i < numQueues; i++) {
      String queueName = String.format("%03d", i);
      QueuePath queuePath = new QueuePath("root." + queueName);
      subQueues[i] = queueName;
      csconf.setMaximumApplicationMasterResourcePerQueuePercent(
          queuePath, 100.0f);
      csconf.setMaximumAMResourcePercentPerPartition(queuePath, "", 100.0f);
      csconf.setCapacity(queuePath, capacity);
      csconf.setUserLimitFactor(queuePath, 100.0f);
      csconf.setMaximumCapacity(queuePath, 100.0f);
    }

    csconf.setQueues(ROOT, subQueues);

    return csconf;
  }
}