// S3ATestUtils.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.EtagSource;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum;
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.NetworkBinding;
import org.apache.hadoop.fs.s3a.impl.S3ExpressStorage;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.FutureIO;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.Assume;
import org.opentest4j.TestAbortedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.internal.io.ChecksumValidatingInputStream;
import software.amazon.awssdk.services.s3.internal.checksums.S3ChecksumValidatingInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import java.io.Closeable;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.impl.FlagSet.createFlagSet;
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_S3;
import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Analytics;
import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Prefetch;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.requireDefaultExternalDataFile;
import static org.apache.hadoop.test.GenericTestUtils.buildPaths;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.buildEncryptionSecrets;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Utilities for the S3A tests.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class S3ATestUtils {
/** Class logger. */
private static final Logger LOG = LoggerFactory.getLogger(
S3ATestUtils.class);
/** Many threads for scale performance: {@value}. */
public static final int EXECUTOR_THREAD_COUNT = 64;
/**
 * Shared executor for submitting asynchronous test work
 * (e.g. parallel file creation); bounded at
 * {@link #EXECUTOR_THREAD_COUNT} threads.
 */
private static final ListeningExecutorService EXECUTOR =
MoreExecutors.listeningDecorator(
BlockingThreadPoolExecutorService.newInstance(
EXECUTOR_THREAD_COUNT,
EXECUTOR_THREAD_COUNT * 2,
30, TimeUnit.SECONDS,
"test-operations"));
/**
 * Value to set a system property to (in maven) to declare that
 * a property has been unset.
 */
public static final String UNSET_PROPERTY = "unset";
/** Age in seconds for multipart purge: one hour, so that parallel
 * test runs do not purge each other's uploads. */
public static final int PURGE_DELAY_SECONDS = 60 * 60;
/**
 * Add any deprecated keys.
 * Registers the mapping of the legacy STS endpoint test option to
 * {@code fs.s3a.assumed.role.sts.endpoint}, then reloads existing
 * configurations so they pick up the deprecation.
 */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
  // STS endpoint configuration option
  Configuration.DeprecationDelta[] deltas = {
      new Configuration.DeprecationDelta(
          S3ATestConstants.TEST_STS_ENDPOINT,
          ASSUMED_ROLE_STS_ENDPOINT)
  };
  // the delta array is a non-empty literal, so the former
  // "if (deltas.length > 0)" guard was dead code: register unconditionally.
  Configuration.addDeprecations(deltas);
  Configuration.reloadExistingConfigurations();
}
static {
// register deprecated keys before any test Configuration is created
addDeprecatedKeys();
}
/**
 * Get the S3A test filesystem name.
 * @param conf configuration to read {@code test.fs.s3a.name} from.
 * @return the trimmed filesystem name, or "" if unset.
 */
public static String getFsName(Configuration conf) {
  final String fsName = conf.getTrimmed(TEST_FS_S3A_NAME, "");
  return fsName;
}
/**
 * Create the test filesystem, without multipart purging.
 *
 * If the test.fs.s3a.name property is not set, this will
 * trigger a JUnit failure.
 *
 * Delegates to {@link #createTestFileSystem(Configuration, boolean)}
 * with purging disabled.
 * @param conf configuration
 * @return the FS
 * @throws IOException IO Problems
 * @throws TestAbortedException if the FS is not named
 */
public static S3AFileSystem createTestFileSystem(Configuration conf)
throws IOException {
return createTestFileSystem(conf, false);
}
/**
 * Create the test filesystem with or without multipart purging.
 *
 * If the test.fs.s3a.name property is not set, or names a URI whose
 * scheme is not "s3a", the test is skipped via an assumption failure.
 * @param conf configuration
 * @param purge flag to enable multipart purging
 * @return the FS
 * @throws IOException IO Problems
 */
public static S3AFileSystem createTestFileSystem(Configuration conf,
    boolean purge)
    throws IOException {
  String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, "");
  boolean liveTest = !StringUtils.isEmpty(fsname);
  URI testURI = null;
  if (liveTest) {
    testURI = URI.create(fsname);
    // constant-first comparison: URI.getScheme() may return null for a
    // scheme-less URI, which would have NPE'd the previous
    // testURI.getScheme().equals(...) form.
    liveTest = Constants.FS_S3A.equals(testURI.getScheme());
  }
  // This doesn't work with our JUnit 3 style test cases, so instead we'll
  // make this whole class not run by default
  Assume.assumeTrue("No test filesystem in " + TEST_FS_S3A_NAME,
      liveTest);
  S3AFileSystem fs1 = new S3AFileSystem();
  //enable purging in tests
  if (purge) {
    // purge with but a delay so that parallel multipart tests don't
    // suddenly start timing out
    enableMultipartPurge(conf, PURGE_DELAY_SECONDS);
  }
  fs1.initialize(testURI, conf);
  return fs1;
}
/**
 * Enable multipart purging on a filesystem configuration,
 * with the given purge age.
 * @param conf configuration to patch
 * @param seconds age in seconds after which existing multipart
 * uploads are purged
 */
public static void enableMultipartPurge(Configuration conf, int seconds) {
conf.setBoolean(PURGE_EXISTING_MULTIPART, true);
conf.setInt(PURGE_EXISTING_MULTIPART_AGE, seconds);
}
/**
 * Create a file context for tests.
 *
 * If the test.fs.s3a.name property is not set, or names a URI whose
 * scheme is not "s3a", the test is skipped via an assumption failure.
 * @param conf configuration
 * @return the file context
 * @throws IOException IO Problems
 */
public static FileContext createTestFileContext(Configuration conf)
    throws IOException {
  String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, "");
  boolean liveTest = !StringUtils.isEmpty(fsname);
  URI testURI = null;
  if (liveTest) {
    testURI = URI.create(fsname);
    // constant-first comparison: URI.getScheme() may return null for a
    // scheme-less URI, which would have NPE'd the previous
    // testURI.getScheme().equals(...) form.
    liveTest = Constants.FS_S3A.equals(testURI.getScheme());
  }
  // This doesn't work with our JUnit 3 style test cases, so instead we'll
  // make this whole class not run by default
  Assume.assumeTrue("No test filesystem in " + TEST_FS_S3A_NAME,
      liveTest);
  FileContext fc = FileContext.getFileContext(testURI, conf);
  return fc;
}
/**
 * Skip the current test if the PathIOException's text contains any of
 * the supplied messages (which signal a known incompatibility);
 * otherwise rethrow the exception.
 *
 * @param ioe PathIOException being parsed.
 * @param messages messages which, when found in the exception text,
 * trigger a test skip
 * @throws PathIOException the original exception, if no message matched.
 */
public static void skipIfIOEContainsMessage(PathIOException ioe, String...messages)
    throws PathIOException {
  final String text = ioe.toString();
  for (String candidate : messages) {
    if (text.contains(candidate)) {
      skip("Skipping because: " + candidate);
    }
  }
  throw ioe;
}
/**
 * Get a long test property.
 * <ol>
 * <li>Look up configuration value (which can pick up core-default.xml),
 * using {@code defVal} as the default value (if conf != null).
 * </li>
 * <li>Fetch the system property.</li>
 * <li>If the system property is not empty or "(unset)":
 * it overrides the conf value.
 * </li>
 * </ol>
 * This puts the build properties in charge of everything. It's not a
 * perfect design; having maven set properties based on a file, as ant let
 * you do, is better for customization.
 *
 * As to why there's a special (unset) value, see
 * @see <a href="http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven">
 * Stack Overflow</a>
 * @param conf config: may be null
 * @param key key to look up
 * @param defVal default value
 * @return the evaluated test property.
 */
