TestSchedulingWithAllocationRequestId.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for checking Scheduling with allocationRequestId, i.e. mapping of
 * allocated containers to the original client {@code ResourceRequest}.
 */
public class TestSchedulingWithAllocationRequestId
    extends ParameterizedSchedulerTestBase {

  // Logger for this test class; waitForAllocResponse uses it to report each
  // polling round.
  private static final Logger LOG =
      LoggerFactory.getLogger(TestSchedulingWithAllocationRequestId.class);
  // Multiplier used to build node and container memory arguments such as
  // "4 * GB". NOTE(review): this implies the mock APIs take memory in MB -
  // confirm against MockRM/MockAM.
  private static final int GB = 1024;

  /**
   * Per-test initialisation hook of this class: forwards the scheduler type
   * to {@code initParameterizedSchedulerTestBase}.
   *
   * @param type scheduler type handed on to the base-class initialiser
   * @throws IOException if the base-class initialiser throws it
   */
  public void initTestSchedulingWithAllocationRequestId(SchedulerType type)
      throws IOException {
    this.initParameterizedSchedulerTestBase(type);
  }

  /**
   * Returns the configuration obtained from {@code super.getConf()}, with
   * {@link FairSchedulerConfiguration#ASSIGN_MULTIPLE} switched on when the
   * current scheduler type is {@code SchedulerType.FAIR}.
   *
   * @return the object returned by {@code super.getConf()}, modified in place
   *         for the FAIR case
   */
  @Override
  public YarnConfiguration getConf() {
    YarnConfiguration conf = super.getConf();
    // Enum constants are compared by identity: this is equivalent to equals()
    // for non-null values and does not throw when the scheduler type is null.
    if (getSchedulerType() == SchedulerType.FAIR) {
      // Some tests here rely on being able to assign multiple containers with
      // a single heartbeat
      conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
    }
    return conf;
  }

  /**
   * Sends one request with allocation request id 10 for host 127.0.0.1 and
   * one with id 20 for host 127.0.0.2, then asserts that the single container
   * received while heartbeating nm1 carries id 10 and that the two containers
   * received while heartbeating nm2 carry id 20.
   *
   * @param type scheduler type supplied by the {@code getParameters} source
   * @throws Exception propagated from the mock cluster calls or the wait loop
   */
  @Timeout(10)
  @ParameterizedTest(name = "{0}")
  @MethodSource("getParameters")
  public void testMultipleAllocationRequestIds(SchedulerType type) throws Exception {
    // Go through this class's own init hook, like the other tests here do.
    initTestSchedulingWithAllocationRequestId(type);
    YarnConfiguration conf = getConf();
    MockRM rm = new MockRM(conf);
    try {
      rm.start();

      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
      MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB);
      RMApp app1 = MockRMAppSubmitter.submitWithMemory(2048, rm);
      // kick the scheduling
      nm1.nodeHeartbeat(true);
      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
      am1.registerAppAttempt();

      // send requests for containers with id 10 & 20
      // NOTE(review): the numeric createReq arguments are presumably
      // (memory, priority, container count, allocationRequestId) - confirm
      // against MockAM.createReq.
      am1.allocate(am1.createReq(
          new String[] {"127.0.0.1"}, 2 * GB, 1, 1, 10L), null);
      am1.allocate(am1.createReq(
          new String[] {"127.0.0.2"}, 2 * GB, 1, 2, 20L), null);

      // check if request id 10 is satisfied
      AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm1, 1);
      List<Container> allocated = allocResponse.getAllocatedContainers();
      assertEquals(1, allocated.size());
      checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);

      // check now if request id 20 is satisfied
      allocResponse = waitForAllocResponse(rm, am1, nm2, 2);
      allocated = allocResponse.getAllocatedContainers();
      assertEquals(2, allocated.size());
      for (Container container : allocated) {
        checkAllocatedContainer(container, 2 * GB, nm2.getNodeId(), 20);
      }
    } finally {
      // rm is assigned from a constructor call before the try block, so it
      // can never be null here; no guard is needed.
      rm.stop();
    }
  }

  /**
   * Sends a request with allocation request id 10 for host 127.0.0.1 and a
   * request with id 20 for host 127.0.0.2, then asserts that two containers
   * tagged with id 20 are received via the second node first, followed by a
   * single container tagged with id 10 via the first node.
   *
   * @param type scheduler type supplied by the {@code getParameters} source
   * @throws Exception propagated from the mock cluster calls or the wait loop
   */
  @Timeout(10)
  @ParameterizedTest(name = "{0}")
  @MethodSource("getParameters")
  public void testMultipleAllocationRequestDiffPriority(SchedulerType type) throws Exception {
    initTestSchedulingWithAllocationRequestId(type);
    final MockRM mockRm = new MockRM(getConf());
    try {
      mockRm.start();

      final MockNM firstNode = mockRm.registerNode("127.0.0.1:1234", 4 * GB);
      final MockNM secondNode = mockRm.registerNode("127.0.0.2:5678", 4 * GB);
      final RMApp application = MockRMAppSubmitter.submitWithMemory(2048, mockRm);

      // A heartbeat from the first node kicks the scheduling.
      firstNode.nodeHeartbeat(true);
      final RMAppAttempt currentAttempt = application.getCurrentAppAttempt();
      final MockAM appMaster =
          mockRm.sendAMLaunched(currentAttempt.getAppAttemptId());
      appMaster.registerAppAttempt();

      // Ask for containers under allocation request ids 10 and 20.
      appMaster.allocate(
          appMaster.createReq(new String[] {"127.0.0.1"}, 2 * GB, 2, 1, 10L),
          null);
      appMaster.allocate(
          appMaster.createReq(new String[] {"127.0.0.2"}, 2 * GB, 1, 2, 20L),
          null);

      // The id-20 request is expected to be satisfied first.
      final List<Container> grantedFor20 =
          waitForAllocResponse(mockRm, appMaster, secondNode, 2)
              .getAllocatedContainers();
      assertEquals(2, grantedFor20.size());
      grantedFor20.forEach(granted ->
          checkAllocatedContainer(granted, 2 * GB, secondNode.getNodeId(), 20));

      // Only afterwards is the id-10 request expected to be satisfied.
      final List<Container> grantedFor10 =
          waitForAllocResponse(mockRm, appMaster, firstNode, 1)
              .getAllocatedContainers();
      assertEquals(1, grantedFor10.size());
      checkAllocatedContainer(
          grantedFor10.get(0), 2 * GB, firstNode.getNodeId(), 10);
    } finally {
      mockRm.stop();
    }
  }

  /**
   * Asserts, in this order, that the container's memory size, node id and
   * allocation request id equal the expected values.
   *
   * @param allocated container under inspection
   * @param memory expected value of {@code getResource().getMemorySize()}
   * @param nodeId expected value of {@code getNodeId()}
   * @param allocationRequestId expected value of
   *        {@code getAllocationRequestId()}
   */
  private void checkAllocatedContainer(Container allocated, int memory,
      NodeId nodeId, long allocationRequestId) {
    final long actualMemory = allocated.getResource().getMemorySize();
    assertEquals(memory, actualMemory);

    final NodeId actualNode = allocated.getNodeId();
    assertEquals(nodeId, actualNode);

    final long actualRequestId = allocated.getAllocationRequestId();
    assertEquals(allocationRequestId, actualRequestId);
  }

  /**
   * Runs two applications that reuse the same allocation request ids (5 and
   * 10) and asserts, after each request, that the response received by the
   * requesting application master holds exactly one container with the
   * memory size, node id and allocation request id expected for that request.
   *
   * @param type scheduler type supplied by the {@code getParameters} source
   * @throws Exception propagated from the mock cluster calls or the wait loop
   */
  @Timeout(10)
  @ParameterizedTest(name = "{0}")
  @MethodSource("getParameters")
  public void testMultipleAppsWithAllocationReqId(SchedulerType type) throws Exception {
    initTestSchedulingWithAllocationRequestId(type);
    YarnConfiguration conf = getConf();
    MockRM rm = new MockRM(conf);
    try {
      rm.start();

      // Register node1
      String host0 = "host_0";
      String host1 = "host_1";
      MockNM nm1 =
          new MockNM(host0 + ":1234", 8 * GB, rm.getResourceTrackerService());
      nm1.registerNode();

      // Register node2
      MockNM nm2 =
          new MockNM(host1 + ":2351", 8 * GB, rm.getResourceTrackerService());
      nm2.registerNode();

      // submit 1st app
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
              .withAppName("user_0")
              .withUser("a1")
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

      // Submit app1 RR with allocationReqId = 5
      int numContainers = 1;
      am1.allocate(am1.createReq(
          new String[] {host0, host1}, 1 * GB, 1, numContainers, 5L), null);

      // wait for container to be allocated.
      AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm1, 1);
      List<Container> allocated = allocResponse.getAllocatedContainers();
      assertEquals(1, allocated.size());
      checkAllocatedContainer(allocated.get(0), 1 * GB, nm1.getNodeId(), 5L);

      // Submit another application
      MockRMAppSubmissionData data2 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
              .withAppName("user_1")
              .withUser("a2")
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm, data2);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);

      // Submit app2 RR with allocationReqId = 5.
      // The request is built with am2, the same application master that
      // submits it, so the receiver matches the application under test.
      am2.allocate(am2.createReq(
          new String[] {host0, host1}, 2 * GB, 1, numContainers, 5L), null);

      // wait for container to be allocated.
      allocResponse = waitForAllocResponse(rm, am2, nm2, 1);
      allocated = allocResponse.getAllocatedContainers();
      assertEquals(1, allocated.size());
      checkAllocatedContainer(allocated.get(0), 2 * GB, nm2.getNodeId(), 5L);

      // Now submit app2 RR with allocationReqId = 10
      am2.allocate(am2.createReq(
          new String[] {host0, host1}, 3 * GB, 1, numContainers, 10L), null);

      // wait for container to be allocated.
      allocResponse = waitForAllocResponse(rm, am2, nm1, 1);
      allocated = allocResponse.getAllocatedContainers();
      assertEquals(1, allocated.size());
      checkAllocatedContainer(allocated.get(0), 3 * GB, nm1.getNodeId(), 10L);

      // Now submit app1 RR with allocationReqId = 10
      am1.allocate(am1.createReq(
          new String[] {host0, host1}, 4 * GB, 1, numContainers, 10L), null);

      // wait for container to be allocated.
      allocResponse = waitForAllocResponse(rm, am1, nm2, 1);
      allocated = allocResponse.getAllocatedContainers();
      assertEquals(1, allocated.size());
      checkAllocatedContainer(allocated.get(0), 4 * GB, nm2.getNodeId(), 10L);
    } finally {
      // rm is assigned from a constructor call before the try block, so it
      // can never be null here; no guard is needed.
      rm.stop();
    }
  }

  /**
   * Heartbeats the application master until a single response carries at
   * least {@code size} allocated containers, and returns that response.
   *
   * <p>Only the containers in the latest response are counted; earlier
   * responses are discarded. After each insufficient response the method
   * logs a message, sends a heartbeat from {@code nm}, calls {@code update()}
   * on the resource manager's scheduler and sleeps for 100 ms. The loop has
   * no retry limit of its own; the test methods in this class that call it
   * are annotated with {@code @Timeout(10)}.
   *
   * @param rm resource manager whose scheduler is updated between polls
   * @param am application master whose heartbeat responses are inspected
   * @param nm node manager that heartbeats between polls
   * @param size minimum number of allocated containers required in one
   *        response
   * @return the first response holding at least {@code size} containers
   * @throws Exception propagated from the heartbeat calls or the sleep
   */
  private AllocateResponse waitForAllocResponse(MockRM rm, MockAM am, MockNM nm,
      int size) throws Exception {
    while (true) {
      final AllocateResponse latest = am.doHeartbeat();
      if (latest.getAllocatedContainers().size() >= size) {
        return latest;
      }
      LOG.info("Waiting for containers to be created for app...");
      nm.nodeHeartbeat(true);
      ((AbstractYarnScheduler) rm.getResourceScheduler()).update();
      Thread.sleep(100);
    }
  }

}