// TestGpuResourceAllocator.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu;

import static org.apache.hadoop.test.MockitoUtil.verifyZeroInteractions;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
import static org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider.initResourceTypes;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.util.Lists;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu.GpuResourceAllocator.GpuAllocation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

/**
 * Unit tests for GpuResourceAllocator.
 */
public class TestGpuResourceAllocator {
  private static final int WAIT_PERIOD_FOR_RESOURCE = 100;

  /**
   * Mockito argument matcher that accepts a {@link Container} when the numeric
   * value of its container id equals the one of the expected container.
   * Comparison is done on {@code getContainerId().getContainerId()} only, so
   * two distinct mock instances reporting the same id match each other.
   */
  private static class ContainerMatcher implements ArgumentMatcher<Container> {

    private final Container container;

    /**
     * @param container the container whose id is used as the expected value
     */
    ContainerMatcher(Container container) {
      this.container = container;
    }

    /**
     * @param other the argument received by the verified mock call
     * @return {@code true} if {@code other} reports the same numeric
     *         container id as the expected container; {@code false} for a
     *         {@code null} argument or an argument without a container id
     */
    @Override
    public boolean matches(Container other) {
      // Mockito may hand over null arguments; treat them as a plain mismatch
      // instead of failing the verification with a NullPointerException.
      if (other == null) {
        return false;
      }
      ContainerId otherContainerId = other.getContainerId();
      if (otherContainerId == null) {
        return false;
      }
      long expectedId = container.getContainerId().getContainerId();
      return expectedId == otherContainerId.getContainerId();
    }

    /**
     * Used by Mockito when it prints a failed verification, so the message
     * names the expected id instead of an object identity.
     */
    @Override
    public String toString() {
      return "container with id "
          + container.getContainerId().getContainerId();
    }
  }

  @Captor
  private ArgumentCaptor<List<Serializable>> gpuCaptor;

  @Mock
  private NMContext nmContext;

  @Mock
  private NMStateStoreService nmStateStore;

  private GpuResourceAllocator testSubject;

  /**
   * Per-test fixture: registers the GPU resource type, initializes the
   * annotated mocks and captor of this class, and creates the allocator
   * under test with {@link #WAIT_PERIOD_FOR_RESOURCE}.
   */
  @BeforeEach
  public void setup() {
    initResourceTypes(GPU_URI);
    MockitoAnnotations.initMocks(this);
    this.testSubject = createTestSubject(WAIT_PERIOD_FOR_RESOURCE);
  }

  /**
   * Stubs the mocked NM context and builds an allocator on top of it.
   *
   * @param waitPeriodForResource value handed to the allocator constructor
   * @return a new allocator backed by the mocked context, whose container map
   *         is a fresh, empty {@link ConcurrentHashMap} and whose state store
   *         is the {@code nmStateStore} mock
   */
  private GpuResourceAllocator createTestSubject(int waitPeriodForResource) {
    when(nmContext.getContainers()).thenReturn(new ConcurrentHashMap<>());
    when(nmContext.getNMStateStore()).thenReturn(nmStateStore);

    GpuResourceAllocator allocator =
        new GpuResourceAllocator(nmContext, waitPeriodForResource);
    return allocator;
  }

  /**
   * Creates a resource created via {@code Resource.newInstance(1024, 1)} and,
   * for a positive {@code gpus} value, sets the GPU resource value on it.
   *
   * @param gpus number of GPUs to request; values that are zero or negative
   *             leave the GPU resource value untouched
   * @return the resource request
   */
  private Resource createGpuResourceRequest(int gpus) {
    final Resource request = Resource.newInstance(1024, 1);
    if (gpus <= 0) {
      return request;
    }

    request.setResourceValue(GPU_URI, gpus);
    return request;
  }

  /**
   * Creates several mock containers with consecutive ids starting at 111.
   *
   * @param gpus number of GPUs each container requests
   * @param numberOfContainers how many containers to create
   * @return the created containers, in ascending id order
   */
  private List<Container> createMockContainers(int gpus,
      int numberOfContainers) {
    final long firstId = 111L;
    final long endIdExclusive = firstId + numberOfContainers;

    final List<Container> result = Lists.newArrayList();
    for (long id = firstId; id < endIdExclusive; id++) {
      result.add(createMockContainer(gpus, id));
    }
    return result;
  }

