/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl;

import java.io.IOException;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.model.MultipartUpload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.store.audit.AuditSpan;

import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;

/**
 * Test behavior of purging pending multipart uploads in rename and delete.
 * S3 Express stores enable this option automatically; it is set explicitly
 * here so the behavior is verified on other store types as well.
 */
public class ITestUploadPurgeOnDirectoryOperations extends AbstractS3ACostTest {

  @Override
  public Configuration createConfiguration() {
    final Configuration conf = super.createConfiguration();
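    // clear any base/bucket overrides so the explicit settings below take effect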
    removeBaseAndBucketOverrides(conf,
        DIRECTORY_OPERATIONS_PURGE_UPLOADS,
        MAGIC_COMMITTER_ENABLED);
    conf.setBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, true);
    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
    return conf;
  }

  @BeforeEach
  @Override
  public void setup() throws Exception {
    super.setup();
    final S3AFileSystem fs = getFileSystem();
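    // the filesystem must declare the upload-purging path capability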
    assertHasPathCapabilities(fs, new Path("/"),
        DIRECTORY_OPERATIONS_PURGE_UPLOADS);
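    // abort any uploads left over from earlier test runs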
    clearAnyUploads(fs, methodPath());
  }

  @Test
  public void testDeleteWithPendingUpload() throws Throwable {

    final S3AFileSystem fs = getFileSystem();
    final Path dir = methodPath();

    // create a magic file, which initiates a multipart upload and leaves it pending.
    createMagicFile(fs, dir);

    // and there's a pending upload
    assertUploadCount(dir, 1);

    // delete the dir, with a cost of 1 abort, 1 list.
    verifyMetrics(() -> fs.delete(dir, true),
        with(OBJECT_MULTIPART_UPLOAD_ABORTED, 1), // abort
        with(OBJECT_MULTIPART_UPLOAD_LIST, 1),    // HTTP request inside iterator
        with(MULTIPART_UPLOAD_LIST, 0));          // api list call

    // and the pending upload is gone
    assertUploadCount(dir, 0);
  }

  @Test
  public void testRenameWithPendingUpload() throws Throwable {

    final S3AFileSystem fs = getFileSystem();
    final Path base = methodPath();
    final Path dir = new Path(base, "src");
    final Path dest = new Path(base, "dest");

    // create a magic file, again leaving a pending upload.
    createMagicFile(fs, dir);

    // and there's a pending upload
    assertUploadCount(dir, 1);

    // rename the dir, with a cost of 1 abort, 1 list.
    verifyMetrics(() -> fs.rename(dir, dest),
        with(OBJECT_MULTIPART_UPLOAD_ABORTED, 1), // abort
        with(OBJECT_MULTIPART_UPLOAD_LIST, 1),    // HTTP request inside iterator
        with(MULTIPART_UPLOAD_LIST, 0));          // api list call

    // and the pending upload is gone
    assertUploadCount(dir, 0);
  }

  /**
   * Assert the upload count under a dir is the expected value.
   * Failure message will include the list of entries.
   * @param dir directory to check
   * @param expected expected count
   * @throws IOException listing problem
   */
  private void assertUploadCount(final Path dir, final int expected) throws IOException {
    Assertions.assertThat(toList(listUploads(dir)))
        .describedAs("uploads under %s", dir)
        .hasSize(expected);
  }

  /**
   * List uploads; use the same APIs that the directory operations use,
   * thereby implicitly validating those APIs.
   * @param dir directory to list
   * @return full list of entries
   * @throws IOException listing problem
   */
  private RemoteIterator<MultipartUpload> listUploads(Path dir) throws IOException {
    final S3AFileSystem fs = getFileSystem();
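    // execute within an audit span, as the store-level list call expects an active span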
    try (AuditSpan ignored = span()) {
      final StoreContext sc = fs.createStoreContext();
      return fs.listUploadsUnderPrefix(sc, sc.pathToKey(dir));
    }
  }
}