public static long getTestPropertyLong(Configuration conf,
    String key, long defVal) {
  // parseLong avoids the needless Long boxing of Long.valueOf()
  return Long.parseLong(
      getTestProperty(conf, key, Long.toString(defVal)));
}
/**
 * Get a test property value in bytes, using k, m, g, t, p, e suffixes.
 * {@link org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix#string2long(String)}
 * <ol>
 * <li>Look up configuration value (which can pick up core-default.xml),
 * using {@code defVal} as the default value (if conf != null).
 * </li>
 * <li>Fetch the system property.</li>
 * <li>If the system property is not empty or "(unset)":
 * it overrides the conf value.
 * </li>
 * </ol>
 * This puts the build properties in charge of everything. It's not a
 * perfect design; having maven set properties based on a file, as ant let
 * you do, is better for customization.
 *
 * As to why there's a special (unset) value, see
 * @see <a href="http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven">
 * Stack Overflow</a>
 * @param conf config: may be null
 * @param key key to look up
 * @param defVal default value
 * @return the evaluated test property.
 */
public static long getTestPropertyBytes(Configuration conf,
String key, String defVal) {
return org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix
.string2long(getTestProperty(conf, key, defVal));
}
/**
 * Get an integer test property; algorithm described in
 * {@link #getTestPropertyLong(Configuration, String, long)}.
 * @param conf config: may be null
 * @param key key to look up
 * @param defVal default value
 * @return the evaluated test property.
 */
public static int getTestPropertyInt(Configuration conf,
String key, int defVal) {
return (int) getTestPropertyLong(conf, key, defVal);
}
/**
 * Get a boolean test property; algorithm described in
 * {@link #getTestPropertyLong(Configuration, String, long)}.
 * @param conf config: may be null
 * @param key key to look up
 * @param defVal default value
 * @return the evaluated test property.
 */
public static boolean getTestPropertyBool(Configuration conf,
    String key,
    boolean defVal) {
  // parseBoolean avoids the needless Boolean boxing of Boolean.valueOf()
  return Boolean.parseBoolean(
      getTestProperty(conf, key, Boolean.toString(defVal)));
}
/**
 * Get a string test property.
 * <ol>
 * <li>Look up configuration value (which can pick up core-default.xml),
 * using {@code defVal} as the default value (if conf != null).
 * </li>
 * <li>Fetch the system property.</li>
 * <li>If the system property is not empty or "(unset)":
 * it overrides the conf value.
 * </li>
 * </ol>
 * This puts the build properties in charge of everything. It's not a
 * perfect design; having maven set properties based on a file, as ant let
 * you do, is better for customization.
 *
 * As to why there's a special (unset) value, see
 * @see <a href="http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven">
 * Stack Overflow</a>
 * @param conf config: may be null
 * @param key key to look up
 * @param defVal default value
 * @return the evaluated test property.
 */
public static String getTestProperty(Configuration conf,
    String key,
    String defVal) {
  final String fromConf = (conf == null)
      ? defVal
      : conf.getTrimmed(key, defVal);
  final String fromSystem = System.getProperty(key);
  // a non-empty system property other than the "unset" marker wins
  final boolean systemOverrides = isNotEmpty(fromSystem)
      && !UNSET_PROPERTY.equals(fromSystem);
  return systemOverrides ? fromSystem : fromConf;
}
/**
 * Get the test CSV file as a URI string; assume() that it is not empty.
 * Delegates to the external-data lookup.
 * @param conf test configuration
 * @return test file.
 * @deprecated Retained only to assist cherrypicking patches
 */
@Deprecated
public static String getCSVTestFile(Configuration conf) {
return getExternalData(conf).toUri().toString();
}
/**
 * Get the test CSV path; assume() that it is not empty.
 * Delegates to the external-data lookup.
 * @param conf test configuration
 * @return test file as a path.
 * @deprecated Retained only to assist cherrypicking patches
 */
@Deprecated
public static Path getCSVTestPath(Configuration conf) {
return getExternalData(conf);
}
/**
 * Get the test CSV file; assume() that it is not modified (i.e. we haven't
 * switched to a new storage infrastructure where the bucket is no longer
 * read only).
 * Requires the default external data file to be configured.
 * @param conf test configuration
 * @return test file.
 * @deprecated Retained only to assist cherrypicking patches
 */
@Deprecated
public static String getLandsatCSVFile(Configuration conf) {
return requireDefaultExternalDataFile(conf);
}
/**
 * Get the test CSV file; assume() that it is not modified (i.e. we haven't
 * switched to a new storage infrastructure where the bucket is no longer
 * read only).
 * Delegates to the external-data lookup.
 * @param conf test configuration
 * @return test file as a path.
 * @deprecated Retained only to assist cherrypicking patches
 */
@Deprecated
public static Path getLandsatCSVPath(Configuration conf) {
return getExternalData(conf);
}
/**
 * Verify the class of an exception. If it is not as expected, rethrow it.
 * Comparison is on the exact class, not subclass-of inference as
 * offered by {@code instanceof}.
 * @param clazz the expected exception class
 * @param ex the exception caught
 * @param <E> expected exception type
 * @return the exception, if it is of the expected class
 * @throws AssertionError if the exception is {@code null}.
 * @throws Exception the exception passed in if it is of a different type
 */
@SuppressWarnings("unchecked")
public static <E extends Throwable> E verifyExceptionClass(Class<E> clazz,
    Exception ex)
    throws Exception {
  Assertions.assertThat(ex)
      .describedAs("Exception expected of class %s", clazz)
      .isNotNull();
  if (!(ex.getClass().equals(clazz))) {
    LOG.warn("Rethrowing exception: {} as it is not an instance of {}",
        ex, clazz, ex);
    throw ex;
  }
  // safe: the exact-class check above guarantees ex is an E
  return (E) ex;
}
/**
 * Turn off FS Caching: use if a filesystem with different options from
 * the default is required.
 * Sets {@code fs.s3a.impl.disable.cache} to true.
 * @param conf configuration to patch
 */
public static void disableFilesystemCaching(Configuration conf) {
conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, true);
}
/**
 * Disable S3Express createSession calls.
 * @param conf configuration to patch
 * @return the same configuration, for chaining.
 */
public static Configuration disableCreateSession(Configuration conf) {
conf.setBoolean(S3EXPRESS_CREATE_SESSION, false);
return conf;
}
/**
 * Skip a test if encryption tests are disabled,
 * or the bucket is an S3Express bucket.
 * @param configuration configuration to probe
 */
public static void skipIfEncryptionTestsDisabled(
Configuration configuration) {
skipIfNotEnabled(configuration, KEY_ENCRYPTION_TESTS, "Skipping encryption tests");
skipIfS3ExpressBucket(configuration);
}
/**
 * Skip a test suite/case if a configuration option has been
 * explicitly disabled (defaults to enabled when unset).
 * @param configuration configuration to probe
 * @param key key to resolve
 * @param message text used in the skip report
 */
public static void skipIfNotEnabled(final Configuration configuration,
final String key,
final String message) {
if (!configuration.getBoolean(key, true)) {
skip(message);
}
}
/**
 * Skip a test if storage class tests are disabled,
 * or the bucket is an S3Express bucket.
 * @param configuration configuration to probe
 */
public static void skipIfStorageClassTestsDisabled(
Configuration configuration) {
skipIfNotEnabled(configuration, KEY_STORAGE_CLASS_TESTS_ENABLED,
"Skipping storage class tests");
skipIfS3ExpressBucket(configuration);
}
/**
 * Skip a test if ACL tests are disabled,
 * or the bucket is an S3Express bucket.
 * @param configuration configuration to probe
 */
public static void skipIfACLTestsDisabled(
    Configuration configuration) {
  // message previously said "storage class ACL tests": a copy-paste
  // from skipIfStorageClassTestsDisabled; corrected.
  skipIfNotEnabled(configuration, KEY_ACL_TESTS_ENABLED,
      "Skipping ACL tests");
  skipIfS3ExpressBucket(configuration);
}
/**
 * Skip a test if the test bucket is an S3Express bucket.
 * @param configuration configuration to probe
 */
public static void skipIfS3ExpressBucket(
Configuration configuration) {
assume("Skipping test as bucket is an S3Express bucket",
!isS3ExpressTestBucket(configuration));
}
/**
 * Skip a test if the test bucket is not an S3Express bucket.
 * @param configuration configuration to probe
 */
