QueuePlacementPolicy.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.RejectPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.SecondaryGroupExistingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import static org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory.getPlacementRule;

/**
 * The FairScheduler rules based policy for placing an application in a queue.
 * It parses the configuration and updates the {@link
 * org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager}
 * with a list of {@link PlacementRule}s to execute in order.
 */
@Private
@Unstable
final class QueuePlacementPolicy {
  private static final Logger LOG =
      LoggerFactory.getLogger(QueuePlacementPolicy.class);

  // Simple private class to make the rule mapping simpler.
  private static final class RuleMap {
    private final Class<? extends PlacementRule> ruleClass;
    private final String terminal;

    private RuleMap(Class<? extends PlacementRule> clazz, String terminate) {
      this.ruleClass = clazz;
      this.terminal = terminate;
    }
  }

  // The list of known rules:
  // key to the map is the name in the configuration.
  // for each name the mapping contains the class name of the implementation
  // and a flag (true, false or create) which describes the terminal state
  // see the method getTerminal() for more comments.
  private static final Map<String, RuleMap> RULES;
  static {
    Map<String, RuleMap> map = new HashMap<>();
    map.put("user", new RuleMap(UserPlacementRule.class, "create"));
    map.put("primaryGroup",
        new RuleMap(PrimaryGroupPlacementRule.class, "create"));
    map.put("secondaryGroupExistingQueue",
        new RuleMap(SecondaryGroupExistingPlacementRule.class, "false"));
    map.put("specified", new RuleMap(SpecifiedPlacementRule.class, "false"));
    map.put("nestedUserQueue", new RuleMap(UserPlacementRule.class, "create"));
    map.put("default", new RuleMap(DefaultPlacementRule.class, "create"));
    map.put("reject", new RuleMap(RejectPlacementRule.class, "true"));
    RULES = Collections.unmodifiableMap(map);
  }

  private QueuePlacementPolicy() {
  }

  /**
   * Update the rules in the manager based on this placement policy.
   * @param newRules The new list of rules to set in the manager.
   * @param newTerminalState The list of terminal states for this set of rules.
   * @param fs the reference to the scheduler needed in the rule on init.
   * @throws AllocationConfigurationException for any errors
   */
  private static void updateRuleSet(List<PlacementRule> newRules,
                                    List<Boolean> newTerminalState,
                                    FairScheduler fs)
      throws AllocationConfigurationException {
    if (newRules.isEmpty()) {
      LOG.debug("Empty rule set defined, ignoring update");
      return;
    }
    LOG.debug("Placement rule order check");
    for (int i = 0; i < newTerminalState.size()-1; i++) {
      if (newTerminalState.get(i)) {
        String errorMsg = "Rules after rule "
            + (i+1) + " in queue placement policy can never be reached";
        if (fs.isNoTerminalRuleCheck()) {
          LOG.warn(errorMsg);
        } else {
          throw new AllocationConfigurationException(errorMsg);
        }
      }
    }
    if (!newTerminalState.get(newTerminalState.size()-1)) {
      throw new AllocationConfigurationException(
          "Could get past last queue placement rule without assigning");
    }
    // Set the scheduler in the rule to get queues etc
    LOG.debug("Initialising new rule set");
    try {
      for (PlacementRule rule: newRules){
        rule.initialize(fs);
      }
    } catch (IOException ioe) {
      // We should never throw as we pass in a FS object, however we still
      // should consider any exception here a config error.
      throw new AllocationConfigurationException(
          "Rule initialisation failed with exception", ioe);
    }
    // Update the placement manager with the new rule list.
    // We only get here when all rules are OK.
    fs.getRMContext().getQueuePlacementManager().updateRules(newRules);
    LOG.debug("PlacementManager active with new rule set");
  }

  /**
   * Builds a QueuePlacementPolicy from a xml element.
   * @param confElement the placement policy xml snippet from the
   *                    {@link FairSchedulerConfiguration}
   * @param fs the reference to the scheduler needed in the rule on init.
   * @throws AllocationConfigurationException for any errors
   */
  static void fromXml(Element confElement, FairScheduler fs)
      throws AllocationConfigurationException {
    LOG.debug("Reloading placement policy from allocation config");
    if (confElement == null || !confElement.hasChildNodes()) {
      throw new AllocationConfigurationException(
          "Empty configuration for QueuePlacementPolicy is not allowed");
    }
    List<PlacementRule> newRules = new ArrayList<>();
    List<Boolean> newTerminalState = new ArrayList<>();
    NodeList elements = confElement.getChildNodes();
    for (int i = 0; i < elements.getLength(); i++) {
      Node node = elements.item(i);
      if (node instanceof Element &&
          node.getNodeName().equalsIgnoreCase("rule")) {
        String name = ((Element) node).getAttribute("name");
        LOG.debug("Creating new rule: {}", name);
        PlacementRule rule = createRule((Element)node);

        // The only child node that we currently know is a parent rule
        PlacementRule parentRule = null;
        String parentName = null;
        Element child = getParentRuleElement(node);
        if (child != null) {
          parentName = child.getAttribute("name");
          parentRule = getParentRule(child, fs);
        }
        // Need to make sure that the nestedUserQueue has a parent for
        // backwards compatibility
        if (name.equalsIgnoreCase("nestedUserQueue") && parentRule == null) {
          throw new AllocationConfigurationException("Rule '" + name
              + "' must have a parent rule set");
        }
        newRules.add(rule);
        if (parentRule == null) {
          newTerminalState.add(
              getTerminal(RULES.get(name).terminal, rule));
        } else {
          ((FSPlacementRule)rule).setParentRule(parentRule);
          newTerminalState.add(
              getTerminal(RULES.get(name).terminal, rule) &&
              getTerminal(RULES.get(parentName).terminal, parentRule));
        }
      }
    }
    updateRuleSet(newRules, newTerminalState, fs);
  }

