// TestFSLeafQueue.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Map;

/**
 * Unit tests for {@link FSLeafQueue}. Every test starts a {@link MockRM}
 * configured with the {@link FairScheduler} and exercises a leaf queue
 * either directly or through the scheduler's queue manager.
 */
public class TestFSLeafQueue extends FairSchedulerTestBase {
  // Absolute path of the allocation file written by tests that need one;
  // named after this class and placed under TEST_DIR.
  private final static String ALLOC_FILE = new File(TEST_DIR,
      TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath();
  // Resource of size 1024 * 8, used by testUpdateDemand both as the queue's
  // max share and as the demand reported by the mocked application.
  private Resource maxResource = Resources.createResource(1024 * 8);
  // Max AM share applied to queues in these tests (as the allocation-file
  // default and via FSLeafQueue#setMaxAMShare).
  private static final float MAX_AM_SHARE = 0.5f;
  // Name of the custom resource type registered by the canRunAppAM tests.
  private static final String CUSTOM_RESOURCE = "test1";

  /**
   * Creates a fresh configuration before each test and selects
   * {@link FairScheduler} as the scheduler implementation.
   *
   * @throws IOException declared by the signature; propagated if raised
   *         while preparing the configuration
   */
  @BeforeEach
  public void setup() throws IOException {
    conf = createConfiguration();
    conf.setClass(
        YarnConfiguration.RM_SCHEDULER,
        FairScheduler.class,
        ResourceScheduler.class);
  }

  /**
   * Stops the ResourceManager started by the test, if there is one, and
   * clears the per-test references to it and to the configuration.
   */
  @AfterEach
  public void teardown() {
    if (resourceManager != null) {
      resourceManager.stop();
    }
    // Drop the references so nothing from this test is reused.
    resourceManager = null;
    conf = null;
  }

  /**
   * Verifies that the demand of a leaf queue equals its max share after
   * {@code updateDemand()} when the same application, whose own demand is
   * already equal to the max share, has been added to the queue twice.
   * Also checks the default max-apps and scheduling-policy queue metrics.
   */
  @Test
  public void testUpdateDemand() {
    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
    resourceManager = new MockRM(conf);
    resourceManager.start();
    scheduler = (FairScheduler) resourceManager.getResourceScheduler();

    FSLeafQueue leafQueue = new FSLeafQueue("root.queue1", scheduler, null);
    leafQueue.setMaxShare(new ConfigurableResource(maxResource));

    // A freshly created queue should report the default metrics.
    assertThat(leafQueue.getMetrics().getMaxApps())
        .isEqualTo(Integer.MAX_VALUE);
    assertThat(leafQueue.getMetrics().getSchedulingPolicy())
        .isEqualTo(SchedulingPolicy.DEFAULT_POLICY.getName());

    // The mocked app demands exactly the max share and uses nothing.
    FSAppAttempt mockedApp = mock(FSAppAttempt.class);
    when(mockedApp.getDemand()).thenReturn(maxResource);
    when(mockedApp.getResourceUsage()).thenReturn(Resources.none());

    // Register the app twice so the summed app demand is twice the max.
    for (int i = 0; i < 2; i++) {
      leafQueue.addApp(mockedApp, true);
    }

    leafQueue.updateDemand();

    boolean demandMatchesMax =
        Resources.equals(leafQueue.getDemand(), maxResource);
    assertTrue(demandMatchesMax, "Demand is greater than max allowed ");
  }

  /**
   * Starts the fair scheduler with an allocation file that declares two
   * queues ({@code queueA} and {@code queueB}), checks the default metrics
   * of every queue, submits one scheduling request to each configured queue
   * and finally verifies the number of leaf queues known to the scheduler.
   *
   * <p>The expected leaf-queue count is three: the two configured queues
   * plus {@code root.default}, which the fair scheduler's queue manager
   * creates on initialization regardless of the allocation file.
   */
  @Test
  @Timeout(value = 5)
  public void test() {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

    AllocationFileWriter.create()
        .queueMaxAMShareDefault(MAX_AM_SHARE)
        .addQueue(new AllocationFileQueue.Builder("queueA").build())
        .addQueue(new AllocationFileQueue.Builder("queueB").build())
        .writeToFile(ALLOC_FILE);

    resourceManager = new MockRM(conf);
    resourceManager.start();
    scheduler = (FairScheduler) resourceManager.getResourceScheduler();

    // Every queue should start out with the default metrics.
    for (FSQueue queue : scheduler.getQueueManager().getQueues()) {
      assertThat(queue.getMetrics().getMaxApps())
          .isEqualTo(Integer.MAX_VALUE);
      assertThat(queue.getMetrics().getSchedulingPolicy())
          .isEqualTo(SchedulingPolicy.DEFAULT_POLICY.getName());
    }

    // Add one big node (only care about aggregate capacity)
    RMNode node1 =
        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
            "127.0.0.1");
    scheduler.handle(new NodeAddedSchedulerEvent(node1));

    scheduler.update();

    // Queue A wants 3 * 1024. Node update gives this all to A
    createSchedulingRequest(3 * 1024, "queueA", "user1");
    scheduler.update();
    scheduler.handle(new NodeUpdateSchedulerEvent(node1));

    // Queue B arrives and wants 1 * 1024
    createSchedulingRequest(1 * 1024, "queueB", "user1");
    scheduler.update();

    // queueA + queueB + the always-present default queue
    Collection<FSLeafQueue> queues =
        scheduler.getQueueManager().getLeafQueues();
    assertEquals(3, queues.size());
  }

