Storage.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.common;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.management.ManagementFactory;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.VersionInfo;

import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Storage information file.
 * <p>
 * Local storage information is stored in a separate file VERSION.
 * It contains type of the node, 
 * the storage layout version, the namespace id, and 
 * the fs state creation time.
 * <p>
 * Local storage can reside in multiple directories. 
 * Each directory should contain the same VERSION file as the others.
 * During startup Hadoop servers (name-node and data-nodes) read their local 
 * storage information from them.
 * <p>
 * The servers hold a lock for each storage directory while they run so that
 * other nodes are not able to start up sharing the same storage.
 * The locks are released when the servers stop (normally or abnormally).
 * 
 */
@InterfaceAudience.Private
public abstract class Storage extends StorageInfo {

  /** Logger for this class; also used by the nested {@code StorageDirectory}. */
  public static final Logger LOG = LoggerFactory
      .getLogger(Storage.class.getName());

  /** Last layout version that did not support upgrades. */
  public static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;
  
  /** This layout version corresponds to Hadoop-0.18. */
  public static final int LAST_UPGRADABLE_LAYOUT_VERSION = -16;
  /** Release name matching {@link #LAST_UPGRADABLE_LAYOUT_VERSION}. */
  protected static final String LAST_UPGRADABLE_HADOOP_VERSION = "Hadoop-0.18";
  
