// AbstractContractPathHandleTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathHandleException;
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import static org.apache.hadoop.fs.contract.ContractTestUtils.appendFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
import org.apache.hadoop.fs.RawPathHandle;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
 * Test {@link PathHandle} operations and semantics.
 *
 * <p>The suite is parameterized: every test runs once per entry returned by
 * {@link #params()}, i.e. for each named set of {@link HandleOpt} values and,
 * for each of those, once with the handle round-tripped through its byte
 * form and once with the handle used as returned by the filesystem.
 *
 * @see ContractOptions#SUPPORTS_FILE_REFERENCE
 * @see ContractOptions#SUPPORTS_CONTENT_CHECK
 * @see org.apache.hadoop.fs.FileSystem#getPathHandle(FileStatus, HandleOpt...)
 * @see org.apache.hadoop.fs.FileSystem#open(PathHandle)
 * @see org.apache.hadoop.fs.FileSystem#open(PathHandle, int)
 */
@RunWith(Parameterized.class)
public abstract class AbstractContractPathHandleTest
    extends AbstractFSContractTestBase {

  /** Initial contents of every test file. */
  private static final byte[] B1 = dataset(TEST_FILE_LEN, 43, 255);

  /** Data appended to a test file by the tests which modify it. */
  private static final byte[] B2 = dataset(TEST_FILE_LEN, 44, 255);

  /** Handle options passed to {@code getPathHandle} for this run. */
  private final HandleOpt[] opts;

  /** Whether the handle is rebuilt from its bytes before being used. */
  private final boolean serialized;

  /**
   * Create an instance of the test from {@link #params()}.
   * @param testname Name of the set of options under test; only used by the
   *                 runner to label the run, it is not stored.
   * @param opts Set of {@link HandleOpt} params under test.
   * @param serialized Serialize the handle before using it.
   */
  public AbstractContractPathHandleTest(String testname, HandleOpt[] opts,
      boolean serialized) {
    this.opts = opts;
    this.serialized = serialized;
  }

  /**
   * Run test against all combinations of default options. Also run each
   * after converting the PathHandle to bytes and back.
   * @return one {@code {name, HandleOpt[], serialized}} array per run: each
   * of the four named option sets paired first with {@code true} and then
   * with {@code false}.
   */
  @Parameterized.Parameters(name="Test{0}")
  public static Collection<Object[]> params() {
    return Arrays.asList(
        Arrays.asList("Exact", HandleOpt.exact()),
        Arrays.asList("Content", HandleOpt.content()),
        Arrays.asList("Path", HandleOpt.path()),
        Arrays.asList("Reference", HandleOpt.reference())
    ).stream()
        // expand every (name, opts) pair into a serialized and a
        // non-serialized variant
        .flatMap((named) -> Arrays.asList(true, false).stream()
            .map((flag) -> {
              ArrayList<Object> args = new ArrayList<>(named);
              args.add(flag);
              return args;
            }))
        .map(ArrayList::toArray)
        .collect(Collectors.toList());
  }

  /**
   * Create the test configuration: the superclass configuration with
   * {@code IO_FILE_BUFFER_SIZE_KEY} set to 4096.
   * @return the configuration to use.
   */
  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    conf.setInt(IO_FILE_BUFFER_SIZE_KEY, 4096);
    return conf;
  }

  /**
   * Open an unmodified file through its handle and verify that the
   * original bytes are read back.
   * @throws IOException on any IO failure.
   */
  @Test
  public void testIdent() throws IOException {
    describe("verify simple open, no changes");
    FileStatus stat = testFile(B1);
    PathHandle fd = getHandleOrSkip(stat);
    verifyFileContents(getFileSystem(), stat.getPath(), B1);
    try (FSDataInputStream in = getFileSystem().open(fd)) {
      verifyRead(in, B1, 0, TEST_FILE_LEN);
    }
  }

  /**
   * Append to a file, then open it through a handle built from the
   * {@link FileStatus} taken before the append. The open must succeed
   * only if the {@link HandleOpt.Data} option allows changes, and must raise
   * {@link InvalidPathHandleException} only if it does not.
   * @throws IOException on any IO failure, or if the wait is interrupted.
   */
  @Test
  public void testChanged() throws IOException {
    describe("verify open(PathHandle, changed(*))");
    assumeSupportsContentCheck();
    HandleOpt.Data data = HandleOpt.getOpt(HandleOpt.Data.class, opts)
        .orElseThrow(IllegalArgumentException::new);
    FileStatus stat = testFile(B1);
    try {
      // Temporary workaround while RawLocalFS supports only second precision
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // restore the interrupt status so it is not lost by the wrapping
      Thread.currentThread().interrupt();
      throw new IOException(e);
    }
    // modify the file by appending data
    appendFile(getFileSystem(), stat.getPath(), B2);
    byte[] b12 = joinBytes(B1, B2);
    // verify fd entity contains contents of file1 + appended bytes
    verifyFileContents(getFileSystem(), stat.getPath(), b12);
    // get the handle *after* the file has been modified
    PathHandle fd = getHandleOrSkip(stat);
    try (FSDataInputStream in = getFileSystem().open(fd)) {
      assertTrue("Failed to detect content change", data.allowChange());
      verifyRead(in, b12, 0, b12.length);
    } catch (InvalidPathHandleException e) {
      assertFalse("Failed to allow content change", data.allowChange());
    }
  }

  /**
   * Rename a file, then open it through a handle built from the
   * {@link FileStatus} taken before the rename. The open must succeed
   * only if the {@link HandleOpt.Location} option allows changes, and must
   * raise {@link InvalidPathHandleException} only if it does not.
   * @throws IOException on any IO failure.
   */
  @Test
  public void testMoved() throws IOException {
    describe("verify open(PathHandle, moved(*))");
    assumeSupportsFileReference();
    HandleOpt.Location loc = HandleOpt.getOpt(HandleOpt.Location.class, opts)
        .orElseThrow(IllegalArgumentException::new);
    FileStatus stat = testFile(B1);
    // rename the file after obtaining FileStatus
    ContractTestUtils.rename(getFileSystem(), stat.getPath(),
        path(stat.getPath() + "2"));
    // obtain handle to entity from #getFileStatus call
    PathHandle fd = getHandleOrSkip(stat);
    try (FSDataInputStream in = getFileSystem().open(fd)) {
      assertTrue("Failed to detect location change", loc.allowChange());
      verifyRead(in, B1, 0, B1.length);
    } catch (InvalidPathHandleException e) {
      assertFalse("Failed to allow location change", loc.allowChange());
    }
  }

  /**
   * Rename a file and append to it, then open it through a handle built
   * from the {@link FileStatus} taken before either change. A successful
   * open requires both the data and the location option to allow changes;
   * an {@link InvalidPathHandleException} requires at least one of them to
   * forbid changes.
   * @throws IOException on any IO failure.
   */
  @Test
  public void testChangedAndMoved() throws IOException {
    describe("verify open(PathHandle, changed(*), moved(*))");
    assumeSupportsFileReference();
    assumeSupportsContentCheck();
    HandleOpt.Data data = HandleOpt.getOpt(HandleOpt.Data.class, opts)
        .orElseThrow(IllegalArgumentException::new);
    HandleOpt.Location loc = HandleOpt.getOpt(HandleOpt.Location.class, opts)
        .orElseThrow(IllegalArgumentException::new);
    FileStatus stat = testFile(B1);
    Path dst = path(stat.getPath() + "2");
    ContractTestUtils.rename(getFileSystem(), stat.getPath(), dst);
    appendFile(getFileSystem(), dst, B2);
    PathHandle fd = getHandleOrSkip(stat);
    byte[] b12 = joinBytes(B1, B2);
    try (FSDataInputStream in = getFileSystem().open(fd)) {
      assertTrue("Failed to detect location change", loc.allowChange());
      assertTrue("Failed to detect content change", data.allowChange());
      verifyRead(in, b12, 0, b12.length);
    } catch (InvalidPathHandleException e) {
      // the rejection is only legitimate if at least one option forbids
      // changes; the two checks together fail when both allow them
      if (data.allowChange()) {
        assertFalse("Failed to allow location change", loc.allowChange());
      }
      if (loc.allowChange()) {
        assertFalse("Failed to allow content change", data.allowChange());
      }
    }
  }

  /**
   * Concatenate two byte arrays into a newly allocated array.
   * @param first bytes placed at the start of the result
   * @param second bytes placed immediately after {@code first}
   * @return a new array of length {@code first.length + second.length}
   */
  private static byte[] joinBytes(byte[] first, byte[] second) {
    byte[] joined = Arrays.copyOf(first, first.length + second.length);
    System.arraycopy(second, 0, joined, first.length, second.length);
    return joined;
  }

  /**
   * Create a file named after the current test method, without overwrite,
   * and return its status.
   * @param content bytes to write to the new file
   * @return the status of the created file; asserted to be non-null and to
   * carry the path that was created
   * @throws IOException on any IO failure.
   */
  private FileStatus testFile(byte[] content) throws IOException {
    Path path = path(methodName.getMethodName());
    createFile(getFileSystem(), path, false, content);
    FileStatus stat = getFileSystem().getFileStatus(path);
    assertNotNull(stat);
    assertEquals(path, stat.getPath());
    return stat;
  }

  /**
   * Skip a test case if the FS doesn't support file references.
   * The feature is assumed to be unsupported unless stated otherwise.
   * @throws IOException declared for subclasses; the visible code performs
   * no IO itself.
   */
  protected void assumeSupportsFileReference() throws IOException {
    if (getContract().isSupported(SUPPORTS_FILE_REFERENCE, false)) {
      return;
    }
    skip("Skipping as unsupported feature: " + SUPPORTS_FILE_REFERENCE);
  }

  /**
   * Skip a test case if the FS doesn't support content validation.
   * The feature is assumed to be unsupported unless stated otherwise.
   * @throws IOException declared for subclasses; the visible code performs
   * no IO itself.
   */
  protected void assumeSupportsContentCheck() throws IOException {
    if (getContract().isSupported(SUPPORTS_CONTENT_CHECK, false)) {
      return;
    }
    skip("Skipping as unsupported feature: " + SUPPORTS_CONTENT_CHECK);
  }

  /**
   * Utility method to obtain a handle or skip the test if the set of opts
   * are not supported. When this run is a "serialized" one, the handle
   * returned is a {@link RawPathHandle} rebuilt from the bytes of the handle
   * the filesystem issued.
   * @param stat Target file status
   * @return Handle to the indicated entity or skip the test
   */
  protected PathHandle getHandleOrSkip(FileStatus stat) {
    try {
      PathHandle fd = getFileSystem().getPathHandle(stat, opts);
      if (serialized) {
        ByteBuffer sb = fd.bytes();
        return new RawPathHandle(sb);
      }
      return fd;
    } catch (UnsupportedOperationException e) {
      skip("FileSystem does not support " + Arrays.toString(opts));
    }
    // unreachable: skip() is expected not to return normally
    return null;
  }

  /**
   * Open a file through the {@code openFile(PathHandle)} builder, chain a
   * read of the stream onto the returned future, and check that the
   * resulting count equals {@code TEST_FILE_LEN}.
   * @throws Throwable on any failure.
   */
  @Test
  public void testOpenFileApplyRead() throws Throwable {
    describe("use the apply sequence to read a whole file");
    CompletableFuture<Long> readAllBytes = getFileSystem()
        .openFile(
            getHandleOrSkip(
                testFile(B1)))
        .build()
        .thenApply(ContractTestUtils::readStream);
    assertEquals("Wrong number of bytes read value",
        TEST_FILE_LEN,
        (long) readAllBytes.get());
  }

  /**
   * Obtain a handle, delete the file, then open through the handle. The
   * builder must return a future; the failure must only surface when the
   * future is evaluated, as either a {@link FileNotFoundException} or an
   * {@link InvalidPathHandleException}.
   * @throws Throwable on any failure, including any other IOException
   * raised by the future.
   */
  @Test
  public void testOpenFileDelete() throws Throwable {
    describe("verify openFile(PathHandle) of a deleted file"
        + " fails in the future, not in build()");
    FileStatus testFile = testFile(B1);
    PathHandle handle = getHandleOrSkip(testFile);
    // delete that file
    FileSystem fs = getFileSystem();
    assertTrue("Failed to delete " + testFile.getPath(),
        fs.delete(testFile.getPath(), false));
    // now construct the builder.
    // even if the open happens in the build operation,
    // the failure must not surface until later.
    CompletableFuture<FSDataInputStream> builder =
        fs.openFile(handle)
            .opt("fs.test.something", true)
            .build();
    IOException ioe = interceptFuture(IOException.class, "", builder);
    if (!(ioe instanceof FileNotFoundException)
        && !(ioe instanceof InvalidPathHandleException)) {
      // support both FileNotFoundException
      // and InvalidPathHandleException as different implementations
      // support either -and with non-atomic open sequences, possibly
      // both
      throw ioe;
    }
  }

  /**
   * Open a file through the {@code openFile(PathHandle)} builder and read
   * it via the returned future.
   *
   * <p>NOTE(review): the description talks about a missing file, but the
   * file is never deleted here, so only the success path is exercised and
   * the body matches {@link #testOpenFileApplyRead()}. The deleted-file
   * case is covered by {@link #testOpenFileDelete()}; confirm whether this
   * test should be changed or removed.
   * @throws Throwable on any failure.
   */
  @Test
  public void testOpenFileLazyFail() throws Throwable {
    describe("openFile fails on a missing file in the get() and not before");
    FileStatus stat = testFile(B1);
    CompletableFuture<Long> readAllBytes = getFileSystem()
        .openFile(
            getHandleOrSkip(
                stat))
        .build()
        .thenApply(ContractTestUtils::readStream);
    assertEquals("Wrong number of bytes read value",
        TEST_FILE_LEN,
        (long) readAllBytes.get());
  }
}