public static void skipIfNotS3ExpressBucket(
Configuration configuration) {
assume("Skipping test as bucket is not an S3Express bucket",
isS3ExpressTestBucket(configuration));
}
/**
 * Is the test bucket an S3Express bucket?
 * Determined purely from the bucket name; the endpoint argument is "".
 * @param conf configuration
 * @return true if the bucket is an S3Express bucket.
 */
public static boolean isS3ExpressTestBucket(final Configuration conf) {
return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), "");
}
/**
 * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
 * @param configuration configuration to probe
 * @param message text used in the skip report
 */
public static void skipIfAnalyticsAcceleratorEnabled(
Configuration configuration, String message) {
assume(message,
!isAnalyticsAcceleratorEnabled(configuration));
}
/**
 * Is the analytics input stream type configured?
 * @param conf configuration to probe
 * @return true if the configured input stream type is "analytics".
 */
public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) {
return conf.get(INPUT_STREAM_TYPE,
INPUT_STREAM_TYPE_CLASSIC).equals(INPUT_STREAM_TYPE_ANALYTICS);
}
/**
 * Skip a test if the filesystem lacks a required capability
 * (probed against the root path).
 * @param fs filesystem
 * @param capability capability
 */
public static void assumePathCapability(FileSystem fs, String capability) {
  try {
    assume("Filesystem lacks capability " + capability,
        fs.hasPathCapability(new Path("/"), capability));
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}

/**
 * Skip a test if the filesystem has a capability which must be absent
 * (probed against the root path).
 * Note: the closing brace of the previous method was fused with this
 * javadoc opener on one line; split for readability.
 * @param fs filesystem
 * @param capability capability
 */
public static void assumePathCapabilityFalse(FileSystem fs, String capability) {
  try {
    assume("Filesystem has capability " + capability,
        !fs.hasPathCapability(new Path("/"), capability));
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}
/**
 * Create a test path, using the value of
 * {@link S3ATestConstants#TEST_UNIQUE_FORK_ID} if it is set.
 * This path is *not* qualified.
 * @param defVal default value
 * @return a path
 */
public static Path createTestPath(Path defVal) {
  final String forkId = System.getProperty(TEST_UNIQUE_FORK_ID);
  if (forkId == null) {
    // no fork ID: the caller-supplied default is used unchanged
    return defVal;
  }
  return new Path("/" + forkId, "test");
}
/**
 * Reset all metrics in a list.
 * @param metrics metrics to reset
 */
public static void reset(S3ATestUtils.MetricDiff... metrics) {
  for (int i = 0; i < metrics.length; i++) {
    metrics[i].reset();
  }
}
/**
 * Print all metrics in a list at info level.
 * @param log log to print the metrics to.
 * @param metrics metrics to process
 */
public static void print(Logger log, S3ATestUtils.MetricDiff... metrics) {
  for (int i = 0; i < metrics.length; i++) {
    log.info(metrics[i].toString());
  }
}
/**
 * Unset the encryption algorithm option, both the base key and
 * any per-bucket override for the test bucket.
 * @param conf configuration
 */
public static void unsetEncryption(Configuration conf) {
removeBaseAndBucketOverrides(conf, S3_ENCRYPTION_ALGORITHM);
}
/**
 * Removes all encryption-related properties.
 *
 * <p>This method unsets various encryption settings specific to the test bucket. It removes
 * bucket-specific overrides for multiple encryption-related properties, including both
 * client-side and server-side encryption settings.
 *
 * @param conf The Configuration object from which to remove the encryption properties.
 * This object will be modified by this method.
 */
public static void unsetAllEncryptionPropertiesForBaseAndBucket(Configuration conf) {
removeBaseAndBucketOverrides(getTestBucketName(conf),
conf,
S3_ENCRYPTION_ALGORITHM,
S3_ENCRYPTION_KEY,
SERVER_SIDE_ENCRYPTION_ALGORITHM,
SERVER_SIDE_ENCRYPTION_KEY,
S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME,
S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED,
S3_ENCRYPTION_CSE_KMS_REGION);
}
/**
 * Print all metrics in a list, then reset them.
 * @param log log to print the metrics to.
 * @param metrics metrics to process
 */
public static void printThenReset(Logger log,
S3ATestUtils.MetricDiff... metrics) {
// print first: resetting would lose the values to report
print(log, metrics);
reset(metrics);
}
/**
 * Variant of {@code LambdaTestUtils#intercept()} which closes the Closeable
 * returned by the invoked operation, and using its toString() value
 * for exception messages.
 * @param clazz class of exception; the raised exception must be this class
 * <i>or a subclass</i>.
 * @param contained string which must be in the {@code toString()} value
 * of the exception
 * @param eval expression to eval
 * @param <T> return type of expression
 * @param <E> exception class
 * @return the caught exception if it was of the expected type and contents
 * @throws Exception any other exception raised
 */
public static <E extends Throwable, T extends Closeable> E interceptClosing(
Class<E> clazz,
String contained,
Callable<T> eval)
throws Exception {
return intercept(clazz, contained,
() -> {
try (Closeable c = eval.call()) {
return c.toString();
}
});
}
/**
 * Patch a configuration for testing.
 * This includes setting up the local
 * FS temp dir and anything else needed for test runs.
 * @param conf configuration to patch
 * @return the now-patched configuration
 */
public static Configuration prepareTestConfiguration(final Configuration conf) {
  // set hadoop temp dir to a default value
  final String forkId = System.getProperty(TEST_UNIQUE_FORK_ID);
  String tmpDir = conf.get(HADOOP_TMP_DIR, "target/build/test");
  if (forkId != null) {
    // patch temp dir for the specific fork
    tmpDir = tmpDir + File.separator + forkId;
    conf.set(HADOOP_TMP_DIR, tmpDir);
  }
  conf.set(BUFFER_DIR, tmpDir);
  conf.set(INPUT_STREAM_TYPE,
      getTestProperty(conf, INPUT_STREAM_TYPE, INPUT_STREAM_TYPE_DEFAULT));
  return conf;
}
/**
 * build dir.
 * @return the directory for the project's build, as set by maven,
 * falling back to pwd + "target" if running from an IDE;
 */
public static File getProjectBuildDir() {
  final String dir = System.getProperty(PROJECT_BUILD_DIRECTORY_PROPERTY);
  return new File(StringUtils.isEmpty(dir) ? "target" : dir)
      .getAbsoluteFile();
}
/**
 * Clear any Hadoop credential provider path.
 * This is needed if people's test setups switch to credential providers,
 * and the test case is altering FS login details: changes made in the
 * config will not be picked up.
 * @param conf configuration to update
 */
public static void unsetHadoopCredentialProviders(final Configuration conf) {
conf.unset(HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH);
}
/**
 * Build AWS credentials to talk to the STS. Also where checks for the
 * session tests being disabled are implemented.
 * @param conf configuration to read login secrets from
 * @return a set of credentials
 * @throws IOException on a failure
 */
public static AwsCredentialsProvider buildAwsCredentialsProvider(
final Configuration conf)
throws IOException {
assumeSessionTestsEnabled(conf);
// the URI is only used to look up per-bucket options: any s3a URI works
S3xLoginHelper.Login login = S3AUtils.getAWSAccessKeys(
URI.create("s3a://foobar"), conf);
if (!login.hasLogin()) {
skip("testSTS disabled because AWS credentials not configured");
}
return new SimpleAWSCredentialsProvider(login);
}
/**
 * Skip the current test if STS tests are not enabled.
 * @param conf configuration to examine
 */
public static void assumeSessionTestsEnabled(final Configuration conf) {
skipIfNotEnabled(conf, TEST_STS_ENABLED, "STS functional tests disabled");
}
/**
 * Request session credentials for the default time (900s).
 * @param conf configuration to use for login
 * @param bucket Optional bucket to use to look up per-bucket proxy secrets
 * @return the credentials
 * @throws IOException on a failure
 */
public static MarshalledCredentials requestSessionCredentials(
final Configuration conf,
final String bucket)
throws IOException {
return requestSessionCredentials(conf, bucket,
TEST_SESSION_TOKEN_DURATION_SECONDS);
}
/**
 * Request session credentials from the STS, then validate that they
 * are session-only credentials.
 * Skips the test if STS tests are disabled.
 * @param conf The Hadoop configuration
 * @param bucket Optional bucket to use to look up per-bucket proxy secrets
 * @param duration duration in seconds.
 * @return the validated credentials
 * @throws IOException on a failure
 */
public static MarshalledCredentials requestSessionCredentials(
final Configuration conf,
final String bucket,
final int duration)
throws IOException {
assumeSessionTestsEnabled(conf);
MarshalledCredentials sc = MarshalledCredentialBinding
.requestSessionCredentials(
buildAwsCredentialsProvider(conf),
conf,
conf.getTrimmed(ASSUMED_ROLE_STS_ENDPOINT,
DEFAULT_ASSUMED_ROLE_STS_ENDPOINT),
conf.getTrimmed(ASSUMED_ROLE_STS_ENDPOINT_REGION,
ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT),
duration,
new Invoker(new S3ARetryPolicy(conf), Invoker.LOG_EVENT),
bucket);
sc.validate("requested session credentials: ",
MarshalledCredentials.CredentialTypeRequired.SessionOnly);
return sc;
}
/**
 * Round trip a writable to a new instance: serialize to a buffer,
 * then deserialize into a freshly reflected instance of the same class.
 * @param source source object
 * @param conf configuration
 * @param <T> type
 * @return an unmarshalled instance of the type
 * @throws Exception on any failure.
 */
@SuppressWarnings("unchecked")
public static <T extends Writable> T roundTrip(
    final T source,
    final Configuration conf)
    throws Exception {
  final DataOutputBuffer marshalled = new DataOutputBuffer();
  source.write(marshalled);
  final DataInputBuffer unmarshaller = new DataInputBuffer();
  unmarshaller.reset(marshalled.getData(), marshalled.getLength());
  final T copy = ReflectionUtils.newInstance(
      (Class<T>) source.getClass(), conf);
  copy.readFields(unmarshaller);
  return copy;
}
/**
 * Get the name of the test bucket, i.e. the host component of
 * the test filesystem URI.
 * @param conf configuration to scan.
 * @return the bucket name from the config.
 * @throws NullPointerException if there is no test bucket
 */
public static String getTestBucketName(final Configuration conf) {
String bucket = checkNotNull(conf.get(TEST_FS_S3A_NAME),
"No test bucket");
return URI.create(bucket).getHost();
}
/**
 * Remove any per-bucket overrides of the given options.
 * Both forms of override key are removed:
 * {@code fs.s3a.bucket.<bucket>.<option minus "fs.s3a.">} and
 * {@code fs.s3a.bucket.<bucket>.<option>}.
 * @param bucket bucket whose overrides are to be removed. Can be null/empty
 * @param conf config
 * @param options list of fs.s3a options to remove
 */
public static void removeBucketOverrides(final String bucket,
    final Configuration conf,
    final String... options) {
  if (StringUtils.isEmpty(bucket)) {
    return;
  }
  final String bucketPrefix = FS_S3A_BUCKET_PREFIX + bucket + '.';
  for (String option : options) {
    // only strip the "fs.s3a." prefix when it is actually present;
    // a blind substring() would mangle options from other namespaces.
    if (option.startsWith("fs.s3a.")) {
      final String stripped = option.substring("fs.s3a.".length());
      String target = bucketPrefix + stripped;
      String v = conf.get(target);
      if (v != null) {
        LOG.debug("Removing option {}; was {}", target, v);
        conf.unset(target);
      }
    }
    String extended = bucketPrefix + option;
    if (conf.get(extended) != null) {
      LOG.debug("Removing option {}", extended);
      conf.unset(extended);
    }
  }
}
/**
 * Remove any values from a bucket and the base values too.
 * @param bucket bucket whose overrides are to be removed. Can be null/empty.
 * @param conf config
 * @param options list of fs.s3a options to remove
 */
public static void removeBaseAndBucketOverrides(final String bucket,
    final Configuration conf,
    final String... options) {
  // clear the base keys first, then any per-bucket overrides
  for (String key : options) {
    conf.unset(key);
  }
  removeBucketOverrides(bucket, conf, options);
}
/**
 * Remove any values from the test bucket and the base values too.
 * @param conf config
 * @param options list of fs.s3a options to remove
 */
public static void removeBaseAndBucketOverrides(
    final Configuration conf,
    final String... options) {
  // the (bucket, conf, options) overload already unsets the base keys,
  // so the duplicate unset loop previously here was redundant.
  removeBaseAndBucketOverrides(getTestBucketName(conf), conf, options);
}
/**
 * Call a function; any exception raised is logged at info.
 * This is for test teardowns.
 * @param log log to use.
 * @param operation operation to invoke
 * @param <T> type of operation.
 */
public static <T> void callQuietly(final Logger log,
    final CallableRaisingIOE<T> operation) {
  try {
    operation.apply();
  } catch (Exception swallowed) {
    // deliberate best-effort: report and continue
    log.info(swallowed.toString(), swallowed);
  }
}
/**
 * Deploy a hadoop service: init and start it.
 * @param conf configuration to use
 * @param service service to configure
 * @param <T> type of service
 * @return the started service
 */
public static <T extends Service> T deployService(
final Configuration conf,
final T service) {
service.init(conf);
service.start();
return service;
}
/**
 * Terminate a service, returning {@code null} cast at compile-time
 * to the type of the service, for ease of setting fields to null.
 * Stop failures are logged rather than raised.
 * @param service service.
 * @param <T> type of the service
 * @return null, always
 */
@SuppressWarnings("ThrowableNotThrown")
public static <T extends Service> T terminateService(final T service) {
ServiceOperations.stopQuietly(LOG, service);
return null;
}
/**
 * Get a file status from S3A with the {@code needEmptyDirectoryFlag}
 * state probed.
 * This accesses a package-private method in the
 * S3A filesystem.
 * @param fs filesystem
 * @param dir directory
 * @return a status
 * @throws IOException on any failure of the status probe
 */
public static S3AFileStatus getStatusWithEmptyDirFlag(
final S3AFileSystem fs,
final Path dir) throws IOException {
return fs.innerGetFileStatus(dir, true,
StatusProbeEnum.ALL);
}
/**
 * Create mock implementation of store context.
 * The context is bonded to a synthetic {@code s3a://bucket} URI,
 * user "alice", a try-once retry policy and empty statistics.
 * @param multiDelete should multi-object delete be reported as enabled?
 * @param accessors context accessor callbacks to wire up
 * @return a store context suitable for unit tests
 * @throws URISyntaxException failure to parse the synthetic bucket URI
 * @throws IOException failure to look up the current UGI user
 */
public static StoreContext createMockStoreContext(
    boolean multiDelete,
    ContextAccessors accessors)
    throws URISyntaxException, IOException {
  URI name = new URI("s3a://bucket");
  Configuration conf = new Configuration();
  return new StoreContextBuilder().setFsURI(name)
      .setBucket("bucket")
      .setConfiguration(conf)
      .setUsername("alice")
      .setOwner(UserGroupInformation.getCurrentUser())
      .setExecutor(BlockingThreadPoolExecutorService.newInstance(
          4,
          4,
          10, TimeUnit.SECONDS,
          "s3a-transfer-shared"))
      .setExecutorCapacity(DEFAULT_EXECUTOR_CAPACITY)
      .setInvoker(
          new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT))
      .setInstrumentation(new EmptyS3AStatisticsContext())
      .setStorageStatistics(new S3AStorageStatistics())
      .setInputPolicy(S3AInputPolicy.Normal)
      .setChangeDetectionPolicy(
          ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
              ChangeDetectionPolicy.Source.ETag, false))
      .setMultiObjectDeleteEnabled(multiDelete)
      .setUseListV1(false)
      .setContextAccessors(accessors)
      .setPerformanceFlags(createFlagSet(
          PerformanceFlagEnum.class,
          FS_S3A_PERFORMANCE_FLAGS))
      .build();
}
/**
 * Write the text to a file asynchronously. Logs the operation duration.
 * @param fs filesystem
 * @param path path
 * @param text text to write, UTF-8 encoded
 * @return future to the path created.
 */
