ContainerUpdateContext.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
    .RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Class encapsulates all outstanding container increase and decrease
 * requests for an application.
 */
public class ContainerUpdateContext {

  /**
   * Sentinel container id (every numeric component is -1). It is returned by
   * {@link #matchContainerToOutstandingIncreaseReq} when increases are
   * outstanding for a scheduler key but none of them matches the allocated
   * container, and it is used as the id of the dummy container built when a
   * previous request is cancelled.
   */
  public static final ContainerId UNDEFINED =
      ContainerId.newContainerId(ApplicationAttemptId.newInstance(
              ApplicationId.newInstance(-1, -1), -1), -1);

  /** Factory used to create the ResourceRequest records for increases. */
  protected static final RecordFactory RECORD_FACTORY =
      RecordFactoryProvider.getRecordFactory(null);

  // Keep track of containers that are undergoing promotion or a resource
  // increase: scheduler key -> resource to increase -> node -> container ids.
  private final Map<SchedulerRequestKey, Map<Resource,
      Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();

  // Containers with an outstanding decrease. The value is the requested
  // capability for DECREASE_RESOURCE requests and the container's current
  // resource for every other update type.
  private final Map<ContainerId, Resource> outstandingDecreases =
      new HashMap<>();

  // Scheduling info of the application whose updates are tracked here.
  private final AppSchedulingInfo appSchedulingInfo;

