ITestRenameDeleteRace.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;

import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
 * HADOOP-16721: race condition between delete and rename underneath the
 * same destination directory.
 * This test suite recreates the failure, using semaphores to
 * guarantee that the failure condition is encountered,
 * then verifies that the rename operation is successful.
 */
public class ITestRenameDeleteRace extends AbstractS3ATestBase {

  private static final Logger LOG =
      LoggerFactory.getLogger(ITestRenameDeleteRace.class);

  /**
   * Thread count handed to the executor factory: {@value}.
   * The test only ever submits a single task (the delete); the rename
   * runs in the JUnit thread.
   */
  public static final int EXECUTOR_THREAD_COUNT = 2;

  /**
   * For submitting work.
   */
  private static final BlockingThreadPoolExecutorService EXECUTOR =
      BlockingThreadPoolExecutorService.newInstance(
          EXECUTOR_THREAD_COUNT,
          EXECUTOR_THREAD_COUNT * 2,
          30, TimeUnit.SECONDS,
          "test-operations");

  /**
   * This test uses a subclass of S3AFileSystem to recreate the race between
   * subdirectory delete and rename.
   * The JUnit thread performs the rename, while an executor-submitted
   * thread performs the delete.
   * Semaphores are used to
   * -block the JUnit thread from initiating the rename until the delete
   * has finished the delete phase, and has reached the
   * {@code maybeCreateFakeParentDirectory()} call.
   * A second semaphore is used to block the delete thread from
   * listing and recreating the deleted directory until after
   * the JUnit thread has completed.
   * Together, the two semaphores guarantee that the rename()
   * call will be made at exactly the moment when the destination
   * directory no longer exists.
   * <p>
   * The semaphore holding back the delete thread is always released,
   * even if the JUnit thread is interrupted or an assertion fails, so
   * that the executor thread is never left blocked.
   * @throws Throwable on any failure.
   */
  @Test
  public void testDeleteRenameRaceCondition() throws Throwable {
    describe("verify no race between delete and rename");

    // the normal FS is used for path setup, verification
    // and the rename call.
    final S3AFileSystem fs = getFileSystem();
    final Path path = path(getMethodName());
    Path srcDir = new Path(path, "src");

    // this dir must exist throughout the rename
    Path destDir = new Path(path, "dest");
    // this dir tree will be deleted in a thread which is held back
    // so that it does not complete before the rename exits
    Path destSubdir1 = new Path(destDir, "subdir1");
    Path subfile1 = new Path(destSubdir1, "subfile1");

    // this is the directory we want to copy over under the dest dir
    Path srcSubdir2 = new Path(srcDir, "subdir2");
    Path srcSubfile = new Path(srcSubdir2, "subfile2");
    Path destSubdir2 = new Path(destDir, "subdir2");

    // creates subfile1 and all parents, so that
    // dest/subdir1/subfile1 exists as a file;
    // dest/subdir1 and dest are directories without markers
    ContractTestUtils.touch(fs, subfile1);
    assertIsDirectory(destDir);

    // source subfile
    ContractTestUtils.touch(fs, srcSubfile);

    // this is the FS used for delete()
    final BlockingFakeDirMarkerFS blockingFS
        = new BlockingFakeDirMarkerFS();
    try {
      // initialization and semaphore setup take place inside the try
      // block so that the FS is closed whatever fails.
      blockingFS.initialize(fs.getUri(), fs.getConf());
      // get the semaphore; this ensures that the next attempt to create
      // a fake marker blocks
      blockingFS.blockFakeDirCreation();

      final CompletableFuture<Path> future = submit(EXECUTOR, () -> {
        LOG.info("deleting {}", destSubdir1);
        blockingFS.delete(destSubdir1, true);
        return destSubdir1;
      });

      try {
        // wait for the delete thread to reach
        // maybeCreateFakeParentDirectory().
        // This wait is inside the try block so that an interrupt here
        // still releases the delete thread in the finally clause.
        blockingFS.awaitFakeDirCreation();

        // there is now no destination directory
        assertPathDoesNotExist("should have been implicitly deleted",
            destDir);

        // attempt the rename in the normal FS.
        LOG.info("renaming {} to {}", srcSubdir2, destSubdir2);
        Assertions.assertThat(fs.rename(srcSubdir2, destSubdir2))
            .describedAs("rename(%s, %s)", srcSubdir2, destSubdir2)
            .isTrue();
        // dest dir implicitly exists.
        assertPathExists("must now exist", destDir);
      } finally {
        // release the remaining semaphore so that the deletion thread exits.
        blockingFS.allowFakeDirCreationToProceed();
      }

      // now let the delete complete
      LOG.info("Waiting for delete {} to finish", destSubdir1);
      waitForCompletion(future);

      // everything still exists
      assertPathExists("must now exist", destDir);
      assertPathExists("must now exist", new Path(destSubdir2, "subfile2"));
      assertPathDoesNotExist("Src dir deleted", srcSubdir2);

    } finally {
      cleanupWithLogger(LOG, blockingFS);
    }

  }

