FairSchedulerConfiguration.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import static org.apache.hadoop.yarn.util.resource.ResourceUtils.RESOURCE_REQUEST_VALUE_PATTERN;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Private
@Evolving
public class FairSchedulerConfiguration extends Configuration {

  /** Logger shared by this configuration class. */
  public static final Logger LOG = LoggerFactory.getLogger(
      FairSchedulerConfiguration.class.getName());

  /**
   * Resource Increment request grant-able by the FairScheduler.
   * This property is looked up in the yarn-site.xml.
   * @deprecated The preferred way to configure the increment is by using the
   * yarn.resource-types.{RESOURCE_NAME}.increment-allocation property,
   * for memory: yarn.resource-types.memory-mb.increment-allocation
   */
  @Deprecated
  public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB =
    YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-mb";
  /**
   * Default passed to the lookup of
   * {@link #RM_SCHEDULER_INCREMENT_ALLOCATION_MB} in
   * {@link #getIncrementAllocation()}.
   */
  @Deprecated
  public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB = 1024;
  /**
   * Resource Increment request grant-able by the FairScheduler.
   * This property is looked up in the yarn-site.xml.
   * @deprecated The preferred way to configure the increment is by using the
   * yarn.resource-types.{RESOURCE_NAME}.increment-allocation property,
   * for CPU: yarn.resource-types.vcores.increment-allocation
   */
  @Deprecated
  public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES =
    YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores";
  /**
   * Default passed to the lookup of
   * {@link #RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES} in
   * {@link #getIncrementAllocation()}.
   */
  @Deprecated
  public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1;

  /**
   * Threshold for container size for making a container reservation, as a
   * multiple of the increment allocation. Only container sizes above this
   * are allowed to reserve a node.
   */
  public static final String
      RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE =
      YarnConfiguration.YARN_PREFIX +
          "scheduler.reservation-threshold.increment-multiple";
  /**
   * Default for
   * {@link #RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE}.
   */
  public static final float
      DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE = 2f;

  /** Prefix shared by the FairScheduler property names declared below. */
  private static final String CONF_PREFIX =  "yarn.scheduler.fair.";

  /**
   * Used during FS->CS conversion. When enabled, background threads are
   * not started. This property should NOT be used by end-users!
   */
  public static final String MIGRATION_MODE = CONF_PREFIX + "migration.mode";

  /**
   * Disables checking whether a placement rule is terminal or not. Only
   * used during migration mode. This property should NOT be used by end users!
   */
  public static final String NO_TERMINAL_RULE_CHECK = CONF_PREFIX +
      "no-terminal-rule.check";

  /** Property naming the FairScheduler allocation file. */
  public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
  /** Default value for {@link #ALLOCATION_FILE}. */
  protected static final String DEFAULT_ALLOCATION_FILE = "fair-scheduler.xml";
  
  /**
   * Whether pools can be created that were not specified in the FS
   * configuration file.
   */
  public static final String ALLOW_UNDECLARED_POOLS = CONF_PREFIX +
      "allow-undeclared-pools";
  /** Default value for {@link #ALLOW_UNDECLARED_POOLS}. */
  public static final boolean DEFAULT_ALLOW_UNDECLARED_POOLS = true;
  
  /**
   * Whether to use the user name as the queue name (instead of "default") if
   * the request does not specify a queue.
   */
  public static final String  USER_AS_DEFAULT_QUEUE = CONF_PREFIX +
      "user-as-default-queue";
  /** Default value for {@link #USER_AS_DEFAULT_QUEUE}. */
  public static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;

  /** Common default reused by the node and rack locality thresholds. */
  protected static final float  DEFAULT_LOCALITY_THRESHOLD = -1.0f;

  /** Cluster threshold for node locality. */
  public static final String LOCALITY_THRESHOLD_NODE = CONF_PREFIX +
      "locality.threshold.node";
  /** Default value for {@link #LOCALITY_THRESHOLD_NODE}. */
  public static final float  DEFAULT_LOCALITY_THRESHOLD_NODE =
		  DEFAULT_LOCALITY_THRESHOLD;

  /** Cluster threshold for rack locality. */
  public static final String LOCALITY_THRESHOLD_RACK = CONF_PREFIX +
      "locality.threshold.rack";
  /** Default value for {@link #LOCALITY_THRESHOLD_RACK}. */
  public static final float  DEFAULT_LOCALITY_THRESHOLD_RACK =
		  DEFAULT_LOCALITY_THRESHOLD;

  /**
   * Delay for node locality.
   * Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled.
   * @deprecated Continuous scheduling is known to cause locking issues inside
   * the scheduler in larger clusters (more than 100 nodes).
   */
  @Deprecated
  protected static final String LOCALITY_DELAY_NODE_MS = CONF_PREFIX +
      "locality-delay-node-ms";
  /** Default value for {@link #LOCALITY_DELAY_NODE_MS}. */
  @Deprecated
  protected static final long DEFAULT_LOCALITY_DELAY_NODE_MS = -1L;

  /**
   * Delay for rack locality.
   * Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled.
   * @deprecated Continuous scheduling is known to cause locking issues inside
   * the scheduler in larger clusters (more than 100 nodes).
   */
  @Deprecated
  protected static final String LOCALITY_DELAY_RACK_MS = CONF_PREFIX +
      "locality-delay-rack-ms";
  /** Default value for {@link #LOCALITY_DELAY_RACK_MS}. */
  @Deprecated
  protected static final long DEFAULT_LOCALITY_DELAY_RACK_MS = -1L;

