TestVolumeProcessor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor;
import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeProvisioningException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

/**
 * Test cases for volume processor.
 */
public class TestVolumeProcessor {

  private static final int GB = 1024;
  private YarnConfiguration conf;
  private RMNodeLabelsManager mgr;
  private MockRM rm;
  private MockNM[] mockNMS;
  private RMNode[] rmNodes;
  private static final int NUM_OF_NMS = 4;
  // resource-types.xml will be created under target/test-classes/ dir,
  // it must be deleted after the test is done, to avoid it from reading
  // by other UT classes.
  private File resourceTypesFile = null;

  private static final String VOLUME_RESOURCE_NAME = "yarn.io/csi-volume";

  @BeforeEach
  public void setUp() throws Exception {
    conf = new YarnConfiguration();
    resourceTypesFile = new File(conf.getClassLoader()
        .getResource(".").getPath(), "resource-types.xml");
    writeTmpResourceTypesFile(resourceTypesFile);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
    conf.set("yarn.scheduler.capacity.resource-calculator",
        "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
    conf.set(CapacitySchedulerConfiguration.PREFIX
        + CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy",
        "fair");
    // this is required to enable volume processor
    conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
        VolumeAMSProcessor.class.getName());
    mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
    rm = new MockRM(conf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };
    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    mockNMS = new MockNM[NUM_OF_NMS];
    rmNodes = new RMNode[NUM_OF_NMS];
    for (int i = 0; i < 4; i++) {
      mockNMS[i] = rm.registerNode("192.168.0." + i + ":1234", 10 * GB);
      rmNodes[i] = rm.getRMContext().getRMNodes().get(mockNMS[i].getNodeId());
    }
  }

  @AfterEach
  public void tearDown() {
    if (resourceTypesFile != null && resourceTypesFile.exists()) {
      resourceTypesFile.delete();
    }
  }

  private void writeTmpResourceTypesFile(File tmpFile) throws IOException {
    Configuration yarnConf = new YarnConfiguration();
    yarnConf.set(YarnConfiguration.RESOURCE_TYPES, VOLUME_RESOURCE_NAME);
    yarnConf.set("yarn.resource-types."
        + VOLUME_RESOURCE_NAME + ".units", "Mi");
    yarnConf.set("yarn.resource-types."
            + VOLUME_RESOURCE_NAME + ".tags",
        CsiConstants.CSI_VOLUME_RESOURCE_TAG);
    try (FileWriter fw = new FileWriter(tmpFile)) {
      yarnConf.writeXml(fw);
    }
  }

