FileSystemTimelineReaderImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *  File System based implementation for TimelineReader. This implementation may
 *  not provide a complete implementation of all the necessary features. This
 *  implementation is provided solely for basic testing purposes, and should not
 *  be used in a non-test situation.
 */
public class FileSystemTimelineReaderImpl extends AbstractService
    implements TimelineReader {

  private static final Logger LOG =
      LoggerFactory.getLogger(FileSystemTimelineReaderImpl.class);

  /** Name of the directory, below the storage root, that holds entities. */
  private static final String ENTITIES_DIR = "entities";

  /** Extension of the files entities are read from. */
  private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";

  /**
   * Name of the CSV file, located in each cluster directory, that maps an
   * application to its user, flow and flow run.
   */
  @VisibleForTesting
  static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";

  /** Config param for timeline service file system storage root. */
  public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";

  /**
   * Directory name appended to <code>hadoop.tmp.dir</code> when no storage
   * root is configured.
   */
  private static final String STORAGE_DIR_ROOT = "timeline_service_data";

  /** Shared JSON mapper used to deserialize entities. */
  private static final ObjectMapper MAPPER = new ObjectMapper();

  static {
    YarnJacksonJaxbJsonProvider.configObjectMapper(MAPPER);
  }

  /** Column layout of {@link #APP_FLOW_MAPPING_FILE}. */
  private final CSVFormat csvFormat =
      CSVFormat.Builder.create().setHeader("APP", "USER", "FLOW", "FLOWRUN").build();

  private FileSystem fs;
  private Path rootPath;
  private Path entitiesPath;

  public FileSystemTimelineReaderImpl() {
    super(FileSystemTimelineReaderImpl.class.getName());
  }

  @VisibleForTesting
  String getRootPath() {
    return rootPath.toString();
  }

  /**
   * Deserialize a POJO object from a JSON string.
   *
   * @param <T> Describes the type of class to be returned.
   * @param jsonString JSON string to deserialize.
   * @param clazz class to be deserialized.
   * @return An object based on class type. Used typically for
   *     <cite>TimelineEntity</cite> object.
   * @throws IOException if the underlying input source has problems during
   *     parsing.
   * @throws JsonMappingException  if parser has problems parsing content.
   * @throws JsonGenerationException if there is a problem in JSON writing.
   */
  public static <T> T getTimelineRecordFromJSON(
      String jsonString, Class<T> clazz)
      throws JsonGenerationException, JsonMappingException, IOException {
    return MAPPER.readValue(jsonString, clazz);
  }

  /**
   * Copies the requested fields from one entity to another.
   *
   * @param finalEntity entity the fields are copied into.
   * @param real entity the fields are copied from.
   * @param fields fields to copy; {@code Field.ALL} selects every field.
   */
  private static void fillFields(TimelineEntity finalEntity,
      TimelineEntity real, EnumSet<Field> fields) {
    EnumSet<Field> selected =
        fields.contains(Field.ALL) ? EnumSet.allOf(Field.class) : fields;
    for (Field field : selected) {
      switch (field) {
      case CONFIGS:
        finalEntity.setConfigs(real.getConfigs());
        break;
      case METRICS:
        finalEntity.setMetrics(real.getMetrics());
        break;
      case INFO:
        finalEntity.setInfo(real.getInfo());
        break;
      case IS_RELATED_TO:
        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
        break;
      case RELATES_TO:
        // Must copy the relates-to map; copying the is-related-to map here
        // would silently drop the relates-to data from the response.
        finalEntity.setRelatesToEntities(real.getRelatesToEntities());
        break;
      case EVENTS:
        finalEntity.setEvents(real.getEvents());
        break;
      default:
        // Field.ALL and any value not handled above copy nothing.
        break;
      }
    }
  }

  /**
   * Builds the relative path of a flow run directory. The wildcard component
   * is expanded later by {@link #getNormalPath(Path)}.
   *
   * @param user user component of the path.
   * @param flow flow name component of the path.
   * @param flowRun flow run id component of the path.
   * @return {@code user/flow/&#42;/flowRun} using the platform separator.
   */
  private static String buildFlowRunPath(String user, String flow,
      String flowRun) {
    return user + File.separator + flow + File.separator + "*"
        + File.separator + flowRun;
  }

  /**
   * Resolves the relative flow run path for the given reader context.
   *
   * @param context context supplying user, cluster, flow, flow run and app.
   * @return relative flow run path.
   * @throws IOException if the flow information cannot be determined.
   */
  private String getFlowRunPath(TimelineReaderContext context)
      throws IOException {
    return getFlowRunPath(context.getUserId(), context.getClusterId(),
        context.getFlowName(), context.getFlowRunId(), context.getAppId());
  }

  /**
   * Resolves the relative flow run path. When user, flow name and flow run id
   * are all supplied they are used directly; otherwise they are looked up in
   * the cluster's {@link #APP_FLOW_MAPPING_FILE} by application id.
   *
   * @param userId user id, may be null.
   * @param clusterId cluster id, required for the mapping file lookup.
   * @param flowName flow name, may be null.
   * @param flowRunId flow run id, may be null.
   * @param appId application id, required for the mapping file lookup.
   * @return relative flow run path.
   * @throws IOException if the lookup is needed but cluster or application id
   *     is missing, if no mapping record matches, or if the mapping file
   *     cannot be read.
   */
  private String getFlowRunPath(String userId, String clusterId,
      String flowName, Long flowRunId, String appId) throws IOException {
    if (userId != null && flowName != null && flowRunId != null) {
      return buildFlowRunPath(userId, flowName, String.valueOf(flowRunId));
    }
    if (clusterId == null || appId == null) {
      throw new IOException("Unable to get flow info");
    }
    Path clusterIdPath = new Path(entitiesPath, clusterId);
    Path appFlowMappingFilePath = new Path(clusterIdPath,
        APP_FLOW_MAPPING_FILE);
    // try-with-resources closes both the parser and the reader.
    try (BufferedReader reader =
             new BufferedReader(new InputStreamReader(
                 fs.open(appFlowMappingFilePath), StandardCharsets.UTF_8));
         CSVParser parser = new CSVParser(reader, csvFormat)) {
      for (CSVRecord record : parser.getRecords()) {
        if (record.size() < 4) {
          continue;
        }
        String applicationId = record.get("APP");
        // A record is skipped only when it names a different application;
        // a record whose APP column is blank is accepted.
        if (applicationId != null && !applicationId.trim().isEmpty() &&
            !applicationId.trim().equals(appId)) {
          continue;
        }
        return buildFlowRunPath(record.get(1).trim(), record.get(2).trim(),
            record.get(3).trim());
      }
    }
    throw new IOException("Unable to get flow info");
  }

  /**
   * Builds the (possibly glob) path of an application directory.
   *
   * @param context context supplying the cluster and application ids.
   * @param flowRunPathStr relative flow run path of the application.
   * @return {@code <entities>/<cluster>/<flowRunPath>/<app>}.
   */
  private Path getAppIdPath(TimelineReaderContext context,
      String flowRunPathStr) {
    Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
    Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
    return new Path(flowRunPath, context.getAppId());
  }

  /**
   * Creates the entity handed back to the caller: identifier and created time
   * are always copied, other fields only when requested.
   *
   * @param entity entity as read from storage.
   * @param fieldsToRetrieve fields to copy, or null to copy none.
   * @return a new entity holding the selected data.
   */
  private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
      EnumSet<Field> fieldsToRetrieve) {
    TimelineEntity entityToBeReturned = new TimelineEntity();
    entityToBeReturned.setIdentifier(entity.getIdentifier());
    entityToBeReturned.setCreatedTime(entity.getCreatedTime());
    if (fieldsToRetrieve != null) {
      fillFields(entityToBeReturned, entity, fieldsToRetrieve);
    }
    return entityToBeReturned;
  }

  /**
   * Checks whether a time lies within an inclusive range.
   *
   * @param time time to check; a null time is never in range.
   * @param timeBegin inclusive lower bound, or null for no lower bound.
   * @param timeEnd inclusive upper bound, or null for no upper bound.
   * @return true if {@code time} is non-null and within the bounds.
   */
  private static boolean isTimeInRange(Long time, Long timeBegin,
      Long timeEnd) {
    if (time == null) {
      return false;
    }
    boolean notBeforeBegin = timeBegin == null || time >= timeBegin;
    boolean notAfterEnd = timeEnd == null || time <= timeEnd;
    return notBeforeBegin && notAfterEnd;
  }

  /**
   * Merges the contents of {@code entity2} into {@code entity1}.
   *
   * @param entity1 entity that receives the merged data.
   * @param entity2 entity whose data is merged in.
   */
  private static void mergeEntities(TimelineEntity entity1,
      TimelineEntity entity2) {
    // Ideally created time wont change except in the case of issue from client.
    if (entity2.getCreatedTime() != null && entity2.getCreatedTime() > 0) {
      entity1.setCreatedTime(entity2.getCreatedTime());
    }
    for (Entry<String, String> configEntry : entity2.getConfigs().entrySet()) {
      entity1.addConfig(configEntry.getKey(), configEntry.getValue());
    }
    for (Entry<String, Object> infoEntry : entity2.getInfo().entrySet()) {
      entity1.addInfo(infoEntry.getKey(), infoEntry.getValue());
    }
    for (Entry<String, Set<String>> isRelatedToEntry :
        entity2.getIsRelatedToEntities().entrySet()) {
      String type = isRelatedToEntry.getKey();
      for (String entityId : isRelatedToEntry.getValue()) {
        entity1.addIsRelatedToEntity(type, entityId);
      }
    }
    for (Entry<String, Set<String>> relatesToEntry :
        entity2.getRelatesToEntities().entrySet()) {
      String type = relatesToEntry.getKey();
      for (String entityId : relatesToEntry.getValue()) {
        entity1.addRelatesToEntity(type, entityId);
      }
    }
    for (TimelineEvent event : entity2.getEvents()) {
      entity1.addEvent(event);
    }
    for (TimelineMetric metric2 : entity2.getMetrics()) {
      // Values of a metric already present are appended to it; a metric seen
      // for the first time is added as a whole.
      boolean found = false;
      for (TimelineMetric metric1 : entity1.getMetrics()) {
        if (metric1.getId().equals(metric2.getId())) {
          metric1.addValues(metric2.getValues());
          found = true;
          break;
        }
      }
      if (!found) {
        entity1.addMetric(metric2);
      }
    }
  }

  /**
   * Reads an entity from a file holding one JSON record per line. The first
   * record defines the entity; every later record with the same id and type is
   * merged into it and records for other entities are ignored. Blank lines are
   * skipped.
   *
   * @param reader reader positioned at the start of the entity file.
   * @return the merged entity, or null if the file holds no record.
   * @throws IOException if reading or parsing fails.
   */
  private static TimelineEntity readEntityFromFile(BufferedReader reader)
      throws IOException {
    TimelineEntity entity = null;
    String line;
    while ((line = reader.readLine()) != null) {
      if (line.trim().isEmpty()) {
        continue;
      }
      TimelineEntity parsed =
          getTimelineRecordFromJSON(line, TimelineEntity.class);
      if (entity == null) {
        entity = parsed;
      } else if (entity.getId().equals(parsed.getId()) &&
          entity.getType().equals(parsed.getType())) {
        mergeEntities(entity, parsed);
      }
    }
    return entity;
  }

  /**
   * Checks an entity against the created time range and every non-empty
   * filter list of the given filters.
   *
   * @param entity entity to check.
   * @param filters filters to apply.
   * @return true if the entity passes all checks.
   * @throws IOException if a filter cannot be evaluated.
   */
  private static boolean matchesFilters(TimelineEntity entity,
      TimelineEntityFilters filters) throws IOException {
    if (!isTimeInRange(entity.getCreatedTime(),
        filters.getCreatedTimeBegin(), filters.getCreatedTimeEnd())) {
      return false;
    }
    if (filters.getRelatesTo() != null &&
        !filters.getRelatesTo().getFilterList().isEmpty() &&
        !TimelineStorageUtils.matchRelatesTo(entity,
            filters.getRelatesTo())) {
      return false;
    }
    if (filters.getIsRelatedTo() != null &&
        !filters.getIsRelatedTo().getFilterList().isEmpty() &&
        !TimelineStorageUtils.matchIsRelatedTo(entity,
            filters.getIsRelatedTo())) {
      return false;
    }
    if (filters.getInfoFilters() != null &&
        !filters.getInfoFilters().getFilterList().isEmpty() &&
        !TimelineStorageUtils.matchInfoFilters(entity,
            filters.getInfoFilters())) {
      return false;
    }
    if (filters.getConfigFilters() != null &&
        !filters.getConfigFilters().getFilterList().isEmpty() &&
        !TimelineStorageUtils.matchConfigFilters(entity,
            filters.getConfigFilters())) {
      return false;
    }
    if (filters.getMetricFilters() != null &&
        !filters.getMetricFilters().getFilterList().isEmpty() &&
        !TimelineStorageUtils.matchMetricFilters(entity,
            filters.getMetricFilters())) {
      return false;
    }
    if (filters.getEventFilters() != null &&
        !filters.getEventFilters().getFilterList().isEmpty() &&
        !TimelineStorageUtils.matchEventFilters(entity,
            filters.getEventFilters())) {
      return false;
    }
    return true;
  }

  /**
   * Reads the entities of one type from a directory, applies the filters and
   * returns at most {@code filters.getLimit()} of them, most recently created
   * first.
   *
   * @param dir (possibly glob) path of the entity type directory.
   * @param entityType type the returned entities must have.
   * @param filters filters the returned entities must match.
   * @param dataToRetrieve selects the fields copied into the result.
   * @return matching entities; empty if the directory does not exist.
   * @throws IOException if the directory or an entity file cannot be read.
   */
  private Set<TimelineEntity> getEntities(Path dir, String entityType,
      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
      throws IOException {
    // First sort the selected entities based on created/start time,
    // newest first.
    Map<Long, Set<TimelineEntity>> sortedEntities =
        new TreeMap<>(Comparator.<Long>reverseOrder());
    Path entityTypeDir = getNormalPath(dir);
    if (entityTypeDir != null) {
      RemoteIterator<LocatedFileStatus> fileStatuses =
          fs.listFiles(entityTypeDir, false);
      while (fileStatuses != null && fileStatuses.hasNext()) {
        Path entityFile = fileStatuses.next().getPath();
        // Only files that end with the storage extension are entity files;
        // this matches the exact "<id><extension>" lookup done by getEntity.
        if (!entityFile.getName()
            .endsWith(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
          continue;
        }
        TimelineEntity entity;
        try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(fs.open(entityFile),
                StandardCharsets.UTF_8))) {
          entity = readEntityFromFile(reader);
        }
        if (entity == null || !entity.getType().equals(entityType) ||
            !matchesFilters(entity, filters)) {
          continue;
        }
        TimelineEntity entityToBeReturned = createEntityToBeReturned(
            entity, dataToRetrieve.getFieldsToRetrieve());
        sortedEntities.computeIfAbsent(entityToBeReturned.getCreatedTime(),
            createdTime -> new HashSet<>()).add(entityToBeReturned);
      }
    }

    Set<TimelineEntity> entities = new HashSet<>();
    long entitiesAdded = 0;
    for (Set<TimelineEntity> entitySet : sortedEntities.values()) {
      for (TimelineEntity entity : entitySet) {
        entities.add(entity);
        ++entitiesAdded;
        if (entitiesAdded >= filters.getLimit()) {
          return entities;
        }
      }
    }
    return entities;
  }

  /**
   * Resolves the storage root from {@link #TIMELINE_SERVICE_STORAGE_DIR_ROOT},
   * falling back to {@link #STORAGE_DIR_ROOT} under
   * <code>hadoop.tmp.dir</code>, and obtains the file system for it.
   *
   * @param conf configuration to read the storage root from.
   * @throws Exception if the file system cannot be obtained or the parent
   *     initialization fails.
   */
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
        conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
    rootPath = new Path(outputRoot);
    entitiesPath = new Path(rootPath, ENTITIES_DIR);
    fs = rootPath.getFileSystem(conf);
    super.serviceInit(conf);
  }

  /**
   * Reads a single entity.
   *
   * @param context identifies the entity to read.
   * @param dataToRetrieve selects the fields copied into the result.
   * @return the entity, or null if its file does not exist or holds no
   *     record.
   * @throws IOException if the flow information cannot be determined or the
   *     entity file cannot be read.
   */
  @Override
  public TimelineEntity getEntity(TimelineReaderContext context,
      TimelineDataToRetrieve dataToRetrieve) throws IOException {
    Path appIdPath = getAppIdPath(context, getFlowRunPath(context));
    Path entityTypePath = new Path(appIdPath, context.getEntityType());
    Path entityFilePath = getNormalPath(new Path(entityTypePath,
        context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION));
    if (entityFilePath == null) {
      return null;
    }
    try (BufferedReader reader =
             new BufferedReader(new InputStreamReader(
                 fs.open(entityFilePath), StandardCharsets.UTF_8))) {
      TimelineEntity entity = readEntityFromFile(reader);
      if (entity == null) {
        LOG.info("{} contains no entity record.", entityFilePath);
        return null;
      }
      return createEntityToBeReturned(
          entity, dataToRetrieve.getFieldsToRetrieve());
    } catch (FileNotFoundException e) {
      LOG.info("Cannot find entity {id:{} , type:{}}. "
          + "Will send HTTP 404 in response.",
          context.getEntityId(), context.getEntityType());
      return null;
    }
  }

  /**
   * Expands a glob path and returns its first match.
   *
   * @param globPath path that may contain glob patterns.
   * @return path of the first match, or null if nothing matches.
   * @throws IOException if the file system cannot be queried.
   */
  private Path getNormalPath(Path globPath) throws IOException {
    FileStatus[] status = fs.globStatus(globPath);
    if (status == null || status.length < 1) {
      LOG.info("{} do not exist.", globPath);
      return null;
    }
    return status[0].getPath();
  }

  /**
   * Reads the entities of the context's entity type that match the filters.
   *
   * @param context identifies the application and entity type to read.
   * @param filters filters the returned entities must match.
   * @param dataToRetrieve selects the fields copied into the result.
   * @return matching entities; empty if there are none.
   * @throws IOException if the flow information cannot be determined or the
   *     entities cannot be read.
   */
  @Override
  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
      throws IOException {
    Path appIdPath = getAppIdPath(context, getFlowRunPath(context));
    Path entityTypePath = new Path(appIdPath, context.getEntityType());

    return getEntities(entityTypePath, context.getEntityType(), filters,
        dataToRetrieve);
  }

  /**
   * Lists the entity types stored for an application, i.e. the names of the
   * sub-directories of its application directory. If the context carries no
   * user id, it is set from the resolved flow run path.
   *
   * @param context identifies the application.
   * @return sorted entity type names; empty if the application directory does
   *     not exist.
   * @throws IOException if the flow information cannot be determined or the
   *     directory cannot be listed.
   */
  @Override
  public Set<String> getEntityTypes(TimelineReaderContext context)
      throws IOException {
    Set<String> result = new TreeSet<>();
    String flowRunPathStr = getFlowRunPath(context);
    if (context.getUserId() == null) {
      // The flow run path is user/flow/*/flowRun, so the user is the name of
      // its third ancestor.
      context.setUserId(new Path(flowRunPathStr).getParent().getParent()
          .getParent().getName());
    }
    Path appIdPath = getNormalPath(getAppIdPath(context, flowRunPathStr));
    if (appIdPath == null) {
      return result;
    }
    for (FileStatus fileStatus : fs.listStatus(appIdPath)) {
      if (fileStatus.isDirectory()) {
        result.add(fileStatus.getPath().getName());
      }
    }
    return result;
  }

  /**
   * Probes the file system by checking the storage root.
   *
   * @return {@code CONNECTION_FAILURE} with the error message if the probe
   *     throws an {@link IOException}, {@code RUNNING} otherwise.
   */
  @Override
  public TimelineHealth getHealthStatus() {
    try {
      // Only whether the call fails matters; its result is not used.
      fs.exists(rootPath);
    } catch (IOException e) {
      return new TimelineHealth(
          TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE,
          e.getMessage()
          );
    }
    return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING,
        "");
  }
}