Committer.java
/*
* ByteDance Volcengine EMR, Copyright 2022.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.commit;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.commit.ops.PendingOps;
import org.apache.hadoop.fs.tosfs.commit.ops.PendingOpsFactory;
import org.apache.hadoop.fs.tosfs.common.Tasks;
import org.apache.hadoop.fs.tosfs.common.ThreadPools;
import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.fs.tosfs.util.CommonUtils;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
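
/**
 * A {@link PathOutputCommitter} that targets object storage: task writers stage their output as
 * uncompleted multipart uploads under a magic path below the destination, record each staged
 * upload in a .pending file, aggregate those files into a per-task pendingset at task commit,
 * and complete (or abort) the uploads when the job commits. A success marker is written to the
 * output path once the job commit succeeds.
 *
 * <p>A minimal configuration sketch using the constants defined below (the report directory
 * path is only an example):
 * <pre>
 *   Configuration conf = new Configuration();
 *   // Threads used by the committer's worker pools.
 *   conf.setInt(Committer.COMMITTER_THREADS, 16);
 *   // Optional directory where the job summary report is saved.
 *   conf.set(Committer.COMMITTER_SUMMARY_REPORT_DIR, "hdfs://namenode/reports");
 * </pre>
 */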
public class Committer extends PathOutputCommitter {
private static final Logger LOG = LoggerFactory.getLogger(Committer.class);
public static final String COMMITTER_THREADS = "fs.job.committer.threads";
public static final String COMMITTER_SUMMARY_REPORT_DIR =
"fs.job.committer.summary.report.directory";
public static final int DEFAULT_COMMITTER_THREADS = Runtime.getRuntime().availableProcessors();
public static final String THREADS_PREFIX = "job-committer-thread-pool";
private final String jobId;
private final Path outputPath;
  // This is the directory for all intermediate work, where the output format will write data.
  // This may not be on the final file system.
private Path workPath;
private final String role;
private final Configuration conf;
private final FileSystem destFs;
private final ObjectStorage storage;
private final PendingOps ops;
public Committer(Path outputPath, TaskAttemptContext context) throws IOException {
this(outputPath, context, String.format("Task committer %s", context.getTaskAttemptID()));
this.workPath = CommitUtils.magicTaskAttemptBasePath(context, outputPath);
LOG.info("Task attempt {} has work path {}", context.getTaskAttemptID(), getWorkPath());
}
public Committer(Path outputPath, JobContext context) throws IOException {
this(outputPath, context, String.format("Job committer %s", context.getJobID()));
}
private Committer(Path outputPath, JobContext context, String role) throws IOException {
super(outputPath, context);
this.jobId = CommitUtils.buildJobId(context);
this.outputPath = outputPath;
this.role = role;
this.conf = context.getConfiguration();
this.destFs = outputPath.getFileSystem(conf);
LOG.info("{} instantiated for job '{}' ID {} with destination {}",
role,
CommitUtils.jobName(context),
jobId, outputPath);
// Initialize the object storage.
this.storage = ObjectStorageFactory.create(outputPath.toUri().getScheme(),
outputPath.toUri().getAuthority(), conf);
this.ops = PendingOpsFactory.create(destFs, storage);
}
@Override
public Path getOutputPath() {
return outputPath;
}
@Override
public Path getWorkPath() {
return workPath;
}
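
  /**
   * Sets up the job: deletes any stale success marker, creates the destination directory,
   * logs a warning for uncompleted multipart uploads left under the output path, and
   * recreates the magic job attempt path for this job.
   */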
@Override
public void setupJob(JobContext context) throws IOException {
checkJobId(context);
LOG.info("Setup Job {}", jobId);
Path jobOutput = getOutputPath();
    // Delete the success marker if it exists.
    destFs.delete(CommitUtils.successMarker(jobOutput), false);
    // Create the destination directory.
    destFs.mkdirs(jobOutput);
logUncompletedMPUIfPresent(jobOutput);
    // Reset the job path: delete any existing job path, then create the job attempt sub-path.
Path jobPath = CommitUtils.magicJobPath(jobId, outputPath);
Path jobAttemptPath = CommitUtils.magicJobAttemptPath(context, outputPath);
destFs.delete(jobPath, true);
destFs.mkdirs(jobAttemptPath);
}
private void logUncompletedMPUIfPresent(Path jobOutput) {
    // Scan for uncompleted multipart uploads and log a warning for each one found.
    int count = 0;
    for (MultipartUpload upload : storage.listUploads(ObjectUtils.pathToKey(jobOutput, true))) {
      if (count++ > 10) {
        LOG.warn("There are more than 10 uncompleted multipart uploads under path {}.", jobOutput);
        break;
      }
LOG.warn("Uncompleted multipart upload {} is under path {}, either jobs are running"
+ " concurrently or failed jobs are not being cleaned up.", upload, jobOutput);
}
}
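
  /**
   * Commits the job in four stages: list the pendingset files under the magic job attempt
   * path ("preparing"), load and commit every pending upload ("commit"), save the success
   * marker ("marker"), and abort orphan multipart uploads plus clean the staging dir
   * ("clean"). If the marker stage fails, the already committed pending sets are reverted;
   * a summary report is saved quietly regardless of the outcome.
   */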
@Override
public void commitJob(JobContext context) throws IOException {
checkJobId(context);
LOG.info("{}: committing job {}", role, jobId);
String stage = null;
Exception failure = null;
SuccessData successData = null;
ExecutorService threadPool = ThreadPools.newWorkerPool(THREADS_PREFIX, commitThreads());
List<FileStatus> pendingSets = Lists.newArrayList();
try {
// Step.1 List active pending commits.
stage = "preparing";
CommitUtils.listFiles(destFs, CommitUtils.magicJobAttemptPath(context, outputPath), true,
f -> {
if (f.getPath().toString().endsWith(CommitUtils.PENDINGSET_SUFFIX)) {
pendingSets.add(f);
}
});
// Step.2 Load and commit those active pending commits.
stage = "commit";
CommitContext commitCtxt = new CommitContext(pendingSets);
loadAndCommitPendingSets(threadPool, commitCtxt);
// Step.3 Save the success marker.
stage = "marker";
successData = createSuccessData(commitCtxt.destKeys());
CommitUtils.triggerError(() -> new IOException("Mock error of success marker."), stage);
CommitUtils.save(destFs, CommitUtils.successMarker(outputPath), successData);
// Step.4 Abort those orphan multipart uploads and cleanup the staging dir.
stage = "clean";
cleanup(threadPool, true);
} catch (Exception e) {
failure = e;
LOG.warn("Commit failure for job {} stage {}", CommitUtils.buildJobId(context), stage, e);
      // Revert all pending sets if the marker step fails.
if (stage.equals("marker")) {
CommonUtils.runQuietly(
() -> loadAndRevertPendingSets(threadPool, new CommitContext(pendingSets)));
}
CommonUtils.runQuietly(() -> cleanup(threadPool, true));
throw e;
} finally {
saveSummaryReportQuietly(stage, context, successData, failure);
CommonUtils.runQuietly(threadPool::shutdown);
cleanupResources();
}
}
private SuccessData createSuccessData(Iterable<String> filenames) {
SuccessData data = SuccessData.builder()
.setName(SuccessData.class.getName())
.setCommitter(CommitUtils.COMMITTER_NAME)
.setTimestamp(System.currentTimeMillis())
.setHostname(NetUtils.getHostname())
.setDescription(role)
.setJobId(jobId)
.addFileNames(filenames)
.build();
data.addDiagnosticInfo(COMMITTER_THREADS, Integer.toString(commitThreads()));
return data;
}
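
  /**
   * Saves the job summary report to the configured report directory, recording the active
   * stage and any failure. Never throws: errors are only logged, and the report is skipped
   * entirely when {@link #COMMITTER_SUMMARY_REPORT_DIR} is unset.
   */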
private void saveSummaryReportQuietly(String activeStage, JobContext context, SuccessData report,
Throwable thrown) {
Configuration jobConf = context.getConfiguration();
String reportDir = jobConf.get(COMMITTER_SUMMARY_REPORT_DIR, "");
if (reportDir.isEmpty()) {
LOG.debug("Summary directory conf: {} is not set", COMMITTER_SUMMARY_REPORT_DIR);
return;
}
Path path = CommitUtils.summaryReport(new Path(reportDir), jobId);
LOG.debug("Summary report path is {}", path);
try {
if (report == null) {
report = createSuccessData(null);
}
if (thrown != null) {
report.recordJobFailure(thrown);
}
report.addDiagnosticInfo("stage", activeStage);
CommitUtils.save(path.getFileSystem(jobConf), path, report);
LOG.info("Job summary saved to {}", path);
} catch (Exception e) {
LOG.warn("Failed to save summary to {}", path, e);
}
}
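
  /**
   * Commits all pendingset files in parallel on the outer pool, while the commits inside
   * each pendingset run on a dedicated inner pool; abort and revert handlers are wired in
   * so that a failure rolls back work that has already been done.
   */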
private void loadAndCommitPendingSets(ExecutorService outerPool, CommitContext commitContext) {
ExecutorService innerPool =
ThreadPools.newWorkerPool("commit-pending-files-pool", commitThreads());
try {
Tasks.foreach(commitContext.pendingSets())
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(outerPool)
.abortWith(pendingSet -> loadAndAbort(innerPool, pendingSet))
.revertWith(pendingSet -> loadAndRevert(innerPool, pendingSet))
.run(pendingSet -> loadAndCommit(commitContext, innerPool, pendingSet));
} finally {
CommonUtils.runQuietly(innerPool::shutdown);
}
}
private void loadAndRevertPendingSets(ExecutorService outerPool, CommitContext commitContext) {
Tasks.foreach(commitContext.pendingSets())
.throwFailureWhenFinished()
.executeWith(outerPool)
.run(pendingSet -> loadAndRevert(outerPool, pendingSet));
}
/**
* Load {@link PendingSet} from file and abort those {@link Pending} commits.
*/
private void loadAndAbort(ExecutorService pool, FileStatus pendingSetFile) {
PendingSet pendingSet = PendingSet.deserialize(destFs, pendingSetFile);
Tasks.foreach(pendingSet.commits())
.suppressFailureWhenFinished()
.executeWith(pool)
.run(ops::abort);
}
/**
* Load {@link PendingSet} from file and revert those {@link Pending} commits.
*/
private void loadAndRevert(ExecutorService pool, FileStatus pendingSetFile) {
PendingSet pendingSet = PendingSet.deserialize(destFs, pendingSetFile);
Tasks.foreach(pendingSet.commits())
.suppressFailureWhenFinished()
.executeWith(pool)
.run(ops::revert);
}
/**
* Load {@link PendingSet} from file and commit those {@link Pending} commits.
*/
private void loadAndCommit(CommitContext commitCtxt, ExecutorService pool,
FileStatus pendingSetFile) {
PendingSet pendingSet = PendingSet.deserialize(destFs, pendingSetFile);
    // Verify that the pending set's job ID matches this committer's job ID.
    String jobID = pendingSet.jobId();
    if (StringUtils.isNotEmpty(jobID) && !Objects.equals(jobID, jobId())) {
      throw new IllegalStateException(
          String.format("Mismatch in job ID (%s) and pending set job ID (%s)", jobId(), jobID));
    }
Tasks.foreach(pendingSet.commits())
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(pool)
.onFailure((pending, exception) -> ops.abort(pending))
.abortWith(ops::abort)
.revertWith(ops::revert)
.run(pending -> {
ops.commit(pending);
commitCtxt.addDestKey(pending.destKey());
});
}
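
  /**
   * Aborts the job in the given state: aborts the staged multipart uploads and cleans the
   * staging dir (cleanup failures propagate, since suppression is disabled), then releases
   * the committer's resources.
   */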
@Override
public void abortJob(JobContext context, JobStatus.State state) {
checkJobId(context);
LOG.info("{}: aborting job {} in state {}", role, jobId, state);
ExecutorService service = ThreadPools.newWorkerPool(THREADS_PREFIX, commitThreads());
try {
cleanup(service, false);
} finally {
service.shutdown();
cleanupResources();
}
}
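
  /**
   * Sets up the task attempt by deleting any leftover task attempt base path and recreating
   * it as an empty directory.
   */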
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
checkJobId(context);
LOG.info("Setup Task {}", context.getTaskAttemptID());
Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, outputPath);
    // Delete the task attempt path in case it already exists.
destFs.delete(taskAttemptBasePath, true);
// Make an empty directory.
destFs.mkdirs(taskAttemptBasePath);
}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
return true;
}
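
  /**
   * Commits the task attempt: loads every .pending file written by this attempt, aggregates
   * them into a single pendingset file saved under the magic task pendingset path, and
   * finally deletes the task attempt path. The staged multipart uploads are completed later
   * by {@link #commitJob(JobContext)}.
   */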
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
checkJobId(context);
LOG.info("Commit task {}", context);
ExecutorService pool = ThreadPools.newWorkerPool(THREADS_PREFIX, commitThreads());
try {
PendingSet commits = innerCommitTask(pool, context);
LOG.info("Task {} committed {} files", context.getTaskAttemptID(), commits.size());
} catch (IOException e) {
LOG.error("Failed to commit task {}", context.getTaskAttemptID(), e);
throw e;
} finally {
// Shutdown the thread pool quietly.
CommonUtils.runQuietly(pool::shutdown);
// Delete the task attempt path quietly.
Path taskAttemptPath = CommitUtils.magicTaskAttemptPath(context, outputPath);
LOG.info("Delete task attempt path {}", taskAttemptPath);
CommonUtils.runQuietly(() -> destFs.delete(taskAttemptPath, true));
}
}
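
  /**
   * Loads the pending files of the task attempt in parallel, fills them into a pendingset
   * tagged with the task attempt ID, and persists it to the file system; on any error the
   * already loaded commits are aborted quietly before the error is rethrown.
   */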
private PendingSet innerCommitTask(ExecutorService pool, TaskAttemptContext context)
throws IOException {
Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, outputPath);
PendingSet pendingSet = new PendingSet(jobId);
try {
// Load the pending files and fill them into the pending set.
List<FileStatus> pendingFiles = CommitUtils.listPendingFiles(destFs, taskAttemptBasePath);
      // Use a thread-safe collection to gather the pending commits.
List<Pending> pendings = Collections.synchronizedList(Lists.newArrayList());
Tasks.foreach(pendingFiles)
.throwFailureWhenFinished()
.executeWith(pool)
.run(f -> {
try {
byte[] data = CommitUtils.load(destFs, f.getPath());
pendings.add(Pending.deserialize(data));
} catch (IOException e) {
LOG.warn("Failed to load .pending file {}", f.getPath(), e);
throw new UncheckedIOException(e);
}
});
pendingSet.addAll(pendings);
// Add the extra task attempt id property.
String taskId = String.valueOf(context.getTaskAttemptID());
pendingSet.addExtraData(CommitUtils.TASK_ATTEMPT_ID, taskId);
      // Save the pending set to the file system.
Path taskOutput = CommitUtils.magicTaskPendingSetPath(context, outputPath);
LOG.info("Saving work of {} to {}", taskId, taskOutput);
CommitUtils.save(destFs, taskOutput, pendingSet.serialize());
} catch (Exception e) {
LOG.error("Encounter error when loading pending set from {}", taskAttemptBasePath, e);
if (!pendingSet.commits().isEmpty()) {
Tasks.foreach(pendingSet.commits())
.executeWith(pool)
.suppressFailureWhenFinished()
.run(ops::abort);
}
throw e;
}
return pendingSet;
}
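
  /**
   * Aborts the task attempt: every listed .pending file is loaded, its staged upload is
   * aborted, and the file itself is deleted; the task attempt base path is removed at the
   * end regardless of failures.
   */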
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
checkJobId(context);
Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, outputPath);
try {
// Load the pending files from the underlying filesystem.
List<FileStatus> pendingFiles = CommitUtils.listPendingFiles(destFs, taskAttemptBasePath);
Tasks.foreach(pendingFiles)
.throwFailureWhenFinished()
.run(f -> {
try {
byte[] serializedData = CommitUtils.load(destFs, f.getPath());
ops.abort(Pending.deserialize(serializedData));
} catch (FileNotFoundException e) {
LOG.debug("Listed file already deleted: {}", f);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
final FileStatus pendingFile = f;
CommonUtils.runQuietly(() -> destFs.delete(pendingFile.getPath(), false));
}
});
} finally {
CommonUtils.runQuietly(() -> destFs.delete(taskAttemptBasePath, true));
}
}
@Override
public void recoverTask(TaskAttemptContext context) {
checkJobId(context);
String taskId = context.getTaskAttemptID().toString();
throw new UnsupportedOperationException(
String.format("Unable to recover task %s, output: %s", taskId, outputPath));
}
private int commitThreads() {
return conf.getInt(COMMITTER_THREADS, DEFAULT_COMMITTER_THREADS);
}
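
  /**
   * Cleans up the job by aborting all multipart uploads under the magic job path and then
   * quietly deleting the staging dir; when {@code suppress} is true, abort failures are
   * logged instead of rethrown.
   */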
private void cleanup(ExecutorService pool, boolean suppress) {
LOG.info("Cleanup the job by abort the multipart uploads and clean staging dir, suppress {}",
suppress);
try {
Path jobOutput = getOutputPath();
Iterable<MultipartUpload> pending = storage.listUploads(
ObjectUtils.pathToKey(CommitUtils.magicJobPath(jobId, jobOutput), true));
Tasks.foreach(pending)
.executeWith(pool)
.suppressFailureWhenFinished()
.run(u -> storage.abortMultipartUpload(u.key(), u.uploadId()));
} catch (Exception e) {
if (suppress) {
LOG.error("The following exception has been suppressed when cleanup job", e);
} else {
throw e;
}
} finally {
CommonUtils.runQuietly(this::cleanupStagingDir);
}
}
private void cleanupStagingDir() throws IOException {
    // Note: different jobs share the same __magic folder, e.g.
    //   tos://bucket/path/to/table/__magic/job-A/..., and
    //   tos://bucket/path/to/table/__magic/job-B/...
    // A job should only delete its own job folder, to avoid failing other running jobs,
    // and the __magic folder itself should be deleted by the last job to finish.
    // This design is not race-free: a job may observe that no other job is running and
    // start deleting __magic while another job begins to use it at the same time. We
    // consider the probability of this race low and do not handle it.
destFs.delete(CommitUtils.magicJobPath(jobId, outputPath), true);
Path magicPath = CommitUtils.magicPath(outputPath);
if (destFs.listStatus(magicPath).length == 0) {
destFs.delete(magicPath, true);
}
}
public String jobId() {
return jobId;
}
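
  /**
   * Validates that the job ID derived from the given context matches the job ID this
   * committer was constructed with.
   */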
private void checkJobId(JobContext context) {
String jobIdInContext = CommitUtils.buildJobId(context);
    Preconditions.checkArgument(Objects.equals(jobId, jobIdInContext), String.format(
        "JobId set in the context: %s is not consistent with the initial jobId of the committer:"
            + " %s, please check the settings in your taskAttemptContext.",
        jobIdInContext, jobId));
}
private void cleanupResources() {
CommonUtils.runQuietly(storage::close);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("role", role)
.add("jobId", jobId)
.add("outputPath", outputPath)
.toString();
}
}