  /**
   * Enable continuous scheduling or not.
   * @deprecated Continuous scheduling is known to cause locking issues inside
   * the scheduler in larger clusters (more than 100 nodes); use
   * {@link #ASSIGN_MULTIPLE} to improve container allocation ramp up.
   */
  @Deprecated
  public static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX +
      "continuous-scheduling-enabled";
  /** Default value for {@link #CONTINUOUS_SCHEDULING_ENABLED}. */
  @Deprecated
  public static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;

  /**
   * Sleep time of each pass in continuous scheduling (5 ms by default).
   * Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled.
   * @deprecated Continuous scheduling is known to cause locking issues inside
   * the scheduler in larger clusters (more than 100 nodes).
   */
  @Deprecated
  public static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX +
      "continuous-scheduling-sleep-ms";
  /** Default value for {@link #CONTINUOUS_SCHEDULING_SLEEP_MS}. */
  @Deprecated
  public static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;

  /** Whether preemption is enabled. */
  public static final String  PREEMPTION = CONF_PREFIX + "preemption";
  /** Default value for {@link #PREEMPTION}. */
  public static final boolean DEFAULT_PREEMPTION = false;

  /**
   * Global AM preemption switch; consulted by
   * {@link #getAMPreemptionEnabled(String)} when no queue-specific property
   * is set.
   */
  protected static final String AM_PREEMPTION =
      CONF_PREFIX + "am.preemption";
  /**
   * Prefix of the queue-specific AM preemption property; the queue name is
   * appended to it by {@link #getAMPreemptionEnabled(String)}.
   */
  protected static final String AM_PREEMPTION_PREFIX =
          CONF_PREFIX + "am.preemption.";
  /** Default used for both the global and the queue-specific lookups. */
  protected static final boolean DEFAULT_AM_PREEMPTION = true;

  /** Property read by {@link #getPreemptionUtilizationThreshold()}. */
  protected static final String PREEMPTION_THRESHOLD =
      CONF_PREFIX + "preemption.cluster-utilization-threshold";
  /** Default value for {@link #PREEMPTION_THRESHOLD}. */
  protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;

  /**
   * Property read by {@link #getWaitTimeBeforeKill()}.
   * NOTE(review): the unit is presumably milliseconds -- confirm.
   */
  public static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX +
      "waitTimeBeforeKill";
  /** Default value for {@link #WAIT_TIME_BEFORE_KILL}. */
  public static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;

  /**
   * Postfix for resource allocation increments in the
   * yarn.resource-types.{RESOURCE_NAME}.increment-allocation property.
   */
  static final String INCREMENT_ALLOCATION = ".increment-allocation";

  /**
   * Configurable delay (ms) before an app's starvation is considered after
   * it is identified. This is to give the scheduler enough time to
   * allocate containers post preemption. This delay is added to the
   * {@link #WAIT_TIME_BEFORE_KILL} and enough heartbeats.
   *
   * This is intended to be a backdoor on production clusters, and hence
   * intentionally not documented.
   */
  public static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS =
      CONF_PREFIX + "waitTimeBeforeNextStarvationCheck";
  /** Default value for {@link #WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS}. */
  public static final long
      DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000;

  /** Whether to assign multiple containers in one check-in. */
  public static final String  ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
  /** Default value for {@link #ASSIGN_MULTIPLE}. */
  public static final boolean DEFAULT_ASSIGN_MULTIPLE = false;

  /** Whether to give more weight to apps requiring many resources. */
  public static final String  SIZE_BASED_WEIGHT = CONF_PREFIX +
      "sizebasedweight";
  /** Default value for {@link #SIZE_BASED_WEIGHT}. */
  public static final boolean DEFAULT_SIZE_BASED_WEIGHT = false;

  /**
   * Boolean switch for dynamic max assign, read by
   * {@link #isMaxAssignDynamic()}. When it is turned off, {@link #MAX_ASSIGN}
   * specifies the number of containers to assign instead.
   */
  public static final String DYNAMIC_MAX_ASSIGN =
      CONF_PREFIX + "dynamic.max.assign";
  /** Default value for {@link #DYNAMIC_MAX_ASSIGN}. */
  private static final boolean DEFAULT_DYNAMIC_MAX_ASSIGN = true;

  /**
   * Specify exact number of containers to assign on each heartbeat, if dynamic
   * max assign is turned off.
   */
  public static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
  /** Default value for {@link #MAX_ASSIGN}. */
  public static final int DEFAULT_MAX_ASSIGN = -1;

  /** The update interval for calculating resources in FairScheduler. */
  public static final String UPDATE_INTERVAL_MS =
      CONF_PREFIX + "update-interval-ms";
  /** Default value for {@link #UPDATE_INTERVAL_MS}. */
  public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;

  /** Ratio of nodes available for an app to make a reservation on. */
  public static final String RESERVABLE_NODES =
          CONF_PREFIX + "reservable-nodes";
  /** Default value for {@link #RESERVABLE_NODES}. */
  public static final float RESERVABLE_NODES_DEFAULT = 0.05f;

  /** Leading text of the messages built by createConfigException. */
  private static final String INVALID_RESOURCE_DEFINITION_PREFIX =
          "Error reading resource config--invalid resource definition: ";
  /**
   * Regex prefix for a percentage: an optionally negative decimal number
   * (captured as group 1) followed by a percent sign, with optional
   * whitespace around the sign. findPercentageInternal may append a resource
   * name to it before compiling.
   */
  private static final String RESOURCE_PERCENTAGE_PATTERN =
      "^(-?(\\d+)(\\.\\d*)?)\\s*%\\s*";
  /**
   * Regex prefix for an old style absolute value: an optionally negative
   * integer part (group 1), an optional fractional part (group 2) and
   * optional whitespace. findResource appends the resource unit name and
   * reads only group 1.
   */
  private static final String RESOURCE_VALUE_PATTERN =
      "^(-?\\d+)(\\.\\d*)?\\s*";
  /**
   * For resources separated by spaces instead of a comma.
   */
  private static final String RESOURCES_WITH_SPACES_PATTERN =
      "-?\\d+(?:\\.\\d*)?\\s*[a-z]+\\s*";

