AbstractReservationSystem.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.CapacityReservationsACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.FairReservationsACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This is the implementation of {@link ReservationSystem} based on the
 * {@link ResourceScheduler}
 */
@LimitedPrivate("yarn")
@Unstable
public abstract class AbstractReservationSystem extends AbstractService
    implements ReservationSystem {

  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractReservationSystem.class);

  // private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN

  private final ReentrantReadWriteLock readWriteLock =
      new ReentrantReadWriteLock(true);
  private final Lock readLock = readWriteLock.readLock();
  private final Lock writeLock = readWriteLock.writeLock();

  private boolean initialized = false;

  private final Clock clock = new UTCClock();

  private AtomicLong resCounter = new AtomicLong();

  private Map<String, Plan> plans = new HashMap<String, Plan>();

  private Map<ReservationId, String> resQMap =
      new HashMap<ReservationId, String>();

  private RMContext rmContext;

  private ResourceScheduler scheduler;

  private ScheduledExecutorService scheduledExecutorService;

  protected Configuration conf;

  protected long planStepSize;

  private PlanFollower planFollower;

  private ReservationsACLsManager reservationsACLsManager;

  private boolean isRecoveryEnabled = false;

  private long maxPeriodicity;

  /**
   * Construct the service.
   * 
   * @param name service name
   */
  public AbstractReservationSystem(String name) {
    super(name);
  }

  @Override
  public void setRMContext(RMContext rmContext) {
    writeLock.lock();
    try {
      this.rmContext = rmContext;
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public void reinitialize(Configuration conf, RMContext rmContext)
      throws YarnException {
    writeLock.lock();
    try {
      if (!initialized) {
        initialize(conf);
        initialized = true;
      } else {
        initializeNewPlans(conf);
      }
    } finally {
      writeLock.unlock();
    }
  }

  private void initialize(Configuration conf) throws YarnException {
    LOG.info("Initializing Reservation system");
    this.conf = conf;
    scheduler = rmContext.getScheduler();
    // Get the plan step size
    planStepSize = conf.getTimeDuration(
        YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
        YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
        TimeUnit.MILLISECONDS);
    if (planStepSize < 0) {
      planStepSize =
          YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP;
    }
    maxPeriodicity =
        conf.getLong(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY,
            YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY);
    if (maxPeriodicity <= 0) {
      maxPeriodicity =
          YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY;
    }
    // Create a plan corresponding to every reservable queue
    Set<String> planQueueNames = scheduler.getPlanQueues();
    for (String planQueueName : planQueueNames) {
      Plan plan = initializePlan(planQueueName);
      plans.put(planQueueName, plan);
    }
    isRecoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
        YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);

    if (conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE,
        YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE)
        && conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
            YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) {
      if (scheduler instanceof CapacityScheduler) {
        reservationsACLsManager = new CapacityReservationsACLsManager(scheduler,
            conf);
      } else if (scheduler instanceof FairScheduler) {
        reservationsACLsManager = new FairReservationsACLsManager(scheduler,
            conf);
      }
    }
  }

  private void loadPlan(String planName,
      Map<ReservationId, ReservationAllocationStateProto> reservations)
      throws PlanningException {
    Plan plan = plans.get(planName);
    Resource minAllocation = getMinAllocation();
    ResourceCalculator rescCalculator = getResourceCalculator();
    for (Entry<ReservationId, ReservationAllocationStateProto> currentReservation : reservations
        .entrySet()) {
      plan.addReservation(ReservationSystemUtil.toInMemoryAllocation(planName,
          currentReservation.getKey(), currentReservation.getValue(),
          minAllocation, rescCalculator), true);
      resQMap.put(currentReservation.getKey(), planName);
    }
    LOG.info("Recovered reservations for Plan: {}", planName);
  }

  @Override
  public void recover(RMState state) throws Exception {
    LOG.info("Recovering Reservation system");
    writeLock.lock();
    try {
      Map<String, Map<ReservationId, ReservationAllocationStateProto>> reservationSystemState =
          state.getReservationState();
      if (planFollower != null) {
        for (String plan : plans.keySet()) {
          // recover reservations if any from state store
          if (reservationSystemState.containsKey(plan)) {
            loadPlan(plan, reservationSystemState.get(plan));
          }
          synchronizePlan(plan, false);
        }
        startPlanFollower(conf.getLong(
            YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
            YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS));
      }
    } finally {
      writeLock.unlock();
    }
  }

  private void initializeNewPlans(Configuration conf) {
    LOG.info("Refreshing Reservation system");
    writeLock.lock();
    try {
      // Create a plan corresponding to every new reservable queue
      Set<String> planQueueNames = scheduler.getPlanQueues();
      for (String planQueueName : planQueueNames) {
        if (!plans.containsKey(planQueueName)) {
          Plan plan = initializePlan(planQueueName);
          plans.put(planQueueName, plan);
        } else {
          LOG.warn("Plan based on reservation queue {} already exists.",
              planQueueName);
        }
      }
      // Update the plan follower with the active plans
      if (planFollower != null) {
        planFollower.setPlans(plans.values());
      }
    } catch (YarnException e) {
      LOG.warn("Exception while trying to refresh reservable queues", e);
    } finally {
      writeLock.unlock();
    }
  }

  private PlanFollower createPlanFollower() {
    String planFollowerPolicyClassName =
        conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER,
            getDefaultPlanFollower());
    if (planFollowerPolicyClassName == null) {
      return null;
    }
    LOG.info("Using PlanFollowerPolicy: " + planFollowerPolicyClassName);
    try {
      Class<?> planFollowerPolicyClazz =
          conf.getClassByName(planFollowerPolicyClassName);
      if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) {
        return (PlanFollower) ReflectionUtils
            .newInstance(planFollowerPolicyClazz, conf);
      } else {
        throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
            + " not instance of " + PlanFollower.class.getCanonicalName());
      }
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException(
          "Could not instantiate PlanFollowerPolicy: "
              + planFollowerPolicyClassName,
          e);
    }
  }

  private String getDefaultPlanFollower() {
    // currently only capacity scheduler is supported
    if (scheduler instanceof CapacityScheduler) {
      return CapacitySchedulerPlanFollower.class.getName();
    } else if (scheduler instanceof FairScheduler) {
      return FairSchedulerPlanFollower.class.getName();
    }
    return null;
  }

  @Override
  public Plan getPlan(String planName) {
    readLock.lock();
    try {
      return plans.get(planName);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the planStepSize
   */
  @Override
  public long getPlanFollowerTimeStep() {
    readLock.lock();
    try {
      return planStepSize;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public void synchronizePlan(String planName, boolean shouldReplan) {
    writeLock.lock();
    try {
      Plan plan = plans.get(planName);
      if (plan != null) {
        planFollower.synchronizePlan(plan, shouldReplan);
      }
    } finally {
      writeLock.unlock();
    }
  }

  private void startPlanFollower(long initialDelay) {
    if (planFollower != null) {
      scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
      scheduledExecutorService.scheduleWithFixedDelay(planFollower,
          initialDelay, planStepSize, TimeUnit.MILLISECONDS);
    }
  }

  @Override
  public void serviceInit(Configuration conf) throws Exception {
    Configuration configuration = new Configuration(conf);
    reinitialize(configuration, rmContext);
    // Create the plan follower with the active plans
    planFollower = createPlanFollower();
    if (planFollower != null) {
      planFollower.init(clock, scheduler, plans.values());
    }
    super.serviceInit(conf);
  }

  @Override
  public void serviceStart() throws Exception {
    if (!isRecoveryEnabled) {
      startPlanFollower(planStepSize);
    }
    super.serviceStart();
  }

  @Override
  public void serviceStop() {
    // Stop the plan follower
    if (scheduledExecutorService != null
        && !scheduledExecutorService.isShutdown()) {
      scheduledExecutorService.shutdown();
    }
    // Clear the plans
    plans.clear();
  }

  @Override
  public String getQueueForReservation(ReservationId reservationId) {
    readLock.lock();
    try {
      return resQMap.get(reservationId);
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public void setQueueForReservation(ReservationId reservationId,
      String queueName) {
    writeLock.lock();
    try {
      resQMap.put(reservationId, queueName);
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public ReservationId getNewReservationId() {
    writeLock.lock();
    try {
      ReservationId resId = ReservationId.newInstance(
          ResourceManager.getClusterTimeStamp(), resCounter.incrementAndGet());
      LOG.info("Allocated new reservationId: " + resId);
      return resId;
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public Map<String, Plan> getAllPlans() {
    return plans;
  }

  /**
   * Get the default reservation system corresponding to the scheduler
   * 
   * @param scheduler the scheduler for which the reservation system is required
   *
   * @return the {@link ReservationSystem} based on the configured scheduler
   */
  public static String getDefaultReservationSystem(
      ResourceScheduler scheduler) {
    if (scheduler instanceof CapacityScheduler) {
      return CapacityReservationSystem.class.getName();
    } else if (scheduler instanceof FairScheduler) {
      return FairReservationSystem.class.getName();
    }
    return null;
  }

  protected Plan initializePlan(String planQueueName) throws YarnException {
    String planQueuePath = getPlanQueuePath(planQueueName);
    SharingPolicy adPolicy = getAdmissionPolicy(planQueuePath);
    adPolicy.init(planQueuePath, getReservationSchedulerConfiguration());
    // Calculate the max plan capacity
    Resource minAllocation = getMinAllocation();
    Resource maxAllocation = getMaxAllocation();
    ResourceCalculator rescCalc = getResourceCalculator();
    Resource totCap = getPlanQueueCapacity(planQueueName);
    Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy,
        getAgent(planQueuePath), totCap, planStepSize, rescCalc, minAllocation,
        maxAllocation, planQueueName, getReplanner(planQueuePath),
        getReservationSchedulerConfiguration().getMoveOnExpiry(new QueuePath(planQueuePath)),
        maxPeriodicity, rmContext);
    LOG.info("Initialized plan {} based on reservable queue {}",
        plan.toString(), planQueueName);
    return plan;
  }

  protected Planner getReplanner(String planQueueName) {
    ReservationSchedulerConfiguration reservationConfig =
        getReservationSchedulerConfiguration();
    String plannerClassName = reservationConfig.getReplanner(new QueuePath(planQueueName));
    LOG.info("Using Replanner: " + plannerClassName + " for queue: "
        + planQueueName);
    try {
      Class<?> plannerClazz = conf.getClassByName(plannerClassName);
      if (Planner.class.isAssignableFrom(plannerClazz)) {
        Planner planner =
            (Planner) ReflectionUtils.newInstance(plannerClazz, conf);
        planner.init(planQueueName, reservationConfig);
        return planner;
      } else {
        throw new YarnRuntimeException("Class: " + plannerClazz
            + " not instance of " + Planner.class.getCanonicalName());
      }
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Could not instantiate Planner: "
          + plannerClassName + " for queue: " + planQueueName, e);
    }
  }

  protected ReservationAgent getAgent(String queueName) {
    ReservationSchedulerConfiguration reservationConfig =
        getReservationSchedulerConfiguration();
    String agentClassName = reservationConfig.getReservationAgent(new QueuePath(queueName));
    LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
    try {
      Class<?> agentClazz = conf.getClassByName(agentClassName);
      if (ReservationAgent.class.isAssignableFrom(agentClazz)) {
        ReservationAgent resevertionAgent =
            (ReservationAgent) agentClazz.newInstance();
        resevertionAgent.init(conf);
        return resevertionAgent;
      } else {
        throw new YarnRuntimeException("Class: " + agentClassName
            + " not instance of " + ReservationAgent.class.getCanonicalName());
      }
    } catch (ClassNotFoundException | InstantiationException
        | IllegalAccessException e) {
      throw new YarnRuntimeException("Could not instantiate Agent: "
          + agentClassName + " for queue: " + queueName, e);
    }
  }

  protected SharingPolicy getAdmissionPolicy(String queueName) {
    ReservationSchedulerConfiguration reservationConfig =
        getReservationSchedulerConfiguration();
    String admissionPolicyClassName =
        reservationConfig.getReservationAdmissionPolicy(new QueuePath(queueName));
    LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName
        + " for queue: " + queueName);
    try {
      Class<?> admissionPolicyClazz =
          conf.getClassByName(admissionPolicyClassName);
      if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
        return (SharingPolicy) ReflectionUtils.newInstance(admissionPolicyClazz,
            conf);
      } else {
        throw new YarnRuntimeException("Class: " + admissionPolicyClassName
            + " not instance of " + SharingPolicy.class.getCanonicalName());
      }
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: "
          + admissionPolicyClassName + " for queue: " + queueName, e);
    }
  }

  public ReservationsACLsManager getReservationsACLsManager() {
    return this.reservationsACLsManager;
  }

  protected abstract ReservationSchedulerConfiguration getReservationSchedulerConfiguration();

  protected abstract String getPlanQueuePath(String planQueueName);

  protected abstract Resource getPlanQueueCapacity(String planQueueName);

  protected abstract Resource getMinAllocation();

  protected abstract Resource getMaxAllocation();

  protected abstract ResourceCalculator getResourceCalculator();

  protected abstract QueueMetrics getRootQueueMetrics();
}