MagicCommitIntegration.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.commit;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;

import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;

/**
 * Adds the code needed for S3A to support magic committers.
 * It's pulled out to keep S3A FS class slightly less complex.
 * This class can be instantiated even when magic commit is disabled;
 * in this case:
 * <ol>
 *   <li>{@link #isMagicCommitPath(Path)} will always return false.</li>
 *   <li>{@link #isUnderMagicPath(Path)} will always return false.</li>
 *   <li>{@link #createTracker(Path, String, PutTrackerStatistics)} will always
 *   return an instance of {@link PutTracker}.</li>
 * </ol>
 *
 * <p>Important</p>: must not directly or indirectly import a class which
 * uses any datatype in hadoop-mapreduce.
 */
public class MagicCommitIntegration extends AbstractStoreOperation {
  private static final Logger LOG =
      LoggerFactory.getLogger(MagicCommitIntegration.class);
  private final S3AFileSystem owner;
  private final boolean magicCommitEnabled;

  /**
   * Instantiate.
   * @param owner owner class
   * @param magicCommitEnabled is magic commit enabled.
   */
  public MagicCommitIntegration(S3AFileSystem owner,
      boolean magicCommitEnabled) {
    super(owner.createStoreContext());
    this.owner = owner;
    this.magicCommitEnabled = magicCommitEnabled;
  }

  /**
   * Given an (elements, key) pair, return the key of the final destination of
   * the PUT, that is: where the final path is expected to go?
   * @param elements path split to elements
   * @param key key
   * @return key for final put. If this is not a magic commit, the
   * same as the key in.
   */
  public String keyOfFinalDestination(List<String> elements, String key) {
    if (isMagicCommitPath(elements)) {
      return elementsToKey(finalDestination(elements));
    } else {
      return key;
    }
  }

  /**
   * Given a path and a key to that same path, create a tracker for it.
   * This specific tracker will be chosen based on whether or not
   * the path is a magic one.
   * Auditing: the span used to invoke
   * this method will be the one used to create the write operation helper
   * for the commit tracker.
   * @param path path of nominal write
   * @param key key of path of nominal write
   * @param trackerStatistics tracker statistics
   * @return the tracker for this operation.
   */
  public PutTracker createTracker(Path path, String key,
      PutTrackerStatistics trackerStatistics) {
    final List<String> elements = splitPathToElements(path);
    PutTracker tracker;

    if(isMagicFile(elements)) {
      // path is of a magic file
      if (isMagicCommitPath(elements)) {
        final String destKey = keyOfFinalDestination(elements, key);
        String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
        getStoreContext().incrementStatistic(
            Statistic.COMMITTER_MAGIC_FILES_CREATED);
        if (isTrackMagicCommitsInMemoryEnabled(getStoreContext().getConfiguration())) {
          tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(),
              key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
              trackerStatistics);
        } else {
          tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(),
              key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
              trackerStatistics);
        }
        LOG.debug("Created {}", tracker);
      } else {
        LOG.warn("File being created has a \"magic\" path, but the filesystem"
            + " has magic file support disabled: {}", path);
        // downgrade to standard multipart tracking
        tracker = new PutTracker(key);
      }
    } else {
      // standard multipart tracking
      tracker = new PutTracker(key);
    }
    return tracker;
  }

  /**
   * This performs the calculation of the final destination of a set
   * of elements.
   *
   * @param elements original (do not edit after this call)
   * @return a list of elements, possibly empty
   */
  private List<String> finalDestination(List<String> elements) {
    return magicCommitEnabled ?
        MagicCommitPaths.finalDestination(elements)
        : elements;
  }

  /**
   * Is magic commit enabled?
   * @return true if magic commit is turned on.
   */
  public boolean isMagicCommitEnabled() {
    return magicCommitEnabled;
  }

  /**
   * Predicate: is a path a magic commit path?
   * @param path path to examine
   * @return true if the path is or is under a magic directory
   */
  public boolean isMagicCommitPath(Path path) {
    return isMagicCommitPath(splitPathToElements(path));
  }

  /**
   * Is this path a magic commit path in this filesystem?
   * True if magic commit is enabled, the path is magic
   * and the path is not actually a commit metadata file.
   * @param elements element list
   * @return true if writing path is to be uprated to a magic file write
   */
  private boolean isMagicCommitPath(List<String> elements) {
    return magicCommitEnabled && isMagicFile(elements);
  }

  /**
   * Is the file a magic file: this predicate doesn't check
   * for the FS actually having the magic bit being set.
   * @param elements path elements
   * @return true if the path is one a magic file write expects.
   */
  private boolean isMagicFile(List<String> elements) {
    return isMagicPath(elements) &&
        !isCommitMetadataFile(elements);
  }

  /**
   * Does this file contain all the commit metadata?
   * @param elements path element list
   * @return true if this file is one of the commit metadata files.
   */
  private boolean isCommitMetadataFile(List<String> elements) {
    String last = elements.get(elements.size() - 1);
    return last.endsWith(CommitConstants.PENDING_SUFFIX)
        || last.endsWith(CommitConstants.PENDINGSET_SUFFIX);
  }

  /**
   * Is this path in/under a magic path...regardless of file type.
   * This is used to optimize create() operations.
   * @param path path to check
   * @return true if the path is one a magic file write expects.
   */
  public boolean isUnderMagicPath(Path path) {
    return magicCommitEnabled && isMagicPath(splitPathToElements(path));
  }
}