OpenFileSupport.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FSBuilderSupport;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.select.SelectConstants;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.util.Preconditions.checkArgument;

/**
 * Helper class for openFile() logic, especially processing file status
 * args and length/etag/versionID.
 * <p>
 *  This got complex enough it merited removal from S3AFileSystem -which
 *  also permits unit testing.
 * </p>
 * <p>
 *   The default values are those from the FileSystem configuration.
 *   In openFile(), they can be changed by specific options;
 *   in FileSystem.open(path, buffersize) only the buffer size is
 *   set.
 * </p>
 */
public class OpenFileSupport {

  /** Class logger. */
  private static final Logger LOG =
      LoggerFactory.getLogger(OpenFileSupport.class);

  /**
   * Log wrapper used to warn, once only, when S3 Select SQL is
   * supplied as an optional openFile() option.
   */
  public static final LogExactlyOnce LOG_NO_SQL_SELECT = new LogExactlyOnce(LOG);
  /**
   * For use when the value of a split/file length is unknown.
   */
  private static final int LENGTH_UNKNOWN = -1;

  /**  Default change detection policy. */
  private final ChangeDetectionPolicy changePolicy;

  /** Default read ahead range. */
  private final long defaultReadAhead;

  /** Username; passed to every file status created by this class. */
  private final String username;

  /** Default buffer size. */
  private final int defaultBufferSize;

  /**
   * Default threshold for stream reads to switch to
   * asynchronous draining.
   */
  private final long defaultAsyncDrainThreshold;

  /**
   * Default input policy; may be overridden in
   * {@code openFile()}.
   */
  private final S3AInputPolicy defaultInputPolicy;

  /**
   * Create the helper, capturing the default values supplied by the
   * filesystem. The arguments are stored as-is; no validation is performed.
   * @param changePolicy default change detection policy
   * @param defaultReadAhead default read ahead range
   * @param username username to use in file status entries created here
   * @param defaultBufferSize default buffer size
   * @param defaultAsyncDrainThreshold default threshold for asynchronous
   * draining of streams
   * @param defaultInputPolicy default input policy
   */
  public OpenFileSupport(
      final ChangeDetectionPolicy changePolicy,
      final long defaultReadAhead,
      final String username,
      final int defaultBufferSize,
      final long defaultAsyncDrainThreshold,
      final S3AInputPolicy defaultInputPolicy) {
    // who is opening the files
    this.username = username;

    // policies
    this.changePolicy = changePolicy;
    this.defaultInputPolicy = defaultInputPolicy;

    // numeric defaults
    this.defaultBufferSize = defaultBufferSize;
    this.defaultReadAhead = defaultReadAhead;
    this.defaultAsyncDrainThreshold = defaultAsyncDrainThreshold;
  }

  /**
   * Get the default change detection policy.
   * @return the policy passed in to the constructor.
   */
  public ChangeDetectionPolicy getChangePolicy() {
    return this.changePolicy;
  }

  /**
   * Get the default read ahead range.
   * @return the range passed in to the constructor.
   */
  public long getDefaultReadAhead() {
    return this.defaultReadAhead;
  }

  /**
   * Get the default buffer size.
   * @return the buffer size passed in to the constructor.
   */
  public int getDefaultBufferSize() {
    return this.defaultBufferSize;
  }

  /**
   * Get the default threshold for stream reads to switch to
   * asynchronous draining.
   * @return the threshold passed in to the constructor.
   */
  public long getDefaultAsyncDrainThreshold() {
    return this.defaultAsyncDrainThreshold;
  }

  /**
   * Apply the defaults held by this instance (input policy, change
   * detection policy, async drain threshold and readahead range, in
   * that order) to the operation context being built up.
   * @param roc context
   * @return the context returned by the last of the setter calls
   */
  public S3AReadOpContext applyDefaultOptions(S3AReadOpContext roc) {
    // each setter is invoked on the result of the previous one
    S3AReadOpContext context = roc.withInputPolicy(defaultInputPolicy);
    context = context.withChangeDetectionPolicy(changePolicy);
    context = context.withAsyncDrainThreshold(defaultAsyncDrainThreshold);
    return context.withReadahead(defaultReadAhead);
  }

