MockApplications.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

class MockApplications {
  private static final Logger LOG = LoggerFactory.getLogger(
      MockApplications.class);

  private String config;
  private ResourceCalculator resourceCalculator;
  private Map<String, CSQueue> nameToCSQueues;
  private Map<String, Resource> partitionToResource;
  private Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes;
  private Map<String, Set<String>> userMap = new HashMap<>();
  private Map<String, Map<String, HashMap<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>();
  private int id = 1;

  MockApplications(String appsConfig,
      ResourceCalculator resourceCalculator,
      Map<String, CSQueue> nameToCSQueues,
      Map<String, Resource> partitionToResource,
      Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes) {
    this.config = appsConfig;
    this.resourceCalculator = resourceCalculator;
    this.nameToCSQueues = nameToCSQueues;
    this.partitionToResource = partitionToResource;
    this.nodeIdToSchedulerNodes = nodeIdToSchedulerNodes;
    init();
  }

  /**
   * Format is:
   * <pre>
   * queueName\t  // app1
   * (priority,resource,host,expression,#repeat,reserved)
   * (priority,resource,host,expression,#repeat,reserved);
   * queueName\t  // app2
   * </pre>
   */
  private void init() {
    int mulp = -1;
    for (String appConfig : config.split(";")) {
      String[] appConfigComponents = appConfig.split("\t");
      String queueName = appConfigComponents[0];
      if (mulp <= 0 && appConfigComponents.length > 2 && appConfigComponents[2] != null) {
        LOG.info("Mulp value: " + appConfigComponents[2]);
        mulp = 100 / (Integer.parseInt(appConfigComponents[2]));
      }

      String containersConfig = appConfigComponents[1];
      MockApplication mockApp = new MockApplication(id, containersConfig, queueName);
      new MockContainers(mockApp, nameToCSQueues, nodeIdToSchedulerNodes);
      add(mockApp);
      id++;
    }
    setupUserResourceUsagePerLabel(resourceCalculator, mulp);
  }

  private void add(MockApplication mockApp) {
    // add to LeafQueue
    LeafQueue queue = (LeafQueue) nameToCSQueues.get(mockApp.queueName);
    queue.getApplications().add(mockApp.app);
    queue.getAllApplications().add(mockApp.app);
    when(queue.getMinimumAllocation()).thenReturn(Resource.newInstance(1,1));
    when(mockApp.app.getCSLeafQueue()).thenReturn(queue);

    LOG.debug("Application mock: queue: " + mockApp.queueName + ", appId:" + mockApp.app);

    Set<String> users = userMap.computeIfAbsent(mockApp.queueName, k -> new HashSet<>());
    users.add(mockApp.app.getUser());

    String label = mockApp.app.getAppAMNodePartitionName();

    // Get label to queue
    Map<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue =
        userResourceUsagePerLabel.computeIfAbsent(label, k -> new HashMap<>());

    // Get queue to user based resource map
    Map<String, ResourceUsage> userResourceUsage =
        userResourceUsagePerQueue.computeIfAbsent(mockApp.queueName, k -> new HashMap<>());

    // Get user to its resource usage.
    ResourceUsage usage = userResourceUsage.get(mockApp.app.getUser());
    if (null == usage) {
      usage = new ResourceUsage();
      userResourceUsage.put(mockApp.app.getUser(), usage);
    }

    usage.incAMUsed(mockApp.app.getAMResource(label));
    usage.incUsed(mockApp.app.getAppAttemptResourceUsage().getUsed(label));
  }

  private void setupUserResourceUsagePerLabel(ResourceCalculator resourceCalculator,
      int mulp) {
    for (String label : userResourceUsagePerLabel.keySet()) {
      for (String queueName : userMap.keySet()) {
        LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
        // Currently we have user-limit test support only for default label.
        Resource toResourcePartition = partitionToResource.get("");
        Resource capacity = Resources.multiply(toResourcePartition,
            queue.getQueueCapacities().getAbsoluteCapacity());
        Set<String> users = userMap.get(queue.getQueueName());
        //TODO: Refactor this test class to use queue path internally like
        // CS does from now on
        if (users == null) {
          users = userMap.get(queue.getQueuePath());
        }
        when(queue.getAllUsers()).thenReturn(users);
        Resource userLimit = calculateUserLimit(resourceCalculator, mulp, capacity,
            users);
        LOG.debug("Updating user-limit from mock: toResourcePartition="
            + toResourcePartition + ", capacity=" + capacity
            + ", users.size()=" + users.size() + ", userLimit= " + userLimit
            + ",label= " + label + ",queueName= " + queueName);

        setupUserToQueueSettings(label, queueName, queue, users, userLimit);
      }
    }
  }

  private void setupUserToQueueSettings(String label, String queueName,
      LeafQueue queue, Set<String> users, Resource userLimit) {
    Map<String, ResourceUsage> userResourceUsage =
        userResourceUsagePerLabel.get(label).get(queueName);
    for (String userName : users) {
      User user = new User(userName);
      if (userResourceUsage != null) {
        user.setResourceUsage(userResourceUsage.get(userName));
      }
      when(queue.getUser(eq(userName))).thenReturn(user);
      when(queue.getOrCreateUser(eq(userName))).thenReturn(user);
      when(queue.getResourceLimitForAllUsers(eq(userName),
          any(Resource.class), anyString(), any(SchedulingMode.class)))
          .thenReturn(userLimit);
    }
  }

  private Resource calculateUserLimit(ResourceCalculator resourceCalculator,
      int mulp, Resource capacity, Set<String> users) {
    if (mulp > 0) {
      return Resources.divideAndCeil(resourceCalculator, capacity, mulp);
    } else {
      return Resources.divideAndCeil(resourceCalculator, capacity, users.size());
    }
  }
}