TestAMRMClientRelayerMetrics.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.metrics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics.RequestType;
import org.apache.hadoop.yarn.util.Records;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

/**
 * Unit test for AMRMClientRelayer.
 */
public class TestAMRMClientRelayerMetrics {

  /**
   * Mock AMS for easier testing and mocking of request/responses.
   */
  public static class MockApplicationMasterService
      implements ApplicationMasterProtocol {

    private boolean failover = false;
    private boolean exception = false;
    private List<ResourceRequest> lastAsk;
    private List<ContainerId> lastRelease;
    private List<UpdateContainerRequest> lastUpdates;
    private List<String> lastBlacklistAdditions;
    private List<String> lastBlacklistRemovals;
    private AllocateResponse response = AllocateResponse
        .newInstance(0, null, null, new ArrayList<NodeReport>(),
            Resource.newInstance(0, 0), null, 0, null, null);

    @Override
    public RegisterApplicationMasterResponse registerApplicationMaster(
        RegisterApplicationMasterRequest request)
        throws YarnException, IOException {
      return null;
    }

    @Override
    public FinishApplicationMasterResponse finishApplicationMaster(
        FinishApplicationMasterRequest request)
        throws YarnException, IOException {
      if (this.failover) {
        this.failover = false;
        throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
      }
      return null;
    }

    @Override
    public AllocateResponse allocate(AllocateRequest request)
        throws YarnException, IOException {
      if (this.failover) {
        this.failover = false;
        throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
      }
      if(this.exception){
        this.exception = false;
        throw new YarnException("Mock RM encountered exception");
      }
      this.lastAsk = request.getAskList();
      this.lastRelease = request.getReleaseList();
      this.lastUpdates = request.getUpdateRequests();
      this.lastBlacklistAdditions =
          request.getResourceBlacklistRequest().getBlacklistAdditions();
      this.lastBlacklistRemovals =
          request.getResourceBlacklistRequest().getBlacklistRemovals();
      return response;
    }

    public void setFailoverFlag() {
      this.failover = true;
    }
  }

  private Configuration conf;
  private MockApplicationMasterService mockAMS;
  private String homeID = "home";
  private AMRMClientRelayer homeRelayer;
  private String uamID = "uam";
  private AMRMClientRelayer uamRelayer;

  private List<ResourceRequest> asks = new ArrayList<>();
  private List<ContainerId> releases = new ArrayList<>();
  private List<UpdateContainerRequest> updates = new ArrayList<>();
  private List<String> blacklistAdditions = new ArrayList<>();
  private List<String> blacklistRemoval = new ArrayList<>();

