OpportunisticContainerAllocatorAMService.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.CentralizedOpportunisticContainerAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords
    .FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;

import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;


import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;

import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/**
 * The OpportunisticContainerAllocatorAMService is started instead of the
 * ApplicationMasterService if opportunistic scheduling is enabled for the YARN
 * cluster (either centralized or distributed opportunistic scheduling).
 *
 * It extends the functionality of the ApplicationMasterService by servicing
 * clients (AMs and AMRMProxy request interceptors) that understand the
 * DistributedSchedulingProtocol.
 */
public class OpportunisticContainerAllocatorAMService
    extends ApplicationMasterService implements DistributedSchedulingAMProtocol,
    EventHandler<SchedulerEvent> {

  private static final Logger LOG =
      LoggerFactory.getLogger(OpportunisticContainerAllocatorAMService.class);

  private final NodeQueueLoadMonitor nodeMonitor;
  private final OpportunisticContainerAllocator oppContainerAllocator;

  private final int numNodes;

  private final long cacheRefreshInterval;
  private volatile List<RemoteNode> cachedNodes;
  private volatile long lastCacheUpdateTime;

  class OpportunisticAMSProcessor implements
      ApplicationMasterServiceProcessor {

    private ApplicationMasterServiceContext context;
    private ApplicationMasterServiceProcessor nextProcessor;

    private YarnScheduler getScheduler() {
      return ((RMContext)context).getScheduler();
    }

    @Override
    public void init(ApplicationMasterServiceContext amsContext,
        ApplicationMasterServiceProcessor next) {
      this.context = amsContext;
      // The AMSProcessingChain guarantees that 'next' is not null.
      this.nextProcessor = next;
    }

    @Override
    public void registerApplicationMaster(
        ApplicationAttemptId applicationAttemptId,
        RegisterApplicationMasterRequest request,
        RegisterApplicationMasterResponse response)
        throws IOException, YarnException {
      SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
          getScheduler()).getApplicationAttempt(applicationAttemptId);
      if (appAttempt.getOpportunisticContainerContext() == null) {
        OpportunisticContainerContext opCtx =
            new OpportunisticContainerContext();
        opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
            .ContainerIdGenerator() {
          @Override
          public long generateContainerId() {
            return appAttempt.getAppSchedulingInfo().getNewContainerId();
          }
        });
        int tokenExpiryInterval = getConfig()
            .getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
                YarnConfiguration.
                    DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
        opCtx.updateAllocationParams(
            getScheduler().getMinimumResourceCapability(),
            getScheduler().getMaximumResourceCapability(),
            getScheduler().getMinimumResourceCapability(),
            tokenExpiryInterval);
        appAttempt.setOpportunisticContainerContext(opCtx);
      }
      nextProcessor.registerApplicationMaster(
          applicationAttemptId, request, response);
    }

    @Override
    public void allocate(ApplicationAttemptId appAttemptId,
        AllocateRequest request, AllocateResponse response)
        throws YarnException {
      // Partition requests to GUARANTEED and OPPORTUNISTIC.
      OpportunisticContainerAllocator.PartitionedResourceRequests
          partitionedAsks =
          oppContainerAllocator.partitionAskList(request.getAskList());

      // Allocate OPPORTUNISTIC containers.
      SchedulerApplicationAttempt appAttempt =
          ((AbstractYarnScheduler)rmContext.getScheduler())
              .getApplicationAttempt(appAttemptId);

      if (!appAttempt.getApplicationAttemptId().equals(appAttemptId)){
        LOG.error("Calling allocate on previous or removed or non "
            + "existent application attempt {}", appAttemptId);
        return;
      }

      OpportunisticContainerContext oppCtx =
          appAttempt.getOpportunisticContainerContext();
      oppCtx.updateNodeList(getLeastLoadedNodes());

      if (!partitionedAsks.getOpportunistic().isEmpty()) {
        String appPartition = appAttempt.getAppAMNodePartitionName();

        for (ResourceRequest req : partitionedAsks.getOpportunistic()) {
          if (null == req.getNodeLabelExpression()) {
            req.setNodeLabelExpression(appPartition);
          }
        }
      }

      List<Container> oppContainers =
          oppContainerAllocator.allocateContainers(
              request.getResourceBlacklistRequest(),
              partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
              ResourceManager.getClusterTimeStamp(), appAttempt.getUser());

      // Create RMContainers and update the NMTokens.
      if (!oppContainers.isEmpty()) {
        OpportunisticSchedulerMetrics schedulerMetrics =
            OpportunisticSchedulerMetrics.getMetrics();
        schedulerMetrics.incrAllocatedOppContainers(oppContainers.size());
        handleNewContainers(oppContainers, false);
        appAttempt.updateNMTokens(oppContainers);
        ApplicationMasterServiceUtils.addToAllocatedContainers(
            response, oppContainers);
      }

      // Allocate GUARANTEED containers.
      request.setAskList(partitionedAsks.getGuaranteed());
      nextProcessor.allocate(appAttemptId, request, response);
    }

    @Override
    public void finishApplicationMaster(
        ApplicationAttemptId applicationAttemptId,
        FinishApplicationMasterRequest request,
        FinishApplicationMasterResponse response) {
      nextProcessor.finishApplicationMaster(applicationAttemptId,
          request, response);
    }
  }

  public OpportunisticContainerAllocatorAMService(RMContext rmContext,
      YarnScheduler scheduler) {
    super(OpportunisticContainerAllocatorAMService.class.getName(),
        rmContext, scheduler);
    int maxAllocationsPerAMHeartbeat = rmContext.getYarnConfiguration().getInt(
        YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
        YarnConfiguration.
            DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
    this.numNodes = rmContext.getYarnConfiguration().getInt(
        YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
        YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);
    long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
        YarnConfiguration.
            DEFAULT_NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
    this.cacheRefreshInterval = nodeSortInterval;
    this.lastCacheUpdateTime = System.currentTimeMillis();
    NodeQueueLoadMonitor.LoadComparator comparator =
        NodeQueueLoadMonitor.LoadComparator.valueOf(
            rmContext.getYarnConfiguration().get(
                YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
                YarnConfiguration.
                    DEFAULT_NM_CONTAINER_QUEUING_LOAD_COMPARATOR));

    NodeQueueLoadMonitor topKSelector =
        new NodeQueueLoadMonitor(nodeSortInterval, comparator, numNodes);

    float sigma = rmContext.getYarnConfiguration()
        .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
            YarnConfiguration.DEFAULT_NM_CONTAINER_QUEUING_LIMIT_STDEV);

    int limitMin, limitMax;

    if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH ||
        comparator ==
            NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES) {
      limitMin = rmContext.getYarnConfiguration()
          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
              YarnConfiguration.
                  DEFAULT_NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH);
      limitMax = rmContext.getYarnConfiguration()
          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
              YarnConfiguration.
                  DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH);
    } else {
      limitMin = rmContext.getYarnConfiguration()
          .getInt(
              YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
              YarnConfiguration.
                  DEFAULT_NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS);
      limitMax = rmContext.getYarnConfiguration()
          .getInt(
              YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
              YarnConfiguration.
                  DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS);
    }

    topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
    this.nodeMonitor = topKSelector;
    this.oppContainerAllocator =
        new CentralizedOpportunisticContainerAllocator(
            rmContext.getContainerTokenSecretManager(),
            maxAllocationsPerAMHeartbeat, nodeMonitor);
  }

  @Override
  public Server getServer(YarnRPC rpc, Configuration serverConf,
      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
    if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
      Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
          addr, serverConf, secretManager,
          serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
              YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
      // To support application running on NMs that DO NOT support
      // Dist Scheduling... The server multiplexes both the
      // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
      ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
          ApplicationMasterProtocolPB.class,
          ApplicationMasterProtocolService.newReflectiveBlockingService(
              new ApplicationMasterProtocolPBServiceImpl(this)));
      return server;
    }
    return super.getServer(rpc, serverConf, addr, secretManager);
  }

  @Override
  protected List<ApplicationMasterServiceProcessor> getProcessorList(
      Configuration conf) {
    List<ApplicationMasterServiceProcessor> retVal =
        super.getProcessorList(conf);
    retVal.add(new OpportunisticAMSProcessor());
    return retVal;
  }

  @Override
  public RegisterDistributedSchedulingAMResponse
      registerApplicationMasterForDistributedScheduling(
      RegisterApplicationMasterRequest request) throws YarnException,
      IOException {
    RegisterApplicationMasterResponse response =
        registerApplicationMaster(request);
    RegisterDistributedSchedulingAMResponse dsResp = recordFactory
        .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
    dsResp.setRegisterResponse(response);
    dsResp.setMinContainerResource(
        rmContext.getScheduler().getMinimumResourceCapability());
    dsResp.setMaxContainerResource(
        rmContext.getScheduler().getMaximumResourceCapability());
    dsResp.setContainerTokenExpiryInterval(
        getConfig().getInt(
            YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
            YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS));
    dsResp.setContainerIdStart(
        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);

    // Set nodes to be used for scheduling
    dsResp.setNodesForScheduling(getLeastLoadedNodes());
    return dsResp;
  }

  @Override
  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
      DistributedSchedulingAllocateRequest request)
      throws YarnException, IOException {
    List<Container> distAllocContainers = request.getAllocatedContainers();
    handleNewContainers(distAllocContainers, true);
    AllocateResponse response = allocate(request.getAllocateRequest());
    DistributedSchedulingAllocateResponse dsResp = recordFactory
        .newRecordInstance(DistributedSchedulingAllocateResponse.class);
    dsResp.setAllocateResponse(response);
    dsResp.setNodesForScheduling(getLeastLoadedNodes());
    return dsResp;
  }

  private void handleNewContainers(List<Container> allocContainers,
      boolean isRemotelyAllocated) {
    for (Container container : allocContainers) {
      // Create RMContainer
      RMContainer rmContainer =
          SchedulerUtils.createOpportunisticRmContainer(
              rmContext, container, isRemotelyAllocated);
      if (rmContainer!=null) {
        rmContainer.handle(
            new RMContainerEvent(container.getId(),
                RMContainerEventType.ACQUIRED));
      }
    }
  }

  @Override
  protected void serviceStart() throws Exception {
    if (this.nodeMonitor != null) {
      this.nodeMonitor.start();
    }
    super.serviceStart();
  }

  @Override
  protected void serviceStop() throws Exception {
    if (nodeMonitor != null) {
      nodeMonitor.stop();
    }
    super.serviceStop();
  }

  @Override
  public void handle(SchedulerEvent event) {
    switch (event.getType()) {
    case NODE_ADDED:
      if (!(event instanceof NodeAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
      nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
          nodeAddedEvent.getAddedRMNode());
      break;
    case NODE_REMOVED:
      if (!(event instanceof NodeRemovedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeRemovedSchedulerEvent nodeRemovedEvent =
          (NodeRemovedSchedulerEvent) event;
      nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
      break;
    case NODE_UPDATE:
      if (!(event instanceof NodeUpdateSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
          event;
      nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
      break;
    case NODE_RESOURCE_UPDATE:
      if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
          (NodeResourceUpdateSchedulerEvent) event;
      nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
          nodeResourceUpdatedEvent.getResourceOption());
      break;

    // <-- IGNORED EVENTS : START -->
    case APP_ADDED:
      break;
    case APP_REMOVED:
      break;
    case APP_ATTEMPT_ADDED:
      break;
    case APP_ATTEMPT_REMOVED:
      break;
    case CONTAINER_EXPIRED:
      break;
    case NODE_LABELS_UPDATE:
      break;
    case RELEASE_CONTAINER:
      break;
    case NODE_ATTRIBUTES_UPDATE:
      break;
    case KILL_RESERVED_CONTAINER:
      break;
    case MARK_CONTAINER_FOR_PREEMPTION:
      break;
    case MARK_CONTAINER_FOR_KILLABLE:
      break;
    case MARK_CONTAINER_FOR_NONKILLABLE:
      break;
    case MANAGE_QUEUE:
      break;
    // <-- IGNORED EVENTS : END -->
    default:
      LOG.error("Unknown event arrived at" +
          "OpportunisticContainerAllocatorAMService: {}", event);
    }

  }

  QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
    return nodeMonitor.getThresholdCalculator();
  }

  @VisibleForTesting
  synchronized List<RemoteNode> getLeastLoadedNodes() {
    long currTime = System.currentTimeMillis();
    if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
        || (cachedNodes == null)) {
      cachedNodes = convertToRemoteNodes(
          this.nodeMonitor.selectLeastLoadedNodes(this.numNodes));
      if (cachedNodes.size() > 0) {
        lastCacheUpdateTime = currTime;
      }
    }
    return cachedNodes;
  }

  private List<RemoteNode> convertToRemoteNodes(List<NodeId> nodeIds) {
    ArrayList<RemoteNode> retNodes = new ArrayList<>();
    for (NodeId nId : nodeIds) {
      RemoteNode remoteNode = convertToRemoteNode(nId);
      if (null != remoteNode) {
        retNodes.add(remoteNode);
      }
    }
    return retNodes;
  }

  private RemoteNode convertToRemoteNode(NodeId nodeId) {
    SchedulerNode node =
        ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId);
    if (node != null) {
      RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress());
      rNode.setRackName(node.getRackName());
      rNode.setNodePartition(node.getPartition());
      return rNode;
    }
    return null;
  }
}