TestDominantResourceFairnessPolicy.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy.DominantResourceFairnessComparatorN;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy.DominantResourceFairnessComparator2;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * comparator.compare(sched1, sched2) < 0 means that sched1 should get a
 * container before sched2
 */
public class TestDominantResourceFairnessPolicy {
  @BeforeEach
  public void setup() {
    addResources("test");
  }

  private Comparator<Schedulable> createComparator(int clusterMem,
      int clusterCpu) {
    DominantResourceFairnessPolicy policy =
        new DominantResourceFairnessPolicy();
    FSContext fsContext = mock(FSContext.class);
    when(fsContext.getClusterResource()).
        thenReturn(Resources.createResource(clusterMem, clusterCpu));
    policy.initialize(fsContext);
    return policy.getComparator();
  }
  
  private Schedulable createSchedulable(int memUsage, int cpuUsage) {
    return createSchedulable(memUsage, cpuUsage, 1.0f, 0, 0);
  }
  
  private Schedulable createSchedulable(int memUsage, int cpuUsage,
      int minMemShare, int minCpuShare) {
    return createSchedulable(memUsage, cpuUsage, 1.0f,
        minMemShare, minCpuShare);
  }
  
  private Schedulable createSchedulable(int memUsage, int cpuUsage,
      float weights) {
    return createSchedulable(memUsage, cpuUsage, weights, 0, 0);
  }

