CommitterTestHelper.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.commit;

import java.io.IOException;
import java.util.List;

import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.MultipartTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitOperations.extractMagicFileLength;

/**
 * Helper for committer tests: extra assertions and the like.
 */
public class CommitterTestHelper {

  private static final Logger LOG =
      LoggerFactory.getLogger(CommitterTestHelper.class);

  /**
   * Filesystem under test.
   */
  private final S3AFileSystem fileSystem;

  public static final String JOB_ID = "123";

  /**
   * Constructor.
   * @param fileSystem filesystem to work with.
   */
  public CommitterTestHelper(S3AFileSystem fileSystem) {
    this.fileSystem = requireNonNull(fileSystem);
  }

  /**
   * Get the filesystem.
   * @return the filesystem.
   */
  public S3AFileSystem getFileSystem() {
    return fileSystem;
  }

  /**
   * Assert a path refers to a marker file of an expected length;
   * the length is extracted from the custom header.
   * @param path magic file.
   * @param dataSize expected data size
   * @throws IOException IO failure
   */
  public void assertIsMarkerFile(Path path, long dataSize) throws IOException {
    final S3AFileSystem fs = getFileSystem();
    FileStatus status = verifyPathExists(fs,
        "uploaded file commit", path);
    Assertions.assertThat(status.getLen())
        .describedAs("Marker File file %s: %s", path, status)
        .isEqualTo(0);
    Assertions.assertThat(extractMagicFileLength(fs, path))
        .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + path)
        .isNotEmpty()
        .hasValue(dataSize);
  }

  /**
   * Assert a file does not have the magic marker header.
   * @param path magic file.
   * @throws IOException IO failure
   */
  public void assertFileLacksMarkerHeader(Path path) throws IOException {
    Assertions.assertThat(extractMagicFileLength(getFileSystem(),
            path))
        .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + path)
        .isEmpty();
  }

  /**
   * Create a new path which has the same filename as the dest file, but
   * is in a magic directory under the destination dir.
   * @param destFile final destination file
   * @return magic path
   */
  public static Path makeMagic(Path destFile) {
    return new Path(destFile.getParent(),
        MAGIC_PATH_PREFIX + JOB_ID + '/' + BASE + "/" + destFile.getName());
  }

  /**
   * Assert that an output stream is magic.
   * @param stream stream to probe.
   */
  public static void assertIsMagicStream(final FSDataOutputStream stream) {
    Assertions.assertThat(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT))
        .describedAs("Stream capability %s in stream %s",
            STREAM_CAPABILITY_MAGIC_OUTPUT, stream)
        .isTrue();
  }

  /**
   * Abort all multipart uploads under a path.
   * @param path path for uploads to abort; may be null
   * @return a count of aborts
   * @throws IOException trouble.
   */
  public void abortMultipartUploadsUnderPath(Path path) {

    MultipartTestUtils.clearAnyUploads(getFileSystem(), path);
  }

  /**
   * Get a list of all pending uploads under a prefix, one which can be printed.
   * @param prefix prefix to look under
   * @return possibly empty list
   * @throws IOException IO failure.
   */
  public List<String> listMultipartUploads(
      String prefix) throws IOException {

    return MultipartTestUtils.listMultipartUploads(getFileSystem(), prefix);
  }

}