AzcopyToolHelper.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.utils;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;

/**
 * Singleton class to create a file or folder in Azure Blob Storage using Azcopy tool.
 * <a href="https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10">
 * Azcopy</a> is a command-line utility tool to copy blobs or files to or from a storage account.
 * It uses Blob Endpoint and ends up creating implicit paths in the storage account.
 * We will leverage this tool to create implicit paths in storage account for testing purposes.
 */
public final class AzcopyToolHelper {

  private File hadoopAzureDir;
  private String azcopyDirPath;
  private String azcopyExecutablePath;
  private String fileCreationScriptPath;
  private String folderCreationScriptPath;
  private String fileCreationScriptContent;
  private String folderCreationScriptContent;
  private String sasToken;
  private boolean initialized = false;

  private static final String USER_DIR_SYSTEM_PROPERTY = "user.dir";
  private static final String HADOOP_AZURE_DIR = "hadoop-azure";
  private static final String AZCOPY_DIR_NAME = "azcopy";
  private static final String AZCOPY_EXECUTABLE_NAME = "azcopy";
  private static final String FILE_CREATION_SCRIPT_NAME = "createAzcopyFile.sh";
  private static final String FOLDER_CREATION_SCRIPT_NAME = "createAzcopyFolder.sh";
  private static final String DIR_NOT_FOUND_ERROR = " directory not found";
  private static final String AZCOPY_DOWNLOADED_DIR_NAME = "/azcopy_linux_amd64_*/* ";
  private static final String AZCOPY_DOWNLOADED_TAR_NAME = "/azcopy_linux_amd64_* azcopy.tar.gz";
  private static final String SCRIPT_CREATION_ERROR = "Unable to create azcopy file/folder creation script. ";
  private static final String SCRIPT_RUN_ERROR = "Unable to run azcopy file/folder creation script. ";
  private static final String SCRIPT_NOT_FOUND_ERROR = "Azcopy file/folder creation script not found and should be regenerated. ";
  private static final String SCRIPT_EXECUTION_FAILED = "Azcopy file/folder creation script failed with non-zero exit code. "
      + "This can be due to corrupt azcopy executable or invalid SAS Token. Exit code: ";

  private static final String AZCOPY_CMD_SHELL = "bash";
  private static final String AZCOPY_CMD_OPTION = "-c";
  private static final String AZCOPY_DOWNLOAD_URL = "https://aka.ms/downloadazcopy-v10-linux";
  private static final String AZCOPY_DOWNLOAD_CMD = "wget " + AZCOPY_DOWNLOAD_URL + " -O azcopy.tar.gz" + " --no-check-certificate";
  private static final String EXTRACT_CMD = "tar -xf azcopy.tar.gz -C ";
  private static final String MOVE_CMD = "mv ";
  private static final String REMOVE_CMD = "rm -rf ";
  private static final String CHMOD_CMD = "chmod +x ";
  private static final char QUESTION_MARK = '?';
  private static final int WAIT_TIME = 10_000; // 10 seconds
  private static final int MAX_WAIT_TIME = 2 * 6 * WAIT_TIME; // 2 minutes
  private static final Logger LOG = LoggerFactory.getLogger(AzcopyToolHelper.class);

  private static AzcopyToolHelper azcopyToolHelper; // singleton, initialized in static initialization block
  private static final ReentrantLock LOCK = new ReentrantLock();

  private AzcopyToolHelper() {

  }

  /**
   * Constructor to initialize the AzcopyToolHelper. Each JVM running will have
   * its own instance but will share the tool and scripts.
   * Azcopy tool work with SAS based authentication. SAS can be configured using
   * test configuration "fs.azure.test.fixed.sas.token".
   * @param sasToken to be used for authentication.
   */
  public static AzcopyToolHelper getInstance(String sasToken)
      throws IOException, InterruptedException {
    if (azcopyToolHelper == null) {
      LOCK.lock();
      try {
        if (azcopyToolHelper == null) {
          azcopyToolHelper = new AzcopyToolHelper();
          azcopyToolHelper.init(sasToken);
        }
      } finally {
        LOCK.unlock();
      }
    }
    return azcopyToolHelper;
  }

