// ITestCommitOperations.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.Lists;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitterTestHelper.assertIsMagicStream;
import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test the low-level binding of the S3A FS to the magic commit mechanism,
* and handling of the commit operations.
*/
public class ITestCommitOperations extends AbstractCommitITest {
/** Logger for this test suite. */
private static final Logger LOG =
LoggerFactory.getLogger(ITestCommitOperations.class);
/** Non-empty payload written by the tests which need file content. */
private static final byte[] DATASET = dataset(1000, 'a', 32);
/**
 * Configuration key formed by applying
 * {@code COMMITTER_FACTORY_SCHEME_PATTERN} to the "s3a" scheme.
 */
private static final String S3A_FACTORY_KEY = String.format(
COMMITTER_FACTORY_SCHEME_PATTERN, "s3a");
/**
 * Random job ID, generated once when the class is initialized;
 * used to build magic paths and commit contexts.
 */
private static final String JOB_ID = UUID.randomUUID().toString();
/** Progress counter handed to upload operations; recreated in {@code setup()}. */
private ProgressCounter progress;
/**
 * Build the test configuration: the superclass configuration with the
 * committer bound via {@code S3A_COMMITTER_FACTORY} and
 * {@code COMMITTER_NAME_MAGIC}.
 * @return the patched configuration
 */
@Override
protected Configuration createConfiguration() {
  final Configuration configuration = super.createConfiguration();
  bindCommitter(configuration,
      CommitConstants.S3A_COMMITTER_FACTORY,
      CommitConstants.COMMITTER_NAME_MAGIC);
  return configuration;
}
/**
 * Test setup: close all cached filesystems, run the superclass setup,
 * verify that the test filesystem is a magic commit filesystem and
 * create a fresh progress counter.
 * @throws Exception any failure
 */
@Override
public void setup() throws Exception {
  // filesystems are closed before the superclass setup runs
  FileSystem.closeAll();
  super.setup();
  final S3AFileSystem fs = getFileSystem();
  verifyIsMagicCommitFS(fs);
  this.progress = new ProgressCounter();
  this.progress.assertCount("progress", 0);
}
/**
 * On a path outside any magic directory, the tracker created is not a
 * {@link MagicCommitTracker}.
 * @throws Throwable failure
 */
@Test
public void testCreateTrackerNormalPath() throws Throwable {
  final S3AFileSystem fs = getFileSystem();
  final MagicCommitIntegration commitIntegration =
      new MagicCommitIntegration(fs, true);
  final Path destFile = methodSubPath("notdelayed.txt");
  final String destKey = fs.pathToKey(destFile);
  final PutTracker putTracker = commitIntegration.createTracker(
      destFile,
      destKey,
      EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS);
  Assertions.assertThat(putTracker)
      .describedAs("Tracker for %s", destFile)
      .isNotInstanceOf(MagicCommitTracker.class);
}
/**
 * On a magic path, the magic tracker is returned and its destination key
 * is the key of the final file, not of the magic path.
 * Paths under the magic file ending with the pending/pendingset suffixes
 * must not be reported as magic commit paths.
 * @throws Throwable failure
 */
@Test
public void testCreateTrackerMagicPath() throws Throwable {
  final S3AFileSystem fs = getFileSystem();
  final MagicCommitIntegration commitIntegration =
      new MagicCommitIntegration(fs, true);
  final String filename = "delayed.txt";
  final Path destFile = methodSubPath(filename);
  final String destKey = fs.pathToKey(destFile);
  final Path magicPath = makeMagic(destFile);
  verifyIsMagicCommitPath(fs, magicPath);
  final String magicKey = fs.pathToKey(magicPath);
  Assertions.assertThat(magicKey)
      .describedAs("pending path")
      .endsWith(filename);

  // the filename must survive both the split and the destination mapping
  final List<String> magicElements = splitPathToElements(magicPath);
  Assertions.assertThat(lastElement(magicElements))
      .describedAs("splitPathToElements(%s)", magicPath)
      .isEqualTo(filename);
  final List<String> destElements = finalDestination(magicElements);
  Assertions.assertThat(lastElement(destElements))
      .describedAs("finalDestination(%s)", magicPath)
      .isEqualTo(filename);
  Assertions.assertThat(elementsToKey(destElements))
      .describedAs("destination key")
      .isEqualTo(destKey);

  // the tracker for the magic path targets the final destination
  final PutTracker putTracker = commitIntegration.createTracker(
      magicPath,
      magicKey,
      EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS);
  Assertions.assertThat(putTracker)
      .describedAs("Tracker for %s", magicKey)
      .isInstanceOf(MagicCommitTracker.class);
  Assertions.assertThat(putTracker.getDestKey())
      .describedAs("tracker destination key")
      .isEqualTo(destKey);

  // files with the commit metadata suffixes are not delayed writes
  final Path pendingFile = new Path(magicPath,
      "part-0000" + PENDING_SUFFIX);
  assertNotDelayedWrite(pendingFile);
  final Path pendingSetFile = new Path(magicPath,
      "part-0000" + PENDINGSET_SUFFIX);
  assertNotDelayedWrite(pendingSetFile);
}
/**
 * Assert that the test filesystem does not report a path as
 * a magic commit path.
 * @param pendingSuffixedPath path to probe
 */
private void assertNotDelayedWrite(Path pendingSuffixedPath) {
  final boolean isMagic =
      getFileSystem().isMagicCommitPath(pendingSuffixedPath);
  Assertions.assertThat(isMagic)
      .describedAs("Expected %s to not be magic/delayed write", pendingSuffixedPath)
      .isFalse();
}
/**
 * Create an empty file under a magic path, abort all single pending
 * commits in the directory of its pending data file, then verify that
 * neither the pending data file nor the destination file exists.
 * @throws Throwable failure
 */
@Test
public void testCreateAbortEmptyFile() throws Throwable {
  describe("create then abort an empty file; throttled");
  final S3AFileSystem fs = getFileSystem();
  final String filename = "empty-abort.txt";
  final Path destFile = methodSubPath(filename);
  final Path magicFile = makeMagic(destFile);
  touch(fs, magicFile);
  validateIntermediateAndFinalPaths(magicFile, destFile);
  final Path pendingData = validatePendingCommitData(filename, magicFile);

  final CommitOperations operations = newCommitOperations();
  // abort; any failure recorded by the abort is rethrown
  LOG.info("Abort call");
  final Path pendingDir = pendingData.getParent();
  try (CommitContext context =
           operations.createCommitContextForTesting(pendingDir, JOB_ID, 0)) {
    final CommitOperations.MaybeIOE outcome =
        operations.abortAllSinglePendingCommits(pendingDir, context, true);
    outcome.maybeRethrow();
  }
  assertPathDoesNotExist("pending file not deleted", pendingData);
  assertPathDoesNotExist("dest file was created", destFile);
}
/**
 * Instantiate a {@link CommitOperations} bound to the test filesystem.
 * @return a new commit operations instance.
 * @throws IOException IO failure.
 */
private CommitOperations newCommitOperations()
    throws IOException {
  final S3AFileSystem fs = getFileSystem();
  return new CommitOperations(fs);
}
/**
 * Map a destination file to a path with the same filename inside the
 * magic directory of this test's job, which is placed under the
 * destination file's parent directory.
 * @param destFile final destination file
 * @return magic path
 */
private static Path makeMagic(Path destFile) {
  final String magicJobDir = MAGIC_PATH_PREFIX + JOB_ID;
  final String relativePath = magicJobDir + '/' + destFile.getName();
  return new Path(destFile.getParent(), relativePath);
}
/**
 * Write a zero-byte file through a magic path, commit it and verify it.
 * @throws Throwable failure
 */
@Test
public void testCommitEmptyFile() throws Throwable {
  describe("create then commit an empty magic file");
  final byte[] noData = new byte[0];
  createCommitAndVerify("empty-commit.txt", noData);
}
/**
 * Write the test dataset through a magic path, commit it and verify it.
 * @throws Throwable failure
 */
@Test
public void testCommitSmallFile() throws Throwable {
  describe("create then commit a small magic file");
  final String filename = "small-commit.txt";
  createCommitAndVerify(filename, DATASET);
}
/**
 * Abort all single pending commits under a path which is never created
 * by this test; the outcome must rethrow nothing and be equal to
 * {@code MaybeIOE.NONE}.
 * @throws Throwable failure
 */
@Test
public void testAbortNonexistentDir() throws Throwable {
  describe("Attempt to abort a directory that does not exist");
  final Path missingDir = methodSubPath("testAbortNonexistentPath");
  final CommitOperations operations = newCommitOperations();
  try (CommitContext context =
           operations.createCommitContextForTesting(missingDir, JOB_ID, 0)) {
    final CommitOperations.MaybeIOE result =
        operations.abortAllSinglePendingCommits(missingDir, context, true);
    // first surface any exception, then check the outcome itself
    result.maybeRethrow();
    Assertions.assertThat(result)
        .isEqualTo(CommitOperations.MaybeIOE.NONE);
  }
}
/**
 * Set the committer factory class option to the magic committer factory
 * and verify that the committer created for the method path is a
 * {@link MagicS3GuardCommitter}.
 * @throws Throwable failure
 */
@Test
public void testCommitterFactoryDefault() throws Throwable {
  final Configuration factoryConf = new Configuration();
  final Path dest = methodPath();
  factoryConf.set(COMMITTER_FACTORY_CLASS,
      MagicS3GuardCommitterFactory.CLASSNAME);
  final PathOutputCommitterFactory committerFactory =
      getCommitterFactory(dest, factoryConf);
  // the task context uses the test configuration, not factoryConf
  final PathOutputCommitter created = committerFactory.createOutputCommitter(
      methodPath(),
      new TaskAttemptContextImpl(
          getConfiguration(),
          new TaskAttemptID(new TaskID(), 1)));
  assertEquals("Wrong committer",
      MagicS3GuardCommitter.class, created.getClass());
}
/**
 * Set the s3a-scheme committer factory option to the magic committer
 * factory and verify that the committer created for the method path
 * can be cast to {@link MagicS3GuardCommitter}.
 * @throws Throwable failure
 */
@Test
public void testCommitterFactorySchema() throws Throwable {
  final Configuration factoryConf = new Configuration();
  final Path dest = methodPath();
  factoryConf.set(S3A_FACTORY_KEY,
      MagicS3GuardCommitterFactory.CLASSNAME);
  final PathOutputCommitterFactory committerFactory =
      getCommitterFactory(dest, factoryConf);
  final PathOutputCommitter created = committerFactory.createOutputCommitter(
      methodPath(),
      new TaskAttemptContextImpl(
          getConfiguration(),
          new TaskAttemptID(new TaskID(), 1)));
  // the cast is the type check: a different committer class
  // fails here with a ClassCastException
  final MagicS3GuardCommitter s3a = (MagicS3GuardCommitter) created;
  // reached only when the cast succeeded
  assertNotNull(s3a);
}
/**
 * Write a file below a {@code BASE} marker inside a magic directory and
 * commit it, expecting the data at the path relative to the marker,
 * resolved against the destination directory.
 * @throws Throwable failure
 */
@Test
public void testBaseRelativePath() throws Throwable {
  describe("Test creating file with a __base marker and verify that it ends" +
      " up in where expected");
  final S3AFileSystem fs = getFileSystem();
  final Path destDir = methodSubPath("testBaseRelativePath");
  fs.delete(destDir, true);
  final String relativeChild = "subdir/child.txt";
  final String magicBase = MAGIC_PATH_PREFIX + JOB_ID + "/child/" + BASE;
  final Path magicBaseDir = new Path(destDir, magicBase);
  final Path magicChild = new Path(magicBaseDir, relativeChild);
  final Path expectedDest = new Path(destDir, relativeChild);
  assertPathDoesNotExist("dest file was found before upload",
      expectedDest);
  createFile(fs, magicChild, true, DATASET);
  commit("child.txt", magicChild, expectedDest);
}
/**
 * Verify that when a marker file is renamed, its
 * magic marker attribute is lost.
 * The magic file is created through the builder API; after the rename
 * the pending uploads under the destination directory are aborted and
 * at least one abort is expected.
 * @throws Exception failure
 */
@Test
public void testMarkerFileRename()
    throws Exception {
  S3AFileSystem fs = getFileSystem();
  Path destFile = methodPath();
  Path destDir = destFile.getParent();
  fs.delete(destDir, true);
  Path magicDest = makeMagic(destFile);
  Path magicDir = magicDest.getParent();
  // NOTE(review): this creates a directory at the path of the magic file
  // itself rather than at magicDir; confirm that this is intended.
  fs.mkdirs(magicDest);
  // use the builder API to verify it works exactly the
  // same.
  FSDataOutputStreamBuilder builder = fs.createFile(magicDest)
      .overwrite(true);
  builder.recursive();
  // this has a broken return type; not sure why
  builder.must(FS_S3A_CREATE_PERFORMANCE, true);
  try (FSDataOutputStream stream = builder.build()) {
    assertIsMagicStream(stream);
    stream.write(DATASET);
  }
  Path magic2 = new Path(magicDir, "magic2");
  // rename the marker; rename() reports some failures by returning false,
  // so the result is checked rather than discarded.
  Assertions.assertThat(fs.rename(magicDest, magic2))
      .describedAs("rename(%s, %s)", magicDest, magic2)
      .isTrue();
  // the renamed file has no header
  getTestHelper().assertFileLacksMarkerHeader(magic2);
  // abort the upload, which is driven by the .pending files
  // there must be 1 deleted file; during test debugging with aborted
  // runs there may be more.
  Assertions.assertThat(newCommitOperations()
      .abortPendingUploadsUnderPath(destDir))
      .describedAs("Aborting all pending uploads under %s", destDir)
      .isGreaterThanOrEqualTo(1);
}
/**
 * Write data to the magic path of a file, check that a marker file of the
 * expected length was created, commit the write and verify the contents
 * of the destination, which must lack the marker header.
 * @param filename short name of the file; the destination and magic
 * paths are derived from it
 * @param data data to write
 * @throws Exception failure
 */
private void createCommitAndVerify(String filename, byte[] data)
    throws Exception {
  final S3AFileSystem fs = getFileSystem();
  final Path destFile = methodSubPath(filename);
  fs.delete(destFile.getParent(), true);
  final Path magicFile = makeMagic(destFile);
  assertPathDoesNotExist("Magic file should not exist", magicFile);

  final long dataSize;
  if (data == null) {
    dataSize = 0;
  } else {
    dataSize = data.length;
  }
  try (FSDataOutputStream out = fs.create(magicFile, true)) {
    assertIsMagicStream(out);
    // nothing is written for null or empty data
    if (dataSize > 0) {
      out.write(data);
    }
  }
  getTestHelper().assertIsMarkerFile(magicFile, dataSize);
  commit(filename, destFile);
  verifyFileContents(fs, destFile, data);
  // the committed file does not carry the marker attribute
  getTestHelper().assertFileLacksMarkerHeader(destFile);
}
/**
 * Commit a file written to the magic path derived from its destination
 * by {@code makeMagic(Path)}.
 * @param filename filename of file
 * @param destFile destination path of file
 * @throws Exception any failure of the operation
 */
private void commit(String filename,
    Path destFile) throws Exception {
  final Path magicFile = makeMagic(destFile);
  commit(filename, magicFile, destFile);
}
/**
 * Commit a write to {@code magicFile} which is expected to
 * be saved to {@code destFile}.
 * The pending commit data is validated and loaded before the commit.
 * @param filename short file name, used to locate the pending data file
 * @param magicFile path to write to
 * @param destFile destination to verify
 * @throws IOException IO failure
 */
private void commit(String filename,
    Path magicFile,
    Path destFile)
    throws IOException {
  final CommitOperations operations = newCommitOperations();
  validateIntermediateAndFinalPaths(magicFile, destFile);
  final Path pendingData = validatePendingCommitData(filename, magicFile);
  final SinglePendingCommit pendingCommit = SinglePendingCommit.load(
      getFileSystem(),
      pendingData,
      null,
      SinglePendingCommit.serializer());
  commitOrFail(destFile, pendingCommit, operations);
}
/**
 * Commit a single pending commit inside a commit context created for
 * the destination path; the context is closed afterwards.
 * @param destFile path used to create the commit context
 * @param commit pending commit to complete
 * @param actions commit operations which create the context
 * @throws IOException failure of the commit
 */
private void commitOrFail(final Path destFile,
    final SinglePendingCommit commit, final CommitOperations actions)
    throws IOException {
  try (CommitContext context =
           actions.createCommitContextForTesting(destFile, JOB_ID, 0)) {
    context.commitOrFail(commit);
  }
}
/**
 * Validate the state of the paths before a commit or abort.
 * The only check made is that the destination file does not exist;
 * the magic path is not examined.
 * @param magicFilePath path to magic file
 * @param destFile ultimate destination file
 * @throws IOException IO failure
 */
private void validateIntermediateAndFinalPaths(Path magicFilePath,
    Path destFile)
    throws IOException {
  final String message = "dest file was created";
  assertPathDoesNotExist(message, destFile);
}
/**
 * Check that the pending commit data file of a magic write exists and is
 * not empty, log its contents, then load and validate it: the created
 * and saved timestamps must be positive and exactly one etag is expected.
 * @param filename short file name
 * @param magicFile path that the file thinks that it was written to
 * @return the path to the pending commit data file
 * @throws IOException IO problems
 */
private Path validatePendingCommitData(String filename,
    Path magicFile) throws IOException {
  final S3AFileSystem fs = getFileSystem();
  final Path pendingFile = new Path(magicFile.getParent(),
      filename + PENDING_SUFFIX);
  final FileStatus pendingStatus = verifyPathExists(fs, "no pending file",
      pendingFile);
  assertTrue("No data in " + pendingStatus, pendingStatus.getLen() > 0);
  final String rawContents = read(fs, pendingFile);
  LOG.info("Contents of {}: \n{}", pendingFile, rawContents);

  // really read it in and parse
  final SinglePendingCommit loaded = SinglePendingCommit.serializer()
      .load(fs, pendingFile);
  loaded.validate();
  Assertions.assertThat(loaded.getCreated())
      .describedAs("Created timestamp in %s", loaded)
      .isGreaterThan(0);
  Assertions.assertThat(loaded.getSaved())
      .describedAs("saved timestamp in %s", loaded)
      .isGreaterThan(0);

  final List<String> etagList = loaded.getEtags();
  Assertions.assertThat(etagList)
      .describedAs("Etag list")
      .hasSize(1);
  Assertions.assertThat(CommitOperations.toPartEtags(etagList))
      .describedAs("Etags to parts")
      .hasSize(1);
  return pendingFile;
}
/**
 * Resolve a filename against the path of the current test method.
 * @param filename filename
 * @return new path
 * @throws IOException failure to create/parse the path.
 */
private Path methodSubPath(String filename) throws IOException {
  final Path methodDir = methodPath();
  return new Path(methodDir, filename);
}
/**
 * Upload a zero-byte local file as a pending commit, commit it and verify
 * that the destination is a zero-byte file lacking the marker header.
 * The destination must not exist between the upload and the commit.
 * The local temporary file is removed when the test finishes.
 * @throws Throwable failure
 */
@Test
public void testUploadEmptyFile() throws Throwable {
  describe("Upload a zero byte file to a magic path");
  File tempFile = File.createTempFile("commit", ".txt");
  try {
    CommitOperations actions = newCommitOperations();
    Path dest = methodSubPath("testUploadEmptyFile");
    S3AFileSystem fs = getFileSystem();
    fs.delete(dest, false);
    SinglePendingCommit pendingCommit =
        actions.uploadFileToPendingCommit(tempFile,
            dest,
            null,
            DEFAULT_MULTIPART_SIZE,
            progress);
    // the upload is pending: nothing is visible at the destination yet
    assertPathDoesNotExist("pending commit", dest);
    commitOrFail(dest, pendingCommit, actions);
    progress.assertCount("Progress counter should be 1.",
        1);
    FileStatus status = verifyPathExists(fs,
        "uploaded file commit", dest);
    Assertions.assertThat(status.getLen())
        .describedAs("Committed File file %s: %s", dest, status)
        .isEqualTo(0);
    getTestHelper().assertFileLacksMarkerHeader(dest);
  } finally {
    // best-effort cleanup so that test runs do not accumulate temp files
    if (!tempFile.delete()) {
      tempFile.deleteOnExit();
    }
  }
}
/**
 * Upload a small local text file as a pending commit, commit it and
 * verify that the destination holds the same text.
 * The destination must not exist between the upload and the commit.
 * The local temporary file is removed when the test finishes.
 * @throws Throwable failure
 */
@Test
public void testUploadSmallFile() throws Throwable {
  File tempFile = File.createTempFile("commit", ".txt");
  try {
    String text = "hello, world";
    FileUtils.write(tempFile, text, StandardCharsets.UTF_8);
    CommitOperations actions = newCommitOperations();
    Path dest = methodSubPath("testUploadSmallFile");
    S3AFileSystem fs = getFileSystem();
    fs.delete(dest, true);
    assertPathDoesNotExist("test setup", dest);
    SinglePendingCommit pendingCommit =
        actions.uploadFileToPendingCommit(tempFile,
            dest,
            null,
            DEFAULT_MULTIPART_SIZE,
            progress);
    // the upload is pending: nothing is visible at the destination yet
    assertPathDoesNotExist("pending commit", dest);
    LOG.debug("Postcommit validation");
    commitOrFail(dest, pendingCommit, actions);
    String s = readUTF8(fs, dest, -1);
    Assertions.assertThat(s)
        .describedAs("contents of committed file %s", dest)
        .isEqualTo(text);
    progress.assertCount("Progress counter should be 1.",
        1);
  } finally {
    // best-effort cleanup so that test runs do not accumulate temp files
    if (!tempFile.delete()) {
      tempFile.deleteOnExit();
    }
  }
}
/**
 * Attempt to upload a local file which has been deleted; the upload must
 * fail with a {@link FileNotFoundException} and the progress counter
 * must remain at zero.
 * @throws Throwable failure
 */
@Test
public void testUploadMissingFile() throws Throwable {
  File tempFile = File.createTempFile("commit", ".txt");
  // the test only makes sense if the file really is gone, so a failed
  // delete is reported rather than ignored
  if (!tempFile.delete()) {
    throw new IOException("Failed to delete temporary file " + tempFile);
  }
  CommitOperations actions = newCommitOperations();
  Path dest = methodSubPath("testUploadMissingFile");
  intercept(FileNotFoundException.class, () ->
      actions.uploadFileToPendingCommit(tempFile, dest, null,
          DEFAULT_MULTIPART_SIZE, progress));
  progress.assertCount("Progress counter should be 0.",
      0);
}
/**
 * Create a file, then revert a commit whose destination key is that
 * file; the file must no longer exist afterwards.
 * @throws Throwable failure
 */
@Test
public void testRevertCommit() throws Throwable {
  describe("Revert a commit; the destination file will be deleted");
  Path destFile = methodSubPath("part-0000");
  S3AFileSystem fs = getFileSystem();
  touch(fs, destFile);
  // only the destination key is set on the commit before reverting
  SinglePendingCommit commit = new SinglePendingCommit();
  commit.setDestinationKey(fs.pathToKey(destFile));
  CommitOperations actions = newCommitOperations();
  actions.revertCommit(commit);
  assertPathDoesNotExist("should have been reverted", destFile);
}
/**
 * Revert a commit whose destination file has already been deleted;
 * the revert must not fail and the file must still be absent.
 * @throws Throwable failure
 */
@Test
public void testRevertMissingCommit() throws Throwable {
  final Path destFile = methodSubPath("part-0000");
  final S3AFileSystem fs = getFileSystem();
  fs.delete(destFile, false);
  final String destKey = fs.pathToKey(destFile);
  final SinglePendingCommit pendingCommit = new SinglePendingCommit();
  pendingCommit.setDestinationKey(destKey);
  final CommitOperations operations = newCommitOperations();
  operations.revertCommit(pendingCommit);
  assertPathDoesNotExist("should have been reverted", destFile);
}
/**
 * Abort the pending uploads under a newly created directory.
 * No result is checked: the test passes if no exception is raised.
 * @throws Throwable failure
 */
@Test
public void testFailuresInAbortListing() throws Throwable {
  final Path dir = path("testFailuresInAbort");
  final S3AFileSystem fs = getFileSystem();
  fs.mkdirs(dir);
  LOG.info("Aborting");
  final CommitOperations operations = newCommitOperations();
  operations.abortPendingUploadsUnderPath(dir);
  LOG.info("Abort completed");
}
/**
 * Write to a path outside any magic directory of the magic filesystem.
 * The stream must not declare the magic output capability when probed
 * through {@code hasCapability()}, and the file must contain data
 * after the stream is closed.
 * @throws Throwable failure
 */
@Test
public void testWriteNormalStream() throws Throwable {
  final S3AFileSystem fs = getFileSystem();
  final Path normalFile = path("normal");
  try (FSDataOutputStream stream = fs.create(normalFile, true)) {
    stream.writeChars("data");
    final String message = "stream has magic output: " + stream;
    assertFalse(message,
        stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT));
  }
  // a normal write is visible as soon as the stream is closed
  final FileStatus fileStatus = fs.getFileStatus(normalFile);
  Assertions.assertThat(fileStatus.getLen())
      .describedAs("Normal file %s: %s", normalFile, fileStatus)
      .isGreaterThan(0);
}
/**
 * Upload one local file to three destinations as pending commits, then
 * commit them one at a time through a single commit context, verifying
 * after each commit that the expected files and directories exist.
 * The local temporary file is removed when the test finishes.
 * @throws Throwable failure
 */