  /**
   * Checks that a leaf queue can be modified and read from different
   * threads at the same time without any exception being thrown.
   *
   * <p>One task repeatedly adds an application to the queue while another
   * repeatedly reads the queue's resource usage. Both tasks are released
   * together through a latch, must finish within a fixed timeout, and any
   * {@link Throwable} raised by either of them fails the test.
   */
  @Test
  public void testConcurrentAccess() {
    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
    resourceManager = new MockRM(conf);
    resourceManager.start();
    scheduler = (FairScheduler) resourceManager.getResourceScheduler();

    String queueName = "root.queue1";
    final FSLeafQueue schedulable = scheduler.getQueueManager().
        getLeafQueue(queueName, true);
    ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
    RMContext rmContext = resourceManager.getRMContext();
    final FSAppAttempt app =
        new FSAppAttempt(scheduler, applicationAttemptId, "user1",
            schedulable, null, rmContext);

    List<Runnable> runnables = new ArrayList<>();

    // add applications to modify the list
    runnables.add(() -> {
      for (int i = 0; i < 500; i++) {
        schedulable.addApp(app, true);
      }
    });

    // iterate over the list a couple of times in a different thread
    runnables.add(() -> {
      for (int i = 0; i < 500; i++) {
        schedulable.getResourceUsage();
      }
    });

    // One pool thread per task. Deriving the count from the list keeps the
    // pool size and the latches correct if tasks are added or removed.
    final int testThreads = runnables.size();

    final List<Throwable> exceptions = Collections.synchronizedList(
        new ArrayList<Throwable>());
    final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(
        testThreads);

    try {
      final CountDownLatch allExecutorThreadsReady =
          new CountDownLatch(testThreads);
      final CountDownLatch startBlocker = new CountDownLatch(1);
      final CountDownLatch allDone = new CountDownLatch(testThreads);
      for (final Runnable submittedTestRunnable : runnables) {
        threadPool.submit(new Runnable() {
          @Override
          public void run() {
            allExecutorThreadsReady.countDown();
            try {
              // Block until the main thread releases all tasks at once.
              startBlocker.await();
              submittedTestRunnable.run();
            } catch (final Throwable e) {
              exceptions.add(e);
            } finally {
              allDone.countDown();
            }
          }
        });
      }
      // wait until all threads are ready
      allExecutorThreadsReady.await();
      // start all test runners
      startBlocker.countDown();
      int testTimeout = 2;
      assertTrue(allDone.await(testTimeout, TimeUnit.SECONDS),
          "Timeout waiting for more than " + testTimeout + " seconds");
    } catch (InterruptedException ie) {
      // Recorded so that the final assertion below fails the test.
      exceptions.add(ie);
    } finally {
      threadPool.shutdownNow();
    }
    assertTrue(exceptions.isEmpty(),
        "Test failed with exception(s)" + exceptions);
  }

