// ITestS3AHugeMagicCommits.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.CommitUtils;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.extractXAttrLongValue;
/**
* Write a huge file via the magic commit mechanism,
* commit it and verify that it is there. This is needed to
* verify that the pending-upload mechanism works with multipart files
* of more than one part.
*
* This is a scale test.
*/
public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles {
  private static final Logger LOG = LoggerFactory.getLogger(
      ITestS3AHugeMagicCommits.class);

  /** Thread count for the commit context used when committing the uploads. */
  private static final int COMMITTER_THREADS = 64;

  /** Synthetic job ID used to build the magic directory names. */
  private static final String JOB_ID = "123";

  /** Root of the magic path tree under {@link #finalDirectory}. */
  private Path magicDir;

  /** Per-job directory under {@link #magicDir}. */
  private Path jobDir;

  /** file used as the destination for the write;
   * it is never actually created. */
  private Path magicOutputFile;

  /** The file with the JSON data about the commit. */
  private Path pendingDataFile;

  /** Destination directory into which the huge file is finally committed. */
  private Path finalDirectory;

  /**
   * Use fast upload on disk.
   * @return the upload buffer mechanism.
   */
  @Override
  protected String getBlockOutputBufferName() {
    return Constants.FAST_UPLOAD_BUFFER_DISK;
  }

  /**
   * The suite name; required to be unique.
   * @return the test suite name
   */
  @Override
  public String getTestSuiteName() {
    return "ITestS3AHugeMagicCommits";
  }

  /**
   * Magic writes only become visible at commit time.
   * @return false: the file is not expected to exist after the write.
   */
  @Override
  protected boolean expectImmediateFileVisibility() {
    return false;
  }

  /**
   * Set up the test; requires the FS to have magic commit support,
   * then builds the magic/job/output/pending paths used by the tests.
   * @throws Exception failure in superclass setup or FS verification.
   */
  @BeforeEach
  @Override
  public void setup() throws Exception {
    super.setup();
    CommitUtils.verifyIsMagicCommitFS(getFileSystem());
    // set up the paths for the commit operation
    finalDirectory = new Path(getScaleTestDir(), "commit");
    magicDir = new Path(finalDirectory, MAGIC_PATH_PREFIX + JOB_ID);
    jobDir = new Path(magicDir, "job_" + JOB_ID);
    String filename = "commit.bin";
    setHugefile(new Path(finalDirectory, filename));
    magicOutputFile = new Path(jobDir, filename);
    pendingDataFile = new Path(jobDir, filename + PENDING_SUFFIX);
  }

  /**
   * Returns the path to the commit metadata file, not that of the huge file.
   * @return a file in the job dir
   */
  @Override
  protected Path getPathOfFileToCreate() {
    return magicOutputFile;
  }

  /**
   * Abort any multipart uploads left over under the destination directory
   * by earlier (possibly failed) runs, so later upload counts are accurate.
   * @throws IOException failure in the abort operation.
   */
  @Test
  public void test_000_CleanupPendingUploads() throws IOException {
    describe("Cleanup any existing pending uploads");
    final S3AFileSystem fs = getFileSystem();
    final String key = fs.pathToKey(finalDirectory);
    final AuditSpan span = fs.getAuditSpanSource().createSpan(
        MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED.getSymbol(),
        key, null);
    final int count = fs.createWriteOperationHelper(span)
        .abortMultipartUploadsUnderPath(key + "/");
    LOG.info("Aborted {} uploads under {}", count, key);
  }

  /**
   * After the write: verify only a 0-byte marker (with the magic xattr
   * recording the pending length) exists, commit the pending upload,
   * then delegate to the superclass assertions on the now-visible file.
   * @throws Throwable failure in commit or assertions.
   */
  @Override
  public void test_030_postCreationAssertions() throws Throwable {
    describe("Committing file");
    assertPathDoesNotExist("final file exists", getHugefile());
    assertPathExists("No pending file", pendingDataFile);
    S3AFileSystem fs = getFileSystem();

    // as a 0-byte marker is created, there is a file at the end path,
    // it just MUST be 0-bytes long
    FileStatus status = fs.getFileStatus(magicOutputFile);
    assertEquals(0, status.getLen(),
        "Non empty marker file " + status);
    final Map<String, byte[]> xAttr = fs.getXAttrs(magicOutputFile);
    final String header = XA_MAGIC_MARKER;
    Assertions.assertThat(xAttr)
        .describedAs("Header %s of %s", header, magicOutputFile)
        .containsKey(header);
    // the marker's xattr must record the full length of the pending data
    Assertions.assertThat(extractXAttrLongValue(xAttr.get(header)))
        .describedAs("Decoded header %s of %s", header, magicOutputFile)
        .get()
        .isEqualTo(getFilesize());
    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    CommitOperations operations = new CommitOperations(fs);
    Path destDir = getHugefile().getParent();
    assertPathExists("Magic dir", new Path(destDir, MAGIC_PATH_PREFIX + JOB_ID));
    String destDirKey = fs.pathToKey(destDir);

    // exactly one multipart upload (the huge file) must be pending
    Assertions.assertThat(listMultipartUploads(fs, destDirKey))
        .describedAs("Pending uploads")
        .hasSize(1);
    assertNotNull(jobDir, "jobDir");
    try (CommitContext commitContext
        = operations.createCommitContextForTesting(jobDir, null, COMMITTER_THREADS)) {
      Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
          results = operations.loadSinglePendingCommits(jobDir, false, commitContext
      );
      for (SinglePendingCommit singlePendingCommit :
          results.getKey().getCommits()) {
        commitContext.commitOrFail(singlePendingCommit);
      }
    }
    timer.end("time to commit %s", pendingDataFile);
    // upload is no longer pending
    Assertions.assertThat(operations.listPendingUploadsUnderPath(destDir))
        .describedAs("Pending uploads under path")
        .isEmpty();
    // at this point, the huge file exists, so the normal assertions
    // on that file must be valid. Verify.
    super.test_030_postCreationAssertions();
  }

  /**
   * Log that a superclass test case is being skipped in this suite.
   * @param text name of the skipped case.
   */
  private void skipQuietly(String text) {
    describe("Skipping: %s", text);
  }

  /** Skipped: read tests are unnecessary for the commit path. */
  @Override
  public void test_040_PositionedReadHugeFile() {
    skipQuietly("test_040_PositionedReadHugeFile");
  }

  /** Skipped: read tests are unnecessary for the commit path. */
  @Override
  public void test_050_readHugeFile() {
    skipQuietly("readHugeFile");
  }

  /** Skipped: rename is not part of the commit workflow under test. */
  @Override
  public void test_100_renameHugeFile() {
    skipQuietly("renameHugeFile");
  }

  /**
   * Delete the huge files and, regardless of the outcome, the magic
   * directory tree; best-effort purge of outstanding uploads first.
   * @throws IOException failure in deletion.
   */
  @Override
  public void test_800_DeleteHugeFiles() throws IOException {
    if (getFileSystem() != null) {
      try {
        getFileSystem().abortOutstandingMultipartUploads(0);
      } catch (IOException e) {
        // best-effort cleanup: log and continue with the deletion
        LOG.info("Exception while purging old uploads", e);
      }
    }
    try {
      super.test_800_DeleteHugeFiles();
    } finally {
      ContractTestUtils.rm(getFileSystem(), magicDir, true, false);
    }
  }
}