/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.model.MultipartUpload;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpTestUtils;
import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.assertNoUploadsAt;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.magicPath;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.toPathList;
import static org.apache.hadoop.fs.s3a.S3AUtils.HIDDEN_FILE_FILTER;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.ToolRunner.run;
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromArray;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;
/**
* Test behavior of treewalking when there are pending
* uploads. All commands MUST work.
 * Currently the only one which doesn't is distcp;
 * some tests also make different assertions about the
 * directories found, depending on listing consistency.
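 * <p>
 * Pending uploads are created by writing a file through a magic commit
 * path under a target directory and then deleting the magic directory,
 * leaving only the incomplete multipart upload behind; see
 * {@link #createDirWithUpload()}.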
*/
public class ITestTreewalkProblems extends AbstractS3ACostTest {
/**
* Exit code to expect on a shell failure.
*/
public static final int SHELL_FAILURE = 1;
/**
* Are directory listings potentially inconsistent?
*/
private boolean listingInconsistent;
@Override
public Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
DIRECTORY_OPERATIONS_PURGE_UPLOADS,
MAGIC_COMMITTER_ENABLED);
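    // purge pending uploads in directory delete/rename operations, and
    // enable the magic committer so that magic paths create pending uploads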
conf.setBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, true);
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
return conf;
}
@BeforeEach
@Override
public void setup() throws Exception {
super.setup();
final S3AFileSystem fs = getFileSystem();
final Path path = methodPath();
assertHasPathCapabilities(fs, path, DIRECTORY_OPERATIONS_PURGE_UPLOADS);
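    // some stores (e.g. S3 Express One Zone) include in-progress uploads
    // in directory listings, so assertions must be adjusted for them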
listingInconsistent = fs.hasPathCapability(path, DIRECTORY_LISTING_INCONSISTENT);
clearAnyUploads(fs, path);
}
@Test
public void testListFilesDeep() throws Throwable {
final S3AFileSystem fs = getFileSystem();
final Path src = createDirWithUpload();
LOG.info("listFiles({}, true)", src);
foreach(fs.listFiles(src, true), e -> LOG.info("{}", e));
LOG.info("listFiles({}, true)", src);
foreach(fs.listLocatedStatus(src), e -> LOG.info("{}", e));
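    // example use of the assertUploadCount() helper below: exactly one
    // pending upload should be visible under src before cleanup
    assertUploadCount(src, 1);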
// and just verify a cleanup works
Assertions.assertThat(fs.getS3AInternals().abortMultipartUploads(src))
.describedAs("Aborted uploads under %s", src)
.isEqualTo(1);
assertNoUploadsAt(fs, src);
}
/**
* Create a directory methodPath()/src with a magic upload underneath,
* with the upload pointing at {@code src/subdir/file.txt}.
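   * <p>
   * The resulting layout is approximately:
   * <pre>
   *   methodPath()/
   *     src/    (returned; no visible children unless listings are inconsistent)
   *       [pending upload: src/subdir/file.txt]
   * </pre>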
* @return the directory created
* @throws IOException creation problems
*/
private Path createDirWithUpload() throws IOException {
final S3AFileSystem fs = getFileSystem();
final Path src = new Path(methodPath(), "src");
    // create a pending upload through a magic commit path under src...
    createMagicFile(fs, src);
    // ...then delete the magic directory so only the incomplete
    // multipart upload remains
    fs.delete(magicPath(src), true);
return src;
}
@Test
public void testLocatedFileStatusFetcher() throws Throwable {
describe("Validate mapreduce LocatedFileStatusFetcher");
final Path src = createDirWithUpload();
Configuration listConfig = new Configuration(getConfiguration());
listConfig.setInt(LIST_STATUS_NUM_THREADS, 2);
LocatedFileStatusFetcher fetcher =
new LocatedFileStatusFetcher(listConfig, new Path[]{src}, true, HIDDEN_FILE_FILTER, true);
    Assertions.assertThat(fetcher.getFileStatuses())
        .describedAs("files found by LocatedFileStatusFetcher under %s", src)
        .isEmpty();
}
@Test
public void testGetContentSummaryDirsAndFiles() throws Throwable {
describe("FileSystem.getContentSummary()");
final S3AFileSystem fs = getFileSystem();
final Path src = createDirWithUpload();
fs.mkdirs(new Path(src, "child"));
final Path path = methodPath();
file(new Path(path, "file"));
    // the directory count is the same whether or not listings are inconsistent
    final int dirs = 3;
assertContentSummary(path, dirs, 1);
}
/**
* Execute getContentSummary() down a directory tree which only
* contains a single real directory.
* This test case has been a bit inconsistent between different store
* types.
*/
@Test
public void testGetContentSummaryPendingDir() throws Throwable {
describe("FileSystem.getContentSummary() with pending dir");
assertContentSummary(createDirWithUpload(), 1, 0);
}
/**
   * Make assertions about the content summary of a path.
   * @param path path to scan
   * @param dirs number of directories to find.
   * @param files number of files to find.
* @throws IOException scanning problems
*/
private void assertContentSummary(
final Path path,
final int dirs,
final int files) throws IOException {
ContentSummary summary = getFileSystem().getContentSummary(path);
Assertions.assertThat(summary.getDirectoryCount())
.describedAs("dir count under %s of %s", path, summary)
.isEqualTo(dirs);
Assertions.assertThat(summary.getFileCount())
.describedAs("filecount count under %s of %s", path, summary)
.isEqualTo(files);
}
/**
   * Execute getContentSummary() down a tree containing real
   * directories and a file as well as a pending upload.
*/
@Test
public void testGetContentSummaryFiles() throws Throwable {
describe("FileSystem.getContentSummary()");
final S3AFileSystem fs = getFileSystem();
final Path src = createDirWithUpload();
fs.mkdirs(new Path(src, "child"));
final Path base = methodPath();
touch(fs, new Path(base, "file"));
assertContentSummary(base, 3, 1);
}
/**
* Test all the various filesystem.list* calls.
* Bundled into one test case to reduce setup/teardown overhead.
*/
@Test
public void testListStatusOperations() throws Throwable {
describe("FileSystem liststtus calls");
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
final Path src = createDirWithUpload();
final Path file = new Path(base, "file");
final Path dir2 = new Path(base, "dir2");
// path in a child dir
final Path childfile = new Path(dir2, "childfile");
file(childfile);
file(file);
fs.mkdirs(dir2);
Assertions.assertThat(toPathList(fs.listStatusIterator(base)))
.describedAs("listStatusIterator(%s)", base)
.contains(src, dir2, file);
    Assertions.assertThat(toPathList(remoteIteratorFromArray(fs.listStatus(base))))
        .describedAs("listStatus(%s)", base)
        .contains(src, dir2, file);
    Assertions.assertThat(toPathList(fs.listFiles(base, false)))
        .describedAs("listFiles(%s, false)", base)
        .containsExactly(file);
    Assertions.assertThat(toPathList(fs.listFiles(base, true)))
        .describedAs("listFiles(%s, true)", base)
        .containsExactlyInAnyOrder(file, childfile);
    Assertions.assertThat(toPathList(fs.listLocatedStatus(base, (p) -> true)))
        .describedAs("listLocatedStatus(%s, filter)", base)
        .contains(src, dir2, file);
    Assertions.assertThat(toPathList(fs.listLocatedStatus(base)))
        .describedAs("listLocatedStatus(%s)", base)
        .contains(src, dir2, file);
}
@Test
public void testShellList() throws Throwable {
describe("Validate hadoop fs -ls sorted and unsorted");
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
createDirWithUpload();
fs.mkdirs(new Path(base, "dir2"));
// recursive not sorted
shell(base, "-ls", "-R", base.toUri().toString());
// recursive sorted
shell(base, "-ls", "-R", "-S", base.toUri().toString());
}
@Test
public void testShellDu() throws Throwable {
describe("Validate hadoop fs -du");
final Path base = methodPath();
createDirWithUpload();
shell(base, "-du", base.toUri().toString());
}
@Test
public void testShellDf() throws Throwable {
describe("Validate hadoop fs -df");
final Path base = methodPath();
final String p = base.toUri().toString();
shell(SHELL_FAILURE, base, "-df", p);
createDirWithUpload();
shell(base, "-df", p);
}
@Test
public void testShellFind() throws Throwable {
describe("Validate hadoop fs -ls -R");
final Path base = methodPath();
final String p = base.toUri().toString();
shell(SHELL_FAILURE, base, "-find", p, "-print");
createDirWithUpload();
shell(base, "-find", p, "-print");
}
@Test
public void testDistCp() throws Throwable {
describe("Validate distcp");
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
final Path src = createDirWithUpload();
final Path dest = new Path(base, "dest");
file(new Path(src, "real-file"));
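    // -useiterator switches distcp to its iterator-based source listing,
    // which is the treewalk being validated here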
final String options = "-useiterator -update -delete -direct";
if (!fs.hasPathCapability(base, DIRECTORY_LISTING_INCONSISTENT)) {
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
options, getConfiguration());
} else {
// distcp fails if uploads are visible
intercept(org.junit.ComparisonFailure.class, () -> {
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
options, getConfiguration());
});
}
}
@Test
public void testDistCpNoIterator() throws Throwable {
describe("Validate distcp");
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
final Path src = createDirWithUpload();
final Path dest = new Path(base, "dest");
file(new Path(src, "real-file"));
final String options = "-update -delete -direct";
if (!fs.hasPathCapability(base, DIRECTORY_LISTING_INCONSISTENT)) {
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
options, getConfiguration());
} else {
// distcp fails if uploads are visible
intercept(org.junit.ComparisonFailure.class, () -> {
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
options, getConfiguration());
});
}
}
/**
   * ContractTestUtils also performs treewalking, though only in test code.
*/
@Test
public void testContractTestUtilsTreewalk() throws Throwable {
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
createDirWithUpload();
final ContractTestUtils.TreeScanResults treeWalk = treeWalk(fs, base);
ContractTestUtils.TreeScanResults listing =
new ContractTestUtils.TreeScanResults(fs.listFiles(base, true));
treeWalk.assertFieldsEquivalent("treewalk vs listFiles(/, true)", listing, treeWalk.getFiles(),
listing.getFiles());
}
/**
   * Globber is already resilient to missing directories; a relic
   * of the time when a HEAD request against an S3 object could leave
   * a 404 in the S3 front-end cache.
*/
@Test
public void testGlobberTreewalk() throws Throwable {
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
final Path src = createDirWithUpload();
// this is the pending dir
final Path subdir = new Path(src, "subdir/");
final Path dest = new Path(base, "dest");
final Path monday = new Path(dest, "day=monday");
final Path realFile = file(new Path(monday, "real-file.parquet"));
assertGlob(fs, new Path(base, "*/*/*.parquet"), realFile);
if (listingInconsistent) {
assertGlob(fs, new Path(base, "*"), src, dest);
assertGlob(fs, new Path(base, "*/*"), subdir, monday);
} else {
assertGlob(fs, new Path(base, "*"), src, dest);
assertGlob(fs, new Path(base, "*/*"), monday);
}
}
private static void assertGlob(final S3AFileSystem fs,
final Path pattern,
final Path... expected)
throws IOException {
final FileStatus[] globbed = fs.globStatus(pattern,
(f) -> true);
final List<Path> paths = Arrays.stream(globbed).map(s -> s.getPath())
.collect(Collectors.toList());
Assertions.assertThat(paths)
.describedAs("glob(%s)", pattern)
.containsExactlyInAnyOrder(expected);
}
@Test
public void testFileInputFormatSplits() throws Throwable {
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
final Path src = createDirWithUpload();
final Path dest = new Path(base, "dest");
final Path monday = new Path(dest, "day=monday");
final int count = 4;
List<Path> files = new ArrayList<>();
for (int i = 0; i < count; i++) {
files.add(file(new Path(monday, "file-0" + i + ".parquet")));
}
final JobContextImpl jobContext = new JobContextImpl(getConfiguration(), new JobID("job", 1));
final JobConf jc = (JobConf) jobContext.getConfiguration();
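    // these literal keys are the values behind FileInputFormat.INPUT_DIR
    // and FileInputFormat.INPUT_DIR_RECURSIVE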
jc.set("mapreduce.input.fileinputformat.inputdir", base.toUri().toString());
jc.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);
final TextInputFormat inputFormat = new TextInputFormat();
final List<Path> paths = inputFormat.getSplits(jobContext).stream().map(s ->
((FileSplit) s).getPath())
.collect(Collectors.toList());
Assertions.assertThat(paths)
.describedAs("input split of base directory")
.containsExactlyInAnyOrderElementsOf(files);
}
/**
* Exec a shell command; require it to succeed.
* @param base base dir
* @param command command sequence
* @throws Exception failure
*/
private void shell(final Path base, final String... command) throws Exception {
shell(0, base, command);
}
/**
* Exec a shell command; require the result to match the expected outcome.
* @param expected expected outcome
* @param base base dir
* @param command command sequence
* @throws Exception failure
*/
private void shell(int expected, final Path base, final String... command) throws Exception {
Assertions.assertThat(run(getConfiguration(), new FsShell(), command))
.describedAs("%s %s", command[0], base)
.isEqualTo(expected);
}
/**
* Assert the upload count under a dir is the expected value.
* Failure message will include the list of entries.
* @param dir dir
* @param expected expected count
* @throws IOException listing problem
*/
private void assertUploadCount(final Path dir, final int expected) throws IOException {
Assertions.assertThat(toList(listUploads(dir)))
.describedAs("uploads under %s", dir)
.hasSize(expected);
}
/**
* List uploads; use the same APIs that the directory operations use,
* so implicitly validating them.
* @param dir directory to list
* @return full list of entries
* @throws IOException listing problem
*/
private RemoteIterator<MultipartUpload> listUploads(Path dir) throws IOException {
final S3AFileSystem fs = getFileSystem();
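    // the store-level listing call requires an active audit span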
try (AuditSpan ignored = span()) {
final StoreContext sc = fs.createStoreContext();
return fs.listUploadsUnderPrefix(sc, sc.pathToKey(dir));
}
}
}