TestFrameworkUploader.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.uploader;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.util.Lists;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.zip.GZIPInputStream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;

/**
 * Unit test class for FrameworkUploader.
 */
public class TestFrameworkUploader {
  private static String testDir;

  @BeforeEach
  public void setUp() {
    String testRootDir =
        new File(System.getProperty("test.build.data", "/tmp"))
            .getAbsolutePath()
            .replace(' ', '+');
    Random random = new Random(System.currentTimeMillis());
    testDir = testRootDir + File.separatorChar +
        Long.toString(random.nextLong());
  }

  /**
   * Test requesting command line help.
   * @throws IOException test failure
   */
  @Test
  void testHelp() throws IOException {
    String[] args = new String[]{"-help"};
    FrameworkUploader uploader = new FrameworkUploader();
    boolean success = uploader.parseArguments(args);
    assertFalse(success, "Expected to print help");
    assertThat(uploader.input)
        .withFailMessage("Expected ignore run")
        .isNull();
    assertThat(uploader.whitelist)
        .withFailMessage("Expected ignore run")
        .isNull();
    assertThat(uploader.target)
        .withFailMessage("Expected ignore run")
        .isNull();
  }

  /**
   * Test invalid argument parsing.
   * @throws IOException test failure
   */
  @Test
  void testWrongArgument() throws IOException {
    String[] args = new String[]{"-unexpected"};
    FrameworkUploader uploader = new FrameworkUploader();
    boolean success = uploader.parseArguments(args);
    assertFalse(success, "Expected to print help");
  }

  /**
   * Test normal argument passing.
   * @throws IOException test failure
   */
  @Test
  void testArguments() throws IOException {
    String[] args =
        new String[]{
            "-input", "A",
            "-whitelist", "B",
            "-blacklist", "C",
            "-fs", "hdfs://C:8020",
            "-target", "D",
            "-initialReplication", "100",
            "-acceptableReplication", "120",
            "-finalReplication", "140",
            "-timeout", "10"};
    FrameworkUploader uploader = new FrameworkUploader();
    boolean success = uploader.parseArguments(args);
    assertTrue(success, "Expected to print help");
    assertEquals("A",
        uploader.input,
        "Input mismatch");
    assertEquals("B",
        uploader.whitelist,
        "Whitelist mismatch");
    assertEquals("C",
        uploader.blacklist,
        "Blacklist mismatch");
    assertEquals("hdfs://C:8020/D",
        uploader.target,
        "Target mismatch");
    assertEquals(100,
        uploader.initialReplication,
        "Initial replication mismatch");
    assertEquals(120,
        uploader.acceptableReplication,
        "Acceptable replication mismatch");
    assertEquals(140,
        uploader.finalReplication,
        "Final replication mismatch");
    assertEquals(10,
        uploader.timeout,
        "Timeout mismatch");
  }

  /**
   * Test the default ways how to specify filesystems.
   */
  @Test
  void testNoFilesystem() throws IOException {
    FrameworkUploader uploader = new FrameworkUploader();
    boolean success = uploader.parseArguments(new String[]{});
    assertTrue(success, "Expected to parse arguments");
    assertEquals(
        "file:////usr/lib/mr-framework.tar.gz#mr-framework", uploader.target, "Expected");
  }

  /**
   * Test the default ways how to specify filesystems.
   */
  @Test
  void testDefaultFilesystem() throws IOException {
    FrameworkUploader uploader = new FrameworkUploader();
    Configuration conf = new Configuration();
    conf.set(FS_DEFAULT_NAME_KEY, "hdfs://namenode:555");
    uploader.setConf(conf);
    boolean success = uploader.parseArguments(new String[]{});
    assertTrue(success, "Expected to parse arguments");
    assertEquals(
        "hdfs://namenode:555/usr/lib/mr-framework.tar.gz#mr-framework",
        uploader.target,
        "Expected");
  }