private static CompletableFuture<Path> put(FileSystem fs,
    Path path, String text) {
  return submit(EXECUTOR, () -> {
    try (DurationInfo ignore =
        new DurationInfo(LOG, false, "Creating %s", path)) {
      createFile(fs, path, true, text.getBytes(StandardCharsets.UTF_8));
      return path;
    }
  });
}
/**
 * Build a set of files in a directory tree.
 * Delegates to {@code createDirsAndFiles} with fresh output lists.
 * @param fs filesystem
 * @param destDir destination
 * @param depth file depth
 * @param fileCount number of files to create.
 * @param dirCount number of dirs to create at each level
 * @return the list of files created.
 * @throws IOException failure creating any directory or file
 */
public static List<Path> createFiles(final FileSystem fs,
    final Path destDir,
    final int depth,
    final int fileCount,
    final int dirCount) throws IOException {
  return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
      new ArrayList<>(fileCount),
      new ArrayList<>(dirCount));
}
/**
 * Build a set of files in a directory tree.
 * Directories are created first (in parallel), then the files
 * (also in parallel), so file creation never has to create parents.
 * @param fs filesystem
 * @param destDir destination
 * @param depth file depth
 * @param fileCount number of files to create.
 * @param dirCount number of dirs to create at each level
 * @param paths [out] list of file paths created
 * @param dirs [out] list of directory paths created.
 * @return the list of files created.
 * @throws IOException failure creating any directory or file
 */
