TestOpenFileSupport.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.test.HadoopTestBase;
import static java.util.Collections.singleton;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_AVRO;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_CSV;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_HBASE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_JSON;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.fs.s3a.S3AInputPolicy.Normal;
import static org.apache.hadoop.fs.s3a.S3AInputPolicy.Random;
import static org.apache.hadoop.fs.s3a.S3AInputPolicy.Sequential;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Unit tests for {@link OpenFileSupport} and the associated
* seek policy lookup in {@link S3AInputPolicy}.
*/
public class TestOpenFileSupport extends HadoopTestBase {
private static final ChangeDetectionPolicy CHANGE_POLICY =
ChangeDetectionPolicy.createPolicy(
ChangeDetectionPolicy.Mode.Server,
ChangeDetectionPolicy.Source.None,
false);
private static final long READ_AHEAD_RANGE = 16;
private static final String USERNAME = "hadoop";
public static final S3AInputPolicy INPUT_POLICY = Sequential;
public static final String TESTFILE = "s3a://bucket/name";
private static final Path TESTPATH = new Path(TESTFILE);
/**
* Create a OpenFileSupport instance.
*/
private static final OpenFileSupport PREPARE =
new OpenFileSupport(
CHANGE_POLICY,
READ_AHEAD_RANGE,
USERNAME,
IO_FILE_BUFFER_SIZE_DEFAULT,
DEFAULT_ASYNC_DRAIN_THRESHOLD,
INPUT_POLICY);
@Test
public void testSimpleFile() throws Throwable {
ObjectAssert<OpenFileSupport.OpenFileInformation>
asst = assertFileInfo(
PREPARE.openSimpleFile(1024));
asst.extracting(f -> f.getChangePolicy())
.isEqualTo(CHANGE_POLICY);
asst.extracting(f -> f.getInputPolicy())
.isEqualTo(INPUT_POLICY);
asst.extracting(f -> f.getReadAheadRange())
.isEqualTo(READ_AHEAD_RANGE);
}
/**
* Initiate an assert from an open file information instance.
* @param fi file info
* @return an assert stream.
*/
private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
final OpenFileSupport.OpenFileInformation fi) {
return Assertions.assertThat(fi)
.describedAs("File Information %s", fi);
}
/**
* Create an assertion about the openFile information from a configuration
* with the given key/value option.
* @param key key to set.
* @param option option value.
* @return the constructed OpenFileInformation.
*/
public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
final String key,
final String option) throws IOException {
return assertFileInfo(prepareToOpenFile(params(key, option)));
}
@Test
public void testUnknownMandatoryOption() throws Throwable {
String key = "unknown";
intercept(IllegalArgumentException.class, key, () ->
prepareToOpenFile(params(key, "undefined")));
}
@Test
public void testSeekRandomIOPolicy() throws Throwable {
// ask for random IO
String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
// is picked up
assertOpenFile(INPUT_FADVISE, option)
.extracting(f -> f.getInputPolicy())
.isEqualTo(Random);
// and as neither status nor length was set: no file status
assertOpenFile(INPUT_FADVISE, option)
.extracting(f -> f.getStatus())
.isNull();
}
/**
* There's a standard policy name. 'adaptive',
* meaning 'whatever this stream does to adapt to the client's use'.
* On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
*/
@Test
public void testSeekPolicyAdaptive() throws Throwable {
// when caller asks for adaptive, they get "normal"
assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
.extracting(f -> f.getInputPolicy())
.isEqualTo(Normal);
}
/**
* Verify that an unknown seek policy falls back to
* {@link S3AInputPolicy#Normal}.
*/
@Test
public void testUnknownSeekPolicyS3AOption() throws Throwable {
// fall back to the normal seek policy.
assertOpenFile(INPUT_FADVISE, "undefined")
.extracting(f -> f.getInputPolicy())
.isEqualTo(INPUT_POLICY);
}
/**
* The S3A option also supports a list of values.
*/
@Test
public void testSeekPolicyListS3AOption() throws Throwable {
// fall back to the second seek policy if the first is unknown
assertOpenFile(INPUT_FADVISE, "hbase, random")
.extracting(f -> f.getInputPolicy())
.isEqualTo(Random);
}
/**
* Verify that if a list of policies is supplied in a configuration,
* the first recognized policy will be adopted.
*/
@Test
public void testSeekPolicyExtractionFromList() throws Throwable {
String plist = "a, b, RandOm, other ";
Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
Collection<String> options = conf.getTrimmedStringCollection(
FS_OPTION_OPENFILE_READ_POLICY);
Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
.describedAs("Policy from " + plist)
.isEqualTo(Random);
}
@Test
public void testAdaptiveSeekPolicyRecognized() throws Throwable {
Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
.describedAs("adaptive")
.isEqualTo(Normal);
}
@Test
public void testUnknownSeekPolicyFallback() throws Throwable {
Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
.describedAs("unknown policy")
.isNull();
}
/**
* Test the mapping of the standard option names.
*/
@Test
public void testInputPolicyMapping() throws Throwable {
Object[][] policyMapping = {
{"normal", Normal},
{FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, Normal},
{FS_OPTION_OPENFILE_READ_POLICY_AVRO, Sequential},
{FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR, Random},
{FS_OPTION_OPENFILE_READ_POLICY_CSV, Sequential},
{FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, Normal},
{FS_OPTION_OPENFILE_READ_POLICY_HBASE, Random},
{FS_OPTION_OPENFILE_READ_POLICY_JSON, Sequential},
{FS_OPTION_OPENFILE_READ_POLICY_ORC, Random},
{FS_OPTION_OPENFILE_READ_POLICY_PARQUET, Random},
{FS_OPTION_OPENFILE_READ_POLICY_RANDOM, Random},
{FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, Sequential},
{FS_OPTION_OPENFILE_READ_POLICY_VECTOR, Random},
{FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE, Sequential},
};
for (Object[] mapping : policyMapping) {
String name = (String) mapping[0];
Assertions.assertThat(S3AInputPolicy.getPolicy(name, null))
.describedAs("Policy %s", name)
.isEqualTo(mapping[1]);
}
}
/**
* Verify readahead range is picked up.
*/
@Test
public void testReadahead() throws Throwable {
// readahead range option
assertOpenFile(READAHEAD_RANGE, "4096")
.extracting(f -> f.getReadAheadRange())
.isEqualTo(4096L);
}
/**
* Verify buffer size is picked up.
*/
@Test
public void testBufferSize() throws Throwable {
// readahead range option
assertOpenFile(FS_OPTION_OPENFILE_BUFFER_SIZE, "4096")
.extracting(f -> f.getBufferSize())
.isEqualTo(4096);
}
@Test
public void testStatusWithValidFilename() throws Throwable {
Path p = new Path("file:///tmp/" + TESTPATH.getName());
ObjectAssert<OpenFileSupport.OpenFileInformation> asst =
assertFileInfo(prepareToOpenFile(
params(FS_OPTION_OPENFILE_LENGTH, "32")
.withStatus(status(p, 4096))));
asst.extracting(f -> f.getStatus().getVersionId())
.isEqualTo("version");
asst.extracting(f -> f.getStatus().getEtag())
.isEqualTo("etag");
asst.extracting(f -> f.getStatus().getLen())
.isEqualTo(4096L);
}
/**
* Verify S3ALocatedFileStatus is handled.
*/
@Test
public void testLocatedStatus() throws Throwable {
Path p = new Path("file:///tmp/" + TESTPATH.getName());
ObjectAssert<OpenFileSupport.OpenFileInformation> asst =
assertFileInfo(
prepareToOpenFile(
params(FS_OPTION_OPENFILE_LENGTH, "32")
.withStatus(
new S3ALocatedFileStatus(
status(p, 4096), null))));
asst.extracting(f -> f.getStatus().getVersionId())
.isEqualTo("version");
asst.extracting(f -> f.getStatus().getEtag())
.isEqualTo("etag");
asst.extracting(f -> f.getStatus().getLen())
.isEqualTo(4096L);
}
/**
* Callers cannot supply a directory status when opening a file.
*/
@Test
public void testDirectoryStatus() throws Throwable {
intercept(FileNotFoundException.class, TESTFILE, () ->
prepareToOpenFile(
params(INPUT_FADVISE, "normal")
.withStatus(new S3AFileStatus(true, TESTPATH, USERNAME))));
}
/**
* File name must match the path argument to openFile().
*/
@Test
public void testStatusWithInconsistentFilename() throws Throwable {
intercept(IllegalArgumentException.class, TESTFILE, () ->
prepareToOpenFile(params(INPUT_FADVISE, "normal")
.withStatus(new S3AFileStatus(true,
new Path(TESTFILE + "-"), USERNAME))));
}
/**
* Prepare to open a file with the set of parameters.
* @param parameters open a file
* @return
* @throws IOException
*/
public OpenFileSupport.OpenFileInformation prepareToOpenFile(
final OpenFileParameters parameters)
throws IOException {
return PREPARE.prepareToOpenFile(TESTPATH,
parameters,
IO_FILE_BUFFER_SIZE_DEFAULT
);
}
/**
* If a file length option is set, a file status
* is created.
*/
@Test
public void testFileLength() throws Throwable {
ObjectAssert<OpenFileSupport.OpenFileInformation> asst =
assertFileInfo(prepareToOpenFile(
params(FS_OPTION_OPENFILE_LENGTH, "8192")
.withStatus(null)));
asst.extracting(f -> f.getStatus())
.isNotNull();
asst.extracting(f -> f.getStatus().getPath())
.isEqualTo(TESTPATH);
asst.extracting(f -> f.getStatus().getLen())
.isEqualTo(8192L);
}
/**
* Verify that setting the split end sets the length.
* By passing in a value greater than the size of an int,
* the test verifies that the long is passed everywhere.
*/
@Test
public void testSplitEndSetsLength() throws Throwable {
long bigFile = 2L ^ 34;
assertOpenFile(FS_OPTION_OPENFILE_SPLIT_END, Long.toString(bigFile))
.matches(p -> p.getSplitEnd() == bigFile, "split end")
.matches(p -> p.getFileLength() == -1, "file length")
.matches(p -> p.getStatus() == null, "status");
}
/**
* Semantics of split and length. Split end can only be safely treated
* as a hint unless the codec is known (how?) that it will never
* read past it.
*/
@Test
public void testSplitEndAndLength() throws Throwable {
long splitEnd = 256;
long len = 8192;
Configuration conf = conf(FS_OPTION_OPENFILE_LENGTH,
Long.toString(len));
conf.setLong(FS_OPTION_OPENFILE_SPLIT_END, splitEnd);
conf.setLong(FS_OPTION_OPENFILE_SPLIT_START, 1024);
Set<String> s = new HashSet<>();
Collections.addAll(s,
FS_OPTION_OPENFILE_SPLIT_START,
FS_OPTION_OPENFILE_SPLIT_END,
FS_OPTION_OPENFILE_LENGTH);
assertFileInfo(prepareToOpenFile(
new OpenFileParameters()
.withMandatoryKeys(s)
.withOptions(conf)))
.matches(p -> p.getSplitStart() == 0, "split start")
.matches(p -> p.getSplitEnd() == splitEnd, "split end")
.matches(p -> p.getStatus().getLen() == len, "file length");
}
/**
* Create an S3A status entry with stub etag and versions, timestamp of 0.
* @param path status path
* @param length file length
* @return a status instance.
*/
private S3AFileStatus status(final Path path, final int length) {
return new S3AFileStatus(length, 0,
path, 0, "", "etag", "version");
}
/**
* Create an instance of {@link OpenFileParameters} with
* the key as a mandatory parameter.
* @param key mandatory key
* @param val value
* @return the instance.
*/
private OpenFileParameters params(final String key, final String val) {
return new OpenFileParameters()
.withMandatoryKeys(singleton(key))
.withOptions(conf(key, val));
}
/**
* Create a configuration with a single entry.
* @param key entry key
* @param val entry value
* @return a configuration
*/
private Configuration conf(String key, Object val) {
Configuration c = new Configuration(false);
c.set(key, val.toString());
return c;
}
}