TestSpeculativeExecOnCluster.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test speculation on Mini Cluster.
 */
@Ignore
@RunWith(Parameterized.class)
public class TestSpeculativeExecOnCluster {
  private static final Logger LOG = LoggerFactory.getLogger(TestSpeculativeExecOnCluster.class);

  // Number of node managers handed to the MiniMRYarnCluster built in setup().
  private static final int NODE_MANAGERS_COUNT = 2;
  // Values written to MRJobConfig.MAP_SPECULATIVE / REDUCE_SPECULATIVE by
  // runSpecTest().
  private static final boolean ENABLE_SPECULATIVE_MAP = true;
  private static final boolean ENABLE_SPECULATIVE_REDUCE = true;

  // Task counts supplied to every parameterized run by getTestParameters().
  private static final int NUM_MAP_DEFAULT = 8 * NODE_MANAGERS_COUNT;
  private static final int NUM_REDUCE_DEFAULT = NUM_MAP_DEFAULT / 2;
  // Sleep budgets: the mapper/reducer divide the *_SLEEP_TIME value by the
  // matching *_SLEEP_COUNT to obtain the per-record duration passed to
  // Thread.sleep (so the times are in milliseconds).
  private static final int MAP_SLEEP_TIME_DEFAULT = 60000;
  private static final int REDUCE_SLEEP_TIME_DEFAULT = 10000;
  private static final int MAP_SLEEP_COUNT_DEFAULT = 10000;
  private static final int REDUCE_SLEEP_COUNT_DEFAULT = 1000;

  // Configuration keys through which runSpecTest() hands the sleep settings
  // to the input format, mapper and reducer.
  private static final String MAP_SLEEP_COUNT =
      "mapreduce.sleepjob.map.sleep.count";
  private static final String REDUCE_SLEEP_COUNT =
      "mapreduce.sleepjob.reduce.sleep.count";
  private static final String MAP_SLEEP_TIME =
      "mapreduce.sleepjob.map.sleep.time";
  private static final String REDUCE_SLEEP_TIME =
      "mapreduce.sleepjob.reduce.sleep.time";
  // Key selecting which SleepDurationCalculator the mapper looks up in
  // mapSleepTypeMapper; the default names the uniform-progress calculator.
  private static final String MAP_SLEEP_CALCULATOR_TYPE =
      "mapreduce.sleepjob.map.sleep.time.calculator";
  private static final String MAP_SLEEP_CALCULATOR_TYPE_DEFAULT = "normal_run";

  /** Sleep calculators keyed by the scenario name used in the job config. */
  private static Map<String, SleepDurationCalculator> mapSleepTypeMapper;

  /** Local file system used for the test directories and the app jar copy. */
  private static FileSystem localFs;

  static {
    // Populate the scenario registry first, then publish it.
    final Map<String, SleepDurationCalculator> calculators = new HashMap<>();
    calculators.put("normal_run", new SleepDurationCalcImpl());
    calculators.put("stalled_run", new StalledSleepDurationCalcImpl());
    calculators.put("slowing_run", new SlowingSleepDurationCalcImpl());
    calculators.put("dynamic_slowing_run", new DynamicSleepDurationCalcImpl());
    calculators.put("step_stalled_run", new StepStalledSleepDurationCalcImpl());
    mapSleepTypeMapper = calculators;

    // The path constants declared after this initializer depend on localFs,
    // so a failure here must abort class initialization.
    final FileSystem local;
    try {
      local = FileSystem.getLocal(new Configuration());
    } catch (IOException cause) {
      throw new RuntimeException("problem getting local fs", cause);
    }
    localFs = local;
  }

  // Root of this test's scratch space: "target/<class name>-tmpDir",
  // qualified against the local file system obtained in the static
  // initializer above (which therefore has to run first).
  private static final Path TEST_ROOT_DIR =
      new Path("target",
          TestSpeculativeExecOnCluster.class.getName() + "-tmpDir")
          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
  // Destination of the MR app jar copy made in setup() and added to the job
  // classpath in runSpecTest().
  private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
  // Job output directory; deleted before each job submission.
  private static final Path TEST_OUT_DIR =
      new Path(TEST_ROOT_DIR, "test.out.dir");

  // Mini cluster started in setup() and stopped in tearDown(); stays null
  // when the MR app jar is not found.
  private MiniMRYarnCluster mrCluster;

  // Per-run job settings: the task counts come from the test parameters,
  // the sleep values are reset to their defaults in setup().
  private int myNumMapper;
  private int myNumReduce;
  private int myMapSleepTime;
  private int myReduceSleepTime;
  private int myMapSleepCount;
  private int myReduceSleepCount;
  // Name of the sleep-calculator scenario written to the job configuration.
  private String chosenSleepCalc;
  // Task runtime estimator under test for this parameterized run.
  private Class<?> estimatorClass;


