BucketTool.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.tools;

import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.BucketType;
import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DataRedundancy;
import software.amazon.awssdk.services.s3.model.LocationInfo;
import software.amazon.awssdk.services.s3.model.LocationType;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool;
import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.ExitUtil;

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.PRODUCT_NAME;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.fs.s3a.Invoker.once;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.isAwsEndpoint;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.hasS3ExpressSuffix;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_BAD_CONFIGURATION;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_NOT_ACCEPTABLE;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_USAGE;

/**
 * Bucket operations, e.g. create/delete/probe.
 */
public final class BucketTool extends S3GuardTool {

  private static final Logger LOG = LoggerFactory.getLogger(BucketTool.class);

  /**
   * Name of this tool: {@value}.
   */
  public static final String NAME = "bucket";

  /**
   * Purpose of this tool: {@value}.
   */
  public static final String PURPOSE =
      "View and manipulate S3 buckets";

  /**
   * create command.
   */
  public static final String CREATE = "create";

  /**
   * region {@value}.
   */
  public static final String OPT_REGION = "region";

  /**
   * endpoint {@value}.
   */
  public static final String OPT_ENDPOINT = "endpoint";

  /**
   * Zone for a store.
   */
  public static final String OPT_ZONE = "zone";

  /**
   * Error message if -zone is set but the name doesn't match.
   * Value {@value}.
   */
  static final String UNSUPPORTED_ZONE_ARG =
      "The -zone option is only supported for " + PRODUCT_NAME;

  /**
   * Error message if the bucket is S3 Express but -zone wasn't set.
   * Value {@value}.
   */
  static final String NO_ZONE_SUPPLIED = "Required option -zone missing for "
      + PRODUCT_NAME + " bucket";

  /**
   * Error Message logged/thrown when the tool could not start as
   * the bucket probe was not disabled and the probe (inevitably)
   * failed.
   */
  public static final String PROBE_FAILURE =
      "Initialization failed because the bucket existence probe"
          + S3A_BUCKET_PROBE + " was not disabled. Check core-site settings.";

  public BucketTool(final Configuration conf) {
    super(conf, 1, 1,
        CREATE);
    CommandFormat format = getCommandFormat();
    format.addOptionWithValue(OPT_REGION);
    format.addOptionWithValue(OPT_ENDPOINT);
    format.addOptionWithValue(OPT_ZONE);
  }

  public String getUsage() {
    return "bucket "
        + "-" + CREATE + " "
        + "[-" + OPT_ENDPOINT + " <endpoint>] "
        + "[-" + OPT_REGION + " <region>] "
        + "[-" + OPT_ZONE + " <zone>] "
        + " <s3a-URL>";
  }

  public String getName() {
    return NAME;
  }

  private Optional<String> getOptionalString(String key) {
    String value = getCommandFormat().getOptValue(key);
    return isNotEmpty(value) ? Optional.of(value) : Optional.empty();
  }

  @VisibleForTesting
  int exec(final String...args) throws Exception {
    return run(args, System.out);
  }

