// ManifestCommitterTestSupport.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.LoadedManifestData;
import org.apache.hadoop.util.functional.RemoteIterators;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_CLASSNAME;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO.toPath;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Support for committer tests.
*/
public final class ManifestCommitterTestSupport {
/** Logger used by the helpers in this class. */
private static final Logger LOG = LoggerFactory.getLogger(
ManifestCommitterTestSupport.class);
/**
 * Timestamp formatter used when generating job IDs:
 * year, month, day, hour, minute and second with no separators.
 */
private static final DateTimeFormatter FORMATTER =
DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
/**
 * Name of the system property which is read to find the build directory.
 * Value: {@value}.
 */
public static final String PROJECT_BUILD_DIRECTORY_PROPERTY
= "project.build.directory";
/**
 * Default number of task attempts for some tests.
 * Value: {@value}.
 */
public static final int NUMBER_OF_TASK_ATTEMPTS = 2000;
/**
 * Smaller number of task attempts for some tests against object
 * stores where IO overhead is higher.
 * Value: {@value}.
 */
public static final int NUMBER_OF_TASK_ATTEMPTS_SMALL = 200;
/**
 * Private constructor: prevents instantiation from outside this class.
 */
private ManifestCommitterTestSupport() {
}
/**
 * Create a random Job ID using the fork ID as part of the number if
 * set in the current process.
 * <p>
 * The fork ID is read from the system property
 * {@code test.unique.fork.id} (default {@code "0001"}); up to its last
 * four characters are parsed as an integer. A value which is shorter
 * than four characters is used whole; a value whose tail is not a
 * number is treated as 0.
 * @return a string of the form
 * {@code <yyyyMMddHHmmss><4-digit random number>_<4-digit fork number>}.
 */
public static String randomJobId() {
  final String forkId = System.getProperty("test.unique.fork.id", "0001");
  // clamp the start index so that short or empty property values do not
  // raise StringIndexOutOfBoundsException
  final String trailingDigits =
      forkId.substring(Math.max(0, forkId.length() - 4));
  int forkNumber;
  try {
    forkNumber = Integer.parseInt(trailingDigits);
  } catch (NumberFormatException e) {
    // not numeric (or empty): fall back to fork 0
    forkNumber = 0;
  }
  return String.format("%s%04d_%04d",
      FORMATTER.format(LocalDateTime.now()),
      (long) (Math.random() * 1000),
      forkNumber);
}
/**
 * Get the build directory of the project.
 * The location is taken from the system property named by
 * {@link #PROJECT_BUILD_DIRECTORY_PROPERTY}; when that is unset or
 * empty, the relative path {@code "target"} is used instead.
 * @return the build directory as an absolute file.
 */
public static File getProjectBuildDir() {
  final String configured =
      System.getProperty(PROJECT_BUILD_DIRECTORY_PROPERTY);
  final String location = StringUtils.isEmpty(configured)
      ? "target"
      : configured;
  return new File(location).getAbsoluteFile();
}
/**
 * Load the success file found under an output directory.
 * The file is resolved as the child {@code SUCCESS_MARKER} of
 * {@code outputPath} and handed to {@code ManifestSuccessData.load};
 * no other checks are made in this method, so any validation
 * of the file is whatever that load operation performs.
 * @param fs filesystem
 * @param outputPath directory containing the success file.
 * @return the loaded file.
 * @throws IOException failure to find/load the file
 */
public static ManifestSuccessData loadSuccessFile(final FileSystem fs,
    final Path outputPath) throws IOException {
  return ManifestSuccessData.load(fs, new Path(outputPath, SUCCESS_MARKER));
}
/**
 * Load the success marker of a job, print it, and verify its contents.
 * Assertions are made, in order, that
 * <ol>
 *   <li>the committer field is {@code MANIFEST_COMMITTER_CLASSNAME};</li>
 *   <li>at least {@code minimumFileCount} filenames are listed;</li>
 *   <li>the job ID matches, when a non-empty job ID was passed in.</li>
 * </ol>
 * @param fs filesystem
 * @param outputDir output path of job
 * @param minimumFileCount minimum number of files to have been created
 * @param jobId job ID, only verified if non-empty
 * @return the success data
 * @throws IOException IO failure
 */
public static ManifestSuccessData validateSuccessFile(
    final FileSystem fs,
    final Path outputDir,
    final int minimumFileCount,
    final String jobId) throws IOException {
  final ManifestSuccessData loaded =
      loadAndPrintSuccessData(fs, new Path(outputDir, SUCCESS_MARKER));

  assertThat(loaded.getCommitter())
      .describedAs("Committer field")
      .isEqualTo(MANIFEST_COMMITTER_CLASSNAME);
  assertThat(loaded.getFilenames())
      .describedAs("Files committed")
      .hasSizeGreaterThanOrEqualTo(minimumFileCount);

  if (StringUtils.isEmpty(jobId)) {
    // no job ID to compare against
    return loaded;
  }
  assertThat(loaded.getJobId())
      .describedAs("JobID")
      .isEqualTo(jobId);
  return loaded;
}
/**
 * Load in and print a success data manifest.
 * The printer output is captured in an in-memory buffer and then
 * logged at info level, after the path of the manifest itself.
 * @param fs filesystem
 * @param successPath full path to success file.
 * @return the success data
 * @throws IOException IO failure
 */
public static ManifestSuccessData loadAndPrintSuccessData(
    FileSystem fs,
    Path successPath) throws IOException {
  LOG.info("Manifest {}", successPath);
  final ByteArrayOutputStream baos = new ByteArrayOutputStream();
  final ManifestSuccessData successData;
  // closing the stream flushes everything printed into the buffer
  // before the buffer is logged, and releases the stream on failure too
  try (PrintStream ps = new PrintStream(baos)) {
    final ManifestPrinter showManifest = new ManifestPrinter(fs.getConf(), ps);
    successData = showManifest.loadAndPrintManifest(fs, successPath);
  }
  LOG.info("{}", baos);
  return successData;
}
/**
 * Validate all generated files from the manifest.
 * The destination directory is listed recursively; every file whose
 * own name begins with {@code "_"} is left out of the listing.
 * All files named in the manifest must be present in that listing.
 * If the exclusive flag is set, the listing must also contain
 * exactly those files and no others.
 * @param fs filesystem
 * @param destDir dest dir to scan
 * @param successData manifest
 * @param exclusive expect exclusive and complete data.
 * @return the files and their status
 * @throws IOException IO failure.
 */
public static Map<Path, LocatedFileStatus> validateGeneratedFiles(
    FileSystem fs,
    Path destDir,
    ManifestSuccessData successData,
    boolean exclusive) throws IOException {
  final Comparator<Path> byName = Comparator.comparing(Path::getName);

  // collect every listed file, skipping those with a "_" prefixed name
  final Map<Path, LocatedFileStatus> found = new HashMap<>();
  RemoteIterators.foreach(fs.listFiles(destDir, true),
      status -> {
        final Path file = status.getPath();
        if (file.getName().startsWith("_")) {
          return;
        }
        found.put(file, status);
      });
  final List<Path> actual = found.keySet()
      .stream()
      .sorted(byName)
      .collect(Collectors.toList());

  // the files which the manifest says were committed
  final List<Path> expected = filesInManifest(successData);
  expected.sort(byName);

  // every one of those must be in the listing
  assertThat(actual)
      .describedAs("Files in FS expected to contain all listed in manifest")
      .containsAll(expected);

  if (!exclusive) {
    return found;
  }
  // exclusive: nothing else may be in the listing
  assertThat(actual)
      .describedAs("Files in FS expected to be exclusively of the job")
      .hasSize(expected.size())
      .containsExactlyInAnyOrderElementsOf(expected);
  return found;
}
/**
 * Convert the filenames recorded in a manifest into paths.
 * Each filename is passed through
 * {@code AbstractManifestData.unmarshallPath}.
 * @param successData data
 * @return the paths, in the order of the manifest's filenames.
 */
public static List<Path> filesInManifest(ManifestSuccessData successData) {
  return successData.getFilenames()
      .stream()
      .map(filename -> AbstractManifestData.unmarshallPath(filename))
      .collect(Collectors.toList());
}
/**
 * List the files of a directory/directory tree, logging each
 * entry at info level.
 * A null path is logged as "Empty path" and nothing is listed;
 * this surfaces when someone calls getParent() on something at
 * the top of the path.
 * @param fileSystem FS
 * @param path path; may be null
 * @param recursive do a recursive listing?
 * @return the result of {@code RemoteIterators.foreach} over the listing;
 * 0 for a null path.
 * @throws Exception failure.
 */
public static long lsR(FileSystem fileSystem, Path path, boolean recursive)
    throws Exception {
  if (path != null) {
    return RemoteIterators.foreach(
        fileSystem.listFiles(path, recursive),
        entry -> LOG.info("{}", entry));
  }
  LOG.info("Empty path");
  return 0;
}
/**
 * Assert that a file entry has the expected source path,
 * destination path and size; checked in that order.
 * Matching on paths, not strings, helps validate marshalling.
 * @param fileOrDir file entry to check
 * @param src source path
 * @param dest dest path
 * @param l length
 */
static void assertFileEntryMatch(
    final FileEntry fileOrDir,
    final Path src,
    final Path dest,
    final long l) {
  // string form of the entry, used in every assertion description
  final String text = fileOrDir.toString();
  final String sourceDescription = "Source path of " + text;
  final String destDescription = "Dest path of " + text;
  final String sizeDescription = "Size of " + text;

  assertThat(fileOrDir.getSourcePath())
      .describedAs(sourceDescription)
      .isEqualTo(src);
  assertThat(fileOrDir.getDestPath())
      .describedAs(destDescription)
      .isEqualTo(dest);
  assertThat(fileOrDir.getSize())
      .describedAs(sizeDescription)
      .isEqualTo(l);
}
/**
 * Assert that a dir entry has the expected destination path
 * and type; checked in that order.
 * Matching on paths, not strings, helps validate marshalling.
 * @param fileOrDir directory entry to check
 * @param dest dest path
 * @param type type
 */
static void assertDirEntryMatch(
    final DirEntry fileOrDir,
    final Path dest,
    final long type) {
  // string form of the entry, used in every assertion description
  final String text = fileOrDir.toString();
  final String destDescription = "Dest path of " + text;
  final String typeDescription = "type of " + text;

  assertThat(fileOrDir.getDestPath())
      .describedAs(destDescription)
      .isEqualTo(dest);
  assertThat(fileOrDir.getType())
      .describedAs(typeDescription)
      .isEqualTo(type);
}
/**
 * Assert that none of the named statistics has a failure count.
 * For each name, the counter {@code name + ".failures"} is looked up;
 * it passes if it is absent (null) or 0.
 * @param iostats statistics
 * @param names base name of the statistics (i.e. without ".failures" suffix)
 */
public static void assertNoFailureStatistics(IOStatistics iostats, String... names) {
  final Map<String, Long> counterMap = iostats.counters();
  for (String statistic : names) {
    final Long failures = counterMap.get(statistic + ".failures");
    assertThat(failures)
        .describedAs("Failure count of %s", statistic)
        .matches(count -> count == null || count == 0);
  }
}
/**
 * Write the files-to-commit of a manifest into a newly created
 * temporary entry file, and describe the outcome.
 * The temporary file is created with prefix "entries" and suffix ".seq".
 * Caller MUST clean up the temp file.
 * @param entryFileIO IO class used to create the writer
 * @param manifest manifest to process.
 * @return the manifest's destination directories, the path of the
 * temp file and the result of the write.
 * @throws IOException write failure
 */
public static LoadedManifestData saveManifest(EntryFileIO entryFileIO, TaskManifest manifest)
    throws IOException {
  final File entryFile = File.createTempFile("entries", ".seq");
  final SequenceFile.Writer out = entryFileIO.createWriter(entryFile);
  // NOTE(review): the trailing "true" is presumably a close-the-writer
  // flag; confirm against EntryFileIO.write.
  return new LoadedManifestData(
      manifest.getDestDirectories(),
      toPath(entryFile),
      EntryFileIO.write(out, manifest.getFilesToCommit(), true));
}
/**
 * Closeable which can be used to safely close writers in
 * a try-with-resources block.
 * Failures raised while closing the writer are logged at error level
 * and not rethrown.
 * @param <K> key type of the writer
 * @param <V> value type of the writer
 */
public static final class CloseWriter<K, V> implements AutoCloseable {