  /**
   * Names of the sleep-calculator scenarios to skip. Each test returns early
   * when its scenario name is contained in this list. Running all the
   * estimators against all the cases takes a long time, so some scenarios are
   * skipped to reduce the execution time.
   */
  private List<String> ignoredTests;


  /**
   * Builds the parameter sets for the parameterized runner: one run per
   * estimator class, each carrying the scenarios to skip and the default
   * map/reduce task counts.
   *
   * @return one {@code Object[]} of constructor arguments per run.
   */
  @Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})")
  public static Collection<Object[]> getTestParameters() {
    final List<String> skippedScenarios =
        Arrays.asList("stalled_run", "slowing_run", "step_stalled_run");

    final Object[] simpleExponentialRun = {
        SimpleExponentialTaskRuntimeEstimator.class, skippedScenarios,
        NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT
    };
    final Object[] legacyRun = {
        LegacyTaskRuntimeEstimator.class, skippedScenarios,
        NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT
    };
    return Arrays.asList(simpleExponentialRun, legacyRun);
  }

  /**
   * Receives one parameter set produced by {@code getTestParameters()}.
   *
   * @param estimatorKlass estimator implementation configured for the job.
   * @param testToIgnore scenario names the test methods should skip.
   * @param numMapper number of map tasks to request.
   * @param numReduce number of reduce tasks to request.
   */
  public TestSpeculativeExecOnCluster(
      Class<? extends TaskRuntimeEstimator> estimatorKlass,
      List<String> testToIgnore,
      Integer numMapper,
      Integer numReduce) {
    estimatorClass = estimatorKlass;
    ignoredTests = testToIgnore;
    myNumMapper = numMapper;
    myNumReduce = numReduce;
  }

  /**
   * Starts the mini MR/YARN cluster, stages the MR app jar on the local file
   * system and resets the per-test sleep settings to their defaults.
   * Does nothing (leaving {@code mrCluster} null) when the MR app jar is not
   * present.
   *
   * @throws IOException if the app jar cannot be copied or its permission
   *         cannot be set.
   */
  @Before
  public void setup() throws IOException {

    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
      LOG.info("MRAppJar {} not found. Not running test.",
          MiniMRYarnCluster.APPJAR);
      return;
    }

    if (mrCluster == null) {
      // Name the cluster after this test class (the original used the name
      // of a different test class, apparently a copy-paste leftover).
      mrCluster = new MiniMRYarnCluster(
          TestSpeculativeExecOnCluster.class.getName(), NODE_MANAGERS_COUNT);
      Configuration conf = new Configuration();
      mrCluster.init(conf);
      mrCluster.start();
    }

    // workaround the absent public distcache.
    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
    localFs.setPermission(APP_JAR, new FsPermission("700"));

    myMapSleepTime = MAP_SLEEP_TIME_DEFAULT;
    myReduceSleepTime = REDUCE_SLEEP_TIME_DEFAULT;
    myMapSleepCount = MAP_SLEEP_COUNT_DEFAULT;
    myReduceSleepCount = REDUCE_SLEEP_COUNT_DEFAULT;
    chosenSleepCalc = MAP_SLEEP_CALCULATOR_TYPE_DEFAULT;
  }

  /**
   * Stops the mini cluster, if one was started, and drops the reference.
   */
  @After
  public void tearDown() {
    if (mrCluster == null) {
      // setup() bailed out before creating a cluster; nothing to stop.
      return;
    }
    mrCluster.stop();
    mrCluster = null;
  }

  /**
   * Overrides default behavior of Partitioner for testing: keys are spread
   * over the partitions by their integer value modulo the partition count.
   */
  public static class SpeculativeSleepJobPartitioner extends
      Partitioner<IntWritable, NullWritable> {
    /**
     * @param k map output key.
     * @param v map output value (unused).
     * @param numPartitions number of partitions.
     * @return a partition index in {@code [0, numPartitions)}.
     */
    public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
      // Clear the sign bit so a negative key can never yield a negative
      // (invalid) partition; non-negative keys map exactly as before.
      return (k.get() & Integer.MAX_VALUE) % numPartitions;
    }
  }

  /**
   * A split that carries no data: zero length, no locations, and nothing to
   * serialize. Used only to make the framework schedule a map task.
   */
  public static class EmptySplit extends InputSplit implements Writable {
    public long getLength() {
      return 0L;
    }

    public String[] getLocations() {
      return new String[0];
    }

    public void write(DataOutput out) throws IOException {
      // Nothing to write.
    }

    public void readFields(DataInput in) throws IOException {
      // Nothing to read.
    }
  }

  /**
   * Input format that fabricates its records instead of reading them: one
   * {@link EmptySplit} per requested map task, and per split a reader that
   * produces the configured number of map records.
   */
  public static class SpeculativeSleepInputFormat
      extends InputFormat<IntWritable, IntWritable> {

    /**
     * @param jobContext job whose {@code MRJobConfig.NUM_MAPS} setting
     *        (default 1) determines how many splits are produced.
     * @return that many empty splits.
     */
    public List<InputSplit> getSplits(JobContext jobContext) {
      List<InputSplit> ret = new ArrayList<InputSplit>();
      int numSplits = jobContext.getConfiguration().
          getInt(MRJobConfig.NUM_MAPS, 1);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    /**
     * Creates a reader emitting {@code MAP_SLEEP_COUNT} records. Each record's
     * key is the running total of values emitted so far and its value is the
     * number of output keys the mapper should write for that record; the
     * values add up to {@code REDUCE_SLEEP_COUNT * numReduceTasks}.
     *
     * @throws IOException if either configured count is negative.
     */
    public RecordReader<IntWritable, IntWritable> createRecordReader(
        InputSplit ignored, TaskAttemptContext taskContext)
        throws IOException {
      Configuration conf = taskContext.getConfiguration();
      final int count = conf.getInt(MAP_SLEEP_COUNT, MAP_SLEEP_COUNT_DEFAULT);
      if (count < 0) {
        throw new IOException("Invalid map count: " + count);
      }
      final int redcount = conf.getInt(REDUCE_SLEEP_COUNT,
          REDUCE_SLEEP_COUNT_DEFAULT);
      if (redcount < 0) {
        throw new IOException("Invalid reduce count: " + redcount);
      }
      final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());

      return new RecordReader<IntWritable, IntWritable>() {
        private int records = 0;
        private int emitCount = 0;
        private IntWritable key = null;
        private IntWritable value = null;
        public void initialize(InputSplit split, TaskAttemptContext context) {
        }

        public boolean nextKeyValue()
            throws IOException {
          if (count == 0) {
            return false;
          }
          key = new IntWritable();
          key.set(emitCount);
          // Distribute emitPerMapTask over the records; the first
          // (emitPerMapTask % count) records take one extra.
          int emit = emitPerMapTask / count;
          if ((emitPerMapTask) % count > records) {
            ++emit;
          }
          emitCount += emit;
          value = new IntWritable();
          value.set(emit);
          return records++ < count;
        }
        public IntWritable getCurrentKey() {
          return key;
        }
        public IntWritable getCurrentValue() {
          return value;
        }
        public void close() throws IOException { }
        public float getProgress() throws IOException {
          // Progress is a fraction in [0, 1]. With nothing to read the reader
          // is complete; otherwise clamp, because records is incremented once
          // more by the final nextKeyValue() call that returns false.
          if (count == 0) {
            return 1.0f;
          }
          return Math.min(1.0f, records / ((float) count));
        }
      };
    }
  }

  /**
   * Interface used to simulate different progress rates of the tasks.
   * Implementations decide how long a task attempt sleeps for the record it
   * is about to process.
   */
  public interface SleepDurationCalculator {
    /**
     * @param taId attempt asking for its sleep duration.
     * @param currCount number of records the attempt has processed so far.
     * @param totalCount number of records the attempt is expected to process.
     * @param defaultSleepDuration sleep duration of an unaffected task.
     * @return the duration to sleep for the current record.
     */
    long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount,
        long defaultSleepDuration);
  }

  /**
   * Baseline scenario: every attempt progresses at the same rate. The slow
   * factor is 1.0, so the scaled duration equals the default one.
   */
  public static class SleepDurationCalcImpl implements SleepDurationCalculator {

    private double threshold = 1.0;
    private double slowFactor = 1.0;

    SleepDurationCalcImpl() {
    }

    public long calcSleepDuration(TaskAttemptID taId, int currCount,
        int totalCount, long defaultSleepDuration) {
      final double progress = ((double) currCount) / totalCount;
      final boolean reachedThreshold = progress >= threshold;
      return reachedThreshold
          ? (long) (slowFactor * defaultSleepDuration)
          : defaultSleepDuration;
    }
  }

  /**
   * The first attempt of map task_0 slows down by a small factor (1.2x once
   * 40% of its records are processed) that should not trigger a speculation:
   * a speculated attempt should never beat the original task.
   * A conservative estimator/speculator will nevertheless speculate another
   * attempt because of the slower progress.
   */
  public static class SlowingSleepDurationCalcImpl implements
      SleepDurationCalculator {

    private double threshold = 0.4;
    private double slowFactor = 1.2;

    SlowingSleepDurationCalcImpl() {
    }

    public long calcSleepDuration(TaskAttemptID taId, int currCount,
        int totalCount, long defaultSleepDuration) {
      final boolean firstAttemptOfFirstMap =
          taId.getTaskType() == TaskType.MAP
              && taId.getTaskID().getId() == 0
              && taId.getId() == 0;
      if (!firstAttemptOfFirstMap) {
        return defaultSleepDuration;
      }
      final double progress = ((double) currCount) / totalCount;
      if (progress >= threshold) {
        return (long) (slowFactor * defaultSleepDuration);
      }
      return defaultSleepDuration;
    }
  }

  /**
   * The first attempt of the first Mapper task is stalled: it sleeps 1000
   * times longer than the other tasks for every record.
   * The speculated attempt should succeed if the estimator detects
   * the slow down on time.
   */
  public static class StalledSleepDurationCalcImpl implements
      SleepDurationCalculator {

    /** Multiplier applied to the sleep duration of the stalled attempt. */
    private static final long STALL_FACTOR = 1000;

    StalledSleepDurationCalcImpl() {

    }

    /**
     * @return {@code STALL_FACTOR * defaultSleepDuration} for attempt 0 of
     *         map task 0, {@code defaultSleepDuration} for every other
     *         attempt.
     */
    public long calcSleepDuration(TaskAttemptID taId, int currCount,
        int totalCount, long defaultSleepDuration) {
      if ((taId.getTaskType() == TaskType.MAP)
          && (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) {
        return STALL_FACTOR * defaultSleepDuration;
      }
      return defaultSleepDuration;
    }
  }


  /**
   * Emulates a step change in progress: the first attempt of map task_0 runs
   * at the normal rate until 40% of its records are processed and then sleeps
   * 10000 times longer per record.
   */
  public static class StepStalledSleepDurationCalcImpl implements
      SleepDurationCalculator {

    private double threshold = 0.4;
    private double slowFactor = 10000;

    StepStalledSleepDurationCalcImpl() {
    }

    public long calcSleepDuration(TaskAttemptID taId, int currCount,
        int totalCount, long defaultSleepDuration) {
      final boolean firstAttemptOfFirstMap =
          taId.getTaskType() == TaskType.MAP
              && taId.getTaskID().getId() == 0
              && taId.getId() == 0;
      if (!firstAttemptOfFirstMap) {
        return defaultSleepDuration;
      }
      final double progress = ((double) currCount) / totalCount;
      if (progress >= threshold) {
        return (long) (slowFactor * defaultSleepDuration);
      }
      return defaultSleepDuration;
    }
  }

  /**
   * Progressively slows down the first attempt of the first Mapper task: the
   * further the attempt has progressed, the larger the factor applied to its
   * sleep duration.
   * The speculated attempt should succeed if the estimator detects
   * the slow down on time.
   */
  public static class DynamicSleepDurationCalcImpl implements
      SleepDurationCalculator {

    private double[] thresholds;
    private double[] slowFactors;

    DynamicSleepDurationCalcImpl() {
      // slowFactors[i] applies once progress has passed thresholds[i].
      thresholds = new double[] {
          0.1, 0.25, 0.4, 0.5, 0.6, 0.65, 0.7, 0.8, 0.9
      };
      slowFactors = new double[] {
          2.0, 4.0, 5.0, 6.0, 10.0, 15.0, 20.0, 25.0, 30.0
      };
    }

    public long calcSleepDuration(TaskAttemptID taId, int currCount,
        int totalCount,
        long defaultSleepDuration) {
      final boolean firstAttemptOfFirstMap =
          taId.getTaskType() == TaskType.MAP
              && taId.getTaskID().getId() == 0
              && taId.getId() == 0;
      if (!firstAttemptOfFirstMap) {
        return defaultSleepDuration;
      }

      final double progress = ((double) currCount) / totalCount;
      double factor = 1.0;
      int step = 0;
      // Walk the steps until reaching a threshold that is not yet passed.
      while (step < thresholds.length && !(thresholds[step] >= progress)) {
        factor = slowFactors[step];
        step++;
      }
      return (long) (factor * defaultSleepDuration);
    }
  }

  /**
   * Dummy class for testing Speculation. Sleeps for a defined period
   * of time in mapper. Generates fake input for map / reduce
   * jobs. Note that generated number of input pairs is in the order
   * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
   * some disk space.
   * The sleep duration for a given task can be slowed down by the configured
   * {@link SleepDurationCalculator} in order to evaluate the estimator.
   */
  public static class SpeculativeSleepMapper
      extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
    private long mapSleepDuration = MAP_SLEEP_TIME_DEFAULT;
    private int mapSleepCount = 1;
    private int count = 0;
    private SleepDurationCalculator sleepCalc = new SleepDurationCalcImpl();

    /**
     * Reads the sleep count, derives the per-record sleep duration and looks
     * up the sleep calculator selected in the job configuration.
     *
     * @throws IOException if the configured calculator type is unknown.
     */
    protected void setup(Context context)
        throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      this.mapSleepCount =
          conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
      this.mapSleepDuration = mapSleepCount == 0 ? 0 :
          conf.getLong(MAP_SLEEP_TIME, MAP_SLEEP_TIME_DEFAULT) / mapSleepCount;

      String calcType = conf.get(MAP_SLEEP_CALCULATOR_TYPE,
          MAP_SLEEP_CALCULATOR_TYPE_DEFAULT);
      SleepDurationCalculator configured = mapSleepTypeMapper.get(calcType);
      if (configured == null) {
        // Fail here with a clear message rather than with a
        // NullPointerException on the first map() call.
        throw new IOException(
            "Unknown map sleep calculator type: " + calcType);
      }
      this.sleepCalc = configured;
    }

    /**
     * Sleeps for the duration chosen by the calculator, then writes
     * {@code value} consecutive keys starting at {@code key}.
     */
    public void map(IntWritable key, IntWritable value, Context context)
        throws IOException, InterruptedException {
      //it is expected that every map processes mapSleepCount number of records.
      try {
        context.setStatus("Sleeping... (" +
            (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
        long sleepTime = sleepCalc.calcSleepDuration(context.getTaskAttemptID(),
            count, mapSleepCount,
            mapSleepDuration);
        Thread.sleep(sleepTime);
      } catch (InterruptedException ex) {
        // Keep the interrupt visible to the caller before converting it.
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while sleeping", ex);
      }
      ++count;
      // output reduceSleepCount * numReduce number of random values, so that
      // each reducer will get reduceSleepCount number of keys.
      int k = key.get();
      for (int i = 0; i < value.get(); ++i) {
        context.write(new IntWritable(k + i), NullWritable.get());
      }
    }
  }

  /**
   * Implementation of the reducer task for testing: sleeps for a fixed
   * duration per key and emits nothing.
   */
  public static class SpeculativeSleepReducer
      extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {

    private long reduceSleepDuration = REDUCE_SLEEP_TIME_DEFAULT;
    private int reduceSleepCount = 1;
    private int count = 0;

    /**
     * Reads the reduce sleep count and derives the per-key sleep duration
     * from the configured total reduce sleep time.
     */
    protected void setup(Context context)
        throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      this.reduceSleepCount =
          conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
      this.reduceSleepDuration = reduceSleepCount == 0 ? 0 :
          conf.getLong(REDUCE_SLEEP_TIME, REDUCE_SLEEP_TIME_DEFAULT)
              / reduceSleepCount;
    }

    /**
     * Sleeps for the per-key duration.
     *
     * @throws IOException if the sleep is interrupted; the interruption is
     *         attached as the cause.
     */
    public void reduce(IntWritable key, Iterable<NullWritable> values,
        Context context)
        throws IOException {
      try {
        context.setStatus("Sleeping... (" +
            (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
        Thread.sleep(reduceSleepDuration);
      } catch (InterruptedException ex) {
        // This method cannot propagate InterruptedException, so restore the
        // interrupt flag before converting it.
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while sleeping", ex);
      }
      count++;
    }
  }

  /**
   * A class used to map the estimator implementation to the expected
   * test results.
   */
  class EstimatorMetricsPair {

    private final Class<?> estimatorClass;
    private final int expectedMapTasks;
    private final int expectedReduceTasks;
    private final boolean speculativeEstimator;

    /**
     * @param estimatorClass estimator the expectation applies to.
     * @param mapTasks number of map tasks launched when nothing is speculated.
     * @param reduceTasks number of reduce tasks launched when nothing is
     *        speculated.
     * @param isToSpeculate whether the estimator is expected to speculate.
     */
    EstimatorMetricsPair(Class<?> estimatorClass, int mapTasks, int reduceTasks,
        boolean isToSpeculate) {
      this.estimatorClass = estimatorClass;
      this.expectedMapTasks = mapTasks;
      this.expectedReduceTasks = reduceTasks;
      this.speculativeEstimator = isToSpeculate;
    }

    /**
     * @param counters counters of the finished job.
     * @return true if more map or reduce tasks were launched than the
     *         baseline counts, i.e. at least one extra attempt was started.
     */
    boolean didSpeculate(Counters counters) {
      long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
          .getValue();
      long launchedReduce = counters
          .findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
          .getValue();
      return launchedMaps > expectedMapTasks
          || launchedReduce > expectedReduceTasks;
    }

    /**
     * Builds the assertion message. Both launched counts are always reported:
     * previously the details were omitted exactly when speculation was
     * expected but the launched counts equalled the baseline.
     *
     * @param counters counters of the finished job.
     * @return a message naming the estimator, the expectation, and the
     *         launched versus baseline task counts.
     */
    String getErrorMessage(Counters counters) {
      long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
          .getValue();
      long launchedReduce = counters
          .findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
          .getValue();
      StringBuilder msg = new StringBuilder();
      msg.append("Unexpected tasks running estimator ")
          .append(estimatorClass.getName()).append("\n\t")
          .append(speculativeEstimator
              ? "speculation expected: " : "no speculation expected: ")
          .append("maps ").append(launchedMaps)
          .append(", expected: ").append(expectedMapTasks)
          .append(", reduces ").append(launchedReduce)
          .append(", expected: ").append(expectedReduceTasks);
      return msg.toString();
    }
  }

  @Test
  public void testExecDynamicSlowingSpeculative() throws Exception {
    /*------------------------------------------------------------------
     * Test that Map/Red speculates because:
     * 1- all tasks have same progress rate except for task_0
     * 2- task_0 slows down by dynamic increasing factor
     * 3- A good estimator should readjust the estimation and the speculator
     *    launches a new task.
     *
     * Expected:
     * A- SimpleExponentialTaskRuntimeEstimator: speculates a successful
     *    attempt to beat the slowing task_0
     * B- LegacyTaskRuntimeEstimator: speculates an attempt
     * C- ExponentiallySmoothedTaskRuntimeEstimator: Fails to detect the slow
     *    down and never speculates but it may speculate other tasks
     *    (mappers or reducers)
     * -----------------------------------------------------------------
     */
    // setup() starts no cluster when the app jar is missing; skip instead of
    // failing on a null cluster.
    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
          + " not found. Not running test.");
      return;
    }

    chosenSleepCalc = "dynamic_slowing_run";

    if (ignoredTests.contains(chosenSleepCalc)) {
      return;
    }

    EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(
            ExponentiallySmoothedTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true)
    };

    for (EstimatorMetricsPair specEstimator : estimatorPairs) {
      // Only the expectation for the estimator of this run is relevant.
      if (!estimatorClass.equals(specEstimator.estimatorClass)) {
        continue;
      }
      LOG.info("+++ Dynamic Slow Progress testing against " + estimatorClass
          .getName() + " +++");
      Job job = runSpecTest();

      boolean succeeded = job.waitForCompletion(true);
      Assert.assertTrue(
          "Job expected to succeed with estimator " + estimatorClass.getName(),
          succeeded);
      Assert.assertEquals(
          "Job expected to succeed with estimator " + estimatorClass.getName(),
          JobStatus.State.SUCCEEDED, job.getJobState());
      Counters counters = job.getCounters();

      String errorMessage = specEstimator.getErrorMessage(counters);
      boolean didSpeculate = specEstimator.didSpeculate(counters);
      // Expected value first, observed value second.
      Assert.assertEquals(errorMessage,
          specEstimator.speculativeEstimator, didSpeculate);
      Assert
          .assertEquals("Failed maps higher than 0 " + estimatorClass.getName(),
              0, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
    }
  }


  @Test
  public void testExecSlowNonSpeculative() throws Exception {
    /*------------------------------------------------------------------
     * Test that Map/Red does not speculate because:
     * 1- all tasks have same progress rate except for task_0
     * 2- the first attempt of task_0 sleeps 1.2 times longer once it is 40%
     *    through its records (the "slowing_run" calculator)
     * 3- A good estimator may adjust the estimation that the task will finish
     *    sooner than a new speculated task.
     *
     * Expected:
     * A- SimpleExponentialTaskRuntimeEstimator: does not speculate because
     *    the new attempt estimated end time is not going to be smaller than the
     *    original end time.
     * B- LegacyTaskRuntimeEstimator: speculates an attempt
     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates an attempt.
     * -----------------------------------------------------------------
     */
    // setup() starts no cluster when the app jar is missing; skip instead of
    // failing on a null cluster.
    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
          + " not found. Not running test.");
      return;
    }

    chosenSleepCalc = "slowing_run";

    if (ignoredTests.contains(chosenSleepCalc)) {
      return;
    }

    EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, false),
        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(
            ExponentiallySmoothedTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true)
    };

    for (EstimatorMetricsPair specEstimator : estimatorPairs) {
      // Only the expectation for the estimator of this run is relevant.
      if (!estimatorClass.equals(specEstimator.estimatorClass)) {
        continue;
      }
      LOG.info("+++ Linear Slow Progress Non Speculative testing against "
          + estimatorClass.getName() + " +++");
      Job job = runSpecTest();

      boolean succeeded = job.waitForCompletion(true);
      Assert.assertTrue(
          "Job expected to succeed with estimator " + estimatorClass.getName(),
          succeeded);
      Assert.assertEquals(
          "Job expected to succeed with estimator " + estimatorClass.getName(),
          JobStatus.State.SUCCEEDED, job.getJobState());
      Counters counters = job.getCounters();

      String errorMessage = specEstimator.getErrorMessage(counters);
      boolean didSpeculate = specEstimator.didSpeculate(counters);
      // Expected value first, observed value second.
      Assert.assertEquals(errorMessage,
          specEstimator.speculativeEstimator, didSpeculate);
      Assert
          .assertEquals("Failed maps higher than 0 " + estimatorClass.getName(),
              0, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
    }
  }

  @Test
  public void testExecStepStalledSpeculative() throws Exception {
    /*------------------------------------------------------------------
     * Test that Map/Red speculates because:
     * 1- all tasks have same progress rate except for task_0
     * 2- task_0 has long sleep duration
     * 3- A good estimator may adjust the estimation that the task will finish
     *    sooner than a new speculated task.
     *
     * Expected:
     * A- SimpleExponentialTaskRuntimeEstimator: speculates
     * B- LegacyTaskRuntimeEstimator: speculates
     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
     * -----------------------------------------------------------------
     */
    // setup() starts no cluster when the app jar is missing; skip instead of
    // failing on a null cluster.
    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
          + " not found. Not running test.");
      return;
    }

    chosenSleepCalc = "step_stalled_run";
    if (ignoredTests.contains(chosenSleepCalc)) {
      return;
    }
    EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(
            ExponentiallySmoothedTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true)
    };

    for (EstimatorMetricsPair specEstimator : estimatorPairs) {
      // Only the expectation for the estimator of this run is relevant.
      if (!estimatorClass.equals(specEstimator.estimatorClass)) {
        continue;
      }
      // Label distinguishes this scenario from the plain stalled test.
      LOG.info("+++ Step Stalled Progress testing against "
          + estimatorClass.getName() + " +++");
      Job job = runSpecTest();

      boolean succeeded = job.waitForCompletion(true);
      Assert.assertTrue("Job expected to succeed with estimator "
          + estimatorClass.getName(), succeeded);
      Assert.assertEquals("Job expected to succeed with estimator "
              + estimatorClass.getName(), JobStatus.State.SUCCEEDED,
          job.getJobState());
      Counters counters = job.getCounters();

      String errorMessage = specEstimator.getErrorMessage(counters);
      boolean didSpeculate = specEstimator.didSpeculate(counters);
      // Expected value first, observed value second.
      Assert.assertEquals(errorMessage,
          specEstimator.speculativeEstimator, didSpeculate);
      Assert.assertEquals("Failed maps higher than 0 "
              + estimatorClass.getName(), 0,
          counters.findCounter(JobCounter.NUM_FAILED_MAPS)
              .getValue());
    }
  }

  @Test
  public void testExecStalledSpeculative() throws Exception {
    /*------------------------------------------------------------------
     * Test that Map/Red speculates because:
     * 1- all tasks have same progress rate except for task_0
     * 2- task_0 has long sleep duration
     * 3- A good estimator may adjust the estimation that the task will finish
     *    sooner than a new speculated task.
     *
     * Expected:
     * A- SimpleExponentialTaskRuntimeEstimator: speculates
     * B- LegacyTaskRuntimeEstimator: speculates
     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
     * -----------------------------------------------------------------
     */
    // setup() starts no cluster when the app jar is missing; skip instead of
    // failing on a null cluster.
    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
          + " not found. Not running test.");
      return;
    }

    chosenSleepCalc = "stalled_run";

    if (ignoredTests.contains(chosenSleepCalc)) {
      return;
    }
    EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(
            ExponentiallySmoothedTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true)
    };

    for (EstimatorMetricsPair specEstimator : estimatorPairs) {
      // Only the expectation for the estimator of this run is relevant.
      if (!estimatorClass.equals(specEstimator.estimatorClass)) {
        continue;
      }
      LOG.info("+++ Stalled Progress testing against "
          + estimatorClass.getName() + " +++");
      Job job = runSpecTest();

      boolean succeeded = job.waitForCompletion(true);
      Assert.assertTrue("Job expected to succeed with estimator "
          + estimatorClass.getName(), succeeded);
      Assert.assertEquals("Job expected to succeed with estimator "
              + estimatorClass.getName(), JobStatus.State.SUCCEEDED,
          job.getJobState());
      Counters counters = job.getCounters();

      String errorMessage = specEstimator.getErrorMessage(counters);
      boolean didSpeculate = specEstimator.didSpeculate(counters);
      // Expected value first, observed value second.
      Assert.assertEquals(errorMessage,
          specEstimator.speculativeEstimator, didSpeculate);
      Assert.assertEquals("Failed maps higher than 0 "
              + estimatorClass.getName(), 0,
          counters.findCounter(JobCounter.NUM_FAILED_MAPS)
              .getValue());
    }
  }

  @Test
  public void testExecNonSpeculative() throws Exception {
    /*------------------------------------------------------------------
     * Test that Map/Red does not speculate because all tasks progress in the
     *    same rate.
     *
     * Expected:
     * A- SimpleExponentialTaskRuntimeEstimator: does not speculate
     * B- LegacyTaskRuntimeEstimator: speculates
     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
     * -----------------------------------------------------------------
     */
    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
          + " not found. Not running test.");
      return;
    }

    // Select the uniform-progress scenario explicitly, like the other tests
    // do, rather than relying on the value left behind by setup().
    chosenSleepCalc = MAP_SLEEP_CALCULATOR_TYPE_DEFAULT;

    if (ignoredTests.contains(chosenSleepCalc)) {
      return;
    }

    EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, false),
        new EstimatorMetricsPair(
            ExponentiallySmoothedTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true)
    };

    for (EstimatorMetricsPair specEstimator : estimatorPairs) {
      // Only the expectation for the estimator of this run is relevant.
      if (!estimatorClass.equals(specEstimator.estimatorClass)) {
        continue;
      }
      LOG.info("+++ No Speculation testing against "
          + estimatorClass.getName() + " +++");
      Job job = runSpecTest();

      boolean succeeded = job.waitForCompletion(true);
      Assert.assertTrue("Job expected to succeed with estimator "
          + estimatorClass.getName(), succeeded);
      Assert.assertEquals("Job expected to succeed with estimator "
              + estimatorClass.getName(), JobStatus.State.SUCCEEDED,
          job.getJobState());
      Counters counters = job.getCounters();

      String errorMessage = specEstimator.getErrorMessage(counters);
      boolean didSpeculate = specEstimator.didSpeculate(counters);
      // Expected value first, observed value second.
      Assert.assertEquals(errorMessage,
          specEstimator.speculativeEstimator, didSpeculate);
    }
  }

  /**
   * Configures and submits the sleep job for the current estimator and
   * scenario, without waiting for it to finish.
   *
   * @return the submitted job; callers wait for completion themselves.
   * @throws IOException if the job cannot be created or submitted.
   * @throws ClassNotFoundException propagated from job submission.
   * @throws InterruptedException propagated from job submission.
   */
  private Job runSpecTest()
      throws IOException, ClassNotFoundException, InterruptedException {

    Configuration conf = mrCluster.getConfig();
    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, ENABLE_SPECULATIVE_MAP);
    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, ENABLE_SPECULATIVE_REDUCE);
    conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
        estimatorClass,
        TaskRuntimeEstimator.class);
    conf.setLong(MAP_SLEEP_TIME, myMapSleepTime);
    conf.setLong(REDUCE_SLEEP_TIME, myReduceSleepTime);
    conf.setInt(MAP_SLEEP_COUNT, myMapSleepCount);
    conf.setInt(REDUCE_SLEEP_COUNT, myReduceSleepCount);
    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
    conf.setInt(MRJobConfig.NUM_MAPS, myNumMapper);
    conf.set(MAP_SLEEP_CALCULATOR_TYPE, chosenSleepCalc);
    Job job = Job.getInstance(conf);
    // Locate the job jar through this class, which declares the mapper,
    // reducer, input format and partitioner used below.
    job.setJarByClass(TestSpeculativeExecOnCluster.class);
    job.setMapperClass(SpeculativeSleepMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setReducerClass(SpeculativeSleepReducer.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setInputFormatClass(SpeculativeSleepInputFormat.class);
    job.setPartitionerClass(SpeculativeSleepJobPartitioner.class);
    job.setNumReduceTasks(myNumReduce);
    FileInputFormat.addInputPath(job, new Path("ignored"));
    // Delete output directory if it exists. Best effort: a failure is
    // reported but does not stop the job from being submitted.
    try {
      localFs.delete(TEST_OUT_DIR, true);
    } catch (IOException e) {
      LOG.warn("Could not delete output directory {}", TEST_OUT_DIR, e);
    }
    FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);

    // Creates the Job Configuration
    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
    job.setMaxMapAttempts(2);

    job.submit();

    return job;
  }
}