GoogleHadoopFileSystem.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.gs;

import static org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.gs.Constants.GCS_CONFIG_PREFIX;
import static org.apache.hadoop.fs.gs.GoogleHadoopFileSystemConfiguration.GCS_WORKING_DIRECTORY;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;

import com.google.auth.oauth2.GoogleCredentials;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.thirdparty.com.google.common.base.Ascii;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.DirectoryNotEmptyException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;

import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * GoogleHadoopFileSystem is rooted in a single bucket at initialization time; in this case, Hadoop
 * paths no longer correspond directly to general GCS paths, and all Hadoop operations going through
 * this FileSystem will never touch any GCS bucket other than the bucket on which this FileSystem is
 * rooted.
 *
 * <p>This implementation sacrifices a small amount of cross-bucket interoperability in favor of
 * more straightforward FileSystem semantics and compatibility with existing Hadoop applications. In
 * particular, it is not subject to bucket-naming constraints, and files are allowed to be placed in
 * root.
 */
public class GoogleHadoopFileSystem extends FileSystem implements IOStatisticsSource {
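
  /*
   * Minimal usage sketch (illustrative only; "my-bucket" and the object paths below are
   * placeholders, and the working-directory key is taken from this module's configuration class
   * rather than hard-coding a property name):
   *
   *   Configuration conf = new Configuration();
   *   conf.set(GoogleHadoopFileSystemConfiguration.GCS_WORKING_DIRECTORY.getKey(), "/tmp");
   *   FileSystem fs = FileSystem.get(URI.create("gs://my-bucket/"), conf);
   *   try (FSDataOutputStream out = fs.create(new Path("gs://my-bucket/dir/file"))) {
   *     out.writeUTF("hello");
   *   }
   *   FileStatus[] entries = fs.listStatus(new Path("gs://my-bucket/dir"));
   */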

  public static final Logger LOG = LoggerFactory.getLogger(GoogleHadoopFileSystem.class);

  /**
   * URI scheme for GoogleHadoopFileSystem.
   */
  private static final String SCHEME = Constants.SCHEME;

  /**
   * Default value of replication factor.
   */
  static final short REPLICATION_FACTOR_DEFAULT = 3;

  // TODO: Take this from config. Octal 0700: rwx for the owner, no access for group or others.
  private static final int PERMISSIONS_TO_REPORT = 0700;

  /**
   * The URI passed to this FileSystem in {@code initialize()}.
   */
  private URI initUri;

  /**
   * Default block size. Note that this is the size that is reported to Hadoop FS clients. It does
   * not modify the actual block size of an underlying GCS object, because GCS JSON API does not
   * allow modifying or querying the value. Modifying this value allows one to control how many
   * mappers are used to process a given file.
   */
  private final long defaultBlockSize = GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.getDefault();
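
  // Worked example (a sketch, using an assumed value): if the reported block size is 128 MiB,
  // a 1 GiB object is presented to Hadoop as 8 blocks, so an input format that creates one
  // split per block schedules 8 mappers for it; halving the reported block size doubles that.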

  // The bucket the file system is rooted in; used for default values of:
  // -- working directory
  // -- user home directories (only for Hadoop purposes).
  private Path fsRoot;

  /**
   * Current working directory; overridden in initialize() if {@link
   * GoogleHadoopFileSystemConfiguration#GCS_WORKING_DIRECTORY} is set.
   */
  private Path workingDirectory;
  private GoogleCloudStorageFileSystem gcsFs;
  private boolean isClosed;
  private FsPermission reportedPermissions;

  /**
   * Kept static so that a single, JVM-wide instance is shared by all filesystem instances; this
   * yields JVM-level metrics. The instance backs the GlobalStorageStatistics registration done in
   * initialize(). If this were an instance field, only the first filesystem instance would be
   * registered there, and only its metrics would ever be updated.
   *
   * <p>For per-filesystem-instance instrumentation, an additional per-instance object can be
   * created and both objects updated.
   */
  private static GcsInstrumentation instrumentation = new GcsInstrumentation();
  private GcsStorageStatistics storageStatistics;

  private GoogleHadoopFileSystemConfiguration fileSystemConfiguration;

  GoogleHadoopFileSystemConfiguration getFileSystemConfiguration() {
    return fileSystemConfiguration;
  }

  @Override
  public void initialize(final URI path, Configuration config) throws IOException {
    LOG.trace("initialize(path: {}, config: {})", path, config);

    checkArgument(path != null, "path must not be null");
    checkArgument(config != null, "config must not be null");
    checkArgument(path.getScheme() != null, "scheme of path must not be null");
    checkArgument(path.getScheme().equals(getScheme()), "URI scheme not supported: %s", path);

    config =
        ProviderUtils.excludeIncompatibleCredentialProviders(config, GoogleHadoopFileSystem.class);
    super.initialize(path, config);

    initUri = path;

    // Set this configuration as the default config for this instance; configure()
    // will perform some file-system-specific adjustments, but the original should
    // be sufficient (and is required) for the delegation token binding initialization.
    setConf(config);

    storageStatistics = createStorageStatistics(
            requireNonNull(getIOStatistics()));

    this.reportedPermissions = new FsPermission(PERMISSIONS_TO_REPORT);

    initializeFsRoot();

    this.fileSystemConfiguration = new GoogleHadoopFileSystemConfiguration(config);
    initializeWorkingDirectory(fileSystemConfiguration);
    initializeGcsFs(fileSystemConfiguration);
  }

  private static GcsStorageStatistics createStorageStatistics(
          final IOStatistics ioStatistics) {
    return (GcsStorageStatistics) GlobalStorageStatistics.INSTANCE
            .put(GcsStorageStatistics.NAME, () -> new GcsStorageStatistics(ioStatistics));
  }

  private void initializeFsRoot() {
    String rootBucket = initUri.getAuthority();
    checkArgument(rootBucket != null, "No bucket specified in GCS URI: %s", initUri);
    // Validate root bucket name
    URI rootUri = UriPaths.fromStringPathComponents(
        rootBucket, /* objectName= */ null, /* allowEmptyObjectName= */ true);
    fsRoot = new Path(rootUri);
    LOG.trace("Configured FS root: '{}'", fsRoot);
  }

  private void initializeWorkingDirectory(final GoogleHadoopFileSystemConfiguration config) {
    String configWorkingDirectory = config.getWorkingDirectory();
    if (isNullOrEmpty(configWorkingDirectory)) {
      LOG.warn("No working directory configured, using default: '{}'", workingDirectory);
    }
    // Use the public method to ensure proper behavior of normalizing and resolving the new
    // working directory relative to the initial filesystem-root directory.
    setWorkingDirectory(
        isNullOrEmpty(configWorkingDirectory) ? fsRoot : new Path(configWorkingDirectory));
    LOG.trace("Configured working directory: {} = {}", GCS_WORKING_DIRECTORY.getKey(),
        getWorkingDirectory());
  }

  private synchronized void initializeGcsFs(final GoogleHadoopFileSystemConfiguration config)
      throws IOException {
    this.gcsFs = createGcsFs(config);
  }

  private GoogleCloudStorageFileSystem createGcsFs(final GoogleHadoopFileSystemConfiguration config)
      throws IOException {
    GoogleCredentials credentials = getCredentials(config);
    return new GoogleCloudStorageFileSystem(config, credentials);
  }

  private GoogleCredentials getCredentials(GoogleHadoopFileSystemConfiguration config)
      throws IOException {
    return getCredentials(config, GCS_CONFIG_PREFIX);
  }

  static GoogleCredentials getCredentials(GoogleHadoopFileSystemConfiguration config,
      String... keyPrefixesVararg) throws IOException {
    return HadoopCredentialsConfiguration.getCredentials(config.getConfig(), keyPrefixesVararg);
  }

  @Override
  protected void checkPath(final Path path) {
    LOG.trace("checkPath(path: {})", path);
    // Validate scheme
    URI uri = path.toUri();

    String scheme = uri.getScheme();
    if (scheme != null && !scheme.equalsIgnoreCase(getScheme())) {
      throw new IllegalArgumentException(
          String.format("Wrong scheme: %s, in path: %s, expected scheme: %s", scheme, path,
              getScheme()));
    }

    String bucket = uri.getAuthority();
    String rootBucket = fsRoot.toUri().getAuthority();

    // Bucket-less URIs will be qualified later
    if (bucket == null || bucket.equals(rootBucket)) {
      return;
    }

    throw new IllegalArgumentException(
        String.format("Wrong bucket: %s, in path: %s, expected bucket: %s", bucket, path,
            rootBucket));
  }
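
  // Worked examples of checkPath (a sketch, assuming this filesystem was initialized with
  // gs://my-bucket; the bucket name and paths are placeholders):
  //   checkPath(new Path("/dir/file"))                -> accepted (bucket-less, qualified later)
  //   checkPath(new Path("gs://my-bucket/dir/file"))  -> accepted
  //   checkPath(new Path("gs://other-bucket/file"))   -> IllegalArgumentException (wrong bucket)
  //   checkPath(new Path("hdfs://nn/file"))           -> IllegalArgumentException (wrong scheme)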

  /**
   * Converts the given GCS path into the corresponding Hadoop path, validating that it belongs
   * to this file system: the bucket must match the root bucket provided at initialization time.
   */
  Path getHadoopPath(final URI gcsPath) {
    LOG.trace("getHadoopPath(gcsPath: {})", gcsPath);

    // Handle root. Delegate to getGcsPath on "gs:/" to resolve the appropriate gs://<bucket> URI.
    if (gcsPath.equals(getGcsPath(fsRoot))) {
      return fsRoot;
    }

    StorageResourceId resourceId = StorageResourceId.fromUriPath(gcsPath, true);

    checkArgument(!resourceId.isRoot(), "Missing authority in gcsPath '%s'", gcsPath);
    String rootBucket = fsRoot.toUri().getAuthority();
    checkArgument(resourceId.getBucketName().equals(rootBucket),
        "Authority of URI '%s' doesn't match root bucket '%s'", resourceId.getBucketName(),
        rootBucket);

    Path hadoopPath = new Path(fsRoot,
        new Path(/* schema= */ null, /* authority= */ null, resourceId.getObjectName()));
    LOG.trace("getHadoopPath(gcsPath: {}): {}", gcsPath, hadoopPath);
    return hadoopPath;
  }

  /**
   * Translates a "gs:/" style hadoopPath (or relative path which is not fully-qualified) into the
   * appropriate GCS path which is compatible with the underlying GcsFs.
   */
  URI getGcsPath(final Path hadoopPath) {
    LOG.trace("getGcsPath(hadoopPath: {})", hadoopPath);

    // Convert to fully qualified absolute path; the Path object will call back to get our current
    // workingDirectory as part of fully resolving the path.
    Path resolvedPath = makeQualified(hadoopPath);

    String objectName = resolvedPath.toUri().getPath();
    if (objectName != null && resolvedPath.isAbsolute()) {
      // Strip off leading '/' because GoogleCloudStorageFileSystem.getPath appends it explicitly
      // between bucket and objectName.
      objectName = objectName.substring(1);
    }

    // Construct GCS path URI
    String rootBucket = fsRoot.toUri().getAuthority();
    URI gcsPath =
        UriPaths.fromStringPathComponents(rootBucket, objectName, /* allowEmptyObjectName= */ true);
    LOG.trace("getGcsPath(hadoopPath: {}): {}", hadoopPath, gcsPath);
    return gcsPath;
  }
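
  // Illustrative mapping between Hadoop paths and GCS paths (a sketch; "my-bucket" and the
  // object names are placeholders). Assumes the filesystem is rooted at gs://my-bucket and the
  // working directory is gs://my-bucket/work, so relative paths resolve under it:
  //   getGcsPath(new Path("foo/bar"))                      -> gs://my-bucket/work/foo/bar
  //   getGcsPath(new Path("/foo/bar"))                     -> gs://my-bucket/foo/bar
  //   getHadoopPath(URI.create("gs://my-bucket/foo/bar"))  -> Path "gs://my-bucket/foo/bar"
  //   getHadoopPath(URI.create("gs://other-bucket/foo"))   -> IllegalArgumentException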

  @Override
  public String getScheme() {
    return SCHEME;
  }

  @Override
  public FSDataInputStream open(final Path hadoopPath, final int bufferSize) throws IOException {
    return runOperation(
      GcsStatistics.INVOCATION_OPEN,
      () -> {
        LOG.trace("open({})", hadoopPath);
        URI gcsPath = getGcsPath(hadoopPath);
        return new FSDataInputStream(GoogleHadoopFSInputStream.create(this, gcsPath, statistics));
      },
      String.format("open(%s)", hadoopPath));
  }

  @Override
  public FSDataOutputStream create(
          Path hadoopPath, FsPermission permission, boolean overwrite, int bufferSize,
          short replication, long blockSize, Progressable progress) throws IOException {
    return runOperation(
      GcsStatistics.INVOCATION_CREATE,
      () -> {
        checkArgument(hadoopPath != null, "hadoopPath must not be null");
        checkArgument(replication > 0, "replication must be a positive integer: %s", replication);
        checkArgument(blockSize > 0, "blockSize must be a positive integer: %s", blockSize);

        checkOpen();

        LOG.trace("create(hadoopPath: {}, overwrite: {}, bufferSize: {} [ignored])", hadoopPath,
                overwrite, bufferSize);

        CreateFileOptions.WriteMode writeMode = overwrite ?
                CreateFileOptions.WriteMode.OVERWRITE : CreateFileOptions.WriteMode.CREATE_NEW;

        CreateFileOptions fileOptions = CreateFileOptions.builder().setWriteMode(writeMode).build();
        return new FSDataOutputStream(new GoogleHadoopOutputStream(
                this, getGcsPath(hadoopPath), fileOptions, statistics), statistics);
      },
      String.format("create(%s, %s)", hadoopPath, overwrite));
  }

  @Override
  public FSDataOutputStream createNonRecursive(
      Path hadoopPath,
      FsPermission permission,
      EnumSet<CreateFlag> flags,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress)
      throws IOException {
    URI gcsPath = getGcsPath(checkNotNull(hadoopPath, "hadoopPath must not be null"));
    URI parentGcsPath = UriPaths.getParentPath(gcsPath);
    if (!getGcsFs().getFileInfo(parentGcsPath).exists()) {
      throw new FileNotFoundException(
          String.format(
              "Can not create '%s' file, because parent folder does not exist: %s",
              gcsPath, parentGcsPath));
    }

    return create(
        hadoopPath,
        permission,
        flags.contains(CreateFlag.OVERWRITE),
        bufferSize,
        replication,
        blockSize,
        progress);
  }

  /**
   * Appends to an existing file (optional operation).
   *
   * @param hadoopPath The existing file to be appended.
   * @param bufferSize The size of the buffer to be used. Ignored.
   * @param progress For reporting progress if it is not null.
   * @return A writable stream.
   * @throws IOException if an error occurs.
   */
  @Override
  public FSDataOutputStream append(Path hadoopPath, int bufferSize, Progressable progress)
      throws IOException {
    Preconditions.checkArgument(hadoopPath != null, "hadoopPath must not be null");
    LOG.trace("append(hadoopPath: {}, bufferSize: {} [ignored])", hadoopPath, bufferSize);
    URI filePath = getGcsPath(hadoopPath);
    return new FSDataOutputStream(
        new GoogleHadoopOutputStream(
            this,
            filePath,
            CreateFileOptions.builder()
                .setWriteMode(CreateFileOptions.WriteMode.APPEND)
                .build(),
            statistics),
        statistics);
  }
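
  // Usage sketch (illustrative only; "my-bucket" and the object path are placeholders): appends
  // are surfaced through the standard FileSystem API, e.g.
  //   try (FSDataOutputStream out = fs.append(new Path("gs://my-bucket/log"))) {
  //     out.write(moreBytes);
  //   }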

  /**
   * Concat existing files into one file.
   *
   * @param tgt the path to the target destination.
   * @param srcs the paths to the sources to use for the concatenation.
   * @throws IOException IO failure
   */
  @Override
  public void concat(Path tgt, Path[] srcs) throws IOException {
    LOG.trace("concat(tgt: {}, srcs.length: {})", tgt, srcs.length);

    Preconditions.checkArgument(srcs.length > 0, "srcs must have at least one source");

    URI tgtPath = getGcsPath(tgt);
    List<URI> srcPaths = Arrays.stream(srcs).map(this::getGcsPath).collect(toImmutableList());

    Preconditions.checkArgument(
        !srcPaths.contains(tgtPath),
        "target must not be contained in sources");

    List<List<URI>> partitions =
        Lists.partition(srcPaths, Constants.MAX_COMPOSE_OBJECTS - 1);
    LOG.trace("concat(tgt: {}, {} partitions: {})", tgt, partitions.size(), partitions);
    for (List<URI> partition : partitions) {
      // We need to include the target in the list of sources to compose since
      // the GCS FS compose operation will overwrite the target, whereas the Hadoop
      // concat operation appends to the target.
      List<URI> sources = Lists.newArrayList(tgtPath);
      sources.addAll(partition);
      getGcsFs().compose(sources, tgtPath, CreateFileOptions.DEFAULT.getContentType());
    }
  }
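
  // Worked example of the partitioning above (a sketch): GCS compose accepts a limited number of
  // source objects per request (Constants.MAX_COMPOSE_OBJECTS). Assuming that limit is 32 and
  // concat() is called with 100 sources, the sources are split into chunks of 31, and each chunk
  // is composed together with the current target:
  //   compose([tgt, src1..src31],  tgt)
  //   compose([tgt, src32..src62], tgt)
  //   compose([tgt, src63..src93], tgt)
  //   compose([tgt, src94..src100], tgt)
  // Re-including the target in every call preserves append semantics, since compose overwrites
  // the target rather than appending to it.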

  @Override
  public boolean rename(final Path src, final Path dst) throws IOException {
    return runOperation(GcsStatistics.INVOCATION_RENAME,
      () -> {
        LOG.trace("rename({}, {})", src, dst);

        checkArgument(src != null, "src must not be null");
        checkArgument(dst != null, "dst must not be null");

        // Even though the underlying GCSFS will also throw an IAE if src is root, since our
        // filesystem root happens to equal the global root, we want to explicitly check it
        // here since derived classes may not have filesystem roots equal to the global root.
        if (this.makeQualified(src).equals(fsRoot)) {
          LOG.trace("rename(src: {}, dst: {}): false [src is a root]", src, dst);
          return false;
        }

        try {
          checkOpen();

          URI srcPath = getGcsPath(src);
          URI dstPath = getGcsPath(dst);
          getGcsFs().rename(srcPath, dstPath);

          LOG.trace("rename(src: {}, dst: {}): true", src, dst);
        } catch (IOException e) {
          if (ApiErrorExtractor.INSTANCE.requestFailure(e)) {
            throw e;
          }
          LOG.trace("rename(src: {}, dst: {}): false [failed]", src, dst, e);
          return false;
        }

        return true;
      },
      String.format("rename(%s, %s)", src, dst));
  }

  @Override
  public boolean delete(final Path hadoopPath, final boolean recursive) throws IOException {
    return runOperation(GcsStatistics.INVOCATION_DELETE,
      () -> {
        LOG.trace("delete({}, {})", hadoopPath, recursive);
        checkArgument(hadoopPath != null, "hadoopPath must not be null");

        checkOpen();

        URI gcsPath = getGcsPath(hadoopPath);
        try {
          getGcsFs().delete(gcsPath, recursive);
        } catch (DirectoryNotEmptyException e) {
          throw e;
        } catch (IOException e) {
          if (ApiErrorExtractor.INSTANCE.requestFailure(e)) {
            throw e;
          }

          LOG.trace("delete(hadoopPath: {}, recursive: {}): false [failed]",
                  hadoopPath, recursive, e);
          return false;
        }

        LOG.trace("delete(hadoopPath: {}, recursive: {}): true",
                hadoopPath, recursive);
        return true;
      },
      String.format("delete(%s,%s", hadoopPath, recursive));
  }

  private <B> B runOperation(GcsStatistics stat, CallableRaisingIOE<B> operation, String context)
          throws IOException {
    LOG.trace("{}({})", stat, context);
    return trackDuration(instrumentation.getIOStatistics(), stat.getSymbol(), operation);
  }

  @Override
  public FileStatus[] listStatus(final Path hadoopPath) throws IOException {
    return runOperation(
      GcsStatistics.INVOCATION_LIST_STATUS,
      () -> {
        checkArgument(hadoopPath != null, "hadoopPath must not be null");

        checkOpen();

        LOG.trace("listStatus(hadoopPath: {})", hadoopPath);

        URI gcsPath = getGcsPath(hadoopPath);
        List<FileStatus> status;

        try {
          List<FileInfo> fileInfos = getGcsFs().listDirectory(gcsPath);
          status = new ArrayList<>(fileInfos.size());
          String userName = getUgiUserName();
          for (FileInfo fileInfo : fileInfos) {
            status.add(getFileStatus(fileInfo, userName));
          }
        } catch (FileNotFoundException fnfe) {
          throw (FileNotFoundException)
                  new FileNotFoundException(
                          String.format(
                                  "listStatus(hadoopPath: %s): '%s' does not exist.",
                                  hadoopPath, gcsPath))
                          .initCause(fnfe);
        }

        return status.toArray(new FileStatus[0]);
      },
      String.format("listStatus(%s", hadoopPath));
  }

  /**
   * Overridden to make root its own parent. This is POSIX compliant, but more importantly guards
   * against poor directory accounting in the PathData class of Hadoop 2's FsShell.
   */
  @Override
  public Path makeQualified(final Path path) {
    Path qualifiedPath = super.makeQualified(path);

    URI uri = qualifiedPath.toUri();

    checkState("".equals(uri.getPath()) || qualifiedPath.isAbsolute(),
        "Path '{}' must be fully qualified.", qualifiedPath);

    Path result;
    String upath = uri.getPath();

    // Strip initial '..'s so that root is its own parent.
    int i = 0;
    while (upath.startsWith("/../", i)) {
      // Leave a preceding slash, so path is still absolute.
      i += 3;
    }
    if (i == upath.length() || upath.substring(i).equals("/..")) {
      // Allow a Path of gs://someBucket to map to gs://someBucket/
      result = new Path(uri.getScheme(), uri.getAuthority(), "/");
    } else if (i == 0) {
      result = qualifiedPath;
    } else {
      result = new Path(uri.getScheme(), uri.getAuthority(), upath.substring(i));
    }

    LOG.trace("makeQualified(path: {}): {}", path, result);
    return result;
  }
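
  // Worked examples of the '..'-stripping above (a sketch; "my-bucket" is a placeholder):
  //   makeQualified(new Path("gs://my-bucket/../../foo")) -> gs://my-bucket/foo
  //   makeQualified(new Path("gs://my-bucket/.."))        -> gs://my-bucket/
  //   makeQualified(new Path("gs://my-bucket/dir/file"))  -> unchanged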

  /**
   * Returns a URI of the root of this FileSystem.
   */
  @Override
  public URI getUri() {
    return fsRoot.toUri();
  }

  /**
   * The default port is listed as -1 as an indication that ports are not used.
   */
  @Override
  protected int getDefaultPort() {
    int result = -1;
    LOG.trace("getDefaultPort(): {}", result);
    return result;
  }

  @Override
  public boolean hasPathCapability(final Path path, final String capability) {
    checkNotNull(path, "path must not be null");
    checkArgument(!isNullOrEmpty(capability), "capability must not be null or empty string for %s",
        path);
    switch (Ascii.toLowerCase(capability)) {
    case CommonPathCapabilities.FS_APPEND:
    case CommonPathCapabilities.FS_CONCAT:
      return true;
    default:
      return false;
    }
  }
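
  // Example probe (a sketch): callers can check these capabilities before relying on them, e.g.
  //   boolean canConcat = fs.hasPathCapability(path, CommonPathCapabilities.FS_CONCAT);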

  /**
   * Gets the current working directory.
   *
   * @return The current working directory.
   */
  @Override
  public Path getWorkingDirectory() {
    LOG.trace("getWorkingDirectory(): {}", workingDirectory);
    return workingDirectory;
  }

  @Override
  public boolean mkdirs(final Path hadoopPath, final FsPermission permission) throws IOException {
    return runOperation(
      GcsStatistics.INVOCATION_MKDIRS,
      () -> {
        checkArgument(hadoopPath != null, "hadoopPath must not be null");

        LOG.trace("mkdirs(hadoopPath: {}, permission: {}): true", hadoopPath, permission);

        checkOpen();

        URI gcsPath = getGcsPath(hadoopPath);
        try {
          getGcsFs().mkdirs(gcsPath);
        } catch (java.nio.file.FileAlreadyExistsException faee) {
          // Need to convert to the Hadoop flavor of FileAlreadyExistsException.
          throw (FileAlreadyExistsException)
                  new FileAlreadyExistsException(
                          String.format(
                                  "mkdirs(hadoopPath: %s, permission: %s): failed",
                                  hadoopPath, permission))
                          .initCause(faee);
        }

        return true;
      },
      String.format("mkdirs(%s)", hadoopPath));
  }

  @Override
  public FileStatus getFileStatus(final Path path) throws IOException {
    return runOperation(
      GcsStatistics.INVOCATION_GET_FILE_STATUS,
      () -> {
        checkArgument(path != null, "path must not be null");

        checkOpen();

        URI gcsPath = getGcsPath(path);
        FileInfo fileInfo = getGcsFs().getFileInfo(gcsPath);
        if (!fileInfo.exists()) {
          throw new FileNotFoundException(
                  String.format(
                          "%s not found: %s", fileInfo.isDirectory() ? "Directory" : "File", path));
        }
        String userName = getUgiUserName();
        return getFileStatus(fileInfo, userName);
      },
      String.format("getFileStatus(%s)", path));
  }

  /**
   * Returns home directory of the current user.
   *
   * <p>Note: This directory is only used for Hadoop purposes. It is not the same as a user's OS
   * home directory.
   */
  @Override
  public Path getHomeDirectory() {
    Path result = new Path(fsRoot, "user/" + System.getProperty("user.name"));
    LOG.trace("getHomeDirectory(): {}", result);
    return result;
  }

  /**
   * {@inheritDoc}
   *
   * <p>Returns the service if delegation tokens are configured, otherwise, null.
   */
  @Override
  public String getCanonicalServiceName() {
    // TODO: Add delegation token support
    return null;
  }

  /**
   * Gets GCS FS instance.
   */
  GoogleCloudStorageFileSystem getGcsFs() {
    return gcsFs;
  }

  /**
   * Assert that the FileSystem has been initialized and has not been closed.
   */
  private void checkOpen() throws IOException {
    if (isClosed) {
      throw new IOException("GoogleHadoopFileSystem has been closed or not initialized.");
    }
  }

  @Override
  public void close() throws IOException {
    LOG.trace("close()");
    if (isClosed) {
      return;
    }

    super.close();

    getGcsFs().close();

    this.isClosed = true;
  }

  @Override
  public long getUsed() throws IOException {
    long result = super.getUsed();
    LOG.trace("getUsed(): {}", result);
    return result;
  }

  @Override
  public void setWorkingDirectory(final Path hadoopPath) {
    checkArgument(hadoopPath != null, "hadoopPath must not be null");
    URI gcsPath = UriPaths.toDirectory(getGcsPath(hadoopPath));
    workingDirectory = getHadoopPath(gcsPath);
    LOG.trace("setWorkingDirectory(hadoopPath: {}): {}", hadoopPath, workingDirectory);
  }

  /**
   * Get the instrumentation's IOStatistics.
   * @return statistics
   */
  @Override
  public IOStatistics getIOStatistics() {
    return instrumentation != null
            ? instrumentation.getIOStatistics()
            : null;
  }

  /**
   * Get the storage statistics of this filesystem.
   * @return the storage statistics
   */
  @Override
  public GcsStorageStatistics getStorageStatistics() {
    return this.storageStatistics;
  }
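
  // Illustrative read of the collected statistics from client code (a sketch; the cast assumes
  // the filesystem bound to the URI is this class):
  //   GoogleHadoopFileSystem ghfs = (GoogleHadoopFileSystem) FileSystem.get(uri, conf);
  //   ghfs.getIOStatistics().counters()
  //       .forEach((name, value) -> System.out.printf("%s = %d%n", name, value));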

  private static String getUgiUserName() throws IOException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    return ugi.getShortUserName();
  }

  private FileStatus getFileStatus(FileInfo fileInfo, String userName) {
    checkNotNull(fileInfo, "fileInfo should not be null");
    // GCS provides only a creation time for objects, not a separate modification time. Since
    // objects are immutable once created, the creation time is reported here as both the
    // modification and access time.
    FileStatus status = new FileStatus(
        fileInfo.getSize(),
        fileInfo.isDirectory(),
        REPLICATION_FACTOR_DEFAULT,
        defaultBlockSize,
        fileInfo.getModificationTime(),
        fileInfo.getModificationTime(),
        reportedPermissions,
        userName,
        userName,
        getHadoopPath(fileInfo.getPath()));
    LOG.trace("FileStatus(path: {}, userName: {}): {}", fileInfo.getPath(), userName, status);
    return status;
  }
}