  /**
   * Creates an update context bound to the given application scheduling info.
   * @param appSchedulingInfo AppSchedulingInfo used to look up, cancel and
   *                          (re-)add resource requests.
   */
  ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
    this.appSchedulingInfo = appSchedulingInfo;
  }

  /**
   * Add the container to outstanding decreases.
   * For a DECREASE_RESOURCE request any previously pending request for the
   * container's update key is cancelled first and the requested capability
   * is recorded; for every other update type the container's current
   * resource is recorded.
   * @param updateReq UpdateContainerRequest.
   * @param schedulerNode SchedulerNode.
   * @param container Container.
   * @return If it was possible to decrease the container; false when the
   *         container already has an outstanding decrease.
   */
  public synchronized boolean checkAndAddToOutstandingDecreases(
      UpdateContainerRequest updateReq, SchedulerNode schedulerNode,
      Container container) {
    if (outstandingDecreases.containsKey(container.getId())) {
      return false;
    }
    if (ContainerUpdateType.DECREASE_RESOURCE ==
        updateReq.getContainerUpdateType()) {
      SchedulerRequestKey updateKey = new SchedulerRequestKey
          (container.getPriority(),
              container.getAllocationRequestId(), container.getId());
      cancelPreviousRequest(schedulerNode, updateKey);
      outstandingDecreases.put(container.getId(), updateReq.getCapability());
    } else {
      outstandingDecreases.put(container.getId(), container.getResource());
    }
    return true;
  }

  /**
   * Add the container to outstanding increases.
   * The request is rejected, without modifying any state, when the container
   * already has an outstanding decrease. It is also rejected when an entry
   * already exists for the scheduler key and the request is not an
   * INCREASE_RESOURCE request; a repeated INCREASE_RESOURCE request cancels
   * the previously pending request and replaces it.
   * @param rmContainer RMContainer.
   * @param schedulerNode SchedulerNode.
   * @param updateRequest UpdateContainerRequest.
   * @return true if updated to outstanding increases was successful.
   */
  public synchronized boolean checkAndAddToOutstandingIncreases(
      RMContainer rmContainer, SchedulerNode schedulerNode,
      UpdateContainerRequest updateRequest) {
    Container container = rmContainer.getContainer();
    // Reject before touching any bookkeeping: checking this after the maps
    // below were populated left empty entries behind in outstandingIncreases
    // (and could cancel a pending request) for an update that was refused.
    if (outstandingDecreases.containsKey(container.getId())) {
      return false;
    }
    SchedulerRequestKey schedulerKey =
        SchedulerRequestKey.create(updateRequest,
            rmContainer.getAllocatedSchedulerKey());
    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
        outstandingIncreases.get(schedulerKey);
    if (resourceMap == null) {
      resourceMap = new HashMap<>();
      outstandingIncreases.put(schedulerKey, resourceMap);
    } else {
      // Updating Resource for an existing increase container
      if (ContainerUpdateType.INCREASE_RESOURCE ==
          updateRequest.getContainerUpdateType()) {
        cancelPreviousRequest(schedulerNode, schedulerKey);
      } else {
        return false;
      }
    }
    Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
    Map<NodeId, Set<ContainerId>> locationMap =
        resourceMap.get(resToIncrease);
    if (locationMap == null) {
      locationMap = new HashMap<>();
      resourceMap.put(resToIncrease, locationMap);
    }
    Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
    if (containerIds == null) {
      containerIds = new HashSet<>();
      locationMap.put(container.getNodeId(), containerIds);
    }

    containerIds.add(container.getId());
    // Only ask the scheduler for more resources when there is actually
    // something to add on top of what the container already holds.
    if (!Resources.isNone(resToIncrease)) {
      Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
          new HashMap<>();
      Map<String, ResourceRequest> resMap =
          createResourceRequests(rmContainer, schedulerNode,
              schedulerKey, resToIncrease);
      updateResReqs.put(schedulerKey, resMap);
      appSchedulingInfo.updateResourceRequests(updateResReqs, false);
    }
    return true;
  }

  /**
   * Cancels a request that is still pending for the given scheduler key, if
   * there is one. Nothing happens when no placement allocator exists for the
   * key or when its ANY pending ask is absent or has a count of zero.
   * @param schedulerNode SchedulerNode the dummy allocation is reported on.
   * @param schedulerKey SchedulerRequestKey of the request to cancel.
   */
  private void cancelPreviousRequest(SchedulerNode schedulerNode,
      SchedulerRequestKey schedulerKey) {
    AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
        appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
    if (appPlacementAllocator != null) {
      PendingAsk pendingAsk = appPlacementAllocator.getPendingAsk(
          ResourceRequest.ANY);
      // Decrement the pending using a dummy RR with
      // resource = prev update req capability
      if (pendingAsk != null && pendingAsk.getCount() > 0) {
        Container container = Container.newInstance(UNDEFINED,
            schedulerNode.getNodeID(), "host:port",
            pendingAsk.getPerAllocationResource(),
            schedulerKey.getPriority(), null);
        appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
            schedulerKey,
            new RMContainerImpl(container, schedulerKey,
                appSchedulingInfo.getApplicationAttemptId(),
                schedulerNode.getNodeID(), appSchedulingInfo.getUser(),
                appSchedulingInfo.getRMContext(),
                appPlacementAllocator.getPrimaryRequestedNodePartition()));
      }
    }
  }

  /**
   * Builds the resource requests for an increase: one keyed by the
   * container's host, one by the node's rack name and one by
   * {@link ResourceRequest#ANY}, all asking for the same resource.
   * @param rmContainer RMContainer being updated.
   * @param schedulerNode SchedulerNode supplying the rack name.
   * @param schedulerKey SchedulerRequestKey of the update.
   * @param resToIncrease Resource each request asks for.
   * @return Map from resource name to the request created for it.
   */
  private Map<String, ResourceRequest> createResourceRequests(
      RMContainer rmContainer, SchedulerNode schedulerNode,
      SchedulerRequestKey schedulerKey, Resource resToIncrease) {
    Map<String, ResourceRequest> resMap = new HashMap<>();
    resMap.put(rmContainer.getContainer().getNodeId().getHost(),
        createResourceReqForIncrease(schedulerKey, resToIncrease,
            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
            rmContainer, rmContainer.getContainer().getNodeId().getHost()));
    resMap.put(schedulerNode.getRackName(),
        createResourceReqForIncrease(schedulerKey, resToIncrease,
            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
            rmContainer, schedulerNode.getRackName()));
    resMap.put(ResourceRequest.ANY,
        createResourceReqForIncrease(schedulerKey, resToIncrease,
            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
            rmContainer, ResourceRequest.ANY));
    return resMap;
  }

  /**
   * Computes the resource that has to be requested for the update.
   * @param updateReq UpdateContainerRequest.
   * @param rmContainer RMContainer being updated.
   * @return The container's full resource for PROMOTE_EXECUTION_TYPE, the
   *         amount by which the requested capability exceeds the current
   *         resource for INCREASE_RESOURCE, and null for any other type.
   */
  private Resource getResourceToIncrease(UpdateContainerRequest updateReq,
      RMContainer rmContainer) {
    if (updateReq.getContainerUpdateType() ==
        ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
      return rmContainer.getContainer().getResource();
    }
    if (updateReq.getContainerUpdateType() ==
        ContainerUpdateType.INCREASE_RESOURCE) {
      //       This has to equal the Resources in excess of fitsIn()
      //       for container increase and is equal to the container total
      //       resource for Promotion.
      Resource maxCap = Resources.componentwiseMax(updateReq.getCapability(),
          rmContainer.getContainer().getResource());
      return Resources.add(maxCap,
          Resources.negate(rmContainer.getContainer().getResource()));
    }
    return null;
  }

  /**
   * Fills in the given ResourceRequest so that it asks for exactly one
   * GUARANTEED container of the given size at the given resource name, with
   * locality relaxation turned off.
   * @param schedulerRequestKey Key supplying the allocation request id.
   * @param resToIncrease Capability to request.
   * @param rr ResourceRequest record to populate.
   * @param rmContainer RMContainer supplying priority and node label.
   * @param resourceName Resource name (host, rack or ANY) of the request.
   * @return The populated request (the same instance as {@code rr}).
   */
  private static ResourceRequest createResourceReqForIncrease(
      SchedulerRequestKey schedulerRequestKey, Resource resToIncrease,
      ResourceRequest rr, RMContainer rmContainer, String resourceName) {
    rr.setResourceName(resourceName);
    rr.setNumContainers(1);
    rr.setRelaxLocality(false);
    rr.setPriority(rmContainer.getContainer().getPriority());
    rr.setAllocationRequestId(schedulerRequestKey.getAllocationRequestId());
    rr.setCapability(resToIncrease);
    rr.setNodeLabelExpression(rmContainer.getNodeLabelExpression());
    rr.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(
        ExecutionType.GUARANTEED, true));
    return rr;
  }

  /**
   * Remove Container from outstanding increases / decreases. Calling this
   * method essentially completes the update process.
   * The increase entry is looked up by the scheduler key, the container's
   * resource and its node; maps that become empty are pruned.
   * @param schedulerKey SchedulerRequestKey.
   * @param container Container.
   */
  public synchronized void removeFromOutstandingUpdate(
      SchedulerRequestKey schedulerKey, Container container) {
    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
        outstandingIncreases.get(schedulerKey);
    if (resourceMap != null) {
      Map<NodeId, Set<ContainerId>> locationMap =
          resourceMap.get(container.getResource());
      if (locationMap != null) {
        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
        if (containerIds != null && !containerIds.isEmpty()) {
          containerIds.remove(container.getId());
          if (containerIds.isEmpty()) {
            locationMap.remove(container.getNodeId());
          }
        }
        if (locationMap.isEmpty()) {
          resourceMap.remove(container.getResource());
        }
      }
      if (resourceMap.isEmpty()) {
        outstandingIncreases.remove(schedulerKey);
      }
    }
    outstandingDecreases.remove(container.getId());
  }

  /**
   * Check if a new container is to be matched up against an outstanding
   * Container increase request.
   * NOTE(review): unlike the add/remove methods this method is not
   * synchronized although it reads outstandingIncreases; presumably callers
   * serialize access - confirm.
   * @param node SchedulerNode.
   * @param schedulerKey SchedulerRequestKey.
   * @param rmContainer RMContainer.
   * @return The id of a container waiting for this resource on this node;
   *         {@link #UNDEFINED} when increases are outstanding for the key
   *         but none matches (the requests are then added back); null when
   *         nothing is outstanding for the key.
   */
  public ContainerId matchContainerToOutstandingIncreaseReq(
      SchedulerNode node, SchedulerRequestKey schedulerKey,
      RMContainer rmContainer) {
    ContainerId retVal = null;
    Container container = rmContainer.getContainer();
    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
        outstandingIncreases.get(schedulerKey);
    if (resourceMap != null) {
      Map<NodeId, Set<ContainerId>> locationMap =
          resourceMap.get(container.getResource());
      if (locationMap != null) {
        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
        if (containerIds != null && !containerIds.isEmpty()) {
          retVal = containerIds.iterator().next();
        }
      }
    }
    // Allocation happened on NM on the same host, but not on the NM
    // we need.. We need to signal that this container has to be released.
    // We also need to add these requests back.. to be reallocated.
    if (resourceMap != null && retVal == null) {
      Map<SchedulerRequestKey, Map<String, ResourceRequest>> reqsToUpdate =
          new HashMap<>();
      Map<String, ResourceRequest> resMap = createResourceRequests
          (rmContainer, node, schedulerKey,
          rmContainer.getContainer().getResource());
      reqsToUpdate.put(schedulerKey, resMap);
      appSchedulingInfo.updateResourceRequests(reqsToUpdate, true);
      return UNDEFINED;
    }
    return retVal;
  }

  /**
   * Swaps the existing RMContainer's and the temp RMContainers internal
   * container references after adjusting the resources in each.
   * The existing RMContainer receives a new Container that keeps its id,
   * node, http address, priority, exposed ports, allocation request id and
   * version but carries the updated resource and the temp container's
   * execution type. The temp container is left holding the resource to be
   * released and the existing container's previous execution type.
   * NOTE(review): this method is not synchronized although its helpers read
   * outstandingDecreases; presumably callers serialize access - confirm.
   * @param tempRMContainer Temp RMContainer.
   * @param existingRMContainer Existing RMContainer.
   * @param updateType Update Type.
   * @return Existing RMContainer after swapping the container references.
   */
  public RMContainer swapContainer(RMContainer tempRMContainer,
      RMContainer existingRMContainer, ContainerUpdateType updateType) {
    ContainerId matchedContainerId = existingRMContainer.getContainerId();
    // Swap updated container with the existing container
    Container tempContainer = tempRMContainer.getContainer();

    Resource updatedResource = createUpdatedResource(
        tempContainer, existingRMContainer.getContainer(), updateType);
    Resource resourceToRelease = createResourceToRelease(
        existingRMContainer.getContainer(), updateType);
    Container newContainer = Container.newInstance(matchedContainerId,
        existingRMContainer.getContainer().getNodeId(),
        existingRMContainer.getContainer().getNodeHttpAddress(),
        updatedResource,
        existingRMContainer.getContainer().getPriority(), null,
        tempContainer.getExecutionType());
    newContainer.setExposedPorts(
        existingRMContainer.getContainer().getExposedPorts());
    newContainer.setAllocationRequestId(
        existingRMContainer.getContainer().getAllocationRequestId());
    newContainer.setVersion(existingRMContainer.getContainer().getVersion());

    // The temp container must be updated before the existing RMContainer is
    // re-pointed, since it copies the existing container's execution type.
    tempRMContainer.getContainer().setResource(resourceToRelease);
    tempRMContainer.getContainer().setExecutionType(
        existingRMContainer.getContainer().getExecutionType());

    ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
    return existingRMContainer;
  }

  /**
   * Returns the resource that the container will finally be assigned with
   * at the end of the update operation: existing plus temp resource for an
   * increase, the recorded outstanding decrease target for a decrease, and
   * the unchanged existing resource otherwise.
   * @param tempContainer Temporary Container created for the operation.
   * @param existingContainer Existing Container.
   * @param updateType Update Type.
   * @return Final Resource.
   */
  private Resource createUpdatedResource(Container tempContainer,
      Container existingContainer, ContainerUpdateType updateType) {
    if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
      return Resources.add(existingContainer.getResource(),
          tempContainer.getResource());
    } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
      return outstandingDecreases.get(existingContainer.getId());
    } else {
      return existingContainer.getResource();
    }
  }

  /**
   * Returns the resources that need to be released at the end of the update
   * operation: nothing for an increase, the difference between the existing
   * resource and the recorded decrease target for a decrease, and the whole
   * existing resource otherwise.
   * @param existingContainer Existing Container.
   * @param updateType Updated type.
   * @return Resources to be released.
   */
  private Resource createResourceToRelease(Container existingContainer,
      ContainerUpdateType updateType) {
    if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
      return Resources.none();
    } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType){
      return Resources.add(existingContainer.getResource(),
          Resources.negate(
              outstandingDecreases.get(existingContainer.getId())));
    } else {
      return existingContainer.getResource();
    }
  }
}