/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;

import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;

import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.DurationInfo;

import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/**
* Tests of multipart uploads.
* <p></p>
 * <i>Note</i>: some of the tests pick one of the two available
 * uploaders at random. If tests fail intermittently, it may be
 * because different uploaders are being selected.
*/
public abstract class AbstractContractMultipartUploaderTest extends
AbstractFSContractTestBase {
protected static final Logger LOG =
LoggerFactory.getLogger(AbstractContractMultipartUploaderTest.class);
/**
* Size of very small uploads.
   * Enough to be non-empty, not big enough to cause delays on uploads.
*/
protected static final int SMALL_FILE = 100;
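  /**
   * Interval in milliseconds between retries when waiting for an
   * eventually-consistent store to settle.
   */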
protected static final int CONSISTENCY_INTERVAL = 1000;
private MultipartUploader uploader0;
private MultipartUploader uploader1;
private final Random random = new Random();
private UploadHandle activeUpload;
private Path activeUploadPath;
@Override
public void setup() throws Exception {
super.setup();
final FileSystem fs = getFileSystem();
Path testPath = getContract().getTestPath();
Assume.assumeTrue("Multipart uploader is not supported",
fs.hasPathCapability(testPath,
CommonPathCapabilities.FS_MULTIPART_UPLOADER));
uploader0 = fs.createMultipartUploader(testPath).build();
uploader1 = fs.createMultipartUploader(testPath).build();
}
@Override
public void teardown() throws Exception {
MultipartUploader uploader = getUploader(1);
if (uploader != null) {
if (activeUpload != null) {
abortUploadQuietly(activeUpload, activeUploadPath);
}
try {
// round off with an abort of all uploads
Path teardown = getContract().getTestPath();
LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
CompletableFuture<Integer> f
= uploader.abortUploadsUnderPath(teardown);
f.get();
LOG.info("Statistics {}",
ioStatisticsSourceToString(uploader));
} catch (Exception e) {
LOG.warn("Exeception in teardown", e);
}
}
cleanupWithLogger(LOG, uploader0, uploader1);
super.teardown();
}
/**
* Get a test path based on the method name.
* @return a path to use in the test
   * @throws IOException failure to build the path name.
*/
protected Path methodPath() throws IOException {
return path(getMethodName());
}
/**
* The payload is the part number repeated for the length of the part.
* This makes checking the correctness of the upload straightforward.
* @param partNumber part number
* @return the bytes to upload.
*/
private byte[] generatePayload(int partNumber) {
return generatePayload(partNumber, partSizeInBytes());
}
/**
* Generate a payload of a given size; part number is used
* for all the values.
* @param partNumber part number
* @param size size in bytes
* @return the bytes to upload.
*/
private byte[] generatePayload(final int partNumber, final int size) {
ByteBuffer buffer = ByteBuffer.allocate(size);
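    // ByteBuffer writes big-endian by default, so each 4-byte word is the
    // part number: e.g. generatePayload(3, 8) yields {0, 0, 0, 3, 0, 0, 0, 3}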
    for (int i = 0; i < size / Integer.BYTES; ++i) {
buffer.putInt(partNumber);
}
return buffer.array();
}
/**
* Load a path, make an MD5 digest.
* @param path path to load
* @return the digest array
* @throws IOException failure to read or digest the file.
*/
protected byte[] digest(Path path) throws IOException {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
try (InputStream in = getFileSystem().open(path)) {
      byte[] fdData = IOUtils.toByteArray(in);
      return DigestUtils.getMd5Digest().digest(fdData);
} finally {
timer.end("Download and digest of path %s", path);
}
}
/**
   * Get the part size in bytes to use for each upload.
* @return a number > 0
*/
protected abstract int partSizeInBytes();
/**
* Get the number of test payloads to upload.
* @return a number > 1
*/
protected int getTestPayloadCount() {
return 10;
}
/**
   * How long, in milliseconds, it takes for store changes
   * (update/delete/list) to propagate everywhere.
* If 0: the FS is consistent.
* @return a time in milliseconds.
*/
protected int timeToBecomeConsistentMillis() {
return 0;
}
/**
* Does a call to finalize an upload (either complete or abort) consume the
* uploadID immediately or is it reaped at a later point in time?
   * @return true if the uploadID will be consumed immediately (and no longer
   * reusable).
*/
protected abstract boolean finalizeConsumesUploadIdImmediately();
/**
* Does the store support concurrent uploads to the same destination path?
* @return true if concurrent uploads are supported.
*/
protected abstract boolean supportsConcurrentUploadsToSamePath();
/**
* Pick a multipart uploader from the index value.
* @param index index of upload
* @return an uploader
*/
protected MultipartUploader getUploader(int index) {
return (index % 2 == 0) ? uploader0 : uploader1;
}
/**
* Pick a multipart uploader at random.
* @return an uploader
*/
protected MultipartUploader getRandomUploader() {
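    // the parity of the random index selects uploader0 or uploader1
    // in getUploader(), so this is a 50/50 choice between the two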
return getUploader(random.nextInt(10));
}
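  /*
   * The tests below all drive the same MultipartUploader lifecycle;
   * a minimal sketch of the underlying calls which the helper methods
   * in this class (startUpload/putPart/complete/abortUpload) wrap:
   *
   *   MultipartUploader mpu = fs.createMultipartUploader(dest).build();
   *   UploadHandle upload = awaitFuture(mpu.startUpload(dest));
   *   PartHandle part = awaitFuture(
   *       mpu.putPart(upload, 1, true, dest, stream, length));
   *   Map<Integer, PartHandle> parts = new HashMap<>();
   *   parts.put(1, part);
   *   PathHandle fd = awaitFuture(mpu.complete(upload, dest, parts));
   *   // or, to cancel the upload: awaitFuture(mpu.abort(upload, dest))
   */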
/**
* Assert that a multipart upload is successful.
* @throws Exception failure
*/
@Test
public void testSingleUpload() throws Exception {
Path file = methodPath();
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
int size = SMALL_FILE;
byte[] payload = generatePayload(1, size);
origDigest.update(payload);
    // use a single uploader for the completion.
    // note: the same uploader is reused here, as that pattern found a bug
    // in the S3Guard DDB bulk operation state upload - the previous
    // operation had added an entry to the ongoing state; this second call
    // was interpreted as an inconsistent write.
    MultipartUploader completer = uploader0;
    // and upload with uploader 1 to validate cross-uploader uploads
PartHandle partHandle = putPart(file, uploadHandle, 1, true, payload);
partHandles.put(1, partHandle);
PathHandle fd = complete(completer, uploadHandle, file,
partHandles);
validateUpload(file, origDigest, size);
// verify that if the implementation processes data immediately
// then a second attempt at the upload will fail.
if (finalizeConsumesUploadIdImmediately()) {
intercept(FileNotFoundException.class,
() -> complete(completer, uploadHandle, file, partHandles));
} else {
// otherwise, the same or other uploader can try again.
PathHandle fd2 = complete(completer, uploadHandle, file, partHandles);
assertArrayEquals("Path handles differ", fd.toByteArray(),
fd2.toByteArray());
}
}
/**
* Complete IO for a specific uploader; await the response.
* @param uploader uploader
* @param uploadHandle Identifier
* @param file Target path for upload
* @param partHandles handles map of part number to part handle
   * @return unique PathHandle identifier for the uploaded file.
   * @throws IOException failure to complete the upload.
*/
protected PathHandle complete(
final MultipartUploader uploader,
final UploadHandle uploadHandle,
final Path file,
final Map<Integer, PartHandle> partHandles)
throws IOException {
try (DurationInfo d =
new DurationInfo(LOG, "Complete upload to %s", file)) {
return awaitFuture(
uploader.complete(uploadHandle, file, partHandles));
}
}
/**
   * Start an upload.
   * This saves the path and upload handle as the active
   * upload, for aborting in teardown.
* @param dest destination
* @return the handle
* @throws IOException failure to initialize
*/
protected UploadHandle startUpload(final Path dest) throws IOException {
activeUploadPath = dest;
activeUpload = awaitFuture(getRandomUploader().startUpload(dest));
return activeUpload;
}
/**
* Generate then upload a part.
* @param file destination
* @param uploadHandle handle
* @param index index of part
   * @param isLastPart is this the last part of the upload?
   * @param origDigest digest to build up. May be null.
* @return the part handle
* @throws IOException IO failure.
*/
protected PartHandle buildAndPutPart(
final Path file,
final UploadHandle uploadHandle,
final int index,
final boolean isLastPart,
final MessageDigest origDigest) throws IOException {
byte[] payload = generatePayload(index);
if (origDigest != null) {
origDigest.update(payload);
}
return putPart(file, uploadHandle, index, isLastPart, payload);
}
/**
* Put a part.
* The entire byte array is uploaded.
* @param file destination
* @param uploadHandle handle
* @param index index of part
   * @param isLastPart is this the last part of the upload?
* @param payload byte array of payload
* @return the part handle
* @throws IOException IO failure.
*/
protected PartHandle putPart(final Path file,
final UploadHandle uploadHandle,
final int index,
final boolean isLastPart,
final byte[] payload) throws IOException {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
PartHandle partHandle;
try (DurationInfo d =
new DurationInfo(LOG, "Put part %d (size %s) %s",
index,
payload.length,
file)) {
partHandle = awaitFuture(getUploader(index)
.putPart(uploadHandle, index, isLastPart, file,
new ByteArrayInputStream(payload),
payload.length));
}
timer.end("Uploaded part %s", index);
LOG.info("Upload bandwidth {} MB/s",
timer.bandwidthDescription(payload.length));
return partHandle;
}
/**
* Complete an upload with a random uploader.
* @param file destination
* @param uploadHandle handle
* @param partHandles map of handles
* @param origDigest digest of source data (may be null)
* @param expectedLength expected length of result.
* @return the path handle from the upload.
* @throws IOException IO failure
*/
private PathHandle completeUpload(final Path file,
final UploadHandle uploadHandle,
final Map<Integer, PartHandle> partHandles,
final MessageDigest origDigest,
final int expectedLength) throws IOException {
PathHandle fd = complete(file, uploadHandle, partHandles);
validateUpload(file, origDigest, expectedLength);
return fd;
}
/**
   * Validate a completed upload: the file must exist with the expected
   * length and, if a digest was supplied, the expected contents.
* @param file destination
* @param origDigest digest of source data (may be null)
* @param expectedLength expected length of result.
* @throws IOException IO failure
*/
private void validateUpload(final Path file,
final MessageDigest origDigest,
final int expectedLength) throws IOException {
verifyPathExists(getFileSystem(),
"Completed file", file);
verifyFileLength(file, expectedLength);
if (origDigest != null) {
verifyContents(file, origDigest, expectedLength);
}
}
/**
* Verify the contents of a file.
* @param file path
* @param origDigest digest
* @param expectedLength expected length (for logging download bandwidth)
* @throws IOException IO failure
*/
protected void verifyContents(final Path file,
final MessageDigest origDigest,
final int expectedLength) throws IOException {
ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
Assertions.assertThat(digest(file))
.describedAs("digest of uploaded file %s", file)
.isEqualTo(origDigest.digest());
timer2.end("Completed digest", file);
LOG.info("Download bandwidth {} MB/s",
timer2.bandwidthDescription(expectedLength));
}
/**
* Verify the length of a file.
* @param file path
* @param expectedLength expected length
* @throws IOException IO failure
*/
private void verifyFileLength(final Path file, final long expectedLength)
throws IOException {
FileStatus st = getFileSystem().getFileStatus(file);
Assertions.assertThat(st)
.describedAs("Uploaded file %s", st)
.matches(FileStatus::isFile)
.extracting(FileStatus::getLen)
.isEqualTo(expectedLength);
}
/**
* Perform the inner complete without verification.
* @param file destination path
* @param uploadHandle upload handle
* @param partHandles map of parts
* @return the path handle from the upload.
* @throws IOException IO failure
*/
private PathHandle complete(final Path file,
final UploadHandle uploadHandle,
final Map<Integer, PartHandle> partHandles) throws IOException {
return complete(getRandomUploader(), uploadHandle, file,
partHandles);
}
/**
* Abort an upload.
* @param uploadHandle handle
* @param file path
* @throws IOException failure
*/
private void abortUpload(UploadHandle uploadHandle,
final Path file)
throws IOException {
try (DurationInfo d =
new DurationInfo(LOG, "Abort upload to %s", file)) {
awaitFuture(getRandomUploader().abort(uploadHandle, file));
}
}
/**
* Abort an upload; swallows exceptions.
* @param uploadHandle handle
* @param file path
*/
private void abortUploadQuietly(UploadHandle uploadHandle, Path file) {
try {
abortUpload(uploadHandle, file);
} catch (FileNotFoundException ignored) {
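      // the upload is already gone; that is not an error during cleanup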
} catch (Exception e) {
LOG.info("aborting {}: {}", file, e.toString());
}
}
/**
* Assert that a multipart upload is successful.
* @throws Exception failure
*/
@Test
public void testMultipartUpload() throws Exception {
Path file = methodPath();
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
int payloadCount = getTestPayloadCount();
for (int i = 1; i <= payloadCount; ++i) {
PartHandle partHandle = buildAndPutPart(file, uploadHandle, i, i == payloadCount,
origDigest);
partHandles.put(i, partHandle);
}
completeUpload(file, uploadHandle, partHandles, origDigest,
payloadCount * partSizeInBytes());
}
/**
* Assert that a multipart upload is successful when a single empty part is
* uploaded.
* @throws Exception failure
*/
@Test
public void testMultipartUploadEmptyPart() throws Exception {
FileSystem fs = getFileSystem();
    Path file = methodPath();
try (MultipartUploader uploader =
fs.createMultipartUploader(file).build()) {
UploadHandle uploadHandle = uploader.startUpload(file).get();
Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
byte[] payload = new byte[0];
origDigest.update(payload);
InputStream is = new ByteArrayInputStream(payload);
PartHandle partHandle = awaitFuture(
uploader.putPart(uploadHandle, 1, true, file, is, payload.length));
partHandles.put(1, partHandle);
completeUpload(file, uploadHandle, partHandles, origDigest, 0);
}
}
/**
   * Assert that an upload consisting of a single zero-byte part is
   * successful.
* @throws Exception failure
*/
@Test
public void testUploadEmptyBlock() throws Exception {
Path file = methodPath();
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
partHandles.put(1, putPart(file, uploadHandle, 1, true, new byte[0]));
completeUpload(file, uploadHandle, partHandles, null, 0);
}
/**
* Assert that a multipart upload is successful even when the parts are
* given in the reverse order.
*/
@Test
public void testMultipartUploadReverseOrder() throws Exception {
Path file = methodPath();
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
int payloadCount = getTestPayloadCount();
for (int i = 1; i <= payloadCount; ++i) {
byte[] payload = generatePayload(i);
origDigest.update(payload);
}
for (int i = payloadCount; i > 0; --i) {
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount,
null));
}
completeUpload(file, uploadHandle, partHandles, origDigest,
payloadCount * partSizeInBytes());
}
/**
* Assert that a multipart upload is successful even when the parts are
* given in reverse order and the part numbers are not contiguous.
*/
@Test
public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
throws Exception {
describe("Upload in reverse order and the part numbers are not contiguous");
Path file = methodPath();
UploadHandle uploadHandle = startUpload(file);
MessageDigest origDigest = DigestUtils.getMd5Digest();
int payloadCount = 2 * getTestPayloadCount();
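    // only the even part numbers 2..payloadCount are uploaded,
    // so there are getTestPayloadCount() parts in total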
for (int i = 2; i <= payloadCount; i += 2) {
byte[] payload = generatePayload(i);
origDigest.update(payload);
}
Map<Integer, PartHandle> partHandles = new HashMap<>();
for (int i = payloadCount; i > 0; i -= 2) {
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount,
null));
}
completeUpload(file, uploadHandle, partHandles, origDigest,
getTestPayloadCount() * partSizeInBytes());
}
/**
* Assert that when we abort a multipart upload, the resulting file does
* not show up.
*/
@Test
public void testMultipartUploadAbort() throws Exception {
describe("Upload and then abort it before completing");
Path file = methodPath();
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
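    // upload parts 12 and 11, in that order; part 12 is flagged
    // as the last part of the upload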
for (int i = 12; i > 10; i--) {
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == 12, null));
}
abortUpload(uploadHandle, file);
String contents = "ThisIsPart49\n";
int len = contents.getBytes(StandardCharsets.UTF_8).length;
InputStream is = IOUtils.toInputStream(contents, StandardCharsets.UTF_8);
intercept(IOException.class,
() -> awaitFuture(
uploader0.putPart(uploadHandle, 49, true, file, is, len)));
intercept(IOException.class,
() -> complete(uploader0, uploadHandle, file, partHandles));
assertPathDoesNotExist("Uploaded file should not exist", file);
    // a second abort should raise a FileNotFoundException if the
    // UploadHandle is consumed by finalization operations (complete, abort).
if (finalizeConsumesUploadIdImmediately()) {
intercept(FileNotFoundException.class,
() -> abortUpload(uploadHandle, file));
} else {
abortUpload(uploadHandle, file);
}
}
/**
* Trying to abort from an invalid handle must fail.
*/
@Test
public void testAbortUnknownUpload() throws Exception {
Path file = methodPath();
ByteBuffer byteBuffer = ByteBuffer.wrap(
"invalid-handle".getBytes(StandardCharsets.UTF_8));
intercept(FileNotFoundException.class,
() -> abortUpload(BBUploadHandle.from(byteBuffer), file));
}
/**
* Trying to abort an upload with no data does not create a file.
*/
@Test
public void testAbortEmptyUpload() throws Exception {
describe("initialize upload and abort before uploading data");
Path file = methodPath();
abortUpload(startUpload(file), file);
assertPathDoesNotExist("Uploaded file should not exist", file);
}
/**
   * Aborting all uploads under a path must abort any pending upload
   * there, and no file is created.
*/
@Test
public void testAbortAllPendingUploads() throws Exception {
describe("initialize upload and abort the pending upload");
Path path = methodPath();
Path file = new Path(path, "child");
UploadHandle upload = startUpload(file);
try {
CompletableFuture<Integer> oF
= getRandomUploader().abortUploadsUnderPath(path.getParent());
int abortedUploads = awaitFuture(oF);
if (abortedUploads >= 0) {
// uploads can be aborted
Assertions.assertThat(abortedUploads)
.describedAs("Number of uploads aborted")
.isGreaterThanOrEqualTo(1);
assertPathDoesNotExist("Uploaded file should not exist", file);
}
} finally {
abortUploadQuietly(upload, file);
}
}
/**
* Trying to abort with a handle of size 0 must fail.
*/
@Test
public void testAbortEmptyUploadHandle() throws Exception {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]);
intercept(IllegalArgumentException.class,
() -> abortUpload(BBUploadHandle.from(byteBuffer), methodPath()));
}
/**
* When we complete with no parts provided, it must fail.
*/
@Test
public void testCompleteEmptyUpload() throws Exception {
describe("Expect an empty MPU to fail, but still be abortable");
Path dest = methodPath();
UploadHandle handle = startUpload(dest);
intercept(IllegalArgumentException.class,
() -> complete(uploader0, handle, dest, new HashMap<>()));
abortUpload(handle, dest);
}
/**
   * When we pass an empty uploadID, putPart throws an
   * IllegalArgumentException.
*/
@Test
public void testPutPartEmptyUploadID() throws Exception {
describe("Expect IllegalArgumentException when putPart uploadID is empty");
Path dest = methodPath();
UploadHandle emptyHandle =
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
byte[] payload = generatePayload(1);
InputStream is = new ByteArrayInputStream(payload);
intercept(IllegalArgumentException.class,
() -> uploader0.putPart(emptyHandle, 1, true, dest, is,
payload.length));
}
/**
   * When we pass an empty uploadID, complete throws an
   * IllegalArgumentException.
*/
@Test
public void testCompleteEmptyUploadID() throws Exception {
describe("Expect IllegalArgumentException when complete uploadID is empty");
Path dest = methodPath();
UploadHandle realHandle = startUpload(dest);
UploadHandle emptyHandle =
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
Map<Integer, PartHandle> partHandles = new HashMap<>();
PartHandle partHandle = putPart(dest, realHandle, 1, true,
generatePayload(1, SMALL_FILE));
partHandles.put(1, partHandle);
intercept(IllegalArgumentException.class,
() -> complete(uploader0, emptyHandle, dest, partHandles));
    // and, while things are set up, attempt to complete with
    // a part index of 0
partHandles.clear();
partHandles.put(0, partHandle);
intercept(IllegalArgumentException.class,
() -> complete(uploader0, realHandle, dest, partHandles));
}
/**
* Assert that upon completion, a directory in the way of the file will
* result in a failure. This test only applies to backing stores with a
* concept of directories.
* @throws Exception failure
*/
@Test
public void testDirectoryInTheWay() throws Exception {
FileSystem fs = getFileSystem();
Path file = methodPath();
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
int size = SMALL_FILE;
PartHandle partHandle = putPart(file, uploadHandle, 1, true,
generatePayload(1, size));
partHandles.put(1, partHandle);
fs.mkdirs(file);
intercept(IOException.class,
() -> completeUpload(file, uploadHandle, partHandles, null,
size));
// abort should still work
abortUpload(uploadHandle, file);
}
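  /**
   * Two concurrent uploads to the same destination.
   * Stores which do not support concurrent uploads must fail when the
   * second upload is initialized; for the rest, the upload completed
   * last is the one whose contents must be visible.
   * @throws Throwable failure
   */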
@Test
public void testConcurrentUploads() throws Throwable {
// if the FS doesn't support concurrent uploads, this test is
// required to fail during the second initialization.
boolean concurrent = supportsConcurrentUploadsToSamePath();
describe("testing concurrent uploads, MPU support for this is "
+ concurrent);
Path file = methodPath();
int size1 = SMALL_FILE;
int partId1 = 1;
byte[] payload1 = generatePayload(partId1, size1);
MessageDigest digest1 = DigestUtils.getMd5Digest();
digest1.update(payload1);
UploadHandle upload1 = startUpload(file);
Map<Integer, PartHandle> partHandles1 = new HashMap<>();
    // prepare the second upload's part
// by using a different size, it's straightforward to see which
// version is visible, before reading/digesting the contents
int size2 = size1 * 2;
int partId2 = 2;
    byte[] payload2 = generatePayload(partId2, size2);
MessageDigest digest2 = DigestUtils.getMd5Digest();
digest2.update(payload2);
UploadHandle upload2;
try {
upload2 = startUpload(file);
Assume.assumeTrue(
"The Filesystem is unexpectedly supporting concurrent uploads",
concurrent);
} catch (IOException e) {
if (!concurrent) {
// this is expected, so end the test
LOG.debug("Expected exception raised on concurrent uploads", e);
return;
} else {
throw e;
}
}
Map<Integer, PartHandle> partHandles2 = new HashMap<>();
assertNotEquals("Upload handles match", upload1, upload2);
// put part 1
partHandles1.put(partId1, putPart(file, upload1, partId1, false, payload1));
// put part2
partHandles2.put(partId2, putPart(file, upload2, partId2, true, payload2));
    // complete upload 1: expect its size and digest to
    // match those of payload 1.
completeUpload(file, upload1, partHandles1, digest1, size1);
    // now complete upload 2.
complete(file, upload2, partHandles2);
// and await the visible length to match, if this FS is not
// consistent.
final int consistencyDelay = timeToBecomeConsistentMillis();
if (consistencyDelay > 0) {
eventually(consistencyDelay,
() -> verifyFileLength(file, size2),
new LambdaTestUtils.ProportionalRetryInterval(
CONSISTENCY_INTERVAL,
consistencyDelay));
}
verifyContents(file, digest2, size2);
}
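  /**
   * The test path must declare the multipart uploader path capability.
   * @throws Throwable failure
   */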
@Test
public void testPathCapabilities() throws Throwable {
FileSystem fs = getFileSystem();
Assertions.assertThat(fs.hasPathCapability(getContract().getTestPath(),
CommonPathCapabilities.FS_MULTIPART_UPLOADER))
.describedAs("fs %s, lacks multipart upload capability", fs)
.isTrue();
}
}