UnreliableManifestStoreOperations.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.util.JsonSerialization;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.OPERATION_TIMED_OUT;

/**
 * Wrap an existing {@link ManifestStoreOperations} implementation and fail on
 * specific paths.
 * This is for testing. It could be implemented via
 * Mockito 2 spy code but is not so that:
 * 1. It can be backported to Hadoop versions using Mockito 1.x.
 * 2. It can be extended to use in production.
 * 3. You can actually debug what's going on.
 * <p>
 * Failures are configured by registering paths through the various
 * {@code add*()} methods; every operation not matched by a registered path
 * is passed straight through to the wrapped instance.
 * <p>
 * The path sets are plain {@link HashSet} instances and this class performs
 * no synchronization on them; only the failure limit is an atomic counter.
 * NOTE(review): presumably all failure paths are registered before the
 * operations are invoked from multiple threads -confirm against callers.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class UnreliableManifestStoreOperations extends ManifestStoreOperations {

  private static final Logger LOG = LoggerFactory.getLogger(
      UnreliableManifestStoreOperations.class);

  /**
   * The timeout message ABFS raises.
   */
  public static final String E_TIMEOUT
      = "Operation could not be completed within the specified time";

  /**
   * Text to use in simulated failure exceptions.
   */
  public static final String SIMULATED_FAILURE = "Simulated failure";

  /**
   * Default failure limit.
   * Set to a large enough value that most tests don't hit it.
   */
  private static final int DEFAULT_FAILURE_LIMIT = Integer.MAX_VALUE;

  /**
   * Underlying store operations to wrap.
   */
  private final ManifestStoreOperations wrappedOperations;

  /**
   * Paths of delete operations to fail.
   */
  private final Set<Path> deletePathsToFail = new HashSet<>();

  /**
   * Paths of delete operations to time out, as ABFS may.
   */
  private final Set<Path> deletePathsToTimeOut = new HashSet<>();

  /**
   * Paths of List operations to fail.
   */
  private final Set<Path> listToFail = new HashSet<>();

  /**
   * Paths of mkdirs operations to fail.
   */
  private final Set<Path> mkdirsToFail = new HashSet<>();

  /**
   * Paths which don't exist.
   */
  private final Set<Path> pathNotFound = new HashSet<>();

  /**
   * Source file whose rename/commit will fail.
   */
  private final Set<Path> renameSourceFilesToFail = new HashSet<>();

  /**
   * Dest dir into which all renames/commits will fail.
   * Subdirectories under this are not checked.
   */
  private final Set<Path> renameDestDirsToFail = new HashSet<>();

  /**
   * Source paths of rename operations to time out before the rename request is issued.
   */
  private final Set<Path> renamePathsToTimeoutBeforeRename = new HashSet<>();

  /**
   * Source paths of rename operations to time out after the rename request has succeeded.
   */
  private final Set<Path> renamePathsToTimeoutAfterRename = new HashSet<>();

  /**
   * Path of save() to fail.
   */
  private final Set<Path> saveToFail = new HashSet<>();

  /**
   * How long to sleep, in milliseconds, before raising a simulated timeout.
   * No sleep takes place if this is zero or negative.
   */
  private int timeoutSleepTimeMillis;

  /**
   * Should a simulated rename failure throw an exception or just
   * have {@link #renameFile(Path, Path)} return false?
   */
  private boolean renameToFailWithException = true;

  /**
   * Counter consulted by every simulated failure; once it is exhausted
   * operations are passed through to the wrapped instance.
   * See {@link #decrementAndCheckFailureLimit()} for the exact counting.
   */
  private final AtomicInteger failureLimit = new AtomicInteger(DEFAULT_FAILURE_LIMIT);

  /**
   * Constructor.
   * @param wrappedOperations operations to wrap.
   */
  public UnreliableManifestStoreOperations(final ManifestStoreOperations wrappedOperations) {
    this.wrappedOperations = wrappedOperations;
  }

  /**
   * Reset the failure configuration: every set of registered failure paths
   * is emptied, the failure limit goes back to its default and the
   * timeout sleep time is set to zero.
   * The rename failure mode ({@link #setRenameToFailWithException(boolean)})
   * is left unchanged.
   */
  public void reset() {
    deletePathsToFail.clear();
    deletePathsToTimeOut.clear();
    failureLimit.set(DEFAULT_FAILURE_LIMIT);
    listToFail.clear();
    mkdirsToFail.clear();
    pathNotFound.clear();
    renameSourceFilesToFail.clear();
    renameDestDirsToFail.clear();
    renamePathsToTimeoutBeforeRename.clear();
    renamePathsToTimeoutAfterRename.clear();
    saveToFail.clear();
    timeoutSleepTimeMillis = 0;
  }

  /**
   * Get the sleep time used before a simulated timeout is raised.
   * @return sleep time in milliseconds.
   */
  public int getTimeoutSleepTimeMillis() {
    return timeoutSleepTimeMillis;
  }

  /**
   * Set the sleep time used before a simulated timeout is raised.
   * @param timeoutSleepTimeMillis sleep time in milliseconds; values of
   * zero or less disable the sleep.
   */
  public void setTimeoutSleepTimeMillis(final int timeoutSleepTimeMillis) {
    this.timeoutSleepTimeMillis = timeoutSleepTimeMillis;
  }

  /**
   * Query the rename failure mode.
   * @return true if simulated rename failures raise an exception.
   */
  public boolean getRenameToFailWithException() {
    return renameToFailWithException;
  }

  /**
   * Set the rename failure mode.
   * @param renameToFailWithException true to raise an exception on a
   * simulated rename failure; false to have
   * {@link #renameFile(Path, Path)} return false instead.
   */
  public void setRenameToFailWithException(
      final boolean renameToFailWithException) {
    this.renameToFailWithException = renameToFailWithException;
  }

  /**
   * Add a path to the list of delete paths to fail.
   * @param path path to add.
   */
  public void addDeletePathToFail(Path path) {
    deletePathsToFail.add(requireNonNull(path));
  }

  /**
   * Add a path to the list of delete paths to time out.
   * @param path path to add.
   */
  public void addDeletePathToTimeOut(Path path) {
    deletePathsToTimeOut.add(requireNonNull(path));
  }

  /**
   * Add a path to the list of paths where list will fail.
   * @param path path to add.
   */
  public void addListToFail(Path path) {
    listToFail.add(requireNonNull(path));
  }

  /**
   * Add a path to the list of mkdir calls to fail.
   * @param path path to add.
   */
  public void addMkdirsToFail(Path path) {
    mkdirsToFail.add(requireNonNull(path));
  }

  /**
   * Add a path not found.
   * @param path path
   */
  public void addPathNotFound(Path path) {
    pathNotFound.add(requireNonNull(path));
  }

  /**
   * Add a path to the list of rename source paths to fail.
   * @param path path to add.
   */
  public void addRenameSourceFilesToFail(Path path) {
    renameSourceFilesToFail.add(requireNonNull(path));
  }

  /**
   * Add a path to the list of dest dirs to fail.
   * @param path path to add.
   */
  public void addRenameDestDirsFail(Path path) {
    renameDestDirsToFail.add(requireNonNull(path));
  }

  /**
   * Add a source path to timeout before the rename.
   * @param path path to add.
   */
  public void addTimeoutBeforeRename(Path path) {
    renamePathsToTimeoutBeforeRename.add(requireNonNull(path));
  }

  /**
   * Add a source path to timeout after the rename.
   * @param path path to add.
   */
  public void addTimeoutAfterRename(Path path) {
    renamePathsToTimeoutAfterRename.add(requireNonNull(path));
  }

  /**
   * Add a path to the list of paths where save will fail.
   * @param path path to add.
   */
  public void addSaveToFail(Path path) {
    saveToFail.add(requireNonNull(path));
  }

  /**
   * Set the failure limit.
   * @param limit limit
   */
  public void setFailureLimit(int limit) {
    failureLimit.set(limit);
  }

  /**
   * Raise an exception if the path is in the set of target paths
   * and the failure limit is not exceeded.
   * @param operation operation which failed.
   * @param path path to check
   * @param paths paths to probe for {@code path} being in.
   * @throws IOException simulated failure
   */
  private void maybeRaiseIOE(String operation, Path path, Set<Path> paths)
      throws IOException {
    // the counter is only decremented when the path actually matches.
    if (paths.contains(path) && decrementAndCheckFailureLimit()) {
      // hand off to the inner check.
      maybeRaiseIOENoFailureLimitCheck(operation, path, paths);
    }
  }

  /**
   * Raise an exception if the path is in the set of target paths.
   * No checks on failure count are performed.
   * @param operation operation which failed.
   * @param path path to check
   * @param paths paths to probe for {@code path} being in.
   * @throws IOException simulated failure
   */
  private void maybeRaiseIOENoFailureLimitCheck(String operation, Path path, Set<Path> paths)
      throws IOException {
    if (paths.contains(path)) {
      LOG.info("Simulating failure of {} with {}", operation, path);
      throw new PathIOException(path.toString(),
          generatedErrorMessage(operation));
    }
  }

  /**
   * Given an operation, return the error message which is used for the simulated
   * {@link PathIOException}.
   * @param operation operation name
   * @return error text
   */
  public static String generatedErrorMessage(final String operation) {
    return SIMULATED_FAILURE + " of " + operation;
  }

  /**
   * Check if the failure limit is exceeded.
   * Call this after any other trigger checks, as it decrements the counter.
   * <p>
   * The counter is decremented first and the new value compared with zero,
   * so after {@code setFailureLimit(n)} the first {@code n - 1} invocations
   * return true and later ones return false.
   *
   * @return true if the limit is not exceeded.
   */
  private boolean decrementAndCheckFailureLimit() {
    return failureLimit.decrementAndGet() > 0;
  }

  /**
   * Verify that a path is not on the file not found list.
   * @param path path
   * @throws FileNotFoundException if configured to fail.
   */
  private void verifyExists(Path path) throws FileNotFoundException {
    if (pathNotFound.contains(path) && decrementAndCheckFailureLimit()) {
      throw new FileNotFoundException(path.toString());
    }
  }

  /**
   * Time out if the path is in the list of timeout paths.
   * Will sleep first, to help simulate delays.
   * If the sleep is interrupted the thread's interrupt status is restored
   * before the {@link InterruptedIOException} is raised.
   * @param operation operation which failed.
   * @param path path to check
   * @param paths paths to probe for {@code path} being in.
   * @throws SocketTimeoutException simulated timeout
   * @throws InterruptedIOException if the sleep is interrupted.
   */
  private void maybeTimeout(String operation, Path path, Set<Path> paths)
      throws SocketTimeoutException, InterruptedIOException  {
    if (paths.contains(path) && decrementAndCheckFailureLimit()) {
      LOG.info("Simulating timeout of {} with {}", operation, path);
      if (timeoutSleepTimeMillis > 0) {
        try {
          Thread.sleep(timeoutSleepTimeMillis);
        } catch (InterruptedException e) {
          // keep the interrupt visible to code further up the stack
          // and retain the original exception as the cause.
          Thread.currentThread().interrupt();
          throw (InterruptedIOException)
              new InterruptedIOException(e.toString()).initCause(e);
        }
      }
      throw new SocketTimeoutException(
          path.toString() + ": " + operation
              + " ErrorCode=" + OPERATION_TIMED_OUT
              + " ErrorMessage=" + E_TIMEOUT);
    }
  }

  /**
   * Get the status of a path, failing with a
   * {@link FileNotFoundException} if the path has been registered
   * through {@link #addPathNotFound(Path)} and the failure limit permits.
   * @param path path to probe.
   * @return the status returned by the wrapped operations.
   * @throws IOException simulated or real failure.
   */
  @Override
  public FileStatus getFileStatus(final Path path) throws IOException {
    // only the "not found" check applies here; a path on that list must
    // surface as a FileNotFoundException, not as a timeout.
    verifyExists(path);
    return wrappedOperations.getFileStatus(path);
  }

  /**
   * Delete a path, after checking for a simulated timeout and then
   * a simulated failure.
   * @param path path to delete.
   * @param recursive recursive flag, passed through.
   * @return the result of the wrapped delete.
   * @throws IOException simulated or real failure.
   */
  @Override
  public boolean delete(final Path path, final boolean recursive)
      throws IOException {
    String op = "delete";
    maybeTimeout(op, path, deletePathsToTimeOut);
    maybeRaiseIOE(op, path, deletePathsToFail);
    return wrappedOperations.delete(path, recursive);
  }

  /**
   * Create a directory, unless the path is registered to fail.
   * @param path directory to create.
   * @return the result of the wrapped mkdirs.
   * @throws IOException simulated or real failure.
   */
  @Override
  public boolean mkdirs(final Path path) throws IOException {
    maybeRaiseIOE("mkdirs", path, mkdirsToFail);
    return wrappedOperations.mkdirs(path);
  }

  /**
   * Rename a file.
   * A simulated failure is triggered if the source is a registered source
   * file or the parent of the destination is a registered destination
   * directory; depending on the rename failure mode this is either an
   * exception or a return value of false.
   * Simulated timeouts may be raised before the rename is attempted
   * or after the wrapped rename has returned.
   * @param source source file.
   * @param dest destination path.
   * @return false on a simulated non-exception failure; otherwise the
   * result of the wrapped rename.
   * @throws IOException simulated or real failure.
   */
  @Override
  public boolean renameFile(final Path source, final Path dest)
      throws IOException {
    String op = "rename";
    maybeTimeout(op, source, renamePathsToTimeoutBeforeRename);
    if (renameToFailWithException) {
      maybeRaiseIOE(op, source, renameSourceFilesToFail);
      maybeRaiseIOE(op, dest.getParent(), renameDestDirsToFail);
    } else {
      // logic to determine whether rename should just return false.
      // explicit grouping: && binds tighter than ||, and the failure limit
      // must apply to source matches as well as destination matches.
      final boolean triggered = renameSourceFilesToFail.contains(source)
          || renameDestDirsToFail.contains(dest.getParent());
      if (triggered && decrementAndCheckFailureLimit()) {
        LOG.info("Failing rename({}, {})", source, dest);
        return false;
      }
    }
    final boolean b = wrappedOperations.renameFile(source, dest);
    // post rename timeout.
    maybeTimeout(op, source, renamePathsToTimeoutAfterRename);
    return b;
  }

  /**
   * List a directory, after the "not found" check and then the
   * simulated list failure check.
   * @param path path to list.
   * @return the iterator returned by the wrapped operations.
   * @throws IOException simulated or real failure.
   */
  @Override
  public RemoteIterator<FileStatus> listStatusIterator(final Path path)
      throws IOException {
    verifyExists(path);
    maybeRaiseIOE("listStatus", path, listToFail);
    return wrappedOperations.listStatusIterator(path);
  }

  /**
   * Load a task manifest, unless the path of the status is on the
   * "not found" list.
   * @param serializer serializer, passed through.
   * @param st status of the manifest file.
   * @return the manifest loaded by the wrapped operations.
   * @throws IOException simulated or real failure.
   */
  @Override
  public TaskManifest loadTaskManifest(JsonSerialization<TaskManifest> serializer,
      final FileStatus st) throws IOException {
    verifyExists(st.getPath());
    return wrappedOperations.loadTaskManifest(serializer, st);
  }

  /**
   * Save manifest data, unless the path is registered to fail.
   * @param manifestData data to save.
   * @param path destination path.
   * @param overwrite overwrite flag, passed through.
   * @param <T> type of the manifest data.
   * @throws IOException simulated or real failure.
   */
  @Override
  public <T extends AbstractManifestData<T>> void save(T manifestData,
      final Path path,
      final boolean overwrite) throws IOException {
    maybeRaiseIOE("save", path, saveToFail);
    wrappedOperations.save(manifestData, path, overwrite);
  }

  /**
   * Pass-through: no failures are simulated.
   * @param path path, passed through.
   * @throws IOException failure in the wrapped operation.
   */
  @Override
  public void msync(Path path) throws IOException {
    wrappedOperations.msync(path);
  }

  /**
   * Pass-through: no failures are simulated.
   * @param status status, passed through.
   * @return the value returned by the wrapped operations.
   */
  @Override
  public String getEtag(FileStatus status) {
    return wrappedOperations.getEtag(status);
  }

  /**
   * Pass-through: no failures are simulated.
   * @return the value returned by the wrapped operations.
   */
  @Override
  public boolean storeSupportsResilientCommit() {
    return wrappedOperations.storeSupportsResilientCommit();
  }

  /**
   * Commit a file.
   * Simulated timeouts may be raised before the commit is attempted or
   * after the wrapped commit has returned.
   * Simulated failures are only raised when the rename failure mode is
   * "fail with exception"; otherwise the registered rename failure paths
   * are not consulted by this method.
   * @param entry entry to commit.
   * @return the result of the wrapped commit.
   * @throws IOException simulated or real failure.
   */
  @Override
  public CommitFileResult commitFile(final FileEntry entry)
      throws IOException {
    final String op = "commitFile";
    final Path source = entry.getSourcePath();
    maybeTimeout(op, source, renamePathsToTimeoutBeforeRename);
    if (renameToFailWithException) {
      maybeRaiseIOE(op,
          source, renameSourceFilesToFail);
      maybeRaiseIOE(op,
          entry.getDestPath().getParent(), renameDestDirsToFail);
    }
    final CommitFileResult result = wrappedOperations.commitFile(entry);
    // post rename timeout.
    maybeTimeout(op, source, renamePathsToTimeoutAfterRename);
    return result;
  }

  /**
   * Pass-through: no failures are simulated.
   * @param path path, passed through.
   * @return the value returned by the wrapped operations.
   */
  @Override
  public boolean storePreservesEtagsThroughRenames(Path path) {
    return wrappedOperations.storePreservesEtagsThroughRenames(path);
  }

  /**
   * Close the wrapped operations.
   * @throws IOException failure in the wrapped close.
   */
  @Override
  public void close() throws IOException {
    wrappedOperations.close();
  }

}