  @BeforeEach
  public void setup() throws YarnException, IOException {
    this.conf = new Configuration();

    this.mockAMS = new MockApplicationMasterService();

    this.homeRelayer = new AMRMClientRelayer(this.mockAMS,
        ApplicationId.newInstance(0, 0), this.homeID, conf);
    this.homeRelayer.registerApplicationMaster(
        RegisterApplicationMasterRequest.newInstance("", 0, ""));

    this.uamRelayer = new AMRMClientRelayer(this.mockAMS,
        ApplicationId.newInstance(0, 0), this.uamID, conf);
    this.uamRelayer.registerApplicationMaster(
        RegisterApplicationMasterRequest.newInstance("", 0, ""));

    clearAllocateRequestLists();

    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(homeID, RequestType.Guaranteed, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(homeID, RequestType.Opportunistic, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(homeID, RequestType.Promote, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(homeID, RequestType.Demote, 0);

    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(uamID, RequestType.Guaranteed, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(uamID, RequestType.Opportunistic, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(uamID, RequestType.Promote, 0);
    AMRMClientRelayerMetrics.getInstance()
        .setClientPending(uamID, RequestType.Demote, 0);
  }

  private AllocateRequest getAllocateRequest() {
    // Need to create a new one every time because rather than directly
    // referring the lists, the protobuf impl makes a copy of the lists
    return AllocateRequest.newBuilder()
        .responseId(0)
        .progress(0).askList(asks)
        .releaseList(new ArrayList<>(this.releases))
        .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
            new ArrayList<>(this.blacklistAdditions),
            new ArrayList<>(this.blacklistRemoval)))
        .updateRequests(new ArrayList<>(this.updates))
        .build();
  }

  private void clearAllocateRequestLists() {
    this.asks.clear();
    this.releases.clear();
    this.updates.clear();
    this.blacklistAdditions.clear();
    this.blacklistRemoval.clear();
  }

  private static UpdateContainerRequest createPromote(int id){
    return UpdateContainerRequest.newInstance(0, createContainerId(id),
        ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Resource.newInstance(0, 0),
        ExecutionType.GUARANTEED);
  }

  private static UpdateContainerRequest createDemote(int id){
    return UpdateContainerRequest.newInstance(0, createContainerId(id),
        ContainerUpdateType.DEMOTE_EXECUTION_TYPE, Resource.newInstance(0, 0),
        ExecutionType.OPPORTUNISTIC);
  }

  private static ContainerId createContainerId(int id) {
    return ContainerId.newContainerId(
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
        id);
  }

  public ResourceRequest createResourceRequest(long id, String resource,
      int memory, int vCores, int priority, ExecutionType execType,
      int containers) {
    ResourceRequest req = Records.newRecord(ResourceRequest.class);
    req.setAllocationRequestId(id);
    req.setResourceName(resource);
    req.setCapability(Resource.newInstance(memory, vCores));
    req.setPriority(Priority.newInstance(priority));
    req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(execType));
    req.setNumContainers(containers);
    return req;
  }

  @Test
  public void testGPending() throws YarnException, IOException {
    // Ask for two containers, one with location preference
    this.asks.add(
        createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(
        createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 2));
    this.homeRelayer.allocate(getAllocateRequest());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Ask from the uam
    this.uamRelayer.allocate(getAllocateRequest());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Update the any to ask for an extra container
    this.asks.get(2).setNumContainers(3);
    this.homeRelayer.allocate(getAllocateRequest());

    assertEquals(3, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Update the any to ask to pretend a container was allocated
    this.asks.get(2).setNumContainers(2);
    this.homeRelayer.allocate(getAllocateRequest());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());
  }

  @Test
  public void testPromotePending() throws YarnException, IOException {
    // Ask to promote 3 containers
    this.updates.add(createPromote(1));
    this.updates.add(createPromote(2));
    this.updates.add(createPromote(3));

    this.homeRelayer.allocate(getAllocateRequest());

    assertEquals(3, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Promote).value());

    // Demote 2 containers, one of which is pending promote
    this.updates.remove(createPromote(3));
    this.updates.add(createDemote(3));
    this.updates.add(createDemote(4));

    this.homeRelayer.allocate(getAllocateRequest());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Promote).value());

