S3AStore.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;

import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.PathCapabilities;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.ClientManager;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.service.Service;

/**
 * Interface for the S3A Store.
 * S3 client interactions should be via this; mocking
 * is possible for unit tests.
 * <p>
 * The {@link ClientManager} interface is used to create the AWS clients;
 * the base implementation forwards to the implementation of this interface
 * passed in at construction time.
 * <p>
 * The interface extends the Hadoop {@link Service} interface
 * and follows its lifecycle: it MUST NOT be used until
 * {@link Service#init(Configuration)} has been invoked.
 */
@InterfaceAudience.LimitedPrivate("Extensions")
@InterfaceStability.Unstable
public interface S3AStore extends
    ClientManager,
    IOStatisticsSource,
    ObjectInputStreamFactory,
    PathCapabilities,
    Service {

  /**
   * Acquire write capacity for operations.
   * This should be done within retry loops, so that each retry
   * re-acquires capacity.
   * @param capacity capacity to acquire.
   * @return time spent waiting to acquire the capacity.
   */
  Duration acquireWriteCapacity(int capacity);

  /**
   * Acquire read capacity for operations.
   * This should be done within retry loops, so that each retry
   * re-acquires capacity.
   * @param capacity capacity to acquire.
   * @return time spent waiting to acquire the capacity.
   */
  Duration acquireReadCapacity(int capacity);

  /**
   * Get the store context.
   * @return the store context.
   */
  StoreContext getStoreContext();

  /**
   * Get the duration tracker factory used by this store.
   * @return the duration tracker factory.
   */
  DurationTrackerFactory getDurationTrackerFactory();

  /**
   * Get the statistics context used by this store.
   * @return the statistics context.
   */
  S3AStatisticsContext getStatisticsContext();

  /**
   * Get the factory used to build AWS requests.
   * @return the request factory.
   */
  RequestFactory getRequestFactory();

  /**
   * Get the client manager through which AWS clients are obtained.
   * @return the client manager.
   */
  ClientManager clientManager();

  /**
   * Increment the read operation counter.
   */
  void incrementReadOperations();

  /**
   * Increment the write operation counter.
   * This is somewhat inaccurate, as it appears to be invoked more
   * often than needed in progress callbacks.
   */
  void incrementWriteOperations();

  /**
   * At the start of a put/multipart upload operation, update the
   * relevant counters.
   *
   * @param bytes bytes in the request.
   */
  void incrementPutStartStatistics(long bytes);

  /**
   * At the end of a put/multipart upload operation, update the
   * relevant counters and gauges.
   *
   * @param success did the operation succeed?
   * @param bytes bytes in the request.
   */
  void incrementPutCompletedStatistics(boolean success, long bytes);

  /**
   * Callback for use in progress callbacks from put/multipart upload events.
   * Increments those statistics which are expected to be updated during
   * the ongoing upload operation.
   * @param key key to file that is being written (for logging)
   * @param bytes bytes successfully uploaded.
   */
  void incrementPutProgressStatistics(String key, long bytes);

  /**
   * Given a possibly null duration tracker factory, return a non-null
   * one for use in tracking durations: either the supplied factory or
   * the FS tracker itself.
   *
   * @param factory factory; may be null.
   * @return a non-null factory.
   */
  DurationTrackerFactory nonNullDurationTrackerFactory(
      DurationTrackerFactory factory);

  /**
   * Perform a bulk object delete operation against S3.
   * Increments the {@code OBJECT_DELETE_REQUESTS} and write
   * operation statistics.
   * <p>
   * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
   * of objects deleted in the request.
   * <p>
   * Retry policy: retry untranslated; delete considered idempotent.
   * If the request is throttled, this is logged in the throttle statistics,
   * with the counter set to the number of keys, rather than the number
   * of invocations of the delete operation.
   * This is because S3 considers each key as one mutating operation on
   * the store when updating its load counters on a specific partition
   * of an S3 bucket.
   * If only the request was measured, this operation would under-report.
   * A write capacity proportional to the number of keys
   * present in the request will be requested, and re-requested during
   * retries, so that retries throttle better. If the request is throttled,
   * the time spent is recorded in a duration IOStat named
   * {@code STORE_IO_RATE_LIMITED_DURATION}.
   * @param deleteRequest keys to delete on the s3-backend
   * @return the total duration and the AWS response.
   * @throws MultiObjectDeleteException one or more of the keys could not
   * be deleted.
   * @throws SdkException amazon-layer failure.
   * @throws IOException IO problems.
   */
  @Retries.RetryRaw
  Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(DeleteObjectsRequest deleteRequest)
      throws MultiObjectDeleteException, SdkException, IOException;

  /**
   * Delete an object.
   * Increments the {@code OBJECT_DELETE_REQUESTS} statistics.
   * <p>
   * Retry policy: retry untranslated; delete considered idempotent.
   * 404 errors other than bucket not found are swallowed;
   * this can be raised by third party stores (GCS).
   * <p>
   * A write capacity of 1 (as it is a single object delete) will be
   * requested before the delete call, and re-requested during retries,
   * so that retries throttle better. If the request is throttled,
   * the time spent is recorded in a duration IOStat named
   * {@code STORE_IO_RATE_LIMITED_DURATION}.
   * If an exception is caught and swallowed, the response will be an
   * empty {@code Optional}; otherwise it will contain the response from
   * the delete operation.
   * @param request request to make
   * @return the total duration and the optional response.
   * @throws SdkException problems working with S3
   * @throws IllegalArgumentException if the request was rejected due to
   * a mistaken attempt to delete the root directory.
   */
  @Retries.RetryRaw
  Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject(
      DeleteObjectRequest request) throws SdkException;

  /**
   * Performs a HEAD request on an S3 object to retrieve its metadata.
   *
   * @param key           The S3 object key to perform the HEAD operation on
   * @param changeTracker Tracks changes to the object's metadata across operations
   * @param changeInvoker The invoker responsible for executing the HEAD request with retries
   * @param fsHandler     Handler for filesystem-level operations and configurations
   * @param operation     Description of the operation being performed for tracking purposes
   * @return              HeadObjectResponse containing the object's metadata
   * @throws IOException  If the HEAD request fails, object doesn't exist, or other I/O errors occur
   */
  @Retries.RetryRaw
  HeadObjectResponse headObject(String key,
      ChangeTracker changeTracker,
      Invoker changeInvoker,
      S3AFileSystemOperations fsHandler,
      String operation) throws IOException;

  /**
   * Retrieves a specific byte range of an S3 object as a stream.
   *
   * @param key    The S3 object key to retrieve
   * @param start  The starting byte position (inclusive) of the range to retrieve
   * @param end    The ending byte position (inclusive) of the range to retrieve
   * @return       A ResponseInputStream containing the requested byte range of the S3 object
   * @throws IOException  If the object cannot be retrieved or other I/O errors occur
   * @see GetObjectResponse  For additional metadata about the retrieved object
   */
  @Retries.RetryRaw
  ResponseInputStream<GetObjectResponse> getRangedS3Object(String key,
      long start,
      long end) throws IOException;

  /**
   * Upload one part of a multipart upload.
   * Increments the write and put counters.
   * <i>Important: this call does not close any input stream in the body.</i>
   * <p>
   * Retry Policy: none.
   * @param request the upload part request.
   * @param body the request body.
   * @param durationTrackerFactory duration tracker factory for operation
   * @return the result of the operation.
   * @throws AwsServiceException on problems
   * @throws UncheckedIOException failure to instantiate the s3 client
   */
  @Retries.OnceRaw
  UploadPartResponse uploadPart(
      UploadPartRequest request,
      RequestBody body,
      DurationTrackerFactory durationTrackerFactory)
      throws AwsServiceException, UncheckedIOException;

  /**
   * Start a transfer-manager managed async PUT of an object,
   * incrementing the put requests and put bytes
   * counters.
   * <p>
   * It does not update the other counters,
   * as existing code does that as progress callbacks come in.
   * Byte length is calculated from the file length, or, if there is no
   * file, from the content length of the header.
   * <p>
   * Because the operation is async, any stream supplied in the request
   * must reference data (files, buffers) which stay valid until the upload
   * completes.
   * Retry policy: N/A: the transfer manager is performing the upload.
   * Auditing: must be inside an audit span.
   * @param putObjectRequest the request
   * @param file the file to be uploaded
   * @param listener the progress listener for the request
   * @return the upload initiated
   * @throws IOException if transfer manager creation failed.
   */
  @Retries.OnceRaw
  UploadInfo putObject(
      PutObjectRequest putObjectRequest,
      File file,
      ProgressableProgressListener listener) throws IOException;

  /**
   * Wait for an upload to complete.
   * If the upload (or its result collection) failed, this is where
   * the failure is raised.
   * Calls {@link S3AStore#incrementPutCompletedStatistics(boolean, long)}
   * to update the statistics.
   * @param key destination key
   * @param uploadInfo upload to wait for
   * @return the upload result
   * @throws IOException IO failure
   * @throws CancellationException if the wait() was cancelled
   */
  @Retries.OnceTranslated
  CompletedFileUpload waitForUploadCompletion(String key, UploadInfo uploadInfo)
      throws IOException;

  /**
   * Complete a multipart upload.
   * Retry policy: none; failures are raised untranslated.
   * @param request request
   * @return the response
   */
  @Retries.OnceRaw
  CompleteMultipartUploadResponse completeMultipartUpload(
      CompleteMultipartUploadRequest request);

  /**
   * Get the directory allocator.
   * @return the directory allocator
   */
  LocalDirAllocator getDirectoryAllocator();

  /**
   * Demand create the directory allocator, then create a temporary file.
   * This does not mark the file for deletion when a process exits.
   * Pass in a file size of {@link LocalDirAllocator#SIZE_UNKNOWN} if the
   * size is unknown.
   * See {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
   * @param pathStr prefix for the temporary file
   * @param size the size of the file that is going to be written
   * @param conf the Configuration object
   * @return a unique temporary file
   * @throws IOException IO problems
   */
  File createTemporaryFileForWriting(String pathStr,
      long size,
      Configuration conf) throws IOException;


  /*
   =============== BEGIN ObjectInputStreamFactory ===============
   */

  /**
   * Return the capabilities of input streams created
   * through the store.
   * @param capability string to query the stream support for.
   * @return true if streams created through the store declare
   * support for the capability.
   */
  boolean inputStreamHasCapability(String capability);

  /**
   * The StreamCapabilities is part of ObjectInputStreamFactory.
   * To avoid confusion with any other streams which may
   * be added here: always return false.
   * Use {@link #inputStreamHasCapability(String)} to query
   * input stream capabilities.
   * @param capability string to query the stream support for.
   * @return false, always.
   */
  default boolean hasCapability(String capability) {
    return false;
  }

  /*
   =============== END ObjectInputStreamFactory ===============
   */
}