/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl.streams;

import java.lang.reflect.Constructor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ConfigurationHelper;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.VectoredIOContext;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_ACTIVE_RANGE_READS;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_CUSTOM_FACTORY;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.S3AUtils.intOption;
import static org.apache.hadoop.fs.s3a.S3AUtils.longBytesOption;
import static org.apache.hadoop.util.Preconditions.checkArgument;

/**
 * Stream integration, including factory construction.
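 * <p>
 * A minimal usage sketch, assuming the stream type names below and the
 * option keys in {@link Constants} are imported statically:
 * <pre>{@code
 *   Configuration conf = new Configuration();
 *   // pick one of the stream type names, e.g. "classic"
 *   conf.set(INPUT_STREAM_TYPE, CLASSIC);
 *   // build, but do not initialize, the matching factory
 *   ObjectInputStreamFactory factory =
 *       StreamIntegration.factoryFromConfig(conf);
 * }</pre>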
 */
public final class StreamIntegration {

  /**
   * Enum/config name of a classic S3AInputStream: {@value}.
   */
  public static final String CLASSIC = "classic";

  /**
   * Enum/config name of the Pinterest S3APrefetchingInputStream: {@value}.
   */
  public static final String PREFETCH = "prefetch";

  /**
   * Enum/config name of the analytics input stream: {@value}.
   */
  public static final String ANALYTICS = "analytics";

  /**
   * Enum/config name of a custom stream factory; the factory
   * classname is read from configuration: {@value}.
   */
  public static final String CUSTOM = "custom";

  /**
   * Special string for configuration only; is
   * mapped to the default stream type: {@value}.
   */
  public static final String DEFAULT = "default";

  /**
   * The default stream type.
   */
  public static final InputStreamType DEFAULT_STREAM_TYPE = InputStreamType.Classic;

  /**
   * Configuration deprecation log for warning about use of the
   * now deprecated {@code "fs.s3a.prefetch.enabled"} option.
   */
  private static final Logger LOG_DEPRECATION =
      LoggerFactory.getLogger(
          "org.apache.hadoop.conf.Configuration.deprecation");

  /**
   * Warn once on use of prefetch configuration option.
   */
  private static final LogExactlyOnce WARN_PREFETCH_KEY = new LogExactlyOnce(LOG_DEPRECATION);

  /**
   * Error text when the custom stream type is selected but no
   * factory classname is supplied.
   */
  public static final String E_EMPTY_CUSTOM_CLASSNAME =
      "Configuration option " + INPUT_STREAM_CUSTOM_FACTORY
          + " is required when the input stream type is \"custom\"";

  /**
   * Error text prefix for an unrecognized stream type.
   */
  public static final String E_INVALID_STREAM_TYPE = "Invalid stream type:";

  /** Utility class; no instantiation. */
  private StreamIntegration() {
  }

  /**
   * Create the input stream factory the configuration asks for.
   * <p>
   * This does not initialize the factory.
   * <p>
   * See {@link #determineInputStreamType(Configuration)} for the
   * resolution algorithm.
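   * <p>
   * As a sketch, the construction is equivalent to resolving the stream
   * type and then applying its factory function to the configuration:
   * <pre>{@code
   *   InputStreamType type = determineInputStreamType(conf);
   *   ObjectInputStreamFactory factory = type.factory().apply(conf);
   * }</pre>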
   * @param conf configuration
   * @return a stream factory.
   * @throws RuntimeException any binding/loading/instantiation problem
   */
  public static ObjectInputStreamFactory factoryFromConfig(final Configuration conf) {
    // Construct the factory.
    return determineInputStreamType(conf)
        .factory()
        .apply(conf);
  }

  /**
   * Determine the input stream type for the supplied configuration.
   * <p>
   * This does not perform any instantiation.
   * <p>
   * If the option {@code "fs.s3a.prefetch.enabled"} is set, the
   * prefetch stream is selected, after logging a
   * warning the first time this happens.
   * <p>
   * If the input stream type is declared as "default", then the
   * current default stream type is returned, as defined by
   * {@link #DEFAULT_STREAM_TYPE}.
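   * <p>
   * An illustrative sketch of the resolution (assuming the option keys
   * are imported statically; values shown are examples only):
   * <pre>{@code
   *   Configuration conf = new Configuration();
   *   // unset, or set to "default": resolves to the default stream type
   *   InputStreamType type = determineInputStreamType(conf);  // DEFAULT_STREAM_TYPE
   *   // the deprecated prefetch switch takes priority and warns once
   *   conf.setBoolean(PREFETCH_ENABLED_KEY, true);
   *   type = determineInputStreamType(conf);    // InputStreamType.Prefetch
   * }</pre>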
   * @param conf configuration
   * @return the resolved stream type.
   */
  public static InputStreamType determineInputStreamType(final Configuration conf) {
    // work out the default stream; this includes looking at the
    // deprecated prefetch enabled key to see if it is set.
    if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {
      // prefetch enabled: warn (once) and return the prefetch stream type.
      WARN_PREFETCH_KEY.info("Using {} is deprecated: choose the appropriate stream in {}",
          PREFETCH_ENABLED_KEY, INPUT_STREAM_TYPE);
      return InputStreamType.Prefetch;
    }

    // retrieve the enum value, returning the configured value or
    // the (calculated) default
    return ConfigurationHelper.resolveEnum(conf,
        INPUT_STREAM_TYPE,
        InputStreamType.class,
        s -> {
          if (isEmpty(s) || DEFAULT.equalsIgnoreCase(s)) {
            // return default type.
            return DEFAULT_STREAM_TYPE;
          } else {
            // any other value
            throw new IllegalArgumentException(E_INVALID_STREAM_TYPE
                + " \"" + s + "\"");
          }
        });
  }

  /**
   * Load the input stream factory defined in the option
   * {@link Constants#INPUT_STREAM_CUSTOM_FACTORY}.
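   * <p>
   * The named class must implement {@link ObjectInputStreamFactory} and
   * provide a public no-argument constructor. An illustrative sketch,
   * with a purely hypothetical classname:
   * <pre>{@code
   *   conf.set(INPUT_STREAM_CUSTOM_FACTORY,
   *       "org.example.MyStreamFactory");   // hypothetical class
   *   ObjectInputStreamFactory factory = loadCustomFactory(conf);
   * }</pre>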
   * @param conf configuration to use
   * @return the custom factory
   * @throws RuntimeException any binding/loading/instantiation problem
   */
  static ObjectInputStreamFactory loadCustomFactory(Configuration conf) {

    // make sure the classname option is actually set
    final String name = conf.getTrimmed(INPUT_STREAM_CUSTOM_FACTORY, "");
    checkArgument(!isEmpty(name), E_EMPTY_CUSTOM_CLASSNAME);

    final Class<? extends ObjectInputStreamFactory> factoryClass =
        conf.getClass(INPUT_STREAM_CUSTOM_FACTORY,
            null,
            ObjectInputStreamFactory.class);

    try {
      final Constructor<? extends ObjectInputStreamFactory> ctor =
          factoryClass.getConstructor();
      return ctor.newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed to instantiate custom class "
          + name + " " + e, e);
    }
  }

  /**
   * Build a {@link VectoredIOContext} from the vectored IO configuration
   * options; the returned context is still mutable at this point.
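   * <p>
   * An illustrative sketch (option keys imported statically; the values
   * are examples only):
   * <pre>{@code
   *   Configuration conf = new Configuration();
   *   conf.setLong(AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 4096);
   *   conf.setLong(AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 1048576);
   *   conf.setInt(AWS_S3_VECTOR_ACTIVE_RANGE_READS, 4);
   *   VectoredIOContext context = populateVectoredIOContext(conf);
   * }</pre>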
   * @param conf configuration object.
   * @return a new VectoredIOContext.
   */
  public static VectoredIOContext populateVectoredIOContext(Configuration conf) {
    final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
        DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
    final int maxReadSizeVectored =
        (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
            DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
    final int vectoredActiveRangeReads = intOption(conf,
        AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
    return new VectoredIOContext()
        .setMinSeekForVectoredReads(minSeekVectored)
        .setMaxReadSizeForVectoredReads(maxReadSizeVectored)
        .setVectoredActiveRangeReads(vectoredActiveRangeReads);
  }

}