CommitUtils.java
/*
* ByteDance Volcengine EMR, Copyright 2022.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.commit;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.tosfs.commit.mapred.Committer;
import org.apache.hadoop.fs.tosfs.util.Serializer;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
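
/**
 * Helpers for the object-storage "magic" commit protocol: the {@code __magic} directory layout,
 * job ID construction, small filesystem utilities, and a test-only error-injection hook.
 */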
public final class CommitUtils {
private CommitUtils() {
}
public static final String COMMITTER_NAME = Committer.class.getName();
/**
   * Configuration key for the URI schemes supported by the TOS committer.
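   * Multiple schemes are given as a comma-separated list.
   *
   * <p>An illustrative override in a job configuration:
   * <pre>{@code
   * conf.set(CommitUtils.FS_STORAGE_OBJECT_SCHEME, "tos,oss,s3a");
   * }</pre>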
*/
public static final String FS_STORAGE_OBJECT_SCHEME = "fs.object-storage.scheme";
public static final String DEFAULT_FS_STORAGE_OBJECT_SCHEME = "tos,oss,s3,s3a,s3n,obs,filestore";
/**
* Path for "magic" writes: path and {@link #PENDING_SUFFIX} files: {@value}.
*/
public static final String MAGIC = "__magic";
/**
* Marker of the start of a directory tree for calculating the final path names: {@value}.
*/
public static final String BASE = "__base";
/**
* Suffix applied to pending commit metadata: {@value}.
*/
public static final String PENDING_SUFFIX = ".pending";
/**
* Suffix applied to multiple pending commit metadata: {@value}.
*/
public static final String PENDINGSET_SUFFIX = ".pendingset";
/**
* Marker file to create on success: {@value}.
*/
public static final String SUCCESS = "_SUCCESS";
/**
* Format string used to build a summary file from a Job ID.
*/
public static final String SUMMARY_FILENAME_FORMAT = "summary-%s.json";
/**
* Extra Data key for task attempt in pendingset files.
*/
public static final String TASK_ATTEMPT_ID = "task.attempt.id";
/**
* The UUID for jobs: {@value}.
   * This was historically set by Spark 1.x's SQL queries and was reinstated in SPARK-33230.
*/
public static final String SPARK_WRITE_UUID = "spark.sql.sources.writeJobUUID";
/**
* Get the magic location for the output path.
* Format: ${out}/__magic
*
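   * <p>A minimal illustration ({@code tos://bucket/out} is an assumed output path):
   * <pre>{@code
   * Path magic = CommitUtils.magicPath(new Path("tos://bucket/out"));
   * // magic: tos://bucket/out/__magic
   * }</pre>
   *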
* @param out the base output directory.
* @return the location of magic job attempts.
*/
public static Path magicPath(Path out) {
return new Path(out, MAGIC);
}
/**
* Compute the "magic" path for a job. <br>
* Format: ${jobOutput}/__magic/${jobId}
*
* @param jobId unique Job ID.
* @param jobOutput the final output directory.
* @return the path to store job attempt data.
*/
public static Path magicJobPath(String jobId, Path jobOutput) {
return new Path(magicPath(jobOutput), jobId);
}
/**
* Get the Application Attempt ID for this job.
*
* @param context the context to look in
   * @return the Application Attempt ID for a given job, or 0 if none is set.
*/
public static int appAttemptId(JobContext context) {
return context.getConfiguration().getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
}
/**
* Compute the "magic" path for a job attempt. <br>
* Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}
*
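   * <p>Note the attempt ID is zero-padded to two digits. For an assumed job ID
   * {@code job_202207041319_0001} and attempt {@code 1}:
   * <pre>{@code
   * Path p = CommitUtils.magicJobAttemptPath("job_202207041319_0001", 1,
   *     new Path("tos://bucket/out"));
   * // p: tos://bucket/out/__magic/job_202207041319_0001/01
   * }</pre>
   *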
* @param jobId unique Job ID.
* @param appAttemptId the ID of the application attempt for this job.
* @param jobOutput the final output directory.
* @return the path to store job attempt data.
*/
public static Path magicJobAttemptPath(String jobId, int appAttemptId, Path jobOutput) {
return new Path(magicPath(jobOutput), formatAppAttemptDir(jobId, appAttemptId));
}
/**
* Compute the "magic" path for a job attempt. <br>
* Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}
*
* @param context the context of the job.
* @param jobOutput the final output directory.
* @return the path to store job attempt data.
*/
public static Path magicJobAttemptPath(JobContext context, Path jobOutput) {
String jobId = buildJobId(context);
return magicJobAttemptPath(jobId, appAttemptId(context), jobOutput);
}
private static String formatAppAttemptDir(String jobId, int appAttemptId) {
return String.format("%s/%02d", jobId, appAttemptId);
}
/**
* Compute the path where the output of magic task attempts are stored. <br>
* Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks
*
* @param jobId unique Job ID.
* @param jobOutput The output path to commit work into.
* @param appAttemptId the ID of the application attempt for this job.
* @return the path where the output of magic task attempts are stored.
*/
public static Path magicTaskAttemptsPath(String jobId, Path jobOutput, int appAttemptId) {
return new Path(magicJobAttemptPath(jobId, appAttemptId, jobOutput), "tasks");
}
/**
* Compute the path where the output of a task attempt is stored until that task is committed.
* This path is marked as a base path for relocations, so subdirectory information is preserved.
* Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}/__base
*
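   * <p>For an assumed job ID {@code job_202207041319_0001}, app attempt {@code 0}, and task
   * attempt {@code attempt_202207041319_0001_m_000001_0}, the base path resolves to:
   * <pre>{@code
   * tos://bucket/out/__magic/job_202207041319_0001/00/tasks/attempt_202207041319_0001_m_000001_0/__base
   * }</pre>
   *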
* @param context the context of the task attempt.
* @param jobId unique Job ID.
* @param jobOutput The output path to commit work into.
* @return the path where a task attempt should be stored.
*/
public static Path magicTaskAttemptBasePath(TaskAttemptContext context, String jobId,
Path jobOutput) {
return new Path(magicTaskAttemptPath(context, jobId, jobOutput), BASE);
}
/**
* Compute the path where the output of a task attempt is stored until that task is committed.
* This path is marked as a base path for relocations, so subdirectory information is preserved.
* Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}/__base
*
* @param context the context of the task attempt.
* @param jobOutput The output path to commit work into.
* @return the path where a task attempt should be stored.
*/
public static Path magicTaskAttemptBasePath(TaskAttemptContext context, Path jobOutput) {
String jobId = buildJobId(context);
return magicTaskAttemptBasePath(context, jobId, jobOutput);
}
/**
* Get the magic task attempt path, without any annotations to mark relative references.
* If there is an app attempt property in the context configuration, that is included.
* Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}
*
* @param context the context of the task attempt.
* @param jobId unique Job ID.
* @param jobOutput The output path to commit work into.
* @return the path under which all attempts go.
*/
public static Path magicTaskAttemptPath(TaskAttemptContext context, String jobId,
Path jobOutput) {
return new Path(magicTaskAttemptsPath(jobId, jobOutput, appAttemptId(context)),
String.valueOf(context.getTaskAttemptID()));
}
/**
* Get the magic task attempt path, without any annotations to mark relative references.
* If there is an app attempt property in the context configuration, that is included.
* Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}
*
* @param context the context of the task attempt.
* @param jobOutput The output path to commit work into.
* @return the path under which all attempts go.
*/
public static Path magicTaskAttemptPath(TaskAttemptContext context, Path jobOutput) {
String jobId = buildJobId(context);
return magicTaskAttemptPath(context, jobId, jobOutput);
}
/**
* Get the magic task pendingset path.
* Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/${taskId}.pendingset
*
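   * <p>For an assumed job ID {@code job_202207041319_0001}, app attempt {@code 0}, and task
   * {@code task_202207041319_0001_m_000001}, the pendingset file resolves to:
   * <pre>{@code
   * tos://bucket/out/__magic/job_202207041319_0001/00/task_202207041319_0001_m_000001.pendingset
   * }</pre>
   *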
* @param context the context of the task attempt.
* @param jobOutput The output path to commit work into.
* @return the magic pending set path.
*/
public static Path magicTaskPendingSetPath(TaskAttemptContext context, Path jobOutput) {
String taskId = String.valueOf(context.getTaskAttemptID().getTaskID());
return new Path(magicJobAttemptPath(context, jobOutput),
String.format("%s%s", taskId, PENDINGSET_SUFFIX));
}
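  /**
   * Build the job ID used in magic paths. A Spark-provided write UUID
   * ({@value #SPARK_WRITE_UUID}) takes precedence over the MR job ID and is normalized to
   * carry the {@code job} prefix.
   *
   * <p>A sketch of the behaviour (the UUID value is an assumed example):
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.set(CommitUtils.SPARK_WRITE_UUID, "0c793059-assumed-uuid");
   * String id = CommitUtils.buildJobId(conf, null);
   * // id: job_0c793059-assumed-uuid
   * }</pre>
   *
   * @param conf configuration that may carry the Spark write UUID.
   * @param jobId the MR job ID, may be null.
   * @return a non-null job ID string.
   */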
public static String buildJobId(Configuration conf, JobID jobId) {
String jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
if (!jobUUID.isEmpty()) {
if (jobUUID.startsWith(JobID.JOB)) {
return jobUUID;
} else {
return String.format("%s_%s", JobID.JOB, jobUUID);
}
}
    // If no other option was supplied, fall back to the MR job ID.
    // This is what MR jobs expect, but Spark jobs cannot rely on it,
    // as concurrently launched Spark jobs risk job ID collisions.
return jobId != null ? jobId.toString() : "NULL_JOB_ID";
}
public static String buildJobId(JobContext context) {
return buildJobId(context.getConfiguration(), context.getJobID());
}
/**
   * Get the job name, or a meaningful placeholder if no name was set.
*
* @param context job context
* @return a string for logs
*/
public static String jobName(JobContext context) {
String name = context.getJobName();
return (name != null && !name.isEmpty()) ? name : "(anonymous)";
}
/**
* Format: ${output}/_SUCCESS.
*
* @param output the output path.
* @return the success marker file path.
*/
public static Path successMarker(Path output) {
return new Path(output, SUCCESS);
}
/**
   * Format: ${reportDir}/summary-${jobId}.json.
*
* @param reportDir the report directory.
* @param jobId the job id.
* @return the summary report file path.
*/
public static Path summaryReport(Path reportDir, String jobId) {
return new Path(reportDir, String.format(SUMMARY_FILENAME_FORMAT, jobId));
}
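  /**
   * Write {@code data} to {@code path}, overwriting the file if it already exists.
   *
   * <p>A round-trip sketch with {@link #load(FileSystem, Path)} ({@code fs} and the path are
   * assumed examples):
   * <pre>{@code
   * Path marker = new Path("tos://bucket/out/_SUCCESS");
   * CommitUtils.save(fs, marker, new byte[0]);
   * byte[] read = CommitUtils.load(fs, marker); // read.length == 0
   * }</pre>
   */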
public static void save(FileSystem fs, Path path, byte[] data) throws IOException {
    // By default, fs.create(path) creates missing parent directories recursively
    // and overwrites the file if it already exists.
try (FSDataOutputStream out = fs.create(path)) {
IOUtils.copy(new ByteArrayInputStream(data), out);
}
}
public static void save(FileSystem fs, Path path, Serializer instance) throws IOException {
save(fs, path, instance.serialize());
}
public static byte[] load(FileSystem fs, Path path) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (FSDataInputStream in = fs.open(path)) {
IOUtils.copy(in, out);
}
return out.toByteArray();
}
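  /**
   * Recursively collect all files under {@code dir} whose paths end with
   * {@value #PENDING_SUFFIX}.
   *
   * <p>An illustrative call against an assumed magic directory:
   * <pre>{@code
   * List<FileStatus> pending =
   *     CommitUtils.listPendingFiles(fs, new Path("tos://bucket/out/__magic"));
   * }</pre>
   */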
public static List<FileStatus> listPendingFiles(FileSystem fs, Path dir) throws IOException {
List<FileStatus> pendingFiles = Lists.newArrayList();
CommitUtils.listFiles(fs, dir, true, f -> {
if (f.getPath().toString().endsWith(CommitUtils.PENDING_SUFFIX)) {
pendingFiles.add(f);
}
});
return pendingFiles;
}
public static void listFiles(FileSystem fs, Path dir, boolean recursive, FileVisitor visitor)
throws IOException {
RemoteIterator<LocatedFileStatus> iter = fs.listFiles(dir, recursive);
while (iter.hasNext()) {
FileStatus f = iter.next();
visitor.visit(f);
}
}
public interface FileVisitor {
void visit(FileStatus f);
}
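  /**
   * Check whether the scheme of {@code outputPath} is in the configured object storage scheme
   * list ({@value #FS_STORAGE_OBJECT_SCHEME}), and thus eligible for this committer.
   *
   * <p>With the default scheme list:
   * <pre>{@code
   * supportObjectStorageCommit(conf, new Path("tos://bucket/out"));     // true
   * supportObjectStorageCommit(conf, new Path("hdfs://nn/warehouse"));  // false
   * }</pre>
   */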
public static boolean supportObjectStorageCommit(Configuration conf, Path outputPath) {
return supportSchemes(conf).contains(outputPath.toUri().getScheme());
}
  private static List<String> supportSchemes(Configuration conf) {
    String schemes = conf.get(FS_STORAGE_OBJECT_SCHEME, DEFAULT_FS_STORAGE_OBJECT_SCHEME);
    Preconditions.checkNotNull(schemes, "%s cannot be null", FS_STORAGE_OBJECT_SCHEME);
    // Trim each scheme so that values such as "tos, oss" still match.
    List<String> result = Lists.newArrayList();
    for (String scheme : schemes.split(",")) {
      result.add(scheme.trim());
    }
    return result;
  }
  // Stages for which an error has been injected; consulted only in test mode.
  private static final Set<String> errorStage = new HashSet<>();
  // Enabled once any error is injected, so production paths skip the stage lookup.
  private static boolean testMode = false;
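  /**
   * Test-only hook: mark {@code stage} so that a subsequent
   * {@link #triggerError(Supplier, String)} call for the same stage throws its supplied error.
   *
   * <p>A sketch of the intended test flow ({@code "commitJob"} is an assumed stage name):
   * <pre>{@code
   * CommitUtils.injectError("commitJob");
   * // later, at the instrumented point in the committer:
   * CommitUtils.triggerError(() -> new IOException("injected"), "commitJob"); // throws
   * }</pre>
   */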
public static void injectError(String stage) {
errorStage.add(stage);
testMode = true;
}
public static void removeError(String stage) {
errorStage.remove(stage);
}
public static <T extends Exception> void triggerError(Supplier<T> error, String stage) throws T {
if (testMode && errorStage.contains(stage)) {
throw error.get();
}
}
}