// TestSchedulingPolicy.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.Stack;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
    .allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
    .allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestSchedulingPolicy {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestSchedulingPolicy.class);
  private final static String ALLOC_FILE =
      new File(FairSchedulerTestBase.TEST_DIR, "test-queues").getAbsolutePath();
  private FairSchedulerConfiguration conf;
  private FairScheduler scheduler;

  /**
   * Creates a fresh scheduler and configuration before every test and attaches
   * a mocked {@link RMContext} to the scheduler.
   */
  @BeforeEach
  public void setUp() throws Exception {
    scheduler = new FairScheduler();
    conf = new FairSchedulerConfiguration();
    // The scheduler is used outside of its normal runtime context here, so it
    // gets a mocked context whose only stubbed call returns a placement
    // manager.
    RMContext mockedContext = mock(RMContext.class);
    when(mockedContext.getQueuePlacementManager())
        .thenReturn(new PlacementManager());
    scheduler.setRMContext(mockedContext);
  }

  /**
   * Verifies that a scheduling policy can be resolved from a class name, a
   * canonical class name, a class object and each of the short names
   * ("drf", "fair", "fifo"), and that the resolved policy reports the
   * expected name.
   *
   * @throws AllocationConfigurationException if a policy cannot be parsed
   */
  @Test
  public void testParseSchedulingPolicy()
      throws AllocationConfigurationException {

    // Class name
    SchedulingPolicy sm = SchedulingPolicy
        .parse(FairSharePolicy.class.getName());
    assertEquals(FairSharePolicy.NAME, sm.getName(),
        "Invalid scheduler name");

    // Canonical name
    sm = SchedulingPolicy.parse(FairSharePolicy.class
        .getCanonicalName());
    assertEquals(FairSharePolicy.NAME, sm.getName(),
        "Invalid scheduler name");

    // Class
    sm = SchedulingPolicy.getInstance(FairSharePolicy.class);
    assertEquals(FairSharePolicy.NAME, sm.getName(),
        "Invalid scheduler name");

    // Shortname - drf
    sm = SchedulingPolicy.parse("drf");
    assertEquals(DominantResourceFairnessPolicy.NAME, sm.getName(),
        "Invalid scheduler name");

    // Shortname - fair
    sm = SchedulingPolicy.parse("fair");
    assertEquals(FairSharePolicy.NAME, sm.getName(),
        "Invalid scheduler name");

    // Shortname - fifo
    sm = SchedulingPolicy.parse("fifo");
    assertEquals(FifoPolicy.NAME, sm.getName(),
        "Invalid scheduler name");
  }

  /**
   * Checks that the comparator handed out by {@link FairSharePolicy}
   * ({@link FairSharePolicy.FairShareComparator}) is transitive.
   */
  @Test
  public void testFairShareComparatorTransitivity() {
    Comparator<Schedulable> comparator = new FairSharePolicy().getComparator();
    new FairShareComparatorTester(comparator).testTransitivity();
  }


  /**
   * This class is responsible for testing the transitivity of
   * {@link FairSharePolicy.FairShareComparator}. We will generate
   * a lot of triples(each triple contains three {@link Schedulable}),
   * and then we verify transitivity by using each triple.
   *
   * <p>How to generate:</p>
   * For each field in {@link Schedulable} we all have a data collection. We
   * combine these data to construct a {@link Schedulable}, and generate all
   * cases of triple by DFS(depth first search algorithm). We can get 100% code
   * coverage by DFS.
   */
  private class FairShareComparatorTester {
    private Comparator<Schedulable> fairShareComparator;

    // Use the following data collections to generate three Schedulable.
    private Resource minShare = Resource.newInstance(0, 1);

    private Resource demand = Resource.newInstance(4, 1);
    private Resource[] demandCollection = {
        Resource.newInstance(0, 0), Resource.newInstance(4, 1) };

    private String[] nameCollection = {"A", "B", "C"};

    private long[] startTimeColloection = {1L, 2L, 3L};

    private Resource[] usageCollection = {
        Resource.newInstance(0, 1), Resource.newInstance(2, 1),
        Resource.newInstance(4, 1) };

    private float[] weightsCollection = {0.0f, 1.0f, 2.0f};

    public FairShareComparatorTester(
        Comparator<Schedulable> fairShareComparator) {
      this.fairShareComparator = fairShareComparator;
    }

    public void testTransitivity() {
      generateAndTest(new Stack<Schedulable>());
    }

    private void generateAndTest(Stack<Schedulable> genSchedulable) {
      if (genSchedulable.size() == 3) {
        // We get three Schedulable objects, let's use them to check the
        // comparator.
        assertTrue(checkTransitivity(genSchedulable),
            "The comparator must ensure transitivity");
        return;
      }

      for (int i = 0; i < nameCollection.length; i++) {
        for (int j = 0; j < startTimeColloection.length; j++) {
          for (int k = 0; k < usageCollection.length; k++) {
            for (int t = 0; t < weightsCollection.length; t++) {
              for (int m = 0; m < demandCollection.length; m++) {
                genSchedulable.push(createSchedulable(m, i, j, k, t));
                generateAndTest(genSchedulable);
                genSchedulable.pop();
              }
            }
          }
        }
      }

    }

    private Schedulable createSchedulable(
        int demandId, int nameIdx, int startTimeIdx,
        int usageIdx, int weightsIdx) {
      return new MockSchedulable(minShare, demandCollection[demandId],
        nameCollection[nameIdx], startTimeColloection[startTimeIdx],
        usageCollection[usageIdx], weightsCollection[weightsIdx]);
    }

    private boolean checkTransitivity(
        Collection<Schedulable> schedulableObjs) {

      assertEquals(3, schedulableObjs.size());
      Schedulable[] copy = schedulableObjs.toArray(new Schedulable[3]);

      if (fairShareComparator.compare(copy[0], copy[1]) > 0) {
        swap(copy, 0, 1);
      }

      if (fairShareComparator.compare(copy[1], copy[2]) > 0) {
        swap(copy, 1, 2);

        if (fairShareComparator.compare(copy[0], copy[1]) > 0) {
          swap(copy, 0, 1);
        }
      }

      // Here, we have got the following condition:
      // copy[0] <= copy[1] && copy[1] <= copy[2]
      //
      // So, just check copy[0] <= copy[2]
      if (fairShareComparator.compare(copy[0], copy[2]) <= 0) {
        return true;
      } else {
        LOG.error("Failure data: " + copy[0] + " " + copy[1] + " " + copy[2]);
        return false;
      }
    }

    private void swap(Schedulable[] array, int x, int y) {
      Schedulable tmp = array[x];
      array[x] = array[y];
      array[y] = tmp;
    }


    private class MockSchedulable implements Schedulable {
      private Resource minShare;
      private Resource demand;
      private String name;
      private long startTime;
      private Resource usage;
      private float weights;

      public MockSchedulable(Resource minShare, Resource demand, String name,
          long startTime, Resource usage, float weights) {
        this.minShare = minShare;
        this.demand = demand;
        this.name = name;
        this.startTime = startTime;
        this.usage = usage;
        this.weights = weights;
      }

      @Override
      public String getName() {
        return name;
      }

      @Override
      public Resource getDemand() {
        return demand;
      }

      @Override
      public Resource getResourceUsage() {
        return usage;
      }

      @Override
      public Resource getMinShare() {
        return minShare;
      }

      @Override
      public float getWeight() {
        return weights;
      }

      @Override
      public long getStartTime() {
        return startTime;
      }

      @Override
      public Resource getMaxShare() {
        throw new UnsupportedOperationException();
      }

      @Override
      public Priority getPriority() {
        throw new UnsupportedOperationException();
      }

      @Override
      public void updateDemand() {
        throw new UnsupportedOperationException();
      }

      @Override
      public Resource assignContainer(FSSchedulerNode node) {
        throw new UnsupportedOperationException();
      }

      @Override
      public Resource getFairShare() {
        throw new UnsupportedOperationException();
      }

      @Override
      public void setFairShare(Resource fairShare) {
        throw new UnsupportedOperationException();
      }

      @Override
      public String toString() {
        return "{name:" + name + ", start:" + startTime + ", usage:" + usage +
            ", weights:" + weights + ", demand:" + demand +
            ", minShare:" + minShare + "}";
      }

      @Override
      public boolean isPreemptable() {
        return true;
      }
    }
  }

  /**
   * Loads an allocation file in which one child of a 'fair' root asks for
   * 'drf', checks which queues exist afterwards, then swaps the child
   * policies, reloads, and checks the queues again.
   *
   * @throws IOException if reloading the scheduler configuration fails
   */
  @Test
  public void testSchedulingPolicyViolation() throws IOException {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    // First layout: child1 is 'drf', child2 is 'fair'.
    writeDrfDefaultAllocFile("drf", "fair");

    scheduler.init(conf);

    assertNull(scheduler.getQueueManager().getQueue("child1"),
        "Queue 'child1' should be null since its policy isn't allowed to"
            + " be 'drf' if its parent policy is 'fair'.");

    // A queue that is not listed in the allocation file.
    assertNull(
        scheduler.getQueueManager().getLeafQueue("dynamicQueue", true),
        "Dynamic queue should be null since it isn't allowed to be 'drf'"
            + " policy if its parent policy is 'fair'.");

    // Second layout: child1 is 'fair' and child2 is 'drf'; then reload the
    // allocation file.
    writeDrfDefaultAllocFile("fair", "drf");

    scheduler.reinitialize(conf, null);
    assertNotNull(scheduler.getQueueManager().getQueue("child1"),
        "Queue 'child1' should be not null since its policy is "
            + "allowed to be 'fair' if its parent policy is 'fair'.");

    // The violation on child2 is detected, so it is expected to keep its
    // original policy instead of taking the new one.
    SchedulingPolicy child2Policy =
        scheduler.getQueueManager().getQueue("child2").getPolicy();
    assertTrue(child2Policy instanceof FairSharePolicy,
        "Queue 'child2' should be 'fair' since its new policy 'drf' "
            + "is not allowed.");
  }

  /**
   * Writes an allocation file whose default queue policy is DRF and whose
   * 'fair' root has the two children "child1" and "child2".
   *
   * @param child1Policy scheduling policy name for "child1"
   * @param child2Policy scheduling policy name for "child2"
   */
  private void writeDrfDefaultAllocFile(String child1Policy,
      String child2Policy) {
    AllocationFileWriter.create()
        .drfDefaultQueueSchedulingPolicy()
        .addQueue(new AllocationFileQueue.Builder("root")
            .schedulingPolicy("fair")
            .subQueue(new AllocationFileQueue.Builder("child1")
                .schedulingPolicy(child1Policy).build())
            .subQueue(new AllocationFileQueue.Builder("child2")
                .schedulingPolicy(child2Policy).build())
            .build())
        .writeToFile(ALLOC_FILE);
  }

  /**
   * Loads a four-level hierarchy (root, level2, level3, leaf) in which only
   * level3 asks for 'drf' below 'fair' ancestors, and checks which of the
   * queues exist after initialization.
   */
  @Test
  public void testSchedulingPolicyViolationInTheMiddleLevel() {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    writeMiddleLevelViolationAllocFile();

    scheduler.init(conf);

    assertNotNull(scheduler.getQueueManager().getQueue("level2"),
        "Queue 'level2' shouldn't be null since its policy is allowed"
            + " to be 'fair' if its parent policy is 'fair'.");
    assertNull(scheduler.getQueueManager().getQueue("level2.level3"),
        "Queue 'level3' should be null since its policy isn't allowed"
            + " to be 'drf' if its parent policy is 'fair'.");
    assertNull(scheduler.getQueueManager().getQueue("level2.level3.leaf"),
        "Queue 'leaf' should be null since its parent failed to create.");
  }

  /**
   * Writes the allocation file root('fair') -> level2('fair') ->
   * level3('drf') -> leaf('fair').
   */
  private void writeMiddleLevelViolationAllocFile() {
    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("root")
            .schedulingPolicy("fair")
            .subQueue(new AllocationFileQueue.Builder("level2")
                .schedulingPolicy("fair")
                .subQueue(new AllocationFileQueue.Builder("level3")
                    .schedulingPolicy("drf")
                    .subQueue(new AllocationFileQueue.Builder("leaf")
                        .schedulingPolicy("fair").build())
                    .build())
                .build())
            .build())
        .writeToFile(ALLOC_FILE);
  }

  /**
   * Checks that an intermediate (non-leaf) queue configured with 'fifo' is
   * not created, and that after reloading a layout where only the leaf uses
   * 'fifo' both the intermediate queue and the leaf exist.
   *
   * @throws IOException if reloading the scheduler configuration fails
   */
  @Test
  public void testFIFOPolicyOnlyForLeafQueues()
      throws IOException {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

    // 'fifo' on the intermediate queue, 'fair' on its leaf.
    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("root")
            .subQueue(new AllocationFileQueue.Builder("intermediate")
                .schedulingPolicy("fifo")
                .subQueue(new AllocationFileQueue.Builder("leaf")
                    .schedulingPolicy("fair").build())
                .build())
            .build())
        .writeToFile(ALLOC_FILE);

    scheduler.init(conf);

    FSQueue intermediate = scheduler.getQueueManager().getQueue("intermediate");
    assertNull(intermediate, "Queue 'intermediate' should be null since 'fifo' is only for "
        + "leaf queue.");

    // Swap the policies: 'fair' on the intermediate queue, 'fifo' on the leaf.
    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("root")
            .subQueue(new AllocationFileQueue.Builder("intermediate")
                .schedulingPolicy("fair")
                .subQueue(new AllocationFileQueue.Builder("leaf")
                    .schedulingPolicy("fifo").build())
                .build())
            .build())
        .writeToFile(ALLOC_FILE);

    scheduler.reinitialize(conf, null);

    assertNotNull(scheduler.getQueueManager().getQueue("intermediate"),
        "Queue 'intermediate' should not be null since its policy is 'fair'.");

    FSQueue leaf = scheduler.getQueueManager().getQueue("intermediate.leaf");
    // The message states what the assertion actually requires: the leaf
    // must exist because 'fifo' is permitted on leaf queues.
    assertNotNull(leaf, "Queue 'leaf' should not be null since 'fifo' is allowed for "
        + "leaf queue.");
  }

  /**
   * Initializes the scheduler with an all-'fair' layout, then reloads two
   * further layouts and checks the policies the child queues end up with
   * after each reload.
   *
   * @throws IOException if reloading the scheduler configuration fails
   */
  @Test
  public void testPolicyReinitialization() throws IOException {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    writeReinitAllocFile("fair", "fair", "fair");

    scheduler.init(conf);

    // Set child1 to 'drf' which is not allowed, then reload the allocation file
    writeReinitAllocFile("fair", "drf", "fifo");
    scheduler.reinitialize(conf, null);

    SchedulingPolicy child1Policy =
        scheduler.getQueueManager().getQueue("child1").getPolicy();
    assertTrue(child1Policy instanceof FairSharePolicy,
        "Queue 'child1' should still be 'fair' since 'drf' isn't allowed"
            + " if its parent policy is 'fair'.");
    SchedulingPolicy child2Policy =
        scheduler.getQueueManager().getQueue("child2").getPolicy();
    assertTrue(child2Policy instanceof FairSharePolicy,
        "Queue 'child2' should still be 'fair' there is a policy"
            + " violation while reinitialization.");

    // Set both child1 and root to 'drf', then reload the allocation file
    writeReinitAllocFile("drf", "drf", "fifo");

    scheduler.reinitialize(conf, null);

    child1Policy = scheduler.getQueueManager().getQueue("child1").getPolicy();
    assertTrue(child1Policy instanceof DominantResourceFairnessPolicy,
        "Queue 'child1' should be 'drf' since both 'root' and 'child1'" +
            " are 'drf'.");
    child2Policy = scheduler.getQueueManager().getQueue("child2").getPolicy();
    assertTrue(child2Policy instanceof FifoPolicy,
        "Queue 'child2' should still be 'fifo' there is no policy" +
            " violation while reinitialization.");
  }

  /**
   * Writes an allocation file consisting of a root queue with the two
   * children "child1" and "child2".
   *
   * @param rootPolicy scheduling policy name for the root queue
   * @param child1Policy scheduling policy name for "child1"
   * @param child2Policy scheduling policy name for "child2"
   */
  private void writeReinitAllocFile(String rootPolicy, String child1Policy,
      String child2Policy) {
    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("root")
            .schedulingPolicy(rootPolicy)
            .subQueue(new AllocationFileQueue.Builder("child1")
                .schedulingPolicy(child1Policy).build())
            .subQueue(new AllocationFileQueue.Builder("child2")
                .schedulingPolicy(child2Policy).build())
            .build())
        .writeToFile(ALLOC_FILE);
  }
}