TestCopyListing.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.tools.util.TestDistCpUtils;
public class TestCopyListing extends SimpleCopyListing {
private static final Logger LOG = LoggerFactory.getLogger(TestCopyListing.class);
private static final Credentials CREDENTIALS = new Credentials();
private static final Configuration config = new Configuration();
private static MiniDFSCluster cluster;
@BeforeAll
public static void create() throws IOException {
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
.build();
}
@AfterAll
public static void destroy() {
if (cluster != null) {
cluster.shutdown();
}
}
public static Collection<Object[]> data() {
Object[][] data = new Object[][]{{1}, {2}, {10}, {20}};
return Arrays.asList(data);
}
public TestCopyListing() {
super(config, CREDENTIALS, 1, 0, false);
}
public void initTestCopyListing(int numListstatusThreads) {
initSimpleCopyListing(config, CREDENTIALS,
numListstatusThreads, 0, false);
}
@Override
protected long getBytesToCopy() {
return 0;
}
@Override
protected long getNumberOfPaths() {
return 0;
}
@Timeout(value = 10)
@ParameterizedTest
@MethodSource("data")
public void testMultipleSrcToFile(int pNumListstatusThreads) {
initTestCopyListing(pNumListstatusThreads);
FileSystem fs = null;
try {
fs = FileSystem.get(getConf());
List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path("/tmp/in/1"));
srcPaths.add(new Path("/tmp/in/2"));
final Path target = new Path("/tmp/out/1");
TestDistCpUtils.createFile(fs, "/tmp/in/1");
TestDistCpUtils.createFile(fs, "/tmp/in/2");
fs.mkdirs(target);
final DistCpOptions options = new DistCpOptions.Builder(srcPaths, target)
.build();
validatePaths(new DistCpContext(options));
TestDistCpUtils.delete(fs, "/tmp");
//No errors
fs.create(target).close();
try {
validatePaths(new DistCpContext(options));
fail("Invalid inputs accepted");
} catch (InvalidInputException ignore) { }
TestDistCpUtils.delete(fs, "/tmp");
srcPaths.clear();
srcPaths.add(new Path("/tmp/in/1"));
fs.mkdirs(new Path("/tmp/in/1"));
fs.create(target).close();
try {
validatePaths(new DistCpContext(options));
fail("Invalid inputs accepted");
} catch (InvalidInputException ignore) { }
TestDistCpUtils.delete(fs, "/tmp");
} catch (IOException e) {
LOG.error("Exception encountered ", e);
fail("Test input validation failed");
} finally {
TestDistCpUtils.delete(fs, "/tmp");
}
}
@ParameterizedTest
@Timeout(value = 10)
@MethodSource("data")
public void testDuplicates(int pNumListstatusThreads) {
initTestCopyListing(pNumListstatusThreads);
FileSystem fs = null;
try {
fs = FileSystem.get(getConf());
List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path("/tmp/in/*/*"));
TestDistCpUtils.createFile(fs, "/tmp/in/src1/1.txt");
TestDistCpUtils.createFile(fs, "/tmp/in/src2/1.txt");
Path target = new Path("/tmp/out");
Path listingFile = new Path("/tmp/list");
final DistCpOptions options = new DistCpOptions.Builder(srcPaths, target)
.build();
final DistCpContext context = new DistCpContext(options);
CopyListing listing = CopyListing.getCopyListing(getConf(), CREDENTIALS,
context);
try {
listing.buildListing(listingFile, context);
fail("Duplicates not detected");
} catch (DuplicateFileException ignore) {
}
} catch (IOException e) {
LOG.error("Exception encountered in test", e);
fail("Test failed " + e.getMessage());
} finally {
TestDistCpUtils.delete(fs, "/tmp");
}
}
@ParameterizedTest
@Timeout(value = 10)
@MethodSource("data")
public void testDiffBasedSimpleCopyListing(int pNumListstatusThreads)
throws IOException {
initTestCopyListing(pNumListstatusThreads);
assertThrows(DuplicateFileException.class, () -> {
FileSystem fs = null;
Configuration configuration = getConf();
DistCpSync distCpSync = Mockito.mock(DistCpSync.class);
Path listingFile = new Path("/tmp/list");
// Throws DuplicateFileException as it recursively traverses src3 directory
// and also adds 3.txt,4.txt twice
configuration.setBoolean(
DistCpConstants.CONF_LABEL_DIFF_COPY_LISTING_TRAVERSE_DIRECTORY, true);
try {
fs = FileSystem.get(getConf());
buildListingUsingSnapshotDiff(fs, configuration, distCpSync, listingFile);
} finally {
TestDistCpUtils.delete(fs, "/tmp");
}
});
}
@ParameterizedTest
@Timeout(value = 10)
@MethodSource("data")
public void testDiffBasedSimpleCopyListingWithoutTraverseDirectory(
int pNumListstatusThreads) throws IOException {
initTestCopyListing(pNumListstatusThreads);
FileSystem fs = null;
Configuration configuration = getConf();
DistCpSync distCpSync = Mockito.mock(DistCpSync.class);
Path listingFile = new Path("/tmp/list");
// no exception expected in this case
configuration.setBoolean(
DistCpConstants.CONF_LABEL_DIFF_COPY_LISTING_TRAVERSE_DIRECTORY, false);
try {
fs = FileSystem.get(getConf());
buildListingUsingSnapshotDiff(fs, configuration, distCpSync, listingFile);
} finally {
TestDistCpUtils.delete(fs, "/tmp");
}
}
private void buildListingUsingSnapshotDiff(FileSystem fs,
Configuration configuration, DistCpSync distCpSync, Path listingFile)
throws IOException {
List<Path> srcPaths = new ArrayList<>();
srcPaths.add(new Path("/tmp/in"));
TestDistCpUtils.createFile(fs, "/tmp/in/src1/1.txt");
TestDistCpUtils.createFile(fs, "/tmp/in/src2/1.txt");
TestDistCpUtils.createFile(fs, "/tmp/in/src3/3.txt");
TestDistCpUtils.createFile(fs, "/tmp/in/src3/4.txt");
Path target = new Path("/tmp/out");
// adding below flags useDiff & sync only to enable context.shouldUseSnapshotDiff()
final DistCpOptions options =
new DistCpOptions.Builder(srcPaths, target).withUseDiff("snap1",
"snap2").withSyncFolder(true).build();
// Create a dummy DiffInfo List that contains a directory + paths inside
// that directory as part of the diff.
ArrayList<DiffInfo> diffs = new ArrayList<>();
diffs.add(new DiffInfo(new Path("/tmp/in/src3/"), new Path("/tmp/in/src3/"),
SnapshotDiffReport.DiffType.CREATE));
diffs.add(new DiffInfo(new Path("/tmp/in/src3/3.txt"),
new Path("/tmp/in/src3/3.txt"), SnapshotDiffReport.DiffType.CREATE));
diffs.add(new DiffInfo(new Path("/tmp/in/src3/4.txt"),
new Path("/tmp/in/src3/4.txt"), SnapshotDiffReport.DiffType.CREATE));
Mockito.when(distCpSync.prepareDiffListForCopyListing()).thenReturn(diffs);
final DistCpContext context = new DistCpContext(options);
CopyListing listing =
new SimpleCopyListing(configuration, CREDENTIALS, distCpSync);
listing.buildListing(listingFile, context);
}
@ParameterizedTest
@MethodSource("data")
public void testDuplicateSourcePaths(int pNumListstatusThreads) throws Exception {
initTestCopyListing(pNumListstatusThreads);
FileSystem fs = FileSystem.get(getConf());
List<Path> srcPaths = new ArrayList<Path>();
try {
srcPaths.add(new Path("/tmp/in"));
srcPaths.add(new Path("/tmp/in"));
TestDistCpUtils.createFile(fs, "/tmp/in/src1/1.txt");
TestDistCpUtils.createFile(fs, "/tmp/in/src2/1.txt");
Path target = new Path("/tmp/out");
Path listingFile = new Path("/tmp/list");
final DistCpOptions options =
new DistCpOptions.Builder(srcPaths, target).build();
final DistCpContext context = new DistCpContext(options);
CopyListing listing =
CopyListing.getCopyListing(getConf(), CREDENTIALS, context);
listing.buildListing(listingFile, context);
assertTrue(fs.exists(listingFile));
} finally {
TestDistCpUtils.delete(fs, "/tmp");
}
}
@ParameterizedTest
@Timeout(value = 10)
@MethodSource("data")
public void testBuildListing(int pNumListstatusThreads) {
initTestCopyListing(pNumListstatusThreads);
FileSystem fs = null;
try {
fs = FileSystem.get(getConf());
List<Path> srcPaths = new ArrayList<Path>();
Path p1 = new Path("/tmp/in/1");
Path p2 = new Path("/tmp/in/2");
Path p3 = new Path("/tmp/in2/2");
Path target = new Path("/tmp/out/1");
srcPaths.add(p1.getParent());
srcPaths.add(p3.getParent());
TestDistCpUtils.createFile(fs, "/tmp/in/1");
TestDistCpUtils.createFile(fs, "/tmp/in/2");
TestDistCpUtils.createFile(fs, "/tmp/in2/2");
fs.mkdirs(target);
OutputStream out = fs.create(p1);
out.write("ABC".getBytes());
out.close();
out = fs.create(p2);
out.write("DEF".getBytes());
out.close();
out = fs.create(p3);
out.write("GHIJ".getBytes());
out.close();
Path listingFile = new Path("/tmp/file");
final DistCpOptions options = new DistCpOptions.Builder(srcPaths, target)
.withSyncFolder(true)
.build();
CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
try {
listing.buildListing(listingFile, new DistCpContext(options));
fail("Duplicates not detected");
} catch (DuplicateFileException ignore) {
}
assertThat(listing.getBytesToCopy()).isEqualTo(10);
assertThat(listing.getNumberOfPaths()).isEqualTo(3);
TestDistCpUtils.delete(fs, "/tmp");
try {
listing.buildListing(listingFile, new DistCpContext(options));
fail("Invalid input not detected");
} catch (InvalidInputException ignore) {
}
TestDistCpUtils.delete(fs, "/tmp");
} catch (IOException e) {
LOG.error("Exception encountered ", e);
fail("Test build listing failed");
} finally {
TestDistCpUtils.delete(fs, "/tmp");
}
}
@ParameterizedTest
@Timeout(value = 60)
@MethodSource("data")
public void testWithRandomFileListing(int pNumListstatusThreads)
throws IOException {
initTestCopyListing(pNumListstatusThreads);
FileSystem fs = null;
try {
fs = FileSystem.get(getConf());
List<Path> srcPaths = new ArrayList<>();
List<Path> srcFiles = new ArrayList<>();
Path target = new Path("/tmp/out/1");
final int pathCount = 25;
for (int i = 0; i < pathCount; i++) {
Path p = new Path("/tmp", String.valueOf(i));
srcPaths.add(p);
fs.mkdirs(p);
Path fileName = new Path(p, i + ".txt");
srcFiles.add(fileName);
try (OutputStream out = fs.create(fileName)) {
out.write(i);
}
}
Path listingFile = new Path("/tmp/file");
final DistCpOptions options = new DistCpOptions.Builder(srcPaths, target)
.withSyncFolder(true).build();
// Check without randomizing files
getConf().setBoolean(
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
listing.buildListing(listingFile, new DistCpContext(options));
assertEquals(listing.getNumberOfPaths(), pathCount);
validateFinalListing(listingFile, srcFiles);
fs.delete(listingFile, true);
// Check with randomized file listing
getConf().setBoolean(
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, true);
listing = new SimpleCopyListing(getConf(), CREDENTIALS);
// Set the seed for randomness, so that it can be verified later
long seed = System.nanoTime();
listing.setSeedForRandomListing(seed);
listing.buildListing(listingFile, new DistCpContext(options));
assertEquals(listing.getNumberOfPaths(), pathCount);
// validate randomness
Collections.shuffle(srcFiles, new Random(seed));
validateFinalListing(listingFile, srcFiles);
} finally {
TestDistCpUtils.delete(fs, "/tmp");
}
}
private void validateFinalListing(Path pathToListFile, List<Path> srcFiles)
throws IOException {
FileSystem fs = pathToListFile.getFileSystem(config);
try (SequenceFile.Reader reader = new SequenceFile.Reader(
config, SequenceFile.Reader.file(pathToListFile))) {
CopyListingFileStatus currentVal = new CopyListingFileStatus();
Text currentKey = new Text();
int idx = 0;
while (reader.next(currentKey)) {
reader.getCurrentValue(currentVal);
assertEquals(fs.makeQualified(srcFiles.get(idx)),
currentVal.getPath(), "srcFiles.size=" + srcFiles.size() +
", idx=" + idx);
if (LOG.isDebugEnabled()) {
LOG.debug("val=" + fs.makeQualified(srcFiles.get(idx)));
}
idx++;
}
}
}
@ParameterizedTest
@Timeout(value = 10)
@MethodSource("data")
public void testBuildListingForSingleFile(int pNumListstatusThreads) {
initTestCopyListing(pNumListstatusThreads);
FileSystem fs = null;
String testRootString = "/singleFileListing";
Path testRoot = new Path(testRootString);
SequenceFile.Reader reader = null;
try {
fs = FileSystem.get(getConf());
if (fs.exists(testRoot))
TestDistCpUtils.delete(fs, testRootString);
Path sourceFile = new Path(testRoot, "/source/foo/bar/source.txt");
Path decoyFile = new Path(testRoot, "/target/moo/source.txt");
Path targetFile = new Path(testRoot, "/target/moo/target.txt");
TestDistCpUtils.createFile(fs, sourceFile.toString());
TestDistCpUtils.createFile(fs, decoyFile.toString());
TestDistCpUtils.createFile(fs, targetFile.toString());
List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(sourceFile);
DistCpOptions options = new DistCpOptions.Builder(srcPaths, targetFile)
.build();
CopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
final Path listFile = new Path(testRoot, "/tmp/fileList.seq");
listing.buildListing(listFile, new DistCpContext(options));
reader = new SequenceFile.Reader(getConf(), SequenceFile.Reader.file(listFile));
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
Text relativePath = new Text();
assertTrue(reader.next(relativePath, fileStatus));
assertTrue(relativePath.toString().equals(""));
}
catch (Exception e) {
fail("Unexpected exception encountered.");
LOG.error("Unexpected exception: ", e);
}
finally {
TestDistCpUtils.delete(fs, testRootString);
IOUtils.closeStream(reader);
}
}
@ParameterizedTest
@MethodSource("data")
public void testFailOnCloseError(int pNumListstatusThreads) throws IOException {
initTestCopyListing(pNumListstatusThreads);
File inFile = File.createTempFile("TestCopyListingIn", null);
inFile.deleteOnExit();
File outFile = File.createTempFile("TestCopyListingOut", null);
outFile.deleteOnExit();
List<Path> srcs = new ArrayList<Path>();
srcs.add(new Path(inFile.toURI()));
Exception expectedEx = new IOException("boom");
SequenceFile.Writer writer = mock(SequenceFile.Writer.class);
doThrow(expectedEx).when(writer).close();
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
final DistCpOptions options = new DistCpOptions.Builder(srcs,
new Path(outFile.toURI())).build();
Exception actualEx = null;
try {
listing.doBuildListing(writer, new DistCpContext(options));
} catch (Exception e) {
actualEx = e;
}
assertNotNull(actualEx, "close writer didn't fail");
assertEquals(expectedEx, actualEx);
}
}