CentralizedOpportunisticContainerAllocator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * <p>
 * The CentralizedOpportunisticContainerAllocator allocates opportunistic
 * containers by considering all the nodes present in the cluster, after
 * modifying the container sizes to respect the limits set by the
 * ResourceManager. It tries to distribute the containers as evenly as
 * possible.
 * </p>
 */
public class CentralizedOpportunisticContainerAllocator extends
    OpportunisticContainerAllocator {

  // Logger shared by all instances of this allocator.
  private static final Logger LOG =
      LoggerFactory.getLogger(CentralizedOpportunisticContainerAllocator.class);

  // Source of candidate nodes for node-local, rack-local and ANY placement.
  // Has no initializer: it is only assigned by the three-argument constructor
  // or by setNodeQueueLoadMonitor, and is null otherwise.
  private NodeQueueLoadMonitor nodeQueueLoadMonitor;
  // Metrics object on which the node-local, rack-local and off-switch
  // opportunistic container counters are incremented after each allocation.
  private OpportunisticSchedulerMetrics metrics =
      OpportunisticSchedulerMetrics.getMetrics();

  /**
   * Create a new Centralized Opportunistic Container Allocator.
   * <p>
   * This constructor does not assign a {@link NodeQueueLoadMonitor}; the
   * monitor field stays {@code null} until {@code setNodeQueueLoadMonitor}
   * is called, and the allocation methods of this class dereference that
   * field without a null check.
   * </p>
   * @param tokenSecretManager TokenSecretManager, forwarded to the superclass
   *                           constructor.
   */
  public CentralizedOpportunisticContainerAllocator(
      BaseContainerTokenSecretManager tokenSecretManager) {
    super(tokenSecretManager);
  }

  /**
   * Create a new Centralized Opportunistic Container Allocator that selects
   * nodes through the given load monitor.
   * @param tokenSecretManager TokenSecretManager, forwarded to the superclass
   *                           constructor.
   * @param maxAllocationsPerAMHeartbeat max number of containers to be
   *                                     allocated in one AM heartbeat;
   *                                     forwarded to the superclass
   *                                     constructor.
   * @param nodeQueueLoadMonitor NodeQueueLoadMonitor used to pick the node
   *                             for every container allocated by this
   *                             instance.
   */
  public CentralizedOpportunisticContainerAllocator(
      BaseContainerTokenSecretManager tokenSecretManager,
      int maxAllocationsPerAMHeartbeat,
      NodeQueueLoadMonitor nodeQueueLoadMonitor) {
    super(tokenSecretManager, maxAllocationsPerAMHeartbeat);
    this.nodeQueueLoadMonitor = nodeQueueLoadMonitor;
  }

  /**
   * Replace the load monitor used for node selection. Package-private and
   * intended for tests.
   * @param nodeQueueLoadMonitor NodeQueueLoadMonitor to use from now on.
   */
  @VisibleForTesting
  void setNodeQueueLoadMonitor(NodeQueueLoadMonitor nodeQueueLoadMonitor) {
    this.nodeQueueLoadMonitor = nodeQueueLoadMonitor;
  }

  /**
   * Allocate opportunistic containers for one application attempt.
   * <p>
   * The blacklist update and the new requests are first recorded in the
   * application's context. The outstanding requests are then processed one
   * scheduler key at a time, in the order given by
   * {@code descendingKeySet()}, until either every key has been visited or
   * the per-heartbeat allocation limit (when it is greater than zero) is
   * exhausted. The collected allocations are finally handed to
   * {@code matchAllocation} together with the list that is returned.
   * </p>
   * @param blackList blacklist changes to apply before allocating.
   * @param oppResourceReqs new OPPORTUNISTIC requests to add to the
   *                        outstanding ones.
   * @param applicationAttemptId attempt the containers are allocated for.
   * @param opportContext per-application opportunistic container context.
   * @param rmIdentifier RM identifier passed on to container creation.
   * @param appSubmitter user who submitted the application.
   * @return the list passed to {@code matchAllocation}.
   * @throws YarnException if any of the underlying allocation steps fails.
   */
  @Override
  public List<Container> allocateContainers(
      ResourceBlacklistRequest blackList, List<ResourceRequest> oppResourceReqs,
      ApplicationAttemptId applicationAttemptId,
      OpportunisticContainerContext opportContext, long rmIdentifier,
      String appSubmitter) throws YarnException {

    updateBlacklist(blackList, opportContext);

    // Register the incoming OPPORTUNISTIC asks before scheduling anything.
    opportContext.addToOutstandingReqs(oppResourceReqs);

    // Work on a private copy of the blacklist.
    final Set<String> excludedNodes =
        new HashSet<>(opportContext.getBlacklist());
    final int heartbeatLimit = getMaxAllocationsPerAMHeartbeat();
    final boolean limited = heartbeatLimit > 0;

    // One entry per scheduler key that produced at least one allocation.
    // Each entry maps the REQUESTED capability to the allocations made for
    // it; the actual container size may differ from the requested one, so
    // the requested capability is what gets matched against the outstanding
    // requests afterwards.
    final List<Map<Resource, List<Allocation>>> perKeyAllocations =
        new ArrayList<>();

    for (SchedulerRequestKey key :
        opportContext.getOutstandingOpReqs().descendingKeySet()) {
      // -1 means "no limit" for the per-key allocation step.
      final int budget = limited
          ? heartbeatLimit - getTotalAllocations(perKeyAllocations)
          : -1;
      if (limited && budget <= 0) {
        LOG.info("Not allocating more containers as we have reached max "
                + "allocations per AM heartbeat {}",
            heartbeatLimit);
        break;
      }
      final Map<Resource, List<Allocation>> forKey = allocatePerSchedulerKey(
          rmIdentifier, opportContext, key, applicationAttemptId,
          appSubmitter, excludedNodes, budget);
      if (!forKey.isEmpty()) {
        perKeyAllocations.add(forKey);
      }
    }

    final List<Container> result = new ArrayList<>();
    matchAllocation(perKeyAllocations, result, opportContext);
    return result;
  }

  /**
   * Allocate containers for every outstanding request stored under one
   * scheduler key.
   * <p>
   * A single allocation map is shared by all requests of the key, so it can
   * hold several capabilities by the time this method returns.
   * </p>
   * @param rmIdentifier RM identifier passed on to container creation.
   * @param appContext per-application opportunistic container context.
   * @param schedKey scheduler key whose outstanding requests are processed.
   * @param appAttId attempt the containers are allocated for.
   * @param userName user who submitted the application.
   * @param blackList node names that must not be selected.
   * @param maxAllocations upper bound on containers to allocate for this key;
   *                       a value that is not greater than zero disables the
   *                       bound.
   * @return allocations made for this key, keyed by capability.
   * @throws YarnException if container allocation fails.
   */
  private Map<Resource, List<Allocation>> allocatePerSchedulerKey(
      long rmIdentifier, OpportunisticContainerContext appContext,
      SchedulerRequestKey schedKey, ApplicationAttemptId appAttId,
      String userName, Set<String> blackList, int maxAllocations)
      throws YarnException {
    final Map<Resource, List<Allocation>> byCapability = new HashMap<>();
    final boolean limited = maxAllocations > 0;
    int grantedSoFar = 0;

    for (EnrichedResourceRequest ask :
        appContext.getOutstandingOpReqs().get(schedKey).values()) {
      // -1 tells the per-request step that no limit applies.
      final int budget = limited ? maxAllocations - grantedSoFar : -1;
      if (limited && budget <= 0) {
        LOG.info("Not allocating more containers as max allocations per AM "
            + "heartbeat {} has reached", getMaxAllocationsPerAMHeartbeat());
        break;
      }

      grantedSoFar += allocateContainersPerRequest(rmIdentifier,
          appContext.getAppParams(),
          appContext.getContainerIdGenerator(), blackList,
          appAttId, userName, byCapability, ask,
          budget);

      final ResourceRequest request = ask.getRequest();
      if (byCapability.isEmpty()) {
        // Nothing has been allocated for this key yet - nothing to report.
        continue;
      }
      LOG.info("Opportunistic allocation requested for [priority={}, "
              + "allocationRequestId={}, num_containers={}, capability={}] "
              + "allocated = {}", request.getPriority(),
          request.getAllocationRequestId(), request.getNumContainers(),
          request.getCapability(), byCapability.keySet());
    }
    return byCapability;
  }

  /**
   * Allocate containers for a single outstanding request, trying node-local
   * placement first, then rack-local, then ANY.
   * <p>
   * The number of containers to allocate is the request's container count
   * minus the allocations already recorded for the request's capability,
   * further capped by {@code maxAllocations} when that value is not negative.
   * </p>
   * @param rmIdentifier RM identifier passed on to container creation.
   * @param appParams allocation parameters of the application.
   * @param idCounter generator for new container ids.
   * @param blacklist node names that must not be selected.
   * @param id attempt the containers are allocated for.
   * @param userName user who submitted the application.
   * @param allocations allocations made so far for the current scheduler
   *                    key, keyed by capability; passed on to container
   *                    creation.
   * @param enrichedAsk the request, with its node and rack location counts.
   * @param maxAllocations upper bound on containers to allocate; a negative
   *                       value means no bound.
   * @return number of containers allocated by this call.
   * @throws YarnException if container allocation fails.
   */
  @SuppressWarnings("checkstyle:parameternumber")
  private int allocateContainersPerRequest(long rmIdentifier,
      AllocationParams appParams, ContainerIdGenerator idCounter,
      Set<String> blacklist,
      ApplicationAttemptId id,
      String userName, Map<Resource, List<Allocation>> allocations,
      EnrichedResourceRequest enrichedAsk, int maxAllocations)
      throws YarnException {
    ResourceRequest anyAsk = enrichedAsk.getRequest();
    int totalAllocated = 0;
    // The allocation map is shared by all requests of a scheduler key, so it
    // may be non-empty and still have no entry for this capability. Check the
    // looked-up list itself rather than the emptiness of the whole map, which
    // would otherwise dereference null.
    List<Allocation> alreadyAllocated =
        allocations.get(anyAsk.getCapability());
    int maxToAllocate = anyAsk.getNumContainers()
        - (alreadyAllocated == null ? 0 : alreadyAllocated.size());
    if (maxAllocations >= 0) {
      maxToAllocate = Math.min(maxAllocations, maxToAllocate);
    }

    // allocate node local
    if (maxToAllocate > 0) {
      Map<String, AtomicInteger> nodeLocations = enrichedAsk.getNodeMap();
      for (Map.Entry<String, AtomicInteger> nodeLocation :
          nodeLocations.entrySet()) {
        // Never ask for more than the location wants or the budget allows.
        int numContainers = nodeLocation.getValue().get();
        numContainers = Math.min(numContainers, maxToAllocate);
        List<Container> allocatedContainers =
            allocateNodeLocal(enrichedAsk, nodeLocation.getKey(),
                numContainers, rmIdentifier, appParams, idCounter, blacklist,
                id, userName, allocations);
        totalAllocated += allocatedContainers.size();
        maxToAllocate -= allocatedContainers.size();
        // no more containers to allocate
        if (maxToAllocate <= 0) {
          break;
        }
      }
    }

    // if still left, allocate rack local
    if (maxToAllocate > 0) {
      Map<String, AtomicInteger> rackLocations = enrichedAsk.getRackMap();
      for (Map.Entry<String, AtomicInteger> rack : rackLocations.entrySet()) {
        int numContainers = rack.getValue().get();
        numContainers = Math.min(numContainers, maxToAllocate);
        List<Container> allocatedContainers =
            allocateRackLocal(enrichedAsk, rack.getKey(), numContainers,
                rmIdentifier, appParams, idCounter, blacklist, id,
                userName, allocations);
        totalAllocated += allocatedContainers.size();
        maxToAllocate -= allocatedContainers.size();
        // no more containers to allocate
        if (maxToAllocate <= 0) {
          break;
        }
      }
    }

    // if still left, try on ANY
    if (maxToAllocate > 0) {
      List<Container> allocatedContainers = allocateAny(enrichedAsk,
          maxToAllocate, rmIdentifier, appParams, idCounter, blacklist,
          id, userName, allocations);
      totalAllocated += allocatedContainers.size();
    }
    return totalAllocated;
  }

  /**
   * Allocate up to {@code toAllocate} containers at the given node location.
   * Stops early as soon as the load monitor returns no node.
   * @param enrichedAsk request being served.
   * @param nodeLocation location passed to the load monitor and recorded on
   *                     the created containers.
   * @param toAllocate maximum number of containers to allocate.
   * @param rmIdentifier RM identifier passed on to container creation.
   * @param appParams allocation parameters of the application.
   * @param idCounter generator for new container ids.
   * @param blacklist node names that must not be selected.
   * @param id attempt the containers are allocated for.
   * @param userName user who submitted the application.
   * @param allocations allocation map passed on to container creation.
   * @return the containers allocated by this call, possibly empty.
   * @throws YarnException if container creation fails.
   */
  @SuppressWarnings("checkstyle:parameternumber")
  private List<Container> allocateNodeLocal(
      EnrichedResourceRequest enrichedAsk,
      String nodeLocation,
      int toAllocate, long rmIdentifier,
      AllocationParams appParams, ContainerIdGenerator idCounter,
      Set<String> blacklist,
      ApplicationAttemptId id,
      String userName, Map<Resource, List<Allocation>> allocations)
      throws YarnException {
    final ResourceRequest request = enrichedAsk.getRequest();
    final List<Container> granted = new ArrayList<>();
    for (int remaining = toAllocate; remaining > 0; remaining--) {
      final RMNode candidate = nodeQueueLoadMonitor.selectLocalNode(
          nodeLocation, blacklist, request.getCapability());
      if (candidate == null) {
        // The monitor has no node to offer - give up on this location.
        break;
      }
      final Container created = createContainer(rmIdentifier, appParams,
          idCounter, id, userName, allocations, nodeLocation,
          request, convertToRemoteNode(candidate));
      granted.add(created);
      LOG.info("Allocated [{}] as opportunistic at location [{}]",
          created.getId(), nodeLocation);
      metrics.incrNodeLocalOppContainers();
    }
    return granted;
  }

  /**
   * Allocate up to {@code toAllocate} containers at the given rack location.
   * Stops early as soon as the load monitor returns no node.
   * @param enrichedAsk request being served.
   * @param rackLocation location passed to the load monitor and recorded on
   *                     the created containers.
   * @param toAllocate maximum number of containers to allocate.
   * @param rmIdentifier RM identifier passed on to container creation.
   * @param appParams allocation parameters of the application.
   * @param idCounter generator for new container ids.
   * @param blacklist node names that must not be selected.
   * @param id attempt the containers are allocated for.
   * @param userName user who submitted the application.
   * @param allocations allocation map passed on to container creation.
   * @return the containers allocated by this call, possibly empty.
   * @throws YarnException if container creation fails.
   */
  @SuppressWarnings("checkstyle:parameternumber")
  private List<Container> allocateRackLocal(EnrichedResourceRequest enrichedAsk,
      String rackLocation, int toAllocate, long rmIdentifier,
      AllocationParams appParams, ContainerIdGenerator idCounter,
      Set<String> blacklist,
      ApplicationAttemptId id,
      String userName, Map<Resource, List<Allocation>> allocations)
      throws YarnException {
    final ResourceRequest request = enrichedAsk.getRequest();
    final List<Container> granted = new ArrayList<>();
    for (int remaining = toAllocate; remaining > 0; remaining--) {
      final RMNode candidate = nodeQueueLoadMonitor.selectRackLocalNode(
          rackLocation, blacklist, request.getCapability());
      if (candidate == null) {
        // The monitor has no node to offer - give up on this rack.
        break;
      }
      final Container created = createContainer(rmIdentifier, appParams,
          idCounter, id, userName, allocations, rackLocation,
          request, convertToRemoteNode(candidate));
      granted.add(created);
      metrics.incrRackLocalOppContainers();
      LOG.info("Allocated [{}] as opportunistic at location [{}]",
          created.getId(), rackLocation);
    }
    return granted;
  }

  /**
   * Allocate up to {@code toAllocate} containers on any node the load
   * monitor offers, recording {@code ResourceRequest.ANY} as the location.
   * Stops early as soon as the load monitor returns no node.
   * @param enrichedAsk request being served.
   * @param toAllocate maximum number of containers to allocate.
   * @param rmIdentifier RM identifier passed on to container creation.
   * @param appParams allocation parameters of the application.
   * @param idCounter generator for new container ids.
   * @param blacklist node names that must not be selected.
   * @param id attempt the containers are allocated for.
   * @param userName user who submitted the application.
   * @param allocations allocation map passed on to container creation.
   * @return the containers allocated by this call, possibly empty.
   * @throws YarnException if container creation fails.
   */
  @SuppressWarnings("checkstyle:parameternumber")
  private List<Container> allocateAny(EnrichedResourceRequest enrichedAsk,
      int toAllocate, long rmIdentifier,
      AllocationParams appParams, ContainerIdGenerator idCounter,
      Set<String> blacklist,
      ApplicationAttemptId id,
      String userName, Map<Resource, List<Allocation>> allocations)
      throws YarnException {
    final ResourceRequest request = enrichedAsk.getRequest();
    final List<Container> granted = new ArrayList<>();
    for (int remaining = toAllocate; remaining > 0; remaining--) {
      final RMNode candidate = nodeQueueLoadMonitor.selectAnyNode(
          blacklist, request.getCapability());
      if (candidate == null) {
        // The monitor has no node to offer - nothing more can be placed.
        break;
      }
      final Container created = createContainer(rmIdentifier, appParams,
          idCounter, id, userName, allocations, ResourceRequest.ANY,
          request, convertToRemoteNode(candidate));
      granted.add(created);
      metrics.incrOffSwitchOppContainers();
      LOG.info("Allocated [{}] as opportunistic at location [{}]",
          created.getId(), ResourceRequest.ANY);
    }
    return granted;
  }

  /**
   * Build a {@link RemoteNode} carrying the node id, HTTP address and rack
   * name of the given {@link RMNode}.
   * @param rmNode node to convert; may be {@code null}.
   * @return the converted node, or {@code null} if {@code rmNode} is
   *         {@code null}.
   */
  private RemoteNode convertToRemoteNode(RMNode rmNode) {
    if (rmNode == null) {
      return null;
    }
    final RemoteNode remote = RemoteNode.newInstance(
        rmNode.getNodeID(), rmNode.getHttpAddress());
    remote.setRackName(rmNode.getRackName());
    return remote;
  }
}