MockApplication.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Test helper that owns a Mockito mock of {@link FiCaSchedulerApp} together
 * with the live and reserved container lists that the mock returns.
 *
 * <p>The mock is created and given its basic stubbing in the constructor;
 * containers and aggregated per-application data are attached afterwards via
 * {@link #addMockContainer} and {@link #addAggregatedContainerData}.
 */
class MockApplication {
  private static final Logger LOG = LoggerFactory.getLogger(MockApplication.class);

  // These lists are returned by reference from the mocked app, so containers
  // added later are visible through app.getLiveContainers() /
  // app.getReservedContainers().
  private final List<RMContainer> liveContainers = new ArrayList<>();
  private final List<RMContainer> reservedContainers = new ArrayList<>();

  private final ApplicationId appId;
  final String containersConfig;
  final String queueName;
  ApplicationAttemptId appAttemptId;
  FiCaSchedulerApp app;

  /**
   * Creates the application id / attempt id and the mocked scheduler app.
   *
   * @param id numeric id used to build the {@link ApplicationId}
   *           (cluster timestamp is fixed to 0)
   * @param containersConfig stored as-is; not interpreted by this class
   * @param queueName value returned by the mocked app's {@code getQueueName()}
   */
  MockApplication(int id, String containersConfig, String queueName) {
    this.appId = ApplicationId.newInstance(0L, id);
    this.containersConfig = containersConfig;
    this.queueName = queueName;

    //dynamic fields
    this.appAttemptId = ApplicationAttemptId
        .newInstance(appId, 1);
    //this must be the last step: the stubbing below reads the fields set above
    setupInitialMocking(queueName);
  }

  /**
   * Creates the {@link FiCaSchedulerApp} mock and stubs its identity getters,
   * container lists and a zero AM resource for any partition.
   */
  private void setupInitialMocking(String queueName) {
    this.app = mock(FiCaSchedulerApp.class);
    when(app.getAMResource(anyString()))
        .thenReturn(Resources.createResource(0, 0));
    when(app.getLiveContainers()).thenReturn(liveContainers);
    when(app.getReservedContainers()).thenReturn(reservedContainers);
    when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
    when(app.getApplicationId()).thenReturn(appId);
    when(app.getQueueName()).thenReturn(queueName);
  }

  private void addLiveContainer(RMContainer c) {
    this.liveContainers.add(c);
  }

  private void addReservedContainer(RMContainer c) {
    this.reservedContainers.add(c);
  }

  /**
   * Registers a mocked container with this application, with the given
   * scheduler node and, when applicable, with the queue's
   * ignore-exclusivity container map.
   *
   * <p>A container whose id is 1 additionally defines the AM resource and the
   * AM node partition name reported by the mocked app.
   *
   * @param mockContainer container to add
   * @param schedulerNode node the container is placed (or reserved) on
   * @param queue queue whose ignore-exclusivity map is updated when the
   *              container has an empty label but the node's partition is
   *              non-empty
   */
  void addMockContainer(MockContainer mockContainer,
      FiCaSchedulerNode schedulerNode, LeafQueue queue) {
    int containerId = mockContainer.containerId;
    ContainerSpecification containerSpec = mockContainer.containerSpec;
    RMContainer rmContainer = mockContainer.rmContainerMock;

    if (containerId == 1) {
      when(app.getAMResource(containerSpec.label))
          .thenReturn(containerSpec.resource);
      when(app.getAppAMNodePartitionName()).thenReturn(containerSpec.label);
    }

    if (containerSpec.reserved) {
      addReservedContainer(rmContainer);
    } else {
      addLiveContainer(rmContainer);
    }

    // Add container to scheduler-node
    addContainerToSchedulerNode(schedulerNode, rmContainer,
        containerSpec.reserved);

    // If this is a non-exclusive allocation. The partition is only looked up
    // for containers with an empty label; otherwise it stays null and is
    // logged as such below.
    String partition = null;
    if (containerSpec.label.isEmpty()) {
      partition = schedulerNode.getPartition();
      if (!partition.isEmpty()) {
        Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue
            .getIgnoreExclusivityRMContainers();
        TreeSet<RMContainer> partitionContainers = ignoreExclusivityContainers
            .computeIfAbsent(partition, key -> new TreeSet<>());
        partitionContainers.add(rmContainer);
        LOG.info("Added an ignore-exclusivity container to partition {}, new size is: {}",
            partition, partitionContainers.size());
      }
    }
    LOG.debug("add container to app={} res={} node={} nodeLabelExpression={} partition={}",
        appAttemptId, containerSpec.resource, containerSpec.nodeId,
        containerSpec.label, partition);
  }

  /**
   * Stubs the application-level aggregated values on the mocked app:
   * priority, user, current consumption, a zero current reservation, the
   * pending requests per partition and the resource usage objects.
   *
   * @param containerSpec source of label, priority, username and pending
   *                      resource
   * @param usedResources value reported as current consumption and as the
   *                      used / cached-used resource of the usage spy
   */
  void addAggregatedContainerData(ContainerSpecification containerSpec,
      Resource usedResources) {
    // If app has 0 container, and it has only pending, still make sure to
    // update label.
    if (containerSpec.repeat == 0) {
      when(app.getAppAMNodePartitionName()).thenReturn(containerSpec.label);
    }

    // Some more app specific aggregated data can be better filled here.
    when(app.getPriority()).thenReturn(containerSpec.priority);
    when(app.getUser()).thenReturn(containerSpec.username);
    when(app.getCurrentConsumption()).thenReturn(usedResources);
    when(app.getCurrentReservation())
        .thenReturn(Resources.createResource(0, 0));

    // Single entry keyed by the container specification's label.
    Map<String, Resource> pendingPerPartition = new HashMap<>();
    pendingPerPartition.put(containerSpec.label, containerSpec.pendingResource);
    when(app.getTotalPendingRequestsPerPartition())
        .thenReturn(pendingPerPartition);

    // need to set pending resource in resource usage as well
    ResourceUsage ru = Mockito.spy(new ResourceUsage());
    ru.setUsed(containerSpec.label, usedResources);
    // doReturn(..).when(spy) stubs without invoking the real method on the spy.
    Mockito.doReturn(usedResources).when(ru).getCachedUsed(anyString());
    when(app.getAppAttemptResourceUsage()).thenReturn(ru);
    when(app.getSchedulingResourceUsage()).thenReturn(ru);
  }

  /**
   * Attaches the container to the given node: a reserved container becomes
   * the node's stubbed reserved container, any other container is appended to
   * the list returned by {@code getCopiedListOfRunningContainers()} and its
   * allocated resource is subtracted from the node's unallocated resource.
   *
   * <p>NOTE(review): both branches presumably rely on {@code node} being a
   * mock whose getters return stable, mutable objects - confirm against the
   * code that creates the node.
   */
  private void addContainerToSchedulerNode(SchedulerNode node, RMContainer container,
      boolean isReserved) {
    assert node != null : "scheduler node must not be null";

    if (isReserved) {
      when(node.getReservedContainer()).thenReturn(container);
    } else {
      node.getCopiedListOfRunningContainers().add(container);
      Resources.subtractFrom(node.getUnallocatedResource(),
          container.getAllocatedResource());
    }
  }
}