TestFSQueueMetrics.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.impl.MetricsRecords;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * The test class for {@link FSQueueMetrics}.
 *
 * <p>Every test works on the metrics of a single queue named
 * {@code QUEUE_NAME}, registered in a {@link MetricsSystemImpl} that is
 * re-created before each test. The {@code testSet*} tests write a value
 * through a setter and verify that the matching getters report it back.
 */
public class TestFSQueueMetrics {
  private static final Configuration CONF = new Configuration();
  private static final String RESOURCE_NAME = "test1";
  private static final String QUEUE_NAME = "single";

  private MetricsSystem ms;

  /**
   * Creates a new metrics system and clears the queue metrics held by
   * {@link QueueMetrics} before each test.
   */
  @BeforeEach
  public void setUp() {
    ms = new MetricsSystemImpl();
    QueueMetrics.clearQueueMetrics();
  }

  /**
   * Sets {@link YarnConfiguration#RESOURCE_TYPES} to the given name on the
   * shared configuration, resets the resource types from that configuration
   * and returns the metrics of the test queue.
   *
   * <p>Note that this mutates the static {@code CONF} shared by all tests.
   *
   * @param resourceName the value stored under the resource types key
   * @return the metrics object for {@code QUEUE_NAME}
   */
  private FSQueueMetrics setupMetrics(String resourceName) {
    CONF.set(YarnConfiguration.RESOURCE_TYPES, resourceName);
    ResourceUtils.resetResourceTypes(CONF);

    return FSQueueMetrics.forQueue(ms, QUEUE_NAME, null, false, CONF);
  }

  /**
   * Builds the assertion failure message for a metric.
   *
   * @param metricsType the name of the metric being checked
   * @return the failure message
   */
  private String getErrorMessage(String metricsType) {
    return metricsType + " is not the expected!";
  }

  /**
   * Asserts the memory, vcores and {@code RESOURCE_NAME} value of a resource
   * returned by one of the metrics getters.
   *
   * @param metricName prefix used in the failure messages, e.g.
   *                   {@code "fairShare"}
   * @param expectedMemory the expected memory size
   * @param expectedVcores the expected number of virtual cores
   * @param expectedCustomValue the expected value of {@code RESOURCE_NAME}
   * @param actual the resource to check
   */
  private void assertResource(String metricName, long expectedMemory,
      int expectedVcores, long expectedCustomValue, Resource actual) {
    assertEquals(expectedMemory, actual.getMemorySize(),
        getErrorMessage(metricName + "MB"));
    assertEquals(expectedVcores, actual.getVirtualCores(),
        getErrorMessage(metricName + "Vcores"));
    assertEquals(expectedCustomValue, actual.getResourceValue(RESOURCE_NAME),
        getErrorMessage(metricName + " for resource: " + RESOURCE_NAME));
  }

  /**
   * Test if the metric scheduling policy is set correctly.
   */
  @Test
  public void testSchedulingPolicy() {
    FSQueueMetrics metrics = FSQueueMetrics.forQueue(ms, QUEUE_NAME, null,
        false, CONF);
    metrics.setSchedulingPolicy("drf");
    checkSchedulingPolicy(QUEUE_NAME, "drf");

    // test resetting the scheduling policy
    metrics.setSchedulingPolicy("fair");
    checkSchedulingPolicy(QUEUE_NAME, "fair");
  }

  /**
   * Collects the metrics of the given queue and asserts that the first
   * record carries the expected {@code SchedulingPolicy} tag.
   *
   * @param queueName the queue whose metrics source is looked up
   * @param policy the expected tag value
   */
  private void checkSchedulingPolicy(String queueName, String policy) {
    MetricsSource queueSource = TestQueueMetrics.queueSource(ms, queueName);
    MetricsCollectorImpl collector = new MetricsCollectorImpl();
    queueSource.getMetrics(collector, true);
    MetricsRecords.assertTag(collector.getRecords().get(0), "SchedulingPolicy",
        policy);
  }

  /**
   * Test that the fair share is reported back, first with a value for the
   * custom resource and then without one (expected to read back as 0).
   */
  @Test
  public void testSetFairShare() {
    FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);

    metrics.setFairShare(Resource.newInstance(2048L, 4,
        ImmutableMap.of(RESOURCE_NAME, 20L)));

    assertEquals(2048L, metrics.getFairShareMB(), getErrorMessage("fairShareMB"));
    assertEquals(4L, metrics.getFairShareVirtualCores(), getErrorMessage("fairShareVcores"));
    assertResource("fairShare", 2048L, 4, 20L, metrics.getFairShare());

    metrics.setFairShare(Resource.newInstance(2049L, 5));

