QueueAssertionBuilder.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Supplier;

import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

/**
 * Provides a fluent API to assert resource and capacity attributes of queues.
 */
class QueueAssertionBuilder {
  private static final String EFFECTIVE_MAX_RES_INFO = "Effective Maximum Resource";
  private static final BiFunction<QueueResourceQuotas, String, Resource> EFFECTIVE_MAX_RES =
      QueueResourceQuotas::getEffectiveMaxResource;

  private static final String EFFECTIVE_MIN_RES_INFO = "Effective Minimum Resource";
  private static final BiFunction<QueueResourceQuotas, String, Resource> EFFECTIVE_MIN_RES =
      QueueResourceQuotas::getEffectiveMinResource;

  private static final String CAPACITY_INFO = "Capacity";
  private static final BiFunction<QueueCapacities, String, Float> CAPACITY =
      QueueCapacities::getCapacity;

  private static final String ABS_CAPACITY_INFO = "Absolute Capacity";
  private static final BiFunction<QueueCapacities, String, Float> ABS_CAPACITY =
      QueueCapacities::getAbsoluteCapacity;

  private static final String ASSERTION_ERROR_MESSAGE =
      "'%s' of queue '%s' does not match %f for label %s";
  private static final String RESOURCE_ASSERTION_ERROR_MESSAGE =
      "'%s' of queue '%s' does not match %s for label %s";
  private final CapacityScheduler cs;

  QueueAssertionBuilder(CapacityScheduler cs) {
    this.cs = cs;
  }

  public class QueueAssertion {
    private final QueuePath queuePath;
    private final List<QueueAssertion.ValueAssertion> assertions = new ArrayList<>();

    QueueAssertion(QueuePath queuePath) {
      this.queuePath = queuePath;
    }


    public QueueAssertion withQueue(QueuePath queuePath) {
      return QueueAssertionBuilder.this.withQueue(queuePath);
    }

    public QueueAssertionBuilder build() {
      return QueueAssertionBuilder.this.build();
    }

    public QueueAssertion assertEffectiveMaxResource(Resource expected) {
      ValueAssertion valueAssertion = new ValueAssertion(expected);
      valueAssertion.withResourceSupplier(EFFECTIVE_MAX_RES, EFFECTIVE_MAX_RES_INFO);
      assertions.add(valueAssertion);

      return this;
    }

    public QueueAssertion assertEffectiveMinResource(Resource expected, String label) {
      ValueAssertion valueAssertion = new ValueAssertion(expected);
      valueAssertion.withResourceSupplier(EFFECTIVE_MIN_RES, EFFECTIVE_MIN_RES_INFO);
      assertions.add(valueAssertion);
      valueAssertion.label = label;

      return this;
    }

    public QueueAssertion assertEffectiveMinResource(Resource expected) {
      return assertEffectiveMinResource(expected, NO_LABEL);
    }

    public QueueAssertion assertCapacity(double expected) {
      ValueAssertion valueAssertion = new ValueAssertion(expected);
      valueAssertion.withCapacitySupplier(CAPACITY, CAPACITY_INFO);
      assertions.add(valueAssertion);

      return this;
    }

    public QueueAssertion assertAbsoluteCapacity(double expected) {
      ValueAssertion valueAssertion = new ValueAssertion(expected);
      valueAssertion.withCapacitySupplier(ABS_CAPACITY, ABS_CAPACITY_INFO);
      assertions.add(valueAssertion);

      return this;
    }

    private class ValueAssertion {
      private double expectedValue = 0;
      private Resource expectedResource = null;
      private String assertionType;
      private Supplier<Float> valueSupplier;
      private Supplier<Resource> resourceSupplier;
      private String label = "";

      ValueAssertion(double expectedValue) {
        this.expectedValue = expectedValue;
      }

      ValueAssertion(Resource expectedResource) {
        this.expectedResource = expectedResource;
      }

      public void setLabel(String label) {
        this.label = label;
      }

      public void withResourceSupplier(
          BiFunction<QueueResourceQuotas, String, Resource> assertion, String messageInfo) {
        CSQueue queue = cs.getQueue(queuePath.getFullPath());
        if (queue == null) {
          fail("Queue " + queuePath + " is not found");
        }

        assertionType = messageInfo;
        resourceSupplier = () -> assertion.apply(queue.getQueueResourceQuotas(), label);
      }

      public void withCapacitySupplier(
          BiFunction<QueueCapacities, String, Float> assertion, String messageInfo) {
        CSQueue queue = cs.getQueue(queuePath.getFullPath());
        if (queue == null) {
          fail("Queue " + queuePath + " is not found");
        }
        assertionType = messageInfo;
        valueSupplier = () -> assertion.apply(queue.getQueueCapacities(), label);
      }
    }

  }

  private final Map<QueuePath, QueueAssertion> assertions = new LinkedHashMap<>();

  public QueueAssertionBuilder build() {
    return this;
  }

  /**
   * Creates a new assertion group for a specific queue.
   * @param queuePath path of the queue
   * @return queue assertion group
   */
  public QueueAssertion withQueue(QueuePath queuePath) {
    assertions.putIfAbsent(queuePath, new QueueAssertion(queuePath));
    return assertions.get(queuePath);
  }

  /**
   * Executes assertions created for all queues.
   */
  public void finishAssertion() {
    for (Map.Entry<QueuePath, QueueAssertion> assertionEntry : assertions.entrySet()) {
      for (QueueAssertion.ValueAssertion assertion : assertionEntry.getValue().assertions) {
        if (assertion.resourceSupplier != null) {
          String errorMessage = String.format(RESOURCE_ASSERTION_ERROR_MESSAGE,
              assertion.assertionType, assertionEntry.getKey(),
              assertion.expectedResource.toString(), assertion.label);
          assertEquals(assertion.expectedResource,
              assertion.resourceSupplier.get(), errorMessage);
        } else {
          String errorMessage = String.format(ASSERTION_ERROR_MESSAGE,
              assertion.assertionType, assertionEntry.getKey(), assertion.expectedValue,
              assertion.label);
          assertEquals(assertion.expectedValue,
              assertion.valueSupplier.get(), EPSILON, errorMessage);
        }
      }
    }
  }

  /**
   * Returns all queues that have defined assertions.
   * @return queue paths
   */
  public Set<QueuePath> getQueues() {
    return assertions.keySet();
  }
}