ActivitiesLogger.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;

import java.util.function.Supplier;

/**
 * Utility for logging scheduler activities
 */
public class ActivitiesLogger {
  private static final Logger LOG =
      LoggerFactory.getLogger(ActivitiesLogger.class);

  /**
   * Methods for recording activities from an app
   */
  public static class APP {

    /*
     * Record skipped application activity when no container allocated /
     * reserved / re-reserved. Scheduler will look at following applications
     * within the same leaf queue.
     */
    public static void recordSkippedAppActivityWithoutAllocation(
        ActivitiesManager activitiesManager, SchedulerNode node,
        SchedulerApplicationAttempt application,
        SchedulerRequestKey requestKey,
        String diagnostic, ActivityLevel level) {
      recordAppActivityWithoutAllocation(activitiesManager, node, application,
          requestKey, diagnostic, ActivityState.SKIPPED, level);
    }

    /*
     * Record application activity when rejected because of queue maximum
     * capacity or user limit.
     */
    public static void recordRejectedAppActivityFromLeafQueue(
        ActivitiesManager activitiesManager, SchedulerNode node,
        SchedulerApplicationAttempt application, Priority priority,
        String diagnostic) {
      if (activitiesManager == null) {
        return;
      }
      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
      if (activitiesManager.shouldRecordThisNode(nodeId)) {
        recordActivity(activitiesManager, nodeId, application.getQueueName(),
            application.getApplicationId().toString(), priority,
            ActivityState.REJECTED, diagnostic, ActivityLevel.APP);
      }
      finishSkippedAppAllocationRecording(activitiesManager,
          application.getApplicationId(), ActivityState.REJECTED, diagnostic);
    }

    /*
     * Record application activity when no container allocated /
     * reserved / re-reserved. Scheduler will look at following applications
     * within the same leaf queue.
     */
    public static void recordAppActivityWithoutAllocation(
        ActivitiesManager activitiesManager, SchedulerNode node,
        SchedulerApplicationAttempt application,
        SchedulerRequestKey schedulerKey,
        String diagnostic, ActivityState appState, ActivityLevel level) {
      if (activitiesManager == null) {
        return;
      }
      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
      if (activitiesManager.shouldRecordThisNode(nodeId)) {
        String requestName = null;
        Integer priority = null;
        Long allocationRequestId = null;
        if (level == ActivityLevel.NODE || level == ActivityLevel.REQUEST) {
          if (schedulerKey == null) {
            LOG.warn("Request key should not be null at " + level + " level.");
            return;
          }
          priority = getPriority(schedulerKey);
          allocationRequestId = schedulerKey.getAllocationRequestId();
          requestName = getRequestName(priority, allocationRequestId);
        }
        switch (level) {
        case NODE:
          recordSchedulerActivityAtNodeLevel(activitiesManager, application,
              requestName, priority, allocationRequestId, null, nodeId,
              appState, diagnostic);
          break;
        case REQUEST:
          recordSchedulerActivityAtRequestLevel(activitiesManager, application,
              requestName, priority, allocationRequestId, nodeId, appState,
              diagnostic);
          break;
        case APP:
          recordSchedulerActivityAtAppLevel(activitiesManager, application,
              nodeId, appState, diagnostic);
          break;
        default:
          LOG.warn("Doesn't handle app activities at " + level + " level.");
          break;
        }
      }
      // Add application-container activity into specific application allocation
      // Under this condition, it fails to allocate a container to this
      // application, so containerId is null.
      if (activitiesManager.shouldRecordThisApp(
          application.getApplicationId())) {
        activitiesManager.addSchedulingActivityForApp(
            application.getApplicationId(), null,
            getPriority(schedulerKey), appState,
            diagnostic, level, nodeId,
            schedulerKey == null ?
                null : schedulerKey.getAllocationRequestId());
      }
    }