  @Override
  public int run(final String[] args, final PrintStream out)
      throws Exception, ExitUtil.ExitException {

    LOG.debug("Supplied arguments: {}", String.join(", ", args));
    final List<String> parsedArgs = parseArgsWithErrorReporting(args);

    CommandFormat command = getCommandFormat();
    boolean create = command.getOpt(CREATE);

    Optional<String> endpoint = getOptionalString(OPT_ENDPOINT);
    Optional<String> region = getOptionalString(OPT_REGION);
    Optional<String> zone = getOptionalString(OPT_ZONE);


    final String bucketPath = parsedArgs.get(0);
    final Path source = new Path(bucketPath);
    URI fsURI = source.toUri();
    String bucket = fsURI.getHost();

    println(out, "Filesystem %s", fsURI);
    if (!"s3a".equals(fsURI.getScheme())) {
      throw new ExitUtil.ExitException(EXIT_USAGE, "Filesystem is not S3A URL: " + fsURI);
    }
    println(out, "Options region=%s endpoint=%s zone=%s s3a://%s",
        region.orElse("(unset)"),
        endpoint.orElse("(unset)"),
        zone.orElse("(unset)"),
        bucket);
    if (!create) {
      errorln(getUsage());
      println(out, "Supplied arguments: ["
          + String.join(", ", parsedArgs)
          + "]");
      throw new ExitUtil.ExitException(EXIT_USAGE,
          "required option not found: -create");
    }

    final Configuration conf = getConf();

    removeBucketOverrides(bucket, conf,
        S3A_BUCKET_PROBE,
        REJECT_OUT_OF_SPAN_OPERATIONS,
        AWS_REGION,
        ENDPOINT);
    // stop any S3 calls taking place against a bucket which
    // may not exist
    String bucketPrefix = "fs.s3a.bucket." + bucket + '.';
    conf.setInt(bucketPrefix + S3A_BUCKET_PROBE, 0);
    conf.setBoolean(bucketPrefix + REJECT_OUT_OF_SPAN_OPERATIONS, false);
    // propagate an option
    BiFunction<String, Optional<String>, Boolean> propagate = (key, opt) ->
        opt.map(v -> {
          conf.set(key, v);
          LOG.info("{} = {}", key, v);
          return true;
        }).orElse(false);


    propagate.apply(AWS_REGION, region);
    propagate.apply(ENDPOINT, endpoint);

    // fail fast on third party store
    if (hasS3ExpressSuffix(bucket) && !isAwsEndpoint(endpoint.orElse(""))) {
      throw new ExitUtil.ExitException(EXIT_NOT_ACCEPTABLE, UNSUPPORTED_ZONE_ARG);
    }
    S3AFileSystem fs;
    try {
      fs = (S3AFileSystem) FileSystem.newInstance(fsURI, conf);
    } catch (FileNotFoundException e) {
      // this happens if somehow the probe wasn't disabled.
      errorln(PROBE_FAILURE);
      throw new ExitUtil.ExitException(EXIT_BAD_CONFIGURATION, PROBE_FAILURE);
    }

    try {

      // now build the configuration
      final CreateBucketConfiguration.Builder builder = CreateBucketConfiguration.builder();

      if (fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE)) {
        //  S3 Express store requires a zone and some other other settings
        final String az = zone.orElseThrow(() ->
            new ExitUtil.ExitException(EXIT_USAGE, NO_ZONE_SUPPLIED + bucket));
        builder.location(LocationInfo.builder()
                .type(LocationType.AVAILABILITY_ZONE).name(az).build())
            .bucket(software.amazon.awssdk.services.s3.model.BucketInfo.builder()
                .type(BucketType.DIRECTORY)
                .dataRedundancy(DataRedundancy.SINGLE_AVAILABILITY_ZONE).build());

      } else {
        if (zone.isPresent()) {
          throw new ExitUtil.ExitException(EXIT_USAGE, UNSUPPORTED_ZONE_ARG + " not " + bucket);
        }
        region.ifPresent(builder::locationConstraint);
      }

      final CreateBucketRequest request = CreateBucketRequest.builder()
          .bucket(bucket)
          .createBucketConfiguration(builder.build())
          .build();

      println(out, "Creating bucket %s", bucket);

      final S3Client s3Client = fs.getS3AInternals().getAmazonS3Client(NAME);
      try (DurationInfo ignored = new DurationInfo(LOG,
          "Create %sbucket %s in region %s",
          (fs.hasPathCapability(new Path("/"),
              STORE_CAPABILITY_S3_EXPRESS_STORAGE) ? (PRODUCT_NAME + " "): ""),
          bucket, region.orElse("(unset)"))) {
        once("create", source.toString(), () ->
            s3Client.createBucket(request));
      }
    } finally {
      IOUtils.closeStream(fs);
    }

    return 0;
  }

  /**
   * Remove any values from a bucket.
   * @param bucket bucket whose overrides are to be removed. Can be null/empty
   * @param conf config
   * @param options list of fs.s3a options to remove
   */
  public static void removeBucketOverrides(final String bucket,
      final Configuration conf,
      final String... options) {

    if (StringUtils.isEmpty(bucket)) {
      return;
    }
    final String bucketPrefix = "fs.s3a.bucket." + bucket + '.';
    for (String option : options) {
      final String stripped = option.substring("fs.s3a.".length());
      String target = bucketPrefix + stripped;
      String v = conf.get(target);
      if (v != null) {
        conf.unset(target);
      }
      String extended = bucketPrefix + option;
      if (conf.get(extended) != null) {
        conf.unset(extended);
      }
    }
  }
}