  /**
   * Builds a mock container and registers it in the container map returned
   * by the mocked NM context.
   *
   * The mock reports the given numeric id through a mocked
   * {@link ContainerId}, a resource created by
   * {@link #createGpuResourceRequest(int)}, and the {@code RUNNING} state.
   *
   * @param gpus number of GPUs the container requests
   * @param id numeric id returned by the mocked container id
   * @return the mock container
   */
  private Container createMockContainer(int gpus, long id) {
    final ContainerId mockedId = mock(ContainerId.class);
    when(mockedId.getContainerId()).thenReturn(id);

    final Resource requested = createGpuResourceRequest(gpus);

    final Container mockedContainer = mock(Container.class);
    when(mockedContainer.getContainerId()).thenReturn(mockedId);
    when(mockedContainer.getResource()).thenReturn(requested);
    when(mockedContainer.getContainerState())
        .thenReturn(ContainerState.RUNNING);

    nmContext.getContainers().put(mockedId, mockedContainer);
    return mockedContainer;
  }

  /**
   * Adds {@code numberOfGpus} devices, created as {@code new GpuDevice(1, n)}
   * for n = 0 .. numberOfGpus - 1, to the allocator and asserts that all of
   * them are allowed and available while nothing is assigned yet.
   *
   * @param numberOfGpus how many devices to create and add
   */
  private void createAndAddGpus(int numberOfGpus) {
    int seq = 0;
    while (seq < numberOfGpus) {
      testSubject.addGpu(new GpuDevice(1, seq));
      seq++;
    }

    // Nothing has been assigned, so every added device must still be free.
    assertEquals(0, testSubject.getDeviceAllocationMapping().size());
    assertEquals(0, testSubject.getAssignedGpus().size());
    assertEquals(numberOfGpus, testSubject.getAllowedGpus().size());
    assertEquals(numberOfGpus, testSubject.getAvailableGpus());
  }

  /**
   * Adds the given devices to the allocator and asserts that each of them is
   * counted as allowed and available, with no assignment present.
   *
   * @param gpus devices to add; they are expected to be distinct because the
   *             allowed/available counts are compared to the array length
   */
  private void addGpus(GpuDevice... gpus) {
    addGpusAndDontVerify(gpus);

    final int expectedCount = gpus.length;
    assertEquals(0, testSubject.getDeviceAllocationMapping().size());
    assertEquals(0, testSubject.getAssignedGpus().size());
    assertEquals(expectedCount, testSubject.getAllowedGpus().size());
    assertEquals(expectedCount, testSubject.getAvailableGpus());
  }

  /**
   * Adds the given devices to the allocator in array order without asserting
   * anything about the resulting allocator state.
   *
   * @param gpus devices to add
   */
  private void addGpusAndDontVerify(GpuDevice... gpus) {
    for (int idx = 0; idx < gpus.length; idx++) {
      testSubject.addGpu(gpus[idx]);
    }
  }

  /**
   * Re-stubs the given mock containers so that each reports a state picked
   * at random from the candidate list below and answers {@code true} to
   * {@code isContainerInFinalStates()}.
   *
   * The random source is unseeded, so the chosen state differs between runs.
   *
   * @param releasingContainers mock containers to re-stub
   */
  private void setupContainerAsReleasingGpus(Container... releasingContainers) {
    final ContainerState[] candidateStates = {
        ContainerState.KILLING,
        ContainerState.DONE,
        ContainerState.LOCALIZATION_FAILED,
        ContainerState.CONTAINER_RESOURCES_CLEANINGUP,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerState.EXITED_WITH_SUCCESS
    };

    final Random rng = new Random();
    for (int idx = 0; idx < releasingContainers.length; idx++) {
      final Container current = releasingContainers[idx];
      final int pick = rng.nextInt(candidateStates.length);
      when(current.getContainerState()).thenReturn(candidateStates[pick]);
      when(current.isContainerInFinalStates()).thenReturn(true);
    }
  }

