ITestS3ACommitterMRJob.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.commit.integration;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.util.Sets;
import org.assertj.core.api.Assertions;
import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.LoggingTextOutputFormat;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.DurationInfo;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
import static org.apache.hadoop.mapred.JobConf.MAPRED_TASK_ENV;

/**
 * Test an MR Job with all the different committers.
 * <p>
 * This is a fairly complex parameterization: it is designed to
 * avoid the overhead of starting a Yarn cluster for
 * individual committer types, and so speeds up the test run.
 * <p>
 * It also implicitly guarantees that there is never more than one of these
 * MR jobs active at a time, so avoids overloading the test machine with too
 * many processes.
 * How the binding works:
 * <ol>
 *   <li>
 *     Each parameterized suite is configured through its own
 *     {@link CommitterTestBinding} subclass.
 *   </li>
 *   <li>
 *     JUnit runs these test suites one parameterized binding at a time.
 *   </li>
 *   <li>
 *     The test suites are declared to be executed in ascending order, so
 *     that for a specific binding, the order is {@link #test_000()},
 *     {@link #test_100()} {@link #test_200_execute()} and finally
 *     {@link #test_500()}.
 *   </li>
 *   <li>
 *     {@link #test_000()} calls {@link CommitterTestBinding#validate()}
 *     so as to validate the state of the committer. This is primarily to
 *     verify that the binding setup mechanism is working.
 *   </li>
 *   <li>
 *     {@link #test_100()} is relayed to
 *     {@link CommitterTestBinding#test_100()},
 *     for any preflight tests.
 *   </li>
 *   <li>
 *     The {@link #test_200_execute()} test runs the MR job for that
 *     particular binding with standard reporting and verification of the
 *     outcome.
 *   </li>
 *   <li>
 *     {@link #test_500()} test is relayed to
 *     {@link CommitterTestBinding#test_500()}, for any post-MR-job tests.
 *   </li>
 * </ol>
 *
 * A new S3A FileSystem instance is created for each test_ method, so the
 * pre-execute and post-execute validators cannot inspect the state of the
 * FS as part of their tests.
 * However, as the MR workers and AM all run in their own processes, there's
 * generally no useful information about the job in the local S3AFileSystem
 * instance.
 */
