OBSListing.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.obs;

import com.obs.services.exception.ObsException;
import com.obs.services.model.ListObjectsRequest;
import com.obs.services.model.ObjectListing;
import com.obs.services.model.ObsObject;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;

/**
 * OBS listing implementation.
 */
class OBSListing {
  /**
   * A Path filter which accepts all filenames.
   */
  static final PathFilter ACCEPT_ALL =
      new PathFilter() {
        @Override
        public boolean accept(final Path file) {
          return true;
        }

        @Override
        public String toString() {
          return "ACCEPT_ALL";
        }
      };

  /**
   * Class logger.
   */
  private static final Logger LOG = LoggerFactory.getLogger(OBSListing.class);

  /**
   * OBS File System instance.
   */
  private final OBSFileSystem owner;

  OBSListing(final OBSFileSystem ownerFS) {
    this.owner = ownerFS;
  }

  /**
   * Create a FileStatus iterator against a path, with a given list object
   * request.
   *
   * @param listPath path of the listing
   * @param request  initial request to make
   * @param filter   the filter on which paths to accept
   * @param acceptor the class/predicate to decide which entries to accept in
   *                 the listing based on the full file status.
   * @return the iterator
   * @throws IOException IO Problems
   */
  FileStatusListingIterator createFileStatusListingIterator(
      final Path listPath,
      final ListObjectsRequest request,
      final PathFilter filter,
      final FileStatusAcceptor acceptor)
      throws IOException {
    return new FileStatusListingIterator(
        new ObjectListingIterator(listPath, request), filter, acceptor);
  }

  /**
   * Create a located status iterator over a file status iterator.
   *
   * @param statusIterator an iterator over the remote status entries
   * @return a new remote iterator
   */
  LocatedFileStatusIterator createLocatedFileStatusIterator(
      final RemoteIterator<FileStatus> statusIterator) {
    return new LocatedFileStatusIterator(statusIterator);
  }

  /**
   * Interface to implement by the logic deciding whether to accept a summary
   * entry or path as a valid file or directory.
   */
  interface FileStatusAcceptor {

    /**
     * Predicate to decide whether or not to accept a summary entry.
     *
     * @param keyPath qualified path to the entry
     * @param summary summary entry
     * @return true if the entry is accepted (i.e. that a status entry should be
     * generated.
     */
    boolean accept(Path keyPath, ObsObject summary);

    /**
     * Predicate to decide whether or not to accept a prefix.
     *
     * @param keyPath      qualified path to the entry
     * @param commonPrefix the prefix
     * @return true if the entry is accepted (i.e. that a status entry should be
     * generated.)
     */
    boolean accept(Path keyPath, String commonPrefix);
  }

  /**
   * A remote iterator which only iterates over a single `LocatedFileStatus`
   * value.
   *
   * <p>If the status value is null, the iterator declares that it has no
   * data. This iterator is used to handle
   * {@link OBSFileSystem#listStatus(Path)}calls where the path handed in
   * refers to a file, not a directory: this is
   * the iterator returned.
   */
  static final class SingleStatusRemoteIterator
      implements RemoteIterator<LocatedFileStatus> {

    /**
     * The status to return; set to null after the first iteration.
     */
    private LocatedFileStatus status;

    /**
     * Constructor.
     *
     * @param locatedFileStatus status value: may be null, in which case the
     *                          iterator is empty.
     */
    SingleStatusRemoteIterator(final LocatedFileStatus locatedFileStatus) {
      this.status = locatedFileStatus;
    }

    /**
     * {@inheritDoc}
     *
     * @return true if there is a file status to return: this is always false
     * for the second iteration, and may be false for the first.
     */
    @Override
    public boolean hasNext() {
      return status != null;
    }

    /**
     * {@inheritDoc}
     *
     * @return the non-null status element passed in when the instance was
     * constructed, if it ha not already been retrieved.
     * @throws NoSuchElementException if this is the second call, or it is the
     *                                first call and a null
     *                                {@link LocatedFileStatus}
     *                                entry was passed to the constructor.
     */
    @Override
    public LocatedFileStatus next() {
      if (hasNext()) {
        LocatedFileStatus s = this.status;
        status = null;
        return s;
      } else {
        throw new NoSuchElementException();
      }
    }
  }

