// TestJobOutputCommitter.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.HadoopTestCase;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;

/**
 * A JUnit test to test Map-Reduce job committer.
 */
public class TestJobOutputCommitter extends HadoopTestCase {

  /**
   * Creates the test case by passing {@code CLUSTER_MR}, {@code LOCAL_FS}
   * and two counts of {@code 1} to the {@link HadoopTestCase} constructor.
   *
   * NOTE(review): the two trailing {@code 1} arguments are presumably the
   * number of task trackers and data nodes -- confirm against
   * {@link HadoopTestCase}.
   *
   * @throws IOException declared here; presumably propagated from the
   *           superclass constructor
   */
  public TestJobOutputCommitter() throws IOException {
    super(CLUSTER_MR, LOCAL_FS, 1, 1);
  }

  /**
   * Root directory for everything this test reads and writes: a
   * {@code test-job-output-committer} folder under the directory named by
   * the {@code test.build.data} system property, falling back to
   * {@code /tmp}. Declared {@code final} because it is never reassigned.
   */
  private static final String TEST_ROOT_DIR = new File(System.getProperty(
      "test.build.data", "/tmp")
      + "/" + "test-job-output-committer").toString();
  /** Name of the marker file written by the custom {@code cleanupJob}. */
  private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup";
  /** Marker written by the custom {@code abortJob} for any non-FAILED state. */
  private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed";
  /** Marker written by the custom {@code abortJob} for the FAILED state. */
  private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed";
  /** Input directory handed to every job created by the tests. */
  private static Path inDir = new Path(TEST_ROOT_DIR, "test-input");
  /** Counter used to give each job its own {@code output-N} directory. */
  private static int outDirs = 0;
  /** File system used for existence checks and cleanup; assigned in setUp. */
  private FileSystem fs;
  /** Job configuration obtained from createJobConf(); assigned in setUp. */
  private Configuration conf = null;

  /**
   * Runs the superclass set-up and then caches the job configuration and
   * the file system that the individual tests use.
   *
   * @throws Exception if the superclass set-up or either lookup fails
   */
  @BeforeEach
  public void setUp() throws Exception {
    super.setUp();
    // Both lookups come from the base test case, so they run after super.setUp().
    this.conf = createJobConf();
    this.fs = getFileSystem();
  }

  /**
   * Recursively deletes the test root directory and then runs the superclass
   * tear-down.
   *
   * <p>The superclass tear-down is executed in a {@code finally} block so it
   * still runs when the delete throws, and the delete is skipped when
   * {@code fs} was never assigned (for example because set-up failed early).
   *
   * @throws Exception if the delete or the superclass tear-down fails
   */
  @AfterEach
  public void tearDown() throws Exception {
    try {
      // fs is still null when setUp() failed before assigning it.
      if (fs != null) {
        fs.delete(new Path(TEST_ROOT_DIR), true);
      }
    } finally {
      // Must run even if the delete above threw.
      super.tearDown();
    }
  }

  /**
   * Committer that overrides the deprecated
   * {@link FileOutputCommitter#cleanupJob(JobContext)} so that it creates an
   * empty {@code _custom_cleanup} marker file in the job output folder.
   */
  static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
    /**
     * @param outputPath output path passed through to the superclass
     * @param context task attempt context passed through to the superclass
     * @throws IOException if the superclass constructor fails
     */
    public CommitterWithCustomDeprecatedCleanup(Path outputPath,
        TaskAttemptContext context) throws IOException {
      super(outputPath, context);
    }

