TestMapTask.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.MapTask.MapOutputBuffer;
import org.apache.hadoop.mapred.Task.TaskReporter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.junit.After;
import org.junit.Before;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

public class TestMapTask {
  private static File testRootDir = new File(
      System.getProperty("test.build.data",
          System.getProperty("java.io.tmpdir", "/tmp")),
      TestMapTask.class.getName());

  @Before
  public void setup() throws Exception {
    if(!testRootDir.exists()) {
      testRootDir.mkdirs();
    }
  }

  @After
  public void cleanup() throws Exception {
    FileUtil.fullyDelete(testRootDir);
  }

  @Rule
  public ExpectedException exception = ExpectedException.none();

  // Verify output files for shuffle have group read permission even when
  // the configured umask normally would prevent it.
  @Test
  public void testShufflePermissions() throws Exception {
    JobConf conf = new JobConf();
    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
    conf.set(MRConfig.LOCAL_DIR, testRootDir.getAbsolutePath());
    MapOutputFile mof = new MROutputFiles();
    mof.setConf(conf);
    TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
    MapTask mockTask = mock(MapTask.class);
    doReturn(mof).when(mockTask).getMapOutputFile();
    doReturn(attemptId).when(mockTask).getTaskID();
    doReturn(new Progress()).when(mockTask).getSortPhase();
    TaskReporter mockReporter = mock(TaskReporter.class);
    doReturn(new Counter()).when(mockReporter).getCounter(
        any(TaskCounter.class));
    MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask,
        conf, mockReporter);
    MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();
    mob.init(ctx);
    mob.flush();
    mob.close();
    Path outputFile = mof.getOutputFile();
    FileSystem lfs = FileSystem.getLocal(conf);
    FsPermission perms = lfs.getFileStatus(outputFile).getPermission();
    Assert.assertEquals("Incorrect output file perms",
        (short)0640, perms.toShort());
    Path indexFile = mof.getOutputIndexFile();
    perms = lfs.getFileStatus(indexFile).getPermission();
    Assert.assertEquals("Incorrect index file perms",
        (short)0640, perms.toShort());
  }

  @Test
  public void testSpillFilesCountLimitInvalidValue() throws Exception {
    JobConf conf = new JobConf();
    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
    conf.set(MRConfig.LOCAL_DIR, testRootDir.getAbsolutePath());
    conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, -2);
    MapOutputFile mof = new MROutputFiles();
    mof.setConf(conf);
    TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
    MapTask mockTask = mock(MapTask.class);
    doReturn(mof).when(mockTask).getMapOutputFile();
    doReturn(attemptId).when(mockTask).getTaskID();
    doReturn(new Progress()).when(mockTask).getSortPhase();
    TaskReporter mockReporter = mock(TaskReporter.class);
    doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class));
    MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter);
    MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();

    exception.expect(IOException.class);
    exception.expectMessage("Invalid value for \"mapreduce.task.spill.files.count.limit\", " +
        "current value: -2");

    mob.init(ctx);
    mob.close();
  }

  @Test
  public void testSpillFilesCountBreach() throws Exception {
    JobConf conf = new JobConf();
    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
    conf.set(MRConfig.LOCAL_DIR, testRootDir.getAbsolutePath());
    conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, 2);
    MapOutputFile mof = new MROutputFiles();
    mof.setConf(conf);
    TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
    MapTask mockTask = mock(MapTask.class);
    doReturn(mof).when(mockTask).getMapOutputFile();
    doReturn(attemptId).when(mockTask).getTaskID();
    doReturn(new Progress()).when(mockTask).getSortPhase();
    TaskReporter mockReporter = mock(TaskReporter.class);
    doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class));
    MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter);
    MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();
    mob.numSpills = 2;
    mob.init(ctx);

    Method method = mob.getClass().getDeclaredMethod("incrementNumSpills");
    method.setAccessible(true);
    boolean gotExceptionWithMessage = false;
    try {
      method.invoke(mob);
    } catch(InvocationTargetException e) {
      Throwable targetException = e.getTargetException();
      if (targetException != null) {
        String errorMessage = targetException.getMessage();
        if (errorMessage != null) {
          if(errorMessage.equals("Too many spill files got created, control it with " +
              "mapreduce.task.spill.files.count.limit, current value: 2, current spill count: 3")) {
            gotExceptionWithMessage = true;
          }
        }
      }
    }

    mob.close();

    Assert.assertTrue(gotExceptionWithMessage);
  }
}