  /**
   * Accept all entries except the base path and those which map to OBS pseudo
   * directory markers.
   */
  static class AcceptFilesOnly implements FileStatusAcceptor {
    /**
     * path to qualify.
     */
    private final Path qualifiedPath;

    AcceptFilesOnly(final Path path) {
      this.qualifiedPath = path;
    }

    /**
     * Reject a summary entry if the key path is the qualified Path, or it ends
     * with {@code "_$folder$"}.
     *
     * @param keyPath key path of the entry
     * @param summary summary entry
     * @return true if the entry is accepted (i.e. that a status entry should be
     * generated.
     */
    @Override
    public boolean accept(final Path keyPath, final ObsObject summary) {
      return !keyPath.equals(qualifiedPath)
          && !summary.getObjectKey()
          .endsWith(OBSConstants.OBS_FOLDER_SUFFIX)
          && !OBSCommonUtils.objectRepresentsDirectory(
          summary.getObjectKey(),
          summary.getMetadata().getContentLength());
    }

    /**
     * Accept no directory paths.
     *
     * @param keyPath qualified path to the entry
     * @param prefix  common prefix in listing.
     * @return false, always.
     */
    @Override
    public boolean accept(final Path keyPath, final String prefix) {
      return false;
    }
  }

  /**
   * Accept all entries except the base path and those which map to OBS pseudo
   * directory markers.
   */
  static class AcceptAllButSelfAndS3nDirs implements FileStatusAcceptor {

    /**
     * Base path.
     */
    private final Path qualifiedPath;

    /**
     * Constructor.
     *
     * @param path an already-qualified path.
     */
    AcceptAllButSelfAndS3nDirs(final Path path) {
      this.qualifiedPath = path;
    }

    /**
     * Reject a summary entry if the key path is the qualified Path, or it ends
     * with {@code "_$folder$"}.
     *
     * @param keyPath key path of the entry
     * @param summary summary entry
     * @return true if the entry is accepted (i.e. that a status entry should be
     * generated.)
     */
    @Override
    public boolean accept(final Path keyPath, final ObsObject summary) {
      return !keyPath.equals(qualifiedPath) && !summary.getObjectKey()
          .endsWith(OBSConstants.OBS_FOLDER_SUFFIX);
    }

    /**
     * Accept all prefixes except the one for the base path, "self".
     *
     * @param keyPath qualified path to the entry
     * @param prefix  common prefix in listing.
     * @return true if the entry is accepted (i.e. that a status entry should be
     * generated.
     */
    @Override
    public boolean accept(final Path keyPath, final String prefix) {
      return !keyPath.equals(qualifiedPath);
    }
  }

  /**
   * Wraps up object listing into a remote iterator which will ask for more
   * listing data if needed.
   *
   * <p>This is a complex operation, especially the process to determine if
   * there are more entries remaining. If there are no more results remaining in
   * the (filtered) results of the current listing request, then another request
   * is made
   * <i>and those results filtered</i> before the iterator can declare that
   * there is more data available.
   *
   * <p>The need to filter the results precludes the iterator from simply
   * declaring that if the {@link ObjectListingIterator#hasNext()} is true then
   * there are more results. Instead the next batch of results must be retrieved
   * and filtered.
   *
   * <p>What does this mean? It means that remote requests to retrieve new
   * batches of object listings are made in the {@link #hasNext()} call; the
   * {@link #next()} call simply returns the filtered results of the last
   * listing processed. However, do note that {@link #next()} calls {@link
   * #hasNext()} during its operation. This is critical to ensure that a listing
   * obtained through a sequence of {@link #next()} will complete with the same
   * set of results as a classic {@code while(it.hasNext()} loop.
   *
   * <p>Thread safety: None.
   */
  class FileStatusListingIterator implements RemoteIterator<FileStatus> {

    /**
     * Source of objects.
     */
    private final ObjectListingIterator source;

    /**
     * Filter of paths from API call.
     */
    private final PathFilter filter;

    /**
     * Filter of entries from file status.
     */
    private final FileStatusAcceptor acceptor;

    /**
     * Request batch size.
     */
    private int batchSize;

    /**
     * Iterator over the current set of results.
     */
    private ListIterator<FileStatus> statusBatchIterator;