  /**
   * Prepare to open a file from the openFile parameters.
   * S3Select SQL is rejected if a mandatory opt, ignored if optional.
   * <p>
   *   If a file status was supplied through the builder, its length,
   *   modification time and (for S3A status types) etag and version ID
   *   are copied into a new {@code S3AFileStatus} bound to {@code path}.
   *   If no status was supplied but a non-negative file length option was
   *   set, a minimal status is created from that length.
   *   Otherwise the returned information has a null status.
   * </p>
   * <p>
   *   Options which are not set fall back to the defaults this instance
   *   was constructed with.
   * </p>
   * @param path path to the file
   * @param parameters open file parameters from the builder.
   * @param blockSize for fileStatus
   * @return open file options
   * @throws FileNotFoundException the supplied status references a directory.
   * @throws IOException any other IO failure raised while processing
   * the parameters.
   * @throws IllegalArgumentException unknown mandatory key, or the filename
   * of the supplied status does not match that of the path.
   * @throws UnsupportedOperationException for S3 Select options.
   */
  @SuppressWarnings("ChainOfInstanceofChecks")
  public OpenFileInformation prepareToOpenFile(
      final Path path,
      final OpenFileParameters parameters,
      final long blockSize) throws IOException {
    Configuration options = parameters.getOptions();
    Set<String> mandatoryKeys = parameters.getMandatoryKeys();
    // S3 Select is not supported in this release
    if (options.get(SelectConstants.SELECT_SQL, null) != null) {
      if (mandatoryKeys.contains(SelectConstants.SELECT_SQL)) {
        // mandatory option: fail with a specific message.
        throw new UnsupportedOperationException(SelectConstants.SELECT_UNSUPPORTED);
      } else {
        // optional; log once and continue
        LOG_NO_SQL_SELECT.warn(SelectConstants.SELECT_UNSUPPORTED);
      }
    }
    // choice of keys depends on open type
    rejectUnknownMandatoryKeys(
        mandatoryKeys,
        InternalConstants.S3A_OPENFILE_KEYS,
        "for " + path + " in file I/O");

    // where does a read end?
    long fileLength = LENGTH_UNKNOWN;

    // was a status passed in via a withStatus() invocation in
    // the builder API?
    FileStatus providedStatus = parameters.getStatus();
    S3AFileStatus fileStatus = null;
    if (providedStatus != null) {
      // there's a file status

      // make sure the file name matches -the rest of the path
      // MUST NOT be checked.
      Path providedStatusPath = providedStatus.getPath();
      checkArgument(path.getName().equals(providedStatusPath.getName()),
          "Filename mismatch between file being opened %s and"
              + " supplied filestatus %s",
          path, providedStatusPath);

      // make sure the status references a file
      if (providedStatus.isDirectory()) {
        throw new FileNotFoundException(
            "Supplied status references a directory " + providedStatus);
      }
      // build up the values
      long len = providedStatus.getLen();
      long modTime = providedStatus.getModificationTime();
      String versionId;
      String eTag;
      // can use this status to skip our own probes,
      LOG.debug("File was opened with a supplied FileStatus;"
              + " skipping getFileStatus call in open() operation: {}",
          providedStatus);

      // what type is the status (and hence: what information does it contain?)
      if (providedStatus instanceof S3AFileStatus) {
        // is it an S3AFileSystem status?
        S3AFileStatus st = (S3AFileStatus) providedStatus;
        versionId = st.getVersionId();
        eTag = st.getEtag();
      } else if (providedStatus instanceof S3ALocatedFileStatus) {

        //  S3ALocatedFileStatus instance may supply etag and version.
        S3ALocatedFileStatus st = (S3ALocatedFileStatus) providedStatus;
        versionId = st.getVersionId();
        eTag = st.getEtag();
      } else {
        // it is another type.
        // build a status struct without etag or version.
        LOG.debug("Converting file status {}", providedStatus);
        versionId = null;
        eTag = null;
      }
      // Construct a new file status with the real path of the file.
      fileStatus = new S3AFileStatus(
          len,
          modTime,
          path,
          blockSize,
          username,
          eTag,
          versionId);
      // set the end of the read to the file length
      fileLength = fileStatus.getLen();
    }
    FSBuilderSupport builderSupport = new FSBuilderSupport(options);
    // determine start and end of file.
    long splitStart = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_SPLIT_START, 0);

    // split end
    long splitEnd = builderSupport.getLong(
        FS_OPTION_OPENFILE_SPLIT_END, LENGTH_UNKNOWN);

