// ContainerVolumePublisher.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.OCIContainerRuntime;
import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Publishes and un-publishes the CSI volumes requested by a container on
 * the node manager.
 *
 * <p>Volumes are discovered from the container's resource: every resource
 * tagged with {@link CsiConstants#CSI_VOLUME_RESOURCE_TAG} is converted to
 * {@link VolumeMetaData}. Each volume is then published or un-published
 * through the csi-adaptor client that the runtime has registered under the
 * volume's driver name.
 */
public class ContainerVolumePublisher {

  private static final Logger LOG =
      LoggerFactory.getLogger(ContainerVolumePublisher.class);

  private final Container container;
  private final String localMountRoot;
  private final OCIContainerRuntime runtime;

  /**
   * Creates a publisher for the CSI volumes of a single container.
   *
   * @param container the container whose resource describes the volumes.
   * @param localMountRoot the local directory under which the per-volume
   *   mount and staging directories are composed when publishing.
   * @param runtime the runtime that provides the csi-adaptor clients,
   *   keyed by driver name.
   */
  public ContainerVolumePublisher(Container container, String localMountRoot,
      OCIContainerRuntime runtime) {
    LOG.info("Initiate container volume publisher, containerID={},"
            + " volume local mount rootDir={}",
        container.getContainerId(), localMountRoot);
    this.container = container;
    this.localMountRoot = localMountRoot;
    this.runtime = runtime;
  }

  /**
   * It first discovers the volume info from container resource;
   * then negotiates with CSI driver adaptor to publish the volume on this
   * node manager, on a specific directory under the local mount root;
   * and then maps the local mounted directory to the volume target mount in
   * the docker container.
   *
   * CSI volume publish is a two phase work, by reaching up here
   * we can assume the 1st phase is done on the RM side, which means
   * YARN has already called the controller service of csi-driver
   * to publish the volume; here we only need to call the node service of
   * csi-driver to publish the volume on this local node manager.
   *
   * @return a map where each key is the local mounted path on current node,
   *   and value is the remote mount path on the container.
   * @throws YarnException if the volume info on the container resource is
   *   invalid, if no csi-adaptor is registered for a volume's driver, or
   *   if the csi-adaptor reports a failure.
   * @throws IOException if the call to the csi-adaptor fails with an
   *   I/O error.
   */
  public Map<String, String> publishVolumes() throws YarnException,
      IOException {
    LOG.info("publishing volumes");
    Map<String, String> volumeMounts = new HashMap<>();
    List<VolumeMetaData> volumes = getVolumes();
    LOG.info("Found {} volumes to be published on this node", volumes.size());
    for (VolumeMetaData volume : volumes) {
      // publishVolume always returns a non-null map
      volumeMounts.putAll(publishVolume(volume));
    }
    return volumeMounts;
  }

  /**
   * Un-publishes every CSI volume found on the container resource from
   * this node manager.
   *
   * A failure on one volume does not stop the clean up of the remaining
   * volumes: each volume is attempted, each failure is logged, and once
   * all volumes have been attempted the first failure is re-thrown with
   * the later ones attached as suppressed exceptions.
   *
   * @throws YarnException if the volume info on the container resource is
   *   invalid, if no csi-adaptor is registered for a volume's driver, or
   *   if the csi-adaptor reports a failure.
   * @throws IOException if the call to the csi-adaptor fails with an
   *   I/O error.
   */
  public void unpublishVolumes() throws YarnException, IOException {
    LOG.info("Un-publishing Volumes");
    List<VolumeMetaData> volumes = getVolumes();
    LOG.info("Volumes to un-publish {}", volumes.size());
    Exception firstFailure = null;
    for (VolumeMetaData volume : volumes) {
      try {
        unpublishVolume(volume);
      } catch (YarnException | IOException e) {
        // keep going so that one bad volume does not leave the rest mounted
        LOG.warn("Failed to un-publish volume {}", volume.getVolumeId(), e);
        if (firstFailure == null) {
          firstFailure = e;
        } else if (firstFailure != e) {
          firstFailure.addSuppressed(e);
        }
      }
    }
    if (firstFailure instanceof YarnException) {
      throw (YarnException) firstFailure;
    }
    if (firstFailure instanceof IOException) {
      throw (IOException) firstFailure;
    }
  }

  /**
   * @return the local directory a volume is mounted to, that is
   *   {@code <rootDir>/<volumeId>_mount}.
   */
  private File getLocalVolumeMountPath(
      String containerWorkDir, String volumeId) {
    return new File(containerWorkDir, volumeId + "_mount");
  }

  /**
   * @return the local staging directory of a volume, that is
   *   {@code <rootDir>/<volumeId>_staging}.
   */
  private File getLocalVolumeStagingPath(
      String containerWorkDir, String volumeId) {
    return new File(containerWorkDir, volumeId + "_staging");
  }

  /**
   * Collects the volumes described by the container resource. Only the
   * resources tagged with {@link CsiConstants#CSI_VOLUME_RESOURCE_TAG} are
   * considered.
   *
   * @return the volumes found, an empty list if the container has no
   *   resource or no CSI volume resource.
   * @throws InvalidVolumeException if a tagged resource cannot be converted
   *   to volume meta data.
   */
  private List<VolumeMetaData> getVolumes() throws InvalidVolumeException {
    List<VolumeMetaData> volumes = new ArrayList<>();
    Resource containerResource = container.getResource();
    if (containerResource != null) {
      for (ResourceInformation resourceInformation :
          containerResource.getAllResourcesListCopy()) {
        if (resourceInformation.getTags()
            .contains(CsiConstants.CSI_VOLUME_RESOURCE_TAG)) {
          volumes.addAll(VolumeMetaData.fromResource(resourceInformation));
        }
      }
    }
    if (!volumes.isEmpty()) {
      LOG.info("Total number of volumes require provisioning is {}",
          volumes.size());
    }
    return volumes;
  }

  /**
   * Looks up the csi-adaptor client registered for the driver of the
   * given volume.
   *
   * @param volume the volume whose driver name is used for the lookup.
   * @return the client, never null.
   * @throws YarnException if the runtime has no client for the driver.
   */
  private CsiAdaptorProtocol getCsiClient(VolumeMetaData volume)
      throws YarnException {
    CsiAdaptorProtocol csiClient =
        runtime.getCsiClients().get(volume.getDriverName());
    if (csiClient == null) {
      throw new YarnException("No csi-adaptor is found that can talk"
          + " to csi-driver " + volume.getDriverName());
    }
    return csiClient;
  }

  /**
   * Publishes a single volume on this node through the csi-adaptor of the
   * volume's driver.
   *
   * @param volume the volume to publish.
   * @return a single entry map from the local mount path to the mount
   *   point of the volume in the container.
   * @throws IOException if the call to the csi-adaptor fails with an
   *   I/O error.
   * @throws YarnException if no csi-adaptor is registered for the driver
   *   or the csi-adaptor reports a failure.
   */
  private Map<String, String> publishVolume(VolumeMetaData volume)
      throws IOException, YarnException {
    // make sure the volume is a known type before building the request,
    // and resolve the client only once so that the instance that was
    // checked is the instance that is called
    CsiAdaptorProtocol csiClient = getCsiClient(volume);

    // compose the local mount and staging dirs from the volume ID,
    // under the local mount root
    String volumeId = volume.getVolumeId().toString();
    File localMount = getLocalVolumeMountPath(localMountRoot, volumeId);
    File localStaging = getLocalVolumeStagingPath(localMountRoot, volumeId);
    LOG.info("Volume {}, local mount path: {}, local staging path {}",
        volumeId, localMount, localStaging);

    NodePublishVolumeRequest publishRequest = NodePublishVolumeRequest
        .newInstance(volume.getVolumeId().getId(), // volume Id
            false, // read only flag
            localMount.getAbsolutePath(), // target path
            localStaging.getAbsolutePath(), // staging path
            new ValidateVolumeCapabilitiesRequest.VolumeCapability(
                ValidateVolumeCapabilitiesRequest
                    .AccessMode.SINGLE_NODE_WRITER,
                ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM,
                ImmutableList.of()), // capability
            ImmutableMap.of(), // publish context
            ImmutableMap.of());  // secrets

    // publish volume to node
    LOG.info("Publish volume on NM, request {}", publishRequest);
    csiClient.nodePublishVolume(publishRequest);

    // once succeed, bind the container to this mount
    Map<String, String> bindVolumes = new HashMap<>();
    bindVolumes.put(localMount.getAbsolutePath(), volume.getMountPoint());
    return bindVolumes;
  }

  /**
   * Un-publishes a single volume from this node through the csi-adaptor of
   * the volume's driver. Nothing is sent to the csi-adaptor when the local
   * mount directory no longer exists.
   *
   * @param volume the volume to un-publish.
   * @throws YarnException if no csi-adaptor is registered for the driver
   *   or the csi-adaptor reports a failure.
   * @throws IOException if the call to the csi-adaptor fails with an
   *   I/O error.
   */
  private void unpublishVolume(VolumeMetaData volume)
      throws YarnException, IOException {
    CsiAdaptorProtocol csiClient = getCsiClient(volume);

    // When container is launched, the container work dir is memorized,
    // and that is also the dir we mount the volume to.
    // NOTE(review): publishVolume composes the mount path from
    // localMountRoot while this method uses
    // container.getCsiVolumesRootDir(); this presumably relies on callers
    // passing the same directory for both - confirm against the callers.
    File localMount = getLocalVolumeMountPath(container.getCsiVolumesRootDir(),
        volume.getVolumeId().toString());
    if (!localMount.exists()) {
      LOG.info("Local mount {} no longer exist, skipping cleaning"
          + " up the volume", localMount.getAbsolutePath());
      return;
    }
    NodeUnpublishVolumeRequest unpublishRequest =
        NodeUnpublishVolumeRequest.newInstance(
            volume.getVolumeId().getId(), // volume id
            localMount.getAbsolutePath());  // target path

    // un-publish volume from node
    LOG.info("Un-publish volume {}, request {}",
        volume.getVolumeId(), unpublishRequest);
    csiClient.nodeUnpublishVolume(unpublishRequest);
  }
}