TestContainerLogsUtils.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.logaggregation;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;

/**
 * This class contains several utility functions for log aggregation tests.
 * Any assertion libraries shouldn't be used here because this class is used by
 * multiple modules including MapReduce.
 */
public final class TestContainerLogsUtils {

  private TestContainerLogsUtils() {}

  /**
   * Utility function to create container log file and upload
   * it into remote file system.
   * @param conf the configuration
   * @param fs the FileSystem
   * @param rootLogDir the root log directory
   * @param appId the application id
   * @param containerToContent mapping between container id and its content
   * @param nodeId the nodeId
   * @param fileName the log file name
   * @param user the application user
   * @param deleteRemoteLogDir whether to delete remote log dir.
   * @throws IOException if we can not create log files locally
   *         or we can not upload container logs into RemoteFS.
   */
  public static void createContainerLogFileInRemoteFS(Configuration conf,
      FileSystem fs, String rootLogDir, ApplicationId appId,
      Map<ContainerId, String> containerToContent, NodeId nodeId,
      String fileName, String user, boolean deleteRemoteLogDir)
      throws Exception {
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
    // create local logs
    List<String> rootLogDirList = new ArrayList<String>();
    rootLogDirList.add(rootLogDir);
    Path rootLogDirPath = new Path(rootLogDir);
    if (fs.exists(rootLogDirPath)) {
      fs.delete(rootLogDirPath, true);
    }
    fs.mkdirs(rootLogDirPath);
    // Make sure the target dir is created. If not, FileNotFoundException is thrown
    fs.getFileStatus(rootLogDirPath);
    Path appLogsDir = new Path(rootLogDirPath, appId.toString());
    if (fs.exists(appLogsDir)) {
      fs.delete(appLogsDir, true);
    }
    fs.mkdirs(appLogsDir);
    // Make sure the target dir is created. If not, FileNotFoundException is thrown
    fs.getFileStatus(appLogsDir);
    createContainerLogInLocalDir(appLogsDir, containerToContent, fs, fileName);
    // upload container logs to remote log dir

    LogAggregationFileControllerFactory factory =
        new LogAggregationFileControllerFactory(conf);
    LogAggregationFileController fileController =
        factory.getFileControllerForWrite();

    Path path = fileController.getRemoteAppLogDir(appId, user);

    if (fs.exists(path) && deleteRemoteLogDir) {
      fs.delete(path, true);
    }
    fs.mkdirs(path);
    // Make sure the target dir is created. If not, FileNotFoundException is thrown
    fs.getFileStatus(path);
    uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId, appId,
        containerToContent.keySet(), path);
  }

  private static void createContainerLogInLocalDir(Path appLogsDir,
      Map<ContainerId, String> containerToContent, FileSystem fs,
      String fileName) throws IOException {
    for (Map.Entry<ContainerId, String> containerAndContent :
        containerToContent.entrySet()) {
      ContainerId containerId = containerAndContent.getKey();
      String content = containerAndContent.getValue();
      Path containerLogsDir = new Path(appLogsDir, containerId.toString());
      if (fs.exists(containerLogsDir)) {
        fs.delete(containerLogsDir, true);
      }
      fs.mkdirs(containerLogsDir);
      // Make sure the target dir is created. If not, FileNotFoundException is thrown
      fs.getFileStatus(containerLogsDir);
      Writer writer =
          new FileWriter(new File(containerLogsDir.toString(), fileName));
      writer.write(content);
      writer.close();
    }
  }

  private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
      Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
      ApplicationId appId, Iterable<ContainerId> containerIds, Path appDir)
      throws Exception {
    Path path =
        new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
    LogAggregationFileControllerFactory factory
        = new LogAggregationFileControllerFactory(configuration);
    LogAggregationFileController fileController = factory
        .getFileControllerForWrite();
    try {
      Map<ApplicationAccessType, String> appAcls = new HashMap<>();
      appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
      LogAggregationFileControllerContext context
          = new LogAggregationFileControllerContext(
              path, path, true, 1000,
              appId, appAcls, nodeId, ugi);
      fileController.initializeWriter(context);
      for (ContainerId containerId : containerIds) {
        fileController.write(new AggregatedLogFormat.LogKey(containerId),
            new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
                ugi.getShortUserName()));
      }
    } finally {
      fileController.closeWriter();
    }
  }
}