  /**
   * Create a file with implicit parent in the container using Azcopy tool.
   * @param absolutePathToBeCreated absolute path to be created.
   * @throws Exception if file creation fails.
   */
  public void createFileUsingAzcopy(String absolutePathToBeCreated) throws Exception {
    if (absolutePathToBeCreated != null) {
      absolutePathToBeCreated = absolutePathToBeCreated.replace(
          ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME) + sasToken;
      runShellScript(fileCreationScriptPath, absolutePathToBeCreated);
    }
  }

  /**
   * Create a implicit folder with implicit parent in the container using Azcopy tool.
   * @param absolutePathToBeCreated absolute path to be created.
   * @throws Exception
   */
  public void createFolderUsingAzcopy(String absolutePathToBeCreated) throws Exception {
    if (absolutePathToBeCreated != null) {
      absolutePathToBeCreated = absolutePathToBeCreated.replace(
          ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME) + sasToken;
      runShellScript(folderCreationScriptPath, absolutePathToBeCreated);
    }
  }

  private void init(String sasToken) throws IOException, InterruptedException {
    if (initialized) {
      return;
    }
    this.sasToken = sasToken.charAt(0) == QUESTION_MARK ? sasToken : QUESTION_MARK + sasToken;
    hadoopAzureDir = findHadoopAzureDir();
    azcopyDirPath = hadoopAzureDir.getAbsolutePath() + FORWARD_SLASH + AZCOPY_DIR_NAME;
    azcopyExecutablePath = azcopyDirPath + FORWARD_SLASH + AZCOPY_EXECUTABLE_NAME;
    fileCreationScriptPath = azcopyDirPath + FORWARD_SLASH + FILE_CREATION_SCRIPT_NAME;
    folderCreationScriptPath = azcopyDirPath + FORWARD_SLASH + FOLDER_CREATION_SCRIPT_NAME;
    fileCreationScriptContent = "blobPath=$1\n" + "echo $blobPath\n"
        + azcopyExecutablePath + " copy \"" + azcopyDirPath
        + "/NOTICE.txt\" $blobPath\n";
    folderCreationScriptContent = "blobPath=$1\n" + "echo $blobPath\n"
        + azcopyExecutablePath + " copy \"" + azcopyDirPath
        + "\" $blobPath --recursive\n";

    /*
     * Synchronized across JVMs on directory creation. If multiple process try
     * to create directory, only one will succeed and that process will download
     * azcopy tool if not present and generate scripts if not present.
     */
    downloadAzcopyToolAndGenerateScripts();

    // Change working directory to the hadoop-azure directory.
    System.setProperty(USER_DIR_SYSTEM_PROPERTY, hadoopAzureDir.getAbsolutePath());

    initialized = true;
  }

  private void downloadAzcopyToolAndGenerateScripts()
      throws IOException, InterruptedException {
    File azcopyDir = new File(azcopyDirPath);
    if (!azcopyDir.exists()) {
      if (!azcopyDir.mkdir()) {
        LOG.info("Azcopy Directory not created by process: {}",
            Thread.currentThread().getName());
        return;
      }
      downloadAzcopyExecutable();
      createShellScript(fileCreationScriptPath, fileCreationScriptContent);
      createShellScript(folderCreationScriptPath, folderCreationScriptContent);
    } else {
      LOG.info("Azcopy directory already exists. Skipping download and script "
          + "generation by the process: {}", Thread.currentThread().getName());
    }
  }

  private void downloadAzcopyExecutable()
      throws IOException, InterruptedException {
    // Check if azcopy tool is present in the azcopy directory.
    File azcopyFile = new File(azcopyExecutablePath);
    if (!azcopyFile.exists()) {
      // Download Azcopy tool from the Azure website.
      executeCommand(AZCOPY_DOWNLOAD_CMD);

      // Extract the azcopy executable from the tarball
      String extractCmd = EXTRACT_CMD + hadoopAzureDir.getAbsolutePath();
      executeCommand(extractCmd);

      // Rename the azcopy_linux_amd64_* directory to 'azcopy'
      String renameCmd = MOVE_CMD + hadoopAzureDir.getAbsolutePath()
          + AZCOPY_DOWNLOADED_DIR_NAME + azcopyDirPath;
      executeCommand(renameCmd);

      // Remove the downloaded tarball and azcopy folder
      String cleanupCmd = REMOVE_CMD + hadoopAzureDir.getAbsolutePath()
          + AZCOPY_DOWNLOADED_TAR_NAME;
      executeCommand(cleanupCmd);

      // Set the execute permission on the azcopy executable
      String chmodCmd = CHMOD_CMD + azcopyDirPath;
      executeCommand(chmodCmd);
    } else {
      LOG.info("Azcopy executable already exists. Skipping download by process: {}",
          Thread.currentThread().getName());
    }
  }

