NativeAzureFileSystem.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Stack;
import java.util.HashMap;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;

import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.classification.VisibleForTesting;
import com.microsoft.azure.storage.StorageException;

/**
 * A {@link FileSystem} for reading and writing files stored on <a
 * href="http://store.azure.com/">Windows Azure</a>. This implementation is
 * blob-based and stores files on Azure in their native form so they can be read
 * by other Azure tools.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class NativeAzureFileSystem extends FileSystem {
  // Octal permission mask 0300: the owning user's write (0200) and
  // execute (0100) bits.
  private static final int USER_WX_PERMISION = 0300;
  // Default user home directory prefix (per the constant's name; its use is
  // outside this part of the file -- confirm against the caller).
  private static final String USER_HOME_DIR_PREFIX_DEFAULT = "/user";
  /**
   * A description of a folder rename operation, including the source and
   * destination keys, and descriptions of the files in the source folder.
   */

  public static class FolderRenamePending {
    // Lease supplied to the listing constructor; execute() passes it to the
    // store when renaming the folder's own blob. Not set by the redo-file
    // constructor.
    private SelfRenewingLease folderLease;
    // Key of the folder being renamed.
    private String srcKey;
    // Key the folder is being renamed to.
    private String dstKey;
    // Populated only by the listing constructor; stays null when this object
    // is built from a -RenamePending.json file.
    private FileMetadata[] fileMetadata = null;    // descriptions of source files
    // Entries of the "FileList" array of a -RenamePending.json file;
    // populated only by the redo-file constructor.
    private ArrayList<String> fileStrings = null;
    // File system whose store performs the listing, rename and delete calls.
    private NativeAzureFileSystem fs;
    // Size of the buffer used to read a -RenamePending.json file; a file that
    // fills the whole buffer is rejected as too large.
    private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000;
    // Headroom subtracted from MAX_RENAME_PENDING_FILE_SIZE when deciding
    // whether the generated file list has grown too long.
    private static final int FORMATTING_BUFFER = 10000;
    // True when this object holds a usable rename description; redo() does
    // nothing while this is false.
    private boolean committed;
    // Suffix appended to the source key to name the rename pending file.
    public static final String SUFFIX = "-RenamePending.json";
    // JSON reader that accepts unquoted field names, which is how
    // makeRenamePendingFileContents() writes them.
    private static final ObjectReader READER = new ObjectMapper()
        .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
        .readerFor(JsonNode.class);

    /**
     * Prepares the in-memory information needed to do or redo a folder
     * rename by listing everything currently stored under the source folder.
     *
     * @param srcKey key of the folder being renamed
     * @param dstKey key the folder is being renamed to
     * @param lease lease to remember as the folder lease
     * @param fs file system whose store is listed
     * @throws IOException if the listing fails
     */
    public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease,
        NativeAzureFileSystem fs) throws IOException {
      this.fs = fs;
      this.folderLease = lease;
      this.srcKey = srcKey;
      this.dstKey = dstKey;

      // List all the files in the folder, timing the call for diagnostics.
      final long listStart = Time.monotonicNow();
      this.fileMetadata = fs.getStoreInterface().list(srcKey, AZURE_LIST_ALL,
          AZURE_UNBOUNDED_DEPTH);
      final long listEnd = Time.monotonicNow();
      LOG.debug("Time taken to list {} blobs for rename operation is: {} ms",
          this.fileMetadata.length, (listEnd - listStart));

      this.committed = true;
    }

    /**
     * Prepares the in-memory information needed to redo a folder rename from
     * a -RenamePending.json file read from storage. This constructor is for
     * use during redo processing.
     *
     * <p>If the file is empty or is not parseable JSON, it is deleted and the
     * object is left uncommitted so that {@link #redo()} does nothing. If the
     * JSON parses but lacks the folder names or the file list, the object is
     * likewise left uncommitted, but the file is not deleted here.
     *
     * @param redoFile path of the -RenamePending.json file to load
     * @param fs file system used to open (and, if needed, delete) the file
     * @throws IllegalArgumentException kept in the signature for existing
     *         callers; nothing in this body throws it explicitly
     * @throws IOException if the file cannot be read or deleted, or if its
     *         contents fill the maximum supported size
     */
    public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs)
        throws IllegalArgumentException, IOException {

      this.fs = fs;

      // Load the redo file. A single read() call may return fewer bytes than
      // the file holds, so keep reading until end of stream or a full buffer;
      // otherwise a short read would look like a truncated (corrupt) file.
      byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE];
      int l = 0;
      try (FSDataInputStream input = fs.open(redoFile)) {
        while (l < bytes.length) {
          int n = input.read(bytes, l, bytes.length - l);
          if (n <= 0) {
            // End of stream (or a stream that makes no progress).
            break;
          }
          l += n;
        }
      }
      if (l <= 0) {
        // Jira HADOOP-12678 -Handle empty rename pending metadata file during
        // atomic rename in redo path. If during renamepending file is created
        // but not written yet, then this means that rename operation
        // has not started yet. So we should delete rename pending metadata file.
        LOG.error("Deleting empty rename pending file "
            + redoFile + " -- no data available");
        deleteRenamePendingFile(fs, redoFile);
        return;
      }
      if (l == MAX_RENAME_PENDING_FILE_SIZE) {
        throw new IOException(
            "Error reading pending rename file contents -- "
                + "maximum file size exceeded");
      }
      String contents = new String(bytes, 0, l, StandardCharsets.UTF_8);

      // parse the JSON
      JsonNode json = null;
      try {
        json = READER.readValue(contents);
        // A null parse result carries no rename description, so it is
        // handled like a corrupt file instead of failing later with an NPE.
        this.committed = json != null;
      } catch (JsonMappingException e) {

        // The -RenamePending.json file is corrupted, so we assume it was
        // not completely written
        // and the redo operation did not commit.
        this.committed = false;
      } catch (JsonParseException e) {
        this.committed = false;
      } catch (IOException e) {
        this.committed = false;
      }

      if (!this.committed) {
        LOG.error("Deleting corruped rename pending file {} \n {}",
            redoFile, contents);

        // delete the -RenamePending.json file
        deleteRenamePendingFile(fs, redoFile);
        return;
      }

      // initialize this object's fields; any missing piece of the
      // description leaves the object uncommitted
      this.fileStrings = new ArrayList<String>();
      JsonNode oldFolderName = json.get("OldFolderName");
      JsonNode newFolderName = json.get("NewFolderName");
      if (oldFolderName == null || newFolderName == null) {
        this.committed = false;
        return;
      }
      this.srcKey = oldFolderName.textValue();
      this.dstKey = newFolderName.textValue();
      if (this.srcKey == null || this.dstKey == null) {
        this.committed = false;
        return;
      }
      JsonNode fileList = json.get("FileList");
      if (fileList == null) {
        this.committed = false;
        return;
      }
      for (int i = 0; i < fileList.size(); i++) {
        this.fileStrings.add(fileList.get(i).textValue());
      }
    }

    /**
     * Returns the descriptions of the source files.
     *
     * @return the metadata array gathered by the listing constructor, or
     *         {@code null} when this object was built from a redo file
     */
    public FileMetadata[] getFiles() {
      return this.fileMetadata;
    }

    /**
     * Returns the lease that was handed to the listing constructor.
     *
     * @return the folder lease; not set when this object was built from a
     *         redo file
     */
    public SelfRenewingLease getFolderLease() {
      return this.folderLease;
    }

    /**
     * Deletes the rename pending metadata file, tolerating the case where it
     * has already been removed.
     *
     * @param fs the file system holding the file
     * @param redoFile path of the rename pending metadata file
     * @throws IOException if deletion fails for any reason other than a
     *         storage "BlobNotFound" error
     */
    @VisibleForTesting
    void deleteRenamePendingFile(FileSystem fs, Path redoFile)
        throws IOException {
      try {
        fs.delete(redoFile, false);
      } catch (IOException deleteFailure) {
        // A "BlobNotFound" storage error means somebody probably raced with
        // us and finished the delete first; anything else is a real failure.
        final Throwable cause = deleteFailure.getCause();
        final boolean alreadyDeleted = cause instanceof StorageException
            && "BlobNotFound".equals(((StorageException) cause).getErrorCode());
        if (!alreadyDeleted) {
          throw deleteFailure;
        }
        LOG.warn("rename pending file " + redoFile + " is already deleted");
      }
    }

    /**
     * Writes the information needed to redo this folder rename to storage as
     * a JSON-like document. The file is named after the source folder key
     * with {@link #SUFFIX} appended, for example
     * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json}.
     *
     * <p>Field names are written unquoted. A sample file:
     * <pre>{@code
     * {
     *  FormatVersion: "1.0",
     *  OperationUTCTime: "2014-07-01 23:50:35.572",
     *  OldFolderName: "user/ehans/folderToRename",
     *  NewFolderName: "user/ehans/renamedFolder",
     *  FileList: [
     *    "innerFile",
     *    "innerFile2"
     *  ]
     * }
     * }</pre>
     *
     * @param fs file system on which the file is written
     * @throws IOException if creating or writing the file fails
     */
    public void writeFile(NativeAzureFileSystem fs) throws IOException {
      final Path pendingFile = getRenamePendingFilePath();
      LOG.debug("Preparing to write atomic rename state to {}", pendingFile);

      final byte[] payload =
          makeRenamePendingFileContents().getBytes(StandardCharsets.UTF_8);

      // The stream is closed through the helper's cleanup in every case.
      OutputStream out = null;
      try {
        out = fs.createInternal(pendingFile, FsPermission.getFileDefault(), false, null);
        out.write(payload);
      } catch (IOException e) {
        throw new IOException("Unable to write RenamePending file for folder rename from "
            + srcKey + " to " + dstKey, e);
      } finally {
        NativeAzureFileSystemHelper.cleanup(LOG, out);
      }
    }

    /**
     * Builds the text of the rename pending file: the current UTC time, the
     * source and destination folder keys, and the list of source file keys
     * with the source folder prefix removed.
     *
     * @return the file contents, or a plain (non-JSON) error message if the
     *         file list grows to within {@code FORMATTING_BUFFER} of
     *         {@code MAX_RENAME_PENDING_FILE_SIZE}
     */
    public String makeRenamePendingFileContents() {
      SimpleDateFormat utcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      utcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
      final String time = utcFormat.format(new Date());

      // Build the file list, one quoted and escaped name per line.
      final String folderPrefix = srcKey + "/";
      final int sizeLimit = MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER;
      final StringBuilder fileList = new StringBuilder();
      fileList.append("[\n");
      boolean first = true;
      for (FileMetadata file : fileMetadata) {
        if (!first) {
          fileList.append(",\n");
        }
        first = false;
        fileList.append("    ");
        fileList.append(quote(StringUtils.removeStart(file.getKey(), folderPrefix)));
        if (fileList.length() >= sizeLimit) {

          // Give up now to avoid using too much memory.
          LOG.error("Internal error: Exceeded maximum rename pending file size of {} bytes.",
              MAX_RENAME_PENDING_FILE_SIZE);

          // Deliberately not JSON: a human-readable marker of the failure.
          return "exceeded maximum rename pending file size";
        }
      }
      fileList.append("\n  ]");

      // Assemble the document; folder names are quoted and escaped as well.
      final StringBuilder contents = new StringBuilder();
      contents.append("{\n");
      contents.append("  FormatVersion: \"1.0\",\n");
      contents.append("  OperationUTCTime: \"").append(time).append("\",\n");
      contents.append("  OldFolderName: ").append(quote(srcKey)).append(",\n");
      contents.append("  NewFolderName: ").append(quote(dstKey)).append(",\n");
      contents.append("  FileList: ").append(fileList.toString()).append("\n");
      contents.append("}\n");
      return contents.toString();
    }

    /**
     * Returns the given text as a double-quoted string suitable for insertion
     * in JSON text. Adapted from org.codehaus.jettison.json.JSONObject.quote.
     *
     * <p>Backslash, double quote and forward slash are each preceded by a
     * backslash. Backspace, tab, newline, form feed and carriage return use
     * their two-character escapes. Any other character below the space
     * character is written as a backslash, the letter u and four hex digits.
     * All remaining characters are copied unchanged.
     *
     * @param string the text to quote; may be null
     * @return the quoted text; an empty pair of quotes for null or empty input
     */
    private String quote(String string) {
      if (string == null || string.isEmpty()) {
        return "\"\"";
      }

      final int length = string.length();
      final StringBuilder quoted = new StringBuilder(length + 4);
      quoted.append('"');
      for (int idx = 0; idx < length; idx++) {
        final char ch = string.charAt(idx);
        switch (ch) {
        case '\\':
        case '"':
        case '/':
          quoted.append('\\').append(ch);
          break;
        case '\b':
          quoted.append("\\b");
          break;
        case '\t':
          quoted.append("\\t");
          break;
        case '\n':
          quoted.append("\\n");
          break;
        case '\f':
          quoted.append("\\f");
          break;
        case '\r':
          quoted.append("\\r");
          break;
        default:
          if (ch < ' ') {
            // Left-pad the hex value to exactly four digits.
            final String hex = "000" + Integer.toHexString(ch);
            quoted.append("\\u").append(hex.substring(hex.length() - 4));
          } else {
            quoted.append(ch);
          }
          break;
        }
      }
      quoted.append('"');
      return quoted.toString();
    }

    /**
     * Returns the key of the folder being renamed.
     *
     * @return the source folder key
     */
    public String getSrcKey() {
      return this.srcKey;
    }

    /**
     * Returns the key the folder is being renamed to.
     *
     * @return the destination folder key
     */
    public String getDstKey() {
      return this.dstKey;
    }

    /**
     * Looks up the store's metadata for the source folder key.
     *
     * @return whatever the store returns for the source key
     * @throws IOException if the metadata lookup fails
     */
    public FileMetadata getSourceMetadata() throws IOException {
      return this.fs.getStoreInterface().retrieveMetadata(this.srcKey);
    }

    /**
     * Executes the folder rename on the normal path. See {@link #redo()} for
     * the alternate path used when recovering from a folder rename failure.
     *
     * <p>Each listed file is handed to the rename executor, then the folder's
     * own blob is renamed if it is explicitly materialized, and finally the
     * last-modified times of the source and destination parent folders are
     * updated.
     *
     * @throws IOException if a rename or metadata operation fails
     */
    public void execute() throws IOException {

      // Per-file work item: rename one entry and report success.
      final AzureFileSystemThreadTask renameTask = new AzureFileSystemThreadTask() {
        @Override
        public boolean execute(FileMetadata file) throws IOException{
          renameFile(file);
          return true;
        }
      };

      final AzureFileSystemThreadPoolExecutor renamePool =
          this.fs.getThreadPoolExecutor(this.fs.renameThreadCount,
              "AzureBlobRenameThread", "Rename", getSrcKey(), AZURE_RENAME_THREADS);
      renamePool.executeParallel(this.getFiles(), renameTask);

      // Rename the source folder 0-byte root file itself, if it exists as an
      // explicit blob.
      final FileMetadata folderMetadata = this.getSourceMetadata();
      final boolean folderIsExplicit =
          folderMetadata.getBlobMaterialization() == BlobMaterialization.Explicit;
      if (folderIsExplicit) {
        // The folder already carries a lease from the "prepare" phase, so no
        // new one is acquired; the existing lease is passed along to allow
        // the file delete.
        fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(),
            false, folderLease);
      }

      // Touch the parent folders of both ends of the rename.
      fs.updateParentFolderLastModifiedTime(srcKey);
      fs.updateParentFolderLastModifiedTime(dstKey);
    }

    /**
     * Renames a single entry from under the source folder to the same
     * relative location under the destination folder. Entries that are not
     * explicitly materialized are skipped.
     *
     * @param file metadata of the entry to rename
     * @throws IOException if the store rename fails
     */
    @VisibleForTesting
    void renameFile(FileMetadata file) throws IOException{
      if (file.getBlobMaterialization() != BlobMaterialization.Explicit) {
        return;
      }

      // Swap the source folder prefix of the key for the destination prefix.
      final String sourceKey = file.getKey();
      final String relativePart = sourceKey.substring((this.getSrcKey()).length());
      final String targetKey = this.getDstKey() + relativePart;

      // Rename gets exclusive access (via a lease) for files designated for
      // atomic rename. The main use case is HBase write-ahead log (WAL) and
      // data folder processing correctness. See the rename code for details.
      final boolean needsLease = this.fs.getStoreInterface().isAtomicRenameKey(sourceKey);
      this.fs.getStoreInterface().rename(sourceKey, targetKey, needsLease, null);
    }

    /**
     * Cleans up after execution of a rename by deleting the rename pending
     * file, which is done only when the source key is an atomic rename key.
     *
     * @throws IOException if the delete fails
     */
    public void cleanup() throws IOException {
      if (!fs.getStoreInterface().isAtomicRenameKey(srcKey)) {
        return;
      }

      // Remove the RenamePending file. Freeing the source folder lease is
      // not necessary since the source folder file was deleted.
      fs.delete(getRenamePendingFilePath(), false);
    }

    /**
     * Computes the absolute path of the rename pending file: the source key
     * with {@link #SUFFIX} appended.
     *
     * @return the absolute path of the rename pending file
     */
    private Path getRenamePendingFilePath() {
      return fs.makeAbsolute(keyToPath(srcKey + SUFFIX));
    }

    /**
     * Recovers from a folder rename failure by redoing the intended work,
     * as recorded in the -RenamePending.json file.
     *
     * <p>Does nothing when this object is not committed. Otherwise it tries
     * to lease the source folder; a storage "BlobNotFound" error on that
     * attempt is taken to mean the rename already finished, in which case
     * only the pending file is removed.
     *
     * @throws IOException if leasing the source folder fails for a reason
     *         other than the blob not being found, or if a rename or delete
     *         performed during the redo fails
     */
    public void redo() throws IOException {

      if (!committed) {

        // Nothing to do. The -RenamePending.json file should have already
        // been deleted.
        return;
      }

      // Try to get a lease on source folder to block concurrent access to it.
      // It may fail if the folder is already gone. We don't check if the
      // source exists explicitly because that could recursively trigger redo
      // and give an infinite recursion.
      SelfRenewingLease lease = null;
      boolean sourceFolderGone = false;
      try {
        lease = fs.leaseSourceFolder(srcKey);
      } catch (AzureException e) {

        // If the source folder was not found then somebody probably
        // raced with us and finished the rename first, or the
        // first rename failed right before deleting the rename pending
        // file.
        // The error code is read only when the cause really is a storage
        // exception, and compared null-safely: a missing cause or a null
        // error code must surface as the IOException below, not as an NPE.
        Throwable cause = e.getCause();
        String errorCode = cause instanceof StorageException
            ? ((StorageException) cause).getErrorCode()
            : null;
        if ("BlobNotFound".equals(errorCode)) {
          sourceFolderGone = true;
        } else {
          throw new IOException(
              "Unexpected error when trying to lease source folder name during "
              + "folder rename redo",
              e);
        }
      }

      if (!sourceFolderGone) {
        // Make sure the target folder exists.
        Path dst = fullPath(dstKey);
        if (!fs.existsInternal(dst)) {
          fs.mkdirs(dst);
        }

        // For each file inside the folder to be renamed,
        // make sure it has been renamed.
        for(String fileName : fileStrings) {
          finishSingleFileRename(fileName);
        }

        // Remove the source folder. Don't check explicitly if it exists,
        // to avoid triggering redo recursively.
        try {
          // Rename the source folder 0-byte root file
          // as destination folder 0-byte root file.
          FileMetadata srcMetaData = this.getSourceMetadata();
          if (srcMetaData.getBlobMaterialization() == BlobMaterialization.Explicit) {
            // We already have a lease. So let's just rename the source blob
            // as destination blob under same lease.
            fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(), false, lease);
          }

          // Now we can safely delete the source folder.
          fs.getStoreInterface().delete(srcKey, lease);
        } catch (Exception e) {
          // Deliberately best-effort: the folder may already be gone.
          LOG.info("Unable to delete source folder during folder rename redo. "
              + "If the source folder is already gone, this is not an error "
              + "condition. Continuing with redo.", e);
        }

        // Update the last-modified time of the parent folders of both source
        // and destination.
        fs.updateParentFolderLastModifiedTime(srcKey);
        fs.updateParentFolderLastModifiedTime(dstKey);
      }

      // Remove the -RenamePending.json file.
      fs.delete(getRenamePendingFilePath(), false);
    }

    /**
     * Completes the rename of one file during redo: if the file still exists
     * under the source folder it is renamed; otherwise nothing is changed.
     *
     * @param fileName name of the file relative to the source folder
     * @throws IOException if an existence check or the rename fails
     */
    private void finishSingleFileRename(String fileName)
        throws IOException {
      final Path sourcePath = fullPath(srcKey, fileName);
      final Path targetPath = fullPath(dstKey, fileName);
      final String sourceName = fs.pathToKey(sourcePath);
      final String targetName = fs.pathToKey(targetPath);
      final boolean sourcePresent = fs.getStoreInterface().explicitFileExists(sourceName);
      final boolean targetPresent = fs.getStoreInterface().explicitFileExists(targetName);

      if (sourcePresent) {
        // Rename gets exclusive access (via a lease) for HBase write-ahead log
        // (WAL) file processing correctness.  See the rename code for details.
        fs.getStoreInterface().rename(sourceName, targetName, true, null);
        return;
      }
      if (targetPresent) {
        // The rename already finished, so do nothing.
        return;
      }

      // HADOOP-14512: found at neither end; warn and carry on.
      LOG.warn(
        "Attempting to complete rename of file " + srcKey + "/" + fileName
        + " during folder rename redo, and file was not found in source "
        + "or destination " + dstKey + "/" + fileName + ". "
        + "This must mean the rename of this file has already completed");
    }

    /**
     * Builds the absolute path of a file inside a folder, rooted at this
     * file system's URI.
     *
     * @param folderKey key of the containing folder
     * @param fileName name of the file within that folder
     * @return the absolute path of the file
     */
    private Path fullPath(String folderKey, String fileName) {
      final Path root = new Path(fs.getUri());
      final String absoluteName = "/" + folderKey + "/" + fileName;
      return new Path(root, absoluteName);
    }

    /**
     * Builds the absolute path for a key, rooted at this file system's URI.
     *
     * @param fileKey the key to convert
     * @return the absolute path for the key
     */
    private Path fullPath(String fileKey) {
      final Path root = new Path(fs.getUri());
      return new Path(root, "/" + fileKey);
    }
  }

  // Literal text "[[.]]" that stands in for a trailing period. Where the
  // substitution is applied is outside this part of the file.
  private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]";
  // Matches the placeholder text only where it is immediately followed by
  // the end of the input or by a '/'.
  private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN =
      Pattern.compile("\\[\\[\\.\\]\\](?=$|/)");
  // Matches a '.' only where it is immediately followed by the end of the
  // input or by a '/'.
  private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)");

  /**
   * Returns the URI scheme of this file system.
   *
   * @return the string {@code wasb}
   */
  @Override
  public String getScheme() {
    return "wasb";
  }

  /**
   * Returns the canonical service name of this file system. When
   * fs.azure.override.canonical.service.name is set to true this is the URI
   * of the WASB file system; otherwise the default implementation is used.
   *
   * @return a service string that uniquely identifies this file system
   */
  @Override
  public String getCanonicalServiceName() {
    return returnUriAsCanonicalServiceName
        ? getUri().toString()
        : super.getCanonicalServiceName();
  }


  /**
   * <p>
   * The {@link NativeAzureFileSystem} variant that reports the
   * {@code wasbs} scheme. Like its parent it is a blob-based
   * {@link FileSystem} for reading and writing files stored on <a
   * href="http://store.azure.com/">Windows Azure</a> in their native form, so
   * they can be read by other Azure tools. This implementation uses HTTPS for
   * secure network communication.
   * </p>
   */
  public static class Secure extends NativeAzureFileSystem {
    /**
     * Returns the URI scheme of the secure file system.
     *
     * @return the string {@code wasbs}
     */
    @Override
    public String getScheme() {
      return "wasbs";
    }
  }

  /** Logger for this class; the nested classes log through it as well. */
  public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class);

  /**
   * Configuration property for the time span, in seconds, after which a temp
   * blob is considered dangling (not being actively uploaded to) and up for
   * reclamation.
   *
   * For example, if this is 60, then any temporary blob more than a minute
   * old would be considered dangling.
   */
  static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds";
  // Presumably the default for the property above, in seconds (3600 = one
  // hour); the code that reads it is outside this part of the file.
  private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600;
  // Alias for Hadoop's path separator.
  static final String PATH_DELIMITER = Path.SEPARATOR;
  // Folder name constant for temporary data (per its name; usage is outside
  // this part of the file).
  static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$";

  // Sentinel arguments passed to the store's list() call when a folder
  // rename lists its source folder; -1 presumably means "no limit" for the
  // result count and the depth respectively -- confirm against the store
  // interface.
  private static final int AZURE_LIST_ALL = -1;
  private static final int AZURE_UNBOUNDED_DEPTH = -1;

  /**
   * The configuration property that determines which group owns files created
   * in WASB.
   */
  private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup";
  /**
   * The default value for fs.azure.permissions.supergroup. Chosen as the same
   * default as DFS.
   */
  static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";

  /**
   * Configuration property used to specify list of users that can perform
   * chown operation when authorization is enabled in WASB.
   */
  public static final String AZURE_CHOWN_USERLIST_PROPERTY_NAME =
      "fs.azure.chown.allowed.userlist";

  // Default value for the chown user list property.
  static final String AZURE_CHOWN_USERLIST_PROPERTY_DEFAULT_VALUE = "*";

  /**
   * Configuration property used to specify list of daemon users that can
   * perform chmod operation when authorization is enabled in WASB.
   */
  public static final String AZURE_DAEMON_USERLIST_PROPERTY_NAME =
      "fs.azure.daemon.userlist";

  // Default value for the daemon user list property.
  static final String AZURE_DAEMON_USERLIST_PROPERTY_DEFAULT_VALUE = "*";

  /**
   * Configuration property used to specify list of users that can perform
   * chmod operation when authorization is enabled in WASB.
   */
  public static final String AZURE_CHMOD_USERLIST_PROPERTY_NAME =
          "fs.azure.chmod.allowed.userlist";

  // Default value for the chmod user list property.
  static final String AZURE_CHMOD_USERLIST_PROPERTY_DEFAULT_VALUE = "*";

  // Configuration property names for the ring buffer capacity and the output
  // stream buffer size (per their names; the readers of these properties are
  // outside this part of the file).
  static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
      "fs.azure.ring.buffer.capacity";
  static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
      "fs.azure.output.stream.buffer.size";

  /**
   * Configuration property name for skipping Azure metrics (per its name;
   * the reader of this property is outside this part of the file).
   */
  public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";

  /**
   * Property to enable Append API.
   */
  public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";

  /**
   * Property to override canonical service name with filesystem's URI.
   */
  public static final String RETURN_URI_AS_CANONICAL_SERVICE_NAME_PROPERTY_NAME = "fs.azure.override.canonical.service.name";

  /**
   * The configuration property to set number of threads to be used for rename operation.
   * Folder rename also passes this name to the thread pool executor factory.
   */
  public static final String AZURE_RENAME_THREADS = "fs.azure.rename.threads";

  /**
   * The default number of threads to be used for rename operation.
   * NOTE(review): what a value of 0 means is decided by code outside this
   * part of the file.
   */
  public static final int DEFAULT_AZURE_RENAME_THREADS = 0;

  /**
   * The configuration property to set number of threads to be used for delete operation.
   */
  public static final String AZURE_DELETE_THREADS = "fs.azure.delete.threads";

  /**
   * The default number of threads to be used for delete operation.
   * NOTE(review): what a value of 0 means is decided by code outside this
   * part of the file.
   */
  public static final int DEFAULT_AZURE_DELETE_THREADS = 0;

  /**
   * The number of threads to be used for delete operation after reading user configuration.
   */
  private int deleteThreadCount = 0;

  /**
   * The number of threads to be used for rename operation after reading user configuration.
   * Folder rename reads this value when requesting its thread pool executor.
   */
  private int renameThreadCount = 0;

  private class NativeAzureFsInputStream extends FSInputStream {
    // Underlying stream that all reads are delegated to.
    private InputStream in;
    // Blob key; used in log and error messages and to decide isPageBlob.
    private final String key;
    // Number of bytes read so far through the read() methods shown here.
    private long pos = 0;
    // Checked by available() for block blobs; it is set outside this part of
    // the file.
    private boolean closed = false;
    // Whether the store reports the key as a page blob key.
    private boolean isPageBlob;

    // File length, valid only for streams over block blobs.
    private long fileLength;

    /**
     * Wraps an input stream over the blob stored under the given key.
     *
     * @param in the stream to read from
     * @param key the blob key; also used to ask the store whether this is a
     *        page blob
     * @param fileLength the file length, used for block blobs
     */
    NativeAzureFsInputStream(InputStream in, String key, long fileLength) {
      this.key = key;
      this.in = in;
      this.fileLength = fileLength;
      this.isPageBlob = store.isPageBlobKey(key);
    }

    /**
     * Returns the number of bytes remaining in the file, capped at
     * {@link Integer#MAX_VALUE}. For page blobs the answer is delegated to
     * the underlying stream instead.
     *
     * This matches the behavior of DFSInputStream.available(), which some
     * clients may rely on (HBase write-ahead log reading in particular).
     *
     * @return the remaining byte count, or {@link Integer#MAX_VALUE} if it
     *         does not fit in an int
     * @throws IOException if a block blob stream has been closed, or if the
     *         underlying page blob stream fails
     */
    @Override
    public synchronized int available() throws IOException {
      if (isPageBlob) {
        return in.available();
      }
      if (closed) {
        throw new IOException("Stream closed");
      }
      final long remaining = this.fileLength - pos;
      return (int) Math.min(remaining, (long) Integer.MAX_VALUE);
    }

    /**
     * Reads the next byte of data from the input stream. The value byte is
     * returned as an integer in the range 0 to 255. If no byte is available
     * because the end of the stream has been reached, the value -1 is
     * returned; an {@link EOFException} from the underlying stream is
     * reported the same way.
     *
     * @return the byte read, or -1 at end of stream
     * @throws FileNotFoundException if the underlying failure is a storage
     *         exception classified as "file not found"; the original
     *         exception is attached as its cause
     * @throws IOException for any other read failure
     */
    @Override
    public synchronized int read() throws FileNotFoundException, IOException {
      try {
        final int value = in.read();
        if (value != -1) {
          pos++;
          if (statistics != null) {
            statistics.incrementBytesRead(1);
          }
        }
        return value;
      } catch(EOFException e) {
        return -1;
      } catch(IOException e) {

        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

        if (innerException instanceof StorageException) {
          final StorageException storageFailure = (StorageException) innerException;

          LOG.error("Encountered Storage Exception for read on Blob : {}"
              + " Exception details: {} Error Code : {}",
              key, e, storageFailure.getErrorCode());

          if (NativeAzureFileSystemHelper.isFileNotFoundException(storageFailure)) {
            // FileNotFoundException has no cause-taking constructor, so the
            // original failure is attached afterwards to keep it diagnosable.
            FileNotFoundException notFound =
                new FileNotFoundException(String.format("%s is not found", key));
            notFound.initCause(e);
            throw notFound;
          }
        }

        throw e;
      }
    }

    /**
     * Reads up to len bytes of data from the input stream into an array of
     * bytes by delegating to the underlying stream. A smaller number of
     * bytes than requested may be read. The position and the read statistics
     * are advanced by the number of bytes actually read.
     *
     * @param b the buffer into which data is read
     * @param off the start offset in the array b at which data is written
     * @param len the maximum number of bytes read
     * @return the total number of bytes read into the buffer as reported by
     *         the underlying stream, or -1 if there is no more data because
     *         the end of stream is reached
     * @throws FileNotFoundException if the underlying failure is a storage
     *         exception classified as "file not found"; the original
     *         exception is attached as its cause
     * @throws IOException for any other read failure
     */
    @Override
    public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException {
      try {
        final int count = in.read(b, off, len);
        if (count > 0) {
          pos += count;
          if (null != statistics) {
            statistics.incrementBytesRead(count);
          }
        }

        // Return to the caller with the result.
        return count;
      } catch(IOException e) {

        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

        if (innerException instanceof StorageException) {
          final StorageException storageFailure = (StorageException) innerException;

          LOG.error("Encountered Storage Exception for read on Blob : {}"
              + " Exception details: {} Error Code : {}",
              key, e, storageFailure.getErrorCode());

          if (NativeAzureFileSystemHelper.isFileNotFoundException(storageFailure)) {
            // FileNotFoundException has no cause-taking constructor, so the
            // original failure is attached afterwards to keep it diagnosable.
            FileNotFoundException notFound =
                new FileNotFoundException(String.format("%s is not found", key));
            notFound.initCause(e);
            throw notFound;
          }
        }

        throw e;
      }
    }

    /**
     * Positioned read: reads up to {@code length} bytes starting at
     * {@code position} without using this stream's own position field.
     * Delegates to the wrapped stream when it is a PositionedReadable and
     * falls back to the superclass implementation otherwise.
     */
    @Override
    public int read(long position, byte[] buffer, int offset, int length)
        throws IOException {
      // Note on the missing 'synchronized' (SpotBugs flags this as
      // IS2_INCONSISTENT_SYNC; per the original authors the report is not
      // valid here): 'this.in' is a BlockBlobInputStream whose
      // read(long, byte[], int, int) calls its super class method when
      // 'fs.azure.block.blob.buffered.pread.disable' is configured false, and
      // FSInputStream's implementation is properly synchronized. When that
      // option is true a lock-free blob read is wanted; it uses none of the
      // InputStream's shared resources (buffer) and does not move any cursor,
      // so it is safe to read unsynchronized.
      if (!(in instanceof PositionedReadable)) {
        return super.read(position, buffer, offset, length);
      }
      try {
        final int bytesRead = ((PositionedReadable) this.in).read(position, buffer,
            offset, length);
        if (bytesRead > 0 && statistics != null) {
          statistics.incrementBytesRead(bytesRead);
        }
        return bytesRead;
      } catch (IOException e) {
        final Throwable cause = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
        if (cause instanceof StorageException) {
          final StorageException storageFailure = (StorageException) cause;
          LOG.error("Encountered Storage Exception for read on Blob : {}"
              + " Exception details: {} Error Code : {}",
              key, e, storageFailure.getErrorCode());
          if (NativeAzureFileSystemHelper.isFileNotFoundException(storageFailure)) {
            throw new FileNotFoundException(String.format("%s is not found", key));
          }
        }
        throw e;
      }
    }

    /**
     * Closes the wrapped stream and drops the reference to it. Calling this
     * more than once has no further effect.
     */
    @Override
    public synchronized void close() throws IOException {
      if (closed) {
        return;
      }
      closed = true;
      IOUtils.closeStream(in);
      in = null;
    }

    /**
     * Moves the stream to the given offset.
     *
     * @param pos target offset from the start of the blob.
     * @throws FileNotFoundException if the storage layer reports that the
     *         blob was not found.
     * @throws EOFException if {@code pos} is negative, or the wrapped stream
     *         raises an IndexOutOfBoundsException while seeking.
     * @throws IOException if the stream is closed or the seek fails.
     */
    @Override
    public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException {
      try {
        checkNotClosed();
        if (pos < 0) {
          throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
        }
        final boolean movingBackwards = this.pos > pos;
        if (!movingBackwards) {
          // Forward (or zero-distance) seek: skip ahead on the current stream
          // and record however many bytes were actually skipped.
          this.pos += in.skip(pos - this.pos);
        } else if (in instanceof Seekable) {
          ((Seekable) in).seek(pos);
          this.pos = pos;
        } else {
          // The wrapped stream cannot go backwards: reopen the blob from the
          // start and skip forward to the target.
          IOUtils.closeStream(in);
          in = store.retrieve(key);
          this.pos = in.skip(pos);
        }
        LOG.debug("Seek to position {}. Bytes skipped {}", pos,
          this.pos);
      } catch (IOException e) {
        final Throwable cause = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
        final boolean blobMissing = cause instanceof StorageException
            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
        if (blobMissing) {
          throw new FileNotFoundException(String.format("%s is not found", key));
        }
        throw e;
      } catch (IndexOutOfBoundsException e) {
        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
      }
    }

    /**
     * Returns the offset this stream is currently tracking in its
     * {@code pos} field.
     *
     * @return the current position.
     */
    @Override
    public synchronized long getPos() throws IOException {
      return pos;
    }

    /**
     * Not supported by this stream: no alternative source is ever selected.
     *
     * @param targetPos ignored.
     * @return always false.
     */
    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      return false;
    }


    /**
     * Helper method to check if a stream is closed.
     *
     * @throws IOException if this stream has already been closed.
     */
    private void checkNotClosed() throws IOException {
      if (closed) {
        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
      }
    }
  }

  /**
   * Azure output stream that wraps an inner stream of different types.
   * Data is written to the blob named by the encoded key; when the stream is
   * closed the blob is renamed from the encoded key to the original key.
   */
  public class NativeAzureFsOutputStream extends OutputStream
      implements Syncable, StreamCapabilities {
    private String key;
    private String keyEncoded;
    private OutputStream out;

    /**
     * Wraps the given stream.
     *
     * @param out the stream to write to; must not be null.
     * @param aKey the final blob name; must not be null or empty.
     * @param anEncodedKey the blob name written to until close; must not be
     *          null or empty.
     * @throws IllegalArgumentException if any argument is invalid.
     * @throws IOException declared for callers; not thrown by the visible code.
     */
    public NativeAzureFsOutputStream(OutputStream out, String aKey,
        String anEncodedKey) throws IOException {
      if (out == null) {
        throw new IllegalArgumentException(
            "Illegal argument: the output stream is null.");
      }
      if (aKey == null || aKey.isEmpty()) {
        throw new IllegalArgumentException(
            "Illegal argument the key string is null or empty");
      }
      if (anEncodedKey == null || anEncodedKey.isEmpty()) {
        throw new IllegalArgumentException(
            "Illegal argument the encoded key string is null or empty");
      }

      // All arguments are valid: remember them.
      this.out = out;
      setKey(aKey);
      setEncodedKey(anEncodedKey);
    }

    /**
     * Get a reference to the wrapped output stream.
     *
     * @return the underlying output stream
     */
    @InterfaceAudience.LimitedPrivate({"HDFS"})
    public OutputStream getOutStream() {
      return out;
    }

    /**
     * Forwards to the wrapped stream's hflush() when it is Syncable;
     * otherwise calls flush() on this stream.
     */
    @Override  // Syncable
    public void hflush() throws IOException {
      if (!(out instanceof Syncable)) {
        flush();
        return;
      }
      ((Syncable) out).hflush();
    }

    /**
     * Forwards to the wrapped stream's hsync() when it is Syncable;
     * otherwise calls flush() on this stream.
     */
    @Override  // Syncable
    public void hsync() throws IOException {
      if (!(out instanceof Syncable)) {
        flush();
        return;
      }
      ((Syncable) out).hsync();
    }

    /**
     * Propagate probe of stream capabilities to nested stream
     * (if supported), else return false.
     * @param capability string to query the stream support for.
     * @return true if the nested stream supports the specific capability.
     */
    @Override // StreamCapability
    public boolean hasCapability(String capability) {
      return StoreImplementationUtils.hasCapability(out, capability);
    }

    /**
     * Closes the wrapped stream and then renames the blob from the encoded
     * key to the original key. Does nothing if the stream is already closed.
     */
    @Override
    public synchronized void close() throws IOException {
      if (out == null) {
        return;
      }
      out.close();
      try {
        restoreKey();
      } finally {
        // The stream counts as closed even if the rename fails.
        out = null;
      }
    }

    /**
     * Writes the specified byte to this output stream. The general contract for
     * write is that one byte is written to the output stream. The byte to be
     * written is the eight low-order bits of the argument b. The 24 high-order
     * bits of b are ignored.
     *
     * @param b
     *          the byte to write, held in the low-order bits.
     */
    @Override
    public void write(int b) throws IOException {
      checkOpen();
      try {
        out.write(b);
      } catch (IOException e) {
        logStorageFailure(e);
        throw e;
      }
    }

    /**
     * Writes b.length bytes from the specified byte array to this output
     * stream. The general contract for write(b) is that it should have exactly
     * the same effect as the call write(b, 0, b.length).
     *
     * @param b
     *          Block of bytes to be written to the output stream.
     */
    @Override
    public void write(byte[] b) throws IOException {
      checkOpen();
      try {
        out.write(b);
      } catch (IOException e) {
        logStorageFailure(e);
        throw e;
      }
    }

    /**
     * Writes <code>len</code> bytes from the specified byte array starting at
     * offset <code>off</code> to the output stream. Element
     * <code>b[off]</code> is the first byte written and
     * <code>b[off+len-1]</code> is the last byte written by this operation.
     *
     * @param b
     *          Byte array to be written.
     * @param off
     *          Offset in <code>b</code> of the first byte to write.
     * @param len
     *          Number of bytes to be written.
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      checkOpen();
      try {
        out.write(b, off, len);
      } catch (IOException e) {
        logStorageFailure(e);
        throw e;
      }
    }

    /**
     * Logs a failed write when its direct cause is a StorageException;
     * other failures are not logged here.
     */
    private void logStorageFailure(IOException e) {
      final Throwable cause = e.getCause();
      if (cause instanceof StorageException) {
        LOG.error("Encountered Storage Exception for write on Blob : {}"
            + " Exception details: {} Error Code : {}",
            key, e.getMessage(), ((StorageException) cause).getErrorCode());
      }
    }

    /**
     * Get the blob name.
     *
     * @return String Blob name.
     */
    public String getKey() {
      return key;
    }

    /**
     * Set the blob name.
     *
     * @param key
     *          Blob name.
     */
    public void setKey(String key) {
      this.key = key;
    }

    /**
     * Get the encoded blob name.
     *
     * @return String Encoded blob name.
     */
    public String getEncodedKey() {
      return keyEncoded;
    }

    /**
     * Set the encoded blob name.
     *
     * @param anEncodedKey
     *          Encoded blob name.
     */
    public void setEncodedKey(String anEncodedKey) {
      this.keyEncoded = anEncodedKey;
    }

    /**
     * Restore the original key name from the key member variable. Note: The
     * output file stream is created with an encoded blob store key to guarantee
     * load balancing on the front end of the Azure storage partition servers.
     * The create also includes the name of the original key value which is
     * stored in the key member variable. This method should only be called
     * when the stream is closed.
     */
    private void restoreKey() throws IOException {
      store.rename(getEncodedKey(), getKey());
    }

    /**
     * Check for the stream being open.
     * @throws IOException if the stream is closed.
     */
    private void checkOpen() throws IOException {
      if (out == null) {
        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
      }
    }

  }

  // Set in initialize() to "<scheme>://<authority>" of the URI passed in.
  private URI uri;
  // Store used for blob operations: either supplied through the constructor
  // or created by createDefaultStore() during initialize().
  private NativeFileSystemStore store;
  // Concrete store created by createDefaultStore(); returned by getStore().
  // NOTE(review): in the code visible here it is assigned only in
  // createDefaultStore(), so it appears to stay null when a store is injected
  // through the constructor -- confirm.
  private AzureNativeFileSystemStore actualStore;
  // Set in initialize() from "/user/<short user name>".
  private Path workingDir;
  // Created in initialize() and passed to the store.
  private AzureFileSystemInstrumentation instrumentation;
  // Assigned in initialize() only when metrics are not skipped via
  // SKIP_AZURE_METRICS_PROPERTY_NAME.
  private String metricsSourceName;
  private boolean isClosed = false;
  // Toggled by suppressRetryPolicy()/resumeRetryPolicy() and consulted when
  // createDefaultStore() builds a store.
  private static boolean suppressRetryPolicy = false;
  // A counter to create unique (within-process) names for my metrics sources.
  private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
  // Read from APPEND_SUPPORT_ENABLE_PROPERTY_NAME in initialize(); append()
  // throws UnsupportedOperationException while this is false.
  private boolean appendSupportEnabled = false;
  // Read from RETURN_URI_AS_CANONICAL_SERVICE_NAME_PROPERTY_NAME in initialize().
  private boolean returnUriAsCanonicalServiceName = false;
  private DelegationTokenAuthenticatedURL authURL;
  private DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
  private String credServiceUrl;
  // The three user lists below are populated in initialize() only when
  // azureAuthorization is true.
  private List<String> chownAllowedUsers;
  private List<String> chmodAllowedUsers;
  private List<String> daemonUsers;
  /**
   * Configuration key to enable authorization support in WASB.
   */
  public static final String KEY_AZURE_AUTHORIZATION =
      "fs.azure.authorization";

  /**
   * Default value for the authorization support in WASB.
   */
  private static final boolean DEFAULT_AZURE_AUTHORIZATION = false;

  /**
   * Flag controlling authorization support in WASB. Set in initialize() to
   * true only when both secure mode and KEY_AZURE_AUTHORIZATION are enabled.
   */
  private boolean azureAuthorization = false;

  /**
   * Flag controlling Kerberos support in WASB. Read from
   * Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME in initialize().
   */
  private boolean kerberosSupportEnabled = false;

  /**
   * Authorizer to use when authorization support is enabled in
   * WASB. Created in initialize() when azureAuthorization is true; can be
   * replaced through updateWasbAuthorizer().
   */
  private WasbAuthorizerInterface authorizer = null;

  // Current user, captured in initialize(); used by getHomeDirectory().
  private UserGroupInformation ugi;

  // Created in initialize() only when Hadoop security is enabled and
  // kerberosSupportEnabled is true.
  private WasbDelegationTokenManager wasbDelegationTokenManager;

  /**
   * Creates a file system without a store. initialize() creates a default
   * store when none has been supplied.
   */
  public NativeAzureFileSystem() {
    // set store in initialize()
  }

  /**
   * Creates a file system backed by the given store. initialize() keeps a
   * non-null store instead of creating a default one.
   *
   * @param store the store to use for blob operations.
   */
  public NativeAzureFileSystem(NativeFileSystemStore store) {
    this.store = store;
  }

  /**
   * Suppress the default retry policy for the Storage, useful in unit tests to
   * test negative cases without waiting forever.
   * The flag is static and is consulted by createDefaultStore(), so it
   * affects stores created after this call.
   */
  @VisibleForTesting
  static void suppressRetryPolicy() {
    suppressRetryPolicy = true;
  }

  /**
   * Undo the effect of suppressRetryPolicy() by clearing the static flag.
   */
  @VisibleForTesting
  static void resumeRetryPolicy() {
    suppressRetryPolicy = false;
  }

  /**
   * Produces a metrics source name that no earlier call in this process has
   * returned, by consuming the next value of a shared counter.
   * @return metric source name
   */
  @VisibleForTesting
  public static String newMetricsSourceName() {
    final String prefix = "AzureFileSystemMetrics";
    final int sequence = metricsSourceNameCounter.incrementAndGet();
    // The first source keeps the bare name; later ones get the counter value
    // appended as a suffix.
    return sequence == 1 ? prefix : prefix + sequence;
  }

  /**
   * Tells whether a URI scheme belongs to the Azure File System. The
   * comparison ignores case.
   *
   * @param scheme
   *          The URI scheme; may be null.
   * @return true iff it's an Azure File System URI scheme.
   */
  private static boolean isWasbScheme(String scheme) {
    if (scheme == null) {
      return false;
    }
    // asv / asvs are the old names (plain and HTTPS); wasb / wasbs are the
    // new names (plain and HTTPS).
    final String[] knownSchemes = {"asv", "asvs", "wasb", "wasbs"};
    for (String known : knownSchemes) {
      if (scheme.equalsIgnoreCase(known)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Fills in a missing authority from the default file system, provided the
   * default file system uses a WASB scheme.
   *
   * @param uri the URI to inspect.
   * @param conf configuration used to look up the default file system URI.
   * @return The URI with reconstructed authority if necessary and possible;
   *         otherwise the URI that was passed in.
   */
  private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
    if (uri.getAuthority() != null) {
      // Nothing to reconstruct.
      return uri;
    }
    final URI defaultUri = FileSystem.getDefaultUri(conf);
    if (defaultUri == null || !isWasbScheme(defaultUri.getScheme())) {
      // The default file system is not WASB, so it has no authority to lend.
      return uri;
    }
    try {
      // Same URI, but with the default file system's authority.
      return new URI(uri.getScheme(), defaultUri.getAuthority(),
          uri.getPath(), uri.getQuery(), uri.getFragment());
    } catch (URISyntaxException e) {
      // This should never happen.
      throw new Error("Bad URI construction", e);
    }
  }

  /**
   * Runs the superclass path check on a copy of the path whose authority has
   * been reconstructed from the default file system where needed.
   */
  @Override
  protected void checkPath(Path path) {
    final URI withAuthority =
        reconstructAuthorityIfNeeded(path.toUri(), getConf());
    super.checkPath(new Path(withAuthority));
  }

  /**
   * Initializes this file system for the given URI: resolves the authority,
   * creates a default store if none was supplied, registers metrics (unless
   * skipped), initializes the store, and reads the feature flags
   * (append, authorization, Kerberos, canonical service name) from the
   * configuration.
   *
   * @param uri URI of the file system; its authority may be reconstructed
   *          from the default file system.
   * @param conf configuration to read settings from.
   * @throws IOException declared for failures of the calls made here.
   * @throws IllegalArgumentException if the URI has no authority even after
   *           reconstruction.
   */
  @Override
  public void initialize(URI uri, Configuration conf)
      throws IOException, IllegalArgumentException {
    // Check authority for the URI to guarantee that it is non-null.
    uri = reconstructAuthorityIfNeeded(uri, conf);
    if (null == uri.getAuthority()) {
      final String errMsg = String
          .format("Cannot initialize WASB file system, URI authority not recognized.");
      throw new IllegalArgumentException(errMsg);
    }
    super.initialize(uri, conf);

    // Keep a store injected through the constructor; otherwise build one.
    if (store == null) {
      store = createDefaultStore(conf);
    }

    instrumentation = new AzureFileSystemInstrumentation(conf);
    if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
      // Make sure the metrics system is available before interacting with Azure
      AzureFileSystemMetricsSystem.fileSystemStarted();
      metricsSourceName = newMetricsSourceName();
      String sourceDesc = "Azure Storage Volume File System metrics";
      AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc,
        instrumentation);
    }

    store.initialize(uri, conf, instrumentation);
    setConf(conf);
    this.ugi = UserGroupInformation.getCurrentUser();
    // Only scheme and authority are kept; path, query and fragment are dropped.
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    // Working directory is /user/<short user name>, qualified against this
    // file system.
    this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
        .getShortUserName()).makeQualified(getUri(), getWorkingDirectory());

    this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
    LOG.debug("NativeAzureFileSystem. Initializing.");
    LOG.debug("  blockSize  = {}", store.getHadoopBlockSize());

    // Initialize thread counts from user configuration
    deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
    renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS);

    boolean useSecureMode = conf.getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE,
        AzureNativeFileSystemStore.DEFAULT_USE_SECURE_MODE);

    // Authorization needs both secure mode and the authorization key.
    this.azureAuthorization = useSecureMode &&
        conf.getBoolean(KEY_AZURE_AUTHORIZATION, DEFAULT_AZURE_AUTHORIZATION);
    this.kerberosSupportEnabled =
        conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);

    if (this.azureAuthorization) {

      this.authorizer =
          new RemoteWasbAuthorizerImpl();
      authorizer.init(conf);

      // User lists are only loaded when authorization is enabled.
      this.chmodAllowedUsers =
          Arrays.asList(conf.getTrimmedStrings(
              AZURE_CHMOD_USERLIST_PROPERTY_NAME,
                  AZURE_CHMOD_USERLIST_PROPERTY_DEFAULT_VALUE));
      this.chownAllowedUsers =
          Arrays.asList(conf.getTrimmedStrings(
              AZURE_CHOWN_USERLIST_PROPERTY_NAME,
                  AZURE_CHOWN_USERLIST_PROPERTY_DEFAULT_VALUE));
      this.daemonUsers =
          Arrays.asList(conf.getTrimmedStrings(
              AZURE_DAEMON_USERLIST_PROPERTY_NAME,
                  AZURE_DAEMON_USERLIST_PROPERTY_DEFAULT_VALUE));
    }

    // The delegation token manager is only created for secure clusters with
    // Kerberos support switched on.
    if (UserGroupInformation.isSecurityEnabled() && kerberosSupportEnabled) {
      this.wasbDelegationTokenManager = new RemoteWasbDelegationTokenManager(conf);
    }

    this.returnUriAsCanonicalServiceName = conf.getBoolean(RETURN_URI_AS_CANONICAL_SERVICE_NAME_PROPERTY_NAME, false);
  }

  /**
   * Returns the qualified home directory of the user captured in
   * initialize(): the default home-directory prefix followed by the user's
   * short name.
   */
  @Override
  public Path getHomeDirectory() {
    final String homeDirectory =
        USER_HOME_DIR_PREFIX_DEFAULT + "/" + this.ugi.getShortUserName();
    return makeQualified(new Path(homeDirectory));
  }

  /**
   * Replaces the authorizer consulted by performAuthCheck().
   *
   * @param authorizer the authorizer to use from now on.
   */
  @VisibleForTesting
  public void updateWasbAuthorizer(WasbAuthorizerInterface authorizer) {
    this.authorizer = authorizer;
  }

  /**
   * Creates a new AzureNativeFileSystemStore, remembers it in actualStore,
   * and suppresses its retry policy when the static suppressRetryPolicy flag
   * is set.
   *
   * @param conf not used by this method.
   * @return the newly created store.
   */
  private NativeFileSystemStore createDefaultStore(Configuration conf) {
    actualStore = new AzureNativeFileSystemStore();

    if (suppressRetryPolicy) {
      actualStore.suppressRetryPolicy();
    }
    return actualStore;
  }

  /**
   * Azure Storage doesn't allow the blob names to end in a period,
   * so every match of TRAILING_PERIOD_PATTERN is replaced with
   * TRAILING_PERIOD_PLACEHOLDER to work around that limitation.
   */
  private static String encodeTrailingPeriod(String toEncode) {
    return TRAILING_PERIOD_PATTERN
        .matcher(toEncode)
        .replaceAll(TRAILING_PERIOD_PLACEHOLDER);
  }

  /**
   * Reverse the encoding done by encodeTrailingPeriod(): every match of
   * TRAILING_PERIOD_PLACEHOLDER_PATTERN becomes a period again.
   */
  private static String decodeTrailingPeriod(String toDecode) {
    return TRAILING_PERIOD_PLACEHOLDER_PATTERN
        .matcher(toDecode)
        .replaceAll(".");
  }

  /**
   * Convert the path to a key. By convention, any leading or trailing slash is
   * removed, except for the special case of a single slash. Trailing periods
   * are encoded along the way.
   * @param path path converted to a key
   * @return key string
   * @throws IllegalArgumentException if the path is not absolute.
   */
  @VisibleForTesting
  public String pathToKey(Path path) {
    final URI pathAsUri = path.toUri();

    // A URI with an empty path component refers to the root of the
    // container, so append a separator to make that explicit.
    final Path effectivePath = "".equals(pathAsUri.getPath())
        ? new Path(pathAsUri.toString() + Path.SEPARATOR)
        : path;

    if (!effectivePath.isAbsolute()) {
      throw new IllegalArgumentException("Path must be absolute: " + path);
    }

    final String rawKey = effectivePath.toUri().getPath();
    final String key = encodeTrailingPeriod(removeTrailingSlash(rawKey));

    // A one-character key is returned untouched; anything longer loses its
    // initial slash.
    return key.length() == 1 ? key : key.substring(1);
  }

  // Strip one trailing slash from the key. Keys of length zero or one
  // (including a single slash) are returned unchanged.
  private static String removeTrailingSlash(String key) {
    final int length = key.length();
    final boolean endsWithSlash = length > 1 && key.charAt(length - 1) == '/';
    return endsWithSlash ? key.substring(0, length - 1) : key;
  }

  /**
   * Converts a store key back into an absolute path, decoding any encoded
   * trailing periods. The key "/" maps to the container root.
   */
  static Path keyToPath(String key) {
    final String pathString = key.equals("/")
        ? "/" // container
        : "/" + decodeTrailingPeriod(key);
    return new Path(pathString);
  }

  /**
   * Resolves a path against the working directory. A path that is already
   * absolute is returned unchanged.
   * This is public for testing purposes.
   *
   * @param path path to make absolute.
   * @return the path itself if absolute, otherwise the path under the
   *         working directory.
   */
  @VisibleForTesting
  public Path makeAbsolute(Path path) {
    return path.isAbsolute() ? path : new Path(workingDir, path);
  }

  /**
   * For unit test purposes, retrieves the AzureNativeFileSystemStore store
   * backing this file system. This returns the actualStore field, which
   * createDefaultStore() assigns.
   *
   * @return The store object.
   */
  @VisibleForTesting
  public AzureNativeFileSystemStore getStore() {
    return actualStore;
  }

  /**
   * Returns the store field typed as the NativeFileSystemStore interface,
   * i.e. the store that was injected or created in initialize().
   *
   * @return the store used by this file system.
   */
  NativeFileSystemStore getStoreInterface() {
    return store;
  }

  /**
   * Asks the authorizer whether the requested access is allowed, and throws
   * if it is not. Does nothing when authorization is disabled or no
   * authorizer is set.
   *
   * @param requestingAccessForPath - The path to the ancestor/parent/subtree/file that needs to be
   *                                checked before granting access to originalPath
   * @param accessType - The type of access READ/WRITE being requested
   * @param operation - A string describing the operation being performed ("delete", "create" etc.).
   * @param originalPath - The originalPath that was being accessed
   * @throws WasbAuthorizationException if the authorizer denies the access.
   * @throws IOException declared for failures of the calls made here.
   */
  private void performAuthCheck(Path requestingAccessForPath, WasbAuthorizationOperations accessType,
      String operation, Path originalPath) throws WasbAuthorizationException, IOException {

    if (!azureAuthorization || this.authorizer == null) {
      return;
    }

    // Both paths are qualified against this file system before use.
    final Path qualifiedCheckPath =
        requestingAccessForPath.makeQualified(getUri(), getWorkingDirectory());
    final Path qualifiedOriginalPath =
        originalPath.makeQualified(getUri(), getWorkingDirectory());

    final String owner = getOwnerForPath(qualifiedCheckPath);

    final boolean allowed = this.authorizer.authorize(
        qualifiedCheckPath.toString(), accessType.toString(), owner);
    if (!allowed) {
      throw new WasbAuthorizationException(operation
          + " operation for Path : " + qualifiedOriginalPath.toString() + " not allowed");
    }
  }

  /**
   * Gets the metrics source for this file system, i.e. the instrumentation
   * object created in initialize().
   * This is mainly here for unit testing purposes.
   *
   * @return the metrics source.
   */
  public AzureFileSystemInstrumentation getInstrumentation() {
    return instrumentation;
  }

  /**
   * Opens an existing file for appending. Append must be enabled through
   * configuration, the caller must pass the WRITE authorization check, and
   * the target must be an existing file that is not a page blob.
   *
   * @param f path of the file to append to.
   * @param bufferSize buffer size handed to the store's append stream.
   * @param progress not used by this method.
   * @return a stream that appends to the file.
   * @throws UnsupportedOperationException if append support is not enabled.
   * @throws FileNotFoundException if the file does not exist or is a
   *           directory.
   * @throws IOException if the file is a page blob or the store fails.
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
      throws IOException {

    if (!appendSupportEnabled) {
      throw new UnsupportedOperationException("Append Support not enabled");
    }

    LOG.debug("Opening file: {} for append", f);

    final Path absolutePath = makeAbsolute(f);
    performAuthCheck(absolutePath, WasbAuthorizationOperations.WRITE, "append", absolutePath);

    final String key = pathToKey(absolutePath);

    FileMetadata meta;
    try {
      meta = store.retrieveMetadata(key);
    } catch (Exception ex) {
      // A "not found" storage failure becomes a FileNotFoundException;
      // everything else is rethrown untouched.
      final Throwable cause = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
      final boolean blobMissing = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
      if (!blobMissing) {
        throw ex;
      }
      throw new FileNotFoundException(String.format("%s is not found", key));
    }

    if (meta == null) {
      throw new FileNotFoundException(f.toString());
    }
    if (meta.isDirectory()) {
      throw new FileNotFoundException(f.toString()
          + " is a directory not a file.");
    }
    if (store.isPageBlobKey(key)) {
      throw new IOException("Append not supported for Page Blobs");
    }

    DataOutputStream appendStream;
    try {
      appendStream = store.retrieveAppendStream(key, bufferSize);
    } catch (Exception ex) {
      // Same translation as for the metadata lookup above.
      final Throwable cause = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
      final boolean blobMissing = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
      if (!blobMissing) {
        throw ex;
      }
      throw new FileNotFoundException(String.format("%s is not found", key));
    }

    return new FSDataOutputStream(appendStream, statistics);
  }

  /**
   * Creates a file and returns a stream for writing to it. Delegates to the
   * private create overload with createParent set to true and no parent
   * folder lease.
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, true,
        bufferSize, replication, blockSize, progress,
        (SelfRenewingLease) null);
  }

  /**
   * Get a self-renewing lease on the specified file. The path is made
   * absolute and converted to a store key before the lease is requested from
   * the store returned by getStore().
   * @param path path on which to acquire the lease.
   * @return Lease
   * @throws AzureException when not being able to acquire a lease on the path
   */
  public SelfRenewingLease acquireLease(Path path) throws AzureException {
    final Path absolutePath = makeAbsolute(path);
    final String fullKey = pathToKey(absolutePath);
    final AzureNativeFileSystemStore leaseStore = getStore();
    return leaseStore.acquireLease(fullKey);
  }

  /**
   * Creates a file without creating missing parent folders. When the path is
   * an atomic-rename key, a lease on the parent folder is held while the
   * parent's existence is checked and the file is created.
   *
   * @param f path of the file to create.
   * @param permission permission for the new file.
   * @param overwrite whether an existing file may be overwritten.
   * @param bufferSize passed through to create.
   * @param replication passed through to create.
   * @param blockSize passed through to create.
   * @param progress passed through to create.
   * @return a stream for writing to the new file.
   * @throws FileNotFoundException if the parent folder does not exist.
   * @throws IOException if the lease cannot be acquired or freed, or the
   *           create fails.
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {

    Path parent = f.getParent();

    // Get exclusive access to folder if this is a directory designated
    // for atomic rename. The primary use case is HBase write-ahead
    // log file management.
    SelfRenewingLease lease = null;
    if (store.isAtomicRenameKey(pathToKey(f))) {
      try {
        lease = acquireLease(parent);
      } catch (AzureException e) {

        // Read the error code defensively: the cause may be absent or of
        // another type, and a StorageException may carry no error code.
        final Throwable cause = e.getCause();
        final String errorCode = cause instanceof StorageException
            ? ((StorageException) cause).getErrorCode()
            : null;
        if ("BlobNotFound".equals(errorCode)) {
          throw new FileNotFoundException("Cannot create file " +
              f.getName() + " because parent folder does not exist.");
        }

        LOG.warn("Got unexpected exception trying to get lease on {} . {}",
          pathToKey(parent), e.getMessage());
        throw e;
      }
    }

    // See if the parent folder exists. If not, throw error.
    // The exists() check will push any pending rename operation forward,
    // if there is one, and return false.
    //
    // At this point, we have exclusive access to the source folder
    // via the lease, so we will not conflict with an active folder
    // rename operation.
    //
    // In the secure case, the call to exists will happen in the context
    // of the user that initiated the operation. In this case, we should
    // do the auth-check against ranger for the path.
    if (!exists(parent)) {
      // No lease is taken for paths that are not atomic-rename keys, so only
      // free one that was actually acquired.
      if (lease != null) {
        try {
          // This'll let the keep-alive thread exit as soon as it wakes up.
          lease.free();
        } catch (Exception e) {
          LOG.warn("Unable to free lease because: {}", e.getMessage());
        }
      }
      throw new FileNotFoundException("Cannot create file " +
          f.getName() + " because parent folder does not exist.");
    }

    // Create file inside folder.
    FSDataOutputStream out = null;
    try {
      out = create(f, permission, overwrite, false,
          bufferSize, replication, blockSize, progress, lease);
    } finally {
      // Release exclusive access to folder.
      try {
        if (lease != null) {
          lease.free();
        }
      } catch (Exception e) {
        // The file cannot be handed out safely if the lease is stuck, so
        // clean up the stream and report the failure.
        NativeAzureFileSystemHelper.cleanup(LOG, out);
        String msg = "Unable to free lease on " + parent.toUri();
        LOG.error(msg);
        throw new IOException(msg, e);
      }
    }
    return out;
  }

  /**
   * Flag-based variant of createNonRecursive. The file is overwritten only
   * when the flags contain both CREATE and OVERWRITE; any other combination
   * is passed on as a non-overwriting create.
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {

    final boolean overwrite = flags.contains(CreateFlag.CREATE)
        && flags.contains(CreateFlag.OVERWRITE);

    // Hand over to the overwrite-flag variant.
    return this.createNonRecursive(f, permission, overwrite,
        bufferSize, replication, blockSize, progress);
  }

  /**
   * Variant of createNonRecursive without an explicit permission; delegates
   * to the permission-taking overload using FsPermission.getFileDefault().
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return this.createNonRecursive(f, FsPermission.getFileDefault(),
        overwrite, bufferSize, replication, blockSize, progress);
  }


  /**
   * Create an Azure blob and return an output stream to use
   * to write data to it. Paths containing a colon are rejected, a WRITE
   * authorization check is run against the path's ancestor, and the actual
   * creation is delegated to createInternal.
   *
   * @param f path of the file to create.
   * @param permission permission for the new file.
   * @param overwrite whether an existing file may be overwritten.
   * @param createParent not used by this method.
   * @param bufferSize not used by this method.
   * @param replication not used by this method.
   * @param blockSize not used by this method.
   * @param progress not used by this method.
   * @param parentFolderLease Lease on parent folder (or null if
   * no lease).
   * @return the output stream for writing into the new file.
   * @throws FileAlreadyExistsException declared for createInternal.
   * @throws IOException if the name contains a colon, authorization fails,
   * or the creation fails.
   */
  private FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, boolean createParent, int bufferSize,
      short replication, long blockSize, Progressable progress,
      SelfRenewingLease parentFolderLease)
          throws FileAlreadyExistsException, IOException {

    LOG.debug("Creating file: {}", f.toString());

    if (containsColon(f)) {
      throw new IOException("Cannot create file " + f
          + " through WASB that has colons in the name");
    }

    final Path absolutePath = makeAbsolute(f);

    // Authorization is checked on the ancestor, but a denial is reported
    // against the path the caller asked for.
    performAuthCheck(getAncestor(absolutePath),
        WasbAuthorizationOperations.WRITE, "create", absolutePath);

    return createInternal(f, permission, overwrite, parentFolderLease);
  }


  /**
   * Version of the create call meant for internal usage. It is not public
   * facing and does not perform the ancestor authorization check done by the
   * public facing create call; the only authorization check made here is a
   * WRITE check on an already existing file that is about to be overwritten.
   * It is used by the public facing create call and by FolderRenamePending
   * to create the internal -RenamePending.json file.
   *
   * @param f the path to a file to be created.
   * @param permission for the newly created file.
   * @param overwrite specifies if the file should be overwritten.
   * @param parentFolderLease lease on the parent folder, or null.
   * @return the output stream used to write data into the newly created file.
   * @throws FileAlreadyExistsException if the path exists as a directory, or
   * exists as a file and overwrite is false.
   * @throws IOException if an IO error occurs while attempting to create the
   * path.
   */
  protected FSDataOutputStream createInternal(Path f, FsPermission permission,
                                    boolean overwrite,
                                    SelfRenewingLease parentFolderLease)
      throws FileAlreadyExistsException, IOException {

    final Path target = makeAbsolute(f);
    final String key = pathToKey(target);

    final FileMetadata current = store.retrieveMetadata(key);
    if (current != null) {
      if (current.isDirectory()) {
        throw new FileAlreadyExistsException("Cannot create file " + f
            + "; already exists as a directory.");
      }
      if (!overwrite) {
        throw new FileAlreadyExistsException("File already exists:" + f);
      }
      // Overwriting an existing file requires WRITE access on the file itself.
      performAuthCheck(target, WasbAuthorizationOperations.WRITE, "create", target);
    }

    final Path parentFolder = target.getParent();
    final boolean parentIsRoot =
        parentFolder == null || parentFolder.getParent() == null;
    if (!parentIsRoot) {
      final String parentKey = pathToKey(parentFolder);
      final FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
      final boolean explicitParentDir = parentMetadata != null
          && parentMetadata.isDirectory()
          && parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit;

      if (!explicitParentDir) {
        // The parent folder is missing (or not an explicit directory blob):
        // create it, inheriting permissions from the first existing
        // directory found going up the path. The walk terminates because
        // root eventually returns non-null metadata.
        Path probe = parentFolder.getParent();
        FileMetadata inherited = store.retrieveMetadata(pathToKey(probe));
        while (inherited == null) {
          probe = probe.getParent();
          inherited = store.retrieveMetadata(pathToKey(probe));
        }
        mkdirs(parentFolder, inherited.getPermission(), true);
      } else if (parentFolderLease != null) {
        // The parent exists: refresh its last modified time using the lease
        // the caller already holds on it.
        store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
      } else {
        updateParentFolderLastModifiedTime(key);
      }
    }

    // Mask the permission first (with the default permission mask as well).
    final PermissionStatus permissionStatus =
        createPermissionStatus(applyUMask(permission, UMaskApplyMode.NewFile));

    final OutputStream rawOut;
    if (!store.isPageBlobKey(key)) {
      // Block blob: data is written to a blob at the encoded key.
      final String keyEncoded = encodeKey(key);

      // First create a blob at the real key, pointing back to the temporary
      // file. This
      // 1. makes sure we can create a file there,
      // 2. makes it visible to other concurrent threads/processes/nodes what
      //    we're doing, and
      // 3. makes it easier to restore/cleanup data in the event of a crash.
      store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);

      // The encoded key points to a common container at the storage server:
      // the root path is removed and a random GUID is prefixed to the leaf
      // file name, so keys are broadly and randomly distributed to ease
      // server-side load balancing. When the blob is committed it is renamed
      // to its real key. Uncommitted blocks are not cleaned up here; they
      // are left to Azure storage to garbage collect.
      rawOut = new NativeAzureFsOutputStream(
          store.storefile(keyEncoded, permissionStatus, key), key, keyEncoded);
    } else {
      // Page blobs are stored directly in-place without renames.
      rawOut = store.storefile(key, permissionStatus, key);
    }

    // Wrap the raw stream, bump the file-created counter and hand the
    // stream back to the caller.
    final FSDataOutputStream fsOut = new FSDataOutputStream(rawOut, statistics);
    instrumentation.fileCreated();
    return fsOut;
  }

  /**
   * Deletes the given path, always recursively. Delegates to
   * delete(Path, boolean) with the recursive flag set to true.
   *
   * @param path the path to delete.
   * @return the result of the delegated delete call.
   * @throws IOException if the delegated delete call fails.
   */
  @Override
  @Deprecated
  public boolean delete(Path path) throws IOException {
    final boolean recursive = true;
    return delete(path, recursive);
  }

  /**
   * Deletes the given path, updating the parent folder's last modified time
   * as usual. Delegates to the three-argument delete with
   * skipParentFolderLastModifiedTimeUpdate set to false.
   *
   * @param f the path to delete.
   * @param recursive specify deleting recursively or not.
   * @return the result of the delegated delete call.
   * @throws IOException if the delegated delete call fails.
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    final boolean skipParentFolderLastModifiedTimeUpdate = false;
    return delete(f, recursive, skipParentFolderLastModifiedTimeUpdate);
  }

  /**
   * Delete file or folder with authorization checks. Most of the code
   * is a duplicate of the actual delete implementation and will be merged
   * once the performance and functional aspects are guaranteed not to
   * regress existing delete semantics.
   *
   * A WRITE authorization check is made on the parent of the path (or on the
   * path itself when it has no parent, i.e. root) before anything else. For
   * folders, the contents are collected via getFolderContentsToDelete, which
   * applies per-folder authorization and sticky bit checks; only the
   * contents that passed those checks are deleted, and the folder itself is
   * deleted only if nothing had to be left behind.
   *
   * @param f file or folder path to be deleted.
   * @param recursive whether a non-empty folder may be deleted.
   * @param skipParentFolderLastModifiedTimeUpdate if true, the parent
   * folder's last modified time is not updated.
   * @return true if the path was deleted; false if the path does not exist,
   * the store reported that nothing was deleted, the parallel delete of the
   * contents failed, the path is the root directory, or only part of the
   * folder contents could be deleted.
   * @throws IOException if the parent metadata cannot be retrieved, the
   * parent is a file, a non-recursive delete hits a non-empty folder, the
   * sticky bit check is violated (WasbAuthorizationException), or a store
   * operation fails for a reason other than "not found".
   */
  private boolean deleteWithAuthEnabled(Path f, boolean recursive,
      boolean skipParentFolderLastModifiedTimeUpdate) throws IOException {

    LOG.debug("Deleting file: {}", f);

    Path absolutePath = makeAbsolute(f);
    Path parentPath = absolutePath.getParent();

    // If delete is issued for 'root', parentPath will be null
    // In that case, we perform auth check for root itself before
    // proceeding for deleting contents under root.
    if (parentPath != null) {
      performAuthCheck(parentPath, WasbAuthorizationOperations.WRITE, "delete", absolutePath);
    } else {
      performAuthCheck(absolutePath, WasbAuthorizationOperations.WRITE, "delete", absolutePath);
    }

    String key = pathToKey(absolutePath);

    // Capture the metadata for the path.
    FileMetadata metaFile = null;
    try {
      metaFile = store.retrieveMetadata(key);
    } catch (IOException e) {

      Throwable innerException = checkForAzureStorageException(e);

      // A "not found" storage error means there is nothing to delete.
      if (innerException instanceof StorageException
          && isFileNotFoundException((StorageException) innerException)) {

        return false;
      }
      throw e;
    }

    if (null == metaFile) {
      // The path to be deleted does not exist.
      return false;
    }

    // Retrieve and validate the parent metadata up front; it is needed for
    // both the implicit-directory handling and the sticky bit checks below.
    FileMetadata parentMetadata = null;
    String parentKey = null;
    if (parentPath != null) {
      parentKey = pathToKey(parentPath);

      try {
        parentMetadata = store.retrieveMetadata(parentKey);
      } catch (IOException e) {
         Throwable innerException = checkForAzureStorageException(e);
         if (innerException instanceof StorageException) {
           // Invalid State.
           // A FileNotFoundException is not thrown here as the API returns false
           // if the file not present. But not retrieving metadata here is an
           // unrecoverable state and can only happen if there is a race condition
           // hence throwing a IOException
           if (isFileNotFoundException((StorageException) innerException)) {
             throw new IOException("File " + f + " has a parent directory "
               + parentPath + " whose metadata cannot be retrieved. Can't resolve");
           }
         }
         throw e;
      }

      // Same case as unable to retrieve metadata
      if (parentMetadata == null) {
          throw new IOException("File " + f + " has a parent directory "
              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
      }

      if (!parentMetadata.isDirectory()) {
         // Invalid state: the parent path is actually a file. Throw.
         throw new AzureException("File " + f + " has a parent directory "
             + parentPath + " which is also a file. Can't resolve.");
      }
    }

    // The path exists, determine if it is a folder containing objects,
    // an empty folder, or a simple file and take the appropriate actions.
    if (!metaFile.isDirectory()) {
      // The path specifies a file. We need to check the parent path
      // to make sure it's a proper materialized directory before we
      // delete the file. Otherwise we may get into a situation where
      // the file we were deleting was the last one in an implicit directory
      // (e.g. the blob store only contains the blob a/b and there's no
      // corresponding directory blob a) and that would implicitly delete
      // the directory as well, which is not correct.

      if (parentPath != null && parentPath.getParent() != null) {// Not root

        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
          LOG.debug("Found an implicit parent directory while trying to"
              + " delete the file {}. Creating the directory blob for"
              + " it in {}.", f, parentKey);

          store.storeEmptyFolder(parentKey,
              createPermissionStatus(FsPermission.getDefault()));
        } else {
          if (!skipParentFolderLastModifiedTimeUpdate) {
            updateParentFolderLastModifiedTime(key);
          }
        }
      }

      // check if the file can be deleted based on sticky bit check
      // This check will be performed only when authorization is enabled
      if (isStickyBitCheckViolated(metaFile, parentMetadata)) {
        throw new WasbAuthorizationException(String.format("%s has sticky bit set. "
          + "File %s cannot be deleted.", parentPath, f));
      }

      try {
        if (store.delete(key)) {
          instrumentation.fileDeleted();
        } else {
          return false;
        }
      } catch(IOException e) {

        Throwable innerException = checkForAzureStorageException(e);

        // The file disappeared in the meantime: report "nothing deleted".
        if (innerException instanceof StorageException
            && isFileNotFoundException((StorageException) innerException)) {
          return false;
        }

       throw e;
      }
    } else {
      // The path specifies a folder. Recursively delete all entries under the
      // folder.
      LOG.debug("Directory Delete encountered: {}", f);
      if (parentPath != null && parentPath.getParent() != null) {

        // Materialize an implicit parent so it does not vanish together with
        // the folder being deleted.
        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
          LOG.debug("Found an implicit parent directory while trying to"
                  + " delete the directory {}. Creating the directory blob for"
                  + " it in {}. ", f, parentKey);

          store.storeEmptyFolder(parentKey,
                  createPermissionStatus(FsPermission.getDefault()));
        }
      }

      // check if the folder can be deleted based on sticky bit check on parent
      // This check will be performed only when authorization is enabled.
      // Root is skipped because it has no parent metadata.
      if (!metaFile.getKey().equals("/")
          && isStickyBitCheckViolated(metaFile, parentMetadata)) {

        throw new WasbAuthorizationException(String.format("%s has sticky bit set. "
          + "File %s cannot be deleted.", parentPath, f));
      }

      // Iterate through folder contents and get the list of files
      // and folders that can be deleted. If listing fails with a "not found"
      // storage error we return false; any other IOException is rethrown.
      ArrayList<FileMetadata> fileMetadataList = new ArrayList<>();
      boolean isPartialDelete = false;

      // Start time for list operation
      long start = Time.monotonicNow();

      try {
        // Get list of files/folders that can be deleted
        // based on authorization checks and stickybit checks
        isPartialDelete = getFolderContentsToDelete(metaFile, fileMetadataList);
      } catch (IOException e) {
        Throwable innerException = checkForAzureStorageException(e);

        if (innerException instanceof StorageException
            && isFileNotFoundException((StorageException) innerException)) {
            return false;
        }
        throw e;
      }

      long end = Time.monotonicNow();
      LOG.debug("Time taken to list {} blobs for delete operation: {} ms",
        fileMetadataList.size(), (end - start));

      // Here contents holds the list of metadata of the files and folders that can be deleted
      // under the path that is requested for delete(excluding itself).
      final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);

      if (contents.length > 0 && !recursive) {
          // The folder is non-empty and recursive delete was not specified.
          // Throw an exception indicating that a non-recursive delete was
          // specified for a non-empty folder.
          throw new IOException("Non-recursive delete of non-empty directory "
              + f);
      }

      // Delete all files / folders in current directory stored as list in 'contents'.
      // An entry that no longer exists is only logged; the task still
      // reports success for it.
      AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
        @Override
        public boolean execute(FileMetadata file) throws IOException{
          if (!deleteFile(file.getKey(), file.isDirectory())) {
            LOG.warn("Attempt to delete non-existent {} {}",
                file.isDirectory() ? "directory" : "file",
                file.getKey());
          }
          return true;
        }
      };

      AzureFileSystemThreadPoolExecutor executor = getThreadPoolExecutor(this.deleteThreadCount,
          "AzureBlobDeleteThread", "Delete", key, AZURE_DELETE_THREADS);

      if (!executor.executeParallel(contents, task)) {
        LOG.error("Failed to delete files / subfolders in blob {}", key);
        return false;
      }

      // The root directory itself is never deleted; its contents have
      // already been processed above.
      if (metaFile.getKey().equals("/")) {
        LOG.error("Cannot delete root directory {}", f);
        return false;
      }

      // Delete the current directory if all underlying contents are deleted
      // (a partial delete leaves the directory in place).
      if (isPartialDelete || (store.retrieveMetadata(metaFile.getKey()) != null
          && !deleteFile(metaFile.getKey(), metaFile.isDirectory()))) {
        LOG.error("Failed delete directory : {}", f);
        return false;
      }

      // Update parent directory last modified time
      Path parent = absolutePath.getParent();
      if (parent != null && parent.getParent() != null) { // not root
        if (!skipParentFolderLastModifiedTimeUpdate) {
          updateParentFolderLastModifiedTime(key);
        }
      }
    }

    // File or directory was successfully deleted.
    LOG.debug("Delete Successful for : {}", f);
    return true;
  }

  /**
   * Delete file or folder without performing any authorization or sticky
   * bit checks.
   *
   * For a file, the parent folder is first materialized if it is implicit
   * (otherwise its last modified time is updated unless skipped) and the
   * blob is then deleted. For a folder, all contents are listed with
   * unbounded depth and deleted in parallel, after which the folder itself
   * is deleted. The root directory itself is never deleted: its contents are
   * removed and false is returned, matching deleteWithAuthEnabled.
   *
   * @param f file or folder path to be deleted.
   * @param recursive whether a non-empty folder may be deleted.
   * @param skipParentFolderLastModifiedTimeUpdate if true, the parent
   * folder's last modified time is not updated.
   * @return true if the path was deleted; false if the path does not exist,
   * the store reported that nothing was deleted, the parallel delete of the
   * contents failed, the folder blob could not be deleted, or the path is
   * the root directory.
   * @throws IOException if the parent metadata cannot be retrieved, the
   * parent is a file, a non-recursive delete hits a non-empty folder, or a
   * store operation fails for a reason other than "not found".
   */
  private boolean deleteWithoutAuth(Path f, boolean recursive,
      boolean skipParentFolderLastModifiedTimeUpdate) throws IOException {

    LOG.debug("Deleting file: {}", f);

    Path absolutePath = makeAbsolute(f);
    // parentPath is null when the delete is issued for the root directory.
    Path parentPath = absolutePath.getParent();

    String key = pathToKey(absolutePath);

    // Capture the metadata for the path.
    //
    FileMetadata metaFile = null;
    try {
      metaFile = store.retrieveMetadata(key);
    } catch (IOException e) {

      Throwable innerException = checkForAzureStorageException(e);

      // A "not found" storage error means there is nothing to delete.
      if (innerException instanceof StorageException
          && isFileNotFoundException((StorageException) innerException)) {

        return false;
      }
      throw e;
    }

    if (null == metaFile) {
      // The path to be deleted does not exist.
      return false;
    }

    // The path exists, determine if it is a folder containing objects,
    // an empty folder, or a simple file and take the appropriate actions.
    if (!metaFile.isDirectory()) {
      // The path specifies a file. We need to check the parent path
      // to make sure it's a proper materialized directory before we
      // delete the file. Otherwise we may get into a situation where
      // the file we were deleting was the last one in an implicit directory
      // (e.g. the blob store only contains the blob a/b and there's no
      // corresponding directory blob a) and that would implicitly delete
      // the directory as well, which is not correct.

      if (parentPath != null && parentPath.getParent() != null) {// Not root
        String parentKey = pathToKey(parentPath);

        FileMetadata parentMetadata = null;
        try {
          parentMetadata = store.retrieveMetadata(parentKey);
        } catch (IOException e) {

          Throwable innerException = checkForAzureStorageException(e);

          if (innerException instanceof StorageException) {
            // Invalid State.
            // A FileNotFoundException is not thrown here as the API returns false
            // if the file not present. But not retrieving metadata here is an
            // unrecoverable state and can only happen if there is a race condition
            // hence throwing a IOException
            if (isFileNotFoundException((StorageException) innerException)) {
              throw new IOException("File " + f + " has a parent directory "
                  + parentPath + " whose metadata cannot be retrieved. Can't resolve");
            }
          }
          throw e;
        }

        // Invalid State.
        // A FileNotFoundException is not thrown here as the API returns false
        // if the file not present. But not retrieving metadata here is an
        // unrecoverable state and can only happen if there is a race condition
        // hence throwing a IOException
        if (parentMetadata == null) {
          throw new IOException("File " + f + " has a parent directory "
              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
        }

        if (!parentMetadata.isDirectory()) {
          // Invalid state: the parent path is actually a file. Throw.
          throw new AzureException("File " + f + " has a parent directory "
              + parentPath + " which is also a file. Can't resolve.");
        }

        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
          LOG.debug("Found an implicit parent directory while trying to"
              + " delete the file {}. Creating the directory blob for"
              + " it in {}.", f, parentKey);

          store.storeEmptyFolder(parentKey,
              createPermissionStatus(FsPermission.getDefault()));
        } else {
          if (!skipParentFolderLastModifiedTimeUpdate) {
            updateParentFolderLastModifiedTime(key);
          }
        }
      }

      try {
        if (store.delete(key)) {
          instrumentation.fileDeleted();
        } else {
          return false;
        }
      } catch(IOException e) {

        Throwable innerException = checkForAzureStorageException(e);

        // The file disappeared in the meantime: report "nothing deleted".
        if (innerException instanceof StorageException
            && isFileNotFoundException((StorageException) innerException)) {
          return false;
        }

        throw e;
      }
    } else {
      // The path specifies a folder. Recursively delete all entries under the
      // folder.
      LOG.debug("Directory Delete encountered: {}", f);
      // parentPath is null when the folder is the root directory, so it must
      // be checked before it is dereferenced.
      if (parentPath != null && parentPath.getParent() != null) {
        String parentKey = pathToKey(parentPath);
        FileMetadata parentMetadata = null;

        try {
          parentMetadata = store.retrieveMetadata(parentKey);
        } catch (IOException e) {

          Throwable innerException = checkForAzureStorageException(e);

          if (innerException instanceof StorageException) {
            // Invalid State.
            // A FileNotFoundException is not thrown here as the API returns false
            // if the file not present. But not retrieving metadata here is an
            // unrecoverable state and can only happen if there is a race condition
            // hence throwing a IOException
            if (isFileNotFoundException((StorageException) innerException)) {
              throw new IOException("File " + f + " has a parent directory "
                  + parentPath + " whose metadata cannot be retrieved. Can't resolve");
            }
          }
          throw e;
        }

        // Invalid State.
        // A FileNotFoundException is not thrown here as the API returns false
        // if the file not present. But not retrieving metadata here is an
        // unrecoverable state and can only happen if there is a race condition
        // hence throwing a IOException
        if (parentMetadata == null) {
          throw new IOException("File " + f + " has a parent directory "
              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
        }

        // Materialize an implicit parent so it does not vanish together with
        // the folder being deleted.
        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
          LOG.debug("Found an implicit parent directory while trying to"
              + " delete the directory {}. Creating the directory blob for"
              + " it in {}. ", f, parentKey);

          store.storeEmptyFolder(parentKey,
              createPermissionStatus(FsPermission.getDefault()));
        }
      }

      // Start time for list operation
      long start = Time.monotonicNow();
      final FileMetadata[] contents;

      // List all the files in the folder with AZURE_UNBOUNDED_DEPTH depth.
      try {
        contents = store.list(key, AZURE_LIST_ALL,
            AZURE_UNBOUNDED_DEPTH);
      } catch (IOException e) {
        Throwable innerException = checkForAzureStorageException(e);

        if (innerException instanceof StorageException
            && isFileNotFoundException((StorageException) innerException)) {
          return false;
        }

        throw e;
      }

      long end = Time.monotonicNow();
      LOG.debug("Time taken to list {} blobs for delete operation: {} ms", contents.length, (end - start));

      if (contents.length > 0) {
        if (!recursive) {
          // The folder is non-empty and recursive delete was not specified.
          // Throw an exception indicating that a non-recursive delete was
          // specified for a non-empty folder.
          throw new IOException("Non-recursive delete of non-empty directory "+ f);
        }
      }

      // Delete all files / folders in current directory stored as list in 'contents'.
      // An entry that no longer exists is only logged; the task still
      // reports success for it.
      AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
        @Override
        public boolean execute(FileMetadata file) throws IOException{
          if (!deleteFile(file.getKey(), file.isDirectory())) {
            LOG.warn("Attempt to delete non-existent {} {}",
                file.isDirectory() ? "directory" : "file",
                file.getKey());
          }
          return true;
        }
      };

      AzureFileSystemThreadPoolExecutor executor = getThreadPoolExecutor(this.deleteThreadCount,
          "AzureBlobDeleteThread", "Delete", key, AZURE_DELETE_THREADS);

      if (!executor.executeParallel(contents, task)) {
        LOG.error("Failed to delete files / subfolders in blob {}", key);
        return false;
      }

      // The root directory itself is never deleted; only its contents are.
      // This mirrors the behavior of deleteWithAuthEnabled.
      if (metaFile.getKey().equals("/")) {
        LOG.error("Cannot delete root directory {}", f);
        return false;
      }

      // Delete the current directory
      if (store.retrieveMetadata(metaFile.getKey()) != null
          && !deleteFile(metaFile.getKey(), metaFile.isDirectory())) {
        LOG.error("Failed delete directory : {}", f);
        return false;
      }

      // Update parent directory last modified time
      Path parent = absolutePath.getParent();
      if (parent != null && parent.getParent() != null) { // not root
        if (!skipParentFolderLastModifiedTimeUpdate) {
          updateParentFolderLastModifiedTime(key);
        }
      }
    }

    // File or directory was successfully deleted.
    LOG.debug("Delete Successful for : {}", f);
    return true;
  }

  /**
   * Delete the specified file or folder, dispatching to the implementation
   * with or without authorization checks depending on whether authorization
   * is enabled for this file system.
   *
   * The parameter skipParentFolderLastModifiedTimeUpdate is used in the case
   * of atomic folder rename redo. In that case, there is a lease on the
   * parent folder, so (without reworking the code) modifying the parent
   * folder update time will fail because of a conflict with the lease. Since
   * the folder is going to be deleted soon anyway, an accurate modified time
   * is not necessary and it is easier to just skip the modified time update.
   *
   * @param f file path to be deleted.
   * @param recursive specify deleting recursively or not.
   * @param skipParentFolderLastModifiedTimeUpdate If true, don't update the folder last
   * modified time.
   * @return true if and only if the file is deleted
   * @throws IOException Thrown when fail to delete file or directory.
   */
  public boolean delete(Path f, boolean recursive,
      boolean skipParentFolderLastModifiedTimeUpdate) throws IOException {

    return this.azureAuthorization
        ? deleteWithAuthEnabled(f, recursive,
            skipParentFolderLastModifiedTimeUpdate)
        : deleteWithoutAuth(f, recursive,
            skipParentFolderLastModifiedTimeUpdate);
  }

  /**
   * Creates a new AzureFileSystemThreadPoolExecutor from the given
   * arguments, which are passed to its constructor unchanged.
   *
   * @param threadCount first constructor argument.
   * @param threadNamePrefix second constructor argument.
   * @param operation third constructor argument.
   * @param key fourth constructor argument.
   * @param config fifth constructor argument.
   * @return a newly constructed executor.
   */
  public AzureFileSystemThreadPoolExecutor getThreadPoolExecutor(int threadCount,
      String threadNamePrefix, String operation, String key, String config) {
    final AzureFileSystemThreadPoolExecutor executor =
        new AzureFileSystemThreadPoolExecutor(
            threadCount, threadNamePrefix, operation, key, config);
    return executor;
  }

  /**
   * Collects the contents of folderToDelete that can be deleted, based on
   * authorization check calls and sticky bit checks performed on the
   * sub-tree of folderToDelete. Folders are processed one level at a time
   * using an explicit stack.
   *
   * @param folderToDelete - metadata of the folder whose delete is requested.
   * @param finalList - receives the metadata of all files/folders that can
   * be deleted (folderToDelete itself is not added).
   *
   * @return 'true' if the delete is partial, i.e. at least one file or
   * folder under folderToDelete cannot be deleted; 'false' if everything
   * under it can be deleted.
   * @throws IOException if listing a folder fails, or if an authorization or
   * sticky bit check fails with an exception that is not handled here.
   */
  private boolean getFolderContentsToDelete(FileMetadata folderToDelete,
      ArrayList<FileMetadata> finalList) throws IOException {

    final int maxListingDepth = 1;
    final Stack<FileMetadata> pendingFolders = new Stack<>();
    final HashMap<String, FileMetadata> deletable = new HashMap<>();

    boolean partial = false;

    final Path pathToDelete = makeAbsolute(folderToDelete.getPath());
    pendingFolders.push(folderToDelete);

    while (!pendingFolders.empty()) {

      final FileMetadata currentFolder = pendingFolders.pop();
      final Path currentPath = makeAbsolute(currentFolder.getPath());

      // 'write' permission on the current folder is required to delete its
      // children; this maps to the subfolder 'write' check for deleting
      // contents recursively.
      boolean authorized = true;
      try {
        performAuthCheck(currentPath, WasbAuthorizationOperations.WRITE, "delete", pathToDelete);
      } catch (WasbAuthorizationException we) {
        LOG.debug("Authorization check failed for {}", currentPath);
        authorized = false;
      }

      // Becomes true as soon as something under currentFolder must be kept.
      boolean blocked = !authorized;

      if (!authorized) {
        // The children cannot be deleted since the permission check on the
        // parent has not passed; anything under currentFolder stays.
        LOG.error("Authorization check failed. Files or folders under {} "
          + "will not be processed for deletion.", currentPath);
      } else {
        // Look at the immediate children only; sub-folders are pushed onto
        // the stack and handled in a later iteration.
        for (FileMetadata child : store.list(currentFolder.getKey(),
            AZURE_LIST_ALL, maxListingDepth)) {
          if (isStickyBitCheckViolated(child, currentFolder, false)) {
            // This child cannot be deleted, and therefore neither can its
            // parent paths; they are pruned below.
            blocked = true;
            LOG.error("User does not have permissions to delete {}. "
              + "Parent directory has sticky bit set.", makeAbsolute(child.getPath()));
            continue;
          }
          if (child.isDirectory()) {
            pendingFolders.push(child);
          }
          deletable.put(child.getKey(), child);
        }
      }

      if (!blocked) {
        continue;
      }

      // Something under currentFolder has to stay, so currentFolder and all
      // of its ancestors up to (but excluding) the originally requested
      // folder are removed from the set of deletable entries.
      for (String ancestorKey = currentFolder.getKey();
          !ancestorKey.equals(folderToDelete.getKey());
          ancestorKey = pathToKey(keyToPath(ancestorKey).getParent())) {
        if (deletable.containsKey(ancestorKey)) {
          LOG.debug("Cannot delete {} since some of its contents "
            + "cannot be deleted", ancestorKey);
          deletable.remove(ancestorKey);
        }
      }

      // Signal a partial delete so that the delete on the path requested by
      // the user is not performed.
      partial = true;
    }

    // final list of contents that can be deleted
    finalList.addAll(deletable.values());

    return partial;
  }

  /**
   * Sticky bit check that can optionally treat a FileNotFoundException from
   * the underlying check as a violation instead of propagating it.
   *
   * @param metaData - metadata of the file/folder being checked.
   * @param parentMetadata - metadata of the parent.
   * @param throwOnException - if true a FileNotFoundException is rethrown;
   * if false it is logged at debug level and reported as a violation.
   *
   * @return true if the sticky bit check is violated
   * @throws IOException propagated from the underlying check.
   */
  private boolean isStickyBitCheckViolated(FileMetadata metaData,
    FileMetadata parentMetadata, boolean throwOnException) throws IOException {
    try {
      return isStickyBitCheckViolated(metaData, parentMetadata);
    } catch (FileNotFoundException ex) {
      if (throwOnException) {
        throw ex;
      }
      LOG.debug("Encountered FileNotFoundException while performing "
        + "stickybit check operation for {}", metaData.getKey());
      // swallow exception and return that stickyBit check has been violated
      return true;
    }
  }

  /**
   * Checks if the Current user is not permitted access to a file/folder when
   * sticky bit is set on parent path. Only the owner of parent path
   * and owner of the file/folder itself are permitted to perform certain
   * operations on file/folder based on sticky bit check. Sticky bit check will
   * be performed only when authorization is enabled.
   *
   * @param metaData - metadata of the file/folder whose parent has sticky bit set.
   * @param parentMetadata - metadata of the parent.
   *
   * @return true if Current user violates stickybit check; false if
   * authorization is disabled, the sticky bit is not set on the parent, or
   * the current user owns the parent or the file/folder itself.
   * @throws FileNotFoundException if parentMetadata is null.
   * @throws IOException Thrown when current user cannot be retrieved.
   */
  private boolean isStickyBitCheckViolated(FileMetadata metaData,
    FileMetadata parentMetadata) throws IOException {

    // In case stickybit check should not be performed,
    // return value should indicate stickybit check is not violated.
    if (!this.azureAuthorization) {
      return false;
    }

    // This should never happen when the sticky bit check is invoked.
    if (parentMetadata == null) {
      throw new FileNotFoundException(
        String.format("Parent metadata for '%s' not found!", metaData.getKey()));
    }

    // stickybit is not set on parent and hence cannot be violated
    if (!parentMetadata.getPermission().getStickyBit()) {
      return false;
    }

    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
    String parentDirectoryOwner = parentMetadata.getOwner();
    String currentFileOwner = metaData.getOwner();

    // Files/Folders with no owner set will not pass stickybit check.
    // The comparison is made from currentUser so that a missing (null) owner
    // simply does not match instead of causing a NullPointerException.
    boolean ownsParentOrFile = currentUser.equalsIgnoreCase(parentDirectoryOwner)
      || currentUser.equalsIgnoreCase(currentFileOwner);

    return !ownsParentOrFile;
  }

  /**
   * Deletes the given key from the store and, on success, records the
   * deletion in the instrumentation as either a directory or a file delete.
   *
   * @param path the key of the file or directory to delete.
   * @param isDir true if the path is a directory; otherwise false. Only
   *        selects which metric is incremented.
   * @return true if the store reported the delete as done; false otherwise,
   *         in which case no metric is incremented.
   * @throws IOException if an IO error occurs while attempting to delete the
   *         path.
   */
  @VisibleForTesting
  boolean deleteFile(String path, boolean isDir) throws IOException {
    final boolean deleted = store.delete(path);
    if (deleted) {
      if (isDir) {
        instrumentation.directoryDeleted();
      } else {
        instrumentation.fileDeleted();
      }
    }
    return deleted;
  }

  /**
   * Returns the status of the given path; the lookup itself is done by
   * {@code getFileStatusInternal}.
   *
   * @param f the path to look up.
   * @return the status of the path.
   * @throws FileNotFoundException when the path does not exist.
   * @throws IOException on any other failure.
   */
  @Override
  public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOException {
    final String requestedPath = f.toString();
    LOG.debug("Getting the file status for {}", requestedPath);

    return getFileStatusInternal(f);
  }

  /**
   * Internal existence check for a path.
   * It is implemented on top of getFileStatusInternal, so it costs the same
   * as a status lookup. No authorization check is made here, which lets
   * filesystem operations probe parents/ancestors/paths without needing
   * authorization policies configured for those internal probes.
   *
   * @param f the path to a file or directory.
   * @return true if the status lookup succeeds; false if it reports the path
   *         as not found.
   * @throws IOException if the lookup fails for any reason other than the
   *         path not being found.
   */
  protected boolean existsInternal(Path f) throws IOException {
    boolean found;
    try {
      this.getFileStatusInternal(f);
      found = true;
    } catch (FileNotFoundException missing) {
      // A missing path is the expected negative answer, not an error.
      found = false;
    }
    return found;
  }

  /**
   * Inner implementation of {@link #getFileStatus(Path)}.
   * Resolves the path to a store key, fetches its metadata and returns it as
   * the status of the path. For a folder, a pending folder rename is redone
   * first; if one was redone the folder is reported as not found.
   *
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException Other failure
   */
  private FileStatus getFileStatusInternal(Path f) throws FileNotFoundException, IOException {

    final Path absolutePath = makeAbsolute(f);
    final String key = pathToKey(absolutePath);

    // An empty key denotes the root, which always exists.
    if (key.isEmpty()) {
      return new FileStatus(0, true, 1, store.getHadoopBlockSize(), 0, 0,
          FsPermission.getDefault(), "", "",
          absolutePath.makeQualified(getUri(), getWorkingDirectory()));
    }

    // The metadata tells whether the key is a folder or a file.
    FileMetadata meta = null;
    try {
      meta = store.retrieveMetadata(key);
    } catch (Exception ex) {
      final Throwable cause =
          NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
      final boolean notFound = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
      if (notFound) {
        throw new FileNotFoundException(String.format("%s is not found", key));
      }
      throw ex;
    }

    // No metadata at all: the path does not exist.
    if (meta == null) {
      throw new FileNotFoundException(
          absolutePath + ": No such file or directory.");
    }

    if (!meta.isDirectory()) {
      LOG.debug("Found the path: {} as a file.", f.toString());
      return updateFileStatusPath(meta, absolutePath);
    }

    LOG.debug("Path {} is a folder.", f.toString());

    // A redone pending rename means the folder no longer exists at this path.
    if (conditionalRedoFolderRename(f)) {
      throw new FileNotFoundException(
          absolutePath + ": No such file or directory.");
    }

    return updateFileStatusPath(meta, absolutePath);
  }

  /**
   * Redoes a pending folder rename for the given path when a rename-pending
   * file exists for it.
   *
   * @param f the folder path to check.
   * @return true if a rename-pending file was found and the rename was
   *         redone; false otherwise (including when f is the root).
   * @throws IOException if the existence check or the redo fails.
   */
  private boolean conditionalRedoFolderRename(Path f) throws IOException {

    // The root cannot be renamed, so there is nothing to redo for it.
    if (f.getName().isEmpty()) {
      return false;
    }

    // Look for the -RenamePending.json file that belongs to this folder.
    final Path pendingFile = renamePendingFilePath(f);
    if (!existsInternal(pendingFile)) {
      return false;
    }

    final FolderRenamePending pendingRename =
        new FolderRenamePending(pendingFile, this);
    pendingRename.redo();
    return true;
  }

  /**
   * Builds the path of the rename-pending file that would be used for a
   * rename of the folder at path f: the folder's key with
   * "-RenamePending.json" appended.
   *
   * @param f the folder path.
   * @return the path of the corresponding rename-pending file.
   */
  private Path renamePendingFilePath(Path f) {
    final String folderKey = pathToKey(makeAbsolute(f));
    // NOTE(review): presumably this literal must stay equal to
    // FolderRenamePending.SUFFIX, which isRenamePendingFile matches
    // against - confirm.
    final String pendingKey = folderKey + "-RenamePending.json";
    return keyToPath(pendingKey);
  }

  /**
   * @return the URI held in this instance's {@code uri} field.
   */
  @Override
  public URI getUri() {
    return this.uri;
  }

  /**
   * Retrieve the status of a given path if it is a file, or of all the
   * contained files if it is a directory.
   *
   * <p>A READ authorization check is performed on the path first. While
   * listing a directory, any rename-pending files found in the listing are
   * redone and the directory is then listed again. When the listed key is
   * "/", the AZURE_TEMP_FOLDER directory is left out of the result.
   *
   * @param f the path to list.
   * @return a single-element array for a file, or one entry per listed child
   *         for a directory.
   * @throws FileNotFoundException if the path has no metadata or the store
   *         reports it as not found.
   * @throws IOException on any other failure.
   */
  @Override
  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {

    LOG.debug("Listing status for {}", f.toString());

    final Path absolutePath = makeAbsolute(f);

    performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "liststatus", absolutePath);

    final String key = pathToKey(absolutePath);

    FileMetadata meta = null;
    try {
      meta = store.retrieveMetadata(key);
    } catch (IOException ex) {
      final Throwable cause =
          NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
      final boolean notFound = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
      if (notFound) {
        throw new FileNotFoundException(String.format("%s is not found", f));
      }
      throw ex;
    }

    if (meta == null) {
      // Nothing is stored under this key.
      LOG.debug("Did not find any metadata for path: {}", key);
      throw new FileNotFoundException(f + " is not found");
    }

    if (!meta.isDirectory()) {
      LOG.debug("Found path as a file");
      return new FileStatus[] { updateFileStatusPath(meta, absolutePath) };
    }

    // NOTE: the listing is not null-checked; the store API is expected to
    // return an empty array when there is nothing to list.
    FileMetadata[] listing = listWithErrorHandling(key, AZURE_LIST_ALL, 1);

    // Push forward any renames recorded by -RenamePending.json files; if one
    // was redone the listing may be stale, so fetch it again.
    if (conditionalRedoFolderRenames(listing)) {
      listing = listWithErrorHandling(key, AZURE_LIST_ALL, 1);
    }

    // Only the root can contain AZURE_TEMP_FOLDER, so only there do entries
    // have to be filtered; elsewhere the listing array is reused as-is.
    final FileMetadata[] result;
    if (key.equals("/")) {
      final List<FileMetadata> visible = new ArrayList<>(listing.length);
      for (FileMetadata entry : listing) {
        // The key must be read before updateFileStatusPath clears it.
        final boolean isTempUploadFolder =
            entry.isDirectory() && entry.getKey().equals(AZURE_TEMP_FOLDER);
        if (isTempUploadFolder) {
          // Don't expose the temp upload folder.
          continue;
        }
        visible.add(updateFileStatusPath(entry, entry.getPath()));
      }
      result = visible.toArray(new FileMetadata[0]);
    } else {
      for (int idx = 0; idx < listing.length; idx++) {
        final FileMetadata entry = listing[idx];
        listing[idx] = updateFileStatusPath(entry, entry.getPath());
      }
      result = listing;
    }

    LOG.debug("Found path as a directory with {}"
        + " files in it.", result.length);

    return result;
  }

  /**
   * Lists the store under a prefix, translating a "file not found" storage
   * error into a {@link FileNotFoundException}.
   *
   * @param prefix the key prefix to list.
   * @param maxListingCount passed through to the store's list call.
   * @param maxListingDepth passed through to the store's list call.
   * @return the listing returned by the store.
   * @throws FileNotFoundException if the store reports the prefix as not
   *         found.
   * @throws IOException for any other listing failure (rethrown unchanged).
   */
  private FileMetadata[] listWithErrorHandling(String prefix, final int maxListingCount,
                                              final int maxListingDepth) throws IOException {
    try {
      return store.list(prefix, maxListingCount, maxListingDepth);
    } catch (IOException ex) {
      final Throwable cause =
          NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
      final boolean notFound = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException(
              (StorageException) cause);
      if (!notFound) {
        throw ex;
      }
      throw new FileNotFoundException(String.format("%s is not found", prefix));
    }
  }

  /**
   * Redoes the folder rename for every rename-pending file found in a
   * directory listing.
   *
   * @param listing the directory listing to scan.
   * @return true if one or more redo operations were done.
   * @throws IllegalArgumentException declared by the original signature.
   * @throws IOException if reading or redoing a pending rename fails.
   */
  private boolean conditionalRedoFolderRenames(FileMetadata[] listing)
      throws IllegalArgumentException, IOException {
    boolean anyRedone = false;
    for (FileMetadata entry : listing) {
      final Path entryPath = entry.getPath();
      if (!isRenamePendingFile(entryPath)) {
        continue;
      }
      final FolderRenamePending pendingRename =
          new FolderRenamePending(entryPath, this);
      pendingRename.redo();
      anyRedone = true;
    }
    return anyRedone;
  }

  /**
   * @param path the path to test.
   * @return true if the path's string form ends with
   *         {@code FolderRenamePending.SUFFIX}, i.e. it is a folder
   *         rename-pending file; else false.
   */
  private boolean isRenamePendingFile(Path path) {
    final String pathText = path.toString();
    return pathText.endsWith(FolderRenamePending.SUFFIX);
  }

  /**
   * Sets the qualified form of the given path on the metadata object and
   * drops its internal key. The object is modified in place.
   *
   * @param meta the metadata to update.
   * @param path the path to qualify and store on the metadata.
   * @return the same metadata instance that was passed in.
   */
  private FileMetadata updateFileStatusPath(FileMetadata meta, Path path) {
    final Path qualifiedPath = path.makeQualified(getUri(), getWorkingDirectory());
    meta.setPath(qualifiedPath);
    // The key is internal-only; clearing it reduces memory use.
    meta.removeKey();
    return meta;
  }

  private static enum UMaskApplyMode {
    NewFile,
    NewDirectory,
    NewDirectoryNoUmask,
    ChangeExistingFile,
    ChangeExistingDirectory,
  }

  /**
   * Returns a copy of the given permission, with the configured default
   * umask applied when the permission is for a new file or new directory.
   *
   * @param permission
   *          The permission to mask.
   * @param applyMode
   *          The kind of operation; the default umask is applied only for
   *          NewFile and NewDirectory.
   * @return The masked permission (always a new object, never the argument).
   */
  private FsPermission applyUMask(final FsPermission permission,
      final UMaskApplyMode applyMode) {
    final FsPermission copy = new FsPermission(permission);

    final boolean isNewEntry = applyMode == UMaskApplyMode.NewFile
        || applyMode == UMaskApplyMode.NewDirectory;
    if (!isNewEntry) {
      return copy;
    }

    // New files and directories are subject to the default umask.
    return copy.applyUMask(FsPermission.getUMask(getConf()));
  }

  /**
   * Builds the PermissionStatus for the given permission: the owner is the
   * short name of the current user and the group is read from the
   * AZURE_DEFAULT_GROUP_PROPERTY_NAME configuration key, falling back to
   * AZURE_DEFAULT_GROUP_DEFAULT.
   *
   * @param permission
   *          The permission for the file.
   * @return The permission status object to use.
   * @throws IOException
   *           If login fails in getCurrentUser
   */
  @VisibleForTesting
  PermissionStatus createPermissionStatus(FsPermission permission)
      throws IOException {
    final String owner = UserGroupInformation.getCurrentUser().getShortUserName();
    final String group = getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
        AZURE_DEFAULT_GROUP_DEFAULT);
    return new PermissionStatus(owner, group, permission);
  }

  /**
   * Walks from the given path up towards the root and returns the first path
   * on the way (starting with f itself) that the store knows as a directory.
   *
   * @param f the path to start from.
   * @return the path recorded in the metadata of the first directory found,
   *         or "/" if none is found before reaching the root.
   * @throws IOException if retrieving metadata from the store fails.
   */
  private Path getAncestor(Path f) throws IOException {

    Path current = f;
    Path parent = current.getParent();

    // A null parent means current is the root, which ends the walk.
    while (parent != null) {
      final FileMetadata currentMetadata = store.retrieveMetadata(pathToKey(current));
      if (currentMetadata != null && currentMetadata.isDirectory()) {
        final Path ancestor = currentMetadata.getPath();
        LOG.debug("Found ancestor {}, for path: {}", ancestor.toString(), f.toString());
        return ancestor;
      }
      current = parent;
      parent = current.getParent();
    }

    return new Path("/");
  }

  /**
   * Creates the directory (and missing parents) with the default umask
   * applied; equivalent to the three-argument mkdirs with noUmask = false.
   *
   * @param f the directory to create.
   * @param permission the permission requested for the new directories.
   * @return the result of the three-argument mkdirs.
   * @throws IOException if the directory cannot be created.
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    final boolean noUmask = false;
    return mkdirs(f, permission, noUmask);
  }

  /**
   * Creates the given directory together with any missing parent folders.
   *
   * <p>A WRITE authorization check is made against the nearest existing
   * ancestor. Every path from f up to (but excluding) the root that has no
   * metadata is stored as an empty folder with the same permission status.
   *
   * @param f the directory to create.
   * @param permission the permission requested for the new directories.
   * @param noUmask when true the default umask is not applied and the owner
   *        is given at least the USER_WX_PERMISION bits; when false the
   *        default umask is applied to permission.
   * @return true; failures are reported through exceptions.
   * @throws FileAlreadyExistsException if f or one of its parents is an
   *         existing file.
   * @throws IOException if the path contains a colon or a store call fails.
   */
  public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException {

    LOG.debug("Creating directory: {}", f.toString());

    if (containsColon(f)) {
      throw new IOException("Cannot create directory " + f
          + " through WASB that has colons in the name");
    }

    final Path absolutePath = makeAbsolute(f);
    final Path ancestor = getAncestor(absolutePath);

    // The path itself already exists as a directory: nothing to do.
    if (absolutePath.equals(ancestor)) {
      return true;
    }

    performAuthCheck(ancestor, WasbAuthorizationOperations.WRITE, "mkdirs", absolutePath);

    final FsPermission effectivePermission;
    if (noUmask) {
      // ensure owner still has wx permissions at the minimum
      final short withOwnerWx = (short) (permission.toShort() | USER_WX_PERMISION);
      effectivePermission = applyUMask(FsPermission.createImmutable(withOwnerWx),
          UMaskApplyMode.NewDirectoryNoUmask);
    } else {
      effectivePermission = applyUMask(permission, UMaskApplyMode.NewDirectory);
    }
    final PermissionStatus permissionStatus = createPermissionStatus(effectivePermission);

    // Walk up to the root: fail if a file is in the way, and remember every
    // key that has no metadata yet.
    final List<String> keysToCreateAsFolder = new ArrayList<>();
    Path current = absolutePath;
    Path parent = current.getParent();
    while (parent != null) {
      final String currentKey = pathToKey(current);
      final FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
      if (currentMetadata == null) {
        keysToCreateAsFolder.add(currentKey);
      } else if (!currentMetadata.isDirectory()) {
        throw new FileAlreadyExistsException("Cannot create directory " + f + " because "
            + current + " is an existing file.");
      }
      current = parent;
      parent = current.getParent();
    }

    for (String missingKey : keysToCreateAsFolder) {
      store.storeEmptyFolder(missingKey, permissionStatus);
    }

    instrumentation.directoryCreated();

    // Reaching this point means success; every failure above throws.
    return true;
  }

  /**
   * Opens a file for reading without any extra open options.
   *
   * @param f the file to open.
   * @param bufferSize the buffer size for the returned stream.
   * @return an input stream over the file.
   * @throws FileNotFoundException if the path is missing or is a directory.
   * @throws IOException on any other failure.
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
    final Optional<Configuration> noOptions = Optional.empty();
    return open(f, bufferSize, noOptions);
  }

  /**
   * Opens a file for reading after a READ authorization check.
   *
   * @param f the file to open.
   * @param bufferSize the buffer size for the returned stream.
   * @param options optional configuration handed to the store's retrieve
   *        call.
   * @return a buffered input stream over the file.
   * @throws FileNotFoundException if the path has no metadata, is a
   *         directory, or the store reports it as not found.
   * @throws IOException on any other failure.
   */
  private FSDataInputStream open(Path f, int bufferSize,
      Optional<Configuration> options)
      throws FileNotFoundException, IOException {

    LOG.debug("Opening file: {}", f.toString());

    final Path absolutePath = makeAbsolute(f);

    performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "read", absolutePath);

    final String key = pathToKey(absolutePath);

    FileMetadata meta = null;
    try {
      meta = store.retrieveMetadata(key);
    } catch (Exception ex) {
      final Throwable cause =
          NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
      final boolean notFound = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
      if (notFound) {
        throw new FileNotFoundException(String.format("%s is not found", key));
      }
      throw ex;
    }

    if (meta == null) {
      throw new FileNotFoundException(f.toString());
    }
    if (meta.isDirectory()) {
      throw new FileNotFoundException(f.toString()
          + " is a directory not a file.");
    }

    InputStream inputStream;
    try {
      inputStream = store.retrieve(key, 0, options);
    } catch (Exception ex) {
      final Throwable cause =
          NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
      final boolean notFound = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
      if (notFound) {
        throw new FileNotFoundException(String.format("%s is not found", key));
      }
      throw ex;
    }

    final NativeAzureFsInputStream azureStream =
        new NativeAzureFsInputStream(inputStream, key, meta.getLen());
    return new FSDataInputStream(new BufferedFSInputStream(azureStream, bufferSize));
  }

  /**
   * Opens a file through the builder API. Mandatory keys outside
   * FS_OPTION_OPENFILE_STANDARD_OPTIONS are rejected, then the open is
   * evaluated through LambdaUtils.eval with the options from the parameters.
   *
   * @param path the file to open.
   * @param parameters the open parameters (mandatory keys, buffer size,
   *        options).
   * @return the future returned by LambdaUtils.eval for the open call.
   * @throws IOException if the mandatory-key check fails.
   */
  @Override
  protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
      OpenFileParameters parameters) throws IOException {
    AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
        parameters.getMandatoryKeys(),
        FS_OPTION_OPENFILE_STANDARD_OPTIONS,
        "for " + path);

    final CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
    return LambdaUtils.eval(result,
        () -> open(path, parameters.getBufferSize(),
            Optional.of(parameters.getOptions())));
  }

  /**
   * Renames a file or folder.
   *
   * <p>The destination is resolved as follows: if {@code dst} is an existing
   * directory, the source is moved underneath it under its own name; if
   * {@code dst} is an existing file the rename fails; otherwise the parent
   * of {@code dst} must exist and be a directory. A file is renamed with a
   * single store call that refuses to overwrite the destination; a folder is
   * renamed through a {@link FolderRenamePending} so that it can be redone.
   *
   * <p>WRITE authorization is checked on the source's parent and on the
   * destination directory (or the destination's parent). When authorization
   * is enabled the sticky bit check is applied to the source as well.
   *
   * @param src the path to rename.
   * @param dst the new path, or an existing directory to move src into.
   * @return true if the rename was performed; false if src is the root, src
   *         does not exist, dst is an existing file, the parent of dst is
   *         missing or is a file, or the store reports the source as not
   *         found / the destination as already existing during the rename.
   * @throws FileNotFoundException if a parent folder disappears while its
   *         last-modified time is being updated.
   * @throws IOException if dst contains a colon, or if an authorization or
   *         store call fails with an error that is not mapped to a false
   *         return.
   */
  @Override
  public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {

    FolderRenamePending renamePending = null;

    LOG.debug("Moving {} to {}", src, dst);

    if (containsColon(dst)) {
      throw new IOException("Cannot rename to file " + dst
          + " through WASB that has colons in the name");
    }

    Path absoluteSrcPath = makeAbsolute(src);
    Path srcParentFolder = absoluteSrcPath.getParent();

    if (srcParentFolder == null) {
      // Cannot rename root of file system
      return false;
    }

    String srcKey = pathToKey(absoluteSrcPath);

    if (srcKey.length() == 0) {
      // Cannot rename root of file system
      return false;
    }

    performAuthCheck(srcParentFolder, WasbAuthorizationOperations.WRITE, "rename",
        absoluteSrcPath);

    if (this.azureAuthorization) {
      try {
        performStickyBitCheckForRenameOperation(absoluteSrcPath, srcParentFolder);
      } catch (FileNotFoundException ex) {
        // Source or its parent is missing: report failure instead of throwing.
        return false;
      } catch (IOException ex) {
        Throwable innerException = checkForAzureStorageException(ex);
        if (innerException instanceof StorageException
          && isFileNotFoundException((StorageException) innerException)) {
          LOG.debug("Encountered FileNotFound Exception when performing sticky bit check "
            + "on {}. Failing rename", srcKey);
          return false;
        }
        throw ex;
      }
    }

    // Figure out the final destination
    Path absoluteDstPath = makeAbsolute(dst);
    Path dstParentFolder = absoluteDstPath.getParent();

    String dstKey = pathToKey(absoluteDstPath);
    FileMetadata dstMetadata = null;
    try {
      dstMetadata = store.retrieveMetadata(dstKey);
    } catch (IOException ex) {

      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);

      // A BlobNotFound storage exception is only thrown from the
      // retrieveMetadata API when there is a race condition. If another
      // thread deletes the destination file or folder, this thread should be
      // able to continue with the rename gracefully, so the exception is
      // swallowed here and dstMetadata stays null.
      // NOTE(review): every StorageException is swallowed by this branch, not
      // only BlobNotFound; only the BlobNotFound case is logged. Confirm that
      // silently ignoring other storage errors is intended.
      if (innerException instanceof StorageException) {
        if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
          LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
              + "Swallowing the exception to handle race condition gracefully", dstKey);
        }
      } else {
        throw ex;
      }
    }

    if (dstMetadata != null && dstMetadata.isDirectory()) {
      // It's an existing directory.
      performAuthCheck(absoluteDstPath, WasbAuthorizationOperations.WRITE, "rename",
          absoluteDstPath);

      // Move the source underneath the directory, keeping its own name.
      dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
      LOG.debug("Destination {} "
          + " is a directory, adjusted the destination to be {}", dst, dstKey);
    } else if (dstMetadata != null) {
      // Attempting to overwrite a file using rename()
      LOG.debug("Destination {}"
          + " is an already existing file, failing the rename.", dst);
      return false;
    } else {
      // Check that the parent directory exists.
      FileMetadata parentOfDestMetadata = null;
      try {
        parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDstPath.getParent()));
      } catch (IOException ex) {

        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);

        if (innerException instanceof StorageException
            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {

          LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
          return false;
        }

        throw ex;
      }

      if (parentOfDestMetadata == null) {
        LOG.debug("Parent of the destination {}"
            + " doesn't exist, failing the rename.", dst);
        return false;
      } else if (!parentOfDestMetadata.isDirectory()) {
        LOG.debug("Parent of the destination {}"
            + " is a file, failing the rename.", dst);
        return false;
      } else {
        performAuthCheck(dstParentFolder, WasbAuthorizationOperations.WRITE,
          "rename", absoluteDstPath);
      }
    }
    FileMetadata srcMetadata = null;
    try {
      srcMetadata = store.retrieveMetadata(srcKey);
    } catch (IOException ex) {
      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);

      if (innerException instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {

        LOG.debug("Source {} doesn't exists. Failing rename", src);
        return false;
      }

      throw ex;
    }

    if (srcMetadata == null) {
      // Source doesn't exist
      LOG.debug("Source {} doesn't exist, failing the rename.", src);
      return false;
    } else if (!srcMetadata.isDirectory()) {
      LOG.debug("Source {} found as a file, renaming.", src);
      try {
        // HADOOP-15086 - file rename must ensure that the destination does
        // not exist.  The fix is targeted to this call only to avoid
        // regressions.  Other call sites are attempting to rename temporary
        // files, redo a failed rename operation, or rename a directory
        // recursively; for these cases the destination may exist.
        store.rename(srcKey, dstKey, false, null,
            false);
      } catch(IOException ex) {
        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);

        if (innerException instanceof StorageException) {
          if (NativeAzureFileSystemHelper.isFileNotFoundException(
              (StorageException) innerException)) {
            // The message needs a placeholder, otherwise src is dropped.
            LOG.debug("BlobNotFoundException encountered for {}. Failing rename", src);
            return false;
          }
          if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict(
              (StorageException) innerException)) {
            LOG.debug("Destination BlobAlreadyExists for {}. Failing rename", src);
            return false;
          }
        }

        throw ex;
      }
    } else {

      // Prepare for, execute and clean up after of all files in folder, and
      // the root file, and update the last modified time of the source and
      // target parent folders. The operation can be redone if it fails part
      // way through, by applying the "Rename Pending" file.

      // The following code (internally) only does atomic rename preparation
      // and lease management for page blob folders, limiting the scope of the
      // operation to HBase log file folders, where atomic rename is required.
      // In the future, we could generalize it easily to all folders.
      renamePending = prepareAtomicFolderRename(srcKey, dstKey);
      renamePending.execute();

      LOG.debug("Renamed {} to {} successfully.", src, dst);
      renamePending.cleanup();
      return true;
    }

    // Only the file rename reaches this point: update the last-modified time
    // of the parent folders of both source and destination.
    updateParentFolderLastModifiedTime(srcKey);
    updateParentFolderLastModifiedTime(dstKey);

    LOG.debug("Renamed {} to {} successfully.", src, dst);
    return true;
  }

  /**
   * Update the last-modified time of the parent folder of the file
   * identified by key.
   *
   * <p>Nothing is done when the parent is the root or has no metadata. An
   * implicit parent folder is materialized as an empty folder first. For an
   * atomic-rename key the update is done while holding a lease on the parent,
   * which is always freed afterwards.
   *
   * @param key the key of the file whose parent folder is to be updated.
   * @throws FileNotFoundException if the lease/update on an atomic-rename
   *         parent fails with the storage error code "BlobNotFound".
   * @throws IOException if a store operation fails.
   */
  private void updateParentFolderLastModifiedTime(String key)
      throws IOException {
    Path parent = makeAbsolute(keyToPath(key)).getParent();
    if (parent == null || parent.getParent() == null) {
      // No parent, or the parent is the root: nothing to update.
      return;
    }
    String parentKey = pathToKey(parent);

    FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
    // The metadata could be null if the implicit folder only contained a
    // single file. In that case the parent folder no longer exists once the
    // file is renamed, so there is nothing to update.
    if (parentMetadata == null) {
      return;
    }

    // ensure the parent is a materialized folder
    if (parentMetadata.isDirectory()
        && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
      store.storeEmptyFolder(parentKey,
          createPermissionStatus(FsPermission.getDefault()));
    }

    if (!store.isAtomicRenameKey(parentKey)) {
      store.updateFolderLastModifiedTime(parentKey, null);
      return;
    }

    SelfRenewingLease lease = null;
    try {
      lease = leaseSourceFolder(parentKey);
      store.updateFolderLastModifiedTime(parentKey, lease);
    } catch (AzureException e) {
      // Read the storage error code without relying on a failed cast; the
      // code stays empty when the cause is not a StorageException.
      String errorCode = "";
      Throwable cause = e.getCause();
      if (cause instanceof StorageException) {
        errorCode = ((StorageException) cause).getErrorCode();
      }
      // Constant-first comparison: a null error code must not raise a
      // NullPointerException that would hide the original failure.
      if ("BlobNotFound".equals(errorCode)) {
        FileNotFoundException notFound =
            new FileNotFoundException("Folder does not exist: " + parentKey);
        notFound.initCause(e);
        throw notFound;
      }
      LOG.warn("Got unexpected exception trying to get lease on {}. {}",
          parentKey, e.getMessage());
      throw e;
    } finally {
      try {
        if (lease != null) {
          lease.free();
        }
      } catch (Exception e) {
        // Failing to free the lease must not mask the outcome of the update.
        LOG.error("Unable to free lease on {}", parentKey, e);
      }
    }
  }

  /**
   * If the source is a page blob folder,
   * prepare to rename this folder atomically. This means to get exclusive
   * access to the source folder, and record the actions to be performed for
   * this rename in a "Rename Pending" file. This code was designed to
   * meet the needs of HBase, which requires atomic rename of write-ahead log
   * (WAL) folders for correctness.
   *
   * Before calling this method, the caller must ensure that the source is a
   * folder.
   *
   * For keys that are not atomic-rename keys, prepare the in-memory
   * information needed, but don't take the lease or write the redo file.
   * This is done to limit the scope of atomic folder rename to HBase, at
   * least at the time of writing this code.
   *
   * If preparing or writing the "Rename Pending" record fails after the
   * lease was acquired, the lease is freed before the failure is propagated
   * so that it is not left held by a rename that never started.
   *
   * @param srcKey Source folder name.
   * @param dstKey Destination folder name.
   * @return the pending rename, ready to be executed.
   * @throws IOException if acquiring the lease or preparing/writing the
   *         rename-pending record fails.
   */
  @VisibleForTesting
  FolderRenamePending prepareAtomicFolderRename(
      String srcKey, String dstKey) throws IOException {

    if (!store.isAtomicRenameKey(srcKey)) {
      // No lease and no redo file outside the atomic-rename scope.
      return new FolderRenamePending(srcKey, dstKey, null, this);
    }

    // Block unwanted concurrent access to source folder.
    SelfRenewingLease lease = leaseSourceFolder(srcKey);

    try {
      // Prepare in-memory information needed to do or redo a folder rename.
      FolderRenamePending renamePending =
          new FolderRenamePending(srcKey, dstKey, lease, this);

      // Save it to persistent storage to help recover if the operation fails.
      renamePending.writeFile(this);
      return renamePending;
    } catch (IOException | RuntimeException e) {
      // Nobody else holds a reference to the lease at this point, so it has
      // to be released here or it would stay held.
      if (lease != null) {
        try {
          lease.free();
        } catch (Exception freeFailure) {
          e.addSuppressed(freeFailure);
        }
      }
      throw e;
    }
  }

  /**
   * Acquires, through the store, a self-renewing Azure blob lease on the
   * zero-byte blob of the given folder key.
   *
   * @param srcKey the key of the folder to lease.
   * @return the lease returned by the store.
   * @throws AzureException if the store cannot acquire the lease.
   */
  private SelfRenewingLease leaseSourceFolder(String srcKey)
      throws AzureException {
    final SelfRenewingLease lease = store.acquireLease(srcKey);
    return lease;
  }

  /**
   * Runs the sticky bit check that guards a rename: the metadata of the
   * source and of its parent are fetched and the rename is rejected when the
   * check is violated.
   *
   * @param srcPath - path which is to be renamed.
   * @param srcParentPath - parent of srcPath, checked for the sticky bit.
   * @throws FileNotFoundException if srcPath or srcParentPath do not exist.
   * @throws WasbAuthorizationException if stickybit check is violated.
   * @throws IOException when retrieving metadata operation fails.
   */
  private void performStickyBitCheckForRenameOperation(Path srcPath,
      Path srcParentPath)
      throws FileNotFoundException, WasbAuthorizationException, IOException {

    final FileMetadata srcMetadata = store.retrieveMetadata(pathToKey(srcPath));
    if (srcMetadata == null) {
      LOG.debug("Source {} doesn't exist. Failing rename.", srcPath);
      throw new FileNotFoundException(
          String.format("%s does not exist.", srcPath));
    }

    final String parentkey = pathToKey(srcParentPath);
    final FileMetadata parentMetadata = store.retrieveMetadata(parentkey);
    if (parentMetadata == null) {
      LOG.debug("Path {} doesn't exist, failing rename.", srcParentPath);
      throw new FileNotFoundException(
          String.format("%s does not exist.", parentkey));
    }

    final boolean violated = isStickyBitCheckViolated(srcMetadata, parentMetadata);
    if (violated) {
      throw new WasbAuthorizationException(
          String.format("Rename operation for %s is not permitted."
              + " Details : Stickybit check failed.", srcPath));
    }
  }

  /**
   * Set the working directory to the given directory, stored in its
   * absolute form.
   *
   * @param newDir the new working directory.
   */
  @Override
  public void setWorkingDirectory(Path newDir) {
    final Path absoluteDir = makeAbsolute(newDir);
    workingDir = absoluteDir;
  }

  /**
   * @return the current working directory held in {@code workingDir}.
   */
  @Override
  public Path getWorkingDirectory() {
    return this.workingDir;
  }

  /**
   * Changes the permission of a file or folder.
   *
   * <p>When authorization is enabled, the caller must be in the chmod
   * allowed list, be a daemon user, or own the file/folder. An implicit
   * folder is materialized with the new permission; otherwise the stored
   * permission is changed only if it differs from the requested one.
   *
   * @param p the file or folder to change.
   * @param permission the permission to set.
   * @throws FileNotFoundException if the path has no metadata or the store
   *         reports it as not found.
   * @throws WasbAuthorizationException if the current user is not allowed
   *         to change the permission.
   * @throws IOException if a store operation fails.
   */
  @Override
  public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
    final Path absolutePath = makeAbsolute(p);
    final String key = pathToKey(absolutePath);

    FileMetadata metadata = null;
    try {
      metadata = store.retrieveMetadata(key);
    } catch (IOException ex) {
      final Throwable cause =
          NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
      final boolean notFound = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
      if (notFound) {
        throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
      }
      throw ex;
    }

    if (metadata == null) {
      throw new FileNotFoundException("File doesn't exist: " + p);
    }

    // With authorization enabled only users in the chmod allowed list, daemon
    // users and the owner of the file/folder may change its permission.
    if (azureAuthorization) {
      final String currentUser =
          UserGroupInformation.getCurrentUser().getShortUserName();
      final boolean privileged = isAllowedUser(currentUser, chmodAllowedUsers)
          || isAllowedUser(currentUser, daemonUsers);
      if (!privileged && !currentUser.equals(metadata.getOwner())) {
        throw new WasbAuthorizationException(
            String.format("user '%s' does not have the privilege to "
                + "change the permission of files/folders.",
                    currentUser));
      }
    }

    final UMaskApplyMode applyMode = metadata.isDirectory()
        ? UMaskApplyMode.ChangeExistingDirectory
        : UMaskApplyMode.ChangeExistingFile;
    final FsPermission effectivePermission = applyUMask(permission, applyMode);

    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
      // It's an implicit folder, need to materialize it.
      store.storeEmptyFolder(key, createPermissionStatus(effectivePermission));
    } else if (!metadata.getPermission().equals(effectivePermission)) {
      // Keep owner and group, replace only the permission.
      final PermissionStatus updatedStatus = new PermissionStatus(
          metadata.getOwner(),
          metadata.getGroup(),
          effectivePermission);
      store.changePermissionStatus(key, updatedStatus);
    }
  }

  /**
   * Change the owner and/or group of an existing file or folder.
   * <p>
   * A {@code null} user name or group name leaves the corresponding value
   * from the existing metadata untouched. When authorization is enabled and
   * a user name is supplied, the caller must be in the chown allowed list.
   * An implicit folder is materialized with the new ownership.
   *
   * @param p path whose ownership is to be changed
   * @param username new owner, or {@code null} to keep the current owner
   * @param groupname new group, or {@code null} to keep the current group
   * @throws FileNotFoundException if the path does not exist
   * @throws WasbAuthorizationException if authorization is enabled and the
   *           caller may not change ownership
   * @throws IOException if the metadata lookup or the update fails
   */
  @Override
  public void setOwner(Path p, String username, String groupname)
      throws IOException {
    String key = pathToKey(makeAbsolute(p));

    FileMetadata existing;
    try {
      existing = store.retrieveMetadata(key);
    } catch (IOException ioe) {
      // Translate a storage-level "not found" into FileNotFoundException;
      // anything else is propagated untouched.
      Throwable cause = NativeAzureFileSystemHelper.checkForAzureStorageException(ioe);
      boolean notFound = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
      if (notFound) {
        throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
      }
      throw ioe;
    }
    if (existing == null) {
      throw new FileNotFoundException("File doesn't exist: " + p);
    }

    // The privilege check applies only when the owner itself is being
    // changed; a group-only change (username == null) bypasses it.
    if (this.azureAuthorization && username != null) {
      String caller = UserGroupInformation.getCurrentUser().getShortUserName();
      if (!isAllowedUser(caller, chownAllowedUsers)) {
        throw new WasbAuthorizationException(
            String.format("user '%s' does not have the privilege to change "
                + "the ownership of files/folders.",
                    caller));
      }
    }

    String newOwner = (username != null) ? username : existing.getOwner();
    String newGroup = (groupname != null) ? groupname : existing.getGroup();
    PermissionStatus updated =
        new PermissionStatus(newOwner, newGroup, existing.getPermission());

    if (existing.getBlobMaterialization() != BlobMaterialization.Implicit) {
      store.changePermissionStatus(key, updated);
    } else {
      // An implicit folder has no blob of its own yet, so create one that
      // carries the new ownership.
      store.storeEmptyFolder(key, updated);
    }
  }

  /**
   * Set the value of an attribute for a path.
   * <p>
   * A WRITE authorization check is performed on the absolute path first. The
   * path must exist. Whether the attribute is already present is then looked
   * up and handed to {@code XAttrSetFlag.validate} together with the flags
   * before the value is stored.
   *
   * @param path The path on which to set the attribute
   * @param xAttrName The attribute to set
   * @param value The byte value of the attribute to set (encoded in utf-8)
   * @param flag The mode in which to set the attribute
   * @throws FileNotFoundException If the path does not exist
   * @throws IOException If there was an issue setting the attribute on Azure
   */
  @Override
  public void setXAttr(Path path, String xAttrName, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException {
    Path target = makeAbsolute(path);
    performAuthCheck(target, WasbAuthorizationOperations.WRITE, "setXAttr", target);
    String key = pathToKey(target);

    FileMetadata existing;
    try {
      existing = store.retrieveMetadata(key);
    } catch (IOException ioe) {
      // Translate a storage-level "not found" into FileNotFoundException;
      // anything else is propagated untouched.
      Throwable cause = NativeAzureFileSystemHelper.checkForAzureStorageException(ioe);
      boolean notFound = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
      if (notFound) {
        throw new FileNotFoundException("File " + path + " doesn't exists.");
      }
      throw ioe;
    }
    if (existing == null) {
      throw new FileNotFoundException("File doesn't exist: " + path);
    }

    byte[] currentValue = store.retrieveAttribute(key, xAttrName);
    XAttrSetFlag.validate(xAttrName, currentValue != null, flag);
    store.storeAttribute(key, xAttrName, value);
  }

  /**
   * Get the value of an attribute for a path.
   * <p>
   * A READ authorization check is performed on the absolute path first, and
   * the path must exist.
   *
   * @param path The path on which to get the attribute
   * @param xAttrName The attribute to get
   * @return The bytes of the attribute's value (encoded in utf-8)
   *         or null if the attribute does not exist
   * @throws FileNotFoundException If the path does not exist
   * @throws IOException If there was an issue getting the attribute from Azure
   */
  @Override
  public byte[] getXAttr(Path path, String xAttrName) throws IOException {
    Path target = makeAbsolute(path);
    performAuthCheck(target, WasbAuthorizationOperations.READ, "getXAttr", target);
    String key = pathToKey(target);

    FileMetadata existing;
    try {
      existing = store.retrieveMetadata(key);
    } catch (IOException ioe) {
      // Translate a storage-level "not found" into FileNotFoundException;
      // anything else is propagated untouched.
      Throwable cause = NativeAzureFileSystemHelper.checkForAzureStorageException(ioe);
      boolean notFound = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);
      if (notFound) {
        throw new FileNotFoundException("File " + path + " doesn't exists.");
      }
      throw ioe;
    }
    if (existing == null) {
      throw new FileNotFoundException("File doesn't exist: " + path);
    }

    byte[] attributeValue = store.retrieveAttribute(key, xAttrName);
    return attributeValue;
  }

  /**
   * Decide whether a user is admitted by an allowed-users list.
   * <ol>
   *   <li>Null or empty list: false</li>
   *   <li>List == ["*"]: true, whatever the user name is</li>
   *   <li>List containing "*" alongside other entries: rejected as
   *       invalid</li>
   *   <li>Otherwise: is the user in the list?</li>
   * </ol>
   * @param username user to check; may be null
   * @param userList list of users; may be null or empty
   * @return {@code true} if the list admits the user, {@code false} otherwise
   * @throws IllegalArgumentException if the userList is invalid.
   */
  private boolean isAllowedUser(String username, List<String> userList) {
    if (userList == null || userList.isEmpty()) {
      return false;
    }

    // A list consisting solely of '*' admits everybody; no lookup is needed.
    if (userList.size() == 1 && userList.get(0).equals("*")) {
      return true;
    }

    // From here on the wildcard must not appear: mixing it with explicit
    // names is treated as a configuration error.
    Preconditions.checkArgument(!userList.contains("*"),
        "User list must contain either '*' or a list of user names,"
            + " but not both.");
    return userList.contains(username);
  }

  /**
   * Close this file system: the base class first, then the store, and
   * finally (unless metrics are skipped via configuration) the metrics
   * source registered for this instance.
   * <p>
   * Calling it again after a successful close is a no-op, as tracked by the
   * {@code isClosed} flag.
   *
   * @throws IOException if closing the base class or the store fails
   */
  @Override
  public synchronized void close() throws IOException {
    if (!isClosed) {
      // Release whatever the base class holds.
      super.close();
      // Closing the store releases its resources too - e.g. the bandwidth
      // updater thread would be stopped at this time.
      store.close();

      long metricsStart = System.currentTimeMillis();

      // Tell the metrics system that this file system is gone, which may
      // trigger one final push so that the final metrics are accurate.
      boolean skipMetrics =
          getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false);
      if (!skipMetrics) {
        AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName);
        AzureFileSystemMetricsSystem.fileSystemClosed();
      }

      long elapsedMillis = System.currentTimeMillis() - metricsStart;
      LOG.debug("Submitting metrics when file system closed took {} ms.",
          elapsedMillis);
      isClosed = true;
    }
  }

  /**
   * Get a delegation token. When Kerberos support is enabled
   * ('fs.azure.enable.kerberos.support' set to 'true') the token is obtained
   * from the WASB delegation token manager; otherwise the request is handed
   * to the base class.
   *
   * @param renewer the account name that is allowed to renew the token.
   * @return delegation token
   * @throws IOException if the token cannot be obtained.
   */
  @Override
  public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
    if (!kerberosSupportEnabled) {
      return super.getDelegationToken(renewer);
    }
    return wasbDelegationTokenManager.getDelegationToken(renewer);
  }

  /**
   * Check whether the current user may access the path in the given mode.
   * <p>
   * When authorization is enabled and an authorizer is present, the path's
   * existence is verified and then a READ and/or WRITE authorization check
   * is performed according to the bits of {@code mode}; the execute bit on
   * its own triggers no check. A failed check is reported as an
   * {@link AccessControlException}. In every other case the base class
   * implementation is used.
   *
   * @param path path to check
   * @param mode type of access requested
   * @throws AccessControlException if an authorization check fails
   * @throws IOException if the path cannot be resolved or a check fails to run
   */
  @Override
  public void access(Path path, FsAction mode) throws IOException {
    if (!azureAuthorization || authorizer == null) {
      super.access(path, mode);
      return;
    }

    try {
      // Required to check the existence of the path.
      getFileStatus(path);

      // READ, READ_EXECUTE, READ_WRITE and ALL carry the read bit.
      if (mode.implies(FsAction.READ)) {
        performAuthCheck(path, WasbAuthorizationOperations.READ, "access", path);
      }
      // WRITE, WRITE_EXECUTE, READ_WRITE and ALL carry the write bit.
      if (mode.implies(FsAction.WRITE)) {
        performAuthCheck(path, WasbAuthorizationOperations.WRITE, "access", path);
      }
      // EXECUTE and NONE carry neither bit, so nothing is checked for them.
    } catch (WasbAuthorizationException wae) {
      throw new AccessControlException(wae);
    }
  }

  /**
   * Strategy for dealing with a blob whose upload was interrupted. Concrete
   * handlers decide what happens to the place-holder blob and to the
   * temporary blob it links to.
   */
  private abstract class DanglingFileHandler {
    /**
     * Process one dangling file.
     *
     * @param file metadata of the place-holder blob
     * @param tempFile metadata of the temporary blob the place-holder links to
     * @throws IOException if the handler fails to process the pair
     */
    abstract void handleFile(FileMetadata file, FileMetadata tempFile) throws IOException;
  }

  /**
   * Handler that cleans up a dangling file by deleting both the place-holder
   * blob and the temporary blob it links to.
   */
  private class DanglingFileDeleter extends DanglingFileHandler {
    @Override
    void handleFile(FileMetadata file, FileMetadata tempFile) throws IOException {
      final String placeholderKey = file.getKey();
      LOG.debug("Deleting dangling file {}", placeholderKey);

      // The results of delete() are deliberately ignored: a false return
      // essentially means the call was a no-op for the caller.
      store.delete(placeholderKey);
      store.delete(tempFile.getKey());
    }
  }

  /**
   * Handler that rescues a dangling file by moving its temporary blob under
   * the recovery location given at construction time (for example
   * /lost+found).
   */
  private class DanglingFileRecoverer extends DanglingFileHandler {
    private final Path destination;

    DanglingFileRecoverer(Path destination) {
      this.destination = destination;
    }

    @Override
    void handleFile(FileMetadata file, FileMetadata tempFile) throws IOException {
      final String placeholderKey = file.getKey();
      LOG.debug("Recovering {}", placeholderKey);

      // The recovered blob lives under the destination, keyed by the
      // place-holder's own key.
      final Path recoveredPath = new Path(destination, placeholderKey);
      final String recoveredKey = pathToKey(recoveredPath);
      store.rename(tempFile.getKey(), recoveredKey);

      if (recoveredKey.equals(placeholderKey)) {
        // The data was restored in place; the key must not be deleted.
        return;
      }
      // Delete the empty link file now that we've restored it.
      store.delete(placeholderKey);
    }
  }

  /**
   * Check if a path has colons in its name.
   *
   * @param p the path to inspect; only the path component of its URI is
   *          examined
   * @return {@code true} if that path component contains a ':' character
   */
  private boolean containsColon(Path p) {
    final String pathComponent = p.toUri().getPath();
    return pathComponent.indexOf(':') >= 0;
  }

  /**
   * Implements recover and delete (-move and -delete) behaviors for handling
   * dangling files (blobs whose upload was interrupted).
   * <p>
   * Every non-directory blob under the root is inspected. If its metadata
   * carries a link to a temporary blob, that temporary blob still exists and
   * its modification time passes the cut-off test, the pair is handed to the
   * handler.
   *
   * @param root
   *          The root path to check from.
   * @param handler
   *          The handler that deals with dangling files.
   * @throws IOException
   *           If listing, metadata retrieval or the handler itself fails.
   */
  private void handleFilesWithDanglingTempData(Path root,
      DanglingFileHandler handler) throws IOException {
    // Calculate the cut-off for when to consider a blob to be dangling.
    // The configured expiry (presumably seconds, given the scaling) is
    // widened to long before being converted to milliseconds; plain int
    // arithmetic would silently overflow for values above ~2,147,483.
    long expiryMillis = getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME,
        AZURE_TEMP_EXPIRY_DEFAULT) * 1000L;
    long cutoffForDangling = new Date().getTime() - expiryMillis;

    // Go over all the blobs under the given root and look for blobs to
    // recover.
    FileMetadata[] listing = store.list(pathToKey(root), AZURE_LIST_ALL,
        AZURE_UNBOUNDED_DEPTH);

    for (FileMetadata file : listing) {
      if (file.isDirectory()) {
        // We don't recover directory blobs.
        continue;
      }

      // See if this blob has a link in it (meaning it's a place-holder
      // blob for when the upload to the temp blob is complete).
      String link = store.getLinkInFileMetadata(file.getKey());
      if (link == null) {
        continue;
      }

      // It has a link, see if the temp blob it is pointing to still exists
      // and passes the cut-off test.
      // NOTE(review): the test selects temp blobs modified at or AFTER the
      // cut-off, which reads oddly next to "old enough to be considered
      // dangling" - confirm the intended direction before changing it.
      FileMetadata linkMetadata = store.retrieveMetadata(link);
      if (linkMetadata != null
          && linkMetadata.getModificationTime() >= cutoffForDangling) {
        // Found one!
        handler.handleFile(file, linkMetadata);
      }
    }
  }

  /**
   * Scans the given root for "dangling" blobs - place-holder blobs created
   * while the data was being uploaded to a temporary blob, and left behind
   * because the upload was interrupted - and moves whatever is found to the
   * given destination.
   *
   * @param root
   *          The root path to consider.
   * @param destination
   *          The destination path to move any recovered files to.
   * @throws IOException Thrown when fail to recover files.
   */
  public void recoverFilesWithDanglingTempData(Path root, Path destination)
      throws IOException {
    LOG.debug("Recovering files with dangling temp data in {}", root);

    DanglingFileHandler recoverer = new DanglingFileRecoverer(destination);
    handleFilesWithDanglingTempData(root, recoverer);
  }

  /**
   * Scans the given root for "dangling" blobs - place-holder blobs created
   * while the data was being uploaded to a temporary blob, and left behind
   * because the upload was interrupted - and deletes whatever is found.
   *
   * @param root
   *          The root path to consider.
   * @throws IOException Thrown when fail to delete.
   */
  public void deleteFilesWithDanglingTempData(Path root) throws IOException {
    LOG.debug("Deleting files with dangling temp data in {}", root);

    DanglingFileHandler deleter = new DanglingFileDeleter();
    handleFilesWithDanglingTempData(root, deleter);
  }

  /**
   * Last-resort cleanup: closes this file system when the instance is being
   * finalized.
   *
   * @throws Throwable whatever {@code close()} or the superclass finalizer
   *           raises
   */
  @Override
  protected void finalize() throws Throwable {
    LOG.debug("finalize() called.");
    try {
      close();
    } finally {
      // Chain to the superclass finalizer even when close() fails, so that
      // its cleanup is never skipped.
      super.finalize();
    }
  }

  /**
   * Encode the key with a random prefix for load balancing in Azure storage.
   * Upload data to a random temporary file then do storage side renaming to
   * recover the original key.
   * <p>
   * The result is the temp folder, a separator, a freshly generated UUID and
   * then - appended directly to the UUID - the last segment of the given key.
   *
   * @param aKey a key to be encoded.
   * @return Encoded version of the original key.
   */
  private static String encodeKey(String aKey) {
    // Everything after the last separator is the tail of the key name; when
    // there is no separator the whole key is the tail.
    final int tailStart = aKey.lastIndexOf(Path.SEPARATOR) + 1;
    final String tail = aKey.substring(tailStart);

    // The randomized prefix keeps the file in the same (temp) folder while
    // giving it a varying tail key name.
    final String randomPrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR
        + UUID.randomUUID().toString();

    return randomPrefix + tail;
  }

  /**
   * Helper method to retrieve owner information for a given path.
   * <p>
   * An empty string is returned when no metadata exists for the path, or
   * when the lookup fails with a storage "not found" error. Otherwise the
   * owner recorded in the metadata is returned as is.
   *
   * @param absolutePath the path to look up
   * @return the owner from the path's metadata, or an empty string when the
   *         file/folder cannot be found
   * @throws IOException if the lookup fails for any reason other than the
   *           file/folder not being found
   */
  @VisibleForTesting
  public String getOwnerForPath(Path absolutePath) throws IOException {
    final String key = pathToKey(absolutePath);
    try {
      final FileMetadata found = store.retrieveMetadata(key);
      if (found == null) {
        // No metadata means the file/folder does not exist.
        LOG.debug("Cannot find file/folder - '{}'. Returning owner as empty string", absolutePath);
        return "";
      }

      final String owner = found.getOwner();
      LOG.debug("Retrieved '{}' as owner for path - {}", owner, absolutePath);
      return owner;
    } catch (IOException ioe) {
      Throwable cause = NativeAzureFileSystemHelper.checkForAzureStorageException(ioe);
      boolean notFound = cause instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) cause);

      // A missing blob/container/file/folder is not an error here.
      if (notFound) {
        return "";
      }

      String errorMsg = "Could not retrieve owner information for path - " + absolutePath;
      LOG.error(errorMsg);
      throw new IOException(errorMsg, ioe);
    }
  }

  /**
   * Helper method to update the chownAllowedUsers in tests.
   * The given list is stored by reference; no copy is made.
   *
   * @param chownAllowedUsers list of chown allowed users
   */
  @VisibleForTesting
  void updateChownAllowedUsers(List<String> chownAllowedUsers) {
    this.chownAllowedUsers = chownAllowedUsers;
  }

  /**
   * Helper method to update the chmodAllowedUsers in tests.
   * The given list is stored by reference; no copy is made.
   *
   * @param chmodAllowedUsers list of chmod allowed users
   */
  @VisibleForTesting
  void updateChmodAllowedUsers(List<String> chmodAllowedUsers) {
    this.chmodAllowedUsers = chmodAllowedUsers;
  }

  /**
   * Helper method to update the daemonUsers in tests.
   * The given list is stored by reference; no copy is made.
   *
   * @param daemonUsers list of daemon users
   */
  @VisibleForTesting
  void updateDaemonUsers(List<String> daemonUsers) {
    this.daemonUsers = daemonUsers;
  }

  /**
   * Report whether this file system offers a capability for the given path.
   * <p>
   * Permissions are always reported as supported, append support follows the
   * {@code appendSupportEnabled} setting, and every other capability is
   * decided by the base class.
   *
   * @param path path to query the capability of
   * @param capability the capability to look for
   * @return {@code true} if the capability is available
   * @throws IOException if the base class lookup fails
   */
  @Override
  public boolean hasPathCapability(final Path path, final String capability)
      throws IOException {
    final String normalized = validatePathCapabilityArgs(path, capability);

    if (normalized.equals(CommonPathCapabilities.FS_PERMISSIONS)) {
      return true;
    }
    if (normalized.equals(CommonPathCapabilities.FS_APPEND)) {
      // Append support is dynamic.
      return appendSupportEnabled;
    }
    return super.hasPathCapability(path, capability);
  }
}