// Committer.java
/*
* ByteDance Volcengine EMR, Copyright 2022.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.commit.mapred;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class Committer extends FileOutputCommitter {
private static final Logger LOG = LoggerFactory.getLogger(Committer.class);
private OutputCommitter wrapped = null;
private static Path getOutputPath(JobContext context) {
JobConf conf = context.getJobConf();
return FileOutputFormat.getOutputPath(conf);
}
private static Path getOutputPath(TaskAttemptContext context) {
JobConf conf = context.getJobConf();
return FileOutputFormat.getOutputPath(conf);
}
private OutputCommitter getWrapped(JobContext context) throws IOException {
if (wrapped == null) {
wrapped = CommitUtils.supportObjectStorageCommit(context.getConfiguration(),
getOutputPath(context)) ?
new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context) :
new FileOutputCommitter();
LOG.debug("Using OutputCommitter implementation {}", wrapped.getClass().getName());
}
return wrapped;
}
@InterfaceAudience.Private
@Override
public Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
Path out = getOutputPath(context);
return out == null ? null : getTaskAttemptPath(context, out);
}
private OutputCommitter getWrapped(TaskAttemptContext context) throws IOException {
if (wrapped == null) {
wrapped = CommitUtils.supportObjectStorageCommit(context.getConfiguration(),
getOutputPath(context)) ?
new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context) :
new FileOutputCommitter();
}
return wrapped;
}
@Override
public Path getWorkPath(TaskAttemptContext context, Path outputPath)
throws IOException {
if (getWrapped(context) instanceof org.apache.hadoop.fs.tosfs.commit.Committer) {
return ((org.apache.hadoop.fs.tosfs.commit.Committer) getWrapped(context)).getWorkPath();
}
return super.getWorkPath(context, outputPath);
}
private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
if(workPath == null && out != null) {
if (getWrapped(context) instanceof org.apache.hadoop.fs.tosfs.commit.Committer) {
return CommitUtils.magicTaskAttemptPath(context, getOutputPath(context));
} else {
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.getTaskAttemptPath(context, out);
}
}
return workPath;
}
@Override
public void setupJob(JobContext context) throws IOException {
getWrapped(context).setupJob(context);
}
@Override
public void commitJob(JobContext context) throws IOException {
getWrapped(context).commitJob(context);
}
@Override
@Deprecated
public void cleanupJob(JobContext context) throws IOException {
getWrapped(context).cleanupJob(context);
}
@Override
public void abortJob(JobContext context, int runState)
throws IOException {
JobStatus.State state;
if(runState == JobStatus.State.RUNNING.getValue()) {
state = JobStatus.State.RUNNING;
} else if(runState == JobStatus.State.SUCCEEDED.getValue()) {
state = JobStatus.State.SUCCEEDED;
} else if(runState == JobStatus.State.FAILED.getValue()) {
state = JobStatus.State.FAILED;
} else if(runState == JobStatus.State.PREP.getValue()) {
state = JobStatus.State.PREP;
} else if(runState == JobStatus.State.KILLED.getValue()) {
state = JobStatus.State.KILLED;
} else {
throw new IllegalArgumentException(runState+" is not a valid runState.");
}
getWrapped(context).abortJob(context, state);
}
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
getWrapped(context).setupTask(context);
}
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
getWrapped(context).commitTask(context);
}
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
getWrapped(context).abortTask(context);
}
@Override
public boolean needsTaskCommit(TaskAttemptContext context)
throws IOException {
return getWrapped(context).needsTaskCommit(context);
}
@Override
@Deprecated
public boolean isRecoverySupported() {
return false;
}
@Override
public boolean isCommitJobRepeatable(JobContext context) throws IOException {
return getWrapped(context).isCommitJobRepeatable(context);
}
@Override
public boolean isRecoverySupported(JobContext context) throws IOException {
return getWrapped(context).isRecoverySupported(context);
}
@Override
public void recoverTask(TaskAttemptContext context)
throws IOException {
getWrapped(context).recoverTask(context);
}
public String jobId() {
Preconditions.checkNotNull(wrapped, "Encountered uninitialized job committer.");
return wrapped instanceof org.apache.hadoop.fs.tosfs.commit.Committer ?
((org.apache.hadoop.fs.tosfs.commit.Committer) wrapped).jobId() : null;
}
public Path getWorkPath() {
Preconditions.checkNotNull(wrapped, "Encountered uninitialized job committer.");
return wrapped instanceof org.apache.hadoop.fs.tosfs.commit.Committer ?
((org.apache.hadoop.fs.tosfs.commit.Committer) wrapped).getWorkPath() : null;
}
}