    /*
     * Record application activity when container allocated / reserved /
     * re-reserved
     */
    public static void recordAppActivityWithAllocation(
        ActivitiesManager activitiesManager, SchedulerNode node,
        SchedulerApplicationAttempt application, RMContainer updatedContainer,
        ActivityState activityState) {
      if (activitiesManager == null) {
        return;
      }
      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
      if (nodeId == null || nodeId == ActivitiesManager.EMPTY_NODE_ID) {
        nodeId = updatedContainer.getNodeId();
      }
      if (activitiesManager.shouldRecordThisNode(nodeId)) {
        Integer containerPriority =
            updatedContainer.getContainer().getPriority().getPriority();
        Long allocationRequestId =
            updatedContainer.getContainer().getAllocationRequestId();
        String requestName =
            getRequestName(containerPriority, allocationRequestId);
        // Add node,request,app level activities into scheduler activities.
        recordSchedulerActivityAtNodeLevel(activitiesManager, application,
            requestName, containerPriority, allocationRequestId,
            updatedContainer.getContainer().toString(), nodeId, activityState,
            ActivityDiagnosticConstant.EMPTY);
      }
      // Add application-container activity into specific application allocation
      if (activitiesManager.shouldRecordThisApp(
          application.getApplicationId())) {
        activitiesManager.addSchedulingActivityForApp(
            application.getApplicationId(),
            updatedContainer.getContainerId(),
            updatedContainer.getContainer().getPriority().getPriority(),
            activityState, ActivityDiagnosticConstant.EMPTY,
            ActivityLevel.NODE, nodeId,
            updatedContainer.getContainer().getAllocationRequestId());
      }
    }

    @SuppressWarnings("parameternumber")
    private static void recordSchedulerActivityAtNodeLevel(
        ActivitiesManager activitiesManager, SchedulerApplicationAttempt app,
        String requestName, Integer priority, Long allocationRequestId,
        String containerId, NodeId nodeId, ActivityState state,
        String diagnostic) {
      activitiesManager
          .addSchedulingActivityForNode(nodeId, requestName, containerId, null,
              state, diagnostic, ActivityLevel.NODE, null);
      // Record request level activity additionally.
      recordSchedulerActivityAtRequestLevel(activitiesManager, app, requestName,
          priority, allocationRequestId, nodeId, state,
          ActivityDiagnosticConstant.EMPTY);
    }

    @SuppressWarnings("parameternumber")
    private static void recordSchedulerActivityAtRequestLevel(
        ActivitiesManager activitiesManager, SchedulerApplicationAttempt app,
        String requestName, Integer priority, Long allocationRequestId,
        NodeId nodeId, ActivityState state, String diagnostic) {
      activitiesManager.addSchedulingActivityForNode(nodeId,
          app.getApplicationId().toString(), requestName, priority,
          state, diagnostic, ActivityLevel.REQUEST,
          allocationRequestId);
      // Record app level activity additionally.
      recordSchedulerActivityAtAppLevel(activitiesManager, app, nodeId, state,
          ActivityDiagnosticConstant.EMPTY);
    }

    private static void recordSchedulerActivityAtAppLevel(
        ActivitiesManager activitiesManager, SchedulerApplicationAttempt app,
        NodeId nodeId, ActivityState state, String diagnostic) {
      activitiesManager.addSchedulingActivityForNode(nodeId, app.getQueueName(),
          app.getApplicationId().toString(), app.getPriority().getPriority(),
          state, diagnostic, ActivityLevel.APP, null);
    }

    /*
     * Invoked when scheduler starts to look at this application within one node
     * update.
     */
    public static void startAppAllocationRecording(
        ActivitiesManager activitiesManager, FiCaSchedulerNode node,
        long currentTime,
        SchedulerApplicationAttempt application) {
      if (activitiesManager == null) {
        return;
      }
      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
      activitiesManager
          .startAppAllocationRecording(nodeId, currentTime,
              application);
    }

    /*
     * Invoked when scheduler finishes looking at this application within one
     * node update, and the app has any container allocated/reserved during
     * this allocation.
     */
    public static void finishAllocatedAppAllocationRecording(
        ActivitiesManager activitiesManager, ApplicationId applicationId,
        ContainerId containerId, ActivityState containerState,
        String diagnostic) {
      if (activitiesManager == null) {
        return;
      }

      if (activitiesManager.shouldRecordThisApp(applicationId)) {
        activitiesManager.finishAppAllocationRecording(applicationId,
            containerId, containerState, diagnostic);
      }
    }