  /** Writer to close. */
  private final RecordWriter<K, V> writer;

  /** Context passed to the writer when it is closed. */
  private final TaskAttemptContext context;

  /**
   * Constructor.
   * @param writer writer to close
   * @param context task attempt context to close it with
   */
  public CloseWriter(RecordWriter<K, V> writer,
      TaskAttemptContext context) {
    this.writer = writer;
    this.context = context;
  }

  /**
   * Close the writer, logging rather than propagating any
   * IOException or InterruptedException.
   * After an interruption the thread's interrupt status is restored,
   * so that code further up the stack can still observe it.
   */
  @Override
  public void close() {
    try {
      writer.close(context);
    } catch (IOException e) {
      LOG.error("When closing {} on context {}",
          writer, context, e);
    } catch (InterruptedException e) {
      LOG.error("When closing {} on context {}",
          writer, context, e);
      // do not lose the interruption: re-assert the flag
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Format string for a task attempt ID.
 * Arguments, in order: job ID string, task index, attempt number.
 * Value: {@value}.
 */
public static final String ATTEMPT_STRING =
"attempt_%s_m_%06d_%d";
/**
 * A job ID together with a grid of task attempt IDs:
 * one row per task, one entry in the row per attempt of that task.
 * The job ID is either supplied by the caller or generated
 * through {@code randomJobId()}.
 */
public static final class JobAndTaskIDsForTests {

  /** Job ID string used in every attempt ID of this instance. */
  private final String jobId;

  /**
   * Attempt IDs kept in string form, indexed as [task][attempt];
   * the MR ID types are parsed from these strings on each request.
   */
  private final String[][] taskAttempts;

  /**
   * Constructor which generates its own job ID.
   * @param tasks number of tasks.
   * @param attempts number of attempts.
   */
  public JobAndTaskIDsForTests(int tasks, int attempts) {
    this(randomJobId(), tasks, attempts);
  }

  /**
   * Constructor taking the job ID to use.
   * @param jobId job ID string to embed in each attempt ID.
   * @param tasks number of tasks.
   * @param attempts number of attempts.
   */
  public JobAndTaskIDsForTests(final String jobId,
      int tasks, int attempts) {
    this.jobId = jobId;
    final String[][] grid = new String[tasks][attempts];
    for (int task = 0; task < tasks; task++) {
      final String[] row = grid[task];
      for (int attempt = 0; attempt < attempts; attempt++) {
        row[attempt] = String.format(ATTEMPT_STRING, jobId, task, attempt);
      }
    }
    this.taskAttempts = grid;
  }

  /**
   * Get the job ID.
   * @return job ID string.
   */
  public String getJobId() {
    return jobId;
  }

  /**
   * Get the job ID as the MR type; it is derived from the
   * ID of task 0.
   * @return job ID type.
   */
  public JobID getJobIdType() {
    final TaskID firstTask = getTaskIdType(0);
    return firstTask.getJobID();
  }

  /**
   * Get a task attempt ID.
   * @param task task index
   * @param attempt attempt number.
   * @return the task attempt.
   */
  public String getTaskAttempt(int task, int attempt) {
    final String[] attemptsOfTask = taskAttempts[task];
    return attemptsOfTask[attempt];
  }

  /**
   * Get task attempt ID as the MR type.
   * @param task task index
   * @param attempt attempt number.
   * @return the task attempt type
   */
  public TaskAttemptID getTaskAttemptIdType(int task, int attempt) {
    final String attemptId = getTaskAttempt(task, attempt);
    return TaskAttemptID.forName(attemptId);
  }

  /**
   * Get task ID as the MR type; it is taken from
   * attempt 0 of the task.
   * @param task task index
   * @return the task ID type
   */
  public TaskID getTaskIdType(int task) {
    return getTaskAttemptIdType(task, 0).getTaskID();
  }

  /**
   * Get task ID as a string.
   * @param task task index
   * @return the task ID
   */
  public String getTaskId(int task) {
    final TaskID id = getTaskIdType(task);
    return id.toString();
  }
}
}