AbstractContractDistCpTest.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.tools.contract;

import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_DISTCP_JOB_ID;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.TestName;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.SimpleCopyListing;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.util.DistCpTestUtils;
import org.apache.hadoop.util.functional.RemoteIterators;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Contract test suite covering a file system's integration with DistCp.  The
 * tests coordinate two file system instances: one "local", which is the local
 * file system, and the other "remote", which is the file system implementation
 * under test.  The tests in the suite cover both copying from local to remote
 * (e.g. a backup use case) and copying from remote to local (e.g. a restore use
 * case).
 * The HDFS contract test needs to be run explicitly.
 */
public abstract class AbstractContractDistCpTest
    extends AbstractFSContractTestBase {

  /** Logger for this test suite. */
  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractContractDistCpTest.class);

  /** Using offset to change modification time in tests. */
  private static final long MODIFICATION_TIME_OFFSET = 10000;

  /**
   * Configuration option read by the large-file tests to obtain
   * the file size, in KB: {@value}.
   */
  public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB
      = "scale.test.distcp.file.size.kb";

  /**
   * Default value returned by {@link #getDefaultDistCPSizeKb()}: {@value}.
   */
  public static final int DEFAULT_DISTCP_SIZE_KB = 1024;

  /** Number of bytes in a megabyte. */
  protected static final int MB = 1024 * 1024;

  /**
   * Default depth for a directory tree: {@value}.
   */
  protected static final int DEFAULT_DEPTH = 3;

  /**
   * Default width for a directory tree: {@value}.
   * Total dir size is
   * <pre>
   *   DEFAULT_WIDTH^DEFAULT_DEPTH
   * </pre>
   * So the duration of a test run grows rapidly with this value.
   * This has very significant consequences for object storage runs.
   */
  protected static final int DEFAULT_WIDTH = 2;

  /**
   * JUnit extension supplying the name of the running test method;
   * used to build per-test directory names.
   */
  @RegisterExtension
  public TestName testName = new TestName();

  /**
   * Test timeout for this suite: fifteen minutes, expressed in milliseconds.
   * It is longer than the default so that large updates, particularly
   * those against remote stores, have time to complete.
   * @return the test timeout in milliseconds
   */
  protected int getTestTimeoutMillis() {
    final int minutes = 15;
    final int millisPerMinute = 60 * 1000;
    return minutes * millisPerMinute;
  }

  /** Configuration of the contract under test; set in {@link #setup()}. */
  private Configuration conf;
  /** Local filesystem and the filesystem under test; set in {@link #setup()}. */
  private FileSystem localFS, remoteFS;
  /** Per-test base directories on the local and remote filesystems. */
  private Path localDir, remoteDir;

  // Source tree paths; all assigned by initInputFields().
  private Path inputDir;

  private Path inputSubDir1;

  private Path inputSubDir2;

  private Path inputSubDir4;

  private Path inputFile1;

  private Path inputFile2;

  private Path inputFile3;

  private Path inputFile4;

  private Path inputFile5;

  // Destination tree paths; all assigned by initOutputFields().
  private Path outputDir;

  private Path outputSubDir1;

  private Path outputSubDir2;

  private Path outputSubDir4;

  private Path outputFile1;

  private Path outputFile2;

  private Path outputFile3;

  private Path outputFile4;

  private Path outputFile5;

  /** The "inputDir" path beneath {@link #outputDir}; set by initOutputFields(). */
  private Path inputDirUnderOutputDir;

  /**
   * Create the configuration for the test run, with the
   * {@code mapred.job.tracker} option set to {@code local}.
   * @return a new configuration
   */
  @Override
  protected Configuration createConfiguration() {
    final Configuration jobConf = new Configuration();
    final String trackerKey = "mapred.job.tracker";
    jobConf.set(trackerKey, "local");
    return jobConf;
  }

  /**
   * Run the superclass setup, bind the configuration and both filesystems,
   * then prepare clean per-test local and remote directories.
   * @throws Exception any failure during setup
   */
  @BeforeEach
  @Override
  public void setup() throws Exception {
    super.setup();
    conf = getContract().getConf();
    localFS = FileSystem.getLocal(conf);
    remoteFS = getFileSystem();

    // Each test gets its own directory, keyed on the concrete subclass
    // name and the test method name. Paths are fully qualified with their
    // scheme rather than relying on the default filesystem, so failure
    // messages show clearly which paths are local and which are remote.
    final String perTestSubDir =
        getClass().getSimpleName() + "/" + testName.getMethodName();

    // local side: qualify, wipe any leftovers, then recreate
    final Path localTestRoot =
        new Path(GenericTestUtils.getTestDir().toURI());
    localDir = localFS.makeQualified(
        new Path(localTestRoot, perTestSubDir + "/local"));
    localFS.delete(localDir, true);
    mkdirs(localFS, localDir);

    // remote side: teardown normally cleans this up, but a debugging
    // session in an IDE may skip teardown, so delete here to guarantee
    // a clean starting state
    remoteDir = new Path(path(perTestSubDir), "remote");
    remoteFS.delete(remoteDir, true);
  }

  /**
   * Log the remote filesystem's IOStatistics at info level, then run the
   * superclass teardown.
   * <p>
   * The {@code @AfterEach} annotation is declared here explicitly: JUnit
   * Jupiter does not treat an overriding method as a lifecycle callback
   * unless the override is itself annotated, so without it this method
   * (and the superclass teardown it calls) would not be invoked.
   * @throws Exception any failure raised during teardown
   */
  @AfterEach
  @Override
  public void teardown() throws Exception {
    // if remote FS supports IOStatistics log it.
    logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, getRemoteFS());
    super.teardown();
  }

  /**
   * Initialize every path field: the input fields are derived from the
   * source tree and the output fields from the destination tree.
   * @param src root under which the input paths are placed
   * @param dest root under which the output paths are placed
   */
  protected void initPathFields(final Path src, final Path dest) {
    // inputs first, then outputs
    this.initInputFields(src);
    this.initOutputFields(dest);
  }

  /**
   * Assign the output path fields: "outputDir" beneath the supplied path,
   * an "inputDir" beneath that, and the sub-directories and files within it.
   * @param path parent of the output directory
   */
  protected void initOutputFields(final Path path) {
    final Path outputRoot = new Path(path, "outputDir");
    final Path copiedInputDir = new Path(outputRoot, "inputDir");
    outputDir = outputRoot;
    inputDirUnderOutputDir = copiedInputDir;

    // directories
    outputSubDir1 = new Path(copiedInputDir, "subDir1");
    outputSubDir2 = new Path(copiedInputDir, "subDir2/subDir2");
    outputSubDir4 = new Path(copiedInputDir, "subDir4/subDir4");

    // files
    outputFile1 = new Path(copiedInputDir, "file1");
    outputFile2 = new Path(outputSubDir1, "file2");
    outputFile3 = new Path(outputSubDir2, "file3");
    outputFile4 = new Path(outputSubDir4, "file4");
    outputFile5 = new Path(outputSubDir4, "file5");
  }

  /**
   * Assign the input path fields. These paths are shared by several test
   * methods (copy, update, track), which is why they are kept as fields.
   * @param srcDir directory under which "inputDir" is placed
   */
  protected void initInputFields(final Path srcDir) {
    final Path inputRoot = new Path(srcDir, "inputDir");
    inputDir = inputRoot;

    // directories
    inputSubDir1 = new Path(inputRoot, "subDir1");
    inputSubDir2 = new Path(inputRoot, "subDir2/subDir2");
    inputSubDir4 = new Path(inputRoot, "subDir4/subDir4");

    // files
    inputFile1 = new Path(inputRoot, "file1");
    inputFile2 = new Path(inputSubDir1, "file2");
    inputFile3 = new Path(inputSubDir2, "file3");
    inputFile4 = new Path(inputSubDir4, "file4");
    inputFile5 = new Path(inputSubDir4, "file5");
  }

  /**
   * Get the local filesystem bound in {@link #setup()}.
   * @return the local filesystem
   */
  protected FileSystem getLocalFS() {
    return this.localFS;
  }

  /**
   * Get the filesystem under test, as bound in {@link #setup()}.
   * @return the remote filesystem
   */
  protected FileSystem getRemoteFS() {
    return this.remoteFS;
  }

  /**
   * Get the per-test directory on the local filesystem.
   * @return the local test directory
   */
  protected Path getLocalDir() {
    return this.localDir;
  }

  /**
   * Get the per-test directory on the remote filesystem.
   * @return the remote test directory
   */
  protected Path getRemoteDir() {
    return this.remoteDir;
  }

  /**
   * Copy a deep tree from local to remote, then update it after
   * the source has been modified.
   */
  @Test
  public void testUpdateDeepDirectoryStructureToRemote() throws Exception {
    describe("update a deep directory structure from local to remote");
    // the initial copy also initializes the path fields used below
    distCpDeepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
    final Path updateTarget = inputDirUnderOutputDir;
    distCpUpdateDeepDirectoryStructure(updateTarget);
  }

  /**
   * Copy a deep tree, then run an update without changing the source:
   * at least one file must be skipped and no bytes copied.
   */
  @Test
  public void testUpdateDeepDirectoryStructureNoChange() throws Exception {
    describe(
        "update an unchanged directory structure from local to remote; expect no copy");
    final Path copiedTo =
        distCpDeepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
    describe("\nExecuting Update\n");
    final Job updateJob = distCpUpdate(localDir, copiedTo);
    // at least one skip, no upper bound
    assertCounterInRange(updateJob, CopyMapper.Counter.SKIP, 1, -1);
    // exactly zero bytes copied
    assertCounterInRange(updateJob, CopyMapper.Counter.BYTESCOPIED, 0, 0);
  }

  /**
   * Assert that a counter is in a range; min and max values are inclusive.
   * @param job job to query
   * @param counter counter to examine
   * @param min min value, if negative "no minimum"
   * @param max max value, if negative "no maximum"
   * @throws IOException IO problem
   */
  void assertCounterInRange(Job job, Enum<?> counter, long min, long max)
      throws IOException {
    Counter c = job.getCounters().findCounter(counter);
    long value = c.getValue();
    // exactly two format arguments for the two placeholders
    String description =
        String.format("%s value %s", c.getDisplayName(), value);

    // a negative bound disables that side of the check
    if (min >= 0) {
      assertTrue(value >= min, description + " below minimum " + min);
    }
    if (max >= 0) {
      assertTrue(value <= max, description + " above maximum " + max);
    }
  }

  /**
   * Modify the local source tree and run a distcp update, with deletion of
   * missing files, into the destination.
   * This is executed as part of
   * {@link #testUpdateDeepDirectoryStructureToRemote()}; it is designed to be
   * overridden or wrapped by subclasses which wish to add more assertions.
   *
   * Life is complicated here by the way that the src/dest paths
   * on a distcp are different with -update.
   * @param destDir output directory used by the initial distcp
   * @return the distcp job
   * @throws Exception any failure
   */
  protected Job distCpUpdateDeepDirectoryStructure(final Path destDir)
      throws Exception {
    describe("Now do an incremental update with deletion of missing files");
    final Path updateSource = inputDir;
    LOG.info("Source directory = {}, dest={}", updateSource, destDir);

    // all five source files must exist before they are modified
    ContractTestUtils.assertPathsExist(localFS,
        "Paths for test are wrong",
        inputFile1, inputFile2, inputFile3, inputFile4, inputFile5);

    modifySourceDirectories();
    final Path expectedNewFile = new Path(outputSubDir2, "newfile1");

    final Job updateJob = distCpUpdate(updateSource, destDir);

    lsR("Updated Remote", remoteFS, destDir);

    // deletions in the source were propagated, and the new file arrived
    ContractTestUtils.assertPathDoesNotExist(remoteFS,
        " deleted from " + inputFile1, outputFile1);
    ContractTestUtils.assertIsFile(remoteFS, expectedNewFile);
    ContractTestUtils.assertPathsDoNotExist(remoteFS,
        "DistCP should have deleted",
        outputFile3, outputFile4, outputSubDir4);

    // exactly one copy; one or more skips
    assertCounterInRange(updateJob, CopyMapper.Counter.COPY, 1, 1);
    assertCounterInRange(updateJob, CopyMapper.Counter.SKIP, 1, -1);
    return updateJob;
  }

  /**
   * Run distcp -update from a local source to a remote destination,
   * deleting missing files and skipping the CRC check.
   * Both trees are listed to the log before the job runs.
   * @param srcDir local source directory
   * @param destDir remote destination directory.
   * @return the completed job
   * @throws Exception any failure.
   */
  private Job distCpUpdate(final Path srcDir, final Path destDir)
      throws Exception {
    describe("\nDistcp -update from " + srcDir + " to " + destDir);
    lsR("Local to update", localFS, srcDir);
    lsR("Remote before update", remoteFS, destDir);

    final DistCpOptions.Builder updateBuilder =
        new DistCpOptions.Builder(Collections.singletonList(srcDir), destDir)
            .withDeleteMissing(true)
            .withSyncFolder(true)
            .withSkipCRC(true)
            .withDirectWrite(shouldUseDirectWrite())
            .withOverwrite(false);
    final DistCpOptions updateOptions =
        buildWithStandardOptions(updateBuilder);
    return runDistCp(updateOptions);
  }

  /**
   * Run distcp -update between two arbitrary filesystems, deleting missing
   * files and with the CRC check enabled.
   * Both trees are listed to the log before the job runs.
   * @param srcDir source directory
   * @param destDir destination directory.
   * @param sourceFs filesystem used to list the source directory
   * @param targetFs filesystem used to list the destination directory
   * @return the completed job
   * @throws Exception any failure.
   */
  private Job distCpUpdateWithFs(final Path srcDir, final Path destDir,
      FileSystem sourceFs, FileSystem targetFs)
      throws Exception {
    describe("\nDistcp -update from " + srcDir + " to " + destDir);
    lsR("Source Fs to update", sourceFs, srcDir);
    lsR("Target Fs before update", targetFs, destDir);

    final DistCpOptions.Builder updateBuilder =
        new DistCpOptions.Builder(Collections.singletonList(srcDir), destDir)
            .withDeleteMissing(true)
            .withSyncFolder(true)
            .withSkipCRC(false)
            .withDirectWrite(shouldUseDirectWrite())
            .withOverwrite(false);
    final DistCpOptions updateOptions =
        buildWithStandardOptions(updateBuilder);
    return runDistCp(updateOptions);
  }

  /**
   * Change the local source tree in the way the update and tracking tests
   * expect: two files and one whole sub-directory are deleted, and a new
   * empty file is created.
   * @return the path to the newly created file
   * @throws IOException IO failure
   */
  private Path modifySourceDirectories() throws IOException {
    // individual file deletions
    localFS.delete(inputFile1, false);
    localFS.delete(inputFile3, false);
    // recursive delete of subdir4 takes files 4 and 5 with it
    localFS.delete(inputSubDir4, true);
    // the one addition
    final Path addedFile = new Path(inputSubDir2, "newfile1");
    ContractTestUtils.touch(localFS, addedFile);
    return addedFile;
  }


  /**
   * Copy a deep directory tree to the remote store, modify the local source,
   * then run an update which tracks missing files without deleting them.
   * The test then checks that the new file was copied, that the files
   * removed from the source are still present in the destination, and that
   * the sorted source and target listings saved in the tracking directory
   * contain the expected entries.
   */
  @Test
  public void testTrackDeepDirectoryStructureToRemote() throws Exception {
    describe("copy a deep directory structure from local to remote");

    Path destDir = distCpDeepDirectoryStructure(localFS, localDir, remoteFS,
        remoteDir);
    ContractTestUtils.assertIsDirectory(remoteFS, destDir);

    describe("Now do an incremental update and save of missing files");
    Path srcDir = inputDir;
    // same path setup as in deepDirectoryStructure()
    Path trackDir = new Path(localDir, "trackDir");


    describe("\nDirectories\n");
    lsR("Local to update", localFS, srcDir);
    lsR("Remote before update", remoteFS, destDir);


    ContractTestUtils.assertPathsExist(localFS,
        "Paths for test are wrong",
        inputFile2, inputFile3, inputFile4, inputFile5);

    Path inputFileNew1 = modifySourceDirectories();

    // Distcp set to track but not delete
    runDistCp(buildWithStandardOptions(
        new DistCpOptions.Builder(
            Collections.singletonList(srcDir),
            inputDirUnderOutputDir)
            .withTrackMissing(trackDir)
            .withSyncFolder(true)
            .withDirectWrite(shouldUseDirectWrite())
            .withOverwrite(false)));

    lsR("tracked udpate", remoteFS, destDir);
    // new file went over
    Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
    ContractTestUtils.assertIsFile(remoteFS, outputFileNew1);

    ContractTestUtils.assertPathExists(localFS, "tracking directory",
        trackDir);

    // now read in the listings
    Path sortedSourceListing = new Path(trackDir,
        DistCpConstants.SOURCE_SORTED_FILE);
    ContractTestUtils.assertIsFile(localFS, sortedSourceListing);
    Path sortedTargetListing = new Path(trackDir,
        DistCpConstants.TARGET_SORTED_FILE);
    ContractTestUtils.assertIsFile(localFS, sortedTargetListing);
    // deletion didn't happen
    ContractTestUtils.assertPathsExist(remoteFS,
        "DistCP should have retained",
        outputFile2, outputFile3, outputFile4, outputSubDir4);

    // now scan the table and see that things are there.
    Map<String, Path> sourceFiles = new HashMap<>(10);
    Map<String, Path> targetFiles = new HashMap<>(10);

    try (SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(sortedSourceListing));
         SequenceFile.Reader targetReader = new SequenceFile.Reader(conf,
             SequenceFile.Reader.file(sortedTargetListing))) {
      CopyListingFileStatus copyStatus = new CopyListingFileStatus();
      Text name = new Text();
      while (sourceReader.next(name, copyStatus)) {
        String key = name.toString();
        Path path = copyStatus.getPath();
        LOG.info("{}: {}", key, path);
        sourceFiles.put(key, path);
      }
      while (targetReader.next(name, copyStatus)) {
        String key = name.toString();
        Path path = copyStatus.getPath();
        LOG.info("{}: {}", key, path);
        // store exactly the values which were logged
        targetFiles.put(key, path);
      }
    }

    // look for the new file in both lists; each failure message names
    // the path which was actually searched for
    assertTrue(sourceFiles.containsValue(inputFileNew1),
        "No " + inputFileNew1 + " in source listing");
    assertTrue(targetFiles.containsValue(outputFileNew1),
        "No " + outputFileNew1 + " in target listing");
    assertTrue(targetFiles.containsValue(outputSubDir4),
        "No " + outputSubDir4 + " in target listing");
    assertFalse(sourceFiles.containsValue(inputSubDir4),
        "Found " + inputSubDir4 + " in source listing");

  }

  /**
   * Log a recursive listing of the files under a directory at info level.
   * Each entry is written on its own line so the listing stays readable.
   * @param description text to log ahead of the listing
   * @param fs filesystem to list
   * @param dir directory to list recursively
   * @throws IOException failure to list the directory
   */
  public void lsR(final String description,
      final FileSystem fs,
      final Path dir) throws IOException {
    RemoteIterator<LocatedFileStatus> files = fs.listFiles(dir, true);
    LOG.info("{}: {}:", description, dir);
    StringBuilder sb = new StringBuilder();
    while (files.hasNext()) {
      LocatedFileStatus status = files.next();
      // start every entry on a new line; without a separator all
      // entries run together on a single line
      sb.append(System.lineSeparator());
      sb.append(String.format("  %s; type=%s; length=%d",
          status.getPath(),
          status.isDirectory() ? "dir" : "file",
          status.getLen()));
    }
    LOG.info("{}", sb);
  }

  /**
   * Large file copy, local source to remote destination.
   */
  @Test
  public void largeFilesToRemote() throws Exception {
    describe("copy multiple large files from local to remote");
    // source: local; destination: remote
    largeFiles(getLocalFS(), getLocalDir(), getRemoteFS(), getRemoteDir());
  }

  /**
   * Deep directory tree copy, remote source to local destination.
   */
  @Test
  public void testDeepDirectoryStructureFromRemote() throws Exception {
    describe("copy a deep directory structure from remote to local");
    // source: remote; destination: local. The returned target is not needed.
    distCpDeepDirectoryStructure(
        remoteFS, remoteDir,
        localFS, localDir);
  }

  /**
   * Large file copy, remote source to local destination.
   */
  @Test
  public void testLargeFilesFromRemote() throws Exception {
    describe("copy multiple large files from remote to local");
    // source: remote; destination: local
    largeFiles(
        remoteFS, remoteDir,
        localFS, localDir);
  }

  /**
   * Run a remote-to-local distcp of a single empty file and verify that
   * the job ID has been saved in the configuration afterwards.
   */
  @Test
  public void testSetJobId() throws Exception {
    describe("check jobId is set in the conf");
    // an empty file gives the job something to copy
    final Path emptyFile = new Path(remoteDir, "file1");
    remoteFS.create(emptyFile).close();

    final String sourcePath = remoteDir.toString();
    final String targetPath = localDir.toString();
    DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS,
        sourcePath, targetPath, getDefaultCLIOptionsOrNull(), conf);

    final String jobId = conf.get(CONF_LABEL_DISTCP_JOB_ID);
    assertNotNull(jobId, "DistCp job id isn't set");
  }

  /**
   * Build a multi-level source tree, copy it with DistCp and verify the
   * contents of the first three files at the destination.
   * The file and directory names come from the path fields, which this
   * method initializes from the supplied directories.
   *
   * @param srcFS source FileSystem
   * @param srcDir source directory
   * @param dstFS destination FileSystem
   * @param dstDir destination directory
   * @return the target directory of the copy
   * @throws Exception if there is a failure
   */
  private Path distCpDeepDirectoryStructure(FileSystem srcFS,
      Path srcDir,
      FileSystem dstFS,
      Path dstDir) throws Exception {
    initPathFields(srcDir, dstDir);

    // source directories
    mkdirs(srcFS, inputSubDir1);
    mkdirs(srcFS, inputSubDir2);

    // source files; the first three datasets are retained for verification
    final byte[] file1Data = dataset(100, 33, 43);
    createFile(srcFS, inputFile1, true, file1Data);
    final byte[] file2Data = dataset(200, 43, 53);
    createFile(srcFS, inputFile2, true, file2Data);
    final byte[] file3Data = dataset(300, 53, 63);
    createFile(srcFS, inputFile3, true, file3Data);
    createFile(srcFS, inputFile4, true, dataset(400, 53, 63));
    createFile(srcFS, inputFile5, true, dataset(500, 53, 63));

    // the copy
    final Path copyTarget = new Path(dstDir, "outputDir");
    runDistCp(inputDir, copyTarget);

    // the outcome
    ContractTestUtils.assertIsDirectory(dstFS, copyTarget);
    lsR("Destination tree after distcp", dstFS, copyTarget);
    final Path copiedFile1 = new Path(copyTarget, "inputDir/file1");
    verifyFileContents(dstFS, copiedFile1, file1Data);
    final Path copiedFile2 = new Path(copyTarget, "inputDir/subDir1/file2");
    verifyFileContents(dstFS, copiedFile2, file2Data);
    final Path copiedFile3 =
        new Path(copyTarget, "inputDir/subDir2/subDir2/file3");
    verifyFileContents(dstFS, copiedFile3, file3Data);
    return copyTarget;
  }

  /**
   * Create three large files in the source, copy them with DistCp and
   * verify their contents at the destination.
   * The base size is read from {@link #SCALE_TEST_DISTCP_FILE_SIZE_KB};
   * the test is skipped if that value is less than one.
   *
   * @param srcFS source FileSystem
   * @param srcDir source directory
   * @param dstFS destination FileSystem
   * @param dstDir destination directory
   * @throws Exception if there is a failure
   */
  private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS,
      Path dstDir) throws Exception {
    final int sizeKb = conf.getInt(SCALE_TEST_DISTCP_FILE_SIZE_KB,
        getDefaultDistCPSizeKb());
    if (sizeKb < 1) {
      skip("File size in " + SCALE_TEST_DISTCP_FILE_SIZE_KB + " is zero");
    }
    initPathFields(srcDir, dstDir);

    final Path firstSource = new Path(inputDir, "file1");
    final Path secondSource = new Path(inputDir, "file2");
    final Path thirdSource = new Path(inputDir, "file3");

    // integer division: sizes are in whole megabytes
    final int sizeMb = sizeKb / 1024;
    getLogger().info("{} with file size {}", testName.getMethodName(), sizeMb);

    // each file is one megabyte larger than the previous one
    final byte[] firstData = dataset((sizeMb + 1) * MB, 33, 43);
    createFile(srcFS, firstSource, true, firstData);
    final byte[] secondData = dataset((sizeMb + 2) * MB, 43, 53);
    createFile(srcFS, secondSource, true, secondData);
    final byte[] thirdData = dataset((sizeMb + 3) * MB, 53, 63);
    createFile(srcFS, thirdSource, true, thirdData);

    final Path copyTarget = new Path(dstDir, "outputDir");
    runDistCp(inputDir, copyTarget);

    verifyFileContents(dstFS, new Path(copyTarget, "inputDir/file1"),
        firstData);
    verifyFileContents(dstFS, new Path(copyTarget, "inputDir/file2"),
        secondData);
    verifyFileContents(dstFS, new Path(copyTarget, "inputDir/file3"),
        thirdData);
  }

  /**
   * Override point: the size, in KB, used by the large file tests when
   * {@link #SCALE_TEST_DISTCP_FILE_SIZE_KB} is not set in the configuration.
   * A value of 0 means the large file tests are skipped unless the
   * configuration supplies a size.
   * @return file size.
   */
  protected int getDefaultDistCPSizeKb() {
    final int defaultSizeKb = DEFAULT_DISTCP_SIZE_KB;
    return defaultSizeKb;
  }

  /**
   * Run DistCp from source to destination, asserting that the job
   * finished successfully.
   * {@link #shouldUseDirectWrite()} selects between the direct-write
   * and the rename-based copy.
   * @param src source path
   * @param dst destination path
   * @throws Exception if there is a failure
   */
  private void runDistCp(Path src, Path dst) throws Exception {
    final boolean direct = shouldUseDirectWrite();
    if (!direct) {
      runDistCpWithRename(src, dst);
      return;
    }
    runDistCpDirectWrite(src, dst);
  }

  /**
   * Execute a DistCp job with the given options and assert that a job
   * was returned, that it completed, and that it succeeded.
   * @param options distcp options
   * @return the job. It will have already completed.
   * @throws Exception failure
   */
  private Job runDistCp(final DistCpOptions options) throws Exception {
    final DistCp distCp = new DistCp(conf, options);
    final Job finished = distCp.execute();
    assertNotNull(finished,
        "Unexpected null job returned from DistCp execution.");
    assertTrue(finished.isComplete(), "DistCp job did not complete.");
    assertTrue(finished.isSuccessful(),
        "DistCp job did not complete successfully.");
    return finished;
  }

  /**
   * Apply the options common to every job in this suite, which is the
   * maximum number of list-status threads, and build the options.
   * @param builder DistCp option builder
   * @return the built options
   */
  private DistCpOptions buildWithStandardOptions(
      DistCpOptions.Builder builder) {
    final DistCpOptions.Builder configured = builder
        .withNumListstatusThreads(DistCpOptions.MAX_NUM_LISTSTATUS_THREADS);
    return configured.build();
  }

  /**
   * Create a directory, including any missing ancestors, and assert
   * that the filesystem reported success.
   *
   * @param fs FileSystem in which to create directories
   * @param dir path of directory to create
   * @throws Exception if there is a failure
   */
  private static void mkdirs(FileSystem fs, Path dir) throws Exception {
    final boolean created = fs.mkdirs(dir);
    assertTrue(created, "Failed to mkdir " + dir);
  }

  /**
   * Local to remote copy with direct write enabled. Skipped when the
   * suite already uses direct write everywhere.
   */
  @Test
  public void testDirectWrite() throws Exception {
    describe("copy file from local to remote using direct write option");
    final boolean alreadyCovered = shouldUseDirectWrite();
    if (alreadyCovered) {
      skip("not needed as all other tests use the -direct option.");
    }
    final boolean useDirect = true;
    directWrite(localFS, localDir, remoteFS, remoteDir, useDirect);
  }

  /**
   * Local to remote copy with direct write disabled.
   */
  @Test
  public void testNonDirectWrite() throws Exception {
    describe(
        "copy file from local to remote without using direct write option");
    final boolean useDirect = false;
    directWrite(localFS, localDir, remoteFS, remoteDir, useDirect);
  }

  /**
   * Copy a generated tree from remote to local with the
   * {@code -useiterator} option, and verify through the captured
   * {@link SimpleCopyListing} log that the target listing was built
   * in iterator mode, and that the expected number of files arrived.
   */
  @Test
  public void testDistCpWithIterator() throws Exception {
    describe("Build listing in distCp using the iterator option.");
    Path source = new Path(remoteDir, "src");
    Path dest = new Path(localDir, "dest");
    dest = localFS.makeQualified(dest);

    GenericTestUtils
        .createFiles(remoteFS, source, getDepth(), getWidth(), getWidth());

    GenericTestUtils.LogCapturer log =
        GenericTestUtils.LogCapturer.captureLogs(SimpleCopyListing.LOG);
    try {
      String options = "-useiterator -update -delete" + getDefaultCLIOptions();
      DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS,
          source.toString(), dest.toString(), options, conf);

      // Check the target listing was also done using iterator.
      Assertions.assertThat(log.getOutput()).contains(
          "Building listing using iterator mode for " + dest.toString());
    } finally {
      // always detach the capturer so that it does not keep collecting
      // log output after this test, whether it passed or failed
      log.stopCapturing();
    }

    Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
        .describedAs("files").hasSize(getTotalFiles());
  }

  /**
   * Depth of the directory tree generated by the iterator test.
   * @return {@link #DEFAULT_DEPTH}
   */
  public int getDepth() {
    final int depth = DEFAULT_DEPTH;
    return depth;
  }

  /**
   * Width of the directory tree generated by the iterator test.
   * @return {@link #DEFAULT_WIDTH}
   */
  public int getWidth() {
    final int width = DEFAULT_WIDTH;
    return width;
  }

  /**
   * Compute the sum of {@code width^level} for every level from 1 to
   * {@link #getDepth()}, where width is {@link #getWidth()}.
   * The sum is built with integer arithmetic only, so there is no
   * floating point power calculation and no implicit narrowing of a
   * double back to an int.
   * @return the expected total number of files
   */
  private int getTotalFiles() {
    final int depth = getDepth();
    final int width = getWidth();
    int totalFiles = 0;
    // running value of width^level
    int filesAtLevel = 1;
    for (int level = 1; level <= depth; level++) {
      filesAtLevel *= width;
      totalFiles += filesAtLevel;
    }
    return totalFiles;
  }

  /**
   * Override point: whether every test should run DistCp with direct write.
   * The default is false; subclasses for stores where rename is slow
   * should return true.
   * @return true if direct write should be used in all tests.
   */
  protected boolean shouldUseDirectWrite() {
    final boolean directWriteEverywhere = false;
    return directWriteEverywhere;
  }

  /**
   * Get the default command line options for distcp: the -direct option,
   * padded with spaces, if {@link #shouldUseDirectWrite()} is true;
   * otherwise the empty string.
   * Append or prepend this to string CLIs.
   * @return default options.
   */
  protected String getDefaultCLIOptions() {
    if (shouldUseDirectWrite()) {
      return " -direct ";
    }
    return "";
  }

  /**
   * Get the default command line options for distcp: the -direct option,
   * padded with spaces, if {@link #shouldUseDirectWrite()} is true;
   * null when there are no defaults.
   * @return default options, or null.
   */
  protected String getDefaultCLIOptionsOrNull() {
    if (shouldUseDirectWrite()) {
      return " -direct ";
    }
    return null;
  }

  /**
   * Create two source files, copy them with DistCp, either with or without
   * direct write, and verify their contents at the destination.
   *
   * @param srcFS source FileSystem
   * @param srcDir source directory
   * @param dstFS destination FileSystem
   * @param dstDir destination directory
   * @param directWrite whether to run the copy with the -direct option
   * @throws Exception if there is a failure
   */
  private void directWrite(FileSystem srcFS, Path srcDir, FileSystem dstFS,
          Path dstDir, boolean directWrite) throws Exception {
    initPathFields(srcDir, dstDir);

    // two source files, one at the top level and one in a sub-directory
    mkdirs(srcFS, inputSubDir1);
    final byte[] topLevelData = dataset(64, 33, 43);
    createFile(srcFS, inputFile1, true, topLevelData);
    final byte[] nestedData = dataset(200, 43, 53);
    createFile(srcFS, inputFile2, true, nestedData);

    final Path copyTarget = new Path(dstDir, "outputDir");
    if (!directWrite) {
      runDistCpWithRename(inputDir, copyTarget);
    } else {
      runDistCpDirectWrite(inputDir, copyTarget);
    }
    ContractTestUtils.assertIsDirectory(dstFS, copyTarget);
    lsR("Destination tree after distcp", dstFS, copyTarget);

    // both files must have arrived intact
    final Path copiedTopLevel = new Path(copyTarget, "inputDir/file1");
    verifyFileContents(dstFS, copiedTopLevel, topLevelData);
    final Path copiedNested = new Path(copyTarget, "inputDir/subDir1/file2");
    verifyFileContents(dstFS, copiedNested, nestedData);
  }

  /**
   * Run distcp from srcDir to destDir with direct write enabled.
   * @param srcDir source directory
   * @param destDir destination directory
   * @return the completed job
   * @throws Exception any failure.
   */
  private Job runDistCpDirectWrite(final Path srcDir, final Path destDir)
          throws Exception {
    describe("\nDistcp -direct from " + srcDir + " to " + destDir);
    final DistCpOptions.Builder directBuilder =
        new DistCpOptions.Builder(Collections.singletonList(srcDir), destDir)
            .withDirectWrite(true);
    final DistCpOptions directOptions =
        buildWithStandardOptions(directBuilder);
    return runDistCp(directOptions);
  }
  /**
   * Run distcp from srcDir to destDir with direct write disabled.
   * @param srcDir source directory
   * @param destDir destination directory
   * @return the completed job
   * @throws Exception any failure.
   */
  private Job runDistCpWithRename(Path srcDir, final Path destDir)
          throws Exception {
    describe("\nDistcp from " + srcDir + " to " + destDir);
    final DistCpOptions.Builder renameBuilder =
        new DistCpOptions.Builder(Collections.singletonList(srcDir), destDir)
            .withDirectWrite(false);
    final DistCpOptions renameOptions =
        buildWithStandardOptions(renameBuilder);
    return runDistCp(renameOptions);
  }

  /**
   * Copy a single remote file to a local file path and verify that
   * exactly one file, with the source's contents, is at the destination.
   */
  @Test
  public void testDistCpWithFile() throws Exception {
    describe("Distcp only file");

    final Path remoteFile = new Path(remoteDir, "file");
    final Path localFile = localFS.makeQualified(new Path(localDir, "file"));

    mkdirs(localFS, localDir);

    // a four byte source file
    final int length = 4;
    final int firstByte = 0x40;
    final byte[] contents = dataset(length, firstByte, firstByte + length);
    ContractTestUtils.createFile(remoteFS, remoteFile, true, contents);
    verifyPathExists(remoteFS, "", remoteFile);
    verifyPathExists(localFS, "", localDir);

    DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS,
        remoteFile.toString(), localFile.toString(),
        getDefaultCLIOptionsOrNull(), conf);

    Assertions
        .assertThat(
            RemoteIterators.toList(localFS.listFiles(localFile, true)))
        .describedAs("files")
        .hasSize(1);
    verifyFileContents(localFS, localFile, contents);
  }

  /**
   * Run a distcp -delete -update of a remote file over an existing local
   * file holding different data, and verify that the local file ends up
   * with the source's contents.
   */
  @Test
  public void testDistCpWithUpdateExistFile() throws Exception {
    describe("Now update an existing file.");

    final Path remoteFile = new Path(remoteDir, "file");
    final Path localFile = localFS.makeQualified(new Path(localDir, "file"));

    // same length and base, but a different modulo, so the data differs
    final int length = 4;
    final int firstByte = 0x40;
    final byte[] sourceData = dataset(length, firstByte, firstByte + length);
    final byte[] staleData =
        dataset(length, firstByte, firstByte + length + 1);
    ContractTestUtils.createFile(remoteFS, remoteFile, true, sourceData);
    ContractTestUtils.createFile(localFS, localFile, true, staleData);

    verifyPathExists(remoteFS, "", remoteFile);
    verifyPathExists(localFS, "", localFile);

    final String cliOptions = "-delete -update" + getDefaultCLIOptions();
    DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS,
        remoteFile.toString(), localFile.toString(), cliOptions, conf);

    Assertions
        .assertThat(
            RemoteIterators.toList(localFS.listFiles(localFile, true)))
        .hasSize(1);
    verifyFileContents(localFS, localFile, sourceData);
  }

  /**
   * Run {@code distcp -update} three times against the same pair of
   * directories and check the SKIP/COPY counters of each job.
   * <ol>
   *   <li>Initial run: the new source file is copied and the pre-existing
   *   0 byte file is skipped.</li>
   *   <li>The source file is replaced by one of the same size but different
   *   content and the target's modification time is moved ahead of the
   *   source: both files are expected to be skipped.</li>
   *   <li>The target's modification time is moved behind the source: the
   *   updated source is expected to be copied, the 0 byte file skipped.</li>
   * </ol>
   *
   * @throws Exception on any failure.
   */
  @Test
  public void testDistCpUpdateCheckFileSkip() throws Exception {
    describe("Distcp update to check file skips.");

    Path source = new Path(remoteDir, "file");
    Path dest = new Path(localDir, "file");

    Path source0byte = new Path(remoteDir, "file_0byte");
    Path dest0byte = new Path(localDir, "file_0byte");
    dest = localFS.makeQualified(dest);
    dest0byte = localFS.makeQualified(dest0byte);

    // Creating a source file with certain dataset.
    byte[] sourceBlock = dataset(10, 'a', 'z');

    // Write the dataset.
    ContractTestUtils
        .writeDataset(remoteFS, source, sourceBlock, sourceBlock.length,
            1024, true);

    // Create 0 byte source and target files.
    ContractTestUtils.createFile(remoteFS, source0byte, true, new byte[0]);
    ContractTestUtils.createFile(localFS, dest0byte, true, new byte[0]);

    // Execute the distcp -update job.
    Job job = distCpUpdateWithFs(remoteDir, localDir, remoteFS, localFS);

    // First distcp -update would normally copy the source to dest.
    verifyFileContents(localFS, dest, sourceBlock);
    // Verify 1 file was skipped in the distcp -update (The 0 byte file).
    // Verify 1 file was copied in the distcp -update (The new source file).
    verifySkipAndCopyCounter(job, 1, 1);

    // Remove the source file and replace with a file with same name and size
    // but different content.
    remoteFS.delete(source, false);
    Path updatedSource = new Path(remoteDir, "file");
    byte[] updatedSourceBlock = dataset(10, 'b', 'z');
    ContractTestUtils.writeDataset(remoteFS, updatedSource,
        updatedSourceBlock, updatedSourceBlock.length, 1024, true);

    // For testing purposes we take the modification time of the updated
    // source file, add or subtract an offset, and set the result as the
    // modification time of the target file. This emulates both scenarios:
    // the target changed more recently than the source (copy is skipped),
    // and the source changed more recently than the target (copy happens).
    FileStatus fsSourceUpd = remoteFS.getFileStatus(updatedSource);
    long modTimeSourceUpd = fsSourceUpd.getModificationTime();

    // Add an offset which ensures enough gap for the test to
    // not fail due to race conditions.
    long targetModTimeNewer = modTimeSourceUpd + MODIFICATION_TIME_OFFSET;
    localFS.setTimes(dest, targetModTimeNewer, -1);

    // Execute the distcp -update job.
    Job updatedSourceJobOldSrc =
        distCpUpdateWithFs(remoteDir, localDir, remoteFS,
            localFS);

    // File contents should remain the same since the mod time for target is
    // newer than the updatedSource which indicates that the sync happened
    // more recently and there is no update.
    verifyFileContents(localFS, dest, sourceBlock);
    // Skipped both the 0 byte file and the source file (since the mod time
    // of the target is newer than the source, the source is perceived to be
    // an older version and its copy can be skipped).
    verifySkipAndCopyCounter(updatedSourceJobOldSrc, 2, 0);

    // Subtract an offset which ensures enough gap for the test to
    // not fail due to race conditions. The result is clamped at zero with
    // Math.max so that a negative timestamp is never passed to setTimes
    // (Math.min would have discarded the offset and always yielded <= 0).
    long targetModTimeOlder =
        Math.max(modTimeSourceUpd - MODIFICATION_TIME_OFFSET, 0);
    localFS.setTimes(dest, targetModTimeOlder, -1);

    // Execute the distcp -update job.
    Job updatedSourceJobNewSrc = distCpUpdateWithFs(remoteDir, localDir,
        remoteFS,
        localFS);

    // Verifying the target directory has both the 0 byte file and the
    // content file.
    Assertions
        .assertThat(RemoteIterators.toList(localFS.listFiles(localDir, true)))
        .hasSize(2);
    // Now the copy should take place and the file contents should change
    // since the mod time for target is older than the source file indicating
    // that there was an update to the source after the last sync took place.
    verifyFileContents(localFS, dest, updatedSourceBlock);
    // Verifying we skipped the 0 byte file and copied the updated source
    // file (since the modification time of the target is now older than
    // that of the new source).
    verifySkipAndCopyCounter(updatedSourceJobNewSrc, 1, 1);
  }

  /**
   * Assert that a distcp job reported the expected SKIP and COPY counter
   * values. The COPY counter is asserted first, then the SKIP counter.
   *
   * @param job               job whose counters are inspected.
   * @param skipExpectedValue value the SKIP counter must have.
   * @param copyExpectedValue value the COPY counter must have.
   * @throws IOException if reading the job counters fails.
   */
  private void verifySkipAndCopyCounter(Job job,
      int skipExpectedValue, int copyExpectedValue) throws IOException {
    // Read both counter values from the job before asserting on either.
    final long skipped =
        job.getCounters().findCounter(CopyMapper.Counter.SKIP).getValue();
    final long copied =
        job.getCounters().findCounter(CopyMapper.Counter.COPY).getValue();

    // Compare against the expected values: COPY first, then SKIP.
    assertEquals(copyExpectedValue, copied, "Mismatch in COPY counter value");
    assertEquals(skipExpectedValue, skipped, "Mismatch in SKIP counter value");
  }
}