  public FairSchedulerConfiguration() {
    super();
  }
  
  /**
   * Creates an instance by passing {@code conf} to the corresponding
   * {@link Configuration} constructor.
   *
   * @param conf the configuration handed to the superclass constructor
   */
  public FairSchedulerConfiguration(Configuration conf) {
    super(conf);
  }

  /**
   * Builds the minimum allocation from the scheduler's minimum memory and
   * vcores properties declared in {@link YarnConfiguration}, using the
   * matching YARN defaults for properties that are not set.
   *
   * @return a resource created from the configured minimum memory and vcores
   */
  public Resource getMinimumAllocation() {
    final int minMemory = getInt(
        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    final int minVcores = getInt(
        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
    final Resource minimum = Resources.createResource(minMemory, minVcores);
    return minimum;
  }

  /**
   * Builds the maximum allocation from the scheduler's maximum memory and
   * vcores properties declared in {@link YarnConfiguration}, using the
   * matching YARN defaults for properties that are not set.
   *
   * @return a resource created from the configured maximum memory and vcores
   */
  public Resource getMaximumAllocation() {
    final int maxMemory = getInt(
        YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    final int maxVcores = getInt(
        YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
    final Resource maximum = Resources.createResource(maxMemory, maxVcores);
    return maximum;
  }

  /**
   * Builds the allocation increment from the configuration.
   * <p>
   * For every resource type returned by
   * {@link ResourceUtils#getResourceTypesArray()}, the per-resource
   * increment property (see {@link #INCREMENT_ALLOCATION}) is read. A set
   * value must match {@code RESOURCE_REQUEST_VALUE_PATTERN}; it is converted
   * to the default unit of the resource when a unit is given.
   * <p>
   * Memory and vcores fall back to the deprecated
   * {@link #RM_SCHEDULER_INCREMENT_ALLOCATION_MB} and
   * {@link #RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES} properties (and their
   * defaults) when no per-resource property is set. When both are set, the
   * per-resource property wins and a warning is logged.
   *
   * @return the increment allocation; resources other than memory and
   * vcores are included only when their increment property is set
   * @throws IllegalArgumentException if an increment property is not in
   * "value [unit]" format, or if the vcores increment does not fit in an
   * {@code int}
   */
  public Resource getIncrementAllocation() {
    final Map<String, Long> increments = new HashMap<>();
    for (ResourceInformation resourceType
        : ResourceUtils.getResourceTypesArray()) {
      final String name = resourceType.getName();
      final String propertyKey = getAllocationIncrementPropKey(name);
      final String propValue = get(propertyKey);
      if (propValue == null) {
        continue;
      }
      final Matcher matcher =
          RESOURCE_REQUEST_VALUE_PATTERN.matcher(propValue);
      if (!matcher.matches()) {
        throw new IllegalArgumentException("Property " + propertyKey +
            " is not in \"value [unit]\" format: " + propValue);
      }
      final long value = Long.parseLong(matcher.group(1));
      increments.put(name,
          getValueInDefaultUnits(value, matcher.group(2), name));
    }

    // Memory and vcores are passed to Resource.newInstance separately, so
    // they are taken out of the map of remaining resources.
    final String memoryName = ResourceInformation.MEMORY_MB.getName();
    final Long memoryIncrement = increments.remove(memoryName);
    final long memory;
    if (memoryIncrement != null) {
      warnIfDeprecatedIncrementOverridden(memoryName,
          RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
      memory = memoryIncrement;
    } else {
      memory = getLong(
          RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
          DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
    }

    final String vcoresName = ResourceInformation.VCORES.getName();
    final Long vcoresIncrement = increments.remove(vcoresName);
    final int vCores;
    if (vcoresIncrement != null) {
      // Reject values that would be silently truncated by the narrowing to
      // int required by Resource.newInstance.
      if (vcoresIncrement.longValue() != vcoresIncrement.intValue()) {
        final String vcoresKey = getAllocationIncrementPropKey(vcoresName);
        throw new IllegalArgumentException("Property " + vcoresKey +
            " does not fit in the supported integer range: " +
            get(vcoresKey));
      }
      warnIfDeprecatedIncrementOverridden(vcoresName,
          RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
      vCores = vcoresIncrement.intValue();
    } else {
      vCores = getInt(
          RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES,
          DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
    }
    return Resource.newInstance(memory, vCores, increments);
  }

  /**
   * Logs a warning when the deprecated property {@code deprecatedKey} is set
   * although the per-resource increment property of {@code resourceName}
   * takes precedence over it.
   *
   * @param resourceName name of the resource whose increment property wins
   * @param deprecatedKey the deprecated property that is being overridden
   */
  private void warnIfDeprecatedIncrementOverridden(String resourceName,
      String deprecatedKey) {
    final String deprecatedValue = get(deprecatedKey);
    if (deprecatedValue == null) {
      return;
    }
    final String overridingKey = getAllocationIncrementPropKey(resourceName);
    LOG.warn("Configuration " + overridingKey + "=" + get(overridingKey) +
        " is overriding the " + deprecatedKey +
        "=" + deprecatedValue + " property");
  }

  /**
   * Converts {@code value} to the default unit of {@code resourceName}.
   *
   * @param value the numeric value to convert
   * @param unit the unit of {@code value}; when empty, no conversion is done
   * @param resourceName the resource whose default unit is the target unit
   * @return {@code value} unchanged if {@code unit} is empty, otherwise the
   * result of {@link UnitsConversionUtil#convert}
   */
  private long getValueInDefaultUnits(long value, String unit,
      String resourceName) {
    if (unit.isEmpty()) {
      return value;
    }
    final String defaultUnit = ResourceUtils.getDefaultUnit(resourceName);
    return UnitsConversionUtil.convert(unit, defaultUnit, value);
  }

  /**
   * Builds the name of the increment property of a resource:
   * {@link YarnConfiguration#RESOURCE_TYPES}, a dot, the resource name and
   * the {@link #INCREMENT_ALLOCATION} postfix.
   *
   * @param resourceName the name of the resource
   * @return the property key for the increment of {@code resourceName}
   */
  private String getAllocationIncrementPropKey(String resourceName) {
    final StringBuilder key =
        new StringBuilder(YarnConfiguration.RESOURCE_TYPES);
    key.append(".").append(resourceName).append(INCREMENT_ALLOCATION);
    return key.toString();
  }

  /**
   * Reads {@link #RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE}.
   *
   * @return the configured value, or
   * {@link #DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE}
   * when the property is not set
   */
  public float getReservationThresholdIncrementMultiple() {
    final float incrementMultiple = getFloat(
        RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE,
        DEFAULT_RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE);
    return incrementMultiple;
  }

  /**
   * Reads {@link #LOCALITY_THRESHOLD_NODE}.
   *
   * @return the configured value, or
   * {@link #DEFAULT_LOCALITY_THRESHOLD_NODE} when the property is not set
   */
  public float getLocalityThresholdNode() {
    final float nodeThreshold =
        getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
    return nodeThreshold;
  }

  /**
   * Reads {@link #LOCALITY_THRESHOLD_RACK}.
   *
   * @return the configured value, or
   * {@link #DEFAULT_LOCALITY_THRESHOLD_RACK} when the property is not set
   */
  public float getLocalityThresholdRack() {
    final float rackThreshold =
        getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
    return rackThreshold;
  }

  /**
   * Tells whether continuous scheduling is switched on, as configured by
   * {@link #CONTINUOUS_SCHEDULING_ENABLED}.
   * @deprecated use {@link #ASSIGN_MULTIPLE} to improve container allocation
   * ramp up.
   * @return the configured flag, or
   * {@link #DEFAULT_CONTINUOUS_SCHEDULING_ENABLED} when it is not set
   */
  @Deprecated
  public boolean isContinuousSchedulingEnabled() {
    final boolean enabled = getBoolean(CONTINUOUS_SCHEDULING_ENABLED,
        DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
    return enabled;
  }

  /**
   * Sleep time of the continuous scheduler thread, as configured by
   * {@link #CONTINUOUS_SCHEDULING_SLEEP_MS}.
   * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
   * @return sleep time in ms, or
   * {@link #DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS} when it is not set
   */
  @Deprecated
  public int getContinuousSchedulingSleepMs() {
    final int sleepMs = getInt(CONTINUOUS_SCHEDULING_SLEEP_MS,
        DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
    return sleepMs;
  }

  /**
   * Delay in milliseconds for the locality fallback from node to rack, as
   * configured by {@link #LOCALITY_DELAY_NODE_MS}.
   * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
   * @return delay in ms, or {@link #DEFAULT_LOCALITY_DELAY_NODE_MS} when it
   * is not set
   */
  @Deprecated
  public long getLocalityDelayNodeMs() {
    final long nodeDelayMs =
        getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS);
    return nodeDelayMs;
  }

  /**
   * Delay in milliseconds for the locality fallback from rack to other, as
   * configured by {@link #LOCALITY_DELAY_RACK_MS}.
   * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
   * @return delay in ms, or {@link #DEFAULT_LOCALITY_DELAY_RACK_MS} when it
   * is not set
   */
  @Deprecated
  public long getLocalityDelayRackMs() {
    final long rackDelayMs =
        getLong(LOCALITY_DELAY_RACK_MS, DEFAULT_LOCALITY_DELAY_RACK_MS);
    return rackDelayMs;
  }

  /**
   * Reads {@link #PREEMPTION}.
   *
   * @return the configured flag, or {@link #DEFAULT_PREEMPTION} when the
   * property is not set
   */
  public boolean getPreemptionEnabled() {
    final boolean preemptionEnabled =
        getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
    return preemptionEnabled;
  }

  /**
   * Tells whether AM preemption is enabled for a queue. A queue-specific
   * property ({@link #AM_PREEMPTION_PREFIX} followed by the queue name)
   * takes precedence; when it is not set, the global {@link #AM_PREEMPTION}
   * property is used. Both lookups default to
   * {@link #DEFAULT_AM_PREEMPTION}.
   *
   * @param queueName the queue name appended to the property prefix
   * @return whether AM preemption is enabled for {@code queueName}
   */
  public boolean getAMPreemptionEnabled(String queueName) {
    final String queueSpecificKey = AM_PREEMPTION_PREFIX + queueName;

    if (get(queueSpecificKey) == null) {
      // No queue-level override: use the global setting.
      return getBoolean(AM_PREEMPTION, DEFAULT_AM_PREEMPTION);
    }

    final boolean enabledForQueue =
        getBoolean(queueSpecificKey, DEFAULT_AM_PREEMPTION);
    LOG.debug("AM preemption enabled for queue {}: {}",
        queueName, enabledForQueue);
    return enabledForQueue;
  }

  /**
   * Reads {@link #PREEMPTION_THRESHOLD}.
   *
   * @return the configured value, or {@link #DEFAULT_PREEMPTION_THRESHOLD}
   * when the property is not set
   */
  public float getPreemptionUtilizationThreshold() {
    final float utilizationThreshold =
        getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD);
    return utilizationThreshold;
  }

  /**
   * Reads {@link #ASSIGN_MULTIPLE}.
   *
   * @return the configured flag, or {@link #DEFAULT_ASSIGN_MULTIPLE} when
   * the property is not set
   */
  public boolean getAssignMultiple() {
    final boolean assignMultiple =
        getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
    return assignMultiple;
  }

  /**
   * Reads {@link #DYNAMIC_MAX_ASSIGN}.
   *
   * @return the configured flag, or {@code true} (the default) when the
   * property is not set
   */
  public boolean isMaxAssignDynamic() {
    final boolean dynamicMaxAssign =
        getBoolean(DYNAMIC_MAX_ASSIGN, DEFAULT_DYNAMIC_MAX_ASSIGN);
    return dynamicMaxAssign;
  }

  /**
   * Reads {@link #MAX_ASSIGN}.
   *
   * @return the configured value, or {@link #DEFAULT_MAX_ASSIGN} when the
   * property is not set
   */
  public int getMaxAssign() {
    final int maxAssign = getInt(MAX_ASSIGN, DEFAULT_MAX_ASSIGN);
    return maxAssign;
  }

  /**
   * Reads {@link #SIZE_BASED_WEIGHT}.
   *
   * @return the configured flag, or {@link #DEFAULT_SIZE_BASED_WEIGHT} when
   * the property is not set
   */
  public boolean getSizeBasedWeight() {
    final boolean sizeBasedWeight =
        getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT);
    return sizeBasedWeight;
  }

  /**
   * Reads {@link #WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS}.
   *
   * @return the configured delay, or
   * {@link #DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS} when the
   * property is not set
   */
  public long getWaitTimeBeforeNextStarvationCheck() {
    final long starvationCheckDelay = getLong(
        WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
        DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS);
    return starvationCheckDelay;
  }
  
  /**
   * Reads {@link #WAIT_TIME_BEFORE_KILL}.
   *
   * @return the configured value, or {@link #DEFAULT_WAIT_TIME_BEFORE_KILL}
   * when the property is not set
   */
  public int getWaitTimeBeforeKill() {
    final int waitTimeBeforeKill =
        getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
    return waitTimeBeforeKill;
  }

  /**
   * Reads {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME}.
   *
   * @return the configured flag, or
   * {@link YarnConfiguration#DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME}
   * when the property is not set
   */
  public boolean getUsePortForNodeName() {
    final boolean usePortForNodeName = getBoolean(
        YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
    return usePortForNodeName;
  }

  /**
   * Reads {@link #RESERVABLE_NODES}.
   *
   * @return the configured ratio, or {@link #RESERVABLE_NODES_DEFAULT} when
   * the property is not set
   */
  public float getReservableNodes() {
    final float reservableNodes =
        getFloat(RESERVABLE_NODES, RESERVABLE_NODES_DEFAULT);
    return reservableNodes;
  }

  /**
   * Parses a resource config value given in one of three forms:
   * <ol>
   * <li>Percentage: &quot;50%&quot; or &quot;40% memory, 60% cpu&quot;</li>
   * <li>New style resources: &quot;vcores=10, memory-mb=1024&quot;
   * or &quot;vcores=60%, memory-mb=40%&quot;</li>
   * <li>Old style resources: &quot;1024 mb, 10 vcores&quot;</li>
   * </ol>
   * In new style resources, any resource that is not specified will be
   * set to {@link Long#MAX_VALUE} or 100%, as appropriate. Also, in the new
   * style resources, units are not allowed. Units are assumed from the resource
   * manager's settings for the resources when the value isn't a percentage.
   * <p>
   * This is a shorthand for
   * {@link #parseResourceConfigValue(String, long)} with
   * {@link Long#MAX_VALUE} as the value for unspecified resources.
   *
   * @param value the resource definition to parse
   * @return a {@link ConfigurableResource} that represents the parsed value
   * @throws AllocationConfigurationException if the raw value is not a valid
   * resource definition
   */
  public static ConfigurableResource parseResourceConfigValue(String value)
      throws AllocationConfigurationException {
    final long valueForMissingResources = Long.MAX_VALUE;
    return parseResourceConfigValue(value, valueForMissingResources);
  }

  /**
   * Parses a resource config value in one of three forms:
   * <ol>
   * <li>Percentage: &quot;50%&quot; or &quot;40% memory, 60% cpu&quot;</li>
   * <li>New style resources: &quot;vcores=10, memory-mb=1024&quot;
   * or &quot;vcores=60%, memory-mb=40%&quot;</li>
   * <li>Old style resources: &quot;1024 mb, 10 vcores&quot;</li>
   * </ol>
   * The form is selected by content: a value containing '=' is parsed as new
   * style, otherwise a value containing '%' is parsed as an old style
   * percentage, and anything else as old style absolute resources.
   *
   * In new style resources, any resource that is not specified will be
   * set to {@code missing} or 100%, as appropriate. Also, in the new style
   * resources, units are not allowed. Units are assumed from the resource
   * manager's settings for the resources when the value isn't a percentage.
   *
   * The {@code missing} parameter is only used in the case of new style
   * resources without percentages. With new style resources with percentages,
   * any missing resources will be assumed to be 100% because percentages are
   * only used with maximum resource limits.
   *
   * @param value the resource definition to parse
   * @param missing the value to use for any unspecified resources
   * @return a {@link ConfigurableResource} that represents the parsed value
   * @throws AllocationConfigurationException if the raw value is null, empty
   * or not a valid resource definition
   */
  public static ConfigurableResource parseResourceConfigValue(String value,
      long missing) throws AllocationConfigurationException {
    // Report a missing definition as a configuration error instead of
    // letting a NullPointerException escape from the emptiness check.
    if (value == null) {
      throw new AllocationConfigurationException("Error reading resource "
          + "config--the resource string is null.");
    }

    if (value.trim().isEmpty()) {
      throw new AllocationConfigurationException("Error reading resource "
          + "config--the resource string is empty.");
    }

    try {
      if (value.contains("=")) {
        return parseNewStyleResource(value, missing);
      }
      if (value.contains("%")) {
        return parseOldStyleResourceAsPercentage(value);
      }
      return parseOldStyleResource(value);
    } catch (RuntimeException ex) {
      // Unchecked parsing failures (e.g. number format problems) are
      // surfaced as configuration errors, keeping the original cause.
      throw new AllocationConfigurationException(
          "Error reading resource config", ex);
    }
  }

  /**
   * Parses a comma separated list of {@code name=value} entries. When the
   * input contains a '%' character every value is parsed as a percentage,
   * otherwise every value is parsed as a non-negative integer and
   * {@code missing} is handed to the {@link ConfigurableResource}
   * constructor.
   *
   * @param value the complete new style resource definition
   * @param missing the value passed to the result for unspecified resources
   * when the definition holds absolute values
   * @return the parsed resource
   * @throws AllocationConfigurationException if an entry is not of the form
   * {@code name=value}, a value cannot be parsed, or a resource name is not
   * recognized
   */
  private static ConfigurableResource parseNewStyleResource(String value,
          long missing) throws AllocationConfigurationException {

    final boolean percentageMode = value.contains("%");
    final ConfigurableResource result = percentageMode
        ? new ConfigurableResource()
        : new ConfigurableResource(missing);

    for (String entry : value.split(",")) {
      final String[] nameAndValue = entry.split("=");
      if (nameAndValue.length != 2) {
        throw createConfigException(value,
            "Every resource must be of the form: name=value.");
      }

      final String resourceName = nameAndValue[0].trim();
      final String resourceValue = nameAndValue[1].trim();
      try {
        if (!percentageMode) {
          final long absolute = parseNewStyleResourceAsAbsoluteValue(value,
              resourceValue, resourceName);
          result.setValue(resourceName, absolute);
        } else {
          final double fraction = parseNewStyleResourceAsPercentage(value,
              resourceName, resourceValue);
          result.setPercentage(resourceName, fraction);
        }
      } catch (ResourceNotFoundException ex) {
        final String reason = "The resource name, \"" + resourceName
            + "\" was not recognized. Please check the value of "
            + YarnConfiguration.RESOURCE_TYPES
            + " in the Resource Manager's configuration files.";
        throw createConfigException(value, reason, ex);
      }
    }
    return result;
  }

  /**
   * Parses one new style resource value as a percentage and rewraps a
   * failure so that the message refers to the whole definition.
   *
   * @param value the complete resource definition, used in error messages
   * @param resource the name of the resource being parsed
   * @param resourceValue the percentage text of that resource
   * @return the percentage as computed by
   * {@link #findPercentage(String, String)}
   * @throws AllocationConfigurationException if {@code resourceValue} is not
   * a valid percentage
   */
  private static double parseNewStyleResourceAsPercentage(
      String value, String resource, String resourceValue)
      throws AllocationConfigurationException {
    final double fraction;
    try {
      fraction = findPercentage(resourceValue, resource);
    } catch (AllocationConfigurationException cause) {
      final String reason = "The resource values must all be percentages. \""
          + resourceValue + "\" is either not a non-negative number "
          + "or does not include the '%' symbol.";
      throw createConfigException(value, reason, cause);
    }
    return fraction;
  }

  /**
   * Parses one new style resource value as a non-negative integer.
   *
   * @param value the complete resource definition, used in error messages
   * @param resourceValue the text to parse with {@link Long#parseLong}
   * @param resourceName the resource name, used in the negative-value error
   * @return the parsed, non-negative value
   * @throws AllocationConfigurationException if {@code resourceValue} is not
   * an integer or is negative
   */
  private static long parseNewStyleResourceAsAbsoluteValue(String value,
      String resourceValue, String resourceName)
      throws AllocationConfigurationException {
    long absoluteValue;
    try {
      absoluteValue = Long.parseLong(resourceValue);
    } catch (NumberFormatException nfe) {
      final String reason = "The resource values must all be integers. \""
          + resourceValue + "\" is not an integer.";
      throw createConfigException(value, reason, nfe);
    }
    if (absoluteValue >= 0) {
      return absoluteValue;
    }
    throw new AllocationConfigurationException(
        "Invalid value of " + resourceName +
            ": " + absoluteValue + ", value should not be negative!");
  }

  /**
   * Parses an old style percentage definition. The input is lower-cased
   * before it is handed to {@link #getResourcePercentage(String)}.
   *
   * @param value the percentage definition
   * @return a resource built from the parsed percentages
   * @throws AllocationConfigurationException if the percentages are invalid
   */
  private static ConfigurableResource parseOldStyleResourceAsPercentage(
          String value) throws AllocationConfigurationException {
    final String normalized = StringUtils.toLowerCase(value);
    final double[] percentages = getResourcePercentage(normalized);
    return new ConfigurableResource(percentages);
  }

  /**
   * Parses an old style absolute definition such as
   * &quot;1024 mb, 10 vcores&quot;. The lower-cased input is split on commas
   * first; if that does not produce exactly two parts, the space separated
   * form is tried. Memory is parsed before vcores.
   *
   * @param input the resource definition
   * @return a resource built from the parsed memory and vcores
   * @throws AllocationConfigurationException if two resources cannot be
   * found in the input or one of them is invalid
   */
  private static ConfigurableResource parseOldStyleResource(String input)
          throws AllocationConfigurationException {
    final String normalized = StringUtils.toLowerCase(input);

    String[] parts = normalized.split(",");
    if (parts.length != 2) {
      // Not a two element comma list: try the space separated form.
      parts = findOldStyleResourcesInSpaceSeparatedInput(normalized);
    }
    if (parts.length != 2) {
      throw new AllocationConfigurationException(
          "Cannot parse resource values from input: " + input);
    }

    final int memoryValue = parseOldStyleResourceMemory(parts);
    final int vcoresValue = parseOldStyleResourceVcores(parts);
    final Resource parsed = Resources.createResource(memoryValue, vcoresValue);
    return new ConfigurableResource(parsed);
  }

  /**
   * Compiled form of {@link #RESOURCES_WITH_SPACES_PATTERN}. The regex is a
   * constant, so it is compiled once instead of on every call; compiled
   * patterns are immutable and can be shared.
   */
  private static final Pattern COMPILED_RESOURCES_WITH_SPACES_PATTERN =
      Pattern.compile(RESOURCES_WITH_SPACES_PATTERN);

  /**
   * Collects every substring of {@code input} that matches
   * {@link #RESOURCES_WITH_SPACES_PATTERN}, i.e. a number followed by
   * lower-case letters, in order of appearance. Trailing whitespace covered
   * by the pattern is part of each returned element.
   *
   * @param input the (already lower-cased) resource definition to scan
   * @return the matching substrings; empty when nothing matches
   */
  private static String[] findOldStyleResourcesInSpaceSeparatedInput(
      String input) {
    final Matcher matcher =
        COMPILED_RESOURCES_WITH_SPACES_PATTERN.matcher(input);

    final List<String> resources = Lists.newArrayList();
    while (matcher.find()) {
      resources.add(matcher.group());
    }
    return resources.toArray(new String[0]);
  }

  /**
   * Extracts the memory value (the entry carrying the &quot;mb&quot; unit)
   * from old style resources and rejects negative values.
   *
   * @param resources the individual resource entries
   * @return the non-negative memory value
   * @throws AllocationConfigurationException if the entry is missing,
   * malformed or negative
   */
  private static int parseOldStyleResourceMemory(String[] resources)
      throws AllocationConfigurationException {
    final int memoryValue = findResource(resources, "mb");
    if (memoryValue >= 0) {
      return memoryValue;
    }
    throw new AllocationConfigurationException(
        "Invalid value of memory: " + memoryValue +
            ", value should not be negative!");
  }

  /**
   * Extracts the vcores value (the entry carrying the &quot;vcores&quot;
   * unit) from old style resources and rejects negative values.
   *
   * @param resources the individual resource entries
   * @return the non-negative vcores value
   * @throws AllocationConfigurationException if the entry is missing,
   * malformed or negative
   */
  private static int parseOldStyleResourceVcores(String[] resources)
      throws AllocationConfigurationException {
    final int vcoresValue = findResource(resources, "vcores");
    if (vcoresValue >= 0) {
      return vcoresValue;
    }
    throw new AllocationConfigurationException(
        "Invalid value of vcores: " + vcoresValue +
            ", value should not be negative!");
  }

  /**
   * Parses an old style percentage definition into an array with one slot
   * per countable resource type. A single value (no comma) is applied to
   * every slot; otherwise the &quot;memory&quot; percentage is stored at
   * index 0 and the &quot;cpu&quot; percentage at index 1, and any other
   * slot keeps its initial value of zero.
   *
   * @param val the lower-cased percentage definition
   * @return the percentages, indexed by resource type
   * @throws AllocationConfigurationException if a percentage is missing or
   * invalid
   */
  private static double[] getResourcePercentage(String val)
      throws AllocationConfigurationException {
    final int resourceTypeCount =
        ResourceUtils.getNumberOfCountableResourceTypes();
    final double[] percentages = new double[resourceTypeCount];
    final String[] entries = val.split(",");

    if (entries.length != 1) {
      percentages[0] = findPercentage(entries, "memory");
      percentages[1] = findPercentage(entries, "cpu");
      return percentages;
    }

    // A lone percentage applies to every resource type.
    final double sharedPercentage = findPercentage(entries, "");
    java.util.Arrays.fill(percentages, sharedPercentage);
    return percentages;
  }

  /**
   * Parses a bare percentage such as &quot;40%&quot;. The resource name is
   * not expected inside {@code resourceValue}; it is only used in error
   * messages.
   *
   * @param resourceValue the percentage text
   * @param resource the resource name used for error reporting
   * @return the percentage divided by 100
   * @throws AllocationConfigurationException if the text is not a valid,
   * non-negative percentage
   */
  private static double findPercentage(String resourceValue, String resource)
      throws AllocationConfigurationException {
    final boolean resourceNameInValue = false;
    return findPercentageInternal(resource, resourceValue,
        resourceNameInValue);
  }

  /**
   * Finds the entry that mentions {@code resource} among
   * {@code resourceValues} and parses it as a percentage followed by the
   * resource name, e.g. &quot;40% memory&quot;.
   *
   * @param resourceValues the candidate entries
   * @param resource the resource name to look for
   * @return the percentage divided by 100
   * @throws AllocationConfigurationException if no entry mentions the
   * resource or the entry is not a valid, non-negative percentage
   */
  private static double findPercentage(String[] resourceValues, String resource)
      throws AllocationConfigurationException {
    final String matchingEntry =
        findResourceFromValues(resourceValues, resource);
    final boolean resourceNameInValue = true;
    return findPercentageInternal(resource, matchingEntry,
        resourceNameInValue);
  }

  /**
   * Matches {@code resourceValue} against
   * {@link #RESOURCE_PERCENTAGE_PATTERN}, optionally followed by the
   * resource name, and converts the captured number to a fraction.
   *
   * @param resource the resource name; appended to the pattern when
   * {@code includeResourceInPattern} is set and used in error messages
   * @param resourceValue the text to match in full
   * @param includeResourceInPattern whether the resource name must follow
   * the percent sign
   * @return the captured number divided by 100
   * @throws AllocationConfigurationException if the text does not match or
   * the resulting fraction is negative
   */
  private static double findPercentageInternal(String resource,
      String resourceValue, boolean includeResourceInPattern)
      throws AllocationConfigurationException {
    final String regex = includeResourceInPattern
        ? RESOURCE_PERCENTAGE_PATTERN + resource
        : RESOURCE_PERCENTAGE_PATTERN;

    final Matcher matcher = Pattern.compile(regex).matcher(resourceValue);
    if (!matcher.matches()) {
      final String reason = resource.equals("")
          ? "Invalid percentage: " + resourceValue
          : "Invalid percentage of " + resource + ": " + resourceValue;
      throw new AllocationConfigurationException(reason);
    }

    final double fraction = Double.parseDouble(matcher.group(1)) / 100.0;
    if (fraction < 0) {
      throw new AllocationConfigurationException("Invalid percentage: " +
          resourceValue + ", percentage should not be negative!");
    }
    return fraction;
  }

  /**
   * Creates a configuration exception without a cause. The message is the
   * invalid-definition prefix, the offending value, a period with a space,
   * and {@code message}.
   *
   * @param value the resource definition that failed to parse
   * @param message the explanation appended to the message
   * @return the exception, ready to be thrown by the caller
   */
  private static AllocationConfigurationException createConfigException(
          String value, String message) {
    final String fullMessage =
        INVALID_RESOURCE_DEFINITION_PREFIX + value + ". " + message;
    return new AllocationConfigurationException(fullMessage);
  }

  /**
   * Creates a configuration exception whose message is the
   * invalid-definition prefix, the offending value, a period with a space,
   * and {@code message}.
   *
   * @param value the resource definition that failed to parse
   * @param message the explanation appended to the message
   * @param t the cause to attach, or {@code null} for none
   * @return the exception, ready to be thrown by the caller
   */
  private static AllocationConfigurationException createConfigException(
      String value, String message, Throwable t) {
    final String fullMessage =
        INVALID_RESOURCE_DEFINITION_PREFIX + value + ". " + message;
    return (t == null)
        ? new AllocationConfigurationException(fullMessage)
        : new AllocationConfigurationException(fullMessage, t);
  }

  /**
   * Reads {@link #UPDATE_INTERVAL_MS}.
   *
   * @return the configured interval, or {@link #DEFAULT_UPDATE_INTERVAL_MS}
   * when the property is not set
   */
  public long getUpdateInterval() {
    final long updateIntervalMs =
        getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS);
    return updateIntervalMs;
  }
  
  /**
   * Finds the entry that mentions {@code resource} among
   * {@code resourceValues} and parses the integer part in front of the
   * resource name. A fractional part, if present, is matched but not
   * included in the result.
   *
   * @param resourceValues the candidate entries
   * @param resource the unit name to look for, e.g. &quot;mb&quot;
   * @return the integer part of the matching entry
   * @throws AllocationConfigurationException if no entry mentions the
   * resource or the entry does not start with a number followed by the
   * resource name
   */
  private static int findResource(String[] resourceValues, String resource)
      throws AllocationConfigurationException {
    final String matchingEntry =
        findResourceFromValues(resourceValues, resource);
    final Matcher matcher = Pattern
        .compile(RESOURCE_VALUE_PATTERN + resource)
        .matcher(matchingEntry);
    if (matcher.find()) {
      return Integer.parseInt(matcher.group(1));
    }
    // Memory is looked up by its unit, but reported by its name.
    final String displayName = resource.equals("mb") ? "memory" : resource;
    throw new AllocationConfigurationException("Invalid value of " +
        displayName + ": " + matchingEntry);
  }

  /**
   * Returns the first entry of {@code resourceValues} that contains
   * {@code resource} as a substring, with surrounding whitespace removed.
   *
   * @param resourceValues the candidate entries
   * @param resource the text to look for
   * @return the trimmed first matching entry
   * @throws AllocationConfigurationException if no entry contains
   * {@code resource}
   */
  private static String findResourceFromValues(String[] resourceValues,
      String resource) throws AllocationConfigurationException {
    for (int i = 0; i < resourceValues.length; i++) {
      final String candidate = resourceValues[i];
      if (!candidate.contains(resource)) {
        continue;
      }
      return candidate.trim();
    }
    throw new AllocationConfigurationException("Missing resource: " + resource);
  }
}