  /**
   * Test the explicit filesystem specification.
   */
  @Test
  void testExplicitFilesystem() throws IOException {
    FrameworkUploader uploader = new FrameworkUploader();
    Configuration conf = new Configuration();
    uploader.setConf(conf);
    boolean success = uploader.parseArguments(new String[]{
        "-target",
        "hdfs://namenode:555/usr/lib/mr-framework.tar.gz#mr-framework"
    });
    assertTrue(success, "Expected to parse arguments");
    assertEquals(
        "hdfs://namenode:555/usr/lib/mr-framework.tar.gz#mr-framework",
        uploader.target,
        "Expected");
  }

  /**
   * Test the conflicting filesystem specification.
   */
  @Test
  void testConflictingFilesystem() throws IOException {
    FrameworkUploader uploader = new FrameworkUploader();
    Configuration conf = new Configuration();
    conf.set(FS_DEFAULT_NAME_KEY, "hdfs://namenode:555");
    uploader.setConf(conf);
    boolean success = uploader.parseArguments(new String[]{
        "-target",
        "file:///usr/lib/mr-framework.tar.gz#mr-framework"
    });
    assertTrue(success, "Expected to parse arguments");
    assertEquals(
        "file:///usr/lib/mr-framework.tar.gz#mr-framework",
        uploader.target,
        "Expected");
  }

  /**
   * Test whether we can filter a class path properly.
   * @throws IOException test failure
   */
  @Test
  void testCollectPackages() throws IOException, UploaderException {
    File parent = new File(testDir);
    try {
      parent.deleteOnExit();
      assertTrue(parent.mkdirs(), "Directory creation failed");
      File dirA = new File(parent, "A");
      assertTrue(dirA.mkdirs());
      File dirB = new File(parent, "B");
      assertTrue(dirB.mkdirs());
      File jarA = new File(dirA, "a.jar");
      assertTrue(jarA.createNewFile());
      File jarB = new File(dirA, "b.jar");
      assertTrue(jarB.createNewFile());
      File jarC = new File(dirA, "c.jar");
      assertTrue(jarC.createNewFile());
      File txtD = new File(dirA, "d.txt");
      assertTrue(txtD.createNewFile());
      File jarD = new File(dirB, "d.jar");
      assertTrue(jarD.createNewFile());
      File txtE = new File(dirB, "e.txt");
      assertTrue(txtE.createNewFile());

      FrameworkUploader uploader = new FrameworkUploader();
      uploader.whitelist = ".*a\\.jar,.*b\\.jar,.*d\\.jar";
      uploader.blacklist = ".*b\\.jar";
      uploader.input = dirA.getAbsolutePath() + File.separatorChar + "*" +
          File.pathSeparatorChar +
          dirB.getAbsolutePath() + File.separatorChar + "*";
      uploader.collectPackages();
      assertEquals(3,
          uploader.whitelistedFiles.size(),
          "Whitelist count error");
      assertEquals(1,
          uploader.blacklistedFiles.size(),
          "Blacklist count error");

      assertTrue(uploader.filteredInputFiles.contains(jarA.getAbsolutePath()),
          "File not collected");
      assertFalse(uploader.filteredInputFiles.contains(jarB.getAbsolutePath()),
          "File collected");
      assertTrue(uploader.filteredInputFiles.contains(jarD.getAbsolutePath()),
          "File not collected");
      assertEquals(2,
          uploader.filteredInputFiles.size(),
          "Too many whitelists");
    } finally {
      FileUtils.deleteDirectory(parent);
    }
  }

  /**
   * Test building a tarball from source jars.
   */
  @Test
  void testBuildTarBall()
      throws IOException, UploaderException, InterruptedException {
    String[] testFiles = {"upload.tar", "upload.tar.gz"};
    for (String testFile : testFiles) {
      File parent = new File(testDir);
      try {
        parent.deleteOnExit();
        FrameworkUploader uploader = prepareTree(parent);

        File gzipFile =
            new File(parent.getAbsolutePath() + "/" + testFile);
        gzipFile.deleteOnExit();

        uploader.target =
            "file:///" + gzipFile.getAbsolutePath();
        uploader.beginUpload();
        uploader.buildPackage();
        InputStream stream = new FileInputStream(gzipFile);
        if (gzipFile.getName().endsWith(".gz")) {
          stream = new GZIPInputStream(stream);
        }

        TarArchiveInputStream result = null;
        try {
          result =
              new TarArchiveInputStream(stream);
          Set<String> fileNames = new HashSet<>();
          Set<Long> sizes = new HashSet<>();
          TarArchiveEntry entry1 = result.getNextTarEntry();
          fileNames.add(entry1.getName());
          sizes.add(entry1.getSize());
          TarArchiveEntry entry2 = result.getNextTarEntry();
          fileNames.add(entry2.getName());
          sizes.add(entry2.getSize());
          assertTrue(
              fileNames.contains("a.jar"), "File name error");
          assertTrue(
              sizes.contains((long) 13), "File size error");
          assertTrue(
              fileNames.contains("b.jar"), "File name error");
          assertTrue(
              sizes.contains((long) 14), "File size error");
        } finally {
          if (result != null) {
            result.close();
          }
        }
      } finally {
        FileUtils.deleteDirectory(parent);
      }
    }
  }