  private void executeCommand(String command) throws IOException, InterruptedException {
    String[] commandArray = {AZCOPY_CMD_SHELL, AZCOPY_CMD_OPTION, command};
    Process process = Runtime.getRuntime().exec(commandArray);
    process.waitFor();
  }

  /**
   * Create a shell script if not already created.
   * @param scriptPath to be created
   * @param scriptContent to be written in the script.
   */
  private void createShellScript(String scriptPath, String scriptContent) throws IOException {
    File scriptFile = new File(scriptPath);
    if (!scriptFile.exists()) {
      try {
        FileWriter writer = new FileWriter(scriptFile);
        writer.write(scriptContent);
        writer.close();
        scriptFile.setExecutable(true); // make the script executable
      } catch (IOException e) {
        LOG.error("Error creating shell script: {} by process {}",
            e.getMessage(), Thread.currentThread().getName());
        throw new AzcopyExecutionException(SCRIPT_CREATION_ERROR, azcopyDirPath, e);
      }
    }
  }

  private void runShellScript(String scriptPath, String argument) throws IOException {
    // Check if script exists, otherwise wait for parallel JVM process to create the script.
    checkAndWaitOnScriptCreation(scriptPath);
    try {
      ProcessBuilder pb = new ProcessBuilder(scriptPath, argument);
      Process p = pb.start();
      // wait for the process to finish
      int exitCode = p.waitFor();
      if (exitCode != 0) {
        throw new AzcopyExecutionException(SCRIPT_EXECUTION_FAILED + exitCode
            + DOT + SINGLE_WHITE_SPACE, azcopyDirPath);
      }
    } catch (AzcopyExecutionException e) {
      throw e;
    } catch (Exception e) {
      throw new AzcopyExecutionException(SCRIPT_RUN_ERROR, azcopyDirPath, e);
    }
  }

  private void checkAndWaitOnScriptCreation(String scriptPath) throws IOException {
    File scriptFile = new File(scriptPath);
    int totalWaitTime = 0;
    while (!(scriptFile.exists() && scriptFile.canExecute())) {
      try {
        Thread.sleep(WAIT_TIME);
        totalWaitTime += WAIT_TIME;
        if (totalWaitTime > MAX_WAIT_TIME) {
          LOG.error("Timeout waiting for script creation: {} by process {}",
              scriptPath, Thread.currentThread().getName());
          throw new AzcopyExecutionException(SCRIPT_NOT_FOUND_ERROR, azcopyDirPath);
        }
      } catch (InterruptedException e) {
        LOG.error("Error waiting for script creation: {} by process {}",
            scriptPath, Thread.currentThread().getName());
        throw new AzcopyExecutionException(SCRIPT_NOT_FOUND_ERROR, azcopyDirPath);
      }
    }
  }

  private File findHadoopAzureDir() throws FileNotFoundException {
    // Find the hadoop-azure directory from the current working directory.
    File hadoopAzureDir;
    File currentDir = new File(System.getProperty(USER_DIR_SYSTEM_PROPERTY));
    if (!currentDir.isDirectory() && !currentDir.getName().equals(HADOOP_AZURE_DIR)) {
      hadoopAzureDir = findHadoopAzureDir(currentDir);
      if (hadoopAzureDir == null) {
        throw new FileNotFoundException(HADOOP_AZURE_DIR + DIR_NOT_FOUND_ERROR);
      }
    } else {
      hadoopAzureDir = currentDir;
    }
    return hadoopAzureDir;
  }

  private File findHadoopAzureDir(File dir) {
    if (dir == null) {
      return null;
    }
    File[] files = dir.listFiles();
    if (files == null) {
      return null;
    }
    for (File file : files) {
      if (file.isDirectory() && file.getName().equals(HADOOP_AZURE_DIR)) {
        return file;
      } else {
        File hadoopAzureDir = findHadoopAzureDir(file);
        if (hadoopAzureDir != null) {
          return hadoopAzureDir;
        }
      }
    }
    return null;
  }
}