@Test
public void testBulkCommitFiles() throws Throwable {
  describe("verify bulk commit");
  File localFile = File.createTempFile("commit", ".txt");
  try {
    CommitOperations actions = newCommitOperations();
    Path destDir = methodSubPath("out");
    S3AFileSystem fs = getFileSystem();
    // recursive, as the directory may hold the output of an earlier run;
    // a non-recursive delete of a non-empty directory fails.
    fs.delete(destDir, true);
    Path destFile1 = new Path(destDir, "file1");
    // this subdir will only be created in the commit of file 2
    Path subdir = new Path(destDir, "subdir");
    // file 2
    Path destFile2 = new Path(subdir, "file2");
    Path destFile3 = new Path(subdir, "file3 with space");
    List<Path> destinations = Lists.newArrayList(destFile1, destFile2,
        destFile3);
    List<SinglePendingCommit> commits = new ArrayList<>(destinations.size());
    for (Path destination : destinations) {
      SinglePendingCommit pendingCommit =
          actions.uploadFileToPendingCommit(localFile,
              destination, null,
              DEFAULT_MULTIPART_SIZE,
              progress);
      commits.add(pendingCommit);
    }
    // the uploads are still pending; these checks are skipped
    // on S3 Express storage.
    if (!isS3ExpressStorage(fs)) {
      assertPathDoesNotExist("destination dir", destDir);
      assertPathDoesNotExist("subdirectory", subdir);
    }
    LOG.info("Initiating commit operations");
    try (CommitContext commitContext
             = actions.createCommitContextForTesting(destDir, JOB_ID, 0)) {
      LOG.info("Commit #1");
      commitContext.commitOrFail(commits.get(0));
      final String firstCommitContextString = commitContext.toString();
      LOG.info("First Commit state {}", firstCommitContextString);
      assertPathExists("destFile1", destFile1);
      assertPathExists("destination dir", destDir);
      LOG.info("Commit #2");
      commitContext.commitOrFail(commits.get(1));
      assertPathExists("subdirectory", subdir);
      assertPathExists("destFile2", destFile2);
      final String secondCommitContextString = commitContext.toString();
      LOG.info("Second Commit state {}", secondCommitContextString);
      LOG.info("Commit #3");
      commitContext.commitOrFail(commits.get(2));
      assertPathExists("destFile3", destFile3);
    }
  } finally {
    // best-effort cleanup so that test runs do not accumulate temp files
    if (!localFile.delete()) {
      localFile.deleteOnExit();
    }
  }
}
}