    // NOTE(review): when no split end option is set, splitEnd is
    // LENGTH_UNKNOWN (-1), so any positive split start is also reset here.
    // Confirm that this is the intended handling of a start-only split.
    if (splitStart > 0 && splitStart > splitEnd) {
      LOG.warn("Split start {} is greater than split end {}, resetting",
          splitStart, splitEnd);
      splitStart = 0;
    }

    // read end is the open file value
    fileLength = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_LENGTH, fileLength);

    // if the read end has come from options, use that
    // in creating a file status
    if (fileLength >= 0 && fileStatus == null) {
      fileStatus = createStatus(path, fileLength, blockSize);
    }

    // Build up the input policy.
    // seek policy from default, s3a opt or standard option
    // read from the FS standard option.
    Collection<String> policies =
        options.getStringCollection(FS_OPTION_OPENFILE_READ_POLICY);
    if (policies.isEmpty()) {
      // fall back to looking at the S3A-specific option.
      policies = options.getStringCollection(INPUT_FADVISE);
    }

    // the fallback for the drain threshold is the filesystem's drain
    // threshold, matching openSimpleFile() and applyDefaultOptions();
    // it must not be the readahead range.
    long asyncDrainThreshold = builderSupport.getPositiveLong(
        ASYNC_DRAIN_THRESHOLD, defaultAsyncDrainThreshold);

    // the buffer size is held as an int: clamp an oversized option value
    // rather than let a narrowing cast wrap it into a meaningless number.
    int bufferSize = (int) Math.min(Integer.MAX_VALUE,
        builderSupport.getPositiveLong(
            FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize));

    return new OpenFileInformation()
        .withAsyncDrainThreshold(asyncDrainThreshold)
        .withBufferSize(bufferSize)
        .withChangePolicy(changePolicy)
        .withFileLength(fileLength)
        .withInputPolicy(
            S3AInputPolicy.getFirstSupportedPolicy(policies, defaultInputPolicy))
        .withReadAheadRange(
            builderSupport.getPositiveLong(READAHEAD_RANGE, defaultReadAhead))
        .withSplitStart(splitStart)
        .withSplitEnd(splitEnd)
        .withStatus(fileStatus)
        .build();

  }

  /**
   * Build a file status for a file of which only the length is known:
   * the modification time is set to 0 and there is no etag or version ID.
   * @param path path
   * @param length file length/read end
   * @param blockSize block size
   * @return a new status
   */
  private S3AFileStatus createStatus(Path path, long length, long blockSize) {
    // placeholders for the attributes which are not known
    final long noModificationTime = 0;
    final String noEtag = null;
    final String noVersionId = null;
    return new S3AFileStatus(length, noModificationTime, path, blockSize,
        username, noEtag, noVersionId);
  }

  /**
   * Build the information needed to open a file through
   * {@code open(path, bufferSize)}: everything except the buffer size
   * comes from the defaults of this instance, the file length and
   * split end are unknown and no file status is set.
   * @param bufferSize  buffer size
   * @return the parameters needed to open the file.
   */
  public OpenFileInformation openSimpleFile(final int bufferSize) {
    final OpenFileInformation info = new OpenFileInformation();
    // defaults of this instance
    info.withAsyncDrainThreshold(defaultAsyncDrainThreshold);
    // the one caller-supplied value
    info.withBufferSize(bufferSize);
    info.withChangePolicy(changePolicy);
    // nothing is known about the file
    info.withFileLength(LENGTH_UNKNOWN);
    info.withInputPolicy(defaultInputPolicy);
    info.withReadAheadRange(defaultReadAhead);
    // the whole file is the split
    info.withSplitStart(0);
    info.withSplitEnd(LENGTH_UNKNOWN);
    return info.build();
  }

  /**
   * Describe the default values held by this instance;
   * the username is not included.
   * @return a string for logging and diagnostics.
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("OpenFileSupport{");
    text.append("changePolicy=").append(changePolicy);
    text.append(", defaultReadAhead=").append(defaultReadAhead);
    text.append(", defaultBufferSize=").append(defaultBufferSize);
    text.append(", defaultAsyncDrainThreshold=")
        .append(defaultAsyncDrainThreshold);
    text.append(", defaultInputPolicy=").append(defaultInputPolicy);
    return text.append('}').toString();
  }

  /**
   * The information on a file needed to open it.
   * <p>
   *   The class is its own builder: every {@code withXyz()} method
   *   updates a field and returns this instance, and {@link #build()}
   *   returns the instance unchanged.
   * </p>
   */
  public static final class OpenFileInformation {

    /** File status; may be null. */
    private S3AFileStatus status;

    /** Active input policy. */
    private S3AInputPolicy inputPolicy;

    /** Change detection policy. */
    private ChangeDetectionPolicy changePolicy;

    /** Read ahead range. */
    private long readAheadRange;

    /** Buffer size. Currently ignored. */
    private int bufferSize;

    /**
     * Where does the read start from. 0 unless known.
     */
    private long splitStart;

    /**
     * What is the split end?
     * Negative if not known.
     */
    private long splitEnd = -1;

    /**
     * What is the file length?
     * Negative if not known.
     */
    private long fileLength = -1;

    /**
     * Threshold for stream reads to switch to
     * asynchronous draining.
     */
    private long asyncDrainThreshold;

    /**
     * Constructor.
     */
    public OpenFileInformation() {
    }

    /**
     * Build.
     * @return this object
     */
    public OpenFileInformation build() {
      return this;
    }

    /**
     * Get the file status.
     * @return the status; may be null.
     */
    public S3AFileStatus getStatus() {
      return status;
    }

    /**
     * Get the active input policy.
     * @return the input policy.
     */
    public S3AInputPolicy getInputPolicy() {
      return inputPolicy;
    }

    /**
     * Get the change detection policy.
     * @return the change detection policy.
     */
    public ChangeDetectionPolicy getChangePolicy() {
      return changePolicy;
    }

    /**
     * Get the read ahead range.
     * @return the read ahead range.
     */
    public long getReadAheadRange() {
      return readAheadRange;
    }

    /**
     * Get the buffer size.
     * @return the buffer size.
     */
    public int getBufferSize() {
      return bufferSize;
    }

    /**
     * Get the start of the split.
     * @return the split start; 0 unless known.
     */
    public long getSplitStart() {
      return splitStart;
    }

    /**
     * Get the end of the split.
     * @return the split end; negative if not known.
     */
    public long getSplitEnd() {
      return splitEnd;
    }

    @Override
    public String toString() {
      return "OpenFileInformation{" +
          "status=" + status +
          ", inputPolicy=" + inputPolicy +
          ", changePolicy=" + changePolicy +
          ", readAheadRange=" + readAheadRange +
          ", splitStart=" + splitStart +
          ", splitEnd=" + splitEnd +
          ", fileLength=" + fileLength +
          ", bufferSize=" + bufferSize +
          ", drainThreshold=" + asyncDrainThreshold +
          '}';
    }

    /**
     * Get the file length.
     * @return the file length; -1 if not known.
     */
    public long getFileLength() {
      return fileLength;
    }

    /**
     * Get the threshold for stream reads to switch to
     * asynchronous draining.
     * @return the async drain threshold.
     */
    public long getAsyncDrainThreshold() {
      return asyncDrainThreshold;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public OpenFileInformation withStatus(final S3AFileStatus value) {
      status = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public OpenFileInformation withInputPolicy(final S3AInputPolicy value) {
      inputPolicy = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public OpenFileInformation withChangePolicy(final ChangeDetectionPolicy value) {
      changePolicy = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public OpenFileInformation withReadAheadRange(final long value) {
      readAheadRange = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public OpenFileInformation withBufferSize(final int value) {
      bufferSize = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public OpenFileInformation withSplitStart(final long value) {
      splitStart = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public OpenFileInformation withSplitEnd(final long value) {
      splitEnd = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public OpenFileInformation withFileLength(final long value) {
      fileLength = value;
      return this;
    }

    /**
     * Set builder value.
     * @param value new value
     * @return the builder
     */
    public OpenFileInformation withAsyncDrainThreshold(final long value) {
      asyncDrainThreshold = value;
      return this;
    }

    /**
     * Propagate the options to the operation context
     * being built up: input policy, change detection policy,
     * async drain threshold and readahead range.
     * The status, buffer size, split range and file length
     * are not passed on here.
     * @param roc context
     * @return the context
     */
    public S3AReadOpContext applyOptions(S3AReadOpContext roc) {
      return roc
          .withInputPolicy(inputPolicy)
          .withChangeDetectionPolicy(changePolicy)
          .withAsyncDrainThreshold(asyncDrainThreshold)
          .withReadahead(readAheadRange);
    }

  }

}