UploadContentProviders.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.http.ContentStreamProvider;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.store.ByteBufferInputStream;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.Preconditions.checkState;
import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;

/**
 * Implementations of {@code software.amazon.awssdk.http.ContentStreamProvider}.
 * <p>
 * These are required to ensure that retry of multipart uploads are reliable,
 * while also avoiding memory copy/consumption overhead.
 * <p>
 * For these reasons the providers built in to the AWS SDK are not used.
 * <p>
 * See HADOOP-19221 for details.
 */
public final class UploadContentProviders {

  public static final Logger LOG = LoggerFactory.getLogger(UploadContentProviders.class);

  private UploadContentProviders() {
  }

  /**
   * Create a content provider from a file.
   * @param file file to read.
   * @param offset offset in file.
   * @param size of data.
   * @return the provider
   * @throws IllegalArgumentException if the offset is negative.
   */
  public static BaseContentProvider<BufferedInputStream> fileContentProvider(
      File file,
      long offset,
      final int size) {

    return new FileWithOffsetContentProvider(file, offset, size);
  }

  /**
   * Create a content provider from a file.
   * @param file file to read.
   * @param offset offset in file.
   * @param size of data.
   * @param isOpen optional predicate to check if the stream is open.
   * @return the provider
   * @throws IllegalArgumentException if the offset is negative.
   */
  public static BaseContentProvider<BufferedInputStream> fileContentProvider(
      File file,
      long offset,
      final int size,
      final Supplier<Boolean> isOpen) {

    return new FileWithOffsetContentProvider(file, offset, size, isOpen);
  }

  /**
   * Create a content provider from a byte buffer.
   * The buffer is not copied and MUST NOT be modified while
   * the upload is taking place.
   * @param byteBuffer buffer to read.
   * @param size size of the data.
   * @return the provider
   * @throws IllegalArgumentException if the arguments are invalid.
   * @throws NullPointerException if the buffer is null
   */
  public static BaseContentProvider<ByteBufferInputStream> byteBufferContentProvider(
      final ByteBuffer byteBuffer,
      final int size) {
    return new ByteBufferContentProvider(byteBuffer, size);
  }

  /**
   * Create a content provider from a byte buffer.
   * The buffer is not copied and MUST NOT be modified while
   * the upload is taking place.
   * @param byteBuffer buffer to read.
   * @param size size of the data.
   * @param isOpen optional predicate to check if the stream is open.
   * @return the provider
   * @throws IllegalArgumentException if the arguments are invalid.
   * @throws NullPointerException if the buffer is null
   */
  public static BaseContentProvider<ByteBufferInputStream> byteBufferContentProvider(
      final ByteBuffer byteBuffer,
      final int size,
      final @Nullable Supplier<Boolean> isOpen) {

    return new ByteBufferContentProvider(byteBuffer, size, isOpen);
  }

  /**
   * Create a content provider for all or part of a byte array.
   * The buffer is not copied and MUST NOT be modified while
   * the upload is taking place.
   * @param bytes buffer to read.
   * @param offset offset in buffer.
   * @param size size of the data.
   * @return the provider
   * @throws IllegalArgumentException if the arguments are invalid.
   * @throws NullPointerException if the buffer is null.
   */
  public static BaseContentProvider<ByteArrayInputStream> byteArrayContentProvider(
      final byte[] bytes, final int offset, final int size) {
    return new ByteArrayContentProvider(bytes, offset, size);
  }

  /**
   * Create a content provider for all or part of a byte array.
   * The buffer is not copied and MUST NOT be modified while
   * the upload is taking place.
   * @param bytes buffer to read.
   * @param offset offset in buffer.
   * @param size size of the data.
   * @param isOpen optional predicate to check if the stream is open.
   * @return the provider
   * @throws IllegalArgumentException if the arguments are invalid.
   * @throws NullPointerException if the buffer is null.
   */
  public static BaseContentProvider<ByteArrayInputStream> byteArrayContentProvider(
      final byte[] bytes,
      final int offset,
      final int size,
      final @Nullable Supplier<Boolean> isOpen) {
    return new ByteArrayContentProvider(bytes, offset, size, isOpen);
  }

  /**
   * Create a content provider for all of a byte array.
   * @param bytes buffer to read.
   * @return the provider
   * @throws IllegalArgumentException if the arguments are invalid.
   * @throws NullPointerException if the buffer is null.
   */
  public static BaseContentProvider<ByteArrayInputStream> byteArrayContentProvider(
      final byte[] bytes) {
    return byteArrayContentProvider(bytes, 0, bytes.length);
  }

  /**
   * Create a content provider for all of a byte array.
   * @param bytes buffer to read.
   * @param isOpen optional predicate to check if the stream is open.
   * @return the provider
   * @throws IllegalArgumentException if the arguments are invalid.
   * @throws NullPointerException if the buffer is null.
   */
  public static BaseContentProvider<ByteArrayInputStream> byteArrayContentProvider(
      final byte[] bytes,
      final @Nullable Supplier<Boolean> isOpen) {
    return byteArrayContentProvider(bytes, 0, bytes.length, isOpen);
  }

