/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.gs;

import java.time.Duration;
import java.util.regex.Pattern;

import static java.lang.Math.toIntExact;

import org.apache.hadoop.conf.Configuration;

/**
 * Provides configuration for the {@link GoogleHadoopFileSystem} implementation.
 */
class GoogleHadoopFileSystemConfiguration {
  private static final long GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT_DEFAULT = 8 * 1024 * 1024L;

  /**
   * Configuration key for default block size of a file.
   *
   * <p>Note that this is the size reported to Hadoop FS clients. It does not modify the actual
   * block size of an underlying GCS object, because the GCS JSON API does not allow modifying or
   * querying the value. Changing this value lets one control how many mappers are used to process
   * a given file.
   */
  static final HadoopConfigurationProperty<Long> BLOCK_SIZE =
      new HadoopConfigurationProperty<>("fs.gs.block.size", 64 * 1024 * 1024L);
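
  // Illustrative usage (not part of this class): a client can override the
  // reported block size before creating the file system; the 128 MiB value is
  // an arbitrary example, not a recommendation.
  //
  //   Configuration conf = new Configuration();
  //   conf.setLong("fs.gs.block.size", 128 * 1024 * 1024L);
  //   FileSystem fs = FileSystem.get(URI.create("gs://bucket/"), conf);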

  /**
   * Configuration key for the GCS project ID. Default value: none
   */
  private static final HadoopConfigurationProperty<String> GCS_PROJECT_ID =
      new HadoopConfigurationProperty<>("fs.gs.project.id");

  /**
   * Configuration key for the initial working directory of a GHFS instance. Default value: '/'
   */
  static final HadoopConfigurationProperty<String> GCS_WORKING_DIRECTORY =
      new HadoopConfigurationProperty<>("fs.gs.working.dir", "/");

  /**
   * Configuration key for the output stream (write) buffer size.
   */
  private static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE =
      new HadoopConfigurationProperty<>("fs.gs.outputstream.buffer.size", 8L * 1024 * 1024);

  /**
   * If forward seeks are within this many bytes of the current position, seeks are performed by
   * reading and discarding bytes in place rather than opening a new underlying stream.
   */
  private static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT =
      new HadoopConfigurationProperty<>(
          "fs.gs.inputstream.inplace.seek.limit",
          GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT_DEFAULT);
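
  // Illustrative semantics (sketch only; reopenStreamAt is a hypothetical
  // helper, not part of this codebase): a forward seek within the limit is
  // served by skipping bytes on the open stream instead of reopening it.
  //
  //   long distance = targetPos - currentPos;
  //   if (distance > 0 && distance <= inplaceSeekLimit) {
  //     in.skip(distance);         // read-and-discard in place
  //   } else {
  //     reopenStreamAt(targetPos); // new request with a fresh Range header
  //   }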

  /** Tunes object read behavior to optimize HTTP GET requests for various use cases. */
  private static final HadoopConfigurationProperty<Fadvise> GCS_INPUT_STREAM_FADVISE =
      new HadoopConfigurationProperty<>("fs.gs.inputstream.fadvise", Fadvise.RANDOM);
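
  // Illustrative usage (assumption: AUTO is a valid Fadvise constant, as
  // suggested by the fadvise modes referenced elsewhere in this file):
  //
  //   conf.setEnum("fs.gs.inputstream.fadvise", Fadvise.AUTO);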

  /**
   * If false, reading a file with GZIP content encoding (HTTP header "Content-Encoding: gzip")
   * fails with an IOException.
   */
  private static final HadoopConfigurationProperty<Boolean>
      GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE =
      new HadoopConfigurationProperty<>(
          "fs.gs.inputstream.support.gzip.encoding.enable",
          false);
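
  // Illustrative usage (the trade-off noted here is an assumption): enabling
  // this makes reads of transcoded objects return decompressed bytes, so the
  // stream length will not match the reported object size.
  //
  //   conf.setBoolean("fs.gs.inputstream.support.gzip.encoding.enable", true);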

  /**
   * Minimum size in bytes of the HTTP Range header set in a GCS request when opening a new stream
   * to read an object.
   */
  private static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE =
      new HadoopConfigurationProperty<>(
          "fs.gs.inputstream.min.range.request.size",
          2 * 1024 * 1024L);

  /**
   * Configuration key for the number of requests to track when adapting the access pattern in the
   * AUTO and AUTO_RANDOM fadvise modes.
   */
  private static final HadoopConfigurationProperty<Integer> GCS_FADVISE_REQUEST_TRACK_COUNT =
      new HadoopConfigurationProperty<>("fs.gs.fadvise.request.track.count", 3);

  /**
   * Configuration key for the maximum number of bytes rewritten in a single rewrite request when
   * fs.gs.copy.with.rewrite.enable is set to 'true'.
   */
  private static final HadoopConfigurationProperty<Long> GCS_REWRITE_MAX_CHUNK_SIZE =
      new HadoopConfigurationProperty<>(
          "fs.gs.rewrite.max.chunk.size",
          512 * 1024 * 1024L);
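
  // Illustrative semantics (sketch of the GCS Rewrite API loop; rewriteObject
  // is a hypothetical helper): a large copy is issued as repeated rewrite
  // calls, each moving at most this many bytes, until the service reports
  // completion.
  //
  //   do {
  //     response = rewriteObject(src, dst, rewriteToken);
  //     rewriteToken = response.getRewriteToken();
  //   } while (!response.getDone());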

  /** Configuration key for marker file pattern. Default value: none */
  private static final HadoopConfigurationProperty<String> GCS_MARKER_FILE_PATTERN =
      new HadoopConfigurationProperty<>("fs.gs.marker.file.pattern");
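
  // Illustrative value (an assumption, not a default): treating Hadoop job
  // commit markers as marker files.
  //
  //   conf.set("fs.gs.marker.file.pattern", "_SUCCESS");
  //
  // getMarkerFilePattern() below anchors this as "^(.+/)?_SUCCESS$", so it
  // matches "a/b/_SUCCESS" and "_SUCCESS" but not "a/b/_SUCCESS.tmp".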

  /**
   * Configuration key for enabling a check that ensures no conflicting directory exists when
   * creating a file, and no conflicting file exists when creating a directory.
   */
  private static final HadoopConfigurationProperty<Boolean> GCS_CREATE_ITEMS_CONFLICT_CHECK_ENABLE =
      new HadoopConfigurationProperty<>(
          "fs.gs.create.items.conflict.check.enable",
          true);

  /**
   * Configuration key for the minimum time interval between consecutive sync()/hsync()/hflush()
   * calls.
   */
  private static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_SYNC_MIN_INTERVAL =
      new HadoopConfigurationProperty<>(
          "fs.gs.outputstream.sync.min.interval",
          0L);
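
  // Illustrative usage (example value only): the property is read via
  // getTimeDuration, so it can be set with an explicit unit.
  //
  //   conf.setTimeDuration("fs.gs.outputstream.sync.min.interval", 1, TimeUnit.SECONDS);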

  /**
   * If true, recursive delete on a path that refers to a GCS bucket itself ('/' for any
   * bucket-rooted GoogleHadoopFileSystem) or delete on that path when it's empty will result in
   * fully deleting the GCS bucket. If false, any operation that normally would have deleted the
   * bucket will be ignored instead. Setting to 'false' preserves the typical behavior of "rm -rf /"
   * which translates to deleting everything inside of root, but without clobbering the filesystem
   * authority corresponding to that root path in the process.
   */
  static final HadoopConfigurationProperty<Boolean> GCE_BUCKET_DELETE_ENABLE =
      new HadoopConfigurationProperty<>("fs.gs.bucket.delete.enable", false);
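
  // Illustrative consequence (sketch; "my-bucket" is a hypothetical name):
  // with the default 'false', recursively deleting the bucket root removes
  // its contents but leaves the bucket itself in place.
  //
  //   fs.delete(new Path("gs://my-bucket/"), /* recursive= */ true);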

  private final String workingDirectory;
  private final String projectId;
  private final Configuration config;
  private final int outStreamBufferSize;
  private Pattern fileMarkerFilePattern;

  GoogleHadoopFileSystemConfiguration(Configuration conf) {
    this.workingDirectory = GCS_WORKING_DIRECTORY.get(conf, conf::get);
    this.outStreamBufferSize =
        toIntExact(GCS_OUTPUT_STREAM_BUFFER_SIZE.get(conf, conf::getLongBytes));
    this.projectId = GCS_PROJECT_ID.get(conf, conf::get);
    this.config = conf;
  }

  int getOutStreamBufferSize() {
    return outStreamBufferSize;
  }

  String getWorkingDirectory() {
    return this.workingDirectory;
  }

  String getProjectId() {
    return this.projectId;
  }

  long getMaxListItemsPerCall() {
    return 5000L; // TODO: Make this configurable.
  }

  Fadvise getFadvise() {
    return GCS_INPUT_STREAM_FADVISE.get(config, config::getEnum);
  }

  long getInplaceSeekLimit() {
    return GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get(config, config::getLongBytes);
  }

  int getFadviseRequestTrackCount() {
    return GCS_FADVISE_REQUEST_TRACK_COUNT.get(config, config::getInt);
  }

  boolean isGzipEncodingSupportEnabled() {
    return GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE.get(config, config::getBoolean);
  }

  long getMinRangeRequestSize() {
    return GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get(config, config::getLongBytes);
  }

  long getBlockSize() {
    return BLOCK_SIZE.get(config, config::getLong);
  }

  boolean isReadExactRequestedBytesEnabled() {
    return false; // TODO: Remove this option?
  }

  long getMaxRewriteChunkSize() {
    return GCS_REWRITE_MAX_CHUNK_SIZE.get(config, config::getLong);
  }

  Pattern getMarkerFilePattern() {
    String pattern = GCS_MARKER_FILE_PATTERN.get(config, config::get);
    if (pattern == null) {
      return null;
    }

    if (fileMarkerFilePattern == null) {
      // Cache the compiled pattern, since compilation can be expensive.
      fileMarkerFilePattern = Pattern.compile("^(.+/)?" + pattern + "$");
    }

    return fileMarkerFilePattern;
  }

  boolean isEnsureNoConflictingItems() {
    return GCS_CREATE_ITEMS_CONFLICT_CHECK_ENABLE.get(config, config::getBoolean);
  }

  Duration getMinSyncInterval() {
    return GCS_OUTPUT_STREAM_SYNC_MIN_INTERVAL.getTimeDuration(config);
  }

  Configuration getConfig() {
    return config;
  }

  boolean isBucketDeleteEnabled() {
    return GCE_BUCKET_DELETE_ENABLE.get(config, config::getBoolean);
  }
}