TestAppLogAggregatorImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationDFSException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

/**
 * Unit tests of AppLogAggregatorImpl class.
 */
public class TestAppLogAggregatorImpl {

  private static final File LOCAL_LOG_DIR = new File("target",
      TestAppLogAggregatorImpl.class.getName() + "-localLogDir");
  private static final File REMOTE_LOG_FILE = new File("target",
      TestAppLogAggregatorImpl.class.getName() + "-remoteLogFile");

  @BeforeEach
  public void setUp() throws IOException {
    if(LOCAL_LOG_DIR.exists()) {
      FileUtils.cleanDirectory(LOCAL_LOG_DIR);
    }
    if(REMOTE_LOG_FILE.exists()) {
      FileUtils.cleanDirectory(REMOTE_LOG_FILE);
    }
  }

  @AfterEach
  public void cleanUp() throws IOException {
    FileUtils.deleteDirectory(LOCAL_LOG_DIR);
    FileUtils.deleteQuietly(REMOTE_LOG_FILE);
  }

  @Test
  public void testAggregatorWithRetentionPolicyDisabledShouldUploadAllFiles()
      throws Exception {
    final ApplicationId applicationId =
        ApplicationId.newInstance(System.currentTimeMillis(), 0);
    final ApplicationAttemptId attemptId =
        ApplicationAttemptId.newInstance(applicationId, 0);
    final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);