    // Let the RM respond with two successful promotions, one of which
    // was pending promote
    List<UpdatedContainer> updated = new ArrayList<>();
    updated.add(UpdatedContainer
        .newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container
            .newInstance(createContainerId(2), null, null, null,
                null, null)));
    updated.add(UpdatedContainer
        .newInstance(ContainerUpdateType.PROMOTE_EXECUTION_TYPE, Container
            .newInstance(createContainerId(5), null, null, null,
                null, null)));
    this.mockAMS.response.setUpdatedContainers(updated);

    this.homeRelayer.allocate(getAllocateRequest());

    assertEquals(1, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Promote).value());

    // Remove the promoted container and clean up response
    this.mockAMS.response.getUpdatedContainers().clear();
    this.updates.remove(createPromote(2));

    // Let the RM respond with two completed containers, one of which was
    // pending promote
    List<ContainerStatus> completed = new ArrayList<>();
    completed
        .add(ContainerStatus.newInstance(createContainerId(1), null, "", 0));
    completed
        .add(ContainerStatus.newInstance(createContainerId(6), null, "", 0));
    this.mockAMS.response.setCompletedContainersStatuses(completed);

    this.homeRelayer.allocate(getAllocateRequest());

    assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Promote).value());
  }

  @Test
  public void testCleanUpOnFinish() throws YarnException, IOException {
    // Ask for two containers, one with location preference
    this.asks.add(
        createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(
        createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 2));

    // Ask to promote 3 containers
    this.updates.add(createPromote(1));
    this.updates.add(createPromote(2));
    this.updates.add(createPromote(3));

    // Run the allocate call to start tracking pending
    this.homeRelayer.allocate(getAllocateRequest());

    // After finish, the metrics should reset to zero
    this.homeRelayer.shutdown();

    assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Promote).value());
  }

  @Test
  public void testFailover() throws YarnException, IOException {
    // Ask for two containers, one with location preference
    this.asks.add(
        createResourceRequest(0, "node", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(
        createResourceRequest(0, "rack", 2048, 1, 1, ExecutionType.GUARANTEED,
            1));
    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 2));

    long previousSuccess = AMRMClientRelayerMetrics.getInstance()
        .getHeartbeatSuccessMetric(homeID).value();
    long previousFailover = AMRMClientRelayerMetrics.getInstance()
        .getRMMasterSlaveSwitchMetric(homeID).value();
    // Set failover to trigger
    mockAMS.failover = true;
    this.homeRelayer.allocate(getAllocateRequest());
    // The failover metric should be incremented
    assertEquals(++previousFailover,
        AMRMClientRelayerMetrics.getInstance()
        .getRMMasterSlaveSwitchMetric(homeID).value());

    // The success metric should be incremented once
    assertEquals(++previousSuccess,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatSuccessMetric(homeID).value());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Ask from the uam
    this.uamRelayer.allocate(getAllocateRequest());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Update the any to ask for an extra container
    this.asks.get(2).setNumContainers(3);
    mockAMS.failover = true;
    this.homeRelayer.allocate(getAllocateRequest());
    // The failover metric should be incremented
    assertEquals(++previousFailover,
        AMRMClientRelayerMetrics.getInstance()
            .getRMMasterSlaveSwitchMetric(homeID).value());

    // The success metric should be incremented once
    assertEquals(++previousSuccess,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatSuccessMetric(homeID).value());

    assertEquals(3, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    // Update the any to ask to pretend a container was allocated
    this.asks.get(2).setNumContainers(2);
    mockAMS.failover = true;
    this.homeRelayer.allocate(getAllocateRequest());
    // The failover metric should be incremented
    assertEquals(++previousFailover,
        AMRMClientRelayerMetrics.getInstance()
            .getRMMasterSlaveSwitchMetric(homeID).value());

    // The success metric should be incremented once
    assertEquals(++previousSuccess,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatSuccessMetric(homeID).value());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    assertEquals(2, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());

    long previousFailure = AMRMClientRelayerMetrics.getInstance()
        .getHeartbeatFailureMetric(homeID).value();

    mockAMS.exception = true;
    try{
      this.homeRelayer.allocate(getAllocateRequest());
      fail();
    } catch (YarnException e){
    }
    // The failover metric should not be incremented
    assertEquals(previousFailover,
        AMRMClientRelayerMetrics.getInstance()
            .getRMMasterSlaveSwitchMetric(homeID).value());

    // The success metric should not be incremented
    assertEquals(previousSuccess,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatSuccessMetric(homeID).value());

    // The failure metric should be incremented
    assertEquals(++previousFailure,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatFailureMetric(homeID).value());

    mockAMS.failover = true;
    mockAMS.exception = true;
    try{
      this.homeRelayer.allocate(getAllocateRequest());
      fail();
    } catch (YarnException e){
    }
    // The failover metric should be incremented
    assertEquals(++previousFailover,
        AMRMClientRelayerMetrics.getInstance()
            .getRMMasterSlaveSwitchMetric(homeID).value());

    // The success metric should not be incremented
    assertEquals(previousSuccess,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatSuccessMetric(homeID).value());

    // The failure metric should be incremented
    assertEquals(++previousFailure,
        AMRMClientRelayerMetrics.getInstance()
            .getHeartbeatFailureMetric(homeID).value());
  }

  @Test
  public void testNewEmptyRequest()
      throws YarnException, IOException {
    // Ask for zero containers
    this.asks.add(createResourceRequest(1, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 0));
    this.homeRelayer.allocate(getAllocateRequest());

    assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(homeID, RequestType.Guaranteed).value());

    assertEquals(0, AMRMClientRelayerMetrics.getInstance()
        .getPendingMetric(uamID, RequestType.Guaranteed).value());
  }
}