// TestLogAggregationIndexedFileController.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Writer;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Function test for {@link LogAggregationIndexedFileController}.
 *
 */
public class TestLogAggregationIndexedFileController
      extends Configured {

  private final String rootLocalLogDir = "target/LocalLogs";
  private final Path rootLocalLogDirPath = new Path(rootLocalLogDir);
  private final String remoteLogDir = "target/remote-app";
  private static final FsPermission LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0777));
  private static final UserGroupInformation USER_UGI = UserGroupInformation
      .createRemoteUser("testUser");
  private static final String ZERO_FILE = "zero";
  private FileSystem fs;
  private ApplicationId appId;
  private ContainerId containerId;
  private NodeId nodeId;

  private ByteArrayOutputStream sysOutStream;

  private Configuration getTestConf() {
    Configuration conf = new Configuration();
    conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir",
            remoteLogDir);
    conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir-suffix",
            "logs");
    conf.set(YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, "gz");
    return conf;
  }

  @BeforeEach
  public void setUp() throws IOException {
    setConf(getTestConf());
    appId = ApplicationId.newInstance(123456, 1);
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
        appId, 1);
    containerId = ContainerId.newContainerId(attemptId, 1);
    nodeId = NodeId.newInstance("localhost", 9999);
    fs = FileSystem.get(getConf());
    sysOutStream = new ByteArrayOutputStream();
    PrintStream sysOut =  new PrintStream(sysOutStream);
    System.setOut(sysOut);

    ByteArrayOutputStream sysErrStream = new ByteArrayOutputStream();
    PrintStream sysErr = new PrintStream(sysErrStream);
    System.setErr(sysErr);
  }

  @AfterEach
  public void teardown() throws Exception {
    fs.delete(rootLocalLogDirPath, true);
    fs.delete(new Path(remoteLogDir), true);
  }

  @Test
  @Timeout(15000)
  void testLogAggregationIndexFileFormat() throws Exception {
    if (fs.exists(rootLocalLogDirPath)) {
      fs.delete(rootLocalLogDirPath, true);
    }
    assertTrue(fs.mkdirs(rootLocalLogDirPath));

    Path appLogsDir = new Path(rootLocalLogDirPath, appId.toString());
    if (fs.exists(appLogsDir)) {
      fs.delete(appLogsDir, true);
    }
    assertTrue(fs.mkdirs(appLogsDir));

    List<String> logTypes = new ArrayList<String>();
    logTypes.add("syslog");
    logTypes.add("stdout");
    logTypes.add("stderr");

    Set<File> files = new HashSet<>();

    LogKey key1 = new LogKey(containerId.toString());

    for (String logType : logTypes) {
      File file = createAndWriteLocalLogFile(containerId, appLogsDir,
          logType);
      files.add(file);
    }
    files.add(createZeroLocalLogFile(appLogsDir));

    LogValue value = mock(LogValue.class);
    when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);

    final ControlledClock clock = new ControlledClock();
    clock.setTime(System.currentTimeMillis());
    LogAggregationIndexedFileController fileFormat = new LogAggregationIndexedFileController() {
      private int rollOverCheck = 0;

      @Override
      public Clock getSystemClock() {
        return clock;
      }

      @Override
      public boolean isRollover(final FileContext fc, final Path candidate) throws IOException {
        rollOverCheck++;
        if (rollOverCheck >= 3) {
          return true;
        }
        return false;
      }
    };

    fileFormat.initialize(getConf(), "Indexed");

    Map<ApplicationAccessType, String> appAcls = new HashMap<>();
    Path appDir = fileFormat.getRemoteAppLogDir(appId,
        USER_UGI.getShortUserName());
    if (fs.exists(appDir)) {
      fs.delete(appDir, true);
    }
    assertTrue(fs.mkdirs(appDir));

    Path logPath = fileFormat.getRemoteNodeLogFileForApp(
        appId, USER_UGI.getShortUserName(), nodeId);
    LogAggregationFileControllerContext context =
        new LogAggregationFileControllerContext(
            logPath, logPath, true, 1000, appId, appAcls, nodeId, USER_UGI);
    // initialize the writer
    fileFormat.initializeWriter(context);

    fileFormat.write(key1, value);
    fileFormat.postWrite(context);
    fileFormat.closeWriter();

    ContainerLogsRequest logRequest = new ContainerLogsRequest();
    logRequest.setAppId(appId);
    logRequest.setNodeId(nodeId.toString());
    logRequest.setAppOwner(USER_UGI.getShortUserName());
    logRequest.setContainerId(containerId.toString());
    logRequest.setBytes(Long.MAX_VALUE);
    List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
        logRequest);
    assertEquals(1, meta.size());
    List<String> fileNames = new ArrayList<>();
    for (ContainerLogMeta log : meta) {
      assertEquals(containerId.toString(), log.getContainerId());
      assertEquals(nodeId.toString(), log.getNodeId());
      assertEquals(4, log.getContainerLogMeta().size());
      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
        fileNames.add(file.getFileName());
      }
    }
    fileNames.removeAll(logTypes);
    fileNames.remove(ZERO_FILE);
    assertTrue(fileNames.isEmpty());

    boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
    assertTrue(foundLogs);
    for (String logType : logTypes) {
      assertTrue(sysOutStream.toString().contains(logMessage(
          containerId, logType)));
    }
    assertZeroFileIsContained(sysOutStream.toString());
    sysOutStream.reset();

    Configuration factoryConf = new Configuration(getConf());
    factoryConf.set("yarn.log-aggregation.file-formats", "Indexed");
    factoryConf.set("yarn.log-aggregation.file-controller.Indexed.class",
        "org.apache.hadoop.yarn.logaggregation.filecontroller.ifile"
            + ".LogAggregationIndexedFileController");
    LogAggregationFileControllerFactory factory =
        new LogAggregationFileControllerFactory(factoryConf);
    LogAggregationFileController fileController = factory
        .getFileControllerForRead(appId, USER_UGI.getShortUserName());
    assertTrue(fileController instanceof
        LogAggregationIndexedFileController);
    foundLogs = fileController.readAggregatedLogs(logRequest, System.out);
    assertTrue(foundLogs);
    for (String logType : logTypes) {
      assertTrue(sysOutStream.toString().contains(logMessage(
          containerId, logType)));
    }
    sysOutStream.reset();

    // create a checksum file
    Path checksumFile = new Path(fileFormat.getRemoteAppLogDir(
            appId, USER_UGI.getShortUserName()),
        LogAggregationUtils.getNodeString(nodeId)
            + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
    FSDataOutputStream fInput = null;
    try {
      String nodeName = logPath.getName() + "_" + clock.getTime();
      fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK);
      fInput.writeInt(nodeName.length());
      fInput.write(nodeName.getBytes(
          StandardCharsets.UTF_8));
      fInput.writeLong(0);
    } finally {
      IOUtils.closeStream(fInput);
    }
    meta = fileFormat.readAggregatedLogsMeta(
        logRequest);
    assertTrue(meta.isEmpty());
    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
    assertFalse(foundLogs);
    sysOutStream.reset();
    fs.delete(checksumFile, false);
    assertFalse(fs.exists(checksumFile));

    List<String> newLogTypes = new ArrayList<>(logTypes);
    files.clear();
    newLogTypes.add("test1");
    files.add(createAndWriteLocalLogFile(containerId, appLogsDir,
        "test1"));
    newLogTypes.add("test2");
    files.add(createAndWriteLocalLogFile(containerId, appLogsDir,
        "test2"));
    LogValue value2 = mock(LogValue.class);
    when(value2.getPendingLogFilesToUploadForThisContainer())
        .thenReturn(files);

    // initialize the writer
    fileFormat.initializeWriter(context);
    fileFormat.write(key1, value2);
    fileFormat.closeWriter();

    // We did not call postWriter which we would keep the checksum file.
    // We can only get the logs/logmeta from the first write.
    meta = fileFormat.readAggregatedLogsMeta(
        logRequest);
    assertThat(meta.size()).isEqualTo(1);
    for (ContainerLogMeta log : meta) {
      assertEquals(containerId.toString(), log.getContainerId());
      assertEquals(nodeId.toString(), log.getNodeId());
      assertEquals(4, log.getContainerLogMeta().size());
      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
        fileNames.add(file.getFileName());
      }
    }
    fileNames.removeAll(logTypes);
    fileNames.remove(ZERO_FILE);
    assertTrue(fileNames.isEmpty());
    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
    assertTrue(foundLogs);
    for (String logType : logTypes) {
      assertTrue(sysOutStream.toString().contains(logMessage(
          containerId, logType)));
    }
    assertFalse(sysOutStream.toString().contains(logMessage(
        containerId, "test1")));
    assertFalse(sysOutStream.toString().contains(logMessage(
        containerId, "test2")));
    sysOutStream.reset();

    // Call postWrite and we should get all logs/logmetas for both
    // first write and second write
    fileFormat.initializeWriter(context);
    fileFormat.write(key1, value2);
    fileFormat.postWrite(context);
    fileFormat.closeWriter();
    meta = fileFormat.readAggregatedLogsMeta(
        logRequest);
    assertThat(meta.size()).isEqualTo(2);
    for (ContainerLogMeta log : meta) {
      assertEquals(containerId.toString(), log.getContainerId());
      assertEquals(nodeId.toString(), log.getNodeId());
      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
        fileNames.add(file.getFileName());
      }
    }
    fileNames.removeAll(newLogTypes);
    fileNames.remove(ZERO_FILE);
    assertTrue(fileNames.isEmpty());
    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
    assertTrue(foundLogs);
    for (String logType : newLogTypes) {
      assertTrue(sysOutStream.toString().contains(logMessage(
          containerId, logType)));
    }
    sysOutStream.reset();

    // start to roll over old logs
    clock.setTime(System.currentTimeMillis());
    fileFormat.initializeWriter(context);
    fileFormat.write(key1, value2);
    fileFormat.postWrite(context);
    fileFormat.closeWriter();
    FileStatus[] status = fs.listStatus(logPath.getParent());
    assertEquals(2, status.length);
    meta = fileFormat.readAggregatedLogsMeta(
        logRequest);
    assertThat(meta.size()).isEqualTo(3);
    for (ContainerLogMeta log : meta) {
      assertEquals(containerId.toString(), log.getContainerId());
      assertEquals(nodeId.toString(), log.getNodeId());
      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
        fileNames.add(file.getFileName());
      }
    }
    fileNames.removeAll(newLogTypes);
    fileNames.remove(ZERO_FILE);
    assertTrue(fileNames.isEmpty());
    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
    assertTrue(foundLogs);
    for (String logType : newLogTypes) {
      assertTrue(sysOutStream.toString().contains(logMessage(
          containerId, logType)));
    }
    sysOutStream.reset();
  }

  @Test
  @Timeout(15000)
  void testFetchApplicationLogsHar() throws Exception {
    List<String> newLogTypes = new ArrayList<>();
    newLogTypes.add("syslog");
    newLogTypes.add("stdout");
    newLogTypes.add("stderr");
    newLogTypes.add("test1");
    newLogTypes.add("test2");
    URL harUrl = ClassLoader.getSystemClassLoader()
        .getResource("application_123456_0001.har");
    assertNotNull(harUrl);

    Path path = new Path(remoteLogDir + "/" + USER_UGI.getShortUserName()
        + "/logs/application_123456_0001");
    if (fs.exists(path)) {
      fs.delete(path, true);
    }
    assertTrue(fs.mkdirs(path));
    Path harPath = new Path(path, "application_123456_0001.har");
    fs.copyFromLocalFile(false, new Path(harUrl.toURI()), harPath);
    assertTrue(fs.exists(harPath));
    LogAggregationIndexedFileController fileFormat
        = new LogAggregationIndexedFileController();
    fileFormat.initialize(getConf(), "Indexed");
    ContainerLogsRequest logRequest = new ContainerLogsRequest();
    logRequest.setAppId(appId);
    logRequest.setNodeId(nodeId.toString());
    logRequest.setAppOwner(USER_UGI.getShortUserName());
    logRequest.setContainerId(containerId.toString());
    logRequest.setBytes(Long.MAX_VALUE);
    List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
        logRequest);
    assertEquals(3, meta.size());
    List<String> fileNames = new ArrayList<>();
    for (ContainerLogMeta log : meta) {
      assertEquals(containerId.toString(), log.getContainerId());
      assertEquals(nodeId.toString(), log.getNodeId());
      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
        fileNames.add(file.getFileName());
      }
    }
    fileNames.removeAll(newLogTypes);
    assertTrue(fileNames.isEmpty());
    boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
    assertTrue(foundLogs);
    for (String logType : newLogTypes) {
      assertTrue(sysOutStream.toString().contains(logMessage(
          containerId, logType)));
    }
    sysOutStream.reset();
  }

  private void assertZeroFileIsContained(String outStream) {
    assertTrue(outStream.contains(
        "LogContents:\n" +
        "\n" +
        "End of LogType:zero"));
  }

  private File createZeroLocalLogFile(Path localLogDir) throws IOException {
    return createAndWriteLocalLogFile(localLogDir, ZERO_FILE, "");
  }

  private File createAndWriteLocalLogFile(ContainerId containerId,
      Path localLogDir, String logType) throws IOException {
    return createAndWriteLocalLogFile(localLogDir, logType,
        logMessage(containerId, logType));
  }

  private File createAndWriteLocalLogFile(Path localLogDir, String logType,
      String message) throws IOException {
    File file = new File(localLogDir.toString(), logType);
    if (file.exists()) {
      file.delete();
    }
    file.createNewFile();
    Writer writer = null;
    try {
      writer = new FileWriter(file);
      writer.write(message);
      writer.close();
      return file;
    } finally {
      IOUtils.closeStream(writer);
    }
  }

  private String logMessage(ContainerId containerId, String logType) {
    return "Hello " + containerId + " in " + logType + "!";
  }

  @Test
  void testGetRollOverLogMaxSize() {
    String fileControllerName = "testController";
    String remoteDirConf = String.format(
        YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
        fileControllerName);
    Configuration conf = new Configuration();
    LogAggregationIndexedFileController fileFormat
        = new LogAggregationIndexedFileController();
    long defaultRolloverSize = 10L * 1024 * 1024 * 1024;

    // test local filesystem
    fileFormat.initialize(conf, fileControllerName);
    assertThat(fileFormat.getRollOverLogMaxSize(conf))
        .isEqualTo(defaultRolloverSize);

    // test file system supporting append
    conf.set(remoteDirConf, "webhdfs://localhost/path");
    fileFormat.initialize(conf, fileControllerName);
    assertThat(fileFormat.getRollOverLogMaxSize(conf))
        .isEqualTo(defaultRolloverSize);

    // test file system not supporting append
    conf.set(remoteDirConf, "s3a://test/path");
    fileFormat.initialize(conf, fileControllerName);
    assertThat(fileFormat.getRollOverLogMaxSize(conf)).isZero();
  }

  @Test
  void testGetLogMetaFilesOfNode() throws Exception {
    if (fs.exists(rootLocalLogDirPath)) {
      fs.delete(rootLocalLogDirPath, true);
    }
    assertTrue(fs.mkdirs(rootLocalLogDirPath));

    Path appLogsDir = new Path(rootLocalLogDirPath, appId.toString());
    if (fs.exists(appLogsDir)) {
      fs.delete(appLogsDir, true);
    }
    assertTrue(fs.mkdirs(appLogsDir));

    List<String> logTypes = new ArrayList<String>();
    logTypes.add("syslog");
    logTypes.add("stdout");
    logTypes.add("stderr");

    Set<File> files = new HashSet<>();

    LogKey key1 = new LogKey(containerId.toString());

    for (String logType : logTypes) {
      File file = createAndWriteLocalLogFile(containerId, appLogsDir,
          logType);
      files.add(file);
    }
    files.add(createZeroLocalLogFile(appLogsDir));

    LogValue value = mock(LogValue.class);
    when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);

    LogAggregationIndexedFileController fileFormat =
        new LogAggregationIndexedFileController();

    fileFormat.initialize(getConf(), "Indexed");

    Map<ApplicationAccessType, String> appAcls = new HashMap<>();
    Path appDir = fileFormat.getRemoteAppLogDir(appId,
        USER_UGI.getShortUserName());
    if (fs.exists(appDir)) {
      fs.delete(appDir, true);
    }
    assertTrue(fs.mkdirs(appDir));

    Path logPath = fileFormat.getRemoteNodeLogFileForApp(
        appId, USER_UGI.getShortUserName(), nodeId);
    LogAggregationFileControllerContext context =
        new LogAggregationFileControllerContext(
            logPath, logPath, true, 1000, appId, appAcls, nodeId, USER_UGI);
    // initialize the writer
    fileFormat.initializeWriter(context);

    fileFormat.write(key1, value);
    fileFormat.postWrite(context);
    fileFormat.closeWriter();

    ContainerLogsRequest logRequest = new ContainerLogsRequest();
    logRequest.setAppId(appId);
    logRequest.setNodeId(nodeId.toString());
    logRequest.setAppOwner(USER_UGI.getShortUserName());
    logRequest.setContainerId(containerId.toString());
    logRequest.setBytes(Long.MAX_VALUE);
    // create a checksum file
    final ControlledClock clock = new ControlledClock();
    clock.setTime(System.currentTimeMillis());
    Path checksumFile = new Path(fileFormat.getRemoteAppLogDir(
            appId, USER_UGI.getShortUserName()),
        LogAggregationUtils.getNodeString(nodeId)
            + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
    FSDataOutputStream fInput = null;
    try {
      String nodeName = logPath.getName() + "_" + clock.getTime();
      fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK);
      fInput.writeInt(nodeName.length());
      fInput.write(nodeName.getBytes(
          StandardCharsets.UTF_8));
      fInput.writeLong(0);
    } finally {
      IOUtils.closeStream(fInput);
    }

    Path nodePath = LogAggregationUtils.getRemoteAppLogDir(
        fileFormat.getRemoteRootLogDir(), appId, USER_UGI.getShortUserName(),
        fileFormat.getRemoteRootLogDirSuffix());
    FileStatus[] nodes = fs.listStatus(nodePath);
    ExtendedLogMetaRequest req =
        new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder().build();
    for (FileStatus node : nodes) {
      Map<String, List<ContainerLogFileInfo>> metas =
          fileFormat.getLogMetaFilesOfNode(req, node, appId);

      if (node.getPath().getName().contains(
          LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX)) {
        assertTrue(metas.isEmpty(),
            "Checksum node files should not contain any logs");
      } else {
        assertFalse(metas.isEmpty(),
            "Non-checksum node files should contain log files");
        assertEquals(4, metas.values().stream().findFirst().get().size());
      }
    }

  }
}