  /**
   * Base class for content providers; tracks the number of times a stream
   * has been opened.
   * @param <T> type of stream created.
   */
  @VisibleForTesting
  public static abstract class BaseContentProvider<T extends InputStream>
      implements ContentStreamProvider, Closeable {

    /**
     * Size of the data.
     */
    private final int size;

    /**
     * Probe to check if the stream is open.
     * Invoked in {@link #checkOpen()}, which is itself
     * invoked in {@link #newStream()}.
     */
    private final Supplier<Boolean> isOpen;

    /**
     * How many times has a stream been created?
     */
    private int streamCreationCount;

    /**
     * Current stream. Null if not opened yet.
     * When {@link #newStream()} is called, this is set to the new value,
     * Note: when the input stream itself is closed, this reference is not updated.
     * Therefore this field not being null does not imply that the stream is open.
     */
    private T currentStream;

    /**
     * When did this upload start?
     * Use in error messages.
     */
    private final LocalDateTime startTime;

    /**
     * Constructor.
     * @param size size of the data. Must be non-negative.
     */
    protected BaseContentProvider(int size) {
      this(size, null);
    }

    /**
     * Constructor.
     * @param size size of the data. Must be non-negative.
     * @param isOpen optional predicate to check if the stream is open.
     */
    protected BaseContentProvider(int size, @Nullable Supplier<Boolean> isOpen) {
      checkArgument(size >= 0, "size is negative: %s", size);
      this.size = size;
      this.isOpen = isOpen;
      this.startTime = LocalDateTime.now();
    }

    /**
     * Check if the stream is open.
     * If the stream is not open, raise an exception
     * @throws IllegalStateException if the stream is not open.
     */
    private void checkOpen() {
      checkState(isOpen == null || isOpen.get(), "Stream is closed: %s", this);
    }

    /**
     * Close the current stream.
     */
    @Override
    public void close() {
      cleanupWithLogger(LOG, getCurrentStream());
      setCurrentStream(null);
    }

    /**
     * Create a new stream.
     * <p>
     * Calls {@link #close()} to ensure that any existing stream is closed,
     * then {@link #checkOpen()} to verify that the data source is still open.
     * Logs if this is a subsequent event as it implies a failure of the first attempt.
     * @return the new stream
     */
    @Override
    public final InputStream newStream() {
      close();
      checkOpen();
      streamCreationCount++;
      if (streamCreationCount == 2) {
        // the stream has been recreated for the first time.
        // notify only once for this stream, so as not to flood
        // the logs.
        LOG.info("Stream recreated: {}", this);
      }
      return setCurrentStream(createNewStream());
    }

    /**
     * Override point for subclasses to create their new streams.
     * @return a stream
     */
    protected abstract T createNewStream();

    /**
     * How many times has a stream been created?
     * @return stream creation count
     */
    public int getStreamCreationCount() {
      return streamCreationCount;
    }

    /**
     * Size as set by constructor parameter.
     * @return size of the data
     */
    public int getSize() {
      return size;
    }

    /**
     * When did this upload start?
     * @return start time
     */
    public LocalDateTime getStartTime() {
      return startTime;
    }

    /**
     * Current stream.
     * When {@link #newStream()} is called, this is set to the new value,
     * after closing the previous one.
     * <p>
     * Why? The AWS SDK implementations do this, so there
     * is an implication that it is needed to avoid keeping streams
     * open on retries.
     * @return the current stream, or null if none is open.
     */
    protected T getCurrentStream() {
      return currentStream;
    }

    /**
     * Set the current stream.
     * @param stream the new stream
     * @return the current stream.
     */
    protected T setCurrentStream(T stream) {
      this.currentStream = stream;
      return stream;
    }

    @Override
    public String toString() {
      return "BaseContentProvider{" +
          "size=" + size +
          ", initiated at " + startTime +
          ", streamCreationCount=" + streamCreationCount +
          ", currentStream=" + currentStream +
          '}';
    }
  }