  /**
   * Asserts that the allocation consists of exactly the expected device, has
   * no denied device, and that the assignment was written to the state store
   * for the given container.
   *
   * @param expectedGpu the single device that must have been allocated
   * @param container container the device was assigned to
   * @param allocation allocation returned by the allocator
   * @throws IOException declared by the verified state store method
   */
  private void assertAllocatedGpu(GpuDevice expectedGpu, Container container,
      GpuAllocation allocation) throws IOException {
    final Set<GpuDevice> allowed = allocation.getAllowedGPUs();
    assertEquals(1, allowed.size());
    assertEquals(0, allocation.getDeniedGPUs().size());

    final GpuDevice onlyDevice = allowed.iterator().next();
    assertEquals(expectedGpu, onlyDevice);

    assertAssignmentInStateStore(expectedGpu, container);
  }

  /**
   * Asserts the number of allowed and denied devices of an allocation and
   * that an assignment of the same size was stored for the container.
   *
   * @param gpus expected number of allowed devices
   * @param deniedGpus expected number of denied devices
   * @param container container whose state store entry is verified
   * @param allocation allocation returned by the allocator
   * @throws IOException declared by the verified state store method
   */
  private void assertAllocatedGpus(int gpus, int deniedGpus,
      Container container,
      GpuAllocation allocation) throws IOException {
    final int actualAllowed = allocation.getAllowedGPUs().size();
    assertEquals(gpus, actualAllowed);

    final int actualDenied = allocation.getDeniedGPUs().size();
    assertEquals(deniedGpus, actualDenied);

    assertAssignmentInStateStore(gpus, container);
  }

  /**
   * Asserts that the allocation denies exactly one device, allows none, and
   * that the state store mock was never interacted with.
   *
   * @param allocation allocation returned by the allocator
   */
  private void assertNoAllocation(GpuAllocation allocation) {
    final int denied = allocation.getDeniedGPUs().size();
    assertEquals(1, denied);

    final int allowed = allocation.getAllowedGPUs().size();
    assertEquals(0, allowed);

    verifyZeroInteractions(nmStateStore);
  }

  /**
   * Verifies that the state store received one GPU assignment for a
   * container with the same numeric id, and that the stored list holds
   * exactly the expected device.
   *
   * @param expectedGpu device expected as the only element of the stored list
   * @param container container the assignment must belong to
   * @throws IOException declared by the verified state store method
   */
  private void assertAssignmentInStateStore(GpuDevice expectedGpu,
      Container container) throws IOException {
    final ContainerMatcher sameContainer = new ContainerMatcher(container);
    verify(nmStateStore).storeAssignedResources(
        argThat(sameContainer), eq(GPU_URI), gpuCaptor.capture());

    final List<Serializable> stored = gpuCaptor.getValue();
    assertEquals(1, stored.size());
    assertEquals(expectedGpu, stored.get(0));
  }

  /**
   * Verifies that the state store received one GPU assignment for a
   * container with the same numeric id, and that the stored list has the
   * expected number of entries.
   *
   * @param gpus expected size of the stored device list
   * @param container container the assignment must belong to
   * @throws IOException declared by the verified state store method
   */
  private void assertAssignmentInStateStore(int gpus,
      Container container) throws IOException {
    final ContainerMatcher sameContainer = new ContainerMatcher(container);
    verify(nmStateStore).storeAssignedResources(
        argThat(sameContainer), eq(GPU_URI), gpuCaptor.capture());

    final List<Serializable> stored = gpuCaptor.getValue();
    assertEquals(gpus, stored.size());
  }