    /**
     * Creates the {@code CUSTOM_CLEANUP_FILE_NAME} marker in the job's
     * output path instead of delegating to the superclass implementation.
     *
     * @param context job context supplying the output path and configuration
     * @throws IOException if the marker file cannot be created
     */
    @Override
    public void cleanupJob(JobContext context) throws IOException {
      Path outputPath = FileOutputFormat.getOutputPath(context);
      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
      // Create and immediately close: the tests only check that the file exists.
      fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
    }
  }
  
  /**
   * Committer whose abort handler drops an empty marker file into the job
   * output folder: {@code _custom_abort_failed} when the reported state is
   * FAILED and {@code _custom_abort_killed} for every other state.
   */
  static class CommitterWithCustomAbort extends FileOutputCommitter {
    /**
     * @param outputPath output path passed through to the superclass
     * @param context task attempt context passed through to the superclass
     * @throws IOException if the superclass constructor fails
     */
    public CommitterWithCustomAbort(Path outputPath, TaskAttemptContext context)
        throws IOException {
      super(outputPath, context);
    }

    /**
     * Writes the state-specific marker file instead of delegating to the
     * superclass implementation.
     *
     * @param context job context supplying the output path and configuration
     * @param state final job state that selects which marker is written
     * @throws IOException if the marker file cannot be created
     */
    @Override
    public void abortJob(JobContext context, JobStatus.State state)
        throws IOException {
      Path outputDir = FileOutputFormat.getOutputPath(context);
      FileSystem outputFs = outputDir.getFileSystem(context.getConfiguration());

      final String markerName;
      if (state.equals(JobStatus.State.FAILED)) {
        markerName = ABORT_FAILED_FILE_NAME;
      } else {
        markerName = ABORT_KILLED_FILE_NAME;
      }

      Path marker = new Path(outputDir, markerName);
      outputFs.create(marker).close();
    }
  }

  /**
   * Returns a fresh {@code output-N} path under the test root directory and
   * advances the shared counter so the next call yields a different path.
   *
   * @return the output directory path for the next job
   */
  private Path getNewOutputDir() {
    String dirName = "output-" + outDirs;
    outDirs++;
    return new Path(TEST_ROOT_DIR, dirName);
  }

  /**
   * Text output format that hands out a single, lazily created
   * {@link CommitterWithCustomAbort} per format instance.
   */
  static class MyOutputFormatWithCustomAbort<K, V>
      extends TextOutputFormat<K, V> {
    /** Committer created on first request and reused afterwards. */
    private OutputCommitter committer = null;

    /**
     * Returns the cached committer, creating it for the context's output
     * path on the first call.
     *
     * @param context task attempt context used to resolve the output path
     * @return the committer for this output format instance
     * @throws IOException if the committer cannot be constructed
     */
    public synchronized OutputCommitter getOutputCommitter(
        TaskAttemptContext context) throws IOException {
      if (committer != null) {
        return committer;
      }
      committer = new CommitterWithCustomAbort(getOutputPath(context), context);
      return committer;
    }
  }

  /**
   * Text output format that hands out a single, lazily created
   * {@link CommitterWithCustomDeprecatedCleanup} per format instance.
   */
  static class MyOutputFormatWithCustomCleanup<K, V>
      extends TextOutputFormat<K, V> {
    /** Committer created on first request and reused afterwards. */
    private OutputCommitter committer = null;

    /**
     * Returns the cached committer, creating it for the context's output
     * path on the first call.
     *
     * @param context task attempt context used to resolve the output path
     * @return the committer for this output format instance
     * @throws IOException if the committer cannot be constructed
     */
    public synchronized OutputCommitter getOutputCommitter(
        TaskAttemptContext context) throws IOException {
      if (committer != null) {
        return committer;
      }
      committer = new CommitterWithCustomDeprecatedCleanup(
          getOutputPath(context), context);
      return committer;
    }
  }

  /**
   * Runs a job with 1 map to completion and verifies that it succeeded, that
   * the expected file exists in its output directory and that none of the
   * excluded files were created.
   *
   * @param filename file that must exist in the output directory
   * @param output output format class to set on the job
   * @param exclude names of files that must not exist in the output directory
   * @throws Exception if the job cannot be run or the file system fails
   */
  private void testSuccessfulJob(String filename,
      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
    Path outputDir = getNewOutputDir();
    Job successfulJob =
        MapReduceTestUtil.createJob(conf, inDir, outputDir, 1, 0);
    successfulJob.setOutputFormatClass(output);

    boolean succeeded = successfulJob.waitForCompletion(true);
    assertTrue(succeeded, "Job failed!");

    Path expected = new Path(outputDir, filename);
    boolean expectedPresent = fs.exists(expected);
    assertTrue(expectedPresent,
        "Done file missing for job " + successfulJob.getJobID());

    // None of the excluded files may show up for a successful job.
    for (int i = 0; i < exclude.length; i++) {
      Path unexpected = new Path(outputDir, exclude[i]);
      boolean unexpectedPresent = fs.exists(unexpected);
      assertFalse(unexpectedPresent,
          "File " + unexpected + " should not be present for successful job "
          + successfulJob.getJobID());
    }
  }

  /**
   * Runs a job for which all the attempts simply fail and verifies that the
   * job did not succeed, that the expected file (if any) exists in its
   * output directory and that none of the excluded files were created.
   *
   * @param fileName file that must exist in the output directory, or
   *          {@code null} to skip that check
   * @param output output format class to set on the job
   * @param exclude names of files that must not exist in the output directory
   * @throws Exception if the job cannot be run or the file system fails
   */
  private void testFailedJob(String fileName,
      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
    Path outputDir = getNewOutputDir();
    Job failingJob = MapReduceTestUtil.createFailJob(conf, outputDir, inDir);
    failingJob.setOutputFormatClass(output);

    boolean succeeded = failingJob.waitForCompletion(true);
    assertFalse(succeeded, "Job did not fail!");

    if (fileName != null) {
      Path expected = new Path(outputDir, fileName);
      boolean expectedPresent = fs.exists(expected);
      assertTrue(expectedPresent,
          "File " + expected + " missing for failed job "
          + failingJob.getJobID());
    }

    // None of the excluded files may show up for a failed job.
    for (String excludedName : exclude) {
      Path unexpected = new Path(outputDir, excludedName);
      boolean unexpectedPresent = fs.exists(unexpected);
      assertFalse(unexpectedPresent,
          "File " + unexpected + " should not be present for failed job "
          + failingJob.getJobID());
    }
  }

  /**
   * Runs a job which gets stuck in its mapper, kills it once job setup has
   * finished, and verifies that the job did not succeed, that the expected
   * file (if any) exists in its output directory and that none of the
   * excluded files were created.
   *
   * @param fileName file that must exist in the output directory, or
   *          {@code null} to skip that check
   * @param output output format class to set on the job
   * @param exclude names of files that must not exist in the output directory
   * @throws Exception if the job cannot be run or the file system fails
   */
  private void testKilledJob(String fileName,
      Class<? extends OutputFormat> output, String[] exclude) throws Exception {
    Path outDir = getNewOutputDir();
    Job job = MapReduceTestUtil.createKillJob(conf, outDir, inDir);
    job.setOutputFormatClass(output);

    job.submit();

    // Wait for the setup to be completed. Also stop polling once the job has
    // ended, so a job that terminates before setup progress reaches 1.0 makes
    // the assertions below fail instead of hanging the test forever.
    while (job.setupProgress() != 1.0f && !job.isComplete()) {
      UtilsForTests.waitFor(100);
    }

    job.killJob(); // kill the job

    assertFalse(job.waitForCompletion(true), "Job did not get killed");

    if (fileName != null) {
      Path testFile = new Path(outDir, fileName);
      assertTrue(fs.exists(testFile),
          "File " + testFile + " missing for job " + job.getJobID());
    }

    // None of the excluded files may show up for a killed job.
    for (String ex : exclude) {
      Path file = new Path(outDir, ex);
      assertFalse(fs.exists(file),
          "File " + file + " should not be present for killed job "
          + job.getJobID());
    }
  }

  /**
   * Test default cleanup/abort behavior with the stock
   * {@link TextOutputFormat}: a successful job must leave the success marker
   * behind, while a failed job and a killed job must not.
   *
   * @throws Exception if any of the three jobs cannot be run or checked
   */
  @Test
  public void testDefaultCleanupAndAbort() throws Exception {
    final String successMarker = FileOutputCommitter.SUCCEEDED_FILE_NAME;
    final String[] nothingExcluded = new String[0];
    final String[] noSuccessMarker = new String[] {successMarker};

    // successful job: the success marker is expected
    testSuccessfulJob(successMarker, TextOutputFormat.class, nothingExcluded);

    // failed job: no file is required, the success marker is forbidden
    testFailedJob(null, TextOutputFormat.class, noSuccessMarker);

    // killed job: no file is required, the success marker is forbidden
    testKilledJob(null, TextOutputFormat.class, noSuccessMarker);
  }

  /**
   * Test that a failed or killed job using the custom committer runs the
   * abort code, and that a successful job does not.
   *
   * @throws Exception if any of the three jobs cannot be run or checked
   */
  @Test
  public void testCustomAbort() throws Exception {
    final String successMarker = FileOutputCommitter.SUCCEEDED_FILE_NAME;

    // successful job: success marker present, neither abort marker written
    String[] bothAbortMarkers =
        new String[] {ABORT_FAILED_FILE_NAME, ABORT_KILLED_FILE_NAME};
    testSuccessfulJob(successMarker, MyOutputFormatWithCustomAbort.class,
        bothAbortMarkers);

    // failed job: only the "failed" abort marker may exist
    String[] forbiddenAfterFailure =
        new String[] {successMarker, ABORT_KILLED_FILE_NAME};
    testFailedJob(ABORT_FAILED_FILE_NAME, MyOutputFormatWithCustomAbort.class,
        forbiddenAfterFailure);

    // killed job: only the "killed" abort marker may exist
    String[] forbiddenAfterKill =
        new String[] {successMarker, ABORT_FAILED_FILE_NAME};
    testKilledJob(ABORT_KILLED_FILE_NAME, MyOutputFormatWithCustomAbort.class,
        forbiddenAfterKill);
  }

  /**
   * Test that successful, failed and killed jobs using the custom committer
   * all run the deprecated
   * {@link FileOutputCommitter#cleanupJob(JobContext)} code, for api
   * compatibility testing.
   *
   * @throws Exception if any of the three jobs cannot be run or checked
   */
  @Test
  public void testCustomCleanup() throws Exception {
    final String[] nothingExcluded = new String[0];
    final String[] noSuccessMarker =
        new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME};

    // successful job: the custom cleanup marker is expected
    testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME,
        MyOutputFormatWithCustomCleanup.class, nothingExcluded);

    // failed job: cleanup marker expected, success marker forbidden
    testFailedJob(CUSTOM_CLEANUP_FILE_NAME,
        MyOutputFormatWithCustomCleanup.class, noSuccessMarker);

    // killed job: cleanup marker expected, success marker forbidden
    testKilledJob(CUSTOM_CLEANUP_FILE_NAME,
        MyOutputFormatWithCustomCleanup.class, noSuccessMarker);
  }
}