  /**
   * Verifies that {@code canRunAppAM} accepts an AM request that does not
   * exceed the expected max AM share in memory, vcores or the custom
   * resource, and that the queue metrics report the expected max AM share
   * afterwards.
   */
  @Test
  public void testCanRunAppAMReturnsTrue() {
    conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
    ResourceUtils.resetResourceTypes(conf);

    resourceManager = new MockRM(conf);
    resourceManager.start();
    scheduler = (FairScheduler) resourceManager.getResourceScheduler();

    Resource queueMaxShare = Resource.newInstance(8 * 1024, 4,
        ImmutableMap.of(CUSTOM_RESOURCE, 10L));

    // Add a node to increase available memory and vcores in scheduler's
    // root queue metrics
    Resource nodeCapacity = Resource.newInstance(4096, 10,
        ImmutableMap.of(CUSTOM_RESOURCE, 25L));
    addNodeToScheduler(nodeCapacity);

    FSLeafQueue leafQueue = setupQueue(queueMaxShare);

    // Expected max AM resource, per resource type:
    //   memory: Min(available 4096, max share 8192) = 4096
    //   vcores: Min(available 10, max share 4)      = 4
    //   test1:  Min(available 25, max share 10)     = 10
    // Multiplied by MAX_AM_SHARE:
    //   --> 2048 MB memory, 2 vcores, 5 test1
    Resource expectedAMShare = Resource.newInstance(2048, 2,
        ImmutableMap.of(CUSTOM_RESOURCE, 5L));

    // The request stays within the expected share for every resource type.
    Resource requestedAMResource = Resource.newInstance(2048, 2,
        ImmutableMap.of(CUSTOM_RESOURCE, 3L));

    // Fetched before canRunAppAM and only inspected afterwards.
    Map<String, Long> customResourceValues =
        verifyQueueMetricsForCustomResources(leafQueue);

    assertTrue(leafQueue.canRunAppAM(requestedAMResource),
        "AM should have been allocated!");

    verifyAMShare(leafQueue, expectedAMShare, customResourceValues);
  }

  /**
   * Creates a standalone leaf queue named {@code root.queue1} on the
   * current scheduler, with the given max share and {@code MAX_AM_SHARE}
   * as its max AM share.
   *
   * @param maxShare the max share to configure on the new queue
   * @return the configured leaf queue
   */
  private FSLeafQueue setupQueue(Resource maxShare) {
    FSLeafQueue leafQueue = new FSLeafQueue("root.queue1", scheduler, null);
    ConfigurableResource configuredMaxShare =
        new ConfigurableResource(maxShare);
    leafQueue.setMaxShare(configuredMaxShare);
    leafQueue.setMaxAMShare(MAX_AM_SHARE);
    return leafQueue;
  }

