TestClientDistributedCacheManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.filecache;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link ClientDistributedCacheManager}: timestamp determination
 * and visibility determination for cache files added to a {@link Job}.
 *
 * <p>Each test works on two small sequence files created under
 * {@link #TEST_ROOT_DIR}; the whole directory is removed again after every
 * test.
 */
public class TestClientDistributedCacheManager {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestClientDistributedCacheManager.class);

  /** Scratch directory: {@code test.build.data} or the JVM temp dir. */
  private static final Path TEST_ROOT_DIR = new Path(
      System.getProperty("test.build.data",
          System.getProperty("java.io.tmpdir")),
      TestClientDistributedCacheManager.class.getSimpleName());

  private static final Path TEST_VISIBILITY_PARENT_DIR =
      new Path(TEST_ROOT_DIR, "TestCacheVisibility_Parent");

  private static final Path TEST_VISIBILITY_CHILD_DIR =
      new Path(TEST_VISIBILITY_PARENT_DIR, "TestCacheVisibility_Child");

  private static final String FIRST_CACHE_FILE = "firstcachefile";
  private static final String SECOND_CACHE_FILE = "secondcachefile";

  private FileSystem fs;
  private Path firstCacheFile;
  private Path secondCacheFile;
  private Configuration conf;

  /**
   * Creates a fresh configuration and file system handle and writes the two
   * cache files: the first in the parent directory, the second in the child
   * directory.
   *
   * @throws IOException if the file system or the files cannot be created
   */
  @BeforeEach
  public void setup() throws IOException {
    conf = new Configuration();
    fs = FileSystem.get(conf);
    firstCacheFile = new Path(TEST_VISIBILITY_PARENT_DIR, FIRST_CACHE_FILE);
    secondCacheFile = new Path(TEST_VISIBILITY_CHILD_DIR, SECOND_CACHE_FILE);
    createTempFile(firstCacheFile, conf);
    createTempFile(secondCacheFile, conf);
  }

  /**
   * Recursively removes the test root directory, logging a warning when the
   * delete reports failure.
   *
   * @throws IOException if the delete itself fails with an I/O error
   */
  @AfterEach
  public void tearDown() throws IOException {
    // fs stays null when setup() failed before assigning it.
    if (fs == null) {
      return;
    }
    // delete() returns true on success, so only warn on a false result.
    if (!fs.delete(TEST_ROOT_DIR, true)) {
      LOG.warn("Failed to delete test root dir and its content under {}",
          TEST_ROOT_DIR);
    }
  }

  /**
   * Verifies that determining timestamps fills the stat cache and the
   * {@link MRJobConfig#CACHE_FILE_TIMESTAMPS} property, first for two
   * explicit cache files and then for a wildcard over the child directory.
   *
   * @throws IOException on file system errors
   */
  @Test
  public void testDetermineTimestamps() throws IOException {
    Job job = Job.getInstance(conf);
    job.addCacheFile(firstCacheFile.toUri());
    job.addCacheFile(secondCacheFile.toUri());
    Configuration jobConf = job.getConfiguration();

    Map<URI, FileStatus> statCache = new HashMap<>();
    ClientDistributedCacheManager.determineTimestamps(jobConf, statCache);

    FileStatus firstStatus = statCache.get(firstCacheFile.toUri());
    FileStatus secondStatus = statCache.get(secondCacheFile.toUri());

    assertNotNull(firstStatus, firstCacheFile + " was not found in the stats cache");
    assertNotNull(secondStatus, secondCacheFile + " was not found in the stats cache");
    assertEquals(2, statCache.size(),
        "Missing/extra entries found in the stats cache");
    String expected = firstStatus.getModificationTime() + ","
        + secondStatus.getModificationTime();
    assertEquals(expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS));

    // Wildcard entry: the cache is expected to be keyed by the directory.
    job = Job.getInstance(conf);
    job.addCacheFile(new Path(TEST_VISIBILITY_CHILD_DIR, "*").toUri());
    jobConf = job.getConfiguration();
    statCache.clear();
    ClientDistributedCacheManager.determineTimestamps(jobConf, statCache);

    FileStatus thirdStatus = statCache.get(TEST_VISIBILITY_CHILD_DIR.toUri());

    assertEquals(1, statCache.size(),
        "Missing/extra entries found in the stats cache");
    assertNotNull(thirdStatus, TEST_VISIBILITY_CHILD_DIR
        + " was not found in the stats cache");
    expected = Long.toString(thirdStatus.getModificationTime());
    assertEquals(expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS),
        "Incorrect timestamp for " + TEST_VISIBILITY_CHILD_DIR);
  }

  /**
   * Verifies {@link MRJobConfig#CACHE_FILE_VISIBILITIES} and the stat cache
   * contents for explicit and wildcard cache paths, first with the parent
   * directory set to 0777 and then with it restricted to 0700.
   *
   * <p>The test is skipped when the parent directory is not reported as
   * public even after its permissions were opened up.
   *
   * @throws IOException on file system errors
   */
  @Test
  public void testDetermineCacheVisibilities() throws IOException {
    fs.setPermission(TEST_VISIBILITY_PARENT_DIR,
        new FsPermission((short)00777));
    fs.setPermission(TEST_VISIBILITY_CHILD_DIR,
        new FsPermission((short)00777));
    // Relative paths below are resolved against the child directory.
    fs.setWorkingDirectory(TEST_VISIBILITY_CHILD_DIR);
    Job job = Job.getInstance(conf);
    Path relativePath = new Path(SECOND_CACHE_FILE);
    Path wildcardPath = new Path("*");
    Map<URI, FileStatus> statCache = new HashMap<>();
    Configuration jobConf;

    job.addCacheFile(firstCacheFile.toUri());
    job.addCacheFile(relativePath.toUri());
    jobConf = job.getConfiguration();

    // skip test if scratch dir is not PUBLIC
    assumeTrue(ClientDistributedCacheManager.isPublic(
        jobConf, TEST_VISIBILITY_PARENT_DIR.toUri(), statCache),
        TEST_VISIBILITY_PARENT_DIR + " is not public");

    ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
        statCache);
    // We use get() instead of getBoolean() so we can tell the difference
    // between wrong and missing
    assertEquals("true,true", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES),
        "The file paths were not found to be publicly visible " +
        "even though the full path is publicly accessible");
    checkCacheEntries(statCache, null, firstCacheFile, relativePath);

    job = Job.getInstance(conf);
    job.addCacheFile(wildcardPath.toUri());
    jobConf = job.getConfiguration();
    statCache.clear();

    ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
        statCache);
    // We use get() instead of getBoolean() so we can tell the difference
    // between wrong and missing
    assertEquals("true", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES),
        "The file path was not found to be publicly visible " +
        "even though the full path is publicly accessible");
    checkCacheEntries(statCache, null, wildcardPath.getParent());

    // Restrict the parent directory; everything below it must become private.
    Path qualifiedParent = fs.makeQualified(TEST_VISIBILITY_PARENT_DIR);
    fs.setPermission(TEST_VISIBILITY_PARENT_DIR,
        new FsPermission((short)00700));
    job = Job.getInstance(conf);
    job.addCacheFile(firstCacheFile.toUri());
    job.addCacheFile(relativePath.toUri());
    jobConf = job.getConfiguration();
    statCache.clear();

    ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
        statCache);
    // We use get() instead of getBoolean() so we can tell the difference
    // between wrong and missing
    assertEquals("false,false", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES),
        "The file paths were found to be publicly visible " +
        "even though the parent directory is not publicly accessible");
    checkCacheEntries(statCache, qualifiedParent,
        firstCacheFile, relativePath);

    job = Job.getInstance(conf);
    job.addCacheFile(wildcardPath.toUri());
    jobConf = job.getConfiguration();
    statCache.clear();

    ClientDistributedCacheManager.determineCacheVisibilities(jobConf,
        statCache);
    // We use get() instead of getBoolean() so we can tell the difference
    // between wrong and missing
    assertEquals("false", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES),
        "The file path was found to be publicly visible " +
        "even though the parent directory is not publicly accessible");
    checkCacheEntries(statCache, qualifiedParent, wildcardPath.getParent());
  }

  /**
   * Validate that the file status cache contains all and only entries for a
   * given set of paths up to a common parent.
   *
   * <p>Every path is qualified against the test file system and then walked
   * upwards; each visited path is expected in the cache, including the path
   * at which the walk stops ({@code top} or the root).
   *
   * @param statCache the cache
   * @param top the common parent at which to stop digging, or {@code null}
   *     to walk all the way up to the root
   * @param paths the paths to compare against the cache
   */
  private void checkCacheEntries(Map<URI, FileStatus> statCache, Path top,
      Path... paths) {
    Set<URI> expected = new HashSet<>();

    for (Path path : paths) {
      Path p = fs.makeQualified(path);

      while (!p.isRoot() && !p.equals(top)) {
        expected.add(p.toUri());
        p = p.getParent();
      }

      // Include the stopping point itself (top or root).
      expected.add(p.toUri());
    }

    Set<URI> actual = statCache.keySet();

    // Expected entries that the cache lacks.
    Set<URI> missing = new HashSet<>(expected);
    missing.removeAll(actual);

    // Cached entries that were not expected.
    Set<URI> extra = new HashSet<>(actual);
    extra.removeAll(expected);

    assertTrue(missing.isEmpty(),
        "File status cache does not contain entries for " + missing);
    assertTrue(extra.isEmpty(),
        "File status cache contains extra entries: " + extra);
  }

  /**
   * Writes a small uncompressed sequence file with a single text record.
   *
   * @param p the file to create
   * @param conf the configuration handed to the sequence file writer
   * @throws IOException if creating, writing, or closing the file fails; the
   *     original exception is attached as the cause
   */
  @SuppressWarnings("deprecation")
  void createTempFile(Path p, Configuration conf) throws IOException {
    try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, p,
        Text.class, Text.class, CompressionType.NONE)) {
      writer.append(new Text("text"), new Text("moretext"));
    } catch (Exception e) {
      // Keep the cause so the original stack trace is not lost.
      throw new IOException(e.getLocalizedMessage(), e);
    }
    LOG.info("created: {}", p);
  }
}