  /**
   * Content provider for a file with an offset.
   */
  private static final class FileWithOffsetContentProvider
      extends BaseContentProvider<BufferedInputStream> {

    /**
     * File to read.
     */
    private final File file;

    /**
     * Offset in file.
     */
    private final long offset;

    /**
     * Constructor.
     * @param file file to read.
     * @param offset offset in file.
     * @param size of data.
     * @param isOpen optional predicate to check if the stream is open.
     * @throws IllegalArgumentException if the offset is negative.
     */
    private FileWithOffsetContentProvider(
        final File file,
        final long offset,
        final int size,
        @Nullable final Supplier<Boolean> isOpen) {
      super(size, isOpen);
      this.file = requireNonNull(file);
      checkArgument(offset >= 0, "Offset is negative: %s", offset);
      this.offset = offset;
    }

    /**
     * Constructor.
     * @param file file to read.
     * @param offset offset in file.
     * @param size of data.
     * @throws IllegalArgumentException if the offset is negative.
     */
    private FileWithOffsetContentProvider(final File file,
        final long offset,
        final int size) {
      this(file, offset, size, null);
    }

    /**
     * Create a new stream.
     * @return a stream at the start of the offset in the file
     * @throws UncheckedIOException on IO failure.
     */
    @Override
    protected BufferedInputStream createNewStream() throws UncheckedIOException {
      // create the stream, seek to the offset.
      final FileInputStream fis = uncheckIOExceptions(() -> {
        final FileInputStream f = new FileInputStream(file);
        f.getChannel().position(offset);
        return f;
      });
      return setCurrentStream(new BufferedInputStream(fis));
    }

    @Override
    public String toString() {
      return "FileWithOffsetContentProvider{" +
          "file=" + file +
          ", offset=" + offset +
          "} " + super.toString();
    }

  }

  /**
   * Create a content provider for a byte buffer.
   * Uses {@link ByteBufferInputStream} to read the data.
   */
  private static final class ByteBufferContentProvider
      extends BaseContentProvider<ByteBufferInputStream> {

    /**
     * The buffer which will be read; on or off heap.
     */
    private final ByteBuffer blockBuffer;

    /**
     * The position in the buffer at the time the provider was created.
     */
    private final int initialPosition;

    /**
     * Constructor.
     * @param blockBuffer buffer to read.
     * @param size size of the data.
     * @throws IllegalArgumentException if the arguments are invalid.
     * @throws NullPointerException if the buffer is null
     */
    private ByteBufferContentProvider(final ByteBuffer blockBuffer, int size) {
      this(blockBuffer, size, null);
    }

    /**
     * Constructor.
     * @param blockBuffer buffer to read.
     * @param size size of the data.
     * @param isOpen optional predicate to check if the stream is open.
     * @throws IllegalArgumentException if the arguments are invalid.
     * @throws NullPointerException if the buffer is null
     */
    private ByteBufferContentProvider(
        final ByteBuffer blockBuffer,
        int size,
        @Nullable final Supplier<Boolean> isOpen) {
      super(size, isOpen);
      this.blockBuffer = blockBuffer;
      this.initialPosition = blockBuffer.position();
    }

    @Override
    protected ByteBufferInputStream createNewStream() {
      // set the buffer up from reading from the beginning
      blockBuffer.limit(initialPosition);
      blockBuffer.position(0);
      return new ByteBufferInputStream(getSize(), blockBuffer);
    }

    @Override
    public String toString() {
      return "ByteBufferContentProvider{" +
          "blockBuffer=" + blockBuffer +
          ", initialPosition=" + initialPosition +
          "} " + super.toString();
    }
  }

  /**
   * Simple byte array content provider.
   * <p>
   * The array is not copied; if it is changed during the write the outcome
   * of the upload is undefined.
   */
  private static final class ByteArrayContentProvider
      extends BaseContentProvider<ByteArrayInputStream> {

    /**
     * The buffer where data is stored.
     */
    private final byte[] bytes;

    /**
     * Offset in the buffer.
     */
    private final int offset;

    /**
     * Constructor.
     * @param bytes buffer to read.
     * @param offset offset in buffer.
     * @param size length of the data.
     * @throws IllegalArgumentException if the arguments are invalid.
     * @throws NullPointerException if the buffer is null
     */
    private ByteArrayContentProvider(
        final byte[] bytes,
        final int offset,
        final int size) {
      this(bytes, offset, size, null);
    }

    /**
     * Constructor.
     * @param bytes buffer to read.
     * @param offset offset in buffer.
     * @param size length of the data.
     * @param isOpen optional predicate to check if the stream is open.
     * @throws IllegalArgumentException if the arguments are invalid.
     * @throws NullPointerException if the buffer is null
     */
    private ByteArrayContentProvider(
        final byte[] bytes,
        final int offset,
        final int size,
        final Supplier<Boolean> isOpen) {

      super(size, isOpen);
      this.bytes = bytes;
      this.offset = offset;
      checkArgument(offset >= 0, "Offset is negative: %s", offset);
      final int length = bytes.length;
      checkArgument((offset + size) <= length,
          "Data to read [%d-%d] is past end of array %s",
          offset,
          offset + size, length);
    }

    @Override
    protected ByteArrayInputStream createNewStream() {
      return new ByteArrayInputStream(bytes, offset, getSize());
    }

    @Override
    public String toString() {
      return "ByteArrayContentProvider{" +
          "buffer with length=" + bytes.length +
          ", offset=" + offset +
          "} " + super.toString();
    }
  }

}