  /**
   * Test upload to HDFS.
   */
  @Test
  void testUpload()
      throws IOException, UploaderException, InterruptedException {
    final String fileName = "/upload.tar.gz";
    File parent = new File(testDir);
    try {
      parent.deleteOnExit();

      FrameworkUploader uploader = prepareTree(parent);

      uploader.target = "file://" + parent.getAbsolutePath() + fileName;

      uploader.buildPackage();
      try (TarArchiveInputStream archiveInputStream = new TarArchiveInputStream(
          new GZIPInputStream(
              new FileInputStream(
                  parent.getAbsolutePath() + fileName)))) {
        Set<String> fileNames = new HashSet<>();
        Set<Long> sizes = new HashSet<>();
        TarArchiveEntry entry1 = archiveInputStream.getNextTarEntry();
        fileNames.add(entry1.getName());
        sizes.add(entry1.getSize());
        TarArchiveEntry entry2 = archiveInputStream.getNextTarEntry();
        fileNames.add(entry2.getName());
        sizes.add(entry2.getSize());
        assertTrue(
            fileNames.contains("a.jar"), "File name error");
        assertTrue(
            sizes.contains((long) 13), "File size error");
        assertTrue(
            fileNames.contains("b.jar"), "File name error");
        assertTrue(
            sizes.contains((long) 14), "File size error");
      }
    } finally {
      FileUtils.deleteDirectory(parent);
    }
  }

  /**
   * Prepare a mock directory tree to compress and upload.
   */
  private FrameworkUploader prepareTree(File parent)
      throws FileNotFoundException {
    assertTrue(parent.mkdirs());
    File dirA = new File(parent, "A");
    assertTrue(dirA.mkdirs());
    File jarA = new File(parent, "a.jar");
    PrintStream printStream = new PrintStream(new FileOutputStream(jarA));
    printStream.println("Hello World!");
    printStream.close();
    File jarB = new File(dirA, "b.jar");
    printStream = new PrintStream(new FileOutputStream(jarB));
    printStream.println("Hello Galaxy!");
    printStream.close();

    FrameworkUploader uploader = new FrameworkUploader();
    uploader.filteredInputFiles.add(jarA.getAbsolutePath());
    uploader.filteredInputFiles.add(jarB.getAbsolutePath());

    return uploader;
  }

  /**
   * Test regex pattern matching and environment variable replacement.
   */
  @Test
  void testEnvironmentReplacement() throws UploaderException {
    String input = "C/$A/B,$B,D";
    Map<String, String> map = new HashMap<>();
    map.put("A", "X");
    map.put("B", "Y");
    map.put("C", "Z");
    FrameworkUploader uploader = new FrameworkUploader();
    String output = uploader.expandEnvironmentVariables(input, map);
    assertEquals("C/X/B,Y,D", output, "Environment not expanded");

  }

  /**
   * Test regex pattern matching and environment variable replacement.
   */
  @Test
  void testRecursiveEnvironmentReplacement()
      throws UploaderException {
    String input = "C/$A/B,$B,D";
    Map<String, String> map = new HashMap<>();
    map.put("A", "X");
    map.put("B", "$C");
    map.put("C", "Y");
    FrameworkUploader uploader = new FrameworkUploader();
    String output = uploader.expandEnvironmentVariables(input, map);
    assertEquals("C/X/B,Y,D", output, "Environment not expanded");

  }

