/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.IOException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Time;

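/**
 * Maintains the list of {@link FsVolumeImpl} instances in use by a
 * DataNode's dataset. Supports choosing volumes for block placement,
 * dynamically adding and removing volumes, and tracking volume failures.
 */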
class FsVolumeList {
  private final CopyOnWriteArrayList<FsVolumeImpl> volumes =
      new CopyOnWriteArrayList<>();
  // Tracks volume failures, sorted by StorageLocation.
  // Map from a failed volume's StorageLocation to its VolumeFailureInfo.
  private final Map<StorageLocation, VolumeFailureInfo> volumeFailureInfos =
      Collections.synchronizedMap(
          new TreeMap<StorageLocation, VolumeFailureInfo>());
  private final ConcurrentLinkedQueue<FsVolumeImpl> volumesBeingRemoved =
      new ConcurrentLinkedQueue<>();
  private final AutoCloseableLock checkDirsLock;
  private final Condition checkDirsLockCondition;

  private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
  private final BlockScanner blockScanner;

  private final boolean enableSameDiskTiering;
  private final MountVolumeMap mountVolumeMap;
  private Map<URI, Double> capacityRatioMap;
  private final DataNodeDiskMetrics diskMetrics;

  FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
      BlockScanner blockScanner,
      VolumeChoosingPolicy<FsVolumeImpl> blockChooser,
      Configuration config, DataNodeDiskMetrics dataNodeDiskMetrics) {
    this.blockChooser = blockChooser;
    this.blockScanner = blockScanner;
    this.checkDirsLock = new AutoCloseableLock();
    this.checkDirsLockCondition = checkDirsLock.newCondition();
    this.diskMetrics = dataNodeDiskMetrics;
    for (VolumeFailureInfo volumeFailureInfo : initialVolumeFailureInfos) {
      volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
          volumeFailureInfo);
    }
    enableSameDiskTiering = config.getBoolean(
        DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
        DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT);
    mountVolumeMap = new MountVolumeMap(config);
    initializeCapacityRatio(config);
  }

  MountVolumeMap getMountVolumeMap() {
    return mountVolumeMap;
  }

  /**
   * Return an unmodifiable list view of all the volumes.
   */
  List<FsVolumeImpl> getVolumes() {
    return Collections.unmodifiableList(volumes);
  }

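  /**
   * Choose a volume from the given candidate list using the configured
   * {@link VolumeChoosingPolicy}. Slow disks are excluded up front when
   * disk metrics are available; if the chosen volume turns out to be
   * closed, it is dropped from the candidates and the choice is retried.
   *
   * @param list candidate volumes
   * @param blockSize free space needed on the volume
   * @param storageId the storage id which may or may not be used by
   *                  the VolumeChoosingPolicy
   * @return a reference to the chosen volume
   * @throws IOException if no volume can be chosen
   */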
  private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
      long blockSize, String storageId) throws IOException {

    // Exclude slow disks when choosing volume.
    if (diskMetrics != null) {
      List<String> slowDisksToExclude = diskMetrics.getSlowDisksToExclude();
      list = list.stream()
          .filter(volume -> !slowDisksToExclude.contains(volume.getBaseURI().getPath()))
          .collect(Collectors.toList());
    }

    while (true) {
      FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize,
          storageId);
      try {
        return volume.obtainReference();
      } catch (ClosedChannelException e) {
        FsDatasetImpl.LOG.warn("Chosen a closed volume: " + volume);
        // blockChooser.chooseVolume returns DiskOutOfSpaceException when the list
        // is empty, indicating that all volumes are closed.
        list.remove(volume);
      }
    }
  }

  /**
   * Get a volume on the given disk mount to place a block,
   * used for same-disk tiering.
   *
   * @param storageType the desired {@link StorageType}
   * @param mount disk mount of the volume
   * @param blockSize free space needed on the volume
   * @return a reference to a volume with enough available space, or null
   *         if none is found or same-disk tiering is disabled
   * @throws IOException if obtaining the volume reference fails
   */
  FsVolumeReference getVolumeByMount(StorageType storageType,
      String mount, long blockSize) throws IOException {
    if (!enableSameDiskTiering) {
      return null;
    }
    FsVolumeReference volume = mountVolumeMap
        .getVolumeRefByMountAndStorageType(mount, storageType);
    // Check if the volume has enough capacity for the block.
    if (volume != null) {
      if (volume.getVolume().getAvailable() > blockSize) {
        return volume;
      }
      // Release the reference since the volume is not returned.
      IOUtils.cleanupWithLogger(null, volume);
    }
    return null;
  }

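  /**
   * Parse the configured same-disk-tiering capacity ratios into a map
   * from volume URI to capacity ratio.
   */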
  private void initializeCapacityRatio(Configuration config) {
    if (capacityRatioMap == null) {
      String capacityRatioConfig = config.get(
          DFSConfigKeys
              .DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE,
          DFSConfigKeys
              .DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE_DEFAULT
      );

      this.capacityRatioMap = StorageLocation
          .parseCapacityRatio(capacityRatioConfig);
    }
  }

  /**
   * Get next volume.
   *
   * @param storageType the desired {@link StorageType}
   * @param storageId the storage id which may or may not be used by
   *                  the VolumeChoosingPolicy.
   * @param blockSize free space needed on the volume
   * @return next volume to store the block in.
   * @throws IOException if no suitable volume can be chosen
   */
  FsVolumeReference getNextVolume(StorageType storageType, String storageId,
      long blockSize) throws IOException {
    final List<FsVolumeImpl> list = new ArrayList<>(volumes.size());
    for (FsVolumeImpl v : volumes) {
      if (v.getStorageType() == storageType) {
        list.add(v);
      }
    }
    return chooseVolume(list, blockSize, storageId);
  }

  /**
   * Get next transient volume.
   *
   * @param blockSize free space needed on the volume
   * @return next transient volume to store the block in.
   * @throws IOException if no suitable volume can be chosen
   */
  FsVolumeReference getNextTransientVolume(long blockSize) throws IOException {
    // Get a snapshot of currently available volumes.
    final List<FsVolumeImpl> curVolumes = getVolumes();
    final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.size());
    for (FsVolumeImpl v : curVolumes) {
      if (v.isTransientStorage()) {
        list.add(v);
      }
    }
    return chooseVolume(list, blockSize, null);
  }

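  /**
   * @return the total DFS space used across all volumes, skipping
   *         volumes that have been closed.
   */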
  long getDfsUsed() throws IOException {
    long dfsUsed = 0L;
    for (FsVolumeImpl v : volumes) {
      try(FsVolumeReference ref = v.obtainReference()) {
        dfsUsed += v.getDfsUsed();
      } catch (ClosedChannelException e) {
        // ignore.
      }
    }
    return dfsUsed;
  }

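  /**
   * @return the total space used by the given block pool across all
   *         volumes, skipping volumes that have been closed.
   */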
  long getBlockPoolUsed(String bpid) throws IOException {
    long dfsUsed = 0L;
    for (FsVolumeImpl v : volumes) {
      try (FsVolumeReference ref = v.obtainReference()) {
        dfsUsed += v.getBlockPoolUsed(bpid);
      } catch (ClosedChannelException e) {
        // ignore.
      }
    }
    return dfsUsed;
  }

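  /**
   * @return the total capacity across all volumes, skipping volumes
   *         that cannot be referenced.
   */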
  long getCapacity() {
    long capacity = 0L;
    for (FsVolumeImpl v : volumes) {
      try (FsVolumeReference ref = v.obtainReference()) {
        capacity += v.getCapacity();
      } catch (IOException e) {
        // ignore.
      }
    }
    return capacity;
  }
    
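  /**
   * @return the total available space across all volumes, skipping
   *         volumes that have been closed.
   */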
  long getRemaining() throws IOException {
    long remaining = 0L;
    for (FsVolumeSpi vol : volumes) {
      try (FsVolumeReference ref = vol.obtainReference()) {
        remaining += vol.getAvailable();
      } catch (ClosedChannelException e) {
        // ignore
      }
    }
    return remaining;
  }
  
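  /**
   * Populate the replica map for the given block pool by scanning all
   * volumes in parallel, one thread per volume.
   *
   * @param bpid block pool ID to scan
   * @param volumeMap replica map to populate
   * @param ramDiskReplicaMap tracker for replicas on RAM disk
   * @throws IOException if any volume fails while adding replicas, or
   *         the scan is interrupted
   */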
  void getAllVolumesMap(final String bpid,
                        final ReplicaMap volumeMap,
                        final RamDiskReplicaTracker ramDiskReplicaMap)
      throws IOException {
    long totalStartTime = Time.monotonicNow();
    final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
        new ConcurrentHashMap<>();
    List<Thread> replicaAddingThreads = new ArrayList<>();
    for (final FsVolumeImpl v : volumes) {
      Thread t = new Thread() {
        @Override
        public void run() {
          try (FsVolumeReference ref = v.obtainReference()) {
            FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
                bpid + " on volume " + v + "...");
            long startTime = Time.monotonicNow();
            v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
            long timeTaken = Time.monotonicNow() - startTime;
            FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
                + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
          } catch (IOException ioe) {
            FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
                "from " + v + ". Will throw later.", ioe);
            unhealthyDataDirs.put(v, ioe);
          }
        }
      };
      replicaAddingThreads.add(t);
      t.start();
    }
    for (Thread t : replicaAddingThreads) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    long totalTimeTaken = Time.monotonicNow() - totalStartTime;
    FsDatasetImpl.LOG
        .info("Total time to add all replicas to map for block pool " + bpid
            + ": " + totalTimeTaken + "ms");
    if (!unhealthyDataDirs.isEmpty()) {
      throw new AddBlockPoolException(unhealthyDataDirs);
    }
  }

  /**
   * Updates the failed volume info in the volumeFailureInfos Map
   * and calls {@link #removeVolume(FsVolumeImpl)} to remove the volume
   * from the volume list for each of the failed volumes.
   *
   * @param failedVolumes set of volumes marked failed.
   */
  void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
    try (AutoCloseableLock lock = checkDirsLock.acquire()) {

      for (FsVolumeSpi vol : failedVolumes) {
        FsVolumeImpl fsv = (FsVolumeImpl) vol;
        try (FsVolumeReference ref = fsv.obtainReference()) {
          addVolumeFailureInfo(fsv);
          removeVolume(fsv);
        } catch (ClosedChannelException e) {
          FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
            "reference count on closed volume", e);
        } catch (IOException e) {
          FsDatasetImpl.LOG.error("Unexpected IOException", e);
        }
      }
      
      waitVolumeRemoved(5000, checkDirsLockCondition);
    }
  }

  /**
   * Wait for the reference of the volume removed from a previous
   * {@link #removeVolume(FsVolumeImpl)} call to be released.
   *
   * @param sleepMillis interval in milliseconds to recheck.
   * @param condition the condition to await between rechecks.
   */
  void waitVolumeRemoved(int sleepMillis, Condition condition) {
    while (!checkVolumesRemoved()) {
      if (FsDatasetImpl.LOG.isDebugEnabled()) {
        FsDatasetImpl.LOG.debug("Waiting for volume reference to be released.");
      }
      try {
        condition.await(sleepMillis, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        FsDatasetImpl.LOG.info("Thread interrupted when waiting for "
            + "volume reference to be released.");
        Thread.currentThread().interrupt();
      }
    }
    FsDatasetImpl.LOG.info("Volume reference is released.");
  }

  /**
   * Wait for the reference of the volume removed from a previous
   * {@link #removeVolume(FsVolumeImpl)} call to be released. The caller
   * must hold the monitor of {@code condition}, since
   * {@link Object#wait(long)} is invoked on it.
   *
   * @param sleepMillis interval in milliseconds to recheck.
   * @param condition the monitor object to wait on between rechecks.
   */
  void waitVolumeRemoved(int sleepMillis, Object condition) {
    while (!checkVolumesRemoved()) {
      if (FsDatasetImpl.LOG.isDebugEnabled()) {
        FsDatasetImpl.LOG.debug("Waiting for volume reference to be released.");
      }
      try {
        condition.wait(sleepMillis);
      } catch (InterruptedException e) {
        FsDatasetImpl.LOG.info("Thread interrupted when waiting for "
            + "volume reference to be released.");
        Thread.currentThread().interrupt();
      }
    }
    FsDatasetImpl.LOG.info("Volume reference is released.");
  }

  @Override
  public String toString() {
    return volumes.toString();
  }

  /**
   * Dynamically add new volumes to the existing volumes that this DN manages.
   *
   * @param ref       a reference to the new FsVolumeImpl instance.
   */
  void addVolume(FsVolumeReference ref) throws IOException {
    FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
    volumes.add(volume);
    if (isSameDiskTieringApplied(volume)) {
      mountVolumeMap.addVolume(volume);
      URI uri = volume.getStorageLocation().getUri();
      if (capacityRatioMap.containsKey(uri)) {
        mountVolumeMap.setCapacityRatio(volume, capacityRatioMap.get(uri));
      }
    }
    if (blockScanner != null) {
      blockScanner.addVolumeScanner(ref);
    } else {
      // If the volume is not put into a volume scanner, it does not need to
      // hold the reference.
      IOUtils.cleanupWithLogger(null, ref);
    }
    // If this volume replaces a previously failed volume, clear the
    // failure info recorded for its storage location.
    removeVolumeFailureInfo(volume.getStorageLocation());
    FsDatasetImpl.LOG.info("Added new volume: " +
        volume.getStorageID());
  }

  /**
   * Dynamically remove a volume from the list.
   * @param target the volume instance to be removed.
   */
  private void removeVolume(FsVolumeImpl target) {
    if (volumes.remove(target)) {
      if (isSameDiskTieringApplied(target)) {
        mountVolumeMap.removeVolume(target);
      }
      if (blockScanner != null) {
        blockScanner.removeVolumeScanner(target);
      }
      try {
        target.setClosed();
      } catch (IOException e) {
        FsDatasetImpl.LOG.warn(
            "Error occurred while waiting for volume to close: " + target, e);
      }
      target.shutdown();
      volumesBeingRemoved.add(target);
      FsDatasetImpl.LOG.info("Removed volume: " + target);
    } else {
      if (FsDatasetImpl.LOG.isDebugEnabled()) {
        FsDatasetImpl.LOG.debug("Volume " + target +
            " does not exist or is removed by others.");
      }
    }
  }

  /**
   * Check if same disk tiering is applied to the volume.
   */
  private boolean isSameDiskTieringApplied(FsVolumeImpl target) {
    return enableSameDiskTiering
        && StorageType.allowSameDiskTiering(target.getStorageType());
  }

  /**
   * Dynamically remove a volume from the list.
   * @param storageLocation {@link StorageLocation} of the volume to be removed.
   * @param clearFailure set true to remove failure info for this volume.
   */
  void removeVolume(StorageLocation storageLocation, boolean clearFailure) {
    for (FsVolumeImpl fsVolume : volumes) {
      StorageLocation baseLocation = fsVolume.getStorageLocation();
      if (baseLocation.equals(storageLocation)) {
        removeVolume(fsVolume);
      }
    }
    if (clearFailure) {
      removeVolumeFailureInfo(storageLocation);
    }
  }

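  /**
   * @return a snapshot of the current volume failure infos, ordered by
   *         storage location.
   */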
  VolumeFailureInfo[] getVolumeFailureInfos() {
    Collection<VolumeFailureInfo> infos = volumeFailureInfos.values();
    return infos.toArray(new VolumeFailureInfo[infos.size()]);
  }

  /**
   * Check whether the reference of the volume from a previous
   * {@link #removeVolume(FsVolumeImpl)} call is released.
   *
   * @return Whether the reference is released.
   */
  boolean checkVolumesRemoved() {
    Iterator<FsVolumeImpl> it = volumesBeingRemoved.iterator();
    while (it.hasNext()) {
      FsVolumeImpl volume = it.next();
      if (!volume.checkClosed()) {
        return false;
      }
      it.remove();
    }
    return true;
  }

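  /**
   * Record a volume failure, keeping any existing entry for the same
   * storage location.
   *
   * @param volumeFailureInfo the failure info to record
   */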
  void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) {
    // There could be redundant requests to add the same failed volume
    // because of repeated DataNode reconfiguration with the same list of
    // volumes. Ignore the update so as to preserve the original failure
    // capacity details in the map.
    if (!volumeFailureInfos.containsKey(volumeFailureInfo
        .getFailedStorageLocation())) {
      volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
          volumeFailureInfo);
    }
  }

  private void addVolumeFailureInfo(FsVolumeImpl vol) {
    addVolumeFailureInfo(new VolumeFailureInfo(
        vol.getStorageLocation(),
        Time.now(),
        vol.getCapacity()));
  }

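  /**
   * Clear the failure info recorded for the given storage location,
   * if any.
   *
   * @param location storage location whose failure info is removed
   */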
  void removeVolumeFailureInfo(StorageLocation location) {
    volumeFailureInfos.remove(location);
  }

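  /**
   * Add the given block pool to every volume, scanning each volume in
   * parallel with one thread per volume.
   *
   * @param bpid block pool ID to add
   * @param conf configuration used to initialize the block pool
   * @throws IOException if any volume fails while adding the block
   *         pool, or the scan is interrupted
   */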
  void addBlockPool(final String bpid, final Configuration conf) throws IOException {
    long totalStartTime = Time.monotonicNow();
    final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
        new ConcurrentHashMap<>();
    List<Thread> blockPoolAddingThreads = new ArrayList<>();
    for (final FsVolumeImpl v : volumes) {
      Thread t = new Thread() {
        @Override
        public void run() {
          try (FsVolumeReference ref = v.obtainReference()) {
            FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
                " on volume " + v + "...");
            long startTime = Time.monotonicNow();
            v.addBlockPool(bpid, conf);
            long timeTaken = Time.monotonicNow() - startTime;
            FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
                " on " + v + ": " + timeTaken + "ms");
          } catch (IOException ioe) {
            FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
                ". Will throw later.", ioe);
            unhealthyDataDirs.put(v, ioe);
          }
        }
      };
      blockPoolAddingThreads.add(t);
      t.start();
    }
    for (Thread t : blockPoolAddingThreads) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    long totalTimeTaken = Time.monotonicNow() - totalStartTime;
    FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
        bpid + ": " + totalTimeTaken + "ms");
    if (!unhealthyDataDirs.isEmpty()) {
      throw new AddBlockPoolException(unhealthyDataDirs);
    }
  }

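  /**
   * Shut down the given block pool on every volume, passing each volume
   * its final block report.
   *
   * @param bpid block pool ID to remove
   * @param blocksPerVolume block report for each volume's storage
   */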
  void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
      blocksPerVolume) {
    for (FsVolumeImpl v : volumes) {
      v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage()));
    }
  }

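  /**
   * Shut down all volumes managed by this list.
   */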
  void shutdown() {
    for (FsVolumeImpl volume : volumes) {
      if (volume != null) {
        volume.shutdown();
      }
    }
  }
}