  /**
   * Find the element that defines the parent rule.
   * @param node the xml node to check for a parent rule
   * @return {@link Element} that describes the parent rule or
   * <code>null</code> if none is found
   */
  private static Element getParentRuleElement(Node node)
      throws AllocationConfigurationException {
    Element parent = null;
    // walk over the node list
    if (node.hasChildNodes()) {
      NodeList childList = node.getChildNodes();
      for (int j = 0; j < childList.getLength(); j++) {
        Node child = childList.item(j);
        if (child instanceof Element &&
            child.getNodeName().equalsIgnoreCase("rule")) {
          if (parent != null) {
            LOG.warn("Rule '{}' has multiple parent rules defined, only the " +
                "last parent rule will be used",
                ((Element) node).getAttribute("name"));
          }
          parent = ((Element) child);
        }
      }
    }
    // sanity check the rule that is configured
    if (parent != null) {
      String parentName = parent.getAttribute("name");
      if (parentName.equals("reject") ||
          parentName.equals("nestedUserQueue")) {
        throw new AllocationConfigurationException("Rule '"
            + parentName
            + "' is not allowed as a parent rule for any rule");
      }
    }
    return parent;
  }

  /**
   * Retrieve the configured parent rule from the xml config.
   * @param parent the xml element that contains the name of the rule to add.
   * @param fs the reference to the scheduler needed in the rule on init.
   * @return {@link PlacementRule} to set as a parent
   * @throws AllocationConfigurationException for any error
   */
  private static PlacementRule getParentRule(Element parent,
                                             FairScheduler fs)
      throws AllocationConfigurationException {
    LOG.debug("Creating new parent rule: {}", parent.getAttribute("name"));
    PlacementRule parentRule = createRule(parent);
    // Init the rule, we do not want to add it to the list of the
    // placement manager
    try {
      parentRule.initialize(fs);
    } catch (IOException ioe) {
      // We should never throw as we pass in a FS object, however we
      // still should consider any exception here a config error.
      throw new AllocationConfigurationException(
          "Parent Rule initialisation failed with exception", ioe);
    }
    return parentRule;
  }

  /**
   * Returns the terminal status of the rule based on the definition and the
   * create flag set in the rule.
   * @param terminal The definition of the terminal flag
   * @param rule The rule to check
   * @return <code>true</code> if the rule is terminal <code>false</code> in
   * all other cases.
   */
  private static Boolean getTerminal(String terminal, PlacementRule rule) {
    switch (terminal) {
    case "true":    // rule is always terminal
      return true;
    case "false":   // rule is never terminal
      return false;
    default:        // rule is terminal based on the create flag
      return ((FSPlacementRule)rule).getCreateFlag();
    }
  }

  /**
   * Create a rule from a given a xml node.
   * @param element the xml element to create the rule from
   * @return PlacementRule
   * @throws AllocationConfigurationException for any error
   */
  @SuppressWarnings("unchecked")
  private static PlacementRule createRule(Element element)
      throws AllocationConfigurationException {

    String ruleName = element.getAttribute("name");
    if ("".equals(ruleName)) {
      throw new AllocationConfigurationException("No name provided for a "
          + "rule element");
    }

    Class<? extends PlacementRule> ruleClass = null;
    if (RULES.containsKey(ruleName)) {
      ruleClass = RULES.get(ruleName).ruleClass;
    }
    if (ruleClass == null) {
      throw new AllocationConfigurationException("No rule class found for "
          + ruleName);
    }
    return getPlacementRule(ruleClass, element);
  }
    
  /**
   * Build a simple queue placement policy from the configuration options
   * {@link FairSchedulerConfiguration#ALLOW_UNDECLARED_POOLS} and
   * {@link FairSchedulerConfiguration#USER_AS_DEFAULT_QUEUE}.
   * @param fs the reference to the scheduler needed in the rule on init.
   */
  static void fromConfiguration(FairScheduler fs) {
    LOG.debug("Creating base placement policy from config");
    Configuration conf = fs.getConfig();

    boolean create = conf.getBoolean(
        FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
        FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
    boolean userAsDefaultQueue = conf.getBoolean(
        FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
        FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE);
    List<PlacementRule> newRules = new ArrayList<>();
    List<Boolean> newTerminalState = new ArrayList<>();
    Class<? extends PlacementRule> clazz =
        RULES.get("specified").ruleClass;
    newRules.add(getPlacementRule(clazz, create));
    newTerminalState.add(false);
    if (userAsDefaultQueue) {
      clazz = RULES.get("user").ruleClass;
      newRules.add(getPlacementRule(clazz, create));
      newTerminalState.add(create);
    }
    if (!userAsDefaultQueue || !create) {
      clazz = RULES.get("default").ruleClass;
      newRules.add(getPlacementRule(clazz, true));
      newTerminalState.add(true);
    }
    try {
      updateRuleSet(newRules, newTerminalState, fs);
    } catch (AllocationConfigurationException ex) {
      throw new RuntimeException("Should never hit exception when loading" +
          "placement policy from conf", ex);
    }
  }
}