@RunWith(Parameterized.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest {

  /** Logger for this test suite; also used by the nested test bindings. */
  private static final Logger LOG =
      LoggerFactory.getLogger(ITestS3ACommitterMRJob.class);

  /**
   * Build the parameter sets for the parameterized runner: one
   * single-element array per committer binding.
   *
   * @return the committer bindings to run the suite against.
   */
  @Parameterized.Parameters(name = "{0}")
  public static Collection<Object[]> params() {
    final List<Object[]> bindings = new ArrayList<>(3);
    bindings.add(new Object[]{new DirectoryCommitterTestBinding()});
    bindings.add(new Object[]{new PartitionCommitterTestBinding()});
    bindings.add(new Object[]{new MagicCommitterTestBinding()});
    return bindings;
  }

  /**
   * The committer binding for this instance, as supplied to the
   * constructor; it is bound to the cluster and filesystem in
   * {@link #setup()}.
   */
  private final CommitterTestBinding committerTestBinding;

  /**
   * Create a test instance for one committer binding.
   * @param committerTestBinding binding which drives this run of the suite.
   */
  public ITestS3ACommitterMRJob(final CommitterTestBinding committerTestBinding) {
    this.committerTestBinding = committerTestBinding;
  }

  /**
   * Set up the test case, then bind the committer binding to the
   * cluster and filesystem of this test case.
   * @throws Exception setup failure.
   */
  @Override
  public void setup() throws Exception {
    super.setup();
    final ClusterBinding cluster = getClusterBinding();
    final S3AFileSystem destFS = getFileSystem();
    committerTestBinding.setup(cluster, destFS);
  }

  /**
   * Create the test configuration: that of the superclass, patched
   * by {@code disableFilesystemCaching()}.
   * @return the configuration to use.
   */
  @Override
  protected Configuration createConfiguration() {
    final Configuration uncached = super.createConfiguration();
    disableFilesystemCaching(uncached);
    return uncached;
  }

  /**
   * Temporary local directory for a test case.
   * {@link #test_200_execute()} creates the job's input files here and
   * uses the directory root as the input path of the job.
   */
  @Rule
  public final TemporaryFolder localFilesDir = new TemporaryFolder();

  /**
   * The committer name is that of the current binding.
   * @return the name of the committer under test.
   */
  @Override
  protected String committerName() {
    final String name = committerTestBinding.getCommitterName();
    return name;
  }

  /**
   * Ask the committer binding to validate its own state.
   * @throws Throwable validation failure.
   */
  @Test
  public void test_000() throws Throwable {
    committerTestBinding.validate();
  }
  /**
   * Preflight test: relayed to the committer binding's own
   * {@code test_100()} method.
   * @throws Throwable failure.
   */
  @Test
  public void test_100() throws Throwable {
    committerTestBinding.test_100();
  }

  /**
   * Run the MR job for the current committer binding and verify its output.
   * <ol>
   *   <li>Local input files are created, and the matching output
   *   filenames and object keys are calculated.</li>
   *   <li>A map-only job using {@link MapClass} is submitted and awaited;
   *   a failed job fails the test with the job state and failure info.</li>
   *   <li>The {@code _SUCCESS} data, the listing of the output directory and
   *   the absence of a temporary directory are then verified, after which
   *   {@link #customPostExecutionValidation(Path, SuccessData, String)}
   *   is invoked.</li>
   * </ol>
   * @throws Exception on any failure.
   */
  @Test
  public void test_200_execute() throws Exception {
    describe("Run an MR with committer %s", committerName());

    S3AFileSystem fs = getFileSystem();
    // final dest is in S3A
    // we can't use the method name as the template places square braces into
    // that and URI creation fails.

    Path outputPath = path("ITestS3ACommitterMRJob-execute-" + committerName());

    String commitUUID = UUID.randomUUID().toString();
    String suffix = isUniqueFilenames() ? ("-" + commitUUID) : "";
    int numFiles = getTestFileCount();

    // create all the input files on the local FS.
    // each input file is matched by one expected output file and key.
    List<String> expectedFiles = new ArrayList<>(numFiles);
    Set<String> expectedKeys = Sets.newHashSet();
    for (int i = 0; i < numFiles; i += 1) {
      File file = localFilesDir.newFile(i + ".text");
      try (FileOutputStream out = new FileOutputStream(file)) {
        out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
      }
      String filename = String.format("part-m-%05d%s", i, suffix);
      Path path = new Path(outputPath, filename);
      expectedFiles.add(path.toString());
      expectedKeys.add("/" + fs.pathToKey(path));
    }
    Collections.sort(expectedFiles);

    Job mrJob = createJob(newJobConf());
    JobConf jobConf = (JobConf) mrJob.getConfiguration();

    mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
    FileOutputFormat.setOutputPath(mrJob, outputPath);

    File mockResultsFile = localFilesDir.newFile("committer.bin");
    // only the path is wanted: the file itself must not exist, as the
    // directory it was created in is also the input directory of the job.
    Assertions.assertThat(mockResultsFile.delete())
        .describedAs("Deletion of %s", mockResultsFile)
        .isTrue();
    String committerPath = "file:" + mockResultsFile;
    jobConf.set("mock-results-file", committerPath);

    // setting up staging options is harmless for other committers
    jobConf.set(FS_S3A_COMMITTER_UUID, commitUUID);

    mrJob.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(mrJob,
        new Path(localFilesDir.getRoot().toURI()));

    mrJob.setMapperClass(MapClass.class);
    // map-only job
    mrJob.setNumReduceTasks(0);

    // an attempt to set up log4j properly, which clearly doesn't work
    URL log4j = getClass().getClassLoader().getResource("log4j.properties");
    if (log4j != null && "file".equals(log4j.getProtocol())) {
      Path log4jPath = new Path(log4j.toURI());
      LOG.debug("Using log4j path {}", log4jPath);
      mrJob.addFileToClassPath(log4jPath);
      String sysprops = String.format("-Xmx128m -Dlog4j.configuration=%s",
          log4j);
      jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops);
      jobConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, sysprops);
      jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops);
    }

    applyCustomConfigOptions(jobConf);
    // fail fast if anything goes wrong
    mrJob.setMaxMapAttempts(1);

    try (DurationInfo ignore = new DurationInfo(LOG, "Job Submit")) {
      mrJob.submit();
    }
    String jobID = mrJob.getJobID().toString();
    String logLocation = "logs under "
        + getYarn().getTestWorkDir().getAbsolutePath();
    try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) {
      mrJob.waitForCompletion(true);
    }
    JobStatus status = mrJob.getStatus();
    if (!mrJob.isSuccessful()) {
      // failure of job.
      // be as meaningful as possible.
      String message =
          String.format("Job %s failed in state %s with cause %s.\n"
                  + "Consult %s",
              jobID,
              status.getState(),
              status.getFailureInfo(),
              logLocation);
      LOG.error(message);
      fail(message);
    }

    Path successPath = new Path(outputPath, _SUCCESS);
    SuccessData successData = validateSuccessFile(outputPath,
        committerName(),
        fs,
        "MR job " + jobID,
        1,
        "");
    String commitData = successData.toString();

    FileStatus[] results = fs.listStatus(outputPath,
        S3AUtils.HIDDEN_FILE_FILTER);
    int fileCount = results.length;
    Assertions.assertThat(fileCount)
        .describedAs("No files from job %s in output directory %s; see %s",
            jobID,
            outputPath,
            logLocation)
        .isNotEqualTo(0);

    // the sorted listing must match the sorted list of expected files
    List<String> actualFiles = Arrays.stream(results)
        .map(s -> s.getPath().toString())
        .sorted()
        .collect(Collectors.toList());

    Assertions.assertThat(actualFiles)
        .describedAs("Files found in %s", outputPath)
        .isEqualTo(expectedFiles);

    Assertions.assertThat(successData.getFilenames())
        .describedAs("Success files listed in %s:%s",
            successPath, commitData)
        .isNotEmpty()
        .containsExactlyInAnyOrderElementsOf(expectedKeys);

    assertPathDoesNotExist("temporary dir should only be from"
            + " classic file committers",
        new Path(outputPath, CommitConstants.TEMPORARY));
    customPostExecutionValidation(outputPath, successData, jobID);
  }

  /**
   * Pass the value of {@code Constants.AWS_REGION} in the job configuration
   * down to the MR tasks and the MR AM as the {@code AWS_REGION}
   * environment variable, then let the committer binding apply its own
   * options.
   * <p>
   * When no region is set in the job configuration, the environment
   * options are left alone, rather than exporting the string "null"
   * as the region.
   * @param jobConf job configuration to patch.
   * @throws IOException failure raised by the committer binding.
   */
  @Override
  protected void applyCustomConfigOptions(final JobConf jobConf)
      throws IOException {
    final String region = jobConf.get(Constants.AWS_REGION);
    if (region != null && !region.isEmpty()) {
      final String regionEnv = "AWS_REGION=" + region;
      jobConf.set(MAPRED_TASK_ENV, regionEnv);
      jobConf.set("yarn.app.mapreduce.am.env", regionEnv);
    }
    committerTestBinding.applyCustomConfigOptions(jobConf);
  }

  /**
   * Relay the post-execution validation to the committer binding.
   * @param destPath destination of the job.
   * @param successData loaded success data.
   * @param jobId ID of the job.
   * @throws Exception validation failure.
   */
  @Override
  protected void customPostExecutionValidation(
      final Path destPath,
      final SuccessData successData,
      String jobId) throws Exception {
    committerTestBinding.validateResult(destPath, successData, jobId);
  }

  /**
   * Extra test slot for committer bindings: relayed to the binding's
   * own {@code test_500()} method.
   * @throws Throwable failure.
   */
  @Test
  public void test_500() throws Throwable {
    committerTestBinding.test_500();
  }

  /**
   *  Mapper for the test job.
   *  This is executed in a separate process, and must not make any
   *  assumptions about external state.
   *  For every input record it writes {@code operations} records, each
   *  keyed by its index with the value "task attempt ID:index".
   */
  public static class MapClass
      extends Mapper<LongWritable, Text, LongWritable, Text> {

    /** Output key; reused for every record written. */
    private final LongWritable outKey = new LongWritable();

    /** Output value; reused for every record written. */
    private final Text outValue = new Text();

    /** Number of records to write for each input record. */
    private int operations;

    /** Task attempt ID, set in {@link #setup(Context)}. */
    private String id = "";

    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      // force in Log4J logging
      org.apache.log4j.BasicConfigurator.configure();
      // and pick up scale test flag as passed down
      final Configuration taskConf = context.getConfiguration();
      if (taskConf.getBoolean(KEY_SCALE_TESTS_ENABLED, false)) {
        operations = SCALE_TEST_KEYS;
      } else {
        operations = BASE_TEST_KEYS;
      }
      id = context.getTaskAttemptID().toString();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      int index = 0;
      while (index < operations) {
        outKey.set(index);
        outValue.set(String.format("%s:%05d", id, index));
        context.write(outKey, outValue);
        index++;
      }
    }
  }

  /**
   * Base class for the per-committer bindings which drive the
   * parameterized test suite.
   *
   * Instances are created in the static parameter list of the suite,
   * at which point they are not bound to any cluster or filesystem.
   *
   * The per-method test {@link #setup()} method then calls
   * {@link #setup(ClusterBinding, S3AFileSystem)} to link the instance
   * to the specific test cluster <i>and test filesystem</i> in use
   * in that test.
   */
  private abstract static class CommitterTestBinding {

    /**
     * Name of the committer.
     */
    private final String committerName;

    /**
     * Cluster binding; null until the binding is set up.
     */
    private ClusterBinding binding;

    /**
     * The S3A filesystem; null until the binding is set up.
     */
    private S3AFileSystem remoteFS;

    /**
     * Constructor.
     * @param committerName name of the committer for messages.
     */
    protected CommitterTestBinding(final String committerName) {
      this.committerName = committerName;
    }

    /**
     * Bind to a cluster and filesystem: this is called during test setup.
     * @param clusterBinding the active test cluster.
     * @param destFS the S3A Filesystem used as a destination.
     */
    private void setup(
        ClusterBinding clusterBinding,
        S3AFileSystem destFS) {
      binding = clusterBinding;
      remoteFS = destFS;
    }

    /**
     * @return the name of the committer.
     */
    protected String getCommitterName() {
      return committerName;
    }

    /**
     * @return the cluster binding set in the last setup call.
     */
    protected ClusterBinding getBinding() {
      return binding;
    }

    /**
     * @return the S3A filesystem set in the last setup call.
     */
    protected S3AFileSystem getRemoteFS() {
      return remoteFS;
    }

    /**
     * Get the filesystem of the bound cluster.
     * @return the cluster filesystem.
     * @throws IOException failure to get the filesystem.
     */
    protected FileSystem getClusterFS() throws IOException {
      final ClusterBinding cluster = getBinding();
      return cluster.getClusterFS();
    }

    /**
     * The string value is the committer name.
     * @return the committer name.
     */
    @Override
    public String toString() {
      return committerName;
    }

    /**
     * Override point to let implementations tune the MR Job conf.
     * The base implementation does nothing.
     * @param jobConf configuration
     * @throws IOException failure
     */
    protected void applyCustomConfigOptions(JobConf jobConf)
        throws IOException {
      // no-op
    }

    /**
     * Override point for any committer specific validation operations;
     * called after the base assertions have all passed.
     * The base implementation does nothing.
     * @param destPath destination of work
     * @param successData loaded success data
     * @param jobId ID of the job
     * @throws Exception failure
     */
    protected void validateResult(Path destPath,
        SuccessData successData, String jobId)
        throws Exception {
      // no-op
    }

    /**
     * A test to run before the main {@link #test_200_execute()} test is
     * invoked. The base implementation does nothing.
     * @throws Throwable failure.
     */
    void test_100() throws Throwable {
      // no-op
    }

    /**
     * A test to run after the main {@link #test_200_execute()} test is
     * invoked. The base implementation does nothing.
     * @throws Throwable failure.
     */
    void test_500() throws Throwable {
      // no-op
    }

    /**
     * Validate the state of the binding: it must be bound to a cluster
     * which has both a filesystem and a yarn cluster.
     * This is called in {@link #test_000()} so will
     * fail independently of the other tests.
     * @throws Throwable failure.
     */
    public void validate() throws Throwable {
      final ClusterBinding cluster = binding;
      assertNotNull("Not bound to a cluster", cluster);
      final FileSystem clusterFS = getClusterFS();
      assertNotNull("No cluster filesystem", clusterFS);
      assertNotNull("No yarn cluster", cluster.getYarn());
    }
  }

  /**
   * The directory staging committer.
   * This adds a preflight test of the staging directory path.
   */
  private static final class DirectoryCommitterTestBinding
      extends CommitterTestBinding {

    private DirectoryCommitterTestBinding() {
      super(DirectoryStagingCommitter.NAME);
    }

    /**
     * Verify that staging commit dirs are made absolute under the user's
     * home directory, so, in a secure cluster, private.
     * <p>
     * The relative staging path is set in a copy of the cluster
     * filesystem's configuration, so that the option does not leak into
     * the configuration instance owned by the filesystem.
     * @throws Throwable failure.
     */
    @Override
    void test_100() throws Throwable {
      FileSystem fs = getClusterFS();
      // work on a copy: fs.getConf() is the filesystem's own instance.
      Configuration conf = new Configuration(fs.getConf());
      String pri = "private";
      conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, pri);
      Path dir = getMultipartUploadCommitsDirectory(conf, "uuid");
      Assertions.assertThat(dir.isAbsolute())
          .describedAs("non-absolute path")
          .isTrue();
      // compare in lower case
      String stagingTempDir = dir.toString().toLowerCase(Locale.ENGLISH);
      String self = UserGroupInformation.getCurrentUser()
          .getShortUserName().toLowerCase(Locale.ENGLISH);
      Assertions.assertThat(stagingTempDir)
          .describedAs("Staging committer temp path in cluster")
          .contains(pri + "/" + self)
          .endsWith("uuid/" + STAGING_UPLOADS);
    }
  }

  /**
   * Binding for the partitioned staging committer.
   * It adds nothing to the base binding other than the committer name.
   */
  private static final class PartitionCommitterTestBinding
      extends CommitterTestBinding {

    private PartitionCommitterTestBinding() {
      super(PartitionedStagingCommitter.NAME);
    }
  }

  /**
   * Binding for the magic committer.
   * This includes extra inspection of the job's result.
   */
  private static final class MagicCommitterTestBinding
      extends CommitterTestBinding {

    private MagicCommitterTestBinding() {
      super(MagicS3GuardCommitter.NAME);
    }

    /**
     * Look for the "MAGIC PATH" directory of the job under the destination.
     * If it is still present, it and the files under it are logged at
     * warn level; the validation does not fail.
     * @param destPath destination of work
     * @param successData loaded success data
     * @param jobId ID of the job
     * @throws Exception failure
     */
    @Override
    protected void validateResult(final Path destPath,
        final SuccessData successData, final String jobId)
        throws Exception {
      final Path magicPath = new Path(destPath, MAGIC_PATH_PREFIX + jobId);
      final S3AFileSystem remote = getRemoteFS();

      // log the contents of the destination
      lsR(remote, destPath, true);

      // then look for the magic directory.
      // HADOOP-16632 shows how partitioned/speculative tasks can leave
      // data here and it is not an error. So just log and continue
      try {
        final FileStatus magicStatus = remote.getFileStatus(magicPath);
        // no FNFE was raised: list out the directory tree
        LOG.warn("Found magic dir which should have been deleted at {}",
            magicStatus);
        applyLocatedFiles(remote.listFiles(magicPath, true),
            entry -> LOG.warn("{}", entry));
      } catch (FileNotFoundException ignored) {
        // expected
      }
    }
  }

}