StoragePolicySatisfyManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.sps;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

/**
 * This manages satisfy storage policy invoked path ids and expose methods to
 * process these path ids. It maintains sps mode(EXTERNAL/NONE)
 * configured by the administrator.
 *
 * <p>
 * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then
 * it won't do anything, just maintains the sps invoked path ids. Administrator
 * requires to start external sps service explicitly, to fetch the sps invoked
 * path ids from namenode, then do necessary computations and block movement in
 * order to satisfy the storage policy. Please refer
 * {@link ExternalStoragePolicySatisfier} class to understand more about the
 * external sps service functionality.
 *
 * <p>
 * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then it
 * will disable the sps feature completely by clearing all queued up sps path's
 * hint.
 *
 * This class is instantiated by the BlockManager.
 */
public class StoragePolicySatisfyManager {
  private static final Logger LOG = LoggerFactory
      .getLogger(StoragePolicySatisfyManager.class);
  private final StoragePolicySatisfier spsService;
  private final boolean storagePolicyEnabled;
  private volatile StoragePolicySatisfierMode mode;
  private final Queue<Long> pathsToBeTraversed;
  private final int outstandingPathsLimit;
  private final Namesystem namesystem;

  public StoragePolicySatisfyManager(Configuration conf,
      Namesystem namesystem) {
    // StoragePolicySatisfier(SPS) configs
    storagePolicyEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
    String modeVal = conf.get(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
    outstandingPathsLimit = conf.getInt(
        DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
        DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
    mode = StoragePolicySatisfierMode.fromString(modeVal);
    pathsToBeTraversed = new LinkedList<Long>();
    this.namesystem = namesystem;
    // instantiate SPS service by just keeps config reference and not starting
    // any supporting threads.
    spsService = new StoragePolicySatisfier(conf);
  }

  /**
   * This function will do following logic based on the configured sps mode:
   *
   * <p>
   * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then
   * it won't do anything. Administrator requires to start external sps service
   * explicitly.
   *
   * <p>
   * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then the
   * service is disabled and won't do any action.
   */
  public void start() {
    if (!storagePolicyEnabled) {
      LOG.info("Disabling StoragePolicySatisfier service as {} set to {}.",
          DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
      return;
    }

    switch (mode) {
    case EXTERNAL:
      LOG.info("Storage policy satisfier is configured as external, "
          + "please start external sps service explicitly to satisfy policy");
      break;
    case NONE:
      LOG.info("Storage policy satisfier is disabled");
      break;
    default:
      LOG.info("Given mode: {} is invalid", mode);
      break;
    }
  }

  /**
   * This function will do following logic based on the configured sps mode:
   *
   * <p>
   * If the configured mode is {@link StoragePolicySatisfierMode#EXTERNAL}, then
   * it won't do anything. Administrator requires to stop external sps service
   * explicitly, if needed.
   *
   * <p>
   * If the configured mode is {@link StoragePolicySatisfierMode#NONE}, then the
   * service is disabled and won't do any action.
   */
  public void stop() {
    if (!storagePolicyEnabled) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Storage policy is not enabled, ignoring");
      }
      return;
    }

    switch (mode) {
    case EXTERNAL:
      removeAllPathIds();
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Storage policy satisfier service is running outside namenode"
            + ", ignoring");
      }
      break;
    case NONE:
      if (LOG.isDebugEnabled()) {
        LOG.debug("Storage policy satisfier is not enabled, ignoring");
      }
      break;
    default:
      if (LOG.isDebugEnabled()) {
        LOG.debug("Invalid mode:{}, ignoring", mode);
      }
      break;
    }
  }

  /**
   * Sets new sps mode. If the new mode is none, then it will disable the sps
   * feature completely by clearing all queued up sps path's hint.
   */
  public void changeModeEvent(StoragePolicySatisfierMode newMode) {
    if (!storagePolicyEnabled) {
      LOG.info("Failed to change storage policy satisfier as {} set to {}.",
          DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
      return;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Updating SPS service status, current mode:{}, new mode:{}",
          mode, newMode);
    }

    switch (newMode) {
    case EXTERNAL:
      if (mode == newMode) {
        LOG.info("Storage policy satisfier is already in mode:{},"
            + " so ignoring change mode event.", newMode);
        return;
      }
      spsService.stopGracefully();
      break;
    case NONE:
      if (mode == newMode) {
        LOG.info("Storage policy satisfier is already disabled, mode:{}"
            + " so ignoring change mode event.", newMode);
        return;
      }
      LOG.info("Disabling StoragePolicySatisfier, mode:{}", newMode);
      spsService.stop(true);
      clearPathIds();
      break;
    default:
      if (LOG.isDebugEnabled()) {
        LOG.debug("Given mode: {} is invalid", newMode);
      }
      break;
    }

    // update sps mode
    mode = newMode;
  }

  /**
   * @return true if the internal storage policy satisfier daemon is running,
   *         false otherwise.
   */
  @VisibleForTesting
  public boolean isSatisfierRunning() {
    return spsService.isRunning();
  }

  /**
   * @return the next SPS path id, on which path users has invoked to satisfy
   *         storages.
   */
  public Long getNextPathId() {
    synchronized (pathsToBeTraversed) {
      return pathsToBeTraversed.poll();
    }
  }

  /**
   * Verify that satisfier queue limit exceeds allowed outstanding limit.
   * @throws IOException
   */
  public void verifyOutstandingPathQLimit() throws IOException {
    long size = pathsToBeTraversed.size();
    // Checking that the SPS call Q exceeds the allowed limit.
    if (outstandingPathsLimit - size <= 0) {
      LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}",
          outstandingPathsLimit, size);
      throw new IOException("Outstanding satisfier queue limit: "
          + outstandingPathsLimit + " exceeded, try later!");
    }
  }

  /**
   * Removes the SPS path id from the list of sps paths.
   *
   * @throws IOException
   */
  private void clearPathIds(){
    synchronized (pathsToBeTraversed) {
      Iterator<Long> iterator = pathsToBeTraversed.iterator();
      while (iterator.hasNext()) {
        Long trackId = iterator.next();
        try {
          namesystem.removeXattr(trackId,
              HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
        } catch (IOException e) {
          LOG.debug("Failed to remove sps xattr!", e);
        }
        iterator.remove();
      }
    }
  }

  /**
   * Clean up all sps path ids.
   */
  public void removeAllPathIds() {
    synchronized (pathsToBeTraversed) {
      pathsToBeTraversed.clear();
    }
  }

  /**
   * Adds the sps path to SPSPathIds list.
   * @param id
   */
  public void addPathId(long id) {
    synchronized (pathsToBeTraversed) {
      pathsToBeTraversed.add(id);
    }
  }

  /**
   * @return true if sps is configured as an external
   *         service, false otherwise.
   */
  public boolean isEnabled() {
    return mode == StoragePolicySatisfierMode.EXTERNAL;
  }

  /**
   * @return sps service mode.
   */
  public StoragePolicySatisfierMode getMode() {
    return mode;
  }

  /**
   * @return the number of paths to be processed by storage policy satisfier.
   */
  public int getPendingSPSPaths() {
    return pathsToBeTraversed.size();
  }
}