// SingleConstraintAppPlacementAllocator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;

/**
 * This is a simple implementation to do affinity or anti-affinity for
 * inter/intra apps.
 */
public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
    extends AppPlacementAllocator<N> {
  private static final Logger LOG =
      LoggerFactory.getLogger(SingleConstraintAppPlacementAllocator.class);

  // Both views of one ReentrantReadWriteLock created in the constructor.
  // They are final so that every lock()/unlock() pair in this class is
  // guaranteed to operate on the same lock instance.
  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;

  // The single scheduling request tracked by this allocator; null until the
  // first request has been accepted by validateAndSetSchedulingRequest.
  private SchedulingRequest schedulingRequest = null;
  // Node partition derived from the request's placement constraint (or the
  // app's default node label expression); set together with
  // schedulingRequest.
  private String targetNodePartition;
  // Both managers are obtained from the RMContext in initialize().
  private AllocationTagsManager allocationTagsManager;
  private PlacementConstraintManager placementConstraintManager;

  /**
   * Creates an allocator with no scheduling request and a fresh
   * read/write lock.
   */
  public SingleConstraintAppPlacementAllocator() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    readLock = lock.readLock();
    writeLock = lock.writeLock();
  }

  /**
   * {@link ResourceRequest}s are not supported by this allocator.
   *
   * @param requests resource requests sent for this scheduler key
   * @param recoverPreemptedRequestForAContainer unused by this implementation
   * @return always {@code null} when {@code requests} is null or empty
   * @throws SchedulerInvalidResourceRequestException if {@code requests}
   *         contains at least one element
   */
  @Override
  public PendingAskUpdateResult updatePendingAsk(
      Collection<ResourceRequest> requests,
      boolean recoverPreemptedRequestForAContainer) {
    boolean nothingRequested = requests == null || requests.isEmpty();
    if (nothingRequested) {
      // Do nothing
      return null;
    }

    // Any ResourceRequest arriving here is rejected; the first one is used
    // only to report the conflicting scheduler key.
    ResourceRequest firstRequest = requests.iterator().next();
    throw new SchedulerInvalidResourceRequestException(
        this.getClass().getName()
            + " not be able to handle ResourceRequest, there exists a "
            + "SchedulingRequest with the same scheduler key="
            + SchedulerRequestKey.create(firstRequest)
            + ", please send ResourceRequest with a different allocationId and "
            + "priority");
  }

  /**
   * Applies a new or updated {@link SchedulingRequest} to this allocator.
   *
   * <p>If no request is stored yet, the new request is validated and stored.
   * Otherwise the new request must equal the stored one in everything except
   * {@code numAllocations}; only that count is updated. For a recovered
   * container the new count is the stored count plus one, regardless of the
   * count carried by {@code newSchedulingRequest}.
   *
   * <p>Note that in the recover case the {@code numAllocations} of
   * {@code newSchedulingRequest} itself is overwritten with the new count.
   *
   * @param newSchedulingRequest the incoming request
   * @param recoverContainer whether this call recovers one container's ask
   * @return the old/new pending asks, or {@code null} if the count did not
   *         change
   * @throws SchedulerInvalidResourceRequestException if recovering without a
   *         stored request, if the update changes anything but
   *         {@code numAllocations}, if the update has no sizing, or if the
   *         resulting count is negative
   */
  private PendingAskUpdateResult internalUpdatePendingAsk(
      SchedulingRequest newSchedulingRequest, boolean recoverContainer) {
    // When it is a recovered container, there must exist a schedulingRequest.
    if (recoverContainer && schedulingRequest == null) {
      throw new SchedulerInvalidResourceRequestException("Trying to recover a "
          + "container request=" + newSchedulingRequest + ", however "
          + "there's no existing scheduling request, this should not happen.");
    }

    if (schedulingRequest == null) {
      // For a new schedulingRequest, we need to validate if we support its
      // asks. This will update internal partitions, etc. after the
      // SchedulingRequest is valid.
      validateAndSetSchedulingRequest(newSchedulingRequest);

      return new PendingAskUpdateResult(null,
          new PendingAsk(newSchedulingRequest.getResourceSizing()), null,
          targetNodePartition);
    }

    // We have an old scheduling request: make sure nothing changed except
    // sizing. To avoid copying the data structure, temporarily replace
    // numAllocations in the new request with the old value and compare the
    // two objects.
    ResourceSizing sizing = newSchedulingRequest.getResourceSizing();
    if (sizing == null) {
      // Report a missing sizing as an invalid request instead of an NPE.
      throw new SchedulerInvalidResourceRequestException(
          "No ResourceSizing found in the updated SchedulingRequest="
              + newSchedulingRequest + ", updating schedulingRequest failed.");
    }
    int existingNumAllocations =
        schedulingRequest.getResourceSizing().getNumAllocations();

    // When it is a recovered container request, just set
    // #newAllocations = #existingAllocations + 1;
    int newNumAllocations = recoverContainer
        ? existingNumAllocations + 1
        : sizing.getNumAllocations();

    boolean onlySizingDiffers;
    sizing.setNumAllocations(existingNumAllocations);
    try {
      onlySizingDiffers = schedulingRequest.equals(newSchedulingRequest);
    } finally {
      // Rollback #numAllocations on every path, including an exception
      // thrown by equals(), so the caller's request is never left altered.
      sizing.setNumAllocations(newNumAllocations);
    }

    if (!onlySizingDiffers) {
      throw new SchedulerInvalidResourceRequestException(
          "Invalid updated SchedulingRequest added to scheduler, "
              + " we only allows changing numAllocations for the updated "
              + "SchedulingRequest. Old=" + schedulingRequest.toString()
              + " new=" + newSchedulingRequest.toString()
              + ", if any fields need to be updated, please cancel the "
              + "old request (by setting numAllocations to 0) and send a "
              + "SchedulingRequest with different combination of "
              + "priority/allocationId");
    }

    if (newNumAllocations == existingNumAllocations) {
      // No update on pending asks, return null.
      return null;
    }

    // Basic sanity check
    if (newNumAllocations < 0) {
      throw new SchedulerInvalidResourceRequestException(
          "numAllocation in ResourceSizing field must be >= 0, "
              + "updating schedulingRequest failed.");
    }

    // Capture the old ask before the stored request is modified below.
    PendingAskUpdateResult updateResult = new PendingAskUpdateResult(
        new PendingAsk(schedulingRequest.getResourceSizing()),
        new PendingAsk(newSchedulingRequest.getResourceSizing()),
        targetNodePartition, targetNodePartition);

    // Ok, now everything is same except numAllocation, update numAllocation.
    this.schedulingRequest.getResourceSizing().setNumAllocations(
        newNumAllocations);
    LOG.info("Update numAllocation from old={} to new={}",
        existingNumAllocations, newNumAllocations);

    return updateResult;
  }

  /**
   * Updates the pending ask from a {@link SchedulingRequest} while holding
   * the write lock.
   *
   * @param schedulerRequestKey not read by this implementation
   * @param newSchedulingRequest the new or updated scheduling request
   * @param recoverPreemptedRequestForAContainer whether this call recovers
   *        one container's ask
   * @return whatever {@code internalUpdatePendingAsk} returns
   */
  @Override
  public PendingAskUpdateResult updatePendingAsk(
      SchedulerRequestKey schedulerRequestKey,
      SchedulingRequest newSchedulingRequest,
      boolean recoverPreemptedRequestForAContainer) {
    final PendingAskUpdateResult outcome;
    writeLock.lock();
    try {
      outcome = internalUpdatePendingAsk(newSchedulingRequest,
          recoverPreemptedRequestForAContainer);
    } finally {
      writeLock.unlock();
    }
    return outcome;
  }

  /**
   * Always throws a {@link SchedulerInvalidResourceRequestException} whose
   * message is prefixed with the application id and scheduler key.
   *
   * @param message description of what is wrong with the request
   * @return never returns normally
   */
  private String throwExceptionWithMetaInfo(String message) {
    String detail = "AppId=" + appSchedulingInfo.getApplicationId()
        + " Key=" + this.schedulerRequestKey
        + ". Exception message:" + message;
    throw new SchedulerInvalidResourceRequestException(detail);
  }

  /**
   * Validates a first-time scheduling request and, if it is acceptable,
   * stores a copy of it together with its target node partition.
   *
   * @param newSchedulingRequest request to validate and store
   * @throws SchedulerInvalidResourceRequestException if sizing/resources are
   *         missing, the execution type is not GUARANTEED, or the target
   *         partition validation fails
   */
  private void validateAndSetSchedulingRequest(SchedulingRequest
      newSchedulingRequest)
      throws SchedulerInvalidResourceRequestException {
    // A request without sizing or without resources cannot be scheduled.
    ResourceSizing requestedSizing = newSchedulingRequest.getResourceSizing();
    boolean sizingMissing =
        requestedSizing == null || requestedSizing.getResources() == null;
    if (sizingMissing) {
      throwExceptionWithMetaInfo(
          "No ResourceSizing found in the scheduling request, please double "
              + "check");
    }

    // An absent execution type is accepted; a present one must be
    // GUARANTEED.
    boolean hasExecutionType = newSchedulingRequest.getExecutionType() != null;
    if (hasExecutionType
        && newSchedulingRequest.getExecutionType().getExecutionType()
        != ExecutionType.GUARANTEED) {
      throwExceptionWithMetaInfo(
          "Only GUARANTEED execution type is supported.");
    }

    String partition = validateAndGetTargetNodePartition(
        newSchedulingRequest.getPlacementConstraint());
    this.targetNodePartition = partition;

    // Store a copy rebuilt from the request's proto rather than the
    // caller's instance.
    SchedulingRequestPBImpl incoming =
        (SchedulingRequestPBImpl) newSchedulingRequest;
    this.schedulingRequest = new SchedulingRequestPBImpl(incoming.getProto());

    LOG.info("Successfully added SchedulingRequest to app="
        + appSchedulingInfo.getApplicationAttemptId()
        + " placementConstraint=["
        + schedulingRequest.getPlacementConstraint()
        + "]. nodePartition=" + targetNodePartition);
  }

  /**
   * Determines the node partition this request targets.
   *
   * <p>The result defaults to the app's default node label expression, or
   * {@link RMNodeLabelsManager#NO_LABEL} when that is null. If the placement
   * constraint is a single constraint containing a node-partition target
   * expression with exactly one non-null value, that value is returned
   * instead. Only single constraints are inspected.
   *
   * @param placementConstraint constraint of the request, may be null
   * @return the target node partition; never null because a null target
   *         value is skipped rather than replacing the default
   * @throws SchedulerInvalidResourceRequestException if a node-partition
   *         target expression lists more than one partition
   */
  private String validateAndGetTargetNodePartition(
      PlacementConstraint placementConstraint) {
    String defaultNodeLabelExpression =
        appSchedulingInfo.getDefaultNodeLabelExpression();
    String nodePartition = defaultNodeLabelExpression == null ?
        RMNodeLabelsManager.NO_LABEL : defaultNodeLabelExpression;

    if (placementConstraint == null) {
      return nodePartition;
    }
    // instanceof is false for null, so no separate null check is needed.
    PlacementConstraint.AbstractConstraint ac =
        placementConstraint.getConstraintExpr();
    if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
      return nodePartition;
    }

    PlacementConstraint.SingleConstraint singleConstraint =
        (PlacementConstraint.SingleConstraint) ac;
    for (PlacementConstraint.TargetExpression targetExpression :
        singleConstraint.getTargetExpressions()) {
      // Handle node partition only; comparisons are written so that a null
      // target type or key is simply treated as "not a partition target".
      if (targetExpression.getTargetType() != NODE_ATTRIBUTE
          || !NODE_PARTITION.equals(targetExpression.getTargetKey())) {
        continue;
      }
      Set<String> values = targetExpression.getTargetValues();
      if (values == null || values.isEmpty()) {
        continue;
      }
      if (values.size() > 1) {
        throwExceptionWithMetaInfo(
            "Inside one targetExpression, we only support"
                + " affinity to at most one node partition now");
      }
      // A null element must not overwrite the default partition, otherwise
      // this method could return null; keep looking instead.
      String requestedPartition = values.iterator().next();
      if (requestedPartition != null) {
        return requestedPartition;
      }
    }
    return nodePartition;
  }

  /**
   * This allocator keeps no {@link ResourceRequest}s.
   *
   * @return an empty map
   */
  @Override
  public Map<String, ResourceRequest> getResourceRequests() {
    return Collections.<String, ResourceRequest>emptyMap();
  }

  /**
   * Returns the pending ask for the given resource name.
   *
   * @param resourceName resource name to look up; only
   *        {@link ResourceRequest#ANY} ("*") can have a pending ask here
   * @return a {@link PendingAsk} built from the stored request's sizing when
   *         {@code resourceName} is "*" and a request is stored; otherwise
   *         {@link PendingAsk#ZERO} (including for a null name)
   */
  @Override
  public PendingAsk getPendingAsk(String resourceName) {
    readLock.lock();
    try {
      // Constant-first comparison so a null resourceName yields ZERO
      // instead of a NullPointerException.
      if (schedulingRequest != null
          && ResourceRequest.ANY.equals(resourceName)) {
        return new PendingAsk(schedulingRequest.getResourceSizing());
      }
      return PendingAsk.ZERO;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns the number of outstanding allocations for the given resource
   * name.
   *
   * @param resourceName resource name to look up; only
   *        {@link ResourceRequest#ANY} ("*") can have outstanding asks here
   * @return the stored request's {@code numAllocations} when
   *         {@code resourceName} is "*" and a request is stored; otherwise 0
   *         (including for a null name)
   */
  @Override
  public int getOutstandingAsksCount(String resourceName) {
    readLock.lock();
    try {
      // Constant-first comparison so a null resourceName yields 0 instead
      // of a NullPointerException.
      if (schedulingRequest != null
          && ResourceRequest.ANY.equals(resourceName)) {
        return schedulingRequest.getResourceSizing().getNumAllocations();
      }
      return 0;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Lowers the stored request's pending allocation count by one and deducts
   * one allocation's resources from the app's pending resource for the
   * target partition.
   */
  private void decreasePendingNumAllocation() {
    ResourceSizing pendingSizing = schedulingRequest.getResourceSizing();
    int remainingAllocations = pendingSizing.getNumAllocations() - 1;
    pendingSizing.setNumAllocations(remainingAllocations);

    appSchedulingInfo.decPendingResource(
        targetNodePartition, pendingSizing.getResources());
  }

  /**
   * Hands out one container's worth of the stored scheduling request and
   * decrements the pending count, all under the write lock.
   *
   * @param schedulerKey not read by this implementation
   * @param type not read by this implementation
   * @param node not read by this implementation
   * @return a {@link ContainerRequest} wrapping a copy of the stored request
   *         with {@code numAllocations} set to 1
   */
  @Override
  public ContainerRequest allocate(SchedulerRequestKey schedulerKey,
      NodeType type, SchedulerNode node) {
    writeLock.lock();
    try {
      // Build the per-container request from the stored request's proto so
      // that changing its sizing does not touch the stored request.
      SchedulingRequestPBImpl stored =
          (SchedulingRequestPBImpl) schedulingRequest;
      SchedulingRequest singleContainerRequest =
          new SchedulingRequestPBImpl(stored.getProto());
      ResourceSizing singleSizing = singleContainerRequest.getResourceSizing();
      singleSizing.setNumAllocations(1);

      // One fewer allocation is pending on the stored request.
      decreasePendingNumAllocation();

      return new ContainerRequest(singleContainerRequest);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Checks that allocations are still pending and that the node satisfies
   * the stored request's placement constraints.
   *
   * @param node candidate node
   * @param dcOpt optional diagnostics collector passed through to the
   *        constraint check
   * @return {@code false} if nothing is pending or the allocation tags query
   *         fails; otherwise the result of the constraint check
   */
  private boolean checkCardinalityAndPending(SchedulerNode node,
      Optional<DiagnosticsCollector> dcOpt) {
    // Nothing left to allocate: no node can be a candidate.
    int pendingAllocations =
        schedulingRequest.getResourceSizing().getNumAllocations();
    if (pendingAllocations <= 0) {
      return false;
    }

    // node type will be ignored.
    boolean constraintsSatisfied;
    try {
      constraintsSatisfied = PlacementConstraintsUtil.canSatisfyConstraints(
          appSchedulingInfo.getApplicationId(), schedulingRequest, node,
          placementConstraintManager, allocationTagsManager, dcOpt);
    } catch (InvalidAllocationTagsQueryException e) {
      LOG.warn("Failed to query node cardinality:", e);
      // NOTE(review): this bumps the placement-attempt counter from inside
      // a check that callers in this class run under the read lock.
      // Confirm that counting a failed tags query as a placement attempt
      // is intended.
      this.incrementPlacementAttempt();
      constraintsSatisfied = false;
    }
    return constraintsSatisfied;
  }

  /**
   * Tells whether a container can be allocated on the node, evaluated under
   * the read lock without collecting diagnostics.
   *
   * @param type not read by this implementation
   * @param node candidate node
   * @return result of the pending/cardinality check for {@code node}
   */
  @Override
  public boolean canAllocate(NodeType type, SchedulerNode node) {
    final boolean allocatable;
    readLock.lock();
    try {
      allocatable = checkCardinalityAndPending(node, Optional.empty());
    } finally {
      readLock.unlock();
    }
    return allocatable;
  }

  /**
   * @param resourceName not read by this implementation
   * @return always {@code true}
   */
  @Override
  public boolean canDelayTo(String resourceName) {
    return Boolean.TRUE;
  }

  /**
   * Prechecks a node without collecting diagnostics; delegates to the
   * three-argument overload with an empty collector.
   *
   * @param schedulerNode candidate node
   * @param schedulingMode scheduling mode used to pick the partition to
   *        compare against
   * @return result of the three-argument {@code precheckNode}
   */
  @Override
  public boolean precheckNode(SchedulerNode schedulerNode,
      SchedulingMode schedulingMode) {
    Optional<DiagnosticsCollector> noDiagnostics = Optional.empty();
    return precheckNode(schedulerNode, schedulingMode, noDiagnostics);
  }

  /**
   * Prechecks a node: its partition (as seen under the given scheduling
   * mode) must equal the target partition, and the pending/cardinality check
   * must pass.
   *
   * @param schedulerNode candidate node
   * @param schedulingMode with RESPECT_PARTITION_EXCLUSIVITY the node's own
   *        partition is compared; otherwise the no-label partition is
   *        compared
   * @param dcOpt optional collector that receives partition diagnostics on
   *        a partition mismatch and is passed on to the constraint check
   * @return {@code true} only if the partition matches and the
   *         pending/cardinality check succeeds
   */
  @Override
  public boolean precheckNode(SchedulerNode schedulerNode,
      SchedulingMode schedulingMode,
      Optional<DiagnosticsCollector> dcOpt) {
    // The partition to compare against depends on the scheduling mode and
    // is resolved before the lock is taken.
    final String nodePartitionToLookAt =
        schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
            ? schedulerNode.getPartition()
            : RMNodeLabelsManager.NO_LABEL;

    readLock.lock();
    try {
      if (this.targetNodePartition.equals(nodePartitionToLookAt)) {
        // Partition matches; go on to cardinality/pending resources.
        return checkCardinalityAndPending(schedulerNode, dcOpt);
      }
      // Partition mismatch: report it if a collector was supplied.
      dcOpt.ifPresent(collector -> collector.collectPartitionDiagnostics(
          targetNodePartition, nodePartitionToLookAt));
      return false;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the current target node partition field; read without taking
   *         the lock
   */
  @Override
  public String getPrimaryRequestedNodePartition() {
    return this.targetNodePartition;
  }

  /**
   * @return always 1
   */
  @Override
  public int getUniqueLocationAsks() {
    final int uniqueLocationAsks = 1;
    return uniqueLocationAsks;
  }

  /**
   * Logs the stored scheduling request at INFO level, if there is one, while
   * holding the read lock.
   */
  @Override
  public void showRequests() {
    readLock.lock();
    try {
      if (schedulingRequest == null) {
        return;
      }
      String requestText = schedulingRequest.toString();
      LOG.info(requestText);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the stored scheduling request instance (not a copy), or
   *         {@code null} if none has been set; read without taking the lock
   */
  @Override
  public SchedulingRequest getSchedulingRequest() {
    return this.schedulingRequest;
  }

  /**
   * Test-only accessor.
   *
   * @return the current target node partition field
   */
  @VisibleForTesting
  String getTargetNodePartition() {
    return this.targetNodePartition;
  }

  /**
   * Initializes the base allocator, then picks up the allocation tags
   * manager and the placement constraint manager from the RM context.
   *
   * @param appSchedulingInfo passed to the superclass initializer
   * @param schedulerRequestKey passed to the superclass initializer
   * @param rmContext source of the two managers
   */
  @Override
  public void initialize(AppSchedulingInfo appSchedulingInfo,
      SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
    super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);

    AllocationTagsManager tagsManager = rmContext.getAllocationTagsManager();
    this.allocationTagsManager = tagsManager;

    PlacementConstraintManager constraintManager =
        rmContext.getPlacementConstraintManager();
    this.placementConstraintManager = constraintManager;
  }
}