public static List<Path> createDirsAndFiles(final FileSystem fs,
    final Path destDir,
    final int depth,
    final int fileCount,
    final int dirCount,
    final List<Path> paths,
    final List<Path> dirs) throws IOException {
  buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
  List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
      + dirs.size());
  // create directories.
  try (DurationInfo ignore =
      new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
    for (Path path : dirs) {
      futures.add(submit(EXECUTOR, () -> {
        fs.mkdirs(path);
        return path;
      }));
    }
    waitForCompletion(futures);
  }
  // clear the completed directory futures so the second wait
  // only scans the file-creation futures.
  futures.clear();
  try (DurationInfo ignore =
      new DurationInfo(LOG, "Creating %d files", paths.size())) {
    for (Path path : paths) {
      futures.add(put(fs, path, path.getName()));
    }
    waitForCompletion(futures);
    return paths;
  }
}
/**
 * Map a RemoteIterator of file statuses to the list of their paths.
 * @param i iterator
 * @return list of paths
 * @param <T> type of status
 * @throws IOException failure retrieving values from the iterator
 */
public static <T extends FileStatus> List<Path> toPathList(RemoteIterator<T> i)
    throws IOException {
  final RemoteIterator<Path> pathIterator =
      mappingRemoteIterator(i, FileStatus::getPath);
  return toList(pathIterator);
}
/**
 * Expect an exit exception to carry a specific error code;
 * rethrow the exception if it does not.
 * @param code error code
 * @param error exception
 */
public static void expectErrorCode(final int code, final ExitUtil.ExitException error) {
  if (code == error.getExitCode()) {
    return;
  }
  throw error;
}
/**
 * Require a test case to be against Amazon S3 Express store.
 * @param fs filesystem to probe
 * @throws IOException failure during the capability probe
 */
public static void assumeS3ExpressFileSystem(final FileSystem fs) throws IOException {
  assumePathCapability(fs, STORE_CAPABILITY_S3_EXPRESS_STORAGE);
}
/**
 * Require a test case to be against a standard S3 store.
 * @param fs filesystem to probe
 */
public static void assumeNotS3ExpressFileSystem(final FileSystem fs) {
  assumePathCapabilityFalse(fs, STORE_CAPABILITY_S3_EXPRESS_STORAGE);
}
/**
 * Require a store to be hosted by Amazon -i.e. not a third party store.
 * The probe is on the configured endpoint, not a network operation.
 * @param fs filesystem whose configuration is examined
 */
public static void assumeStoreAwsHosted(final FileSystem fs) {
  assume("store is not AWS S3",
      NetworkBinding.isAwsEndpoint(fs.getConf()
          .getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)));
}
/**
 * Skip if conditional creation is not enabled.
 * @param conf configuration to probe
 */
public static void assumeConditionalCreateEnabled(Configuration conf) {
  skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_CREATE_ENABLED,
      "conditional create is disabled");
}
/**
 * Modify the config by setting the performance flags and return the modified config.
 * Any base or per-bucket settings of the create-performance and
 * performance-flag options are removed first.
 *
 * @param conf The configuration object.
 * @param flagStr The performance flag string; null means "leave unset".
 * @return The modified configuration object.
 */
public static Configuration setPerformanceFlags(final Configuration conf,
    final String flagStr) {
  removeBaseAndBucketOverrides(
      conf,
      FS_S3A_CREATE_PERFORMANCE,
      FS_S3A_PERFORMANCE_FLAGS);
  if (flagStr != null) {
    conf.set(FS_S3A_PERFORMANCE_FLAGS, flagStr);
  }
  return conf;
}
/**
 * Helper class to do diffs of metrics.
 */
public static final class MetricDiff {
  /** Filesystem whose metric counters are read. */
  private final S3AFileSystem fs;
  /** Statistic whose counter is monitored. */
  private final Statistic statistic;
  /** Counter value captured at the last {@link #reset()}. */
  private long startingValue;

  /**
   * Constructor.
   * Invokes {@link #reset()} so it is immediately capable of measuring the
   * difference in metric values.
   *
   * @param fs the filesystem to monitor
   * @param statistic the statistic to monitor.
   */
  public MetricDiff(S3AFileSystem fs, Statistic statistic) {
    this.fs = fs;
    this.statistic = statistic;
    reset();
  }

  /**
   * Reset the starting value to the current value.
   * Diffs will be against this new value.
   */
  public void reset() {
    startingValue = currentValue();
  }

  /**
   * Get the current value of the metric.
   * @return the latest value.
   */
  public long currentValue() {
    return fs.getInstrumentation().getCounterValue(statistic);
  }

  /**
   * Get the difference between the current value and
   * {@link #startingValue}.
   * @return the difference.
   */
  public long diff() {
    return currentValue() - startingValue;
  }

  @Override
  public String toString() {
    long c = currentValue();
    final StringBuilder sb = new StringBuilder(statistic.getSymbol());
    sb.append(" starting=").append(startingValue);
    sb.append(" current=").append(c);
    sb.append(" diff=").append(c - startingValue);
    return sb.toString();
  }

  /**
   * Assert that the value of {@link #diff()} matches that expected.
   * @param message message to print; metric name is appended
   * @param expected expected value.
   */
  public void assertDiffEquals(String message, long expected) {
    String text = message + ": " + statistic.getSymbol();
    long diff = diff();
    if (expected != diff) {
      // Log in error ensures that the details appear in the test output
      LOG.error(text + " expected {}, actual {}", expected, diff);
    }
    assertEquals(expected, diff, text);
  }

  /**
   * Assert that the value of {@link #diff()} matches that expected.
   * @param expected expected value.
   */
  public void assertDiffEquals(long expected) {
    assertDiffEquals("Count of " + this, expected);
  }

  /**
   * Assert that the value of {@link #diff()} matches that of another
   * instance.
   * @param that the other metric diff instance.
   */
  public void assertDiffEquals(MetricDiff that) {
    assertEquals(this.diff(), that.diff(),
        this.toString() + " != " + that);
  }

  /**
   * Comparator for assertions.
   * @param that other metric diff
   * @return true if the value is {@code ==} the other's
   */
  public boolean diffEquals(MetricDiff that) {
    return this.diff() == that.diff();
  }

  /**
   * Comparator for assertions.
   * @param that other metric diff
   * @return true if the value is {@code <} the other's
   */
  public boolean diffLessThan(MetricDiff that) {
    return this.diff() < that.diff();
  }

  /**
   * Comparator for assertions.
   * @param that other metric diff
   * @return true if the value is {@code <=} the other's
   */
  public boolean diffLessThanOrEquals(MetricDiff that) {
    return this.diff() <= that.diff();
  }