  /** Layout versions of 0.20.203 release */
  public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};

  // Names of the files and sub-directories that live directly under the root
  // of a storage directory. The StorageDirectory getters (getCurrentDir(),
  // getPreviousDir(), getPreviousTmp(), ...) resolve these against the root.
  /** Lock file created under the storage directory root when it is locked. */
  public    static final String STORAGE_FILE_LOCK     = "in_use.lock";
  /** Directory holding the latest state. */
  public    static final String STORAGE_DIR_CURRENT   = "current";
  /** Directory holding the previous state. */
  public    static final String STORAGE_DIR_PREVIOUS  = "previous";
  /** Transient directory used during rollback. */
  public    static final String STORAGE_TMP_REMOVED   = "removed.tmp";
  /** Transient directory used during upgrade. */
  public    static final String STORAGE_TMP_PREVIOUS  = "previous.tmp";
  /** Transient directory used during finalize. */
  public    static final String STORAGE_TMP_FINALIZED = "finalized.tmp";
  /** Transient directory used during checkpoint. */
  public    static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
  /** Directory holding the state from before the last checkpoint. */
  public    static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
  
  /**
   * The blocksBeingWritten directory which was used in some 1.x and earlier
   * releases.
   */
  public static final String STORAGE_1_BBW = "blocksBeingWritten";
  
  /**
   * State of a storage directory as determined by
   * {@code StorageDirectory.analyzeStorage}. The COMPLETE_* and RECOVER_*
   * states describe an interrupted transition and are handled by
   * {@code StorageDirectory.doRecover}.
   */
  public enum StorageState {
    // root is missing, is not a directory, or cannot be written/accessed
    NON_EXISTENT,
    // format was requested, or no VERSION file and no temporary dirs exist
    NOT_FORMATTED,
    // previous.tmp exists and current has a VERSION file:
    // recovery moves previous.tmp -> previous
    COMPLETE_UPGRADE,
    // previous.tmp exists and current has no VERSION file:
    // recovery moves previous.tmp -> current
    RECOVER_UPGRADE,
    // finalized.tmp exists: recovery removes it
    COMPLETE_FINALIZE,
    // removed.tmp exists and current has a VERSION file:
    // recovery removes removed.tmp
    COMPLETE_ROLLBACK,
    // removed.tmp exists and only previous is present:
    // recovery moves removed.tmp -> current
    RECOVER_ROLLBACK,
    // lastcheckpoint.tmp exists and current has a VERSION file:
    // recovery moves lastcheckpoint.tmp -> previous.checkpoint
    COMPLETE_CHECKPOINT,
    // lastcheckpoint.tmp exists and current has no VERSION file:
    // recovery moves lastcheckpoint.tmp -> current
    RECOVER_CHECKPOINT,
    // VERSION file present and no temporary directories
    NORMAL;
  }
  
  /**
   * Describes the type of a storage directory.
   * <p>
   * Implementations define their own set of directory types by implementing
   * this interface.
   */
  @InterfaceAudience.Private
  public interface StorageDirType {
    /**
     * @return the storage directory type represented by this object
     */
    StorageDirType getStorageDirType();

    /**
     * @param type the type to test against
     * @return true if this directory type is considered to be of the given
     *         type; the exact matching rule is up to the implementation
     */
    boolean isOfType(StorageDirType type);
  }

  // All storage directories managed by this Storage instance. DirIterator
  // walks this list by index and removes entries from it in remove().
  private final List<StorageDirectory> storageDirs =
      new CopyOnWriteArrayList<>();

  /**
   * Index-based iterator over {@code storageDirs} that can be restricted to
   * one directory type and can skip shared directories.
   * <p>
   * {@link #next()} positions itself through {@link #hasNext()}, so the
   * filter is honoured even when a caller does not call {@code hasNext()}
   * first, and {@link java.util.NoSuchElementException} is thrown once the
   * matching directories are exhausted.
   */
  private class DirIterator implements Iterator<StorageDirectory> {
    final StorageDirType dirType;
    final boolean includeShared;
    int prevIndex; // for remove()
    int nextIndex; // for next()

    /**
     * @param dirType only directories of this type are returned;
     *        {@code null} means any type
     * @param includeShared false to skip directories reporting isShared()
     */
    DirIterator(StorageDirType dirType, boolean includeShared) {
      this.dirType = dirType;
      this.nextIndex = 0;
      this.prevIndex = 0;
      this.includeShared = includeShared;
    }

    /**
     * Moves the cursor to the next matching directory, if any.
     *
     * @return true if a matching directory remains
     */
    @Override
    public boolean hasNext() {
      skipNonMatching();
      return nextIndex < storageDirs.size();
    }

    /**
     * @return the next matching storage directory
     * @throws java.util.NoSuchElementException if no matching directory
     *         remains
     */
    @Override
    public StorageDirectory next() {
      // Position on a matching element first; previously a next() without a
      // preceding hasNext() returned whatever sat at the cursor, unfiltered.
      if (!hasNext()) {
        throw new java.util.NoSuchElementException(
            "No more storage directories");
      }
      StorageDirectory sd = getStorageDir(nextIndex);
      prevIndex = nextIndex;
      nextIndex++;
      skipNonMatching();
      return sd;
    }

    /**
     * Removes the directory most recently returned by {@link #next()} from
     * {@code storageDirs}.
     */
    @Override
    public void remove() {
      nextIndex = prevIndex; // restore previous state
      storageDirs.remove(prevIndex); // remove last returned element
      hasNext(); // reset nextIndex to correct place
    }

    /** Advances nextIndex past entries rejected by the configured filter. */
    private void skipNonMatching() {
      if (dirType == null && includeShared) {
        return; // no filter configured, every entry matches
      }
      while (nextIndex < storageDirs.size() && !shouldReturnNextDir()) {
        nextIndex++;
      }
    }

    /** @return true if the directory at nextIndex passes the filter. */
    private boolean shouldReturnNextDir() {
      StorageDirectory sd = getStorageDir(nextIndex);
      return (dirType == null || sd.getStorageDirType().isOfType(dirType)) &&
          (includeShared || !sd.isShared());
    }
  }
  
  /**
   * Builds the path of {@code fileName} inside the {@code current} directory
   * of every matching storage directory. The files are not checked for
   * existence; directories without a current directory are skipped.
   *
   * @param dirType restricts the directories considered; {@code null} means
   *        all directories
   * @param fileName name of the file to resolve in each directory
   * @return A list of the given File in every available storage directory,
   * regardless of whether it might exist.
   */
  public List<File> getFiles(StorageDirType dirType, String fileName) {
    final Iterator<StorageDirectory> dirs;
    if (dirType == null) {
      dirs = dirIterator();
    } else {
      dirs = dirIterator(dirType);
    }
    List<File> files = new ArrayList<>();
    while (dirs.hasNext()) {
      File current = dirs.next().getCurrentDir();
      if (current == null) {
        continue;
      }
      files.add(new File(current, fileName));
    }
    return files;
  }


  /**
   * Return default iterator.
   * This iterator returns all entries in storageDirs; it delegates to
   * {@link #dirIterator(StorageDirType)} with a {@code null} type.
   *
   * @return an iterator over all configured storage dirs.
   */
  public Iterator<StorageDirectory> dirIterator() {
    return dirIterator(null);
  }
  
  /**
   * Return iterator based on Storage Directory Type.
   * This iterator selects entries in storageDirs of type dirType and returns
   * them via the Iterator. Shared directories are included.
   *
   * @param dirType the type of directory to select; {@code null} selects all
   * @return an iterator over the matching storage dirs.
   */
  public Iterator<StorageDirectory> dirIterator(StorageDirType dirType) {
    return dirIterator(dirType, true);
  }
  
  /**
   * Return all entries in storageDirs, potentially excluding shared dirs.
   * No filtering by directory type is applied.
   *
   * @param includeShared whether or not to include shared dirs.
   * @return an iterator over the configured storage dirs.
   */
  public Iterator<StorageDirectory> dirIterator(boolean includeShared) {
    return dirIterator(null, includeShared);
  }
  
  /**
   * Creates a new iterator over storageDirs with the given filter.
   *
   * @param dirType all entries will be of this type of dir; {@code null}
   *        disables filtering by type
   * @param includeShared true to include any shared directories,
   *        false otherwise
   * @return an iterator over the configured storage dirs.
   */
  public Iterator<StorageDirectory> dirIterator(StorageDirType dirType,
      boolean includeShared) {
    return new DirIterator(dirType, includeShared);
  }
  
  /**
   * Wraps {@link #dirIterator(StorageDirType)} so that it can be used in a
   * for-each loop. Every call to {@code iterator()} on the result creates a
   * fresh iterator.
   *
   * @param dirType the type of directory to select; {@code null} selects all
   * @return an iterable over the matching storage dirs.
   */
  public Iterable<StorageDirectory> dirIterable(final StorageDirType dirType) {
    return () -> dirIterator(dirType);
  }
  
  
  /**
   * Renders every storage directory as {@code root(type);} and concatenates
   * the entries into one string. Intended for debug output.
   *
   * @return the concatenated listing, empty if there are no directories
   */
  public String listStorageDirectories() {
    StringBuilder listing = new StringBuilder();
    for (StorageDirectory dir : storageDirs) {
      listing.append(dir.getRoot())
          .append("(")
          .append(dir.getStorageDirType())
          .append(");");
    }
    return listing.toString();
  }
  
  /**
   * One of the storage directories.
   */
  @InterfaceAudience.Private
  public static class StorageDirectory implements FormatConfirmable {
    // Root directory; may be null (the factory helpers return null for
    // PROVIDED storage or when no location is given).
    final File root;              // root directory
    // whether or not this dir is shared between two separate NNs for HA, or
    // between multiple block pools in the case of federation.
    final boolean isShared;
    final StorageDirType dirType; // storage dir type
    // Initialized to null; assigned by lock() once tryLock() succeeds.
    FileLock lock;                // storage lock
    // Optional permission applied to the current directory by
    // clearDirectory(); null means no permission is set explicitly.
    private final FsPermission permission;

    private String storageUuid = null;      // Storage directory identifier.
    
    // Location this directory was created from; null when constructed from a
    // plain File.
    private final StorageLocation location;
    /**
     * Creates a non-shared storage directory with no explicit type; the type
     * then defaults to {@code NameNodeDirType.UNDEFINED}.
     *
     * @param dir directory corresponding to the storage
     */
    public StorageDirectory(File dir) {
      this(dir, null, false);
    }
    
    /**
     * Creates a non-shared storage directory for the given location, with no
     * explicit type. The root is derived from the location.
     *
     * @param location the {@link StorageLocation} for this directory
     */
    public StorageDirectory(StorageLocation location) {
      this(null, false, location);
    }

    /**
     * Creates a non-shared storage directory of the given type.
     *
     * @param dir directory corresponding to the storage
     * @param dirType storage directory type
     */
    public StorageDirectory(File dir, StorageDirType dirType) {
      this(dir, dirType, false);
    }
    
    /**
     * Sets the identifier of this storage directory.
     *
     * @param storageUuid the new storage directory identifier
     */
    public void setStorageUuid(String storageUuid) {
      this.storageUuid = storageUuid;
    }

    /**
     * @return the storage directory identifier, or {@code null} if it has
     *         not been set
     */
    public String getStorageUuid() {
      return storageUuid;
    }

    /**
     * Creates a storage directory without an explicit permission.
     *
     * @param dir directory corresponding to the storage
     * @param dirType storage directory type
     * @param isShared whether or not this dir is shared between two NNs. true
     *          disables locking on the storage directory, false enables locking
     */
    public StorageDirectory(File dir, StorageDirType dirType, boolean isShared) {
      this(dir, dirType, isShared, null);
    }

    /**
     * Creates a storage directory that is not associated with a
     * {@link StorageLocation}.
     *
     * @param dir directory corresponding to the storage
     * @param dirType storage directory type
     * @param isShared whether or not this dir is shared between two NNs. true
     *          disables locking on the storage directory, false enables locking
     * @param permission permission applied to the current directory by
     *          {@link #clearDirectory()}; may be null
     */
    public StorageDirectory(File dir, StorageDirType dirType,
                            boolean isShared, FsPermission permission) {
      this(dir, dirType, isShared, null, permission);
    }

    /**
     * Creates a storage directory whose root is derived from the given
     * location. The root is null if the location is null, is of PROVIDED
     * storage type, or does not refer to a file.
     *
     * @param dirType storage directory type
     * @param isShared whether or not this dir is shared between two NNs. true
     *          disables locking on the storage directory, false enables locking
     * @param location the {@link StorageLocation} for this directory
     */
    public StorageDirectory(StorageDirType dirType, boolean isShared,
        StorageLocation location) {
      this(getStorageLocationFile(location), dirType, isShared, location, null);
    }

    /**
     * Creates a storage directory rooted at the {@code current} directory of
     * the given block pool inside the given location. The root is null if the
     * location is null or of PROVIDED storage type.
     *
     * @param bpid the block pool id
     * @param dirType storage directory type
     * @param isShared whether or not this dir is shared between two NNs. true
     *          disables locking on the storage directory, false enables locking
     * @param location the {@link StorageLocation} for this directory
     */
    public StorageDirectory(String bpid, StorageDirType dirType,
        boolean isShared, StorageLocation location) {
      this(getBlockPoolCurrentDir(bpid, location), dirType,
          isShared, location, null);
    }

    /**
     * Resolves the {@code current} directory of a block pool inside a
     * storage location.
     *
     * @param bpid the block pool id
     * @param location the storage location; may be null
     * @return the directory, or null if the location is null or of PROVIDED
     *         storage type
     */
    private static File getBlockPoolCurrentDir(String bpid,
        StorageLocation location) {
      boolean hasLocalDir = location != null
          && location.getStorageType() != StorageType.PROVIDED;
      return hasLocalDir
          ? new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT))
          : null;
    }

    /**
     * Common constructor that all public constructors delegate to.
     *
     * @param dir root directory; may be null
     * @param dirType storage directory type; null is replaced by
     *          {@code NameNodeDirType.UNDEFINED}
     * @param isShared whether or not this dir is shared
     * @param location the {@link StorageLocation} for this directory; may be
     *          null
     * @param permission permission for the current directory; may be null
     */
    private StorageDirectory(File dir, StorageDirType dirType,
        boolean isShared, StorageLocation location, FsPermission permission) {
      this.root = dir;
      this.lock = null;
      // default dirType is UNDEFINED
      this.dirType = (dirType == null ? NameNodeDirType.UNDEFINED : dirType);
      this.isShared = isShared;
      this.location = location;
      this.permission = permission;
      // Sanity check (only when assertions are enabled): when both are given,
      // the directory path must start with the location's path.
      assert location == null || dir == null ||
          dir.getAbsolutePath().startsWith(
              new File(location.getUri()).getAbsolutePath()):
            "The storage location and directory should be equal";
    }

    /**
     * Converts a storage location into a local file.
     *
     * @param location the storage location; may be null
     * @return the file for the location's URI, or null if the location is
     *         null, is of PROVIDED storage type, or does not refer to a File
     */
    private static File getStorageLocationFile(StorageLocation location) {
      if (location != null
          && location.getStorageType() != StorageType.PROVIDED) {
        try {
          return new File(location.getUri());
        } catch (IllegalArgumentException notAFile) {
          // the location does not refer to a File; report it as absent below
        }
      }
      return null;
    }

    /**
     * Get root directory of this storage.
     *
     * @return the root directory; may be null
     */
    public File getRoot() {
      return root;
    }

    /**
     * Get storage directory type.
     *
     * @return the directory type; never null, since the constructor
     *         substitutes {@code NameNodeDirType.UNDEFINED} for null
     */
    public StorageDirType getStorageDirType() {
      return dirType;
    }    

    /**
     * Get storage directory size.
     * <p>
     * Only non-shared directories whose root exists are measured. Any failure
     * while measuring is logged and reported as a size of zero.
     *
     * @return the size of the root directory as computed by
     *         {@code FileUtils.sizeOfDirectory}, or 0 if not measured
     */
    public long getDirecorySize() {
      long size = 0;
      try {
        boolean measurable = !isShared() && root != null && root.exists();
        if (measurable) {
          size = FileUtils.sizeOfDirectory(root);
        }
      } catch (Exception e) {
        LOG.warn("Failed to get directory size : {}", root, e);
      }
      return size;
    }

    /**
     * Reads a properties file and passes the loaded properties, together with
     * this directory, to {@code storage.setFieldsFromProperties}.
     *
     * @param from the properties file to read
     * @param storage the Storage object whose fields are to be set
     * @throws IOException if reading or applying the properties fails
     */
    public void read(File from, Storage storage) throws IOException {
      Properties props = readPropertiesFile(from);
      storage.setFieldsFromProperties(props, this);
    }

    /**
     * Wipes and re-creates the {@code current} directory.
     * <p>
     * Any existing {@code current} directory is deleted with its contents and
     * an empty one is created in its place. If a permission was configured
     * for this storage directory it is applied to the new directory.
     * <p>
     * This is not a complete format: the version file is not written here,
     * because it must be written last, after all storage type dependent
     * files. Derived storage is responsible for setting its specific values
     * and for writing the version file to disk.
     * <p>
     * Nothing is done when this storage directory has no current directory.
     *
     * @throws IOException if the directory cannot be removed or created
     */
    public void clearDirectory() throws IOException {
      final File current = this.getCurrentDir();
      if (current == null) {
        // no directory to work on
        return;
      }
      if (current.exists()) {
        File[] doomed = FileUtil.listFiles(current);
        LOG.info("Will remove files: {}", Arrays.toString(doomed));
        boolean removed = FileUtil.fullyDelete(current);
        if (!removed) {
          throw new IOException("Cannot remove current directory: " + current);
        }
      }
      boolean created = current.mkdirs();
      if (!created) {
        throw new IOException("Cannot create directory " + current);
      }
      if (permission == null) {
        return;
      }
      try {
        Set<PosixFilePermission> posixPerms =
            PosixFilePermissions.fromString(permission.toString());
        Files.setPosixFilePermissions(current.toPath(), posixPerms);
      } catch (UnsupportedOperationException uoe) {
        // Default to FileUtil for non posix file systems
        FileUtil.setPermission(current, permission);
      }
    }

    /**
     * The {@code current} directory holds the latest files defining the
     * file system meta-data.
     *
     * @return the directory path, or null if this storage has no root
     */
    public File getCurrentDir() {
      return root == null ? null : new File(root, STORAGE_DIR_CURRENT);
    }

    /**
     * The {@code VERSION} file of the {@code current} directory. It contains
     * the following fields:
     * <ol>
     * <li>node type</li>
     * <li>layout version</li>
     * <li>namespaceID</li>
     * <li>fs state creation time</li>
     * <li>other fields specific for this node type</li>
     * </ol>
     * The version file is always written last during storage directory
     * updates. Its existence indicates that all other files have been
     * successfully written in the storage directory, that the storage is
     * valid and that it does not need to be recovered.
     *
     * @return the version file path, or null if this storage has no root
     */
    public File getVersionFile() {
      if (root != null) {
        File currentDir = new File(root, STORAGE_DIR_CURRENT);
        return new File(currentDir, STORAGE_FILE_VERSION);
      }
      return null;
    }

    /**
     * The {@code VERSION} file located in the {@code previous} directory.
     *
     * @return the previous version file path, or null if this storage has
     *         no root
     */
    public File getPreviousVersionFile() {
      if (root != null) {
        File previousDir = new File(root, STORAGE_DIR_PREVIOUS);
        return new File(previousDir, STORAGE_FILE_VERSION);
      }
      return null;
    }

    /**
     * The {@code previous} directory holds the previous file system state,
     * which the system can be rolled back to.
     *
     * @return the directory path, or null if this storage has no root
     */
    public File getPreviousDir() {
      return root == null ? null : new File(root, STORAGE_DIR_PREVIOUS);
    }

    /**
     * The transient {@code previous.tmp} directory holds the current file
     * system state while the new state is being saved into the new
     * {@code current} during upgrade.
     * When saving succeeds, {@code previous.tmp} is moved to
     * {@code previous}; otherwise the recovery procedure renames it back to
     * {@code current} during startup.
     *
     * @return the directory path, or null if this storage has no root
     */
    public File getPreviousTmp() {
      return root == null ? null : new File(root, STORAGE_TMP_PREVIOUS);
    }

    /**
     * The transient {@code removed.tmp} directory holds the current file
     * system state while the previous state is being moved into
     * {@code current} during rollback.
     * When the move succeeds, {@code removed.tmp} is removed; otherwise the
     * recovery procedure renames it back to {@code current} during startup.
     *
     * @return the directory path, or null if this storage has no root
     */
    public File getRemovedTmp() {
      return root == null ? null : new File(root, STORAGE_TMP_REMOVED);
    }

    /**
     * The transient {@code finalized.tmp} directory holds the
     * {@code previous} file system state while it is being removed in
     * response to a finalize request.
     * The finalize operation removes {@code finalized.tmp} when it completes;
     * otherwise the removal resumes upon system startup.
     *
     * @return the directory path, or null if this storage has no root
     */
    public File getFinalizedTmp() {
      return root == null ? null : new File(root, STORAGE_TMP_FINALIZED);
    }

    /**
     * The transient {@code lastcheckpoint.tmp} directory holds the current
     * file system state while the new state is being saved into the new
     * {@code current} during regular namespace updates.
     * When saving succeeds, {@code lastcheckpoint.tmp} is moved to
     * {@code previous.checkpoint}; otherwise the recovery procedure renames
     * it back to {@code current} during startup.
     *
     * @return the directory path, or null if this storage has no root
     */
    public File getLastCheckpointTmp() {
      return root == null ? null : new File(root, STORAGE_TMP_LAST_CKPT);
    }

    /**
     * The {@code previous.checkpoint} directory holds the previous (before
     * the last save) state of the storage directory.
     * It is created as a reference only: it plays no role in state recovery
     * procedures and is recycled automatically, but it may be useful for
     * manual recovery of a stale state of the system.
     *
     * @return the directory path, or null if this storage has no root
     */
    public File getPreviousCheckpoint() {
      return root == null ? null : new File(root, STORAGE_PREVIOUS_CKPT);
    }

    /**
     * Verifies that the current/ directory contains no entries. Used before
     * deciding that the directory may be formatted. A missing current/
     * directory counts as empty.
     *
     * @throws InconsistentFSStateException if not empty.
     * @throws IOException if unable to list files under the directory.
     */
    private void checkEmptyCurrent() throws InconsistentFSStateException,
        IOException {
      final File current = getCurrentDir();
      if (current != null && current.exists()) {
        try (DirectoryStream<java.nio.file.Path> entries =
            Files.newDirectoryStream(current.toPath())) {
          boolean hasEntry = entries.iterator().hasNext();
          if (hasEntry) {
            throw new InconsistentFSStateException(root,
                "Can't format the storage directory because the current "
                    + "directory is not empty.");
          }
        }
      }
      // otherwise current/ does not exist and it is safe to format it
    }

    /**
     * Check consistency of the storage directory.
     * <p>
     * Equivalent to calling
     * {@link #analyzeStorage(StartupOption, Storage, boolean)} with
     * {@code checkCurrentIsEmpty} set to false.
     *
     * @param startOpt a startup option.
     * @param storage The Storage object that manages this StorageDirectory.
     *
     * @return state {@link StorageState} of the storage directory
     * @throws InconsistentFSStateException if directory state is not
     * consistent and cannot be recovered.
     * @throws IOException
     */
    public StorageState analyzeStorage(StartupOption startOpt, Storage storage)
        throws IOException {
      return analyzeStorage(startOpt, storage, false);
    }

    /**
     * Check consistency of the storage directory.
     * <p>
     * The method proceeds in these steps:
     * <ol>
     * <li>PROVIDED storage locations are reported as NORMAL without any
     * further checks.</li>
     * <li>The root must exist (it is created for FORMAT and HOTSWAP), be a
     * directory and be writable; otherwise NON_EXISTENT is returned.</li>
     * <li>The directory is locked.</li>
     * <li>For FORMAT, or HOTSWAP on a root that was just created,
     * NOT_FORMATTED is returned.</li>
     * <li>Otherwise the state is derived from the presence of the VERSION
     * file in {@code current} and of the {@code previous} and temporary
     * directories.</li>
     * </ol>
     *
     * @param startOpt a startup option.
     * @param storage The Storage object that manages this StorageDirectory.
     * @param checkCurrentIsEmpty if true, make sure current/ directory
     *                            is empty before determining to format it.
     *
     * @return state {@link StorageState} of the storage directory
     * @throws InconsistentFSStateException if directory state is not
     * consistent and cannot be recovered.
     * @throws IOException if the root cannot be created or locked, or if
     * another I/O error occurs.
     */
    public StorageState analyzeStorage(StartupOption startOpt, Storage storage,
        boolean checkCurrentIsEmpty)
        throws IOException {

      if (location != null &&
          location.getStorageType() == StorageType.PROVIDED) {
        // currently we assume that PROVIDED storages are always NORMAL
        return StorageState.NORMAL;
      }

      assert root != null : "root is null";
      boolean hadMkdirs = false;
      String rootPath = root.getCanonicalPath();
      try { // check that storage exists
        if (!root.exists()) {
          // storage directory does not exist
          if (startOpt != StartupOption.FORMAT &&
              startOpt != StartupOption.HOTSWAP) {
            LOG.warn("Storage directory {} does not exist", rootPath);
            return StorageState.NON_EXISTENT;
          }
          LOG.info("{} does not exist. Creating ...", rootPath);
          if (!root.mkdirs()) {
            throw new IOException("Cannot create directory " + rootPath);
          }
          hadMkdirs = true;
        }
        // or is inaccessible
        if (!root.isDirectory()) {
          LOG.warn("{} is not a directory", rootPath);
          return StorageState.NON_EXISTENT;
        }
        if (!FileUtil.canWrite(root)) {
          LOG.warn("Cannot access storage directory {}", rootPath);
          return StorageState.NON_EXISTENT;
        }
      } catch(SecurityException ex) {
        LOG.warn("Cannot access storage directory {}", rootPath, ex);
        return StorageState.NON_EXISTENT;
      }

      this.lock(); // lock storage if it exists

      // If startOpt is HOTSWAP, it returns NOT_FORMATTED for empty directory,
      // while it also checks the layout version.
      if (startOpt == HdfsServerConstants.StartupOption.FORMAT ||
          (startOpt == StartupOption.HOTSWAP && hadMkdirs)) {
        if (checkCurrentIsEmpty) {
          checkEmptyCurrent();
        }
        return StorageState.NOT_FORMATTED;
      }

      if (startOpt != HdfsServerConstants.StartupOption.IMPORT) {
        storage.checkOldLayoutStorage(this);
      }

      // check whether current directory is valid
      File versionFile = getVersionFile();
      boolean hasCurrent = versionFile.exists();

      // check which directories exist
      boolean hasPrevious = getPreviousDir().exists();
      boolean hasPreviousTmp = getPreviousTmp().exists();
      boolean hasRemovedTmp = getRemovedTmp().exists();
      boolean hasFinalizedTmp = getFinalizedTmp().exists();
      boolean hasCheckpointTmp = getLastCheckpointTmp().exists();

      if (!(hasPreviousTmp || hasRemovedTmp
          || hasFinalizedTmp || hasCheckpointTmp)) {
        // no temp dirs - no recovery
        if (hasCurrent) {
          return StorageState.NORMAL;
        }
        if (hasPrevious) {
          throw new InconsistentFSStateException(root,
                              "version file in current directory is missing.");
        }
        if (checkCurrentIsEmpty) {
          checkEmptyCurrent();
        }
        return StorageState.NOT_FORMATTED;
      }

      if ((hasPreviousTmp?1:0) + (hasRemovedTmp?1:0)
          + (hasFinalizedTmp?1:0) + (hasCheckpointTmp?1:0) > 1) {
        // more than one temp dirs
        throw new InconsistentFSStateException(root,
                                               "too many temporary directories.");
      }

      // # of temp dirs == 1 should either recover or complete a transition
      if (hasCheckpointTmp) {
        return hasCurrent ? StorageState.COMPLETE_CHECKPOINT
                          : StorageState.RECOVER_CHECKPOINT;
      }

      if (hasFinalizedTmp) {
        if (hasPrevious) {
          // The leading space before "cannot" was missing, which glued the
          // directory name and the rest of the message together.
          throw new InconsistentFSStateException(root,
              STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_FINALIZED
                  + " cannot exist together.");
        }
        return StorageState.COMPLETE_FINALIZE;
      }

      if (hasPreviousTmp) {
        if (hasPrevious) {
          throw new InconsistentFSStateException(root,
              STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_PREVIOUS
                  + " cannot exist together.");
        }
        if (hasCurrent) {
          return StorageState.COMPLETE_UPGRADE;
        }
        return StorageState.RECOVER_UPGRADE;
      }

      // Only removed.tmp is left: exactly one of current/previous must exist.
      assert hasRemovedTmp : "hasRemovedTmp must be true";
      if (!(hasCurrent ^ hasPrevious)) {
        throw new InconsistentFSStateException(root,
            "one and only one directory " + STORAGE_DIR_CURRENT
                + " or " + STORAGE_DIR_PREVIOUS
                + " must be present when " + STORAGE_TMP_REMOVED
                + " exists.");
      }
      if (hasCurrent) {
        return StorageState.COMPLETE_ROLLBACK;
      }
      return StorageState.RECOVER_ROLLBACK;
    }

    /**
     * Finishes or rolls back a storage transition that was interrupted
     * earlier, according to the state reported for this directory.
     * <p>
     * Nothing is done when this storage directory has no root or no current
     * directory.
     *
     * @param curState specifies what/how the state should be recovered
     * @throws IOException if a rename or delete fails, or if
     *         {@code curState} is not one of the COMPLETE_* / RECOVER_*
     *         states
     */
    public void doRecover(StorageState curState) throws IOException {
      final File current = getCurrentDir();
      if (current == null || root == null) {
        // at this point, we do not support recovery on PROVIDED storages
        return;
      }
      final String path = root.getCanonicalPath();
      switch (curState) {
      case COMPLETE_UPGRADE: {
        // mv previous.tmp -> previous
        LOG.info("Completing previous upgrade for storage directory {}",
            path);
        rename(getPreviousTmp(), getPreviousDir());
        break;
      }
      case RECOVER_UPGRADE: {
        // mv previous.tmp -> current
        LOG.info("Recovering storage directory {} from previous upgrade",
            path);
        deleteAsync(current);
        rename(getPreviousTmp(), current);
        break;
      }
      case COMPLETE_ROLLBACK: {
        // rm removed.tmp
        LOG.info("Completing previous rollback for storage directory {}",
            path);
        deleteDir(getRemovedTmp());
        break;
      }
      case RECOVER_ROLLBACK: {
        // mv removed.tmp -> current
        LOG.info("Recovering storage directory {} from previous rollback",
            path);
        rename(getRemovedTmp(), current);
        break;
      }
      case COMPLETE_FINALIZE: {
        // rm finalized.tmp
        LOG.info("Completing previous finalize for storage directory {}",
            path);
        deleteAsync(getFinalizedTmp());
        break;
      }
      case COMPLETE_CHECKPOINT: {
        // mv lastcheckpoint.tmp -> previous.checkpoint
        LOG.info("Completing previous checkpoint for storage directory {}",
            path);
        File previousCkpt = getPreviousCheckpoint();
        deleteAsync(previousCkpt);
        rename(getLastCheckpointTmp(), previousCkpt);
        break;
      }
      case RECOVER_CHECKPOINT: {
        // mv lastcheckpoint.tmp -> current
        LOG.info("Recovering storage directory {} from failed checkpoint",
            path);
        deleteAsync(current);
        rename(getLastCheckpointTmp(), current);
        break;
      }
      default:
        throw new IOException("Unexpected FS state: " + curState
            + " for storage directory: " + path);
      }
    }

    /**
     * Renames {@code curDir} to a sibling named {@code <name>.tmp} and
     * deletes that sibling on a separate thread, so the caller can reuse the
     * original path right away. A leftover {@code <name>.tmp} is deleted
     * synchronously before the rename. Nothing is done if {@code curDir}
     * does not exist.
     *
     * @param curDir the directory to delete
     * @throws IOException if removing a leftover {@code .tmp} directory or
     *         the rename fails; failures of the background deletion are only
     *         logged
     */
    private void deleteAsync(File curDir) throws IOException {
      if (!curDir.exists()) {
        return;
      }
      File curTmp = new File(curDir.getParent(), curDir.getName() + ".tmp");
      if (curTmp.exists()) {
        deleteDir(curTmp);
      }
      rename(curDir, curTmp);
      new Thread("Async Delete Current.tmp") {
        @Override
        public void run() {
          try {
            deleteDir(curTmp);
          } catch (IOException e) {
            // Pass the exception so the cause of the failure is not lost.
            LOG.warn("Deleting storage directory {} failed", curTmp, e);
          }
        }
      }.start();
    }

    /**
     * Tells whether formatting this storage directory should first be
     * confirmed by the user, i.e. whether the root appears to contain data.
     *
     * @return true if the storage directory should prompt the user prior
     * to formatting (i.e if the directory appears to contain some data)
     * @throws IOException if the SD cannot be accessed due to an IO error
     */
    @Override
    public boolean hasSomeData() throws IOException {
      // A root that does not exist holds no data.
      if (!root.exists()) {
        return false;
      }
      // A file where a directory is expected must not be formatted silently.
      if (!root.isDirectory()) {
        return true;
      }
      // An existing, accessible directory needs a prompt only if non-empty.
      return FileUtil.listFiles(root).length != 0;
    }
    
    /**
     * @return whether this directory is shared; {@link #lock()} skips
     *         locking for shared directories
     */
    public boolean isShared() {
      return isShared;
    }


    /**
     * Takes the exclusive lock on this storage directory.
     *
     * <p> Shared directories are never locked; the call only logs a message
     * for them.
     *
     * <p> Locking is not supported by all file systems.
     * E.g., NFS does not consistently support exclusive locks.
     *
     * <p> If locking is supported we guarantee exclusive access to the
     * storage directory. Otherwise, no guarantee is given.
     *
     * @throws IOException if locking fails, including when the directory is
     *         already locked
     */
    public void lock() throws IOException {
      if (isShared()) {
        LOG.info("Locking is disabled for {}", this.root);
        return;
      }
      final FileLock acquired = tryLock();
      if (acquired != null) {
        // The held lock is replaced only on success, so an accidental second
        // (failed) call to lock() does not clear the internal state.
        lock = acquired;
        return;
      }
      final String msg = "Cannot lock storage " + this.root
          + ". The directory is already locked";
      LOG.info(msg);
      throw new IOException(msg);
    }

    /**
     * Attempts to acquire an exclusive lock on the storage.
     *
     * <p> The lock file {@code STORAGE_FILE_LOCK} under the root is opened
     * in "rws" mode, locked through its channel, and then stamped with the
     * name of this JVM. The file is registered for deletion on JVM exit.
     *
     * <p> On success the returned lock keeps the underlying file open; the
     * caller must release the lock and close its channel. On every failure
     * path the file is closed before this method returns or throws.
     *
     * @return A lock object representing the newly-acquired lock or
     * <code>null</code> if storage is already locked.
     * @throws IOException if locking fails.
     */
    @SuppressWarnings("resource")
    FileLock tryLock() throws IOException {
      boolean deletionHookAdded = false;
      File lockF = new File(root, STORAGE_FILE_LOCK);
      if (!lockF.exists()) {
        lockF.deleteOnExit();
        deletionHookAdded = true;
      }
      RandomAccessFile file = new RandomAccessFile(lockF, "rws");
      String jvmName = ManagementFactory.getRuntimeMXBean().getName();
      FileLock res = null;
      try {
        res = file.getChannel().tryLock();
        if (null == res) {
          LOG.error("Unable to acquire file lock on path {}", lockF);
          // Treat "held by someone else" like an overlapping lock so both
          // cases share the handler below.
          throw new OverlappingFileLockException();
        }
        file.write(jvmName.getBytes(StandardCharsets.UTF_8));
        LOG.info("Lock on {} acquired by nodename {}", lockF, jvmName);
      } catch(OverlappingFileLockException oe) {
        try {
          // Cannot read from the locked file on Windows.
          String lockingJvmName =
              Path.WINDOWS ? "" : (" " + file.readLine());
          LOG.error("It appears that another node {} has already locked the "
              + "storage directory: {}", lockingJvmName, root, oe);
        } finally {
          // readLine() may itself fail; the file must not leak in that case.
          file.close();
        }
        return null;
      } catch(IOException e) {
        LOG.error("Failed to acquire lock on {}. If this storage directory is"
            + " mounted via NFS, ensure that the appropriate nfs lock services"
            + " are running.", lockF, e);
        file.close();
        throw e;
      }
      if (!deletionHookAdded) {
        // If the file existed prior to our startup, we didn't
        // call deleteOnExit above. But since we successfully locked
        // the dir, we can take care of cleaning it up.
        lockF.deleteOnExit();
      }
      return res;
    }

    /**
     * Releases the lock held on this storage directory, if any, and closes
     * the channel backing it. Does nothing when no lock is held.
     *
     * @throws IOException if releasing the lock or closing its channel fails
     */
    public void unlock() throws IOException {
      final FileLock held = this.lock;
      if (held != null) {
        held.release();
        held.channel().close();
        // Cleared only after both steps above completed without error.
        this.lock = null;
      }
    }
    
    /**
     * @return a description containing the root and the location of this
     *         storage directory
     */
    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder("Storage Directory root= ");
      text.append(this.root);
      text.append("; location= ");
      text.append(this.location);
      return text.toString();
    }

    /**
     * Check whether underlying file system supports file locking.
     *
     * <p> The check makes sure one lock is held (reusing the lock this
     * directory already holds, if any) and then tries to take a second one.
     * Locking is reported as supported when a {@link #tryLock()} attempt is
     * refused. Any lock taken only for the purpose of this check is released
     * before returning; a lock already held by this directory is left alone.
     *
     * @return <code>true</code> if exclusive locks are supported or
     *         <code>false</code> otherwise.
     * @throws IOException if a lock attempt or the cleanup fails
     * @see StorageDirectory#lock()
     */
    public boolean isLockSupported() throws IOException {
      FileLock primary = null;
      FileLock probe = null;
      try {
        primary = lock;
        if (primary == null) {
          primary = tryLock();
          if (primary == null) {
            // Somebody else holds the lock, so locking evidently works.
            return true;
          }
        }
        probe = tryLock();
        // A refused second attempt proves the first lock is being honoured.
        return probe == null;
      } finally {
        // Never release the lock this directory legitimately holds.
        if (primary != null && primary != lock) {
          primary.release();
          primary.channel().close();
        }
        if (probe != null) {
          probe.release();
          probe.channel().close();
        }
      }
    }

    /**
     * @return the {@link StorageLocation} stored in this directory's
     *         {@code location} field
     */
    public StorageLocation getStorageLocation() {
      return location;
    }
  }

  /**
   * Create empty storage info of the specified type.
   *
   * @param type the node type, passed through to the superclass constructor
   */
  protected Storage(NodeType type) {
    super(type);
  }
  
  /**
   * Create a storage from existing storage info.
   *
   * @param storageInfo the storage info, passed through to the superclass
   *                    constructor
   */
  protected Storage(StorageInfo storageInfo) {
    super(storageInfo);
  }
  
  /**
   * @return the number of storage directories currently registered
   */
  public int getNumStorageDirs() {
    return storageDirs.size();
  }

  /**
   * @return the list of registered storage directories; this is the
   *         internal list itself, not a copy
   */
  public List<StorageDirectory> getStorageDirs() {
    return storageDirs;
  }

  /**
   * @param idx position of the wanted directory in the list of registered
   *            storage directories
   * @return the storage directory at position {@code idx}
   */
  public StorageDirectory getStorageDir(int idx) {
    return storageDirs.get(idx);
  }
  
  /**
   * Returns the only storage directory of this storage.
   *
   * @return the storage directory, with the precondition that this storage
   * has exactly one storage directory
   */
  public StorageDirectory getSingularStorageDir() {
    final boolean exactlyOne = (storageDirs.size() == 1);
    Preconditions.checkState(exactlyOne);
    return storageDirs.get(0);
  }
  
  /**
   * Appends a storage directory to the list of registered directories.
   *
   * @param sd the storage directory to add
   */
  protected void addStorageDir(StorageDirectory sd) {
    storageDirs.add(sd);
  }

  /**
   * Returns true if the storage directory on the given directory is already
   * loaded.
   *
   * <p> Directories are compared by canonical path. The canonical path of
   * {@code root} is resolved at most once per call, and not at all when no
   * storage directory is registered.
   *
   * @param root the root directory of a {@link StorageDirectory}
   * @return true if a registered storage directory has the same canonical
   *         path as {@code root}
   * @throws IOException if failed to get canonical path.
   */
  protected boolean containsStorageDir(File root) throws IOException {
    // Canonicalization touches the file system, so the loop-invariant path
    // of the argument is computed lazily and then reused.
    String canonicalRoot = null;
    for (StorageDirectory sd : storageDirs) {
      String loadedRoot = sd.getRoot().getCanonicalPath();
      if (canonicalRoot == null) {
        canonicalRoot = root.getCanonicalPath();
      }
      if (loadedRoot.equals(canonicalRoot)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Tells whether a storage directory for the given location has already
   * been loaded.
   *
   * @param location the {@link StorageLocation}
   * @return true as soon as {@code location} matches one of the registered
   *         storage directories, false if it matches none
   * @throws IOException if failed to get canonical path.
   */
  protected boolean containsStorageDir(StorageLocation location)
      throws IOException {
    boolean found = false;
    final Iterator<StorageDirectory> dirs = storageDirs.iterator();
    // Stop at the first match; later directories are not examined.
    while (!found && dirs.hasNext()) {
      found = location.matchesStorageDirectory(dirs.next());
    }
    return found;
  }

  /**
   * Tells whether a storage directory for the given location and block pool
   * has already been loaded.
   *
   * @param location the {@link StorageLocation}
   * @param bpid the block pool id
   * @return true if the location matches to any existing storage directories
   * @throws IOException if failed to read location or storage directory path
   */
  protected boolean containsStorageDir(StorageLocation location, String bpid)
      throws IOException {
    boolean matched = false;
    for (StorageDirectory candidate : storageDirs) {
      if (location.matchesStorageDirectory(candidate, bpid)) {
        matched = true;
        break;
      }
    }
    return matched;
  }

  /**
   * @return a new {@link NamespaceInfo} constructed from this storage
   */
  public NamespaceInfo getNamespaceInfo() {
    return new NamespaceInfo(this);
  }

  /**
   * Return true if the layout of the given storage directory is from a version
   * of Hadoop prior to the introduction of the "current" and "previous"
   * directories which allow upgrade and rollback.
   *
   * @param sd the storage directory to examine
   * @return true if the directory uses the pre-upgradable layout
   * @throws IOException declared for implementations that need to access
   *         the directory
   */
  public abstract boolean isPreUpgradableLayout(StorageDirectory sd)
  throws IOException;

  /**
   * Rejects storage directories whose layout predates the layout change
   * (ie 0.13). For such a directory the upgrade check is run with layout
   * version 0, and any IOException it raises is propagated.
   *
   * @param sd the storage directory to check
   * @throws IOException if the directory layout cannot be upgraded
   */
  private void checkOldLayoutStorage(StorageDirectory sd) throws IOException {
    if (!isPreUpgradableLayout(sd)) {
      return;
    }
    checkVersionUpgradable(0);
  }

  /**
   * Verifies that metadata of layout version {@code oldVersion} can be
   * upgraded by this software version.
   *
   * @param oldVersion the version of the metadata to check with the current
   *                   version
   * @throws IOException if {@code oldVersion} is greater than
   *         {@code LAST_UPGRADABLE_LAYOUT_VERSION}, i.e. upgrade is not
   *         supported
   */
  public static void checkVersionUpgradable(int oldVersion)
      throws IOException {
    if (oldVersion <= LAST_UPGRADABLE_LAYOUT_VERSION) {
      return;
    }
    // Version 0 is reported as 'too old' rather than as a number.
    final String shownVersion =
        oldVersion == 0 ? "'too old'" : Integer.toString(oldVersion);
    final StringBuilder text = new StringBuilder();
    text.append("*********** Upgrade is not supported from this ");
    text.append(" older version ").append(oldVersion);
    text.append(" of storage to the current version.");
    text.append(" Please upgrade to ").append(LAST_UPGRADABLE_HADOOP_VERSION);
    text.append(" or a later version and then upgrade to current");
    text.append(" version. Old layout version is ").append(shownVersion);
    text.append(" and latest layout version this software version can");
    text.append(" upgrade from is ").append(LAST_UPGRADABLE_LAYOUT_VERSION);
    text.append(". ************");
    final String msg = text.toString();
    LOG.error(msg);
    throw new IOException(msg);
  }
  
  /**
   * Walks the given {@link FormatConfirmable} objects and decides whether
   * formatting may go ahead, asking the user where necessary.
   *
   * Items without data never block formatting. For an item that has data:
   * with 'force' it is formatted anyway; otherwise, in non-interactive mode
   * the whole format is refused; otherwise the user is prompted and a
   * negative answer refuses the whole format.
   *
   * @param items the items that are about to be formatted
   * @param force format regardless of whether dirs exist
   * @param interactive prompt the user when a dir exists
   * @return true if formatting should proceed
   * @throws IOException if some storage cannot be accessed
   */
  public static boolean confirmFormat(
      Iterable<? extends FormatConfirmable> items,
      boolean force, boolean interactive) throws IOException {
    for (FormatConfirmable item : items) {
      if (item.hasSomeData()) {
        if (force) {
          // Don't confirm, always format.
          System.err.println(
              "Data exists in " + item + ". Formatting anyway.");
        } else if (!interactive) {
          // Don't ask - always don't format.
          System.err.println(
              "Running in non-interactive mode, and data appears to exist in "
              + item + ". Not formatting.");
          return false;
        } else if (!ToolRunner.confirmPrompt(
            "Re-format filesystem in " + item + " ?")) {
          System.err.println("Format aborted in " + item);
          return false;
        }
      }
    }
    return true;
  }
  
  /**
   * Implemented by anything whose formatting may have to be confirmed by
   * the user, e.g. during NameNode -format and other similar operations.
   *
   * This is currently a storage directory or journal manager.
   */
  @InterfaceAudience.Private
  public interface FormatConfirmable {
    /**
     * Tells whether formatting would destroy something.
     *
     * @return true if the storage seems to have some valid data in it,
     * and the user should be required to confirm the format. Otherwise,
     * false.
     * @throws IOException if the storage cannot be accessed at all.
     */
    boolean hasSomeData() throws IOException;

    /**
     * Redeclared so that implementors provide a user-facing description.
     *
     * @return a string representation of the formattable item, suitable
     * for display to the user inside a prompt
     */
    @Override
    String toString();
  }
  
  /**
   * Copies the common storage fields into the given properties object:
   * layoutVersion, storageType, namespaceID, cTime and, when the layout
   * supports federation, clusterID.
   * Should be overridden if additional fields need to be set.
   *
   * @param props the Properties object to write into
   * @param sd the storage directory the properties are written for; not
   *           used by this implementation
   * @throws IOException declared for overriding implementations
   */
  protected void setPropertiesFromFields(Properties props,
      StorageDirectory sd) throws IOException {
    props.setProperty("layoutVersion", String.valueOf(layoutVersion));
    props.setProperty("storageType", storageType.toString());
    props.setProperty("namespaceID", String.valueOf(namespaceID));
    // clusterID only exists in layouts with federation support.
    final boolean federated =
        versionSupportsFederation(getServiceLayoutFeatureMap());
    if (federated) {
      props.setProperty("clusterID", clusterID);
    }
    props.setProperty("cTime", String.valueOf(cTime));
  }

  /**
   * Writes this storage's properties to the version file of the given
   * storage directory.
   *
   * @param sd the storage directory whose version file is written
   * @throws IOException if the write fails
   */
  public void writeProperties(StorageDirectory sd) throws IOException {
    final File versionFile = sd.getVersionFile();
    writeProperties(versionFile, sd);
  }
  
  /**
   * Collects this storage's properties for the given storage directory and
   * writes them to a file. Nothing happens when {@code to} is null.
   *
   * @param to the file to write, may be null
   * @param sd the storage directory the properties are collected for
   * @throws IOException if collecting or writing the properties fails
   */
  public void writeProperties(File to, StorageDirectory sd) throws IOException {
    if (to != null) {
      final Properties collected = new Properties();
      setPropertiesFromFields(collected, sd);
      writeProperties(to, collected);
    }
  }

  /**
   * Stores the given properties at the start of a file and cuts the file
   * off right behind them.
   *
   * @param to the file to write; opened in "rws" mode
   * @param props the properties to store
   * @throws IOException if the file cannot be opened, written or truncated
   */
  public static void writeProperties(File to, Properties props)
      throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(to, "rws");
        FileOutputStream stream = new FileOutputStream(raf.getFD())) {
      raf.seek(0);
      // An interruption before the store below leaves the version file
      // unchanged.
      props.store(stream, null);
      // The new fields now sit at the head of the file, but the file may
      // still be longer than needed and carry whole or corrupted fields of
      // its old contents at the end. If the server is interrupted right here
      // and restarted later, those leftovers must either not affect server
      // behavior or be handled correctly by the server.
      final long written = stream.getChannel().position();
      raf.setLength(written);
    }
  }

  /**
   * Renames a file using the native rename.
   *
   * @param from the file to rename
   * @param to the new name of the file
   * @throws IOException if the native rename fails; the underlying
   *         NativeIOException is attached as the cause
   */
  public static void rename(File from, File to) throws IOException {
    try {
      NativeIO.renameTo(from, to);
    } catch (NativeIOException e) {
      // Keep the original exception as the cause so its stack trace and
      // error details are not lost.
      throw new IOException("Failed to rename " + from.getCanonicalPath()
        + " to " + to.getCanonicalPath() + " due to failure in native rename. "
        + e.toString(), e);
    }
  }

  /**
   * Copies a file (usually large) to a new location using native unbuffered IO.
   * <p>
   * This method copies the contents of the specified source file
   * to the specified destination file using OS specific unbuffered IO.
   * The goal is to avoid churning the file system buffer cache when copying
   * large files.
   *
   * We can't use FileUtils#copyFile from apache-commons-io because it
   * is a buffered IO based on FileChannel#transferFrom, which uses MmapByteBuffer
   * internally.
   *
   * The directory holding the destination file is created if it does not exist.
   * If the destination file exists, then this method will delete it first.
   * After the copy the lengths of both files are compared.
   * <p>
   * <strong>Note:</strong> Setting <code>preserveFileDate</code> to
   * {@code true} tries to preserve the file's last modified
   * date/times using {@link File#setLastModified(long)}, however it is
   * not guaranteed that the operation will succeed.
   * If the modification operation fails, it is only logged at debug level.
   *
   * @param srcFile  an existing file to copy, must not be {@code null}
   * @param destFile  the new file, must not be {@code null}
   * @param preserveFileDate  true if the file date of the copy
   *  should be the same as the original
   *
   * @throws NullPointerException if source or destination is {@code null}
   * @throws FileNotFoundException if the source does not exist
   * @throws IOException if source or destination is invalid, or if an IO
   *  error occurs during copying; a failure of the native copy is attached
   *  as the cause
   */
  public static void nativeCopyFileUnbuffered(File srcFile, File destFile,
      boolean preserveFileDate) throws IOException {
    if (srcFile == null) {
      throw new NullPointerException("Source must not be null");
    }
    if (destFile == null) {
      throw new NullPointerException("Destination must not be null");
    }
    if (!srcFile.exists()) {
      throw new FileNotFoundException("Source '" + srcFile + "' does not exist");
    }
    if (srcFile.isDirectory()) {
      throw new IOException("Source '" + srcFile + "' exists but is a directory");
    }
    if (srcFile.getCanonicalPath().equals(destFile.getCanonicalPath())) {
      throw new IOException("Source '" + srcFile + "' and destination '" +
          destFile + "' are the same");
    }
    File parentFile = destFile.getParentFile();
    if (parentFile != null) {
      // mkdirs() returns false when the directory already exists, so only
      // fail if it is still not a directory afterwards.
      if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
        throw new IOException("Destination '" + parentFile
            + "' directory cannot be created");
      }
    }
    if (destFile.exists()) {
      if (!FileUtil.canWrite(destFile)) {
        throw new IOException("Destination '" + destFile
            + "' exists but is read-only");
      }
      if (!destFile.delete()) {
        throw new IOException("Destination '" + destFile
            + "' exists but cannot be deleted");
      }
    }
    try {
      NativeIO.copyFileUnbuffered(srcFile, destFile);
    } catch (NativeIOException e) {
      // Keep the original exception as the cause so its stack trace and
      // error details are not lost.
      throw new IOException("Failed to copy " + srcFile.getCanonicalPath()
          + " to " + destFile.getCanonicalPath()
          + " due to failure in NativeIO#copyFileUnbuffered(). "
          + e.toString(), e);
    }
    if (srcFile.length() != destFile.length()) {
      throw new IOException("Failed to copy full contents from '" + srcFile
          + "' to '" + destFile + "'");
    }
    if (preserveFileDate) {
      if (!destFile.setLastModified(srcFile.lastModified())) {
        LOG.debug("Failed to preserve last modified date from'{}' to '{}'",
            srcFile, destFile);
      }
    }
  }

  /**
   * Removes a directory from the local filesystem: first everything it
   * contains, recursively, and then the directory itself.
   *
   * @param dir The directory to delete
   * @throws IOException if the directory could not be fully deleted
   */
  public static void deleteDir(File dir) throws IOException {
    final boolean deleted = FileUtil.fullyDelete(dir);
    if (!deleted) {
      throw new IOException("Failed to delete " + dir.getCanonicalPath());
    }
  }
  
  /**
   * Sets the layout version to the service layout version and then writes
   * the properties of every registered storage directory.
   *
   * @throws IOException if writing the properties of a directory fails
   */
  public void writeAll() throws IOException {
    this.layoutVersion = getServiceLayoutVersion();
    for (StorageDirectory dir : storageDirs) {
      writeProperties(dir);
    }
  }

  /**
   * Unlocks every registered storage directory, in list order. The first
   * failure aborts the loop, so later directories are not unlocked.
   *
   * @throws IOException if unlocking a directory fails
   */
  public void unlockAll() throws IOException {
    for (StorageDirectory dir : storageDirs) {
      dir.unlock();
    }
  }

  /**
   * @return the build version, which is the revision reported by
   *         {@code VersionInfo.getRevision()}
   */
  public static String getBuildVersion() {
    return VersionInfo.getRevision();
  }

  /**
   * Builds the registration id of a storage, of the form
   * {@code NS-<namespaceID>-<clusterID>-<cTime>}.
   *
   * @param storage the storage info supplying the three components
   * @return the registration id
   */
  public static String getRegistrationID(StorageInfo storage) {
    final StringBuilder id = new StringBuilder("NS-");
    id.append(Integer.toString(storage.getNamespaceID()));
    id.append("-").append(storage.getClusterID());
    id.append("-").append(Long.toString(storage.getCTime()));
    return id.toString();
  }
  
  /**
   * Tells whether a layout version is one of those listed in
   * {@code LAYOUT_VERSIONS_203}.
   *
   * @param layoutVersion the layout version to look up
   * @return true if {@code layoutVersion} is listed, false otherwise
   */
  public static boolean is203LayoutVersion(int layoutVersion) {
    boolean listed = false;
    for (int candidate : LAYOUT_VERSIONS_203) {
      if (candidate == layoutVersion) {
        listed = true;
        break;
      }
    }
    return listed;
  }
}