  /**
   * Subclass of S3A FS whose execution of maybeCreateFakeParentDirectory
   * can be choreographed with another thread so as to reliably
   * create the delete/rename race condition.
   * This class is only intended for "single shot" API calls.
   * It is a static nested class: it needs nothing from the enclosing
   * test instance beyond the static logger.
   */
  private static final class BlockingFakeDirMarkerFS extends S3AFileSystem {

    /**
     * Signal that maybeCreateFakeParentDirectory() has been entered.
     * Starts with no permits; one is released on entry to that method,
     * which wakes up any thread in {@link #awaitFakeDirCreation()}.
     */
    private final Semaphore signalCreatingFakeParentDirectory =
        new Semaphore(0);

    /**
     * Semaphore to acquire before the marker can be listed/created.
     * Starts with one permit, so that the FS does not block unless
     * {@link #blockFakeDirCreation()} has been called first.
     */
    private final Semaphore blockBeforeCreatingMarker = new Semaphore(1);

    /**
     * Signal that this method has been reached, then wait for the
     * marker semaphore before invoking the superclass implementation.
     * @param path path passed on to the superclass.
     * @throws IOException as raised by the superclass.
     * @throws SdkException as raised by the superclass.
     */
    @Override
    protected void maybeCreateFakeParentDirectory(final Path path)
        throws IOException, SdkException {
      LOG.info("waking anything blocked on the signal semaphore");
      // notify anything waiting
      signalCreatingFakeParentDirectory.release();
      // acquire the semaphore and then create any fake directory
      LOG.info("blocking for creation");
      blockBeforeCreatingMarker.acquireUninterruptibly();
      try {
        LOG.info("probing for/creating markers");
        super.maybeCreateFakeParentDirectory(path);
      } finally {
        // and release the marker for completeness.
        blockBeforeCreatingMarker.release();
      }
    }

    /**
     * Take the marker semaphore so that the next call to
     * maybeCreateFakeParentDirectory() blocks until
     * {@link #allowFakeDirCreationToProceed()} is invoked.
     * @throws InterruptedException if interrupted while acquiring.
     */
    public void blockFakeDirCreation() throws InterruptedException {
      blockBeforeCreatingMarker.acquire();
    }

    /**
     * Wait until another thread has entered
     * maybeCreateFakeParentDirectory().
     * @throws InterruptedException if interrupted while waiting.
     */
    public void awaitFakeDirCreation() throws InterruptedException {
      LOG.info("Blocking until maybeCreateFakeParentDirectory() is reached");
      signalCreatingFakeParentDirectory.acquire();
    }

    /**
     * Release the marker semaphore, so unblocking any thread waiting
     * inside maybeCreateFakeParentDirectory().
     */
    public void allowFakeDirCreationToProceed() {
      LOG.info("Allowing the fake directory LIST/PUT to proceed.");
      blockBeforeCreatingMarker.release();
    }
  }

}