  /**
   * Get the statistic.
   * @return the statistic
   */
  public Statistic getStatistic() {
    return statistic;
  }

  /**
   * Get the starting value; that set in the last {@link #reset()}.
   * @return the starting value for diffs.
   */
  public long getStartingValue() {
    return startingValue;
  }
}
/**
 * Asserts that {@code obj} is an instance of {@code expectedClass} using a
 * descriptive assertion message.
 * @param expectedClass class
 * @param obj object to check
 */
public static void assertInstanceOf(Class<?> expectedClass, Object obj) {
  final String message = String.format("Expected instance of class %s, but is %s.",
      expectedClass, obj.getClass());
  assertTrue(expectedClass.isAssignableFrom(obj.getClass()), message);
}
/**
 * Builds a comma-separated list of class names.
 * @param classes list of classes
 * @return comma-separated list of class names
 */
public static <T extends Class<?>> String buildClassListString(
    List<T> classes) {
  return classes.stream()
      .map(Class::getName)
      .collect(Collectors.joining(","));
}
/**
 * Private constructor: this is a class of static utility methods
 * and should not be instantiated.
 */
private S3ATestUtils() {
}
/**
 * Verify the core size, block size and timestamp values of a file.
 * Replication, access time, owner, group and permission checks
 * are all disabled in the delegated call.
 * @param status status entry to check
 * @param size file size
 * @param blockSize block size
 * @param modTime modified time
 */
public static void verifyFileStatus(FileStatus status, long size,
    long blockSize, long modTime) {
  verifyFileStatus(status, size, 0, modTime, 0, blockSize, null, null, null);
}
/**
 * Verify the status entry of a file matches that expected.
 * @param status status entry to check
 * @param size file size
 * @param replication replication factor (may be 0)
 * @param modTime modified time
 * @param accessTime access time (may be 0)
 * @param blockSize block size
 * @param owner owner (may be null)
 * @param group user group (may be null)
 * @param permission permission (may be null)
 */
public static void verifyFileStatus(FileStatus status,
    long size,
    int replication,
    long modTime,
    long accessTime,
    long blockSize,
    String owner,
    String group,
    FsPermission permission) {
  String details = status.toString();
  assertFalse(status.isDirectory(), "Not a dir: " + details);
  assertEquals(modTime, status.getModificationTime(), "Mod time: " + details);
  assertEquals(size, status.getLen(), "File size: " + details);
  assertEquals(blockSize, status.getBlockSize(), "Block size: " + details);
  if (replication > 0) {
    assertEquals(replication, status.getReplication(),
        "Replication value: " + details);
  }
  if (accessTime != 0) {
    assertEquals(accessTime, status.getAccessTime(),
        "Access time: " + details);
  }
  if (owner != null) {
    // fix: JUnit 5 takes the message as the last argument; the previous
    // JUnit 4 ordering compared the message string against the owner.
    assertEquals(owner, status.getOwner(), "Owner: " + details);
  }
  if (group != null) {
    // fix: message moved to last position (JUnit 5 ordering).
    assertEquals(group, status.getGroup(), "Group: " + details);
  }
  if (permission != null) {
    assertEquals(permission, status.getPermission(),
        "Permission: " + details);
  }
}
/**
 * Verify the status entry of a directory matches that expected.
 * @param status status entry to check
 * @param replication replication factor
 * @param owner owner
 */
public static void verifyDirStatus(S3AFileStatus status,
    int replication,
    String owner) {
  String details = status.toString();
  assertTrue(status.isDirectory(), "Is a dir: " + details);
  assertEquals(0, status.getLen(), "zero length: " + details);
  // S3AFileStatus always assigns modTime = System.currentTimeMillis()
  assertTrue(status.getModificationTime() > 0, "Mod time: " + details);
  assertEquals(replication, status.getReplication(),
      "Replication value: " + details);
  assertEquals(0, status.getAccessTime(),
      "Access time: " + details);
  // fix: JUnit 5 takes the message as the last argument; the previous
  // JUnit 4 ordering compared the message string against the owner.
  assertEquals(owner, status.getOwner(), "Owner: " + details);
  // S3AFileStatus always assigns group=owner
  assertEquals(owner, status.getGroup(), "Group: " + details);
  // S3AFileStatus always assigns permission = default
  assertEquals(FsPermission.getDefault(), status.getPermission(),
      "Permission: " + details);
}
/**
 * Assert that a configuration option matches the expected value,
 * including the property's origin in the failure text.
 * @param conf configuration
 * @param key option key
 * @param expected expected value
 */
public static void assertOptionEquals(Configuration conf,
    String key,
    String expected) {
  final String actual = conf.get(key);
  final String origin;
  if (actual == null) {
    origin = "(none)";
  } else {
    origin = "[" + StringUtils.join(conf.getPropertySources(key), ", ") + "]";
  }
  Assertions.assertThat(actual)
      .describedAs("Value of %s with origin %s", key, origin)
      .isEqualTo(expected);
}
/**
 * Assume that a condition is met. If not: log at WARN and
 * then throw an {@link TestAbortedException}.
 * @param message text to log and use as the assumption description
 * @param condition condition which must hold for the test to proceed
 */
public static void assume(String message, boolean condition) {
  if (!condition) {
    LOG.warn(message);
  }
  Assumptions.assumeThat(condition).
      describedAs(message)
      .isTrue();
}
/**
 * Convert a throwable to an assumption failure, aborting
 * rather than failing the active test.
 * @param t thrown exception; must not be null.
 */
public static void raiseAsAssumption(Throwable t) {
  throw new TestAbortedException(t.toString(), t);
}
/**
 * Get the statistics from a wrapped block output stream.
 * @param out output stream
 * @return the (active) stats of the write
 */
public static BlockOutputStreamStatistics
    getOutputStreamStatistics(FSDataOutputStream out) {
  final S3ABlockOutputStream inner =
      (S3ABlockOutputStream) out.getWrappedStream();
  return inner.getStatistics();
}
/**
 * Read in a file and convert to a string.
 * @param fs filesystem
 * @param path path to read
 * @return the bytes read and converted to a string
 * @throws IOException IO problems
 */
public static String read(FileSystem fs,
    Path path) throws IOException {
  FileStatus status = fs.getFileStatus(path);
  try (FSDataInputStream in = fs.open(path)) {
    // cast is safe as test files are small.
    byte[] buf = new byte[(int) status.getLen()];
    in.readFully(0, buf);
    // fix: decode with an explicit charset rather than the platform
    // default, so results are consistent across test hosts.
    return new String(buf, StandardCharsets.UTF_8);
  }
}
/**
 * Read in a file and convert to a string, using the openFile
 * builder API and the file status.
 * If the status is an S3A FileStatus, any etag or versionId used
 * will be picked up.
 * @param fs filesystem
 * @param status file status, including path
 * @return the bytes read and converted to a string
 * @throws IOException IO problems
 */
public static String readWithStatus(
    final FileSystem fs,
    final FileStatus status) throws IOException {
  final CompletableFuture<FSDataInputStream> future =
      fs.openFile(status.getPath())
          .withFileStatus(status)
          .build();
  try (FSDataInputStream in = FutureIO.awaitFuture(future)) {
    byte[] buf = new byte[(int) status.getLen()];
    in.readFully(0, buf);
    // fix: decode with an explicit charset rather than the platform
    // default, so results are consistent across test hosts.
    return new String(buf, StandardCharsets.UTF_8);
  }
}
/**
 * List a directory/directory tree, logging each entry found.
 * @param fileSystem FS
 * @param path path; a null value is logged and treated as empty
 * @param recursive do a recursive listing?
 * @return the number of files found.
 * @throws Exception failure.
 */
