TestWrappedIO.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.wrappedio.impl;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
import org.apache.hadoop.io.wrappedio.WrappedIO;
import org.apache.hadoop.util.Lists;
import static java.nio.ByteBuffer.allocate;
import static org.apache.hadoop.fs.CommonPathCapabilities.BULK_DELETE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.StreamCapabilities.IOSTATISTICS_CONTEXT;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.file;
import static org.apache.hadoop.util.dynamic.BindingUtils.loadClass;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.Tuples.pair;
/**
* Test WrappedIO operations.
* <p>
* This is a contract test; the base class is bonded to the local fs;
* it is possible for other stores to provide their own implementations.
* All classes/constants are referenced here because they are part of the reflected
* API. If anything changes, application code breaks.
*/
public class TestWrappedIO extends AbstractFSContractTestBase {
/** Logger shared by all test cases in this class. */
private static final Logger LOG = LoggerFactory.getLogger(TestWrappedIO.class);
/**
 * Dynamic wrapped IO.
 * A new instance is created for each test case in {@link #setup()}.
 */
private DynamicWrappedIO io;
/**
 * Dynamically Wrapped IO statistics.
 * A new instance is created for each test case in {@link #setup()}.
 */
private DynamicWrappedStatistics statistics;
/**
 * Set up the superclass, then create new dynamic bindings for the test
 * case and reset the IOStatisticsContext through the statistics binding.
 * @throws Exception failure in the superclass setup or in creating the bindings.
 */
@BeforeEach
public void setup() throws Exception {
  super.setup();
  // each test case gets its own bindings
  this.io = new DynamicWrappedIO();
  this.statistics = new DynamicWrappedStatistics();
  // reset the context before the test case runs
  this.statistics.iostatisticsContext_reset();
}
/**
 * Tear down the superclass, then log the IOStatisticsContext.
 * @throws Exception failure in the superclass teardown.
 */
@AfterEach
@Override
public void teardown() throws Exception {
  // superclass first; the statistics are logged afterwards
  super.teardown();
  this.logIOStatisticsContext();
}
/**
 * Create the contract for this test: one bonded to the local filesystem.
 * @param conf configuration handed to the contract.
 * @return a new {@link LocalFSContract}.
 */
@Override
protected AbstractFSContract createContract(final Configuration conf) {
  final AbstractFSContract localContract = new LocalFSContract(conf);
  return localContract;
}
/**
 * The {@link #clazz(String)} helper must raise an {@link AssertionError}
 * when the named class cannot be found.
 */
@Test
public void testClassResolution() throws Throwable {
  final String missingClass = "no.such.class";
  intercept(AssertionError.class, () -> clazz(missingClass));
}
/**
 * Ask the dynamic IO binding to verify that all of its methods are available.
 */
@Test
public void testAllMethodsFound() throws Throwable {
  this.io.requireAllMethodsAvailable();
}
/**
 * Exercise the openFile operation through the wrapped APIs.
 * <p>
 * One file is created and then opened several times; this saves on
 * setup/teardown overhead and allows for some statistics collection.
 * The sequence finishes with a bulk delete of the file.
 */
@Test
public void testOpenFileOperations() throws Throwable {
  final Path testFile = path("testOpenFileOperations");
  final int len = 100;
  final byte[] data = dataset(len, 'a', 26);
  final FileSystem fs = getFileSystem();

  // create the file, building a statistics snapshot from whatever comes back.
  final Serializable iostats = statistics.iostatisticsSnapshot_create(
      file(fs, testFile, true, data));
  final FileStatus st = fs.getFileStatus(testFile);

  describe("reading file " + testFile);
  try (FSDataInputStream in = DynamicWrappedIO.openFile(fs,
      fs.getFileStatus(testFile),
      DynamicWrappedIO.PARQUET_READ_POLICIES)) {
    final int firstByte = in.read();
    Assertions.assertThat(firstByte)
        .describedAs("first byte")
        .isEqualTo('a');

    // probe through the dynamic binding, then cross-check the answer
    // against the static wrapper and against the stream itself.
    final boolean hasContext = supportsIOStatisticsContext(in);
    if (!hasContext) {
      LOG.info("Stream has no IOStatisticsContext support: {}", in);
    } else {
      LOG.info("Stream has IOStatisticsContext support: {}", in);
    }
    Assertions.assertThat(hasContext)
        .describedAs("Retrieved stream capability %s from %s",
            IOSTATISTICS_CONTEXT, in)
        .isEqualTo(WrappedIO.streamCapabilities_hasCapability(in, IOSTATISTICS_CONTEXT));
    Assertions.assertThat(hasContext)
        .describedAs("Actual stream capability %s from %s",
            IOSTATISTICS_CONTEXT, in)
        .isEqualTo(in.hasCapability(IOSTATISTICS_CONTEXT));
    retrieveAndAggregate(iostats, in);
  }

  // open with the file status passed in
  try (FSDataInputStream withStatus = openFile(testFile, null, st, null, null)) {
    withStatus.seek(1);
    withStatus.read();
    // collect a small amount of statistics
    retrieveAndAggregate(iostats, withStatus);
  }

  // open with an explicit length; the random read policy goes in the option map
  final Map<String, String> randomPolicy =
      map(pair(FS_OPTION_OPENFILE_READ_POLICY, "random"));
  try (FSDataInputStream withLength =
           openFile(testFile, null, null, (long) len, randomPolicy)) {
    withLength.seek(len - 10);
    withLength.read();
    retrieveAndAggregate(iostats, withLength);
  }

  // now declare a length greater than that of the file;
  // both the length and the read policy are passed in via the map.
  final Map<String, String> doubledLength = map(
      pair(FS_OPTION_OPENFILE_LENGTH, len * 2),
      pair(FS_OPTION_OPENFILE_READ_POLICY, "random"));
  // tracks where in the sequence an EOFException was raised, for the log.
  String validationPoint = "openfile call";
  try (FSDataInputStream pastEof =
           openFile(testFile, null, null, null, doubledLength)) {
    // seek() fails if the length was determined and fixed in open,
    // and the stream rejects a seek() beyond the file length.
    validationPoint = "seek()";
    pastEof.seek(len + 10);
    validationPoint = "readFully()";
    // the positioned read past the end of the data must fail.
    pastEof.readFully(len + 10, new byte[10], 0, 10);
    Assertions.fail("Expected an EOFException but readFully from %s", pastEof);
  } catch (EOFException expected) {
    // this is the desired outcome
    LOG.info("EOF successfully raised, validation point: {}", validationPoint);
    LOG.debug("stack", expected);
  }

  // having got this far, finish with a bulk delete
  Assertions.assertThat(io.pathCapabilities_hasPathCapability(fs, testFile, BULK_DELETE))
      .describedAs("Path capability %s", BULK_DELETE)
      .isTrue();
  // the page size must have been picked up
  Assertions.assertThat(io.bulkDelete_pageSize(fs, testFile))
      .describedAs("bulkDelete_pageSize for %s", testFile)
      .isGreaterThanOrEqualTo(1);
  // the delete itself: the parent path is used as the base
  // of the bulk delete to avoid HADOOP-19196
  Assertions
      .assertThat(io.bulkDelete_delete(fs, testFile.getParent(), Lists.newArrayList(testFile)))
      .describedAs("outcome of bulk delete")
      .isEmpty();
}
/**
 * Opening a path where no file was created must raise
 * a {@link FileNotFoundException} from the dynamic binding.
 */
@Test
public void testOpenFileNotFound() throws Throwable {
  final Path missing = path("testOpenFileNotFound");
  intercept(FileNotFoundException.class, () ->
      io.fileSystem_openFile(getFileSystem(), missing, null, null, null, null));
}
/**
 * Test ByteBufferPositionedReadable.
 * This is implemented by HDFS but not much else; when the stream does not
 * offer it, the test only verifies that the wrapped call is rejected.
 */
@Test
public void testByteBufferPositionedReadable() throws Throwable {
  final Path testFile = path("testByteBufferPositionedReadable");
  final int len = 100;
  final byte[] data = dataset(len, 'a', 26);
  final FileSystem fs = getFileSystem();
  file(fs, testFile, true, data);
  describe("reading file " + testFile);
  try (FSDataInputStream in = openFile(testFile, "random", null, (long) len, null)) {
    if (!io.byteBufferPositionedReadable_readFullyAvailable(in)) {
      // API not found: the read is expected to fail, and nothing else is tested.
      LOG.info("ByteBufferPositionedReadable is not available in {}", in);
      intercept(UnsupportedOperationException.class, () ->
          io.byteBufferPositionedReadable_readFully(in, 0, allocate(len)));
      return;
    }
    LOG.info("ByteBufferPositionedReadable is available in {}", in);

    // read the whole file into a buffer
    final ByteBuffer fullBuffer = allocate(len);
    io.byteBufferPositionedReadable_readFully(in, 0, fullBuffer);
    Assertions.assertThat(fullBuffer.array())
        .describedAs("Full buffer read of %s", in)
        .isEqualTo(data);

    // read from an offset, which verifies that the offset is passed in;
    // the expected bytes come from a byte array read at the same offset.
    final int offset = 10;
    final int range = len - offset;
    final ByteBuffer offsetBuffer = allocate(range);
    io.byteBufferPositionedReadable_readFully(in, offset, offsetBuffer);
    final byte[] expected = new byte[range];
    in.readFully(offset, expected);
    Assertions.assertThat(offsetBuffer.array())
        .describedAs("Offset buffer read of %s", in)
        .isEqualTo(expected);

    // reads past the EOF: the stream must reject a direct call...
    intercept(EOFException.class, () ->
        in.readFully(len + 1, allocate(len)));
    // ...and the same read through the wrapped API
    intercept(EOFException.class, () ->
        io.byteBufferPositionedReadable_readFully(in, len + 1, allocate(len)));
  }
}
/**
 * Retrieve a statistics snapshot from the filesystem and, if one is
 * returned, round trip it through JSON, save it to a file and load it
 * back, verifying that the loaded object is a statistics snapshot.
 */
@Test
public void testFilesystemIOStatistics() throws Throwable {
  final FileSystem fs = getFileSystem();
  final Serializable iostats = statistics.iostatisticsSnapshot_retrieve(fs);
  if (iostats == null) {
    // nothing was retrieved, so there is nothing to round trip
    return;
  }
  final String json = statistics.iostatisticsSnapshot_toJsonString(iostats);
  final Serializable roundTripped = statistics.iostatisticsSnapshot_fromJsonString(json);
  final Path savePath = methodPath();
  statistics.iostatisticsSnapshot_save(roundTripped, fs, savePath, true);
  final Serializable loaded = statistics.iostatisticsSnapshot_load(fs, savePath);
  Assertions.assertThat(loaded)
      .describedAs("loaded statistics from %s", savePath)
      .isNotNull()
      .satisfies(statistics::isIOStatisticsSnapshot);
  LOG.info("loaded statistics {}",
      statistics.iostatistics_toPrettyString(loaded));
}
/**
 * Retrieve the statistics of an object through the statistics binding
 * and aggregate the result into an existing statistics snapshot.
 * @param iostats statistics to update
 * @param object statistics source
 */
private void retrieveAndAggregate(final Serializable iostats, final Object object) {
  final Serializable retrieved = statistics.iostatisticsSnapshot_retrieve(object);
  statistics.iostatisticsSnapshot_aggregate(iostats, retrieved);
}
/**
 * Log the IOStatisticsContext if it is enabled.
 * <p>
 * This is invoked from {@link #teardown()}, which can still run after a
 * failed {@link #setup()}; in that case the statistics binding may never
 * have been created. The method then logs and returns instead of raising
 * a NullPointerException, which would only add noise to the original failure.
 */
private void logIOStatisticsContext() {
  if (statistics == null) {
    // setup did not get as far as creating the binding
    LOG.info("IOStatisticsContext not logged: no statistics binding was created");
    return;
  }
  // context IOStats
  if (statistics.iostatisticsContext_enabled()) {
    final Serializable iostats = statistics.iostatisticsContext_snapshot();
    LOG.info("Context: {}",
        toPrettyString(iostats));
  } else {
    LOG.info("IOStatisticsContext disabled");
  }
}
/**
 * Convert statistics to a string through the statistics binding.
 * @param iostats statistics to convert
 * @return the string returned by the binding.
 */
private String toPrettyString(final Object iostats) {
  final String pretty = statistics.iostatistics_toPrettyString(iostats);
  return pretty;
}
/**
 * Probe an object for the IOStatisticsContext stream capability,
 * going through the dynamic IO binding.
 * @param o object to probe for the capability.
 * @return the outcome of the capability probe.
 */
private boolean supportsIOStatisticsContext(final Object o) {
  final boolean supported = io.streamCapabilities_hasCapability(o, IOSTATISTICS_CONTEXT);
  return supported;
}
/**
 * Open a file on the test filesystem through the dynamic IO binding,
 * asserting that the stream returned is not null.
 * @param path path
 * @param policy read policy
 * @param status optional file status
 * @param length file length or null
 * @param options nullable map of other options
 * @return stream of the opened file
 * @throws Throwable any failure raised by the open.
 */
private FSDataInputStream openFile(
    final Path path,
    final String policy,
    final FileStatus status,
    final Long length,
    final Map<String, String> options) throws Throwable {
  final FSDataInputStream opened = io.fileSystem_openFile(
      getFileSystem(), path, policy, status, length, options);
  // a null stream is a failure of the binding, not something to hand back.
  Assertions.assertThat(opened)
      .describedAs("null stream from openFile(%s)", path)
      .isNotNull();
  return opened;
}
/**
 * Build a map from key/value tuples; every value is stored as the
 * result of its {@code toString()} method.
 * <p>
 * The method only reads from the varargs array, so it is declared
 * {@code @SafeVarargs}: callers passing generic tuples do not get
 * unchecked generic array creation warnings. It uses no instance state
 * and is static, which is what permits the annotation on all Java versions
 * supporting it.
 * @param tuples key/value tuples; may be empty. Values must not be null.
 * @return a new mutable map with one entry per distinct key; where a key
 * is repeated the last value wins.
 * @throws NullPointerException if a tuple or its value is null.
 */
@SafeVarargs
private static Map<String, String> map(final Map.Entry<String, Object>... tuples) {
  final Map<String, String> map = new HashMap<>();
  for (Map.Entry<String, Object> tuple : tuples) {
    map.put(tuple.getKey(), tuple.getValue().toString());
  }
  return map;
}
/**
 * Load a class by name, asserting that the load returned a class.
 * @param className classname
 * @return the class.
 */
private static Class<?> clazz(final String className) {
  final Class<?> loaded = loadClass(className);
  // a null result is reported as an assertion failure
  Assertions.assertThat(loaded)
      .describedAs("Class %s not found", className)
      .isNotNull();
  return loaded;
}
/**
 * Simulate a missing binding by binding to this test class, then verify
 * that every operation downgrades or fails as expected.
 */
@Test
public void testNoWrappedClass() throws Throwable {
  final String testClassName = this.getClass().getName();
  final DynamicWrappedIO broken = new DynamicWrappedIO(testClassName);

  // none of the feature probes may report availability
  Assertions.assertThat(broken)
      .describedAs("broken dynamic io %s", broken)
      .matches(d -> !d.bulkDelete_available())
      .matches(d -> !d.byteBufferPositionedReadable_available())
      .matches(d -> !d.fileSystem_openFile_available());

  final Path testPath = methodPath();
  final FileSystem fs = getFileSystem();

  // both bulk delete calls are rejected
  intercept(UnsupportedOperationException.class, () ->
      broken.bulkDelete_pageSize(fs, testPath));
  intercept(UnsupportedOperationException.class, () ->
      broken.bulkDelete_delete(fs, testPath, Lists.newArrayList()));

  // as is openFile
  intercept(UnsupportedOperationException.class, () ->
      broken.fileSystem_openFile(fs, testPath, "", null, null, null));

  // hasPathCapability downgrades to returning false
  Assertions.assertThat(broken.pathCapabilities_hasPathCapability(fs, testPath, "anything"))
      .describedAs("hasPathCapability(anything) via %s", broken)
      .isFalse();

  // byte buffer positioned readable: probe returns false, read is rejected
  ContractTestUtils.touch(fs, testPath);
  try (InputStream stream = fs.open(testPath)) {
    Assertions.assertThat(broken.byteBufferPositionedReadable_readFullyAvailable(stream))
        .describedAs("byteBufferPositionedReadable_readFullyAvailable on %s", stream)
        .isFalse();
    intercept(UnsupportedOperationException.class, () ->
        broken.byteBufferPositionedReadable_readFully(stream, 0, allocate(1)));
  }
}
/**
 * Simulate a missing binding and verify that a file can still be opened
 * and read through {@code DynamicWrappedIO.openFileOnInstance()}.
 */
@Test
public void testMissingClassFallbacks() throws Throwable {
  final Path testFile = path("testMissingClassFallbacks");
  final FileSystem fs = getFileSystem();
  file(fs, testFile, true, dataset(100, 'a', 26));

  // bind to this test class, as is done to simulate a broken binding
  final String testClassName = this.getClass().getName();
  final DynamicWrappedIO broken = new DynamicWrappedIO(testClassName);
  try (FSDataInputStream in = DynamicWrappedIO.openFileOnInstance(broken,
      fs, fs.getFileStatus(testFile), DynamicWrappedIO.PARQUET_READ_POLICIES)) {
    final int firstByte = in.read();
    Assertions.assertThat(firstByte)
        .describedAs("first byte")
        .isEqualTo('a');
  }
}
/**
 * Binding to a class whose methods exist but are not static must fail
 * when the binding object is constructed, rather than on invocation.
 */
@Test
public void testNonStaticMethods() throws Throwable {
  final String nonStaticClass = NonStaticBulkDeleteMethods.class.getName();
  intercept(IllegalStateException.class, () ->
      new DynamicWrappedIO(nonStaticClass));
}
/**
 * This class declares the bulk delete methods, but as non-static; the expectation
 * is that constructing a {@code DynamicWrappedIO} bound to this class will raise
 * an {@link IllegalStateException}, as {@link #testNonStaticMethods()} verifies.
 * <p>
 * The method bodies are stubs; nothing in this test class invokes them.
 */
private static final class NonStaticBulkDeleteMethods {
/**
 * Non-static stub with the name of the bulk delete page size method.
 * @param ignoredFs unused
 * @param ignoredPath unused
 * @return always 0.
 */
public int bulkDelete_pageSize(FileSystem ignoredFs, Path ignoredPath) {
return 0;
}
/**
 * Non-static stub with the name of the bulk delete method.
 * @param ignoredFs unused
 * @param ignoredBase unused
 * @param ignoredPaths unused
 * @return always null.
 */
public List<Map.Entry<Path, String>> bulkDelete_delete(
FileSystem ignoredFs,
Path ignoredBase,
Collection<Path> ignoredPaths) {
return null;
}
}
}