  /**
   * Verifies that {@code canRunAppAM} rejects an AM request whose custom
   * resource value (6) is above the expected max AM share (5), and that the
   * queue metrics still report the expected max AM share afterwards.
   */
  @Test
  public void testCanRunAppAMReturnsFalse() {
    conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
    ResourceUtils.resetResourceTypes(conf);

    resourceManager = new MockRM(conf);
    resourceManager.start();
    scheduler = (FairScheduler) resourceManager.getResourceScheduler();

    Resource queueMaxShare = Resource.newInstance(8 * 1024, 4,
        ImmutableMap.of(CUSTOM_RESOURCE, 10L));

    // Add a node to increase available memory and vcores in scheduler's
    // root queue metrics
    Resource nodeCapacity = Resource.newInstance(4096, 10,
        ImmutableMap.of(CUSTOM_RESOURCE, 25L));
    addNodeToScheduler(nodeCapacity);

    FSLeafQueue leafQueue = setupQueue(queueMaxShare);

    // Expected max AM resource, per resource type:
    //   memory: Min(available 4096, max share 8192) = 4096
    //   vcores: Min(available 10, max share 4)      = 4
    //   test1:  Min(available 25, max share 10)     = 10
    // Multiplied by MAX_AM_SHARE:
    //   --> 2048 MB memory, 2 vcores, 5 test1
    Resource expectedAMShare = Resource.newInstance(2048, 2,
        ImmutableMap.of(CUSTOM_RESOURCE, 5L));

    // Memory and vcores fit, but 6 test1 is above the expected 5.
    Resource requestedAMResource = Resource.newInstance(2048, 2,
        ImmutableMap.of(CUSTOM_RESOURCE, 6L));

    // Fetched before canRunAppAM and only inspected afterwards.
    Map<String, Long> customResourceValues =
        verifyQueueMetricsForCustomResources(leafQueue);

    assertFalse(leafQueue.canRunAppAM(requestedAMResource),
        "AM should not have been allocated!");

    verifyAMShare(leafQueue, expectedAMShare, customResourceValues);
  }

  /**
   * Registers a single mock node with the given capability on the
   * scheduler by dispatching a node-added event.
   *
   * @param node1Resource the total capability of the node to add
   */
  private void addNodeToScheduler(Resource node1Resource) {
    RMNode mockNode =
        MockNodes.newNodeInfo(0, node1Resource, 1, "127.0.0.2");
    NodeAddedSchedulerEvent nodeAdded = new NodeAddedSchedulerEvent(mockNode);
    scheduler.handle(nodeAdded);
  }

  /**
   * Asserts that the max AM share reported by the queue's metrics matches
   * the expected one.
   *
   * <p>The actual share is assembled from the memory and vcores values of
   * the queue metrics together with the supplied custom resource values.
   * The custom resource is compared on its own first, against the value
   * carried by {@code expectedAMShare}, and then the complete resources are
   * compared.
   *
   * @param schedulable the queue whose metrics are inspected
   * @param expectedAMShare the max AM share the queue is expected to report
   * @param customResourceValues custom resource values of the max AM share,
   *        as obtained from the queue metrics
   */
  private void verifyAMShare(FSLeafQueue schedulable,
      Resource expectedAMShare, Map<String, Long> customResourceValues) {
    Resource actualAMShare = Resource.newInstance(
        schedulable.getMetrics().getMaxAMShareMB(),
        schedulable.getMetrics().getMaxAMShareVCores(), customResourceValues);

    //make sure to verify custom resource value explicitly!
    long expectedCustomResourceValue =
        expectedAMShare.getResourceValue(CUSTOM_RESOURCE);
    long actualCustomResourceValue =
        actualAMShare.getResourceValue(CUSTOM_RESOURCE);
    assertEquals(expectedCustomResourceValue, actualCustomResourceValue,
        "Custom resource value of the AM share is not the expected!");

    assertEquals(expectedAMShare, actualAMShare,
        "AM share is not the expected!");
  }

  /**
   * Asserts that the queue metrics expose a max-AM-share entry for custom
   * resources and that this entry carries a non-null value map.
   *
   * <p>The metric holder is checked before it is dereferenced, so a missing
   * holder fails with the assertion message rather than with a
   * {@link NullPointerException}.
   *
   * @param schedulable the queue whose metrics are inspected
   * @return the custom resource values of the queue's max AM share metric
   */
  private Map<String, Long> verifyQueueMetricsForCustomResources(
      FSLeafQueue schedulable) {
    CustomResourceMetricValue maxAMShareCustomResources =
        schedulable.getMetrics().getCustomResources().getMaxAMShare();
    assertNotNull(maxAMShareCustomResources,
        "Queue metrics for custom resources should not be null!");

    Map<String, Long> customResourceValues =
        maxAMShareCustomResources.getValues();
    assertNotNull(customResourceValues,
        "Queue metrics for custom resources resource values " +
        "should not be null!");
    return customResourceValues;
  }
}