  @Test
  @Timeout(10)
  public void testVolumeProvisioning() throws Exception {
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("default")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm, data);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
    Resource resource = Resource.newInstance(1024, 1);
    ResourceInformation volumeResource = ResourceInformation
        .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
            ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
            ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
            ImmutableMap.of(
                CsiConstants.CSI_VOLUME_ID, "test-vol-000001",
                CsiConstants.CSI_DRIVER_NAME, "hostpath",
                CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
            )
        );
    resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
    SchedulingRequest sc = SchedulingRequest
        .newBuilder().allocationRequestId(0L)
        .resourceSizing(ResourceSizing.newInstance(1, resource))
        .build();
    AllocateRequest ar = AllocateRequest.newBuilder()
        .schedulingRequests(Arrays.asList(sc))
        .build();

    // inject adaptor client for testing
    CsiAdaptorProtocol mockedClient = mock(CsiAdaptorProtocol.class);
    rm.getRMContext().getVolumeManager()
        .registerCsiDriverAdaptor("hostpath", mockedClient);

    // simulate validation succeed
    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
        .when(mockedClient)
        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));

    am1.allocate(ar);
    VolumeStates volumeStates =
        rm.getRMContext().getVolumeManager().getVolumeStates();
    assertNotNull(volumeStates);
    VolumeState volumeState = VolumeState.NEW;
    while (volumeState != VolumeState.NODE_READY) {
      Volume volume = volumeStates
          .getVolume(new VolumeId("test-vol-000001"));
      if (volume != null) {
        volumeState = volume.getVolumeState();
      }
      am1.doHeartbeat();
      mockNMS[0].nodeHeartbeat(true);
      Thread.sleep(500);
    }
    rm.stop();
  }

  @Test
  @Timeout(30)
  public void testInvalidRequest() throws Exception {
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("default")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm, data);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
    Resource resource = Resource.newInstance(1024, 1);
    ResourceInformation volumeResource = ResourceInformation
        .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
            ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
            ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
            ImmutableMap.of(
                // volume ID is missing...
                CsiConstants.CSI_VOLUME_NAME, "test-vol-000001",
                CsiConstants.CSI_DRIVER_NAME, "hostpath",
                CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
            )
        );
    resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
    SchedulingRequest sc = SchedulingRequest
        .newBuilder().allocationRequestId(0L)
        .resourceSizing(ResourceSizing.newInstance(1, resource))
        .build();
    AllocateRequest ar = AllocateRequest.newBuilder()
        .schedulingRequests(Arrays.asList(sc))
        .build();

    try {
      am1.allocate(ar);
      fail("allocate should fail because invalid request received");
    } catch (Exception e) {
      assertTrue(e instanceof InvalidVolumeException);
    }
    rm.stop();
  }

  @Test
  @Timeout(30)
  public void testProvisioningFailures() throws Exception {
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("default")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm, data);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);

    CsiAdaptorProtocol mockedClient = mock(CsiAdaptorProtocol.class);
    // inject adaptor client
    rm.getRMContext().getVolumeManager()
        .registerCsiDriverAdaptor("hostpath", mockedClient);
    doThrow(new VolumeException("failed"))
        .when(mockedClient)
        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));

    Resource resource = Resource.newInstance(1024, 1);
    ResourceInformation volumeResource = ResourceInformation
        .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
            ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
            ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
            ImmutableMap.of(
                CsiConstants.CSI_VOLUME_ID, "test-vol-000001",
                CsiConstants.CSI_DRIVER_NAME, "hostpath",
                CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
            )
        );
    resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
    SchedulingRequest sc = SchedulingRequest
        .newBuilder().allocationRequestId(0L)
        .resourceSizing(ResourceSizing.newInstance(1, resource))
        .build();
    AllocateRequest ar = AllocateRequest.newBuilder()
        .schedulingRequests(Arrays.asList(sc))
        .build();

    try {
      am1.allocate(ar);
      fail("allocate should fail");
    } catch (Exception e) {
      assertTrue(e instanceof VolumeProvisioningException);
    }
    rm.stop();
  }

  @Test
  @Timeout(10)
  public void testVolumeResourceAllocate() throws Exception {
    RMApp app1 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("default")
            .build());
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
    Resource resource = Resource.newInstance(1024, 1);
    ResourceInformation volumeResource = ResourceInformation
        .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
            ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
            ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
            ImmutableMap.of(
                CsiConstants.CSI_VOLUME_ID, "test-vol-000001",
                CsiConstants.CSI_DRIVER_NAME, "hostpath",
                CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
            )
        );
    resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
    SchedulingRequest sc = SchedulingRequest
        .newBuilder().allocationRequestId(0L)
        .resourceSizing(ResourceSizing.newInstance(1, resource))
        .build();

    // inject adaptor client for testing
    CsiAdaptorProtocol mockedClient = mock(CsiAdaptorProtocol.class);
    rm.getRMContext().getVolumeManager()
        .registerCsiDriverAdaptor("hostpath", mockedClient);

    // simulate validation succeed
    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
        .when(mockedClient)
        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));

    am1.addSchedulingRequest(ImmutableList.of(sc));
    List<Container> allocated = new ArrayList<>();
    while (allocated.size() != 1) {
      AllocateResponse response = am1.schedule();
      mockNMS[0].nodeHeartbeat(true);
      allocated.addAll(response.getAllocatedContainers());
      Thread.sleep(500);
    }

    assertEquals(1, allocated.size());
    Container alloc = allocated.get(0);
    assertThat(alloc.getResource().getMemorySize()).isEqualTo(1024);
    assertThat(alloc.getResource().getVirtualCores()).isEqualTo(1);
    ResourceInformation allocatedVolume =
        alloc.getResource().getResourceInformation(VOLUME_RESOURCE_NAME);
    assertNotNull(allocatedVolume);
    assertThat(allocatedVolume.getValue()).isEqualTo(1024);
    assertThat(allocatedVolume.getUnits()).isEqualTo("Mi");
    rm.stop();
  }
}