  /**
   * Test native IO.
   */
  @Test
  void testNativeIO() throws IOException {
    FrameworkUploader uploader = new FrameworkUploader();
    File parent = new File(testDir);
    try {
      // Create a parent directory
      parent.deleteOnExit();
      assertTrue(parent.mkdirs());

      // Create a target file
      File targetFile = new File(parent, "a.txt");
      try (FileOutputStream os = new FileOutputStream(targetFile)) {
        IOUtils.writeLines(Lists.newArrayList("a", "b"), null, os, StandardCharsets.UTF_8);
      }
      assertFalse(uploader.checkSymlink(targetFile));

      // Create a symlink to the target
      File symlinkToTarget = new File(parent, "symlinkToTarget.txt");
      try {
        Files.createSymbolicLink(
            Paths.get(symlinkToTarget.getAbsolutePath()),
            Paths.get(targetFile.getAbsolutePath()));
      } catch (UnsupportedOperationException e) {
        // Symlinks are not supported, so ignore the test
        Assumptions.assumeTrue(false);
      }
      assertTrue(uploader.checkSymlink(symlinkToTarget));

      // Create a symlink to the target with /./ in the path
      symlinkToTarget = new File(parent.getAbsolutePath() +
          "/./symlinkToTarget2.txt");
      try {
        Files.createSymbolicLink(
            Paths.get(symlinkToTarget.getAbsolutePath()),
            Paths.get(targetFile.getAbsolutePath()));
      } catch (UnsupportedOperationException e) {
        // Symlinks are not supported, so ignore the test
        Assumptions.assumeTrue(false);
      }
      assertTrue(uploader.checkSymlink(symlinkToTarget));

      // Create a symlink outside the current directory
      File symlinkOutside = new File(parent, "symlinkToParent.txt");
      try {
        Files.createSymbolicLink(
            Paths.get(symlinkOutside.getAbsolutePath()),
            Paths.get(parent.getAbsolutePath()));
      } catch (UnsupportedOperationException e) {
        // Symlinks are not supported, so ignore the test
        Assumptions.assumeTrue(false);
      }
      assertFalse(uploader.checkSymlink(symlinkOutside));
    } finally {
      FileUtils.forceDelete(parent);
    }

  }

  @Test
  void testPermissionSettingsOnRestrictiveUmask()
      throws Exception {
    File parent = new File(testDir);
    parent.deleteOnExit();
    MiniDFSCluster cluster = null;

    try {
      assertTrue(parent.mkdirs(), "Directory creation failed");
      Configuration hdfsConf = new HdfsConfiguration();
      String namenodeDir = new File(MiniDFSCluster.getBaseDirectory(),
          "name").getAbsolutePath();
      hdfsConf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, namenodeDir);
      hdfsConf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, namenodeDir);
      hdfsConf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "027");
      cluster = new MiniDFSCluster.Builder(hdfsConf)
          .numDataNodes(1).build();
      DistributedFileSystem dfs = cluster.getFileSystem();
      cluster.waitActive();

      File file1 = new File(parent, "a.jar");
      file1.createNewFile();
      File file2 = new File(parent, "b.jar");
      file2.createNewFile();
      File file3 = new File(parent, "c.jar");
      file3.createNewFile();

      FrameworkUploader uploader = new FrameworkUploader();
      uploader.whitelist = "";
      uploader.blacklist = "";
      uploader.input = parent.getAbsolutePath() + File.separatorChar + "*";
      String hdfsUri = hdfsConf.get(FS_DEFAULT_NAME_KEY);
      String targetPath = "/test.tar.gz";
      uploader.target = hdfsUri + targetPath;
      uploader.acceptableReplication = 1;
      uploader.setConf(hdfsConf);

      uploader.collectPackages();
      uploader.buildPackage();

      FileStatus fileStatus = dfs.getFileStatus(new Path(targetPath));
      FsPermission perm = fileStatus.getPermission();
      assertEquals(new FsPermission(0644), perm, "Permissions");
    } finally {
      if (cluster != null) {
        cluster.close();
      }
      FileUtils.deleteDirectory(parent);
    }
  }
}