  /**
   * Collects the allocations that occur more than once in the given list.
   * Whether two allocations count as the same is decided by {@link HashSet}
   * membership, i.e. by the {@code equals}/{@code hashCode} of
   * {@code GpuAllocation}.
   *
   * @param allocations allocations to inspect
   * @return the set of allocations seen at least twice; empty if there are
   *         none
   */
  private static Set<GpuAllocation> findDuplicates(
      List<GpuAllocation> allocations) {
    final Set<GpuAllocation> seen = new HashSet<>();
    final Set<GpuAllocation> duplicates = new HashSet<>();

    for (GpuAllocation candidate : allocations) {
      if (seen.contains(candidate)) {
        duplicates.add(candidate);
      } else {
        seen.add(candidate);
      }
    }
    return duplicates;
  }

  /**
   * A freshly created allocator reports no mappings, no assigned devices,
   * no allowed devices and zero available devices.
   */
  @Test
  public void testNewGpuAllocatorHasEmptyCollectionOfDevices() {
    final int mappings = testSubject.getDeviceAllocationMapping().size();
    assertEquals(0, mappings);

    final int assigned = testSubject.getAssignedGpus().size();
    assertEquals(0, assigned);

    final int allowed = testSubject.getAllowedGpus().size();
    assertEquals(0, allowed);

    assertEquals(0, testSubject.getAvailableGpus());
  }

  /**
   * Adding a single device must not create any allocation mapping or
   * assignment.
   */
  @Test
  public void testAddOneDevice() {
    final GpuDevice onlyDevice = new GpuDevice(1, 1);
    addGpus(onlyDevice);

    final int mappings = testSubject.getDeviceAllocationMapping().size();
    assertEquals(0, mappings);
    final int assigned = testSubject.getAssignedGpus().size();
    assertEquals(0, assigned);
  }

  /**
   * Adding three distinct devices must not create any allocation mapping or
   * assignment.
   */
  @Test
  public void testAddMoreDevices() {
    final GpuDevice first = new GpuDevice(1, 1);
    final GpuDevice second = new GpuDevice(1, 2);
    final GpuDevice third = new GpuDevice(1, 3);
    addGpus(first, second, third);

    final int mappings = testSubject.getDeviceAllocationMapping().size();
    assertEquals(0, mappings);
    final int assigned = testSubject.getAssignedGpus().size();
    assertEquals(0, assigned);
  }

  /**
   * Adding two devices constructed with identical arguments results in a
   * single allowed and available device.
   */
  @Test
  public void testAddMoreDevicesWithSameData() {
    final GpuDevice original = new GpuDevice(1, 1);
    final GpuDevice sameData = new GpuDevice(1, 1);
    addGpusAndDontVerify(original, sameData);

    final int mappings = testSubject.getDeviceAllocationMapping().size();
    assertEquals(0, mappings);
    final int assigned = testSubject.getAssignedGpus().size();
    assertEquals(0, assigned);

    // The second device carries the same data and must not be counted twice.
    final int allowed = testSubject.getAllowedGpus().size();
    assertEquals(1, allowed);
    assertEquals(1, testSubject.getAvailableGpus());
  }

  /**
   * A container that requests no GPU gets an allocation without allowed
   * devices, and nothing is written to the state store.
   */
  @Test
  public void testRequestZeroGpu() throws ResourceHandlerException {
    addGpus(new GpuDevice(1, 1));

    final Container gpuLessContainer = createMockContainer(0, 5L);
    final GpuAllocation result = testSubject.assignGpus(gpuLessContainer);

    assertNoAllocation(result);
  }

  /**
   * With one device present, a container requesting one GPU receives exactly
   * that device and the allocator reports it as assigned and unavailable.
   */
  @Test
  public void testRequestOneGpu() throws ResourceHandlerException, IOException {
    final GpuDevice onlyDevice = new GpuDevice(1, 1);
    addGpus(onlyDevice);

    final Container requester = createMockContainer(1, 5L);
    final GpuAllocation result = testSubject.assignGpus(requester);

    final int mappings = testSubject.getDeviceAllocationMapping().size();
    assertEquals(1, mappings);
    final int assigned = testSubject.getAssignedGpus().size();
    assertEquals(1, assigned);
    final int allowed = testSubject.getAllowedGpus().size();
    assertEquals(1, allowed);
    assertEquals(0, testSubject.getAvailableGpus());

    assertAllocatedGpu(onlyDevice, requester, result);
  }

