TestQueueMetricsForCustomResources.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;


import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper
    .extractCustomResourcesAsStrings;
import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper.newResource;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_VCORE_SECONDS_PREEMPTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class TestQueueMetricsForCustomResources {
  public enum MetricsForCustomResource {
    ALLOCATED, AVAILABLE, PENDING, RESERVED, AGGREGATE_PREEMPTED_SECONDS
  }

  public static final long GB = 1024; // MB
  private static final Configuration CONF = new Configuration();
  public static final String CUSTOM_RES_1 = "custom_res_1";
  public static final String CUSTOM_RES_2 = "custom_res_2";
  public static final String USER = "alice";
  private Resource defaultResource;
  private MetricsSystem ms;

  @BeforeEach
  public void setUp() {
    ms = new MetricsSystemImpl();
    QueueMetrics.clearQueueMetrics();
    initializeResourceTypes();
    createDefaultResource();
  }

  private void createDefaultResource() {
    defaultResource = newResource(4 * GB, 4,
        ImmutableMap.<String, String> builder()
            .put(CUSTOM_RES_1, String.valueOf(15 * GB))
            .put(CUSTOM_RES_2, String.valueOf(20 * GB))
            .build());
  }

  private void initializeResourceTypes() {
    Map<String, ResourceInformation> riMap = new HashMap<>();

    ResourceInformation memory = ResourceInformation.newInstance(
        ResourceInformation.MEMORY_MB.getName(),
        ResourceInformation.MEMORY_MB.getUnits(),
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    ResourceInformation vcores = ResourceInformation.newInstance(
        ResourceInformation.VCORES.getName(),
        ResourceInformation.VCORES.getUnits(),
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
    ResourceInformation res1 = ResourceInformation.newInstance(CUSTOM_RES_1,
        ResourceInformation.VCORES.getUnits(), 0, 2000);
    ResourceInformation res2 = ResourceInformation.newInstance(CUSTOM_RES_2,
        ResourceInformation.VCORES.getUnits(), 0, 2000);

    riMap.put(ResourceInformation.MEMORY_URI, memory);
    riMap.put(ResourceInformation.VCORES_URI, vcores);
    riMap.put(CUSTOM_RES_1, res1);
    riMap.put(CUSTOM_RES_2, res2);
    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
  }

  private static void assertCustomResourceValue(QueueMetrics metrics,
      MetricsForCustomResource metricsType,
      Function<QueueMetrics, Resource> func,
      String resourceName,
      long expectedValue) {
    Resource res = func.apply(metrics);
    Long value = res.getResourceValue(resourceName);
    assertCustomResourceValueInternal(metricsType, resourceName,
        expectedValue, value);
  }

  private static void assertCustomResourceValueInternal(
      MetricsForCustomResource metricsType, String resourceName, long
      expectedValue, Long value) {
    assertNotNull(value, "QueueMetrics should have custom resource metrics value " +
        "for resource: " + resourceName);
    assertEquals(expectedValue, (long) value, String.format(
        "QueueMetrics should have custom resource metrics value %d " +
        "for resource: %s for metrics type %s",
        expectedValue, resourceName, metricsType));
  }

  private static Map<String, String> getCustomResourcesWithValue(long value) {
    return ImmutableMap.<String, String>builder()
        .put(CUSTOM_RES_1, String.valueOf(value))
        .put(CUSTOM_RES_2, String.valueOf(value))
        .build();
  }

  private QueueInfo createFourLevelQueueHierarchy() {
    QueueInfo root = new QueueInfo(null, "root", ms, CONF, USER);
    QueueInfo sub = new QueueInfo(root, "root.subQ", ms, CONF, USER);
    QueueInfo sub2 = new QueueInfo(sub, "root.subQ2", ms, CONF, USER);
    return new QueueInfo(sub2, "root.subQ2.leafQ", ms, CONF, USER);
  }

  private QueueInfo createBasicQueueHierarchy() {
    QueueInfo root = new QueueInfo(null, "root", ms, CONF, USER);
    return new QueueInfo(root, "root.leaf", ms, CONF, USER);
  }

  private QueueMetricsTestData.Builder
      createQueueMetricsTestDataWithContainers(int containers) {
    return createDefaultQueueMetricsTestData()
        .withContainers(containers);
  }

  private QueueMetricsTestData.Builder createDefaultQueueMetricsTestData() {
    return QueueMetricsTestData.Builder.create()
        .withUser(USER)
        .withPartition(RMNodeLabelsManager.NO_LABEL);
  }

  private void testIncreasePendingResources(QueueMetricsTestData testData) {
    testIncreasePendingResourcesInternal(testData.containers, testData);
  }

  private void testIncreasePendingResourcesWithoutContainer(
      QueueMetricsTestData testData) {
    testIncreasePendingResourcesInternal(1, testData);
  }

  private void testIncreasePendingResourcesInternal(int containers,
      QueueMetricsTestData testData) {
    testData.leafQueue.queueMetrics.incrPendingResources(testData.partition,
        testData.user, containers, testData.resource);

    ResourceMetricsChecker checker = ResourceMetricsChecker
        .create()
        .gaugeInt(PENDING_CONTAINERS, containers)
        .gaugeLong(PENDING_MB, containers *
            testData.resource.getMemorySize())
        .gaugeInt(PENDING_V_CORES, containers *
              testData.resource.getVirtualCores())
        .gaugeLong(PENDING_CUSTOM_RES1,
            containers * testData.customResourceValues.get(CUSTOM_RES_1))
        .gaugeLong(PENDING_CUSTOM_RES2,
            containers * testData.customResourceValues.get(CUSTOM_RES_2));
    assertAllMetrics(testData.leafQueue, checker,
        QueueMetrics::getPendingResources,
        MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues(
            testData.customResourceValues, (k, v) -> v * containers));
  }

  private void testAllocateResources(boolean decreasePending,
      QueueMetricsTestData testData) {
    testData.leafQueue.queueMetrics.allocateResources(testData.partition,
        testData.user, testData.containers, testData.resource, decreasePending);

    ResourceMetricsChecker checker = ResourceMetricsChecker
        .create()
        .gaugeInt(ALLOCATED_CONTAINERS, testData.containers)
        .counter(AGGREGATE_CONTAINERS_ALLOCATED, testData.containers)
        .gaugeLong(ALLOCATED_MB, testData.containers *
            testData.resource.getMemorySize())
        .gaugeInt(ALLOCATED_V_CORES, testData.containers *
            testData.resource.getVirtualCores())
        .gaugeInt(PENDING_CONTAINERS, 0)
        .gaugeLong(PENDING_MB, 0)
        .gaugeInt(PENDING_V_CORES, 0)
        .gaugeLong(ALLOCATED_CUSTOM_RES1,
            testData.containers
                * testData.customResourceValues.get(CUSTOM_RES_1))
        .gaugeLong(ALLOCATED_CUSTOM_RES2,
            testData.containers
                * testData.customResourceValues.get(CUSTOM_RES_2))
        .checkAgainst(testData.leafQueue.queueSource);
    if (decreasePending) {
      assertAllMetrics(testData.leafQueue, checker,
          QueueMetrics::getPendingResources,
          MetricsForCustomResource.PENDING,
          computeExpectedCustomResourceValues(testData.customResourceValues,
              (k, v) -> 0L));
    }
    if (!testData.customResourceValues.isEmpty()) {
      assertAllMetrics(testData.leafQueue, checker,
          QueueMetrics::getAllocatedResources,
          MetricsForCustomResource.ALLOCATED,
          computeExpectedCustomResourceValues(testData.customResourceValues,
              (k, v) -> v * testData.containers));
    }
  }

  private void testUpdatePreemptedSeconds(QueueMetricsTestData testData,
      int seconds) {
    testData.leafQueue.queueMetrics.updatePreemptedMemoryMBSeconds(
        testData.resource.getMemorySize() * seconds);
    testData.leafQueue.queueMetrics.updatePreemptedVcoreSeconds(
        testData.resource.getVirtualCores() * seconds);
    testData.leafQueue.queueMetrics.updatePreemptedSecondsForCustomResources(
        testData.resource, seconds);

    ResourceMetricsChecker checker = ResourceMetricsChecker
        .create()
        .counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED,
            testData.resource.getMemorySize() * seconds)
        .counter(AGGREGATE_VCORE_SECONDS_PREEMPTED,
            testData.resource.getVirtualCores() * seconds)
        .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1,
            testData.customResourceValues.get(CUSTOM_RES_1) * seconds)
        .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2,
            testData.customResourceValues.get(CUSTOM_RES_2) * seconds);

    assertQueueMetricsOnly(testData.leafQueue, checker,
        this::convertPreemptedSecondsToResource,
        MetricsForCustomResource.AGGREGATE_PREEMPTED_SECONDS,
        computeExpectedCustomResourceValues(testData.customResourceValues,
            (k, v) -> v * seconds));
  }

  private Resource convertPreemptedSecondsToResource(QueueMetrics qm) {
    CustomResourceMetricValue customValues = qm
        .getAggregatedPreemptedSecondsResources();
    MutableCounterLong vcoreSeconds = qm
        .getAggregateVcoreSecondsPreempted();
    MutableCounterLong memorySeconds = qm
        .getAggregateMemoryMBSecondsPreempted();
    return Resource.newInstance(
        memorySeconds.value(), (int) vcoreSeconds.value(),
        customValues.getValues());
  }

  private void testReserveResources(QueueMetricsTestData testData) {
    testData.leafQueue.queueMetrics.reserveResource(testData.partition,
        testData.user, testData.resource);

    ResourceMetricsChecker checker = ResourceMetricsChecker
        .create()
        .gaugeInt(RESERVED_CONTAINERS, 1)
        .gaugeLong(RESERVED_MB, testData.resource.getMemorySize())
        .gaugeInt(RESERVED_V_CORES, testData.resource.getVirtualCores())
        .gaugeLong(RESERVED_CUSTOM_RES1,
            testData.customResourceValues.get(CUSTOM_RES_1))
        .gaugeLong(RESERVED_CUSTOM_RES2,
            testData.customResourceValues.get(CUSTOM_RES_2))
        .checkAgainst(testData.leafQueue.queueSource);
    assertAllMetrics(testData.leafQueue, checker,
        QueueMetrics::getReservedResources,
        MetricsForCustomResource.RESERVED,
        computeExpectedCustomResourceValues(
            testData.customResourceValues, (k, v) -> v));
  }

  private void testGetAllocatedResources(QueueMetricsTestData testData) {
    testAllocateResources(false, testData);

    Resource res = testData.leafQueue.queueMetrics.getAllocatedResources();
    if (testData.customResourceValues.size() > 0) {
      assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED,
          CUSTOM_RES_1,
          testData.customResourceValues.get(CUSTOM_RES_1) * testData.containers,
          res.getResourceValue(CUSTOM_RES_1));
      assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED,
          CUSTOM_RES_2,
          testData.customResourceValues.get(CUSTOM_RES_2) * testData.containers,
          res.getResourceValue(CUSTOM_RES_2));
    }
  }

  private void assertAllMetrics(QueueInfo queueInfo,
      ResourceMetricsChecker checker,
      Function<QueueMetrics, Resource> func,
      MetricsForCustomResource metricsType,
      Map<String, Long> expectedCustomResourceValues) {
    assertAllQueueMetrics(queueInfo, checker, func, metricsType,
        expectedCustomResourceValues);

    //assert leaf and root userSources
    checker = ResourceMetricsChecker.createFromChecker(checker)
        .checkAgainst(queueInfo.userSource);
    ResourceMetricsChecker.createFromChecker(checker)
        .checkAgainst(queueInfo.getRoot().userSource);
  }

  private void assertQueueMetricsOnly(QueueInfo queueInfo,
      ResourceMetricsChecker checker,
      Function<QueueMetrics, Resource> func,
      MetricsForCustomResource metricsType,
      Map<String, Long> expectedCustomResourceValues) {
    assertAllQueueMetrics(queueInfo, checker, func, metricsType,
        expectedCustomResourceValues);
  }

  private void assertAllQueueMetrics(QueueInfo queueInfo,
      ResourceMetricsChecker checker,
      Function<QueueMetrics, Resource> func,
      MetricsForCustomResource metricsType,
      Map<String, Long> expectedCustomResourceValues) {
    // assert normal resource metrics values
    queueInfo.checkAllQueueSources(qs -> ResourceMetricsChecker
        .createFromChecker(checker).checkAgainst(qs));

    // assert custom resource metrics values
    queueInfo.checkAllQueueMetrics(qm -> {
      assertCustomResourceValue(qm, metricsType, func, CUSTOM_RES_1,
          expectedCustomResourceValues.get(CUSTOM_RES_1));
      assertCustomResourceValue(qm, metricsType, func, CUSTOM_RES_2,
          expectedCustomResourceValues.get(CUSTOM_RES_2));
    });
  }

  private Map<String, Long> computeExpectedCustomResourceValues(
      Map<String, Long> customResourceValues,
      BiFunction<String, Long, Long> func) {
    Map<String, Long> values = Maps.newHashMap();
    for (Map.Entry<String, Long> res : customResourceValues.entrySet()) {
      values.put(res.getKey(), func.apply(res.getKey(), res.getValue()));
    }
    return values;
  }

  @Test
  public void testSetAvailableResourcesToQueue1() {
    String queueName = "single";
    QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null,
        false, CONF);
    MetricsSource queueSource = queueSource(ms, queueName);

    metrics.setAvailableResourcesToQueue(newResource(
        GB, 4,
        ImmutableMap.<String, String> builder()
            .put(CUSTOM_RES_1, String.valueOf(5 * GB))
            .put(CUSTOM_RES_2, String.valueOf(6 * GB))
            .build()));
    ResourceMetricsChecker.create()
        .gaugeLong(AVAILABLE_MB, GB)
        .gaugeInt(AVAILABLE_V_CORES, 4)
        .gaugeLong(AVAILABLE_CUSTOM_RES1, 5 * GB)
        .gaugeLong(AVAILABLE_CUSTOM_RES2, 6 * GB)
        .checkAgainst(queueSource);

    assertCustomResourceValue(metrics,
        MetricsForCustomResource.AVAILABLE,
        QueueMetrics::getAvailableResources, CUSTOM_RES_1, 5 * GB);
    assertCustomResourceValue(metrics,
        MetricsForCustomResource.AVAILABLE,
        QueueMetrics::getAvailableResources, CUSTOM_RES_2, 6 * GB);
  }

  @Test
  public void testSetAvailableResourcesToQueue2() {
    String queueName = "single";
    QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null,
        false, CONF);
    MetricsSource queueSource = queueSource(ms, queueName);

    metrics.setAvailableResourcesToQueue(null,
        newResource(GB, 4,
        ImmutableMap.<String, String> builder()
            .put(CUSTOM_RES_1, String.valueOf(15 * GB))
            .put(CUSTOM_RES_2, String.valueOf(20 * GB))
            .build()));
    ResourceMetricsChecker.create()
        .gaugeLong(AVAILABLE_MB, GB)
        .gaugeInt(AVAILABLE_V_CORES, 4)
        .gaugeLong(AVAILABLE_CUSTOM_RES1, 15 * GB)
        .gaugeLong(AVAILABLE_CUSTOM_RES2, 20 * GB)
        .checkAgainst(queueSource);

    assertCustomResourceValue(metrics,
        MetricsForCustomResource.AVAILABLE,
        QueueMetrics::getAvailableResources, CUSTOM_RES_1, 15 * GB);
    assertCustomResourceValue(metrics,
        MetricsForCustomResource.AVAILABLE,
        QueueMetrics::getAvailableResources, CUSTOM_RES_2, 20 * GB);
  }

  @Test
  public void testIncreasePendingResources() {
    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
        .withLeafQueue(createBasicQueueHierarchy())
        .withResourceToDecrease(
            newResource(GB, 2, getCustomResourcesWithValue(2 * GB)), 2)
        .withResources(defaultResource)
        .build();

    testIncreasePendingResources(testData);
  }

  @Test
  public void testDecreasePendingResources() {
    Resource resourceToDecrease =
        newResource(GB, 2, getCustomResourcesWithValue(2 * GB));
    int containersToDecrease = 2;
    int containers = 5;
    QueueMetricsTestData testData =
        createQueueMetricsTestDataWithContainers(containers)
            .withLeafQueue(createBasicQueueHierarchy())
        .withResourceToDecrease(resourceToDecrease, containers)
        .withResources(defaultResource)
        .build();

    //compute expected values
    final int vCoresToDecrease = resourceToDecrease.getVirtualCores();
    final long memoryMBToDecrease = resourceToDecrease.getMemorySize();
    final int containersAfterDecrease = containers - containersToDecrease;
    final long customRes1ToDecrease =
      resourceToDecrease.getResourceValue(CUSTOM_RES_1);
    final long customRes2ToDecrease =
      resourceToDecrease.getResourceValue(CUSTOM_RES_2);

    final int vcoresAfterDecrease =
        (defaultResource.getVirtualCores() * containers)
            - (vCoresToDecrease * containersToDecrease);
    final long memoryAfterDecrease =
        (defaultResource.getMemorySize() * containers)
            - (memoryMBToDecrease * containersToDecrease);
    final long customResource1AfterDecrease =
      (testData.customResourceValues.get(CUSTOM_RES_1) * containers)
          - (customRes1ToDecrease * containersToDecrease);
    final long customResource2AfterDecrease =
      (testData.customResourceValues.get(CUSTOM_RES_2) * containers)
          - (customRes2ToDecrease * containersToDecrease);

    //first, increase resources to be able to decrease some
    testIncreasePendingResources(testData);

    //decrease resources
    testData.leafQueue.queueMetrics.decrPendingResources(testData.partition,
        testData.user, containersToDecrease,
        ResourceTypesTestHelper.newResource(memoryMBToDecrease,
            vCoresToDecrease,
            extractCustomResourcesAsStrings(resourceToDecrease)));

    //check
    ResourceMetricsChecker checker = ResourceMetricsChecker
        .create()
        .gaugeInt(PENDING_CONTAINERS, containersAfterDecrease)
        .gaugeLong(PENDING_MB, memoryAfterDecrease)
        .gaugeInt(PENDING_V_CORES, vcoresAfterDecrease)
        .gaugeLong(PENDING_CUSTOM_RES1, customResource1AfterDecrease)
        .gaugeLong(PENDING_CUSTOM_RES2, customResource2AfterDecrease)
        .checkAgainst(testData.leafQueue.queueSource);

    assertAllMetrics(testData.leafQueue, checker,
        QueueMetrics::getPendingResources,
        MetricsForCustomResource.PENDING,
        computeExpectedCustomResourceValues(testData.customResourceValues,
            (k, v) -> v * containers - (resourceToDecrease.getResourceValue(k)
                * containersToDecrease)));
  }

  @Test
  public void testAllocateResourcesWithoutDecreasePending() {
    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
        .withLeafQueue(createBasicQueueHierarchy())
        .withResources(defaultResource)
        .build();

    testAllocateResources(false, testData);
  }

  @Test
  public void testAllocateResourcesWithDecreasePending() {
    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
        .withLeafQueue(createBasicQueueHierarchy())
        .withResourceToDecrease(
            newResource(GB, 2, getCustomResourcesWithValue(2 * GB)), 2)
        .withResources(defaultResource)
        .build();

    //first, increase pending resources to be able to decrease some
    testIncreasePendingResources(testData);

    //then allocate with decrease pending resources
    testAllocateResources(true, testData);
  }

  @Test
  public void testAllocateResourcesWithoutContainer() {
    QueueMetricsTestData testData = createDefaultQueueMetricsTestData()
        .withLeafQueue(createBasicQueueHierarchy())
        .withResources(defaultResource)
        .build();

    //first, increase pending resources
    testIncreasePendingResourcesWithoutContainer(testData);

    Resource resource = testData.resource;
    testData.leafQueue.queueMetrics.allocateResources(testData.partition,
        testData.user, resource);

    ResourceMetricsChecker checker = ResourceMetricsChecker.create()
        .gaugeLong(ALLOCATED_MB, resource.getMemorySize())
        .gaugeInt(ALLOCATED_V_CORES, resource.getVirtualCores())
        .gaugeInt(PENDING_CONTAINERS, 1).gaugeLong(PENDING_MB, 0)
        .gaugeInt(PENDING_V_CORES, 0)
        .gaugeLong(ALLOCATED_CUSTOM_RES1,
            testData.customResourceValues.get(CUSTOM_RES_1))
        .gaugeLong(ALLOCATED_CUSTOM_RES2,
            testData.customResourceValues.get(CUSTOM_RES_2));

    checker.checkAgainst(testData.leafQueue.queueSource);
    checker.checkAgainst(testData.leafQueue.getRoot().queueSource);

    assertAllMetrics(testData.leafQueue, checker,
        QueueMetrics::getPendingResources,
        MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues(
            testData.customResourceValues, (k, v) -> 0L));
    assertAllMetrics(testData.leafQueue, checker,
        QueueMetrics::getAllocatedResources,
        MetricsForCustomResource.ALLOCATED, computeExpectedCustomResourceValues(
            testData.customResourceValues, (k, v) -> v));
  }

  @Test
  public void testReleaseResources() {
    int containers = 5;
    QueueMetricsTestData testData =
        createQueueMetricsTestDataWithContainers(containers)
            .withLeafQueue(createBasicQueueHierarchy())
        .withResourceToDecrease(defaultResource, containers)
        .withResources(defaultResource)
        .build();

    //first, allocate some resources so that we can release some
    testAllocateResources(false, testData);

    testData.leafQueue.queueMetrics.releaseResources(testData.partition,
        testData.user, containers, defaultResource);

    ResourceMetricsChecker checker = ResourceMetricsChecker
        .create()
        .counter(AGGREGATE_CONTAINERS_ALLOCATED, containers)
        .counter(AGGREGATE_CONTAINERS_RELEASED, containers)
        .checkAgainst(testData.leafQueue.queueSource);
    assertAllMetrics(testData.leafQueue, checker,
        QueueMetrics::getAllocatedResources,
        MetricsForCustomResource.ALLOCATED, computeExpectedCustomResourceValues(
            testData.customResourceValues, (k, v) -> 0L));
  }

  @Test
  public void testUpdatePreemptedSecondsForCustomResources() {
    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
        .withLeafQueue(createFourLevelQueueHierarchy())
        .withResources(defaultResource)
        .build();

    final int seconds = 1;
    testUpdatePreemptedSeconds(testData, seconds);
  }

  @Test
  public void testUpdatePreemptedSecondsForCustomResourcesMoreSeconds() {
    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
        .withLeafQueue(createFourLevelQueueHierarchy())
        .withResources(defaultResource)
        .build();

    final int seconds = 15;
    testUpdatePreemptedSeconds(testData, seconds);
  }

  @Test
  public void testReserveResources() {
    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
        .withLeafQueue(createBasicQueueHierarchy())
        .withResources(defaultResource)
        .build();

    testReserveResources(testData);
  }

  @Test
  public void testUnreserveResources() {
    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
        .withLeafQueue(createBasicQueueHierarchy())
        .withResources(defaultResource)
        .build();

    testReserveResources(testData);

    testData.leafQueue.queueMetrics.unreserveResource(testData.partition,
        testData.user, defaultResource);

    ResourceMetricsChecker checker = ResourceMetricsChecker
        .create()
        .gaugeInt(RESERVED_CONTAINERS, 0)
        .gaugeLong(RESERVED_MB, 0)
        .gaugeInt(RESERVED_V_CORES, 0)
        .gaugeLong(RESERVED_CUSTOM_RES1, 0).gaugeLong(RESERVED_CUSTOM_RES2, 0)
        .checkAgainst(testData.leafQueue.queueSource);
    assertAllMetrics(testData.leafQueue, checker,
        QueueMetrics::getReservedResources,
        MetricsForCustomResource.RESERVED, computeExpectedCustomResourceValues(
            testData.customResourceValues, (k, v) -> 0L));
  }

  @Test
  public void testGetAllocatedResourcesWithCustomResources() {
    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
        .withLeafQueue(createBasicQueueHierarchy())
        .withResources(defaultResource)
        .build();

    testGetAllocatedResources(testData);
  }

  @Test
  public void testGetAllocatedResourcesWithoutCustomResources() {
    QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5)
        .withResources(newResource(4 * GB, 4, Collections.emptyMap()))
        .withLeafQueue(createBasicQueueHierarchy())
        .build();

    testGetAllocatedResources(testData);
  }

}