    // create artificial log files
    final File appLogDir = new File(LOCAL_LOG_DIR, applicationId.toString());
    final File containerLogDir = new File(appLogDir, containerId.toString());
    containerLogDir.mkdirs();
    final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);

    final long logRetentionSecs = 10000;
    final long recoveredLogInitedTime = -1;

    verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
        applicationId, containerId, logRetentionSecs,
        recoveredLogInitedTime, logFiles, logFiles);
  }

  @Test
  public void testAggregatorWhenNoFileOlderThanRetentionPolicyShouldUploadAll()
      throws IOException {

    final ApplicationId applicationId =
        ApplicationId.newInstance(System.currentTimeMillis(), 0);
    final ApplicationAttemptId attemptId =
        ApplicationAttemptId.newInstance(applicationId, 0);
    final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);

    // create artificial log files
    final File appLogDir = new File(LOCAL_LOG_DIR,
        applicationId.toString());
    final File containerLogDir = new File(appLogDir,
        containerId.toString());
    containerLogDir.mkdirs();
    final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);

    // set log retention period to 1 week.
    final long logRententionSec = 7 * 24 * 60 * 60;
    final long recoveredLogInitedTimeMillis =
        System.currentTimeMillis() - 60*60;

    verifyLogAggregationWithExpectedFiles2DeleteAndUpload(applicationId,
        containerId, logRententionSec, recoveredLogInitedTimeMillis,
        logFiles, logFiles);
  }

  @Test
  public void testAggregatorWhenAllFilesOlderThanRetentionShouldUploadNone()
      throws IOException {

    final ApplicationId applicationId =
        ApplicationId.newInstance(System.currentTimeMillis(), 0);
    final ApplicationAttemptId attemptId =
        ApplicationAttemptId.newInstance(applicationId, 0);
    final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);

    // create artificial log files
    final File appLogDir = new File(LOCAL_LOG_DIR,
        applicationId.toString());
    final File containerLogDir = new File(appLogDir,
        containerId.toString());
    containerLogDir.mkdirs();
    final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);


    final long week = 7 * 24 * 60 * 60;
    final long recoveredLogInitedTimeMillis = System.currentTimeMillis() -
        2 * week * 1000;
    verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
        applicationId, containerId, week, recoveredLogInitedTimeMillis,
        logFiles, new HashSet<File>());
  }

  /**
   * Create the given number of log files under the container log directory.
   * @param containerLogDir the directory to create container log files
   * @param numOfFiles  the number of log files to create
   * @return the set of log files created
   */
  private static Set<File> createContainerLogFiles(File containerLogDir,
      int numOfFiles) throws IOException {
    assert(numOfFiles >= 0);
    assert(containerLogDir.exists());

    Set<File> containerLogFiles = new HashSet<>();
    for(int i = 0; i < numOfFiles; i++) {
      final File logFile = new File(containerLogDir, "logfile" + i);
      logFile.createNewFile();
      containerLogFiles.add(logFile);
    }
    return containerLogFiles;
  }

  /**
   * Verify if the application log aggregator, configured with given log
   * retention period and the recovered log initialization time of
   * the application, uploads and deletes the set of log files as expected.
   * @param appId    application id
   * @param containerId  container id
   * @param logRetentionSecs log retention period
   * @param recoveredLogInitedTimeMillis recovered log initialization time
   * @param expectedFilesToDelete   the set of files expected to be deleted
   * @param expectedFilesToUpload  the set of files expected to be uploaded.
   */
  public void verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
      ApplicationId appId, ContainerId containerId, long logRetentionSecs,
      long recoveredLogInitedTimeMillis, Set<File> expectedFilesToDelete,
      Set<File> expectedFilesToUpload) throws IOException {

    final Set<String> filesExpected2Delete = new HashSet<>();
    for(File file: expectedFilesToDelete) {
      filesExpected2Delete.add(file.getAbsolutePath());
    }
    final Set<String> filesExpected2Upload = new HashSet<>();
    for(File file: expectedFilesToUpload) {
      filesExpected2Upload.add(file.getAbsolutePath());
    }

    // deletion service with verification to check files to delete
    DeletionService deletionServiceWithExpectedFiles =
        createDeletionServiceWithExpectedFile2Delete(filesExpected2Delete);

    final YarnConfiguration config = new YarnConfiguration();
    config.setLong(
        YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs);

    LogAggregationTFileController format = spy(
        new LogAggregationTFileController());
    format.initialize(config, "TFile");

    Context context = createContext(config);
    final AppLogAggregatorInTest appLogAggregator =
        createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
            config, context, recoveredLogInitedTimeMillis,
            deletionServiceWithExpectedFiles, format);
    appLogAggregator.startContainerLogAggregation(
        new ContainerLogContext(containerId, ContainerType.TASK, 0));
    // set app finished flag first
    appLogAggregator.finishLogAggregation();
    appLogAggregator.run();

    // verify uploaded files
    ArgumentCaptor<LogValue> logValCaptor =
        ArgumentCaptor.forClass(LogValue.class);
    verify(appLogAggregator.getLogAggregationFileController()).write(
        any(LogKey.class), logValCaptor.capture());
    Set<String> filesUploaded = new HashSet<>();
    LogValue logValue = logValCaptor.getValue();
    for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
      filesUploaded.add(file.getAbsolutePath());
    }
    verifyFilesUploaded(filesUploaded , filesExpected2Upload);
  }


  private static void verifyFilesUploaded(Set<String> filesUploaded,
      Set<String> filesExpected) {
    final String errMsgPrefix = "The set of files uploaded are not the same " +
        "as expected";
    if(filesUploaded.size() != filesExpected.size()) {
      fail(errMsgPrefix + ": actual size: " + filesUploaded.size() + " vs " +
          "expected size: " + filesExpected.size());
    }
    for(String file: filesExpected) {
      if(!filesUploaded.contains(file)) {
        fail(errMsgPrefix + ": expecting " + file);
      }
    }
  }

  private static AppLogAggregatorInTest createAppLogAggregator(
      ApplicationId applicationId, String rootLogDir,
      YarnConfiguration config, Context context,
      long recoveredLogInitedTimeMillis,
      DeletionService deletionServiceWithFilesToExpect,
      LogAggregationTFileController tFileController)
      throws IOException {

    final Dispatcher dispatcher = createNullDispatcher();
    final NodeId nodeId = NodeId.newInstance("localhost", 0);
    final String userId = "AppLogAggregatorTest";
    final UserGroupInformation ugi =
        UserGroupInformation.createRemoteUser(userId);
    final LocalDirsHandlerService dirsService =
        createLocalDirsHandlerService(config, rootLogDir);
    final DeletionService deletionService = deletionServiceWithFilesToExpect;
    final LogAggregationContext logAggregationContext = null;
    final Map<ApplicationAccessType, String> appAcls = new HashMap<>();

    final FileContext fakeLfs = mock(FileContext.class);
    final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
    return new AppLogAggregatorInTest(dispatcher, deletionService,
        config, applicationId, ugi, nodeId, dirsService,
        remoteLogDirForApp, appAcls, logAggregationContext,
        context, fakeLfs, recoveredLogInitedTimeMillis, tFileController);
  }

  /**
   * Create a deletionService that verifies the paths of container log files
   * passed to the delete method of DeletionService by AppLogAggregatorImpl.
   * This approach is taken due to lack of support of varargs captor in the
   * current mockito version 1.8.5 (The support is added in 1.10.x).
   **/
  private static DeletionService createDeletionServiceWithExpectedFile2Delete(
      final Set<String> expectedPathsForDeletion) {
    DeletionService deletionServiceWithExpectedFiles = mock(DeletionService
        .class);
    // verify paths passed to first invocation of delete method against
    // expected paths
    doAnswer(new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
          Set<String> paths = new HashSet<>();
          Object[] tasks = invocationOnMock.getArguments();
          for(int i = 0; i < tasks.length; i++) {
            FileDeletionTask task = (FileDeletionTask) tasks[i];
            for (Path path: task.getBaseDirs()) {
              paths.add(new File(path.toUri().getRawPath()).getAbsolutePath());
            }
          }
          verifyFilesToDelete(expectedPathsForDeletion, paths);
          return null;
        }
      }).doNothing().when(deletionServiceWithExpectedFiles).delete(
          any(FileDeletionTask.class));

    return deletionServiceWithExpectedFiles;
  }

  private static void verifyFilesToDelete(Set<String> files2ToDelete,
      Set<String> filesExpected) {
    final String errMsgPrefix = "The set of paths for deletion are not the " +
        "same as expected";
    if(files2ToDelete.size() != filesExpected.size()) {
      fail(errMsgPrefix + ": actual size: " + files2ToDelete.size() + " vs " +
          "expected size: " + filesExpected.size());
    }
    for(String file: filesExpected) {
      if(!files2ToDelete.contains(file)) {
        fail(errMsgPrefix + ": expecting " + file);
      }
    }
  }

  private static Dispatcher createNullDispatcher() {
    return new Dispatcher() {
      @Override
      public EventHandler<Event> getEventHandler() {
        return new EventHandler<Event>() {
          @Override
          public void handle(Event event) {
            // do nothing
          }
        };
      }

      @Override
      public void register(Class<? extends Enum> eventType,
          EventHandler handler) {
        // do nothing
      }
    };
  }

  private static LocalDirsHandlerService createLocalDirsHandlerService(
      YarnConfiguration conf, final String rootLogDir) {
    LocalDirsHandlerService dirsHandlerService = new LocalDirsHandlerService() {
      @Override
      public List<String> getLogDirsForRead() {
        return new ArrayList<String>() {
          {
            add(rootLogDir);
          }
        };
      }
      @Override
      public List<String> getLogDirsForCleanup() {
        return new ArrayList<String>() {
          {
            add(rootLogDir);
          }
        };
      }
    };

    dirsHandlerService.init(conf);
    // appLogAggregator only calls LocalDirsHandlerServer for local directories
    // so it is ok to not start the service.
    return dirsHandlerService;
  }

  private static Context createContext(YarnConfiguration conf) {
    return new NodeManager.NMContext(
        new NMContainerTokenSecretManager(conf),
        new NMTokenSecretManagerInNM(),
        null,
        new ApplicationACLsManager(conf),
        new NMNullStateStoreService(), false, conf);
  }

  private static final class AppLogAggregatorInTest extends
      AppLogAggregatorImpl {

    final DeletionService deletionService;
    final ApplicationId applicationId;
    final ArgumentCaptor<LogValue> logValue;

    public AppLogAggregatorInTest(Dispatcher dispatcher,
        DeletionService deletionService, Configuration conf,
        ApplicationId appId, UserGroupInformation ugi, NodeId nodeId,
        LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
        Map<ApplicationAccessType, String> appAcls,
        LogAggregationContext logAggregationContext, Context context,
        FileContext lfs, long recoveredLogInitedTime,
        LogAggregationTFileController format) throws IOException {
      super(dispatcher, deletionService, conf, appId, ugi, nodeId,
          dirsHandler, remoteNodeLogFileForApp, appAcls,
          logAggregationContext, context, lfs, -1, recoveredLogInitedTime,
          format);
      this.applicationId = appId;
      this.deletionService = deletionService;
      this.logValue = ArgumentCaptor.forClass(LogValue.class);
    }
  }

  @Test
  public void testDFSQuotaExceeded() throws Exception {

    // the expectation is that no log files are deleted if the quota has
    // been exceeded, since that would result in loss of logs
    DeletionService deletionServiceWithExpectedFiles =
        createDeletionServiceWithExpectedFile2Delete(Collections.emptySet());

    final YarnConfiguration config = new YarnConfiguration();

    ApplicationId appId = ApplicationId.newInstance(1357543L, 1);

    // we need a LogAggregationTFileController that throws a
    // LogAggregationDFSException
    LogAggregationTFileController format =
        Mockito.mock(LogAggregationTFileController.class);
    Mockito.doThrow(new LogAggregationDFSException())
        .when(format).closeWriter();

    NodeManager.NMContext context = (NMContext) createContext(config);
    context.setNMLogAggregationStatusTracker(
        Mockito.mock(NMLogAggregationStatusTracker.class));

    final AppLogAggregatorInTest appLogAggregator =
        createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
            config, context, 1000L, deletionServiceWithExpectedFiles, format);

    appLogAggregator.startContainerLogAggregation(
        new ContainerLogContext(
            ContainerId.newContainerId(
                ApplicationAttemptId.newInstance(appId, 0), 0),
            ContainerType.TASK, 0));
    // set app finished flag first
    appLogAggregator.finishLogAggregation();
    appLogAggregator.run();

    // verify that no files have been uploaded
    ArgumentCaptor<LogValue> logValCaptor =
        ArgumentCaptor.forClass(LogValue.class);
    verify(appLogAggregator.getLogAggregationFileController()).write(
        any(LogKey.class), logValCaptor.capture());
    Set<String> filesUploaded = new HashSet<>();
    LogValue logValue = logValCaptor.getValue();
    for (File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
      filesUploaded.add(file.getAbsolutePath());
    }
    verifyFilesUploaded(filesUploaded, Collections.emptySet());
  }
}