RMAppLogAggregation.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.rmapp;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

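/**
 * Log aggregation logic used by RMApp: tracks the log aggregation report of
 * each NodeManager for one application and derives the application-level
 * log aggregation status from those reports.
 */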
public class RMAppLogAggregation {
  // Whether log aggregation is enabled; read once from the configuration
  // in the constructor.
  private final boolean logAggregationEnabled;
  // Locks handed in through the constructor; this class does not create
  // its own lock.
  private final ReadLock readLock;
  private final WriteLock writeLock;
  // Base time for the status timeout check; stays 0 until
  // recordLogAggregationStartTime is called.
  private long logAggregationStartTime = 0;
  // Timeout read from LOG_AGGREGATION_STATUS_TIME_OUT_MS; non-positive
  // configured values are replaced by the default.
  private final long logAggregationStatusTimeout;
  // Latest known log aggregation report for each NodeManager.
  private final Map<NodeId, LogAggregationReport> logAggregationStatus =
      new ConcurrentHashMap<>();
  // Application-level status; once it is SUCCEEDED, FAILED or TIME_OUT the
  // aggregation is treated as finished (see isLogAggregationFinished).
  private volatile LogAggregationStatus logAggregationStatusForAppReport;
  // Number of NodeManagers counted as SUCCEEDED / FAILED by
  // updateLogAggregationStatus.
  private int logAggregationSucceed = 0;
  private int logAggregationFailed = 0;
  // Diagnostic messages received with RUNNING reports, per NodeManager.
  private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
      new HashMap<>();
  // Diagnostic messages received with RUNNING_WITH_FAILURE reports, per
  // NodeManager.
  private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
      new HashMap<>();
  // Upper bound on the number of messages kept per NodeManager in each of
  // the two maps above.
  private final int maxLogAggregationDiagnosticsInMemory;

  RMAppLogAggregation(Configuration conf, ReadLock readLock,
      WriteLock writeLock) {
    this.readLock = readLock;
    this.writeLock = writeLock;
    this.logAggregationStatusTimeout = getLogAggregationStatusTimeout(conf);
    this.logAggregationEnabled = getEnabledFlagFromConf(conf);
    this.logAggregationStatusForAppReport =
        this.logAggregationEnabled ? LogAggregationStatus.NOT_START :
            LogAggregationStatus.DISABLED;
    this.maxLogAggregationDiagnosticsInMemory =
        getMaxLogAggregationDiagnostics(conf);
  }

  /**
   * Reads the log aggregation status timeout from the configuration.
   *
   * @param conf configuration to read from
   * @return the configured timeout, or the default when the configured
   *         value is zero or negative
   */
  private long getLogAggregationStatusTimeout(Configuration conf) {
    final long configured = conf.getLong(
        YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
    return configured > 0
        ? configured
        : YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
  }

  /**
   * Reads the log aggregation enabled flag from the configuration.
   *
   * @param conf configuration to read from
   * @return the configured flag, or its default when it is not set
   */
  private boolean getEnabledFlagFromConf(Configuration conf) {
    final boolean enabled = conf.getBoolean(
        YarnConfiguration.LOG_AGGREGATION_ENABLED,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
    return enabled;
  }

  /**
   * Reads the maximum number of log aggregation diagnostic messages that
   * are kept in memory.
   *
   * @param conf configuration to read from
   * @return the configured limit, or its default when it is not set
   */
  private int getMaxLogAggregationDiagnostics(Configuration conf) {
    final int maxDiagnostics = conf.getInt(
        YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
        YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
    return maxDiagnostics;
  }

  /**
   * Returns the per-NodeManager log aggregation reports of the application.
   *
   * <p>When aggregation has not finished, the application is in a final
   * state and the status timeout has elapsed since the recorded start time,
   * every report that is not already SUCCEEDED, FAILED or TIME_OUT is
   * switched to TIME_OUT before the reports are returned. Note that this
   * status change is made while holding only the read lock.
   *
   * @param rmApp the application; supplies the final-state check and the
   *              clock used for the timeout check
   * @return an unmodifiable view of the reports keyed by node
   */
  Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp(
      RMAppImpl rmApp) {
    this.readLock.lock();
    try {
      if (!isLogAggregationFinished() && RMAppImpl.isAppInFinalState(rmApp)) {
        // Compare the elapsed time with the timeout instead of computing
        // "start + timeout": the sum overflows for a very large configured
        // timeout and would then mark every report as timed out right away.
        final long elapsed =
            rmApp.getSystemClock().getTime() - this.logAggregationStartTime;
        if (elapsed > this.logAggregationStatusTimeout) {
          for (LogAggregationReport nmReport :
              logAggregationStatus.values()) {
            switch (nmReport.getLogAggregationStatus()) {
              case TIME_OUT:
              case SUCCEEDED:
              case FAILED:
                // Already in a terminal state, leave untouched.
                break;
              default:
                nmReport.setLogAggregationStatus(
                    LogAggregationStatus.TIME_OUT);
                break;
            }
          }
        }
      }
      return Collections.unmodifiableMap(logAggregationStatus);
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Merges a log aggregation report received for a node into the state
   * kept for the application. Nothing happens when log aggregation is
   * disabled or has already finished.
   *
   * @param nodeId node the report belongs to
   * @param report the newly received report; its status may be rewritten
   *               to RUNNING_WITH_FAILURE by this method
   * @param rmApp the application, used for the final-state check
   */
  void aggregateLogReport(NodeId nodeId, LogAggregationReport report,
      RMAppImpl rmApp) {
    this.writeLock.lock();
    try {
      if (!this.logAggregationEnabled || isLogAggregationFinished()) {
        return;
      }

      final LogAggregationReport known = this.logAggregationStatus.get(nodeId);
      final boolean reachedFinalState;
      if (known == null) {
        // First report for this node: store it as is.
        this.logAggregationStatus.put(nodeId, report);
        reachedFinalState = isLogAggregationFinishedForNM(report);
      } else {
        // Only a transition from a non-final to a final status counts.
        reachedFinalState = isLogAggregationFinishedForNM(report)
            && !isLogAggregationFinishedForNM(known);

        // A plain RUNNING report must not overwrite a stored
        // RUNNING_WITH_FAILURE status.
        final boolean runningOverFailure =
            report.getLogAggregationStatus() == LogAggregationStatus.RUNNING
            && known.getLogAggregationStatus()
                == LogAggregationStatus.RUNNING_WITH_FAILURE;
        if (!runningOverFailure) {
          // If the log aggregation status got from latest NM heartbeat
          // is RUNNING, and current log aggregation status is TIME_OUT,
          // based on whether there are any failure messages for this NM,
          // we will reset the log aggregation status as RUNNING or
          // RUNNING_WITH_FAILURE
          if (known.getLogAggregationStatus() == LogAggregationStatus.TIME_OUT
              && report.getLogAggregationStatus()
                  == LogAggregationStatus.RUNNING
              && isThereFailureMessageForNM(nodeId)) {
            report.setLogAggregationStatus(
                LogAggregationStatus.RUNNING_WITH_FAILURE);
          }
          known.setLogAggregationStatus(report.getLogAggregationStatus());
        }
      }

      updateLogAggregationDiagnosticMessages(nodeId, report);
      if (RMAppImpl.isAppInFinalState(rmApp) && reachedFinalState) {
        updateLogAggregationStatus(nodeId);
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * Computes the application-level log aggregation status from the
   * per-node reports.
   *
   * @param rmApp the application, used for the final-state check and for
   *              fetching the per-node reports
   * @return DISABLED when log aggregation is off; the stored status when
   *         aggregation already finished or no report exists; otherwise
   *         the status derived from the reports
   */
  public LogAggregationStatus getLogAggregationStatusForAppReport(
      RMAppImpl rmApp) {
    final boolean appFinished = RMAppImpl.isAppInFinalState(rmApp);
    this.readLock.lock();
    try {
      if (!logAggregationEnabled) {
        return LogAggregationStatus.DISABLED;
      }
      if (isLogAggregationFinished()) {
        return this.logAggregationStatusForAppReport;
      }
      final Map<NodeId, LogAggregationReport> nmReports =
          getLogAggregationReportsForApp(rmApp);
      if (nmReports.isEmpty()) {
        return this.logAggregationStatusForAppReport;
      }

      // Tally the per-node statuses. FAILED and TIME_OUT also count as
      // completed.
      int notStarted = 0;
      int completed = 0;
      int timedOut = 0;
      int failed = 0;
      int runningWithFailure = 0;
      for (LogAggregationReport nmReport : nmReports.values()) {
        switch (nmReport.getLogAggregationStatus()) {
          case NOT_START:
            notStarted++;
            break;
          case RUNNING_WITH_FAILURE:
            runningWithFailure++;
            break;
          case SUCCEEDED:
            completed++;
            break;
          case FAILED:
            failed++;
            completed++;
            break;
          case TIME_OUT:
            timedOut++;
            completed++;
            break;
          default:
            break;
        }
      }

      if (notStarted == nmReports.size()) {
        return LogAggregationStatus.NOT_START;
      }
      if (completed == nmReports.size()) {
        // We should satisfy two condition in order to return
        // SUCCEEDED or FAILED.
        // 1) make sure the application is in final state
        // 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
        // The SUCCEEDED/FAILED status is the final status which means
        // the log aggregation is finished. And the log aggregation status
        // will not be updated anymore.
        if (failed > 0 && appFinished) {
          this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
          return LogAggregationStatus.FAILED;
        }
        if (timedOut > 0) {
          this.logAggregationStatusForAppReport =
              LogAggregationStatus.TIME_OUT;
          return LogAggregationStatus.TIME_OUT;
        }
        if (appFinished) {
          this.logAggregationStatusForAppReport =
              LogAggregationStatus.SUCCEEDED;
          return LogAggregationStatus.SUCCEEDED;
        }
        // All nodes are done but the application is not in a final state.
        return LogAggregationStatus.RUNNING;
      }
      if (runningWithFailure > 0) {
        return LogAggregationStatus.RUNNING_WITH_FAILURE;
      }
      return LogAggregationStatus.RUNNING;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Tells whether the application-level log aggregation status is one of
   * SUCCEEDED, FAILED or TIME_OUT.
   *
   * @return true for those three statuses, false otherwise
   */
  private boolean isLogAggregationFinished() {
    switch (this.logAggregationStatusForAppReport) {
      case SUCCEEDED:
      case FAILED:
      case TIME_OUT:
        return true;
      default:
        return false;
    }
  }

  /**
   * Tells whether a node's report carries a final status. TIME_OUT is not
   * treated as final here.
   *
   * @param report report of a single node
   * @return true when the report status is SUCCEEDED or FAILED
   */
  private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
    final LogAggregationStatus nmStatus = report.getLogAggregationStatus();
    return nmStatus == LogAggregationStatus.SUCCEEDED
        || nmStatus == LogAggregationStatus.FAILED;
  }

  /**
   * Records the diagnostic message carried by a report, if any.
   *
   * <p>Messages of RUNNING reports are appended to the node's diagnostics
   * and the stored report of the node gets the joined diagnostics as its
   * diagnostic message. Messages of RUNNING_WITH_FAILURE reports are
   * appended to the node's failure messages. Reports with any other status,
   * or with a null or empty message, are ignored.
   *
   * @param nodeId node the report belongs to; a report for this node must
   *               already be stored when the report status is RUNNING
   * @param report the report whose diagnostic message is recorded
   */
  private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
      LogAggregationReport report) {
    final String message = report.getDiagnosticMessage();
    if (message == null || message.isEmpty()) {
      return;
    }
    final LogAggregationStatus status = report.getLogAggregationStatus();
    if (status == LogAggregationStatus.RUNNING) {
      final List<String> diagnostics = appendBoundedMessage(
          logAggregationDiagnosticsForNMs, nodeId, message);
      this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
          StringUtils.join(diagnostics, "\n"));
    } else if (status == LogAggregationStatus.RUNNING_WITH_FAILURE) {
      appendBoundedMessage(
          logAggregationFailureMessagesForNMs, nodeId, message);
    }
  }

  /**
   * Appends a message to the list kept for a node, evicting the oldest
   * entries so the list never holds more than the configured maximum.
   *
   * <p>A configured maximum of zero or less is treated as one, so the list
   * stays bounded instead of growing forever (an equality check against a
   * non-positive maximum would never trigger an eviction).
   *
   * @param messagesByNode map holding the per-node message lists
   * @param nodeId node whose list is extended
   * @param message message to append
   * @return the list of the node after the message was appended
   */
  private List<String> appendBoundedMessage(
      Map<NodeId, List<String>> messagesByNode, NodeId nodeId,
      String message) {
    final List<String> messages =
        messagesByNode.computeIfAbsent(nodeId, key -> new ArrayList<>());
    final int limit = Math.max(1, maxLogAggregationDiagnosticsInMemory);
    while (messages.size() >= limit) {
      messages.remove(0);
    }
    messages.add(message);
    return messages;
  }

  /**
   * Accounts for the final status of a node and, once every tracked node
   * has reported SUCCEEDED or FAILED, fixes the application-level status
   * and drops cached data that is no longer needed.
   *
   * @param nodeId node whose stored report has reached a final status
   */
  private void updateLogAggregationStatus(NodeId nodeId) {
    switch (this.logAggregationStatus.get(nodeId).getLogAggregationStatus()) {
      case SUCCEEDED:
        this.logAggregationSucceed++;
        break;
      case FAILED:
        this.logAggregationFailed++;
        break;
      default:
        break;
    }

    final int trackedNodes = this.logAggregationStatus.size();
    if (this.logAggregationSucceed == trackedNodes) {
      this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED;
      // Since the log aggregation status for this application for all NMs
      // is SUCCEEDED, it means all logs are aggregated successfully.
      // We could remove all the cached log aggregation reports
      this.logAggregationStatus.clear();
      this.logAggregationDiagnosticsForNMs.clear();
      this.logAggregationFailureMessagesForNMs.clear();
      return;
    }

    if (this.logAggregationSucceed + this.logAggregationFailed
        == trackedNodes) {
      this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
      // We have collected the log aggregation status for all NMs.
      // The log aggregation status is FAILED which means the log
      // aggregation fails in some NMs. We are only interested in the
      // nodes where the log aggregation is failed. So we could remove
      // the log aggregation details for those succeeded NMs
      this.logAggregationStatus.values().removeIf(nmReport ->
          nmReport.getLogAggregationStatus()
              .equals(LogAggregationStatus.SUCCEEDED));
      // the log aggregation has finished/failed.
      // and the status will not be updated anymore.
      this.logAggregationDiagnosticsForNMs.clear();
    }
  }

  /**
   * Returns the failure messages recorded for a node as one string.
   *
   * @param nodeId node to look up
   * @return the messages joined with newlines, or an empty string when
   *         none are recorded for the node
   */
  String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
    this.readLock.lock();
    try {
      final List<String> messages =
          this.logAggregationFailureMessagesForNMs.get(nodeId);
      final boolean hasMessages = messages != null && !messages.isEmpty();
      return hasMessages
          ? StringUtils.join(messages, "\n")
          : StringUtils.EMPTY;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Stores the time used as the starting point of the status timeout.
   *
   * @param time the start time to remember
   */
  void recordLogAggregationStartTime(long time) {
    this.logAggregationStartTime = time;
  }

  /**
   * @return whether log aggregation was enabled in the configuration this
   *         object was created with
   */
  public boolean isEnabled() {
    return this.logAggregationEnabled;
  }

  /**
   * @param nodeId node to look up
   * @return true when a report is stored for the node
   */
  private boolean hasReportForNodeManager(NodeId nodeId) {
    final boolean known = this.logAggregationStatus.containsKey(nodeId);
    return known;
  }

  /**
   * Stores a report for a node, replacing any report stored before.
   *
   * @param nodeId node the report belongs to
   * @param report report to store
   */
  private void addReportForNodeManager(NodeId nodeId,
      LogAggregationReport report) {
    this.logAggregationStatus.put(nodeId, report);
  }

  /**
   * @return true when the application-level log aggregation status is
   *         SUCCEEDED, FAILED or TIME_OUT
   */
  public boolean isFinished() {
    final boolean finished = isLogAggregationFinished();
    return finished;
  }

  /**
   * @param nodeId node to look up
   * @return true when at least one failure message is recorded for the node
   */
  private boolean isThereFailureMessageForNM(NodeId nodeId) {
    final List<String> failures =
        logAggregationFailureMessagesForNMs.get(nodeId);
    if (failures == null) {
      return false;
    }
    return !failures.isEmpty();
  }

  /**
   * @return the start time stored by recordLogAggregationStartTime, or 0
   *         when none was recorded
   */
  long getLogAggregationStartTime() {
    return this.logAggregationStartTime;
  }

  /**
   * Stores an initial report for a node unless one is already stored. The
   * initial status is NOT_START when log aggregation is enabled and
   * DISABLED otherwise; the diagnostic message is empty.
   *
   * @param nodeId node to create the report for
   * @param applicationId application the report belongs to
   */
  void addReportIfNecessary(NodeId nodeId, ApplicationId applicationId) {
    if (hasReportForNodeManager(nodeId)) {
      return;
    }
    final LogAggregationStatus initialStatus;
    if (isEnabled()) {
      initialStatus = LogAggregationStatus.NOT_START;
    } else {
      initialStatus = LogAggregationStatus.DISABLED;
    }
    final LogAggregationReport initialReport =
        LogAggregationReport.newInstance(applicationId, initialStatus, "");
    addReportForNodeManager(nodeId, initialReport);
  }
}