BaseSharingPolicyTest.java

/*******************************************************************************
 *   Licensed to the Apache Software Foundation (ASF) under one
 *   or more contributor license agreements.  See the NOTICE file
 *   distributed with this work for additional information
 *   regarding copyright ownership.  The ASF licenses this file
 *   to you under the Apache License, Version 2.0 (the
 *   "License"); you may not use this file except in compliance
 *   with the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 *******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import net.jcip.annotations.NotThreadSafe;

/**
 * This class is a base test for {@code SharingPolicy} implementors.
 */
@RunWith(value = Parameterized.class)
@NotThreadSafe
@SuppressWarnings("VisibilityModifier")
public abstract class BaseSharingPolicyTest {

  @Parameterized.Parameter(value = 0)
  public long duration;

  @Parameterized.Parameter(value = 1)
  public double height;

  @Parameterized.Parameter(value = 2)
  public int numSubmissions;

  @Parameterized.Parameter(value = 3)
  public String recurrenceExpression;

  @Parameterized.Parameter(value = 4)
  public Class expectedError;

  private long step;
  private long initTime;

  private InMemoryPlan plan;
  private ReservationAgent mAgent;
  private Resource minAlloc;
  private ResourceCalculator res;
  private Resource maxAlloc;

  private int totCont = 1000;

  protected ReservationSchedulerConfiguration conf;

  @Before
  public void setup() {
    // 1 sec step
    step = 1000L;
    initTime = System.currentTimeMillis();

    minAlloc = Resource.newInstance(1024, 1);
    res = new DefaultResourceCalculator();
    maxAlloc = Resource.newInstance(1024 * 8, 8);

    mAgent = mock(ReservationAgent.class);

    QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
    Resource clusterResource =
        ReservationSystemTestUtil.calculateClusterResource(totCont);

    // invoke implementors initialization of policy
    SharingPolicy policy = getInitializedPolicy();

    RMContext context = ReservationSystemTestUtil.createMockRMContext();

    plan = new InMemoryPlan(rootQueueMetrics, policy, mAgent, clusterResource,
        step, res, minAlloc, maxAlloc, "dedicated", null, true, context);
  }

  public void runTest() throws IOException, PlanningException {

    long period = 1;
    if (recurrenceExpression != null) {
      period = Long.parseLong(recurrenceExpression);
    }

    try {
      RLESparseResourceAllocation rle = generateRLEAlloc(period);

      // Generate the intervalMap (trimming out-of-period entries)
      Map<ReservationInterval, Resource> reservationIntervalResourceMap;
      if (period > 1) {
        rle = new PeriodicRLESparseResourceAllocation(rle, period);
        reservationIntervalResourceMap =
            ReservationSystemTestUtil.toAllocation(rle, 0, period);
      } else {
        reservationIntervalResourceMap = ReservationSystemTestUtil
            .toAllocation(rle, Long.MIN_VALUE, Long.MAX_VALUE);
      }

      ReservationDefinition rDef =
          ReservationSystemTestUtil.createSimpleReservationDefinition(
              initTime % period, initTime % period + duration + 1, duration, 1,
              recurrenceExpression);

      // perform multiple submissions where required
      for (int i = 0; i < numSubmissions; i++) {

        long rstart = rle.getEarliestStartTime();
        long rend = rle.getLatestNonNullTime();

        InMemoryReservationAllocation resAlloc =
            new InMemoryReservationAllocation(
                ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
                "dedicated", rstart, rend, reservationIntervalResourceMap, res,
                minAlloc);

        assertTrue(plan.toString(), plan.addReservation(resAlloc, false));
      }
      // fail if error was expected
      if (expectedError != null) {
        System.out.println(plan.toString());
        fail();
      }
    } catch (Exception e) {
      if (expectedError == null || !e.getClass().getCanonicalName()
          .equals(expectedError.getCanonicalName())) {
        // fail on unexpected errors
        throw e;
      }
    }
  }

  private RLESparseResourceAllocation generateRLEAlloc(long period) {
    RLESparseResourceAllocation rle =
        new RLESparseResourceAllocation(new DefaultResourceCalculator());

    Resource alloc = Resources.multiply(minAlloc, height * totCont);

    // loop in case the periodicity of the reservation is smaller than LCM
    long rStart = initTime % period;
    long rEnd = initTime % period + duration;


    // handle wrap-around
    if (period > 1 && rEnd > period) {
      long diff = rEnd - period;
      rEnd = period;

      // handle multiple wrap-arounds (e.g., 5h duration on a 2h periodicity)
      if(duration > period) {
        rle.addInterval(new ReservationInterval(0, period),
            Resources.multiply(alloc, duration / period - 1));
        rle.addInterval(new ReservationInterval(0, diff % period), alloc);
      } else {
        rle.addInterval(new ReservationInterval(0, diff), alloc);
      }
    }

    if(rStart > rEnd){
      rle.addInterval(new ReservationInterval(rStart, period), alloc);
      rle.addInterval(new ReservationInterval(0, rEnd), alloc);
    } else {
      rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
    }
    return rle;
  }

  public abstract SharingPolicy getInitializedPolicy();
}