    /**
     * Create an iterator over file status entries.
     *
     * @param listPath           the listing iterator from a listObjects call.
     * @param pathFilter         the filter on which paths to accept
     * @param fileStatusAcceptor the class/predicate to decide which entries to
     *                           accept in the listing based on the full file
     *                           status.
     * @throws IOException IO Problems
     */
    FileStatusListingIterator(
        final ObjectListingIterator listPath, final PathFilter pathFilter,
        final FileStatusAcceptor fileStatusAcceptor)
        throws IOException {
      this.source = listPath;
      this.filter = pathFilter;
      this.acceptor = fileStatusAcceptor;
      // build the first set of results. This will not trigger any
      // remote IO, assuming the source iterator is in its initial
      // iteration
      requestNextBatch();
    }

    /**
     * Report whether or not there is new data available. If there is data in
     * the local filtered list, return true. Else: request more data util that
     * condition is met, or there is no more remote listing data.
     *
     * @return true if a call to {@link #next()} will succeed.
     * @throws IOException on any failure to request next batch
     */
    @Override
    public boolean hasNext() throws IOException {
      return statusBatchIterator.hasNext() || requestNextBatch();
    }

    @Override
    public FileStatus next() throws IOException {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return statusBatchIterator.next();
    }

    /**
     * Try to retrieve another batch. Note that for the initial batch, {@link
     * ObjectListingIterator} does not generate a request; it simply returns the
     * initial set.
     *
     * @return true if a new batch was created.
     * @throws IOException IO problems
     */
    private boolean requestNextBatch() throws IOException {
      // look for more object listing batches being available
      while (source.hasNext()) {
        // if available, retrieve it and build the next status
        if (buildNextStatusBatch(source.next())) {
          // this batch successfully generated entries matching
          // the filters/acceptors;
          // declare that the request was successful
          return true;
        } else {
          LOG.debug(
              "All entries in batch were filtered...continuing");
        }
      }
      // if this code is reached, it means that all remaining
      // object lists have been retrieved, and there are no new entries
      // to return.
      return false;
    }

    /**
     * Build the next status batch from a listing.
     *
     * @param objects the next object listing
     * @return true if this added any entries after filtering
     */
    private boolean buildNextStatusBatch(final ObjectListing objects) {
      // counters for debug logs
      int added = 0;
      int ignored = 0;
      // list to fill in with results. Initial size will be list maximum.
      List<FileStatus> stats =
          new ArrayList<>(
              objects.getObjects().size() + objects.getCommonPrefixes()
                  .size());
      // objects
      for (ObsObject summary : objects.getObjects()) {
        String key = summary.getObjectKey();
        Path keyPath = OBSCommonUtils.keyToQualifiedPath(owner, key);
        if (LOG.isDebugEnabled()) {
          LOG.debug("{}: {}", keyPath,
              OBSCommonUtils.stringify(summary));
        }
        // Skip over keys that are ourselves and old OBS _$folder$ files
        if (acceptor.accept(keyPath, summary) && filter.accept(
            keyPath)) {
          FileStatus status =
              OBSCommonUtils.createFileStatus(
                  keyPath, summary,
                  owner.getDefaultBlockSize(keyPath),
                  owner.getUsername());
          LOG.debug("Adding: {}", status);
          stats.add(status);
          added++;
        } else {
          LOG.debug("Ignoring: {}", keyPath);
          ignored++;
        }
      }

      // prefixes: always directories
      for (ObsObject prefix : objects.getExtenedCommonPrefixes()) {
        String key = prefix.getObjectKey();
        Path keyPath = OBSCommonUtils.keyToQualifiedPath(owner, key);
        if (acceptor.accept(keyPath, key) && filter.accept(keyPath)) {
          long lastModified =
              prefix.getMetadata().getLastModified() == null
                  ? System.currentTimeMillis()
                  : OBSCommonUtils.dateToLong(
                      prefix.getMetadata().getLastModified());
          FileStatus status = new OBSFileStatus(keyPath, lastModified,
              lastModified, owner.getUsername());
          LOG.debug("Adding directory: {}", status);
          added++;
          stats.add(status);
        } else {
          LOG.debug("Ignoring directory: {}", keyPath);
          ignored++;
        }
      }

      // finish up
      batchSize = stats.size();
      statusBatchIterator = stats.listIterator();
      boolean hasNext = statusBatchIterator.hasNext();
      LOG.debug(
          "Added {} entries; ignored {}; hasNext={}; hasMoreObjects={}",
          added,
          ignored,
          hasNext,
          objects.isTruncated());
      return hasNext;
    }

