ITestLocatedFileStatusFetcher.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.nio.charset.StandardCharsets;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.s3a.auth.ITestRestrictedReadAccess;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
/**
* Test the LocatedFileStatusFetcher can do.
* This is related to HADOOP-16458.
* There's basic tests in ITestS3AFSMainOperations; this
* is see if we can create better corner cases.
* <p></p>
* Much of the class is based on tests in {@link ITestRestrictedReadAccess},
* but whereas that tests failure paths, this looks at the performance
* of successful invocations.
*/
public class ITestLocatedFileStatusFetcher extends AbstractS3ATestBase {
private static final Logger LOG =
LoggerFactory.getLogger(ITestLocatedFileStatusFetcher.class);
/** Filter to select everything. */
private static final PathFilter EVERYTHING = t -> true;
/** Filter to select .txt files. */
private static final PathFilter TEXT_FILE =
path -> path.toUri().toString().endsWith(".txt");
/** The same path filter used in FileInputFormat. */
private static final PathFilter HIDDEN_FILE_FILTER =
(p) -> {
String n = p.getName();
return !n.startsWith("_") && !n.startsWith(".");
};
/**
* Text found in LocatedFileStatusFetcher exception when the glob
* returned "null".
*/
private static final String DOES_NOT_EXIST = "does not exist";
/**
* Text found in LocatedFileStatusFetcher exception when
* the glob returned an empty list.
*/
private static final String MATCHES_0_FILES = "matches 0 files";
/**
* Text used in files.
*/
public static final byte[] HELLO = "hello".getBytes(StandardCharsets.UTF_8);
/**
* How many list calls are expected in a run which collects them: {@value}.
*/
private static final int EXPECTED_LIST_COUNT = 4;
private Path basePath;
private Path emptyDir;
private Path emptyFile;
private Path subDir;
private Path subdirFile;
private Path subDir2;
private Path subdir2File1;
private Path subdir2File2;
private Configuration listConfig;
@BeforeEach
@Override
public void setup() throws Exception {
super.setup();
S3AFileSystem fs
= getFileSystem();
basePath = methodPath();
// define the paths and create them.
describe("Creating test directories and files");
// an empty directory directory under the noReadDir
emptyDir = new Path(basePath, "emptyDir");
fs.mkdirs(emptyDir);
// an empty file directory under the noReadDir
emptyFile = new Path(basePath, "emptyFile.txt");
touch(fs, emptyFile);
// a subdirectory
subDir = new Path(basePath, "subDir");
// and a file in that subdirectory
subdirFile = new Path(subDir, "subdirFile.txt");
createFile(fs, subdirFile, true, HELLO);
subDir2 = new Path(subDir, "subDir2");
subdir2File1 = new Path(subDir2, "subdir2File1.txt");
subdir2File2 = new Path(subDir2, "subdir2File2.txt");
createFile(fs, subdir2File1, true, HELLO);
createFile(fs, subdir2File2, true, HELLO);
listConfig = new Configuration(getConfiguration());
}
/**
* Assert that the fetcher stats logs the expected number of calls.
* @param fetcher fetcher
* @param expectedListCount expected number of list calls
*/
private void assertListCount(final LocatedFileStatusFetcher fetcher,
final int expectedListCount) {
IOStatistics iostats = extractStatistics(fetcher);
LOG.info("Statistics of fetcher: {}", iostats);
assertThatStatisticCounter(iostats,
OBJECT_LIST_REQUEST)
.describedAs("stats of %s", iostats)
.isEqualTo(expectedListCount);
}
/**
* Run a located file status fetcher against the directory tree.
*/
@Test
public void testSingleThreadedLocatedFileStatus() throws Throwable {
describe("LocatedFileStatusFetcher operations");
// use the same filter as FileInputFormat; single thread.
listConfig.setInt(LIST_STATUS_NUM_THREADS, 1);
LocatedFileStatusFetcher fetcher =
new LocatedFileStatusFetcher(
listConfig,
new Path[]{basePath},
true,
HIDDEN_FILE_FILTER,
true);
Iterable<FileStatus> stats = fetcher.getFileStatuses();
Assertions.assertThat(stats)
.describedAs("result of located scan")
.flatExtracting(FileStatus::getPath)
.containsExactlyInAnyOrder(
emptyFile,
subdirFile,
subdir2File1,
subdir2File2);
assertListCount(fetcher, EXPECTED_LIST_COUNT);
}
/**
* Run a located file status fetcher against the directory tree.
*/
@Test
public void testLocatedFileStatusFourThreads() throws Throwable {
// four threads and the text filter.
int threads = 4;
describe("LocatedFileStatusFetcher with %d", threads);
listConfig.setInt(LIST_STATUS_NUM_THREADS, threads);
LocatedFileStatusFetcher fetcher =
new LocatedFileStatusFetcher(
listConfig,
new Path[]{basePath},
true,
EVERYTHING,
true);
Iterable<FileStatus> stats = fetcher.getFileStatuses();
IOStatistics iostats = extractStatistics(fetcher);
LOG.info("Statistics of fetcher: {}", iostats);
Assertions.assertThat(stats)
.describedAs("result of located scan")
.isNotNull()
.flatExtracting(FileStatus::getPath)
.containsExactlyInAnyOrder(
emptyFile,
subdirFile,
subdir2File1,
subdir2File2);
assertListCount(fetcher, EXPECTED_LIST_COUNT);
}
/**
* Run a located file status fetcher against a file.
*/
@Test
public void testLocatedFileStatusScanFile() throws Throwable {
// pass in a file as the base of the scan.
describe("LocatedFileStatusFetcher with file %s", subdirFile);
listConfig.setInt(LIST_STATUS_NUM_THREADS, 16);
LocatedFileStatusFetcher fetcher
= new LocatedFileStatusFetcher(
listConfig,
new Path[]{subdirFile},
true,
TEXT_FILE,
true);
Iterable<FileStatus> stats = fetcher.getFileStatuses();
Assertions.assertThat(stats)
.describedAs("result of located scan")
.isNotNull()
.flatExtracting(FileStatus::getPath)
.containsExactly(subdirFile);
IOStatistics ioStatistics = fetcher.getIOStatistics();
Assertions.assertThat(ioStatistics)
.describedAs("IO statistics of %s", fetcher)
.isNull();
}
}