    /*
     * Invoked when scheduler finishes looking at this application within one
     * node update, and the app DOESN'T have any container allocated/reserved
     * during this allocation.
     */
    public static void finishSkippedAppAllocationRecording(
        ActivitiesManager activitiesManager, ApplicationId applicationId,
        ActivityState containerState, String diagnostic) {
      finishAllocatedAppAllocationRecording(activitiesManager, applicationId,
          null, containerState, diagnostic);
    }
  }

  /**
   * Methods for recording activities from a queue
   */
  public static class QUEUE {
    /*
     * Record activities of a queue
     */
    public static void recordQueueActivity(ActivitiesManager activitiesManager,
        SchedulerNode node, String parentQueueName, String queueName,
        ActivityState state, String diagnostic) {
      recordQueueActivity(activitiesManager, node, parentQueueName, queueName,
          state, () -> diagnostic);
    }

    public static void recordQueueActivity(ActivitiesManager activitiesManager,
        SchedulerNode node, String parentQueueName, String queueName,
        ActivityState state, Supplier<String> diagnosticSupplier) {
      if (activitiesManager == null) {
        return;
      }
      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
      if (activitiesManager.shouldRecordThisNode(nodeId)) {
        recordActivity(activitiesManager, nodeId, parentQueueName, queueName,
            null, state, diagnosticSupplier.get(), ActivityLevel.QUEUE);
      }
    }
  }

  /**
   * Methods for recording overall activities from one node update
   */
  public static class NODE {

    /*
     * Invoked when node allocation finishes, and there's NO container
     * allocated or reserved during the allocation
     */
    public static void finishSkippedNodeAllocation(
        ActivitiesManager activitiesManager, SchedulerNode node) {
      finishAllocatedNodeAllocation(activitiesManager, node, null,
          AllocationState.SKIPPED);
    }

    /*
     * Invoked when node allocation finishes, and there's any container
     * allocated or reserved during the allocation
     */
    public static void finishAllocatedNodeAllocation(
        ActivitiesManager activitiesManager, SchedulerNode node,
        ContainerId containerId, AllocationState containerState) {
      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
      if (nodeId == null) {
        return;
      }
      if (activitiesManager.shouldRecordThisNode(nodeId)) {
        activitiesManager.updateAllocationFinalState(nodeId,
            containerId, containerState);
      }
    }

    /*
     * Invoked when node heartbeat finishes
     */
    public static void finishNodeUpdateRecording(
        ActivitiesManager activitiesManager, NodeId nodeID, String partition) {
      if (activitiesManager == null) {
        return;
      }
      activitiesManager.finishNodeUpdateRecording(nodeID, partition);
    }

    /*
     * Invoked when node heartbeat starts
     */
    public static void startNodeUpdateRecording(
        ActivitiesManager activitiesManager, NodeId nodeID) {
      if (activitiesManager == null) {
        return;
      }
      activitiesManager.startNodeUpdateRecording(nodeID);
    }
  }

  // Add queue, application or container activity into specific node allocation.
  private static void recordActivity(ActivitiesManager activitiesManager,
      NodeId nodeId, String parentName, String childName, Priority priority,
      ActivityState state, String diagnostic, ActivityLevel level) {
    activitiesManager.addSchedulingActivityForNode(nodeId, parentName,
        childName, priority != null ? priority.getPriority() : null, state,
        diagnostic, level, null);
  }

  private static NodeId getRecordingNodeId(ActivitiesManager activitiesManager,
      SchedulerNode node) {
    return activitiesManager == null ? null :
        activitiesManager.getRecordingNodeId(node);
  }

  private static String getRequestName(Integer priority,
      Long allocationRequestId) {
    return "request_"
        + (priority == null ? "" : priority)
        + "_" + (allocationRequestId == null ? "" : allocationRequestId);
  }

  private static Integer getPriority(SchedulerRequestKey schedulerKey) {
    Priority priority = schedulerKey == null ?
        null : schedulerKey.getPriority();
    return priority == null ? null : priority.getPriority();
  }
}