public static long lsR(FileSystem fileSystem, Path path, boolean recursive)
    throws Exception {
  if (path == null) {
    // surfaces when someone calls getParent() on something at the top
    // of the path
    LOG.info("Empty path");
    return 0;
  }
  final RemoteIterator<LocatedFileStatus> listing =
      fileSystem.listFiles(path, recursive);
  return S3AUtils.applyLocatedFiles(listing,
      (status) -> LOG.info("{}", status));
}
/**
 * Date format used for mapping upload initiation time to human string.
 * NOTE(review): {@code SimpleDateFormat} is not thread-safe; this shared
 * instance must not be used from multiple threads concurrently — confirm
 * callers are single-threaded or synchronize on it.
 */
public static final DateFormat LISTING_FORMAT = new SimpleDateFormat(
    "yyyy-MM-dd HH:mm:ss");
/**
 * Probe for the configuration containing a specific credential provider.
 * If the list is empty, there will be no match, even if the named provider
 * is on the default list.
 *
 * @param conf configuration
 * @param providerClassname provider class
 * @return true if the configuration contains that classname.
 */
public static boolean authenticationContains(Configuration conf,
    String providerClassname) {
  // exact string match against the trimmed provider list entries.
  return conf.getTrimmedStringCollection(AWS_CREDENTIALS_PROVIDER)
      .contains(providerClassname);
}
/**
 * Assert that neither {@code listFiles} nor {@code listStatus} on
 * the parent directory of a path includes that path.
 * @param fs filesystem
 * @param filePath path which must be absent from both listings
 * @throws IOException IO failure during either listing
 */
public static void checkListingDoesNotContainPath(S3AFileSystem fs, Path filePath)
    throws IOException {
  final RemoteIterator<LocatedFileStatus> listIter =
      fs.listFiles(filePath.getParent(), false);
  while (listIter.hasNext()) {
    final LocatedFileStatus lfs = listIter.next();
    assertNotEquals(filePath, lfs.getPath(),
        "Listing was not supposed to include " + filePath);
  }
  LOG.info("{}; file omitted from listFiles listing as expected.", filePath);
  final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
  for (FileStatus fileStatus : fileStatuses) {
    assertNotEquals(filePath, fileStatus.getPath(),
        "Listing was not supposed to include " + filePath);
  }
  LOG.info("{}; file omitted from listStatus as expected.", filePath);
}
/**
 * Assert that both {@code listFiles} and {@code listStatus} on
 * the parent directory of a path include that path.
 * @param fs filesystem
 * @param filePath path which must be present in both listings
 * @throws IOException IO failure during either listing
 */
public static void checkListingContainsPath(S3AFileSystem fs, Path filePath)
    throws IOException {
  boolean listFilesHasIt = false;
  boolean listStatusHasIt = false;
  final RemoteIterator<LocatedFileStatus> listIter =
      fs.listFiles(filePath.getParent(), false);
  while (listIter.hasNext()) {
    final LocatedFileStatus lfs = listIter.next();
    if (filePath.equals(lfs.getPath())) {
      listFilesHasIt = true;
    }
  }
  final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
  for (FileStatus fileStatus : fileStatuses) {
    if (filePath.equals(fileStatus.getPath())) {
      listStatusHasIt = true;
    }
  }
  assertTrue(listFilesHasIt,
      "fs.listFiles didn't include " + filePath);
  assertTrue(listStatusHasIt,
      "fs.listStatus didn't include " + filePath);
}
/**
 * This creates a set containing all current threads and some well-known
 * thread names whose existence should not fail test runs.
 * They are generally static cleaner threads created by various classes
 * on instantiation.
 * @return a set of threads to use in later assertions.
 */
public static Set<String> listInitialThreadsForLifecycleChecks() {
  Set<String> threadSet = getCurrentThreadNames();
  threadSet.addAll(Arrays.asList(
      // static filesystem statistics cleaner
      "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner",
      // AWS progress callbacks
      "java-sdk-progress-listener-callback-thread",
      // another AWS thread
      "java-sdk-http-connection-reaper",
      // java.lang.UNIXProcess. maybe if chmod is called?
      "process reaper",
      // once a quantile has been scheduled, the mutable quantile thread pool
      // is initialized; it has a minimum thread size of 1.
      "MutableQuantiles-0",
      // IDE?
      "Attach Listener"));
  return threadSet;
}
/**
 * Get a set containing the names of all active threads,
 * stripping out all test runner threads.
 * @return the current set of threads.
 */
public static Set<String> getCurrentThreadNames() {
  return Thread.getAllStackTraces().keySet()
      .stream()
      .map(Thread::getName)
      // fix: exclude the test-runner threads. The previous predicates
      // kept only names starting with both "JUnit" and "surefire",
      // so the returned set was always empty.
      .filter(n -> !n.startsWith("JUnit"))
      .filter(n -> !n.startsWith("surefire"))
      .collect(Collectors.toCollection(TreeSet::new));
}
/**
 * Call the package-private {@code innerGetFileStatus()} method
 * on the passed in FS.
 * @param fs filesystem
 * @param path path
 * @param needEmptyDirectoryFlag look for empty directory
 * @param probes file status probes to perform
 * @return the status
 * @throws IOException IO failure during the status probes
 */
public static S3AFileStatus innerGetFileStatus(
    S3AFileSystem fs,
    Path path,
    boolean needEmptyDirectoryFlag,
    Set<StatusProbeEnum> probes) throws IOException {
  return fs.innerGetFileStatus(
      path,
      needEmptyDirectoryFlag,
      probes);
}
/**
 * Skip a test if encryption algorithm or encryption key is not set.
 *
 * @param configuration configuration to probe.
 * @param s3AEncryptionMethods list of encryption algorithms to probe.
 * @throws IOException if the secret lookup fails.
 */
public static void skipIfEncryptionNotSet(Configuration configuration,
    S3AEncryptionMethods... s3AEncryptionMethods) throws IOException {
  if (s3AEncryptionMethods == null || s3AEncryptionMethods.length == 0) {
    throw new IllegalArgumentException("Specify at least one encryption method");
  }
  // if S3 encryption algorithm is not set to desired method or AWS encryption
  // key is not set, then skip.
  String bucket = getTestBucketName(configuration);
  final EncryptionSecrets secrets = buildEncryptionSecrets(bucket, configuration);
  final String activeMethod = secrets.getEncryptionMethod().getMethod();
  boolean encryptionMethodMatching = false;
  for (S3AEncryptionMethods candidate : s3AEncryptionMethods) {
    if (candidate.getMethod().equals(activeMethod)) {
      encryptionMethodMatching = true;
      break;
    }
  }
  if (!encryptionMethodMatching || StringUtils.isBlank(secrets.getEncryptionKey())) {
    skip(S3_ENCRYPTION_KEY + " is not set or " + S3_ENCRYPTION_ALGORITHM + " is not set to "
        + Arrays.stream(s3AEncryptionMethods).map(S3AEncryptionMethods::getMethod)
        .collect(Collectors.toList()) + " in " + secrets);
  }
}
/**
 * Skip a test if encryption algorithm is not empty, or if it is set to
 * anything other than AES256.
 *
 * @param configuration configuration
 */