  /**
   * Requesting two GPUs while only one device exists must fail with a
   * {@link ResourceHandlerException} mentioning that not enough GPUs were
   * found.
   */
  @Test
  public void testRequestMoreThanAvailableGpu()
      throws ResourceHandlerException {
    // Fixture setup stays outside assertThrows so that only the call under
    // test is allowed to raise the expected exception.
    addGpus(new GpuDevice(1, 1));
    Container container = createMockContainer(2, 5L);

    ResourceHandlerException exception = assertThrows(
        ResourceHandlerException.class,
        () -> testSubject.assignGpus(container));

    assertThat(exception.getMessage()).contains("Failed to find enough GPUs");
  }

  /**
   * Three devices exist and the first container holds two of them. After
   * that container is stubbed as being in a final state, a second container
   * requesting two GPUs must fail with a {@link ResourceHandlerException}
   * whose message points at containers that have not released their GPUs.
   */
  @Test
  public void testRequestMoreThanAvailableGpuAndOneContainerIsReleasingGpus()
      throws ResourceHandlerException, IOException {
    addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3));
    Container container = createMockContainer(2, 5L);
    GpuAllocation allocation = testSubject.assignGpus(container);
    assertAllocatedGpus(2, 1, container, allocation);

    assertEquals(2, testSubject.getDeviceAllocationMapping().size());
    assertEquals(2, testSubject.getAssignedGpus().size());
    assertEquals(3, testSubject.getAllowedGpus().size());
    assertEquals(1, testSubject.getAvailableGpus());

    setupContainerAsReleasingGpus(container);
    Container container2 = createMockContainer(2, 6L);

    // Only the second assignment may throw; a failure in the setup above
    // must surface as itself rather than being evaluated by assertThrows.
    ResourceHandlerException exception = assertThrows(
        ResourceHandlerException.class,
        () -> testSubject.assignGpus(container2));

    assertThat(exception.getMessage()).
        contains("as some other containers might not releasing GPUs");
  }

  /**
   * Six devices exist; containers requesting three and two GPUs are served,
   * after which a third container requesting two GPUs must fail with a
   * {@link ResourceHandlerException} because only one device is left.
   */
  @Test
  public void testThreeContainersJustTwoOfThemSatisfied()
      throws ResourceHandlerException, IOException {
    addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2),
        new GpuDevice(1, 3), new GpuDevice(1, 4),
        new GpuDevice(1, 5), new GpuDevice(1, 6));
    Container container = createMockContainer(3, 5L);
    Container container2 = createMockContainer(2, 6L);
    // Every container gets its own numeric id so that the id-based
    // ContainerMatcher cannot confuse the second and third container.
    Container container3 = createMockContainer(2, 7L);

    GpuAllocation allocation = testSubject.assignGpus(container);
    assertAllocatedGpus(3, 3, container, allocation);
    assertEquals(3, testSubject.getDeviceAllocationMapping().size());
    assertEquals(3, testSubject.getAssignedGpus().size());
    assertEquals(6, testSubject.getAllowedGpus().size());
    assertEquals(3, testSubject.getAvailableGpus());

    GpuAllocation allocation2 = testSubject.assignGpus(container2);
    assertAllocatedGpus(2, 4, container2, allocation2);
    assertEquals(5, testSubject.getDeviceAllocationMapping().size());
    assertEquals(5, testSubject.getAssignedGpus().size());
    assertEquals(6, testSubject.getAllowedGpus().size());
    assertEquals(1, testSubject.getAvailableGpus());

    // Only the third assignment is expected to fail.
    ResourceHandlerException exception = assertThrows(
        ResourceHandlerException.class,
        () -> testSubject.assignGpus(container3));

    assertThat(exception.getMessage()).contains("Failed to find enough GPUs");
  }

  /**
   * The first container holds two of three devices. A second container
   * requesting two GPUs is rejected while the first one still holds its
   * devices, and the allocator state stays unchanged. Once the first
   * container is unassigned, the second request succeeds.
   */
  @Test
  public void testReleaseAndAssignGpus()
      throws ResourceHandlerException, IOException {
    addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3));
    Container container = createMockContainer(2, 5L);
    GpuAllocation allocation = testSubject.assignGpus(container);
    assertAllocatedGpus(2, 1, container, allocation);

    assertEquals(2, testSubject.getDeviceAllocationMapping().size());
    assertEquals(2, testSubject.getAssignedGpus().size());
    assertEquals(3, testSubject.getAllowedGpus().size());
    assertEquals(1, testSubject.getAvailableGpus());

    setupContainerAsReleasingGpus(container);
    Container container2 = createMockContainer(2, 6L);

    // Not enough GPUs are available yet, so this attempt has to fail. Using
    // assertThrows makes the test fail if the call unexpectedly succeeds.
    assertThrows(ResourceHandlerException.class,
        () -> testSubject.assignGpus(container2));

    // The rejected request must not have changed the allocator state.
    assertEquals(2, testSubject.getDeviceAllocationMapping().size());
    assertEquals(2, testSubject.getAssignedGpus().size());
    assertEquals(3, testSubject.getAllowedGpus().size());
    assertEquals(1, testSubject.getAvailableGpus());

    testSubject.unassignGpus(container.getContainerId());
    GpuAllocation allocation2 = testSubject.assignGpus(container2);
    // The second allocation belongs to container2, so its state store entry
    // is the one to verify.
    assertAllocatedGpus(2, 1, container2, allocation2);
  }

  /**
   * Assigns three GPUs to each of 33 containers out of 100 devices and
   * checks every allocation, the final allocator counters, and that no
   * allocation appears twice.
   */
  @Test
  public void testCreateLotsOfContainersVerifyGpuAssignmentsAreCorrect()
      throws ResourceHandlerException, IOException {
    createAndAddGpus(100);

    final List<Container> requesters = createMockContainers(3, 33);
    final List<GpuAllocation> collected = Lists.newArrayList();
    for (int idx = 0; idx < requesters.size(); idx++) {
      final Container requester = requesters.get(idx);
      final GpuAllocation granted = testSubject.assignGpus(requester);
      collected.add(granted);
      assertAllocatedGpus(3, 97, requester, granted);
    }

    // 33 containers * 3 GPUs = 99 assigned devices, one device stays free.
    final int mappings = testSubject.getDeviceAllocationMapping().size();
    assertEquals(99, mappings);
    final int assigned = testSubject.getAssignedGpus().size();
    assertEquals(99, assigned);
    final int allowed = testSubject.getAllowedGpus().size();
    assertEquals(100, allowed);
    assertEquals(1, testSubject.getAvailableGpus());

    final Set<GpuAllocation> repeated = findDuplicates(collected);
    assertEquals(0, repeated.size());
  }

  /**
   * When the state store fails to persist the assignment, the allocator must
   * report a {@link ResourceHandlerException} and must not keep the GPU
   * assigned to the container.
   */
  @Test
  public void testGpuGetsUnassignedWhenStateStoreThrowsException()
      throws ResourceHandlerException, IOException {
    doThrow(new IOException("Failed to save container mappings " +
        "to NM state store!"))
        .when(nmStateStore).storeAssignedResources(any(Container.class),
        anyString(), anyList());

    createAndAddGpus(1);
    Container container = createMockContainer(1, 5L);

    // Only the assignment itself is expected to throw.
    ResourceHandlerException exception = assertThrows(
        ResourceHandlerException.class,
        () -> testSubject.assignGpus(container));
    // NOTE(review): assumes the allocator wraps the IOException so that its
    // text shows up in the exception message - confirm against the allocator.
    assertThat(exception.getMessage())
        .contains("Failed to save container mappings");

    // The behavior named by this test: the failed assignment is rolled back.
    assertEquals(0, testSubject.getDeviceAllocationMapping().size());
    assertEquals(0, testSubject.getAssignedGpus().size());
    assertEquals(1, testSubject.getAllowedGpus().size());
    assertEquals(1, testSubject.getAvailableGpus());
  }
}