QueueCapacityConfigParser.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A class that parses {@code QueueCapacityVector} from the capacity
 * configuration property set for a queue.
 *
 * A new syntax for capacity property could be implemented, by creating a parser
 * with a regex to match the pattern and a method that creates a
 * {@code QueueCapacityVector} from the matched pattern.
 * Extending the parsers field with a {@code Parser} object in the constructor
 * is needed in this case.
 *
 * A new capacity type for the existing parsers could be added by extending
 * the {@code QueueCapacityVector.QueueCapacityType} with a new type and its
 * associated postfix symbol.
 */
public class QueueCapacityConfigParser {
  private static final String UNIFORM_REGEX = "^([0-9.]+)(.*)";
  private static final String RESOURCE_REGEX = "^\\[([\\w\\.,\\-_%\\ /]+=[\\w\\.,\\-_%\\ /]+)+\\]$";

  private static final Pattern RESOURCE_PATTERN = Pattern.compile(RESOURCE_REGEX);
  private static final Pattern UNIFORM_PATTERN = Pattern.compile(UNIFORM_REGEX);
  public static final String FLOAT_DIGIT_REGEX = "[0-9.]";

  private final List<Parser> parsers = new ArrayList<>();

  public QueueCapacityConfigParser() {
    parsers.add(new Parser(RESOURCE_PATTERN, this::heterogeneousParser));
    parsers.add(new Parser(UNIFORM_PATTERN, this::uniformParser));
  }

  /**
   * Creates a {@code QueueCapacityVector} parsed from the capacity configuration
   * property set for a queue.
   * @param capacityString capacity string to parse
   * @param queuePath queue for which the capacity property is parsed
   * @return a parsed capacity vector
   */
  public QueueCapacityVector parse(String capacityString, QueuePath queuePath) {

    if (queuePath.isRoot()) {
      return QueueCapacityVector.of(100f, ResourceUnitCapacityType.PERCENTAGE);
    }

    if (capacityString == null) {
      return new QueueCapacityVector();
    }
    // Trim all spaces from capacity string
    capacityString = capacityString.replaceAll(" ", "");

    for (Parser parser : parsers) {
      Matcher matcher = parser.regex.matcher(capacityString);
      if (matcher.find()) {
        return parser.parser.apply(matcher);
      }
    }

    return new QueueCapacityVector();
  }

  /**
   * A parser method that is usable on uniform capacity values e.g. percentage or
   * weight.
   * @param matcher a regex matcher that contains parsed value and its possible
   *                suffix
   * @return a parsed capacity vector
   */
  private QueueCapacityVector uniformParser(Matcher matcher) {
    ResourceUnitCapacityType capacityType = null;
    String value = matcher.group(1);
    if (matcher.groupCount() == 2) {
      String matchedSuffix = matcher.group(2);
      for (ResourceUnitCapacityType suffix : ResourceUnitCapacityType.values()) {
        // Absolute uniform syntax is not supported
        if (suffix.equals(ResourceUnitCapacityType.ABSOLUTE)) {
          continue;
        }
        // when capacity is given in percentage, we do not need % symbol
        String uniformSuffix = suffix.getPostfix().replaceAll("%", "");
        if (uniformSuffix.equals(matchedSuffix)) {
          capacityType = suffix;
        }
      }
    }

    if (capacityType == null) {
      return new QueueCapacityVector();
    }

    return QueueCapacityVector.of(Float.parseFloat(value), capacityType);
  }

  /**
   * A parser method that is usable on resource capacity values e.g. mixed or
   * absolute resource.
   * @param matcher a regex matcher that contains the matched resource string
   * @return a parsed capacity vector
   */
  private QueueCapacityVector heterogeneousParser(Matcher matcher) {
    QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();

    /*
     * Absolute resource configuration for a queue will be grouped by "[]".
     * Syntax of absolute resource config could be like below
     * "memory=4Gi vcores=2". Ideally this means "4GB of memory and 2 vcores".
     */
    // Get the sub-group.
    String bracketedGroup = matcher.group(0);
    // Get the string inside starting and closing []
    bracketedGroup = bracketedGroup.substring(1, bracketedGroup.length() - 1);
    // Split by comma and equals delimiter eg. the string memory=1024,vcores=6
    // is converted to an array of array as {{memory,1024}, {vcores, 6}}
    for (String kvPair : bracketedGroup.trim().split(",")) {
      String[] splits = kvPair.split("=");

      // Ensure that each sub string is key value pair separated by '='.
      if (splits.length > 1) {
        setCapacityVector(capacityVector, splits[0], splits[1]);
      }
    }

    return capacityVector;
  }

  private void setCapacityVector(
      QueueCapacityVector resource, String resourceName, String resourceValue) {
    ResourceUnitCapacityType capacityType = ResourceUnitCapacityType.ABSOLUTE;

    // Extract suffix from a value e.g. for 6w extract w
    String suffix = resourceValue.replaceAll(FLOAT_DIGIT_REGEX, "");
    if (!resourceValue.endsWith(suffix)) {
      return;
    }

    float parsedResourceValue = Float.parseFloat(resourceValue.substring(
        0, resourceValue.length() - suffix.length()));
    float convertedValue = parsedResourceValue;

    if (!suffix.isEmpty() && UnitsConversionUtil.KNOWN_UNITS.contains(suffix)) {
      // Convert all incoming units to MB if units is configured.
      convertedValue = UnitsConversionUtil.convert(suffix, "Mi", (long) parsedResourceValue);
    } else {
      for (ResourceUnitCapacityType capacityTypeSuffix : ResourceUnitCapacityType.values()) {
        if (capacityTypeSuffix.getPostfix().equals(suffix)) {
          capacityType = capacityTypeSuffix;
        }
      }
    }

    resource.setResource(resourceName, convertedValue, capacityType);
  }

  /**
   * Checks whether the given capacity string is in a capacity vector compatible
   * format.
   * @param configuredCapacity capacity string
   * @return true, if capacity string is in capacity vector format,
   * false otherwise
   */
  public boolean isCapacityVectorFormat(String configuredCapacity) {
    if (configuredCapacity == null) {
      return false;
    }

    String formattedCapacityString = configuredCapacity.replaceAll(" ", "");
    return RESOURCE_PATTERN.matcher(formattedCapacityString).find();
  }

  private static class Parser {
    private final Pattern regex;
    private final Function<Matcher, QueueCapacityVector> parser;

    Parser(Pattern regex, Function<Matcher, QueueCapacityVector> parser) {
      this.regex = regex;
      this.parser = parser;
    }
  }

}