    /**
     * Get the number of entries in the current batch.
     *
     * @return a number, possibly zero.
     */
    public int getBatchSize() {
      return batchSize;
    }
  }

  /**
   * Wraps up OBS `ListObjects` requests in a remote iterator which will ask for
   * more listing data if needed.
   *
   * <p>That is:
   *
   * <p>1. The first invocation of the {@link #next()} call will return the
   * results of the first request, the one created during the construction of
   * the instance.
   *
   * <p>2. Second and later invocations will continue the ongoing listing,
   * calling {@link OBSCommonUtils#continueListObjects} to request the next
   * batch of results.
   *
   * <p>3. The {@link #hasNext()} predicate returns true for the initial call,
   * where {@link #next()} will return the initial results. It declares that it
   * has future results iff the last executed request was truncated.
   *
   * <p>Thread safety: none.
   */
  class ObjectListingIterator implements RemoteIterator<ObjectListing> {

    /**
     * The path listed.
     */
    private final Path listPath;

    /**
     * The most recent listing results.
     */
    private ObjectListing objects;

    /**
     * Indicator that this is the first listing.
     */
    private boolean firstListing = true;

    /**
     * Count of how many listings have been requested (including initial
     * result).
     */
    private int listingCount = 1;

    /**
     * Maximum keys in a request.
     */
    private int maxKeys;

    /**
     * Constructor -calls {@link OBSCommonUtils#listObjects} on the request to
     * populate the initial set of results/fail if there was a problem talking
     * to the bucket.
     *
     * @param path    path of the listing
     * @param request initial request to make
     * @throws IOException on any failure to list objects
     */
    ObjectListingIterator(final Path path,
        final ListObjectsRequest request)
        throws IOException {
      this.listPath = path;
      this.maxKeys = owner.getMaxKeys();
      this.objects = OBSCommonUtils.listObjects(owner, request);
    }

    /**
     * Declare that the iterator has data if it is either is the initial
     * iteration or it is a later one and the last listing obtained was
     * incomplete.
     */
    @Override
    public boolean hasNext() {
      return firstListing || objects.isTruncated();
    }

    /**
     * Ask for the next listing. For the first invocation, this returns the
     * initial set, with no remote IO. For later requests, OBS will be queried,
     * hence the calls may block or fail.
     *
     * @return the next object listing.
     * @throws IOException            if a query made of OBS fails.
     * @throws NoSuchElementException if there is no more data to list.
     */
    @Override
    public ObjectListing next() throws IOException {
      if (firstListing) {
        // on the first listing, don't request more data.
        // Instead just clear the firstListing flag so that it future
        // calls will request new data.
        firstListing = false;
      } else {
        try {
          if (!objects.isTruncated()) {
            // nothing more to request: fail.
            throw new NoSuchElementException(
                "No more results in listing of " + listPath);
          }
          // need to request a new set of objects.
          LOG.debug("[{}], Requesting next {} objects under {}",
              listingCount, maxKeys, listPath);
          objects = OBSCommonUtils.continueListObjects(owner,
              objects);
          listingCount++;
          LOG.debug("New listing status: {}", this);
        } catch (ObsException e) {
          throw OBSCommonUtils.translateException("listObjects()",
              listPath, e);
        }
      }
      return objects;
    }

    @Override
    public String toString() {
      return "Object listing iterator against "
          + listPath
          + "; listing count "
          + listingCount
          + "; isTruncated="
          + objects.isTruncated();
    }

  }

  /**
   * Take a remote iterator over a set of {@link FileStatus} instances and
   * return a remote iterator of {@link LocatedFileStatus} instances.
   */
  class LocatedFileStatusIterator
      implements RemoteIterator<LocatedFileStatus> {
    /**
     * File status.
     */
    private final RemoteIterator<FileStatus> statusIterator;

    /**
     * Constructor.
     *
     * @param statusRemoteIterator an iterator over the remote status entries
     */
    LocatedFileStatusIterator(
        final RemoteIterator<FileStatus> statusRemoteIterator) {
      this.statusIterator = statusRemoteIterator;
    }

    @Override
    public boolean hasNext() throws IOException {
      return statusIterator.hasNext();
    }

    @Override
    public LocatedFileStatus next() throws IOException {
      return OBSCommonUtils.toLocatedFileStatus(owner,
          statusIterator.next());
    }
  }
}