PlanningAlgorithm.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;

import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.PeriodicRLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;

/**
 * An abstract class that follows the general behavior of planning algorithms.
 */
public abstract class PlanningAlgorithm implements ReservationAgent {

  /**
   * Performs the actual allocation for a ReservationDefinition within a Plan.
   *
   * @param reservationId the identifier of the reservation
   * @param user the user who owns the reservation
   * @param plan the Plan to which the reservation must be fitted
   * @param contract encapsulates the resources required by the user for his
   *          session
   * @param oldReservation the existing reservation (null if none)
   * @return whether the allocateUser function was successful or not
   *
   * @throws PlanningException if the session cannot be fitted into the plan
   * @throws ContractValidationException if validation fails
   */
  protected boolean allocateUser(ReservationId reservationId, String user,
      Plan plan, ReservationDefinition contract,
      ReservationAllocation oldReservation) throws PlanningException,
      ContractValidationException {

    // Adjust the ResourceDefinition to account for system "imperfections"
    // (e.g., scheduling delays for large containers).
    ReservationDefinition adjustedContract = adjustContract(plan, contract);

    // Compute the job allocation
    RLESparseResourceAllocation allocation =
            computeJobAllocation(plan, reservationId, adjustedContract, user);

    long period = Long.parseLong(contract.getRecurrenceExpression());

    // Make allocation periodic if request is periodic
    if (contract.getRecurrenceExpression() != null) {
      if (period > 0) {
        allocation =
            new PeriodicRLESparseResourceAllocation(allocation, period);
      }
    }

    // If no job allocation was found, fail
    if (allocation == null) {
      throw new PlanningException(
              "The planning algorithm could not find a valid allocation"
                      + " for your request");
    }

    // Translate the allocation to a map (with zero paddings)
    long step = plan.getStep();

    long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
    long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);

    Map<ReservationInterval, Resource> mapAllocations =
        allocationsToPaddedMap(allocation, jobArrival, jobDeadline, period);

    // Create the reservation
    ReservationAllocation capReservation =
        new InMemoryReservationAllocation(reservationId, // ID
            adjustedContract, // Contract
            user, // User name
            plan.getQueueName(), // Queue name
            adjustedContract.getArrival(), adjustedContract.getDeadline(),
            mapAllocations, // Allocations
            plan.getResourceCalculator(), // Resource calculator
            plan.getMinimumAllocation()); // Minimum allocation

    // Add (or update) the reservation allocation
    if (oldReservation != null) {
      return plan.updateReservation(capReservation);
    } else {
      return plan.addReservation(capReservation, false);
    }

  }

  private Map<ReservationInterval, Resource> allocationsToPaddedMap(
      RLESparseResourceAllocation allocation, long jobArrival, long jobDeadline,
      long period) {

    // Zero allocation
    Resource zeroResource = Resource.newInstance(0, 0);

    if (period > 0) {
      if ((jobDeadline - jobArrival) >= period) {
        allocation.addInterval(new ReservationInterval(0L, period),
            zeroResource);
      }
      jobArrival = jobArrival % period;
      jobDeadline = jobDeadline % period;

      if (jobArrival <= jobDeadline) {
        allocation.addInterval(new ReservationInterval(0, jobArrival),
            zeroResource);
        allocation.addInterval(new ReservationInterval(jobDeadline, period),
            zeroResource);
      } else {
        allocation.addInterval(new ReservationInterval(jobDeadline, jobArrival),
            zeroResource);
      }
    } else {
      // Pad at the beginning
      long earliestStart = findEarliestTime(allocation.toIntervalMap());
      if (jobArrival < earliestStart) {
        allocation.addInterval(
            new ReservationInterval(jobArrival, earliestStart), zeroResource);
      }

      // Pad at the beginning
      long latestEnd = findLatestTime(allocation.toIntervalMap());
      if (latestEnd < jobDeadline) {
        allocation.addInterval(new ReservationInterval(latestEnd, jobDeadline),
            zeroResource);
      }
    }
    return allocation.toIntervalMap();
  }

  public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
      ReservationId reservationId, ReservationDefinition reservation,
      String user) throws PlanningException, ContractValidationException;

  @Override
  public boolean createReservation(ReservationId reservationId, String user,
      Plan plan, ReservationDefinition contract) throws PlanningException {

    // Allocate
    return allocateUser(reservationId, user, plan, contract, null);

  }

  @Override
  public boolean updateReservation(ReservationId reservationId, String user,
      Plan plan, ReservationDefinition contract) throws PlanningException {

    // Get the old allocation
    ReservationAllocation oldAlloc = plan.getReservationById(reservationId);

    // Allocate (ignores the old allocation)
    return allocateUser(reservationId, user, plan, contract, oldAlloc);

  }

  @Override
  public boolean deleteReservation(ReservationId reservationId, String user,
      Plan plan) throws PlanningException {

    // Delete the existing reservation
    return plan.deleteReservation(reservationId);

  }

  protected static long findEarliestTime(
      Map<ReservationInterval, Resource> sesInt) {

    long ret = Long.MAX_VALUE;
    for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
      if (s.getKey().getStartTime() < ret && s.getValue() != null) {
        ret = s.getKey().getStartTime();
      }
    }
    return ret;

  }

  protected static long findLatestTime(Map<ReservationInterval,
      Resource> sesInt) {

    long ret = Long.MIN_VALUE;
    for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
      if (s.getKey().getEndTime() > ret && s.getValue() != null) {
        ret = s.getKey().getEndTime();
      }
    }
    return ret;

  }

  protected static long stepRoundDown(long t, long step) {
    return (t / step) * step;
  }

  protected static long stepRoundUp(long t, long step) {
    return ((t + step - 1) / step) * step;
  }

  private ReservationDefinition adjustContract(Plan plan,
      ReservationDefinition originalContract) {

    // Place here adjustment. For example using QueueMetrics we can track
    // large container delays per YARN-YARN-1990

    return originalContract;

  }

  @Override
  public void init(Configuration conf) {
  }
}