/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.hs;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;

import static org.mockito.Mockito.mock;

/**
 * The test in this class was created specifically to address the issue in
 * MAPREDUCE-6684: when two threads try to load different jobs through the
 * job history file manager, one thread could be blocked by the other thread
 * that is loading a huge job file, which is undesirable.
 */
public class TestUnnecessaryBlockingOnHistoryFileInfo {
  /**
   * The intermediate done directory that JHS scans for completed jobs.
   */
  private final static File INTERMEDIATE_DIR = new File("target",
      TestUnnecessaryBlockingOnHistoryFileInfo.class.getName() +
          "/intermediate");
  /**
   * A test user directory under intermediate done directory.
   */
  private final static File USER_DIR = new File(INTERMEDIATE_DIR, "test");

  @BeforeAll
  public static void setUp() throws IOException {
    if (USER_DIR.exists()) {
      FileUtils.cleanDirectory(USER_DIR);
    }
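    // mkdirs() may return false when the directory already exists, which is
    // expected here since cleanDirectory() keeps the directory itself, so the
    // return value is intentionally ignored.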
    USER_DIR.mkdirs();
  }

  @AfterAll
  public static void cleanUp() throws IOException {
    FileUtils.deleteDirectory(INTERMEDIATE_DIR);
  }

  /**
   * This creates a test case in which two threads try to load two different
   * jobs of the same user under the intermediate directory. One thread
   * should not be blocked by the other thread that is loading a huge job
   * file (this is simulated by hanging forever while parsing the job files).
   * The test fails by triggering the timeout if one thread is blocked by the
   * other while the latter holds the lock on its associated job files and
   * hangs while parsing them.
   */
  @Test
  @Timeout(value = 600)
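  // JUnit 5's @Timeout defaults to TimeUnit.SECONDS, so the test is capped
  // at ten minutes in case a regression leaves the threads blocked.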
  public void testTwoThreadsQueryingDifferentJobOfSameUser()
      throws InterruptedException, IOException {
    final Configuration config = new Configuration();
    config.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
        INTERMEDIATE_DIR.getPath());
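    // Retain history files indefinitely so the background history cleaner
    // cannot delete the dummy .jhist files while the test runs.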
    config.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, Long.MAX_VALUE);

    final JobId job1 = createJobId(0);
    final JobId job2 = createJobId(1);
    final HistoryFileManagerUnderContention historyFileManager =
        createHistoryFileManager(config, job1, job2);

    Thread webRequest1 = null;
    Thread webRequest2 = null;
    try {
      /*
       * Create a dummy .jhist file for job1, and try to load/parse the job
       * files in one child thread.
       */
      Assertions.assertTrue(createJhistFile(job1));
      webRequest1 = new Thread(() -> {
        try {
          HistoryFileManager.HistoryFileInfo historyFileInfo =
              historyFileManager.getFileInfo(job1);
          historyFileInfo.loadJob();
        } catch (IOException e) {
          e.printStackTrace();
        }
      });
      webRequest1.start();
      historyFileManager.waitUntilIntermediateDirIsScanned(job1);

      /*
       * At this point, thread webRequest1 has finished scanning the
       * intermediate directory and hangs while parsing the job files,
       * holding the lock on the associated HistoryFileInfo object.
       */

      /*
       * Create a dummy .jhist file for job2 and try to load/parse the job
       * files in the other child thread. Because job files are not moved
       * from the intermediate directory to the done directory, thread
       * webRequest2 will also see the job history files for job1.
       */
      Assertions.assertTrue(createJhistFile(job2));
      webRequest2 = new Thread(() -> {
        try {
          HistoryFileManager.HistoryFileInfo historyFileInfo =
              historyFileManager.getFileInfo(job2);
          historyFileInfo.loadJob();
        } catch (IOException e) {
          e.printStackTrace();
        }
      });
      webRequest2.start();
      historyFileManager.waitUntilIntermediateDirIsScanned(job2);

      /*
       * If execution reaches this point, thread webRequest2 did not try to
       * acquire the lock on the HistoryFileInfo object associated with job1,
       * which is permanently held by thread webRequest1 as it hangs parsing
       * the job history files, so webRequest2 was able to proceed with
       * parsing the job history files of job2.
       */
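      // Thread.State.BLOCKED specifically means waiting to enter a
      // synchronized block or method, i.e. monitor contention, which is
      // exactly the condition this test guards against.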
      Assertions.assertTrue(webRequest2.getState() != Thread.State.BLOCKED,
          "Thread 2 is blocked by Thread 1, which is loading job1, while " +
          "trying to load job2.");
    } finally {
      if (webRequest1 != null) {
        webRequest1.interrupt();
      }
      if (webRequest2 != null) {
        webRequest2.interrupt();
      }
    }
  }

  /**
   * Create, initialize and start an instance of HistoryFileManager.
   * @param config the configuration to initialize the HistoryFileManager
   *               instance.
   * @param jobIds the set of jobs expected to be loaded by HistoryFileManager.
   * @return the started HistoryFileManagerUnderContention instance.
   */
  private HistoryFileManagerUnderContention createHistoryFileManager(
      Configuration config, JobId... jobIds) {
    HistoryFileManagerUnderContention historyFileManager =
        new HistoryFileManagerUnderContention(jobIds);
    historyFileManager.init(config);
    historyFileManager.start();
    return historyFileManager;
  }

  /**
   * Create, initialize and start an instance of CachedHistoryStorage.
   * @param config the config to initialize the storage.
   * @param historyFileManager the HistoryFileManager to initialize the cache.
   * @return the started CachedHistoryStorage instance.
   */
  private static CachedHistoryStorage createHistoryStorage(
      Configuration config, HistoryFileManager historyFileManager) {
    CachedHistoryStorage historyStorage = new CachedHistoryStorage();
    historyStorage.setHistoryFileManager(historyFileManager);
    historyStorage.init(config);
    historyStorage.start();
    return historyStorage;
  }

  private static JobId createJobId(int id) {
    JobId jobId = new JobIdPBImpl();
    jobId.setId(id);
    jobId.setAppId(ApplicationIdPBImpl.newInstance(0, id));
    return jobId;
  }

  /**
   * Create a dummy .jhist file under the intermediate directory for the
   * given job.
   * @param jobId the id of the given job
   * @return true if the file was created successfully, false otherwise
   */
  private static boolean createJhistFile(JobId jobId) throws IOException {
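    // Build a file name with the leading fields of the '-'-delimited layout
    // that JHS history file names use: jobId, submit time, user, job name
    // and finish time (here the numeric job id doubles as the job name).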
    StringBuilder fileName = new StringBuilder(jobId.toString());
    long finishTime = System.currentTimeMillis();
    fileName.append("-").append(finishTime - 1000)
        .append("-").append("test")
        .append("-").append(jobId.getId())
        .append("-").append(finishTime)
        .append(".jhist");
    File jhistFile = new File(USER_DIR, fileName.toString());
    return jhistFile.createNewFile();
  }

  /**
   * A test implementation of HistoryFileManager that does not move files
   * from the intermediate directory to the done directory and hangs while
   * parsing job history files.
   */
  class HistoryFileManagerUnderContention extends HistoryFileManager {
    /**
     * A map of job to a signal that indicates whether the intermediate
     * directory is done being scanned before the job files are parsed.
     */
    private final Map<JobId, CountDownLatch> scanningDoneSignals =
        new HashMap<>();

    /**
     * A HistoryFileManager that expects to load the given jobs and hangs
     * while parsing the job files. It performs no moving of files from the
     * intermediate directory to the done directory.
     * @param jobId the set of jobs expected to be loaded and parsed
     */
    public HistoryFileManagerUnderContention(JobId... jobId) {
      for (JobId job : jobId) {
        scanningDoneSignals.put(job, new CountDownLatch(1));
      }
    }

    /**
     * Wait until scanning of the intermediate directory finishes and loading
     * of the given job has started.
     */
    public void waitUntilIntermediateDirIsScanned(JobId jobId)
        throws InterruptedException {
      if (scanningDoneSignals.containsKey(jobId)) {
        scanningDoneSignals.get(jobId).await();
      }
    }

    /**
     * Create a HistoryFileInfo instance that hangs on parsing job files.
     */
    @Override
    protected HistoryFileManager.HistoryFileInfo createHistoryFileInfo(
        Path historyFile, Path confFile, Path summaryFile,
        JobIndexInfo jobIndexInfo, boolean isInDone) {
      return new HistoryFileInfo(historyFile, confFile, summaryFile,
          jobIndexInfo, isInDone,
          scanningDoneSignals.get(jobIndexInfo.getJobId()));
    }

    /**
     * Create a dummy ThreadPoolExecutor that does not execute submitted tasks.
     */
    @Override
    protected ThreadPoolExecutor createMoveToDoneThreadPool(
        int numMoveThreads) {
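      // An unstubbed Mockito mock silently ignores execute()/submit() calls,
      // so "move to done" tasks are dropped and job files stay in the
      // intermediate directory.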
      return mock(ThreadPoolExecutor.class);
    }

    /**
     * A HistoryFileInfo implementation that takes forever to parse the
     * associated job files. This mimics the behavior of parsing huge job files.
     */
    class HistoryFileInfo extends HistoryFileManager.HistoryFileInfo {
      /**
       * A signal that indicates scanning of the intermediate directory is done
       * as HistoryFileManager is in the process of loading the HistoryFileInfo
       * instance.
       */
      private final CountDownLatch scanningDoneSignal;

      HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
          JobIndexInfo jobIndexInfo, boolean isInDone,
          CountDownLatch scanningDoneSignal) {
        super(historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
        this.scanningDoneSignal = scanningDoneSignal;
      }

      /**
       * A test implementation that takes forever to load a job in order to
       * mimic what happens when huge job files are parsed in JHS. Before
       * loading, we signal that scanning of the intermediate directory has
       * finished.
       */
      @Override
      public synchronized Job loadJob() throws IOException {
        if (scanningDoneSignal != null) {
          scanningDoneSignal.countDown();
        }
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            // Thread.sleep() clears the interrupt flag when it throws;
            // restore it so the loop condition can observe the interrupt
            // and let the thread exit when the test tears down.
            Thread.currentThread().interrupt();
          }
        }
        return null;
      }
    }
  }
}