EntityCacheItem.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership.  The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Cache item for timeline server v1.5 reader cache. Each cache item has a
 * TimelineStore that can be filled with data within one entity group.
 */
public class EntityCacheItem {
  private static final Logger LOG
      = LoggerFactory.getLogger(EntityCacheItem.class);

  private TimelineStore store;
  private TimelineEntityGroupId groupId;
  private EntityGroupFSTimelineStore.AppLogs appLogs;
  private long lastRefresh;
  private Configuration config;

  public EntityCacheItem(TimelineEntityGroupId gId, Configuration config) {
    this.groupId = gId;
    this.config = config;
  }

  /**
   * @return The application log associated to this cache item, may be null.
   */
  public synchronized EntityGroupFSTimelineStore.AppLogs getAppLogs() {
    return this.appLogs;
  }

  /**
   * Set the application logs to this cache item. The entity group should be
   * associated with this application.
   *
   * @param incomingAppLogs Application logs this cache item mapped to
   */
  public synchronized void setAppLogs(
      EntityGroupFSTimelineStore.AppLogs incomingAppLogs) {
    this.appLogs = incomingAppLogs;
  }

  /**
   * @return The timeline store, either loaded or unloaded, of this cache item.
   * This method will not hold the storage from being reclaimed.
   */
  public synchronized TimelineStore getStore() {
    return store;
  }

  /**
   * Refresh this cache item if it needs refresh. This will enforce an appLogs
   * rescan and then load new data. The refresh process is synchronized with
   * other operations on the same cache item.
   *
   * @param aclManager ACL manager for the timeline storage
   * @param metrics Metrics to trace the status of the entity group store
   * @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
   *         object filled with all entities in the group.
   * @throws IOException
   */
  public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager,
      EntityGroupFSTimelineStoreMetrics metrics) throws IOException {
    if (needRefresh()) {
      long startTime = Time.monotonicNow();
      // If an application is not finished, we only update summary logs (and put
      // new entities into summary storage).
      // Otherwise, since the application is done, we can update detail logs.
      if (!appLogs.isDone()) {
        appLogs.parseSummaryLogs();
      } else if (appLogs.getDetailLogs().isEmpty()) {
        appLogs.scanForLogs();
      }
      if (!appLogs.getDetailLogs().isEmpty()) {
        if (store == null) {
          store = ReflectionUtils.newInstance(config.getClass(
              YarnConfiguration
                  .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CACHE_STORE,
              MemoryTimelineStore.class, TimelineStore.class),
              config);
          store.init(config);
          store.start();
        } else {
          // Store is not null, the refresh is triggered by stale storage.
          metrics.incrCacheStaleRefreshes();
        }
        try (TimelineDataManager tdm =
                new TimelineDataManager(store, aclManager)) {
          tdm.init(config);
          tdm.start();
          // Load data from appLogs to tdm
          appLogs.loadDetailLog(tdm, groupId);
        }
      }
      updateRefreshTimeToNow();
      metrics.addCacheRefreshTime(Time.monotonicNow() - startTime);
    } else {
      LOG.debug("Cache new enough, skip refreshing");
      metrics.incrNoRefreshCacheRead();
    }
    return store;
  }

  /**
   * Force releasing the cache item for the given group id, even though there
   * may be active references.
   */
  public synchronized void forceRelease() {
    try {
      if (store != null) {
        store.close();
      }
    } catch (IOException e) {
      LOG.warn("Error closing timeline store", e);
    }
    store = null;
    // reset offsets so next time logs are re-parsed
    for (LogInfo log : appLogs.getDetailLogs()) {
      if (log.getFilename().contains(groupId.toString())) {
        log.setOffset(0);
      }
    }
    LOG.debug("Cache for group {} released. ", groupId);
  }

  private boolean needRefresh() {
    return (Time.monotonicNow() - lastRefresh > 10000);
  }

  private void updateRefreshTimeToNow() {
    this.lastRefresh = Time.monotonicNow();
  }
}