  private Schedulable createSchedulable(int memUsage, int cpuUsage,
      float weights, int minMemShare, int minCpuShare) {
    Resource usage = Resources.createResource(memUsage, cpuUsage);
    Resource minShare = Resources.createResource(minMemShare, minCpuShare);
    return new FakeSchedulable(minShare,
        Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE),
        weights, Resources.none(), usage, 0l);
  }
  
  @Test
  public void testSameDominantResource() {
    Comparator c = createComparator(8000, 4);
    Schedulable s1 = createSchedulable(1000, 1);
    Schedulable s2 = createSchedulable(2000, 1);

    assertTrue(c.compare(s1, s2) < 0,
        "Comparison didn't return a value less than 0");
  }
  
  @Test
  public void testSameDominantResource2() {
    ResourceUtils.resetResourceTypes(new Configuration());
    testSameDominantResource();
  }

  @Test
  public void testDifferentDominantResource() {
    Comparator c = createComparator(8000, 8);
    Schedulable s1 = createSchedulable(4000, 3);
    Schedulable s2 = createSchedulable(2000, 5);

    assertTrue(c.compare(s1, s2) < 0,
        "Comparison didn't return a value less than 0");
  }
  
  @Test
  public void testDifferentDominantResource2() {
    ResourceUtils.resetResourceTypes(new Configuration());
    testDifferentDominantResource();
  }

  @Test
  public void testOneIsNeedy() {
    Comparator c = createComparator(8000, 8);
    Schedulable s1 = createSchedulable(2000, 5, 0, 6);
    Schedulable s2 = createSchedulable(4000, 3, 0, 0);

    assertTrue(c.compare(s1, s2) < 0,
        "Comparison didn't return a value less than 0");
  }
  
  @Test
  public void testOneIsNeedy2() {
    ResourceUtils.resetResourceTypes(new Configuration());
    testOneIsNeedy();
  }

  @Test
  public void testBothAreNeedy() {
    Comparator c = createComparator(8000, 100);
    // dominant share is 2000/8000
    Schedulable s1 = createSchedulable(2000, 5);
    // dominant share is 4000/8000
    Schedulable s2 = createSchedulable(4000, 3);

    assertTrue(c.compare(s1, s2) < 0,
        "Comparison didn't return a value less than 0");

    // dominant min share is 2/3
    s1 = createSchedulable(2000, 5, 3000, 6);
    // dominant min share is 4/5
    s2 = createSchedulable(4000, 3, 5000, 4);

    assertTrue(c.compare(s1, s2) < 0,
        "Comparison didn't return a value less than 0");
  }
  
  @Test
  public void testBothAreNeedy2() {
    ResourceUtils.resetResourceTypes(new Configuration());
    testBothAreNeedy();
  }

  @Test
  public void testEvenWeightsSameDominantResource() {
    assertTrue(createComparator(8000, 8).compare(
        createSchedulable(3000, 1, 2.0f),
        createSchedulable(2000, 1)) < 0);
    assertTrue(createComparator(8000, 8).compare(
        createSchedulable(1000, 3, 2.0f),
        createSchedulable(1000, 2)) < 0);
  }
  
  @Test
  public void testEvenWeightsSameDominantResource2() {
    ResourceUtils.resetResourceTypes(new Configuration());
    testEvenWeightsSameDominantResource();
  }

  @Test
  public void testEvenWeightsDifferentDominantResource() {
    assertTrue(createComparator(8000, 8).compare(
        createSchedulable(1000, 3, 2.0f),
        createSchedulable(2000, 1)) < 0);
    assertTrue(createComparator(8000, 8).compare(
        createSchedulable(3000, 1, 2.0f),
        createSchedulable(1000, 2)) < 0);
  }
  
  @Test
  public void testEvenWeightsDifferentDominantResource2() {
    ResourceUtils.resetResourceTypes(new Configuration());
    testEvenWeightsDifferentDominantResource();
  }

  @Test
  public void testSortShares() {
    float[][] ratios1 = {{0.3f, 2.0f}, {0.2f, 1.0f}, {0.4f, 0.1f}};
    float[][] ratios2 = {{0.2f, 9.0f}, {0.3f, 2.0f}, {0.25f, 0.1f}};
    float[][] expected1 = {{0.4f, 0.1f}, {0.3f, 2.0f}, {0.2f, 1.0f}};
    float[][] expected2 = {{0.3f, 2.0f}, {0.25f, 0.1f}, {0.2f, 9.0f}};
    DominantResourceFairnessComparatorN comparator =
        new DominantResourceFairnessComparatorN();

    comparator.sortRatios(ratios1, ratios2);

    for (int i = 0; i < ratios1.length; i++) {
      assertArrayEquals(
          expected1[i], ratios1[i], 0.00001f, "The shares array was not sorted into the "
          + "expected order: incorrect inner array encountered");
      assertArrayEquals(
          expected2[i], ratios2[i], 0.00001f, "The shares array was not sorted into the "
          + "expected order: incorrect inner array encountered");
    }
  }

  @Test
  public void testCalculateClusterAndFairRatios() {
    Map<String, Integer> index = ResourceUtils.getResourceTypeIndex();
    Resource used = Resources.createResource(10, 5);
    Resource capacity = Resources.createResource(100, 10);
    float[][] shares = new float[3][2];
    DominantResourceFairnessComparatorN comparator =
        new DominantResourceFairnessComparatorN();

    used.setResourceValue("test", 2L);
    capacity.setResourceValue("test", 5L);

    int dominant = comparator.calculateClusterAndFairRatios(used, capacity,
        shares, 1.0f);

    assertEquals(0.1,
        shares[index.get(ResourceInformation.MEMORY_MB.getName())][0], .00001,
        "Calculated usage ratio for memory (10MB out of 100MB) is incorrect");
    assertEquals(0.5,
        shares[index.get(ResourceInformation.VCORES.getName())][0], .00001,
        "Calculated usage ratio for vcores (5 out of 10) is incorrect");
    assertEquals(0.4, shares[index.get("test")][0], .00001,
        "Calculated usage ratio for test resource (2 out of 5) is incorrect");
    assertEquals(index.get(ResourceInformation.VCORES.getName()).intValue(),
        dominant, "The wrong dominant resource index was returned");
  }

  @Test
  public void testCalculateClusterAndFairRatios2() {
    ResourceUtils.resetResourceTypes(new Configuration());
    Resource used = Resources.createResource(10, 5);
    Resource capacity = Resources.createResource(100, 10);
    double[] shares = new double[2];
    DominantResourceFairnessComparator2 comparator =
        new DominantResourceFairnessComparator2();
    int dominant =
        comparator.calculateClusterAndFairRatios(used.getResources(), 1.0f,
            capacity.getResources(), shares);

    assertEquals(0.1, shares[Resource.MEMORY_INDEX], .00001,
        "Calculated usage ratio for memory (10MB out of 100MB) is "
        + "incorrect");
    assertEquals(0.5, shares[Resource.VCORES_INDEX], .00001,
        "Calculated usage ratio for vcores (5 out of 10) is "
        + "incorrect");
    assertEquals(Resource.VCORES_INDEX, dominant,
        "The wrong dominant resource index was returned");
  }

  @Test
  public void testCalculateMinShareRatios() {
    Map<String, Integer> index = ResourceUtils.getResourceTypeIndex();
    Resource used = Resources.createResource(10, 5);
    Resource minShares = Resources.createResource(5, 10);
    float[][] ratios = new float[3][3];
    DominantResourceFairnessComparatorN comparator =
        new DominantResourceFairnessComparatorN();

    used.setResourceValue("test", 2L);
    minShares.setResourceValue("test", 0L);

    comparator.calculateMinShareRatios(used, minShares, ratios);

    assertEquals(2.0, ratios[index.get(ResourceInformation.MEMORY_MB.getName())][2],
        .00001f, "Calculated min share ratio for memory (10MB out of 5MB) is "
        + "incorrect");
    assertEquals(0.5, ratios[index.get(ResourceInformation.VCORES.getName())][2],
        .00001f, "Calculated min share ratio for vcores (5 out of 10) is "
        + "incorrect");
    assertEquals(Float.POSITIVE_INFINITY, ratios[index.get("test")][2],
        0.00001f, "Calculated min share ratio for test resource (0 out of 5) is "
        + "incorrect");
  }

  @Test
  public void testCalculateMinShareRatios2() {
    ResourceUtils.resetResourceTypes(new Configuration());
    Resource used = Resources.createResource(10, 5);
    Resource minShares = Resources.createResource(5, 10);
    DominantResourceFairnessComparator2 comparator =
        new DominantResourceFairnessComparator2();

    double[] ratios =
        comparator.calculateMinShareRatios(used.getResources(),
            minShares.getResources());

    assertEquals(2.0, ratios[Resource.MEMORY_INDEX], .00001f,
        "Calculated min share ratio for memory (10MB out of 5MB) is "
        + "incorrect");
    assertEquals(0.5, ratios[Resource.VCORES_INDEX], .00001f,
        "Calculated min share ratio for vcores (5 out of 10) is "
        + "incorrect");
  }

  @Test
  public void testCompareShares() {
    float[][] ratios1 = {
        {0.4f, 0.1f, 2.0f},
        {0.3f, 2.0f, 0.1f},
        {0.2f, 1.0f, 9.0f}
    };
    float[][] ratios2 = {
        {0.3f, 2.0f, 1.0f},
        {0.2f, 0.1f, 0.5f},
        {0.2f, 1.0f, 2.0f}
    };
    float[][] ratios3 = {
        {0.3f, 2.0f, 1.0f},
        {0.2f, 0.1f, 2.0f},
        {0.1f, 2.0f, 1.0f}
    };
    DominantResourceFairnessComparatorN comparator =
        new DominantResourceFairnessComparatorN();

    int ret = comparator.compareRatios(ratios1, ratios2, 0);

    assertEquals(1, ret, "Expected the first array to be larger because the first "
        + "usage ratio element is larger");

    ret = comparator.compareRatios(ratios2, ratios1, 0);

    assertEquals(-1, ret, "Expected the first array to be smaller because the first "
        + "usage ratio element is smaller");

    ret = comparator.compareRatios(ratios1, ratios1, 0);

    assertEquals(0, ret, "Expected the arrays to be equal, since they're the same "
        + "array");

    ret = comparator.compareRatios(ratios2, ratios2, 0);

    assertEquals(0, ret, "Expected the arrays to be equal, since they're the same "
        + "array");

    ret = comparator.compareRatios(ratios3, ratios3, 0);

    assertEquals(0, ret, "Expected the arrays to be equal, since they're the same "
        + "array");

    ret = comparator.compareRatios(ratios2, ratios3, 0);

    assertEquals(1, ret, "Expected the first array to be larger because the last "
        + "usage ratio element is larger, and all other elements are equal");

    ret = comparator.compareRatios(ratios1, ratios2, 1);

    assertEquals(-1, ret, "Expected the first array to be smaller because the first "
        + "fair share ratio element is smaller");

    ret = comparator.compareRatios(ratios2, ratios1, 1);

    assertEquals(1, ret, "Expected the first array to be larger because the first "
        + "fair share ratio element is larger");

    ret = comparator.compareRatios(ratios1, ratios1, 1);

    assertEquals(0, ret, "Expected the arrays to be equal, since they're the same "
        + "array");

    ret = comparator.compareRatios(ratios2, ratios2, 1);

    assertEquals(0, ret, "Expected the arrays to be equal, since they're the same "
        + "array");

    ret = comparator.compareRatios(ratios3, ratios3, 1);

    assertEquals(0, ret, "Expected the arrays to be equal, since they're the same "
        + "array");

    ret = comparator.compareRatios(ratios2, ratios3, 1);

    assertEquals(-1, ret, "Expected the first array to be smaller because the last "
        + "usage ratio element is smaller, and all other elements are equal");

    ret = comparator.compareRatios(ratios1, ratios2, 2);

    assertEquals(1, ret, "Expected the first array to be larger because the first "
        + "min share ratio element is larger");

    ret = comparator.compareRatios(ratios2, ratios1, 2);

    assertEquals(-1, ret, "Expected the first array to be smaller because the first "
        + "min share ratio element is smaller");

    ret = comparator.compareRatios(ratios1, ratios1, 2);

    assertEquals(0, ret, "Expected the arrays to be equal, since they're the same "
        + "array");

    ret = comparator.compareRatios(ratios2, ratios2, 2);

    assertEquals(0, ret, "Expected the arrays to be equal, since they're the same "
        + "array");

    ret = comparator.compareRatios(ratios3, ratios3, 2);

    assertEquals(0, ret, "Expected the arrays to be equal, since they're the same "
        + "array");

    ret = comparator.compareRatios(ratios2, ratios3, 2);

    assertEquals(-1, ret, "Expected the first array to be smaller because the second "
        + "min share ratio element is smaller, and all the first elements are "
        + "equal");
  }

  @Test
  public void testCompareSchedulablesWithClusterResourceChanges(){
    Schedulable schedulable1 = createSchedulable(2000, 1);
    Schedulable schedulable2 = createSchedulable(1000, 2);

    // schedulable1 has share weights [1/2, 1/5], schedulable2 has share
    // weights [1/4, 2/5], schedulable1 > schedulable2 since 1/2 > 2/5
    assertTrue(createComparator(4000, 5)
        .compare(schedulable1, schedulable2) > 0);

    // share weights have changed because of the cluster resource change.
    // schedulable1 has share weights [1/4, 1/6], schedulable2 has share
    // weights [1/8, 1/3], schedulable1 < schedulable2 since 1/4 < 1/3
    assertTrue(createComparator(8000, 6)
        .compare(schedulable1, schedulable2) < 0);
  }

  private static void addResources(String... resources) {
    Configuration conf = new Configuration();

    // Add a third resource to the allowed set
    conf.set(YarnConfiguration.RESOURCE_TYPES, Joiner.on(',').join(resources));
    ResourceUtils.resetResourceTypes(conf);
  }

  @Test
  public void testModWhileSorting(){
    final List<FakeSchedulable> schedulableList = new ArrayList<>();
    for (int i=0; i<10000; i++) {
      schedulableList.add(
          (FakeSchedulable)createSchedulable((i%10)*100, (i%3)*2));
    }
    Comparator DRFComparator = createComparator(100000, 50000);

    /*
     * The old sort should fail, but timing it makes testing to flaky.
     * TimSort which is used does not handle the concurrent modification of
     * objects it is sorting. This is the test that should fail:
     *  modThread.start();
     *  try {
     *    Collections.sort(schedulableList, DRFComparator);
     *  } catch (IllegalArgumentException iae) {
     *    // failed sort
     *  }
     */

    TreeSet<Schedulable> sortedSchedulable = new TreeSet<>(DRFComparator);
    Thread modThread = modificationThread(schedulableList);
    modThread.start();
    sortedSchedulable.addAll(schedulableList);
    try {
      modThread.join();
    } catch (InterruptedException ie) {
      fail("ModThread join failed: " + ie.getMessage());
    }
  }

  /**
   * Thread to simulate concurrent schedulable changes while sorting
   */
  private Thread modificationThread(final List<FakeSchedulable> schedulableList) {
    Thread modThread  = new Thread() {
      @Override
      public void run() {
        try {
          // This sleep is needed to make sure the sort has started before the
          // modifications start and finish
          Thread.sleep(500);
        } catch (InterruptedException ie) {
          fail("Modification thread interrupted while asleep " +
              ie.getMessage());
        }
        Resource newUsage = Resources.createResource(0, 0);
        for (int j = 0; j < 1000; j++) {
          FakeSchedulable sched = schedulableList.get(j * 10);
          newUsage.setMemorySize(20000);
          newUsage.setVirtualCores(j % 10);
          sched.setResourceUsage(newUsage);
        }
      }
    };
    return modThread;
  }
}