public static void skipForAnyEncryptionExceptSSES3(Configuration configuration) {
  String bucket = getTestBucketName(configuration);
  final EncryptionSecrets secrets;
  try {
    secrets = buildEncryptionSecrets(bucket, configuration);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
  final String method = secrets.getEncryptionMethod().getMethod();
  // skip unless encryption is unset or SSE-S3 (AES256).
  if (!method.equals(SSE_S3.getMethod()) && !method.isEmpty()) {
    skip("Encryption method is set to " + method);
  }
}
/**
 * Get the input stream statistics of an input stream.
 * Raises an exception if the inner stream is not an S3A input stream
 * or prefetching input stream
 * @param in wrapper
 * @return the statistics for the inner stream
 */
public static S3AInputStreamStatistics getInputStreamStatistics(
    FSDataInputStream in) {
  final InputStream wrapped = in.getWrappedStream();
  if (wrapped instanceof S3AInputStream) {
    return ((S3AInputStream) wrapped).getS3AStreamStatistics();
  }
  if (wrapped instanceof S3APrefetchingInputStream) {
    return ((S3APrefetchingInputStream) wrapped).getS3AStreamStatistics();
  }
  throw new AssertionError("Not an S3AInputStream or S3APrefetchingInputStream: " + wrapped);
}
/**
 * Get the inner stream of an input stream.
 * Raises an exception if the inner stream is not an S3A input stream
 * @param in wrapper
 * @return the inner stream
 * @throws AssertionError if the inner stream is of the wrong type
 */
public static S3AInputStream getS3AInputStream(
    FSDataInputStream in) {
  final InputStream wrapped = in.getWrappedStream();
  if (!(wrapped instanceof S3AInputStream)) {
    throw new AssertionError("Not an S3AInputStream: " + wrapped);
  }
  return (S3AInputStream) wrapped;
}
/**
 * Get the inner stream of a FilterInputStream.
 * Uses reflection to read the protected {@code in} field.
 * NOTE(review): on JDKs with strong encapsulation, {@code setAccessible}
 * may throw an unchecked {@code InaccessibleObjectException} unless
 * {@code java.io} is opened to this module — confirm the test JVM flags.
 * @param fis input stream.
 * @return the inner stream.
 */
public static InputStream getInnerStream(FilterInputStream fis) {
  try {
    final Field field = FilterInputStream.class.getDeclaredField("in");
    field.setAccessible(true);
    return (InputStream) field.get(fis);
  } catch (IllegalAccessException | NoSuchFieldException e) {
    throw new AssertionError("Failed to get inner stream: " + e, e);
  }
}
/**
 * Get the innermost stream of a chain of FilterInputStreams.
 * This allows tests into the internals of an AWS SDK stream chain.
 * @param fis input stream.
 * @return the inner stream.
 */
public static InputStream getInnermostStream(FilterInputStream fis) {
  InputStream current = fis;
  // unwrap until the stream is no longer a filter.
  while (current instanceof FilterInputStream) {
    current = getInnerStream((FilterInputStream) current);
  }
  return current;
}
/**
 * Verify that an s3a stream is not checksummed.
 * The inner stream must be active: a read must have been issued so that
 * the wrapped SDK response stream is non-null.
 * @param wrappedS3A S3A input stream to probe
 */
public static void assertStreamIsNotChecksummed(final S3AInputStream wrappedS3A) {
  final ResponseInputStream<GetObjectResponse> wrappedStream =
      wrappedS3A.getWrappedStream();
  Assertions.assertThat(wrappedStream)
      .describedAs("wrapped stream is not open: call read() on %s", wrappedS3A)
      .isNotNull();
  final InputStream inner = getInnermostStream(wrappedStream);
  Assertions.assertThat(inner)
      .describedAs("innermost stream of %s", wrappedS3A)
      .isNotInstanceOf(ChecksumValidatingInputStream.class)
      .isNotInstanceOf(S3ChecksumValidatingInputStream.class);
}
/**
 * Disable Prefetching streams from S3AFileSystem in tests.
 * Clears both the legacy prefetch switch and the stream type option,
 * in the base configuration and any per-bucket overrides.
 * @param conf Configuration to remove the prefetch property from.
 * @return patched config
 */
public static Configuration disablePrefetching(Configuration conf) {
  removeBaseAndBucketOverrides(conf,
      PREFETCH_ENABLED_KEY,
      INPUT_STREAM_TYPE);
  return conf;
}
/**
 * Enable Prefetching streams from S3AFileSystem in tests.
 * @param conf Configuration to update
 * @return patched config
 */
public static Configuration enablePrefetching(Configuration conf) {
  removeBaseAndBucketOverrides(conf,
      PREFETCH_ENABLED_KEY,
      INPUT_STREAM_TYPE);
  conf.setEnum(INPUT_STREAM_TYPE, Prefetch);
  return conf;
}
/**
 * Enable analytics stream for S3A S3AFileSystem in tests.
 * Removes any base/per-bucket stream type setting first.
 * @param conf Configuration to update
 * @return patched config
 */
public static Configuration enableAnalyticsAccelerator(Configuration conf) {
  removeBaseAndBucketOverrides(conf,
      INPUT_STREAM_TYPE);
  conf.setEnum(INPUT_STREAM_TYPE, Analytics);
  return conf;
}
/**
 * Probe for a filesystem having a specific stream type;
 * this is done through filesystem capabilities.
 * @param fs filesystem
 * @param type stream type
 * @return true if the fs has the specific type.
 */
public static boolean hasInputStreamType(FileSystem fs, InputStreamType type) {
  final Path root = new Path("/");
  return uncheckIOExceptions(() ->
      fs.hasPathCapability(root, type.capability()));
}
/**
 * What is the stream type of this filesystem?
 * Probes the store's internals, so bypasses capability lookup.
 * @param fs filesystem to probe
 * @return the stream type
 */
public static InputStreamType streamType(S3AFileSystem fs) {
  return fs.getS3AInternals().getStore().streamType();
}
/**
 * Skip root tests if the system properties/config says so.
 * @param conf configuration to check
 */
public static void maybeSkipRootTests(Configuration conf) {
  final boolean enabled = getTestPropertyBool(conf, ROOT_TESTS_ENABLED,
      DEFAULT_ROOT_TESTS_ENABLED);
  assume("Root tests disabled", enabled);
}
/**
 * Does this FS support multi object delete?
 * @param fs filesystem
 * @return true if multi-delete is enabled.
 */
public static boolean isBulkDeleteEnabled(FileSystem fs) {
  final Configuration conf = fs.getConf();
  // default is enabled.
  return conf.getBoolean(Constants.ENABLE_MULTI_DELETE, true);
}
/**
 * Does this FS have create performance enabled?
 * @param fs filesystem
 * @return true if create performance is enabled
 * @throws IOException IO problems
 */
public static boolean isCreatePerformanceEnabled(FileSystem fs)
    throws IOException {
  final Path root = new Path("/");
  return fs.hasPathCapability(root, FS_S3A_CREATE_PERFORMANCE_ENABLED);
}
/**
 * Is the filesystem connector bonded to S3Express storage?
 * @param fs filesystem.
 * @return true if the store has the relevant path capability.
 * @throws IOException IO failure
 */
public static boolean isS3ExpressStorage(FileSystem fs) throws IOException {
  final Path root = new Path("/");
  return fs.hasPathCapability(root, STORE_CAPABILITY_S3_EXPRESS_STORAGE);
}
/**
 * Get an etag from a FileStatus which must implement
 * the {@link EtagSource} interface -which S3AFileStatus does.
 *
 * @param status the status.
 * @return the etag
 */
public static String etag(FileStatus status) {
  Preconditions.checkArgument(status instanceof EtagSource,
      "Not an EtagSource: %s", status);
  final EtagSource source = (EtagSource) status;
  return source.getEtag();
}
/**
 * Create an SDK client exception.
 * @param message message
 * @param cause nullable cause
 * @return the exception
 */
public static SdkClientException sdkClientException(
    String message, Throwable cause) {
  return SdkClientException.builder()
      .cause(cause)
      .message(message)
      .build();
}
/**
 * Create an SDK client exception using the string value of the cause
 * as the message.
 * @param cause nullable cause
 * @return the exception
 */
public static SdkClientException sdkClientException(
    Throwable cause) {
  // delegate to the two-argument variant with the cause's string form.
  return sdkClientException(cause.toString(), cause);
}
private static final String BYTES_PREFIX = "bytes=";

/**
 * Given a range header, split into start and end.
 * Based on AWSRequestAnalyzer.
 * @param rangeHeader header string
 * @return parsed range, or (-1, -1) for problems
 */
public static Pair<Long, Long> requestRange(String rangeHeader) {
  if (rangeHeader == null || !rangeHeader.startsWith(BYTES_PREFIX)) {
    return Pair.of(-1L, -1L);
  }
  final String[] limits = rangeHeader
      .substring(BYTES_PREFIX.length())
      .split("-");
  if (limits.length == 2) {
    try {
      return Pair.of(
          Long.parseUnsignedLong(limits[0]),
          Long.parseUnsignedLong(limits[1]));
    } catch (NumberFormatException e) {
      LOG.warn("Failed to parse range header {}", rangeHeader, e);
    }
  }
  // error case
  return Pair.of(-1L, -1L);
}
}