// TestFileSystemTimelineWriterImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;

import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
import static org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.buildEntityTypeSubpath;
import static org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;

/**
 * Tests for {@link FileSystemTimelineWriterImpl}: entities are written as
 * one JSON file per (cluster, user, flow, flow version, flow run, app,
 * entity type, entity id) path, with path elements escaped against
 * directory traversal.
 */
public class TestFileSystemTimelineWriterImpl extends AbstractHadoopTestBase {
  private static final Logger LOG =
          LoggerFactory.getLogger(TestFileSystemTimelineWriterImpl.class);

  /** Relative parent-directory path element used to probe traversal escaping. */
  public static final String UP = ".." + File.separator;

  // Per-test scratch directory, cleaned up by JUnit 5.
  @TempDir
  private File tmpFolder;

  /**
   * Write two entities, one carrying a metric, and verify each is
   * persisted as a single JSON record in the expected per-entity file.
   * Unit test for PoC YARN 3264.
   *
   * @throws Exception on any test failure
   */
  @Test
  void testWriteEntityToFile() throws Exception {
    TimelineEntities te = new TimelineEntities();
    TimelineEntity entity = new TimelineEntity();
    String id = "hello";
    String type = "world";
    entity.setId(id);
    entity.setType(type);
    entity.setCreatedTime(1425016501000L);
    te.addEntity(entity);

    TimelineMetric metric = new TimelineMetric();
    String metricId = "CPU";
    metric.setId(metricId);
    metric.setType(TimelineMetric.Type.SINGLE_VALUE);
    metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
    metric.addValue(1425016501000L, 1234567L);

    TimelineEntity entity2 = new TimelineEntity();
    String id2 = "metric";
    String type2 = "app";
    entity2.setId(id2);
    entity2.setType(type2);
    entity2.setCreatedTime(1425016503000L);
    entity2.addMetric(metric);
    te.addEntity(entity2);

    Map<String, TimelineMetric> aggregatedMetrics =
        new HashMap<>();
    aggregatedMetrics.put(metricId, metric);

    try (FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl()) {
      Configuration conf = new YarnConfiguration();
      String outputRoot = tmpFolder.getAbsolutePath();
      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
          outputRoot);
      fsi.init(conf);
      fsi.start();
      fsi.write(
          new TimelineCollectorContext("cluster_id", "user_id", "flow_name",
              "flow_version", 12345678L, "app_id"),
          te, UserGroupInformation.createRemoteUser("user_id"));

      String fileName = outputRoot + File.separator + "entities" +
          File.separator + "cluster_id" + File.separator + "user_id" +
          File.separator + "flow_name" + File.separator + "flow_version" +
          File.separator + "12345678" + File.separator + "app_id" +
          File.separator + type
          + File.separator + id +
          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
      List<String> data = readFromFile(FileSystem.get(conf), new Path(fileName), 2);
      // ensure there's only one entity + 1 new line
      Assertions.assertThat(data).hasSize(2);
      // confirm the contents same as what was written
      assertRecordMatches(data.get(0), entity);

      // verify aggregated metrics
      String fileName2 = fsi.getOutputRoot() + File.separator + "entities" +
          File.separator + "cluster_id" + File.separator + "user_id" +
          File.separator + "flow_name" + File.separator + "flow_version" +
          File.separator + "12345678" + File.separator + "app_id" +
          File.separator + type2
          + File.separator + id2 +
          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
      Path path2 = new Path(fileName2);
      List<String> data2 = readFromFile(FileSystem.get(conf), path2, 2);
      // ensure there's only one entity + 1 new line
      // (was asserting on the first file's "data" again -- copy/paste bug)
      Assertions.assertThat(data2).hasSize(2);
      // confirm the contents same as what was written
      assertRecordMatches(data2.get(0), entity2);
    }
  }

  /**
   * Assert a read in string matches the JSON value of the entity.
   *
   * @param d record read from the storage file
   * @param entity expected entity
   * @throws IOException if the entity cannot be serialized to JSON
   */
  private static void assertRecordMatches(final String d, final TimelineEntity entity)
      throws IOException {
    Assertions.assertThat(d)
        .isEqualTo(TimelineUtils.dumpTimelineRecordtoJSON(entity));
  }

  /**
   * Two writes for the same entity id/type append to the same file:
   * the file must contain both records, in write order.
   *
   * @throws Exception on any test failure
   */
  @Test
  void testWriteMultipleEntities() throws Exception {
    String id = "appId";
    String type = "app";

    TimelineEntities te1 = new TimelineEntities();
    TimelineEntity entity = new TimelineEntity();
    entity.setId(id);
    entity.setType(type);
    entity.setCreatedTime(1425016501000L);
    te1.addEntity(entity);

    TimelineEntities te2 = new TimelineEntities();
    TimelineEntity entity2 = new TimelineEntity();
    entity2.setId(id);
    entity2.setType(type);
    entity2.setCreatedTime(1425016503000L);
    te2.addEntity(entity2);

    try (FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl()) {
      Configuration conf = new YarnConfiguration();
      String outputRoot = tmpFolder.getAbsolutePath();
      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
          outputRoot);
      fsi.init(conf);
      fsi.start();
      fsi.write(
          new TimelineCollectorContext("cluster_id", "user_id", "flow_name",
              "flow_version", 12345678L, "app_id"),
          te1, UserGroupInformation.createRemoteUser("user_id"));
      fsi.write(
          new TimelineCollectorContext("cluster_id", "user_id", "flow_name",
              "flow_version", 12345678L, "app_id"),
          te2, UserGroupInformation.createRemoteUser("user_id"));

      String fileName = outputRoot + File.separator + "entities"
          + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
          "flow_name" ,"flow_version" ,12345678, "app_id", type)
          + File.separator + id
          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
      Path path = new Path(fileName);
      FileSystem fs = FileSystem.get(conf);
      // two entities + trailing EOF marker
      List<String> data = readFromFile(fs, path, 3);
      // confirm the contents same as what was written
      assertRecordMatches(data.get(0), entity);

      // confirm the contents same as what was written
      assertRecordMatches(data.get(1), entity2);
    }
  }

  /**
   * An empty flow name must not break the storage path; the writer is
   * expected to substitute a fallback path element (see
   * {@link FileSystemTimelineWriterImpl#escape}).
   *
   * @throws Exception on any test failure
   */
  @Test
  void testWriteEntitiesWithEmptyFlowName() throws Exception {
    String id = "appId";
    String type = "app";

    TimelineEntities te = new TimelineEntities();
    TimelineEntity entity = new TimelineEntity();
    entity.setId(id);
    entity.setType(type);
    entity.setCreatedTime(1425016501000L);
    te.addEntity(entity);

    try (FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl()) {
      Configuration conf = new YarnConfiguration();
      String outputRoot = tmpFolder.getAbsolutePath();
      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
          outputRoot);
      fsi.init(conf);
      fsi.start();
      fsi.write(
          new TimelineCollectorContext("cluster_id", "user_id", "",
              "flow_version", 12345678L, "app_id"),
          te, UserGroupInformation.createRemoteUser("user_id"));

      String fileName = outputRoot + File.separator + "entities"
          + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
          "" ,"flow_version" ,12345678, "app_id", type)
          + File.separator + id
          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;

      List<String> data = readFromFile(FileSystem.get(conf), new Path(fileName), 2);
      // confirm the contents same as what was written
      assertRecordMatches(data.get(0), entity);
    }
  }

  /**
   * Stress test the escaping logic: ids, types, flow names and flow
   * versions containing "../" and other special characters must still be
   * written somewhere under the output root.
   *
   * @throws Exception on any test failure
   */
  @Test
  void testWriteEntitiesWithEscaping() throws Exception {
    String id = UP + "appid";
    String type = UP + "type";

    TimelineEntities te = new TimelineEntities();
    TimelineEntity entity = new TimelineEntity();
    entity.setId(id);
    entity.setType(type);
    entity.setCreatedTime(1425016501000L);
    te.addEntity(entity);

    try (FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl()) {
      Configuration conf = new YarnConfiguration();
      String outputRoot = tmpFolder.getAbsolutePath();
      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
          outputRoot);
      fsi.init(conf);
      fsi.start();
      final String flowName = UP + "flow_name?";
      final String flowVersion = UP + "flow_version/";
      fsi.write(
          new TimelineCollectorContext("cluster_id", "user_id", flowName,
              flowVersion, 12345678L, "app_id"),
          te, UserGroupInformation.createRemoteUser("user_id"));

      String fileName = outputRoot + File.separator + "entities"
          + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
          flowName, flowVersion,12345678, "app_id", type)
          + File.separator + escape(id, "id")
          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;

      List<String> data = readFromFile(FileSystem.get(conf), new Path(fileName), 2);
      // confirm the contents same as what was written
      assertRecordMatches(data.get(0), entity);
    }
  }

  /**
   * Test that escape downgrades file separators and special characters to
   * underscores, and substitutes the fallback on an empty input.
   */
  @Test
  public void testEscapingAndFallback() throws Throwable {
    Assertions.assertThat(escape("", "fallback"))
        .isEqualTo("fallback");
    Assertions.assertThat(escape(File.separator, "fallback"))
        .isEqualTo("_");
    Assertions.assertThat(escape("?:", ""))
        .isEqualTo("__");
  }

  /**
   * Read a file line by line, logging its name first and verifying it is
   * actually a file; asserts the number of entries read is as expected.
   * <p>
   * Note: the returned list always ends with one trailing {@code null}
   * element (the EOF marker returned by
   * {@link BufferedReader#readLine()}), so a one-line file yields a list of
   * size 2. Callers size {@code entryCount} accordingly.
   *
   * @param fs filesystem
   * @param path path of the file to read
   * @param entryCount expected list size, including the trailing null
   * @return the lines of the file followed by a single null element
   * @throws IOException IO failure
   */
  private List<String> readFromFile(FileSystem fs, Path path, int entryCount)
          throws IOException {

    LOG.info("Reading file from {}", path);
    assertIsFile(fs, path);
    List<String> data = new ArrayList<>();
    // try-with-resources: the original leaked the reader/stream on all paths.
    // UTF-8 is explicit so the read is independent of the platform charset.
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
      String line = br.readLine();
      data.add(line);
      while (line != null) {
        line = br.readLine();
        data.add(line);
      }
    }
    Assertions.assertThat(data).hasSize(entryCount);
    return data;
  }

}