    assertEquals(2049L, metrics.getFairShareMB(), getErrorMessage("fairShareMB"));
    assertEquals(5L, metrics.getFairShareVirtualCores(), getErrorMessage("fairShareVcores"));
    assertResource("fairShare", 2049L, 5, 0L, metrics.getFairShare());
  }

  /**
   * Test that the steady fair share is reported back, first with a value for
   * the custom resource and then without one (expected to read back as 0).
   */
  @Test
  public void testSetSteadyFairShare() {
    FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);

    metrics.setSteadyFairShare(Resource.newInstance(2048L, 4,
        ImmutableMap.of(RESOURCE_NAME, 20L)));

    assertEquals(2048L, metrics.getSteadyFairShareMB(),
        getErrorMessage("steadyFairShareMB"));
    assertEquals(4L, metrics.getSteadyFairShareVCores(),
        getErrorMessage("steadyFairShareVcores"));
    assertResource("steadyFairShare", 2048L, 4, 20L, metrics.getSteadyFairShare());

    metrics.setSteadyFairShare(Resource.newInstance(2049L, 5));

    assertEquals(2049L, metrics.getSteadyFairShareMB(),
        getErrorMessage("steadyFairShareMB"));
    assertEquals(5L, metrics.getSteadyFairShareVCores(),
        getErrorMessage("steadyFairShareVcores"));
    assertResource("steadyFairShare", 2049L, 5, 0L, metrics.getSteadyFairShare());
  }

  /**
   * Test that the min share is reported back, first with a value for the
   * custom resource and then without one (expected to read back as 0).
   */
  @Test
  public void testSetMinShare() {
    FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);

    metrics.setMinShare(Resource.newInstance(2048L, 4,
        ImmutableMap.of(RESOURCE_NAME, 20L)));

    assertEquals(2048L, metrics.getMinShareMB(), getErrorMessage("minShareMB"));
    assertEquals(4L, metrics.getMinShareVirtualCores(), getErrorMessage("minShareVcores"));
    assertResource("minShare", 2048L, 4, 20L, metrics.getMinShare());

    metrics.setMinShare(Resource.newInstance(2049L, 5));

    assertEquals(2049L, metrics.getMinShareMB(), getErrorMessage("minShareMB"));
    assertEquals(5L, metrics.getMinShareVirtualCores(), getErrorMessage("minShareVcores"));
    assertResource("minShare", 2049L, 5, 0L, metrics.getMinShare());
  }

  /**
   * Test that the max share is reported back, first with a value for the
   * custom resource and then without one (expected to read back as 0).
   */
  @Test
  public void testSetMaxShare() {
    FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);

    metrics.setMaxShare(Resource.newInstance(2048L, 4,
        ImmutableMap.of(RESOURCE_NAME, 20L)));

    assertEquals(2048L, metrics.getMaxShareMB(), getErrorMessage("maxShareMB"));
    assertEquals(4L, metrics.getMaxShareVirtualCores(), getErrorMessage("maxShareVcores"));
    assertResource("maxShare", 2048L, 4, 20L, metrics.getMaxShare());

    metrics.setMaxShare(Resource.newInstance(2049L, 5));

    assertEquals(2049L, metrics.getMaxShareMB(), getErrorMessage("maxShareMB"));
    assertEquals(5L, metrics.getMaxShareVirtualCores(), getErrorMessage("maxShareVcores"));
    assertResource("maxShare", 2049L, 5, 0L, metrics.getMaxShare());
  }

  /**
   * Test that the max AM share is reported back, first with a value for the
   * custom resource and then without one (expected to read back as 0).
   */
  @Test
  public void testSetMaxAMShare() {
    FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);

    metrics.setMaxAMShare(Resource.newInstance(2048L, 4,
        ImmutableMap.of(RESOURCE_NAME, 20L)));

    assertEquals(2048L, metrics.getMaxAMShareMB(), getErrorMessage("maxAMShareMB"));
    assertEquals(4L, metrics.getMaxAMShareVCores(), getErrorMessage("maxAMShareVcores"));
    assertResource("maxAMShare", 2048L, 4, 20L, metrics.getMaxAMShare());

    metrics.setMaxAMShare(Resource.newInstance(2049L, 5));

    assertEquals(2049L, metrics.getMaxAMShareMB(), getErrorMessage("maxAMShareMB"));
    assertEquals(5L, metrics.getMaxAMShareVCores(), getErrorMessage("maxAMShareVcores"));
    assertResource("maxAMShare", 2049L, 5, 0L, metrics.getMaxAMShare());
  }

  /**
   * Test that the AM resource usage is reported back, first with a value for
   * the custom resource and then without one (expected to read back as 0).
   */
  @Test
  public void testSetAMResourceUsage() {
    FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);

    metrics.setAMResourceUsage(Resource.newInstance(2048L, 4,
        ImmutableMap.of(RESOURCE_NAME, 20L)));

    assertEquals(2048L, metrics.getAMResourceUsageMB(),
        getErrorMessage("AMResourceUsageMB"));
    assertEquals(4L, metrics.getAMResourceUsageVCores(),
        getErrorMessage("AMResourceUsageVcores"));
    assertResource("AMResourceUsage", 2048L, 4, 20L, metrics.getAMResourceUsage());

    metrics.setAMResourceUsage(Resource.newInstance(2049L, 5));

    assertEquals(2049L, metrics.getAMResourceUsageMB(),
        getErrorMessage("AMResourceUsageMB"));
    assertEquals(5L, metrics.getAMResourceUsageVCores(),
        getErrorMessage("AMResourceUsageVcores"));
    assertResource("AMResourceUsage", 2049L, 5, 0L, metrics.getAMResourceUsage());
  }

  /**
   * Test that the maximum number of apps is reported back.
   */
  @Test
  public void testSetMaxApps() {
    FSQueueMetrics metrics = setupMetrics(RESOURCE_NAME);
    metrics.setMaxApps(25);
    assertEquals(25L, metrics.getMaxApps(), getErrorMessage("maxApps"));
  }
}