NMLogAggregationStatusTracker.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link NMLogAggregationStatusTracker} caches the log aggregation status of
 * applications on this NodeManager so that the statuses can be handed out
 * again through {@link #pullCachedLogAggregationReports()}. A background
 * timer periodically evicts cached statuses of applications that are no
 * longer present in the NodeManager context and whose status has not been
 * updated within the configured time-out.
 *
 * <p>When log aggregation is disabled
 * ({@link YarnConfiguration#LOG_AGGREGATION_ENABLED} is {@code false}) the
 * tracker caches nothing, reports nothing and never schedules evictions.
 *
 * <p>Thread-safety: per-application updates take a shared (read) lock, while
 * bulk reads and evictions take the exclusive (write) lock, so a report or an
 * eviction never interleaves with an in-flight update.
 */
public class NMLogAggregationStatusTracker extends CompositeService {

  private static final Logger LOG =
       LoggerFactory.getLogger(NMLogAggregationStatusTracker.class);

  private final ReadLock readLocker;
  private final WriteLock writeLocker;
  private final Context nmContext;
  // Used both as the eviction period and as the minimum idle age (ms) an
  // entry of a finished application must reach before it is evicted.
  private final long rollingInterval;
  private final Timer timer;
  private final Map<ApplicationId, AppLogAggregationStatusForRMRecovery>
      recoveryStatuses;
  private final boolean disabled;

  /**
   * Creates the tracker from the configuration of the NodeManager context.
   *
   * @param context the NodeManager context; its configuration decides whether
   *     log aggregation is enabled and how long cached statuses are kept, and
   *     its application map decides which applications are still running.
   */
  public NMLogAggregationStatusTracker(Context context) {
    super(NMLogAggregationStatusTracker.class.getName());
    this.nmContext = context;
    Configuration conf = context.getConf();
    this.disabled = !conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
    this.recoveryStatuses = new ConcurrentHashMap<>();
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLocker = lock.readLock();
    this.writeLocker = lock.writeLock();
    // The timer thread is started here, even when log aggregation is disabled,
    // so it must be a daemon: otherwise it would keep the JVM alive whenever
    // serviceStop() is never reached. The name makes it easy to spot in dumps.
    this.timer = new Timer("NMLogAggregationStatusTracker-Roller", true);
    long configuredRollingInterval = conf.getLong(
        YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
    if (configuredRollingInterval <= 0) {
      this.rollingInterval = YarnConfiguration
          .DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
      LOG.warn("The configured log-aggregation-status.time-out.ms is {}"
          + " which should be larger than 0. "
          + "Using the default value:{} instead.",
          configuredRollingInterval, this.rollingInterval);
    } else {
      this.rollingInterval = configuredRollingInterval;
    }
    LOG.info("the rolling interval seconds for the NodeManager Cached Log "
        + "aggregation status is {}", rollingInterval / 1000);
  }

  /**
   * Schedules the periodic eviction task unless log aggregation is disabled.
   */
  @Override
  protected void serviceStart() throws Exception {
    if (disabled) {
      LOG.warn("Log Aggregation is disabled. "
          + "So is the LogAggregationStatusTracker.");
    } else {
      this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(),
          rollingInterval, rollingInterval);
    }
    super.serviceStart();
  }

  /**
   * Cancels the eviction timer and then stops any child services.
   */
  @Override
  public void serviceStop() throws Exception {
    this.timer.cancel();
    super.serviceStop();
  }

  /**
   * Records a log aggregation status update for an application.
   *
   * <p>A new cache entry is only created while the application is still
   * present in the NodeManager context. An update is ignored when the cached
   * entry is already finalized or when it is older than the cached entry.
   * Nothing is cached when log aggregation is disabled.
   *
   * @param appId the application whose status changed
   * @param logAggregationStatus the new log aggregation status
   * @param updateTime time of this update; compared with the cached entry's
   *     last-modified time and, during eviction, with
   *     {@code System.currentTimeMillis()} (so presumably epoch milliseconds)
   * @param diagnosis diagnostic message reported together with the status
   * @param finalized whether this is the final update; once a finalized
   *     status is cached, later updates are ignored
   */
  public void updateLogAggregationStatus(ApplicationId appId,
      LogAggregationStatus logAggregationStatus, long updateTime,
      String diagnosis, boolean finalized) {
    if (disabled) {
      LOG.warn("The log aggregation is disabled. No need to update "
          + "the log aggregation status");
      // Without this return the status would still be cached, and because
      // the roller is never scheduled when disabled, it would never be evicted.
      return;
    }
    // In NM, each application has exactly one appLogAggregator thread
    // to handle the log aggregation. So, it is fine for multiple
    // appLogAggregator threads to update the log aggregation status of their
    // own applications concurrently. This is why the readLocker is used here.
    this.readLocker.lock();
    try {
      AppLogAggregationStatusForRMRecovery tracker = recoveryStatuses
          .get(appId);
      if (tracker == null) {
        Application application = this.nmContext.getApplications().get(appId);
        if (application == null) {
          LOG.warn("The application:{} has already finished,"
              + " and has been removed from NodeManager, we should not "
              + "receive the log aggregation status update for "
              + "this application.", appId);
          return;
        }
        AppLogAggregationStatusForRMRecovery newTracker =
            new AppLogAggregationStatusForRMRecovery(logAggregationStatus,
                diagnosis);
        newTracker.setLastModifiedTime(updateTime);
        newTracker.setFinalized(finalized);
        recoveryStatuses.put(appId, newTracker);
      } else if (tracker.isFinalized()) {
        LOG.warn("Ignore the log aggregation status update request "
            + "for the application:{}. The cached log aggregation "
            + "status is {}.", appId, tracker.getLogAggregationStatus());
      } else if (tracker.getLastModifiedTime() > updateTime) {
        LOG.warn("Ignore the log aggregation status update request "
            + "for the application:{}. The request log "
            + "aggregation status update is older than the cached "
            + "log aggregation status.", appId);
      } else {
        // The tracker is mutated in place; it is already the mapped value and
        // cannot be evicted concurrently because eviction needs the write lock.
        tracker.setLogAggregationStatus(logAggregationStatus);
        tracker.setDiagnosis(diagnosis);
        tracker.setLastModifiedTime(updateTime);
        tracker.setFinalized(finalized);
      }
    } finally {
      this.readLocker.unlock();
    }
  }

  /**
   * Builds a report for every cached application status.
   *
   * @return a new, mutable list with one report per cached application;
   *     empty when log aggregation is disabled or nothing is cached
   */
  public List<LogAggregationReport> pullCachedLogAggregationReports() {
    List<LogAggregationReport> reports = new ArrayList<>();
    if (disabled) {
      LOG.warn("The log aggregation is disabled. "
          + "There is no cached log aggregation status.");
      return reports;
    }
    // When we pull cached log aggregation reports for all applications in
    // this NM, all updateLogAggregationStatus calls must be blocked so that
    // the snapshot is consistent. So, the writeLocker is used here.
    this.writeLocker.lock();
    try {
      for (Entry<ApplicationId, AppLogAggregationStatusForRMRecovery> tracker :
          recoveryStatuses.entrySet()) {
        AppLogAggregationStatusForRMRecovery current = tracker.getValue();
        LogAggregationReport report = LogAggregationReport.newInstance(
            tracker.getKey(), current.getLogAggregationStatus(),
            current.getDiagnosis());
        reports.add(report);
      }
      return reports;
    } finally {
      this.writeLocker.unlock();
    }
  }

  /** Timer task that periodically evicts expired cached statuses. */
  private class LogAggregationStatusRoller extends TimerTask {
    @Override
    public void run() {
      try {
        rollLogAggregationStatus();
      } catch (RuntimeException e) {
        // An exception escaping run() terminates the Timer thread and silently
        // cancels all future evictions; log it and retry on the next period.
        LOG.error("Failed to roll over the cached log aggregation status.", e);
      }
    }
  }

  /**
   * Evicts cached statuses of applications that are no longer present in the
   * NodeManager context and were last modified more than
   * {@code rollingInterval} milliseconds ago.
   */
  private void rollLogAggregationStatus() {
    // Eviction walks all cached statuses and removes the expired ones, so it
    // must exclude both updateLogAggregationStatus and
    // pullCachedLogAggregationReports. So, the writeLocker is used here.
    this.writeLocker.lock();
    try {
      long currentTimeStamp = System.currentTimeMillis();
      LOG.info("Rolling over the cached log aggregation status.");
      Iterator<Entry<ApplicationId, AppLogAggregationStatusForRMRecovery>> it
          = recoveryStatuses.entrySet().iterator();
      while (it.hasNext()) {
        Entry<ApplicationId, AppLogAggregationStatusForRMRecovery> tracker =
            it.next();
        // Only entries of applications that have left the NM context expire.
        boolean appFinished =
            nmContext.getApplications().get(tracker.getKey()) == null;
        if (appFinished && currentTimeStamp
            - tracker.getValue().getLastModifiedTime() > rollingInterval) {
          it.remove();
        }
      }
    } finally {
      this.writeLocker.unlock();
    }
  }

  /** Mutable cached log aggregation state of a single application. */
  private static class AppLogAggregationStatusForRMRecovery {
    private LogAggregationStatus logAggregationStatus;
    private long lastModifiedTime;
    private boolean finalized;
    private String diagnosis;

    AppLogAggregationStatusForRMRecovery(
        LogAggregationStatus logAggregationStatus, String diagnosis) {
      this.setLogAggregationStatus(logAggregationStatus);
      this.setDiagnosis(diagnosis);
    }

    public LogAggregationStatus getLogAggregationStatus() {
      return logAggregationStatus;
    }

    public void setLogAggregationStatus(
        LogAggregationStatus logAggregationStatus) {
      this.logAggregationStatus = logAggregationStatus;
    }

    public long getLastModifiedTime() {
      return lastModifiedTime;
    }

    public void setLastModifiedTime(long lastModifiedTime) {
      this.lastModifiedTime = lastModifiedTime;
    }

    public boolean isFinalized() {
      return finalized;
    }

    public void setFinalized(boolean finalized) {
      this.finalized = finalized;
    }

    public String getDiagnosis() {
      return diagnosis;
    }

    public void setDiagnosis(String diagnosis) {
      this.diagnosis = diagnosis;
    }
  }
}