LogAggregationTestcase.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation.testutils;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService.LogDeletionTask;
import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.AppDescriptor;

import static org.apache.hadoop.yarn.logaggregation.testutils.FileStatusUtils.createDirBucketDirLogPathWithFileStatus;
import static org.apache.hadoop.yarn.logaggregation.testutils.FileStatusUtils.createDirLogPathWithFileStatus;
import static org.apache.hadoop.yarn.logaggregation.testutils.FileStatusUtils.createFileLogPathWithFileStatus;
import static org.apache.hadoop.yarn.logaggregation.testutils.FileStatusUtils.createPathWithFileStatusForAppId;
import static org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.NO_TIMEOUT;
import static org.apache.hadoop.yarn.logaggregation.testutils.MockRMClientUtils.createMockRMClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class LogAggregationTestcase {
  private static final Logger LOG = LoggerFactory.getLogger(LogAggregationTestcase.class);

  private final Configuration conf;
  private final long now;
  private PathWithFileStatus bucketDir;
  private final long bucketDirModTime;
  private PathWithFileStatus userDir;
  private final String userDirName;
  private final long userDirModTime;
  private PathWithFileStatus suffixDir;
  private final String suffix;
  private final String suffixDirName;
  private final long suffixDirModTime;
  private final String bucketId;
  private final Path remoteRootLogPath;
  private final Map<Integer, Exception> injectedAppDirDeletionExceptions;
  private final List<String> fileControllers;
  private final List<Pair<String, Long>> additionalAppDirs;

  private final List<ApplicationId> applicationIds = new ArrayList<>();
  private final int[] runningAppIds;
  private final int[] finishedAppIds;
  private final List<List<PathWithFileStatus>> appFiles = new ArrayList<>();
  private final FileSystem mockFs;
  private List<PathWithFileStatus> appDirs;
  private final List<AppDescriptor> appDescriptors;
  private AggregatedLogDeletionServiceForTest deletionService;
  private ApplicationClientProtocol rmClient;

  public LogAggregationTestcase(LogAggregationTestcaseBuilder builder) throws IOException {
    conf = builder.conf;
    now = builder.now;
    bucketDir = builder.bucketDir;
    bucketDirModTime = builder.bucketDirModTime;
    userDir = builder.userDir;
    userDirName = builder.userDirName;
    userDirModTime = builder.userDirModTime;
    suffix = builder.suffix;
    suffixDir = builder.suffixDir;
    suffixDirName = builder.suffixDirName;
    suffixDirModTime = builder.suffixDirModTime;
    bucketId = builder.bucketId;
    appDescriptors = builder.apps;
    runningAppIds = builder.runningAppIds;
    finishedAppIds = builder.finishedAppIds;
    remoteRootLogPath = builder.remoteRootLogPath;
    injectedAppDirDeletionExceptions = builder.injectedAppDirDeletionExceptions;
    fileControllers = builder.fileControllers;
    additionalAppDirs = builder.additionalAppDirs;

    mockFs = ((FilterFileSystem) builder.rootFs).getRawFileSystem();
    validateAppControllers();
    setupMocks();

    setupDeletionService();
  }

  private void validateAppControllers() {
    Set<String> controllers = appDescriptors.stream()
            .map(a -> a.fileController)
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());
    Set<String> availableControllers = fileControllers != null ?
            new HashSet<>(this.fileControllers) : Sets.newHashSet();
    Set<String> difference = Sets.difference(controllers, availableControllers);
    if (!difference.isEmpty()) {
      throw new IllegalStateException(String.format("Invalid controller defined!" +
                      " Available: %s, Actual: %s", availableControllers, controllers));
    }
  }

  private void setupMocks() throws IOException {
    createApplicationsByDescriptors();

    List<Path> rootPaths = determineRootPaths();
    for (Path rootPath : rootPaths) {
      String controllerName = rootPath.getName();
      ApplicationId arbitraryAppIdForBucketDir = this.applicationIds.get(0);
      userDir = createDirLogPathWithFileStatus(rootPath, userDirName, userDirModTime);
      suffixDir = createDirLogPathWithFileStatus(userDir.path, suffixDirName, suffixDirModTime);
      if (bucketId != null) {
        bucketDir = createDirLogPathWithFileStatus(suffixDir.path, bucketId, bucketDirModTime);
      } else {
        bucketDir = createDirBucketDirLogPathWithFileStatus(rootPath, userDirName, suffix,
                arbitraryAppIdForBucketDir, bucketDirModTime);
      }
      setupListStatusForPath(rootPath, userDir);
      initFileSystemListings(controllerName);
    }
  }

  private List<Path> determineRootPaths() {
    List<Path> rootPaths = new ArrayList<>();
    if (fileControllers != null && !fileControllers.isEmpty()) {
      for (String fileController : fileControllers) {
        //Generic path: <remote-app-log-dir>/<user>/bucket-<suffix>/<bucket id>/
        // <application id>/<NodeManager id>

        //remoteRootLogPath: <remote-app-log-dir>/
        //example: mockfs://foo/tmp/logs/

        //userDir: <remote-app-log-dir>/<user>/
        //example: mockfs://foo/tmp/logs/me/

        //suffixDir: <remote-app-log-dir>/<user>/bucket-<suffix>/
        //example: mockfs://foo/tmp/logs/me/bucket-logs/

        //bucketDir: <remote-app-log-dir>/<user>/bucket-<suffix>/<bucket id>/
        //example: mockfs://foo/tmp/logs/me/bucket-logs/0001/

        //remoteRootLogPath with controller: <remote-app-log-dir>/<controllerName>
        //example: mockfs://foo/tmp/logs/IFile
        rootPaths.add(new Path(remoteRootLogPath, fileController));
      }
    } else {
      rootPaths.add(remoteRootLogPath);
    }
    return rootPaths;
  }

  private void initFileSystemListings(String controllerName) throws IOException {
    setupListStatusForPath(userDir, suffixDir);
    setupListStatusForPath(suffixDir, bucketDir);
    setupListStatusForPath(bucketDir, appDirs.stream()
            .filter(app -> app.path.toString().contains(controllerName))
            .map(app -> app.fileStatus)
            .toArray(FileStatus[]::new));

    for (Pair<String, Long> appDirPair : additionalAppDirs) {
      PathWithFileStatus appDir = createDirLogPathWithFileStatus(bucketDir.path,
              appDirPair.getLeft(), appDirPair.getRight());
      setupListStatusForPath(appDir, new FileStatus[] {});
    }
  }

  private void createApplicationsByDescriptors() throws IOException {
    int len = appDescriptors.size();
    appDirs = new ArrayList<>(len);

    for (int i = 0; i < len; i++) {
      AppDescriptor appDesc = appDescriptors.get(i);
      ApplicationId applicationId = appDesc.createApplicationId(now, i + 1);
      applicationIds.add(applicationId);
      Path basePath = this.remoteRootLogPath;
      if (appDesc.fileController != null) {
        basePath = new Path(basePath, appDesc.fileController);
      }

      PathWithFileStatus appDir = createPathWithFileStatusForAppId(
              basePath, applicationId, userDirName, suffix, appDesc.modTimeOfAppDir);
      LOG.debug("Created application with ID '{}' to path '{}'", applicationId, appDir.path);
      appDirs.add(appDir);
      addAppChildrenFiles(appDesc, appDir);
    }

    setupFsMocksForAppsAndChildrenFiles();

    for (Map.Entry<Integer, Exception> e : injectedAppDirDeletionExceptions.entrySet()) {
      when(mockFs.delete(this.appDirs.get(e.getKey()).path, true)).thenThrow(e.getValue());
    }
  }

  private void setupFsMocksForAppsAndChildrenFiles() throws IOException {
    for (int i = 0; i < appDirs.size(); i++) {
      List<PathWithFileStatus> appChildren = appFiles.get(i);
      Path appPath = appDirs.get(i).path;
      setupListStatusForPath(appPath,
              appChildren.stream()
                      .map(child -> child.fileStatus)
                      .toArray(FileStatus[]::new));
    }
  }

  private void setupListStatusForPath(Path dir, PathWithFileStatus pathWithFileStatus)
          throws IOException {
    setupListStatusForPath(dir, new FileStatus[]{pathWithFileStatus.fileStatus});
  }

  private void setupListStatusForPath(PathWithFileStatus dir, PathWithFileStatus pathWithFileStatus)
          throws IOException {
    setupListStatusForPath(dir, new FileStatus[]{pathWithFileStatus.fileStatus});
  }

  private void setupListStatusForPath(Path dir, FileStatus[] fileStatuses) throws IOException {
    LOG.debug("Setting up listStatus. Parent: {}, files: {}", dir, fileStatuses);
    when(mockFs.listStatus(dir)).thenReturn(fileStatuses);
  }

  private void setupListStatusForPath(PathWithFileStatus dir, FileStatus[] fileStatuses)
          throws IOException {
    LOG.debug("Setting up listStatus. Parent: {}, files: {}", dir.path, fileStatuses);
    when(mockFs.listStatus(dir.path)).thenReturn(fileStatuses);
  }

  private void setupDeletionService() {
    List<ApplicationId> finishedApps = createFinishedAppsList();
    List<ApplicationId> runningApps = createRunningAppsList();
    deletionService = new AggregatedLogDeletionServiceForTest(runningApps, finishedApps, conf);
  }

  public LogAggregationTestcase startDeletionService() {
    deletionService.init(conf);
    deletionService.start();
    return this;
  }

  private List<ApplicationId> createRunningAppsList() {
    List<ApplicationId> runningApps = new ArrayList<>();
    for (int i : runningAppIds) {
      ApplicationId appId = this.applicationIds.get(i - 1);
      runningApps.add(appId);
    }
    return runningApps;
  }

  private List<ApplicationId> createFinishedAppsList() {
    List<ApplicationId> finishedApps = new ArrayList<>();
    for (int i : finishedAppIds) {
      ApplicationId appId = this.applicationIds.get(i - 1);
      finishedApps.add(appId);
    }
    return finishedApps;
  }

  public LogAggregationTestcase runDeletionTask(long retentionSeconds) throws Exception {
    List<ApplicationId> finishedApps = createFinishedAppsList();
    List<ApplicationId> runningApps = createRunningAppsList();
    rmClient = createMockRMClient(finishedApps, runningApps);
    List<LogDeletionTask> tasks = deletionService.createLogDeletionTasks(conf, retentionSeconds,
            rmClient);
    for (LogDeletionTask deletionTask : tasks) {
      deletionTask.run();
    }

    return this;
  }

  private void addAppChildrenFiles(AppDescriptor appDesc, PathWithFileStatus appDir) {
    List<PathWithFileStatus> appChildren = new ArrayList<>();
    for (Pair<String, Long> fileWithModDate : appDesc.filesWithModDate) {
      PathWithFileStatus appChildFile = createFileLogPathWithFileStatus(appDir.path,
              fileWithModDate.getLeft(),
              fileWithModDate.getRight());
      appChildren.add(appChildFile);
    }
    this.appFiles.add(appChildren);
  }

  public LogAggregationTestcase verifyAppDirsDeleted(long timeout, int... ids) throws IOException {
    for (int id : ids) {
      verifyAppDirDeleted(id, timeout);
    }
    return this;
  }

  public LogAggregationTestcase verifyAppDirsNotDeleted(long timeout, int... ids)
          throws IOException {
    for (int id : ids) {
      verifyAppDirNotDeleted(id, timeout);
    }
    return this;
  }

  public LogAggregationTestcase verifyAppDirDeleted(int id, long timeout) throws IOException {
    verifyAppDirDeletion(id, 1, timeout);
    return this;
  }

  public LogAggregationTestcase verifyAppDirNotDeleted(int id, long timeout) throws IOException {
    verifyAppDirDeletion(id, 0, timeout);
    return this;
  }

  public LogAggregationTestcase verifyAppFilesDeleted(long timeout,
                                                      List<Pair<Integer, Integer>> pairs)
          throws IOException {
    for (Pair<Integer, Integer> pair : pairs) {
      verifyAppFileDeleted(pair.getLeft(), pair.getRight(), timeout);
    }
    return this;
  }

  public LogAggregationTestcase verifyAppFilesNotDeleted(long timeout,
                                                         List<Pair<Integer, Integer>> pairs)
          throws IOException {
    for (Pair<Integer, Integer> pair : pairs) {
      verifyAppFileNotDeleted(pair.getLeft(), pair.getRight(), timeout);
    }
    return this;
  }

  public LogAggregationTestcase verifyAppFileDeleted(int id, int fileNo, long timeout)
          throws IOException {
    verifyAppFileDeletion(id, fileNo, 1, timeout);
    return this;
  }

  public LogAggregationTestcase verifyAppFileNotDeleted(int id, int fileNo, long timeout)
          throws IOException {
    verifyAppFileDeletion(id, fileNo, 0, timeout);
    return this;
  }

  private void verifyAppDirDeletion(int id, int times, long timeout) throws IOException {
    if (timeout == NO_TIMEOUT) {
      verify(mockFs, times(times)).delete(this.appDirs.get(id - 1).path, true);
    } else {
      verify(mockFs, timeout(timeout).times(times)).delete(this.appDirs.get(id - 1).path, true);
    }
  }

  private void verifyAppFileDeletion(int appId, int fileNo, int times, long timeout)
          throws IOException {
    List<PathWithFileStatus> childrenFiles = this.appFiles.get(appId - 1);
    PathWithFileStatus file = childrenFiles.get(fileNo - 1);
    verify(mockFs, timeout(timeout).times(times)).delete(file.path, true);
  }

  private void verifyMockRmClientWasClosedNTimes(int expectedRmClientCloses)
      throws IOException {
    ApplicationClientProtocol mockRMClient;
    if (deletionService != null) {
      mockRMClient = deletionService.getMockRMClient();
    } else {
      mockRMClient = rmClient;
    }
    verify((Closeable)mockRMClient, times(expectedRmClientCloses)).close();
  }

  public void teardown(int expectedRmClientCloses) throws IOException {
    deletionService.stop();
    verifyMockRmClientWasClosedNTimes(expectedRmClientCloses);
  }

  public LogAggregationTestcase refreshLogRetentionSettings() throws IOException {
    deletionService.refreshLogRetentionSettings();
    return this;
  }

  public AggregatedLogDeletionService getDeletionService() {
    return deletionService;
  }

  public LogAggregationTestcase verifyCheckIntervalMilliSecondsEqualTo(
          int checkIntervalMilliSeconds) {
    assertEquals(checkIntervalMilliSeconds, deletionService.getCheckIntervalMsecs());
    return this;
  }

  public LogAggregationTestcase verifyCheckIntervalMilliSecondsNotEqualTo(
          int checkIntervalMilliSeconds) {
    assertTrue(checkIntervalMilliSeconds != deletionService.getCheckIntervalMsecs());
    return this;
  }

  public LogAggregationTestcase verifyAnyPathListedAtLeast(int atLeast, long timeout)
          throws IOException {
    verify(mockFs, timeout(timeout).atLeast(atLeast)).listStatus(any(Path.class));
    return this;
  }

  public LogAggregationTestcase changeModTimeOfApp(int appId, long modTime) {
    PathWithFileStatus appDir = appDirs.get(appId - 1);
    appDir.changeModificationTime(modTime);
    return this;
  }

  public LogAggregationTestcase changeModTimeOfAppLogDir(int appId, int fileNo, long modTime) {
    List<PathWithFileStatus> childrenFiles = this.appFiles.get(appId - 1);
    PathWithFileStatus file = childrenFiles.get(fileNo - 1);
    file.changeModificationTime(modTime);
    return this;
  }

  public LogAggregationTestcase changeModTimeOfBucketDir(long modTime) {
    bucketDir.changeModificationTime(modTime);
    return this;
  }

  public LogAggregationTestcase reinitAllPaths() throws IOException {
    List<Path> rootPaths = determineRootPaths();
    for (Path rootPath : rootPaths) {
      String controllerName = rootPath.getName();
      initFileSystemListings(controllerName);
    }
    setupFsMocksForAppsAndChildrenFiles();
    return this;
  }

}