AbstractContractOpenTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.contract.ContractTestUtils.compareByteArrays;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
import org.assertj.core.api.Assertions;
import org.junit.Test;
/**
* Test Open operations.
*/
public abstract class AbstractContractOpenTest
extends AbstractFSContractTestBase {
private FSDataInputStream instream;
/**
 * Build the configuration for the test run: the superclass configuration
 * with the IO file buffer size option set to 4096.
 * @return the patched configuration.
 */
@Override
protected Configuration createConfiguration() {
  final int bufferSize = 4096;
  final Configuration configuration = super.createConfiguration();
  configuration.setInt(
      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
      bufferSize);
  return configuration;
}
/**
 * Close the stream held in the {@code instream} field, clear the field,
 * then hand over to the superclass teardown.
 * @throws Exception anything raised by the superclass teardown.
 */
@Override
public void teardown() throws Exception {
  // tests park their open stream in the field so it is released here
  final FSDataInputStream pending = instream;
  IOUtils.closeStream(pending);
  instream = null;
  super.teardown();
}
/**
 * Open an empty file: the stream position must start at 0 and the
 * first single-byte read must return -1.
 */
@Test
public void testOpenReadZeroByteFile() throws Throwable {
  describe("create & read a 0 byte file");
  final Path emptyFile = path("zero.txt");
  touch(getFileSystem(), emptyFile);
  // kept in the field so that teardown() closes it
  instream = getFileSystem().open(emptyFile);
  assertEquals(0, instream.getPos());
  // there is no data, so the very first read must signal end of stream
  final int firstByte = instream.read();
  assertMinusOne("initial byte read", firstByte);
}
/**
 * Create an empty file and verify that {@code FileStatus.isEncrypted()}
 * returns the value declared by {@link #areZeroByteFilesEncrypted()}.
 */
@Test
public void testFsIsEncrypted() throws Exception {
  describe("create an empty file and call FileStatus.isEncrypted()");
  final Path path = path("file");
  // overwrite=false, zero bytes of data
  createFile(getFileSystem(), path, false, new byte[0]);
  final FileStatus stat = getFileSystem().getFileStatus(path);
  // the status is included in the message to aid diagnosis on failure
  assertEquals("Result wrong for isEncrypted() in " + stat,
      areZeroByteFilesEncrypted(),
      stat.isEncrypted());
}
/**
 * Declare whether zero byte files are expected to be reported as
 * encrypted. The value is the expected result of
 * {@code FileStatus.isEncrypted()} in {@link #testFsIsEncrypted()}.
 * This implementation returns false, which is the implicit answer for
 * filesystems which do not encrypt; being protected, it can be
 * overridden by subclasses.
 * @return true iff zero byte files are encrypted.
 */
protected boolean areZeroByteFilesEncrypted() {
return false;
}
/**
 * Attempt to open a directory for reading. The open must not succeed:
 * a {@code FileNotFoundException} goes to the expected-exception handler,
 * any other {@code IOException} to the relaxed-exception handler.
 */
@Test
public void testOpenReadDir() throws Throwable {
  describe("create & read a directory");
  final Path dir = path("zero.dir");
  mkdirs(dir);
  try {
    final FileSystem fs = getFileSystem();
    instream = fs.open(dir);
    // getting this far means open() accepted a directory
    fail("A directory has been opened for reading");
  } catch (FileNotFoundException expected) {
    handleExpectedException(expected);
  } catch (IOException other) {
    handleRelaxedException("opening a directory for reading",
        "FileNotFoundException",
        other);
  }
}
/**
 * Attempt to open a directory which itself contains a child directory.
 * As with an empty directory, the open must not succeed: a
 * {@code FileNotFoundException} goes to the expected-exception handler,
 * any other {@code IOException} to the relaxed-exception handler.
 */
@Test
public void testOpenReadDirWithChild() throws Throwable {
  describe("create & read a directory which has a child");
  final Path parent = path("zero.dir");
  mkdirs(parent);
  final Path child = new Path(parent, "child");
  mkdirs(child);
  try {
    final FileSystem fs = getFileSystem();
    instream = fs.open(parent);
    // getting this far means open() accepted a directory
    fail("A directory has been opened for reading");
  } catch (FileNotFoundException expected) {
    handleExpectedException(expected);
  } catch (IOException other) {
    handleRelaxedException("opening a directory for reading",
        "FileNotFoundException",
        other);
  }
}
/**
 * Open the same file through two streams and interleave reads: each
 * stream must keep its own position, and closing the first stream
 * (twice) must not disturb reads from the second.
 */
@Test
public void testOpenFileTwice() throws Throwable {
  describe("verify that two opened file streams are independent");
  final Path file = path("testopenfiletwice.txt");
  // the reads below expect byte 0 at offset 0 and byte 1 at offset 1
  final byte[] data = dataset(TEST_FILE_LEN, 0, 255);
  createFile(getFileSystem(), file, true, data);
  FSDataInputStream first = getFileSystem().open(file);
  FSDataInputStream second = null;
  try {
    final int firstByte = first.read();
    assertEquals(0, firstByte);
    second = getFileSystem().open(file);
    assertEquals("first read of instream 2", 0, second.read());
    assertEquals("second read of instream 1", 1, first.read());
    first.close();
    // the second stream is still usable after the first is closed
    assertEquals("second read of instream 2", 1, second.read());
    // a repeated close of the first stream is part of the test
    first.close();
  } finally {
    IOUtils.closeStream(first);
    IOUtils.closeStream(second);
  }
}
/**
 * Read a four byte file one byte at a time: the byte at each offset
 * must be {@code base + offset}, and every read after the last byte
 * must return -1.
 */
@Test
public void testSequentialRead() throws Throwable {
  describe("verify that sequential read() operations return values");
  final Path file = path("testsequentialread.txt");
  final int len = 4;
  final int base = 0x40; // 64
  final byte[] data = dataset(len, base, base + len);
  createFile(getFileSystem(), file, true, data);
  instream = getFileSystem().open(file);
  // every byte of the file, in order
  for (int offset = 0; offset < len; offset++) {
    assertEquals(base + offset, instream.read());
  }
  // and now, failures: reading past the end twice
  for (int attempt = 0; attempt < 2; attempt++) {
    assertEquals(-1, instream.read());
  }
  instream.close();
}
/**
 * Create an empty file with the createFile() builder, then open it with
 * the openFile() builder, passing several optional options and a
 * negative length option. The first read must return -1.
 */
@Test
public void testOpenFileReadZeroByte() throws Throwable {
  describe("create & read a 0 byte file through the builders; use a negative length");
  final Path emptyFile = path("zero.txt");
  final FileSystem fs = getFileSystem();
  fs.createFile(emptyFile).overwrite(true).build().close();
  // all options are opt(), including the negative length
  final CompletableFuture<FSDataInputStream> opening = fs.openFile(emptyFile)
      .opt("fs.test.something", true)
      .opt("fs.test.something2", 3)
      .opt("fs.test.something3", "3")
      .optLong(FS_OPTION_OPENFILE_LENGTH, -1L)
      .build();
  try (FSDataInputStream in = opening.get()) {
    final int firstByte = in.read();
    assertMinusOne("initial byte read", firstByte);
  }
}
/**
 * Set an unrecognized key first through opt() and then through must();
 * building the openFile() request is expected to raise an
 * {@code IllegalArgumentException}.
 */
@Test
public void testOpenFileUnknownOption() throws Throwable {
  describe("calling openFile fails when a 'must()' option is unknown");
  final FileSystem fs = getFileSystem();
  final Path file = path("testOpenFileUnknownOption");
  // same key for both calls: must() comes last
  final FutureDataInputStreamBuilder builder = fs.openFile(file)
      .opt("fs.test.something", true)
      .must("fs.test.something", true);
  intercept(IllegalArgumentException.class,
      () -> builder.build());
}
/**
 * Variant of the unknown-option test using the long-valued setters:
 * one unrecognized key is passed through optLong(), a different one
 * through mustLong(). Building the openFile() request is expected to
 * raise an {@code IllegalArgumentException}.
 */
@Test
public void testOpenFileUnknownOptionLong() throws Throwable {
  describe("calling openFile fails when a 'must()' option is unknown");
  // the path is named after this test method, as in the other openFile tests
  FutureDataInputStreamBuilder builder =
      getFileSystem().openFile(path("testOpenFileUnknownOptionLong"))
          .optLong("fs.test.something", 1L)
          .mustLong("fs.test.something2", 1L);
  intercept(IllegalArgumentException.class,
      () -> builder.build());
}
/**
 * Open a path which was never created: build() itself is expected to
 * return a future, and the {@code FileNotFoundException} must surface
 * when that future is evaluated.
 */
@Test
public void testOpenFileLazyFail() throws Throwable {
  describe("openFile fails on a missing file in the get() and not before");
  final FileSystem fs = getFileSystem();
  final Path missing = path("testOpenFileLazyFail");
  final FutureDataInputStreamBuilder builder = fs.openFile(missing)
      .opt("fs.test.something", true);
  // build() runs here, outside the interception
  final CompletableFuture<FSDataInputStream> pending = builder.build();
  interceptFuture(FileNotFoundException.class, "", pending);
}
/**
 * Open a path which was never created and chain an exceptionally()
 * handler which maps the failure to null; the chained future must then
 * yield null.
 */
@Test
public void testOpenFileFailExceptionally() throws Throwable {
  describe("openFile missing file chains into exceptionally()");
  final FileSystem fs = getFileSystem();
  final Path missing = path("testOpenFileFailExceptionally");
  final FutureDataInputStreamBuilder builder = fs.openFile(missing)
      .opt("fs.test.something", true);
  final FSDataInputStream recovered = builder.build()
      .exceptionally(ex -> null)
      .get();
  assertNull("exceptional uprating", recovered);
}
/**
 * Open a path which was never created and evaluate the future through
 * {@code FutureIO.awaitFuture()}; the call is expected to raise a
 * {@code FileNotFoundException}.
 */
@Test
public void testAwaitFutureFailToFNFE() throws Throwable {
  // the description names the class whose awaitFuture() is statically imported
  describe("Verify that FutureIO.awaitFuture extracts IOExceptions");
  FutureDataInputStreamBuilder builder =
      getFileSystem().openFile(path("testAwaitFutureFailToFNFE"))
          .opt("fs.test.something", true);
  intercept(FileNotFoundException.class,
      () -> awaitFuture(builder.build()));
}
/**
 * Open a path which was never created and evaluate the future through
 * the timeout variant of {@code FutureIO.awaitFuture()}; the call is
 * expected to raise a {@code FileNotFoundException}.
 */
@Test
public void testAwaitFutureTimeoutFailToFNFE() throws Throwable {
  // the description names the class whose awaitFuture() is statically imported
  describe("Verify that FutureIO.awaitFuture with a timeout works");
  // the path is named after this test method, as in the other openFile tests
  FutureDataInputStreamBuilder builder =
      getFileSystem().openFile(path("testAwaitFutureTimeoutFailToFNFE"))
          .opt("fs.test.something", true);
  // the timeout is set to days so it is never what ends the wait
  intercept(FileNotFoundException.class,
      () -> awaitFuture(builder.build(),
          10, TimeUnit.DAYS));
}
/**
 * Open a path which was never created and chain an exceptionally()
 * handler which rethrows the failure wrapped in a RuntimeException;
 * evaluating the chained future must surface that RuntimeException.
 */
@Test
public void testOpenFileExceptionallyTranslating() throws Throwable {
  describe("openFile missing file chains into exceptionally()");
  final FileSystem fs = getFileSystem();
  final CompletableFuture<FSDataInputStream> opening =
      fs.openFile(path("testOpenFileExceptionallyTranslating")).build();
  // translate whatever the open raised into a RuntimeException
  final CompletableFuture<FSDataInputStream> translated =
      opening.exceptionally(ex -> {
        throw new RuntimeException("exceptionally", ex);
      });
  interceptFuture(RuntimeException.class, "exceptionally", translated);
}
/**
 * Open a path which was never created (with a null file status), chain
 * an exceptionally() handler which rethrows as a RuntimeException, and
 * evaluate the result through awaitFuture(): the RuntimeException with
 * the text "exceptionally" must be raised.
 */
@Test
public void testChainedFailureAwaitFuture() throws Throwable {
  describe("await Future handles chained failures");
  final FileSystem fs = getFileSystem();
  final Path missing = path("testChainedFailureAwaitFuture");
  final CompletableFuture<FSDataInputStream> opening = fs.openFile(missing)
      .withFileStatus(null)
      .build();
  // the chaining happens inside the intercepted closure
  intercept(RuntimeException.class,
      "exceptionally",
      () -> awaitFuture(
          opening.exceptionally(ex -> {
            throw new RuntimeException("exceptionally", ex);
          })));
}
/**
 * Read a whole file by chaining {@code ContractTestUtils.readStream}
 * onto the openFile() future, twice: first with the FileStatus returned
 * by the filesystem, then with a hand-built status whose path differs
 * in everything but the final name element. Both reads must return the
 * full file length.
 */
@Test
public void testOpenFileApplyRead() throws Throwable {
  describe("use the apply sequence to read a whole file");
  final Path file = path("testOpenFileApplyRead");
  final FileSystem fs = getFileSystem();
  final int len = 4096;
  final byte[] data = dataset(len, 0x40, 0x80);
  createFile(fs, file, true, data);

  // first pass: the status comes straight from the filesystem
  final FileStatus status = fs.getFileStatus(file);
  final CompletableFuture<Long> firstRead = fs.openFile(file)
      .withFileStatus(status)
      .build()
      .thenApply(ContractTestUtils::readStream);
  final long firstCount = firstRead.get();
  assertEquals("Wrong number of bytes read value",
      len,
      firstCount);

  // second pass: a new FileStatus whose path matches only in the
  // final name element.
  // implementations MUST use path in openFile() call
  final Path foreignPath =
      new Path("gopher:///localhost:/" + file.getName());
  final FileStatus foreignStatus = new FileStatus(
      len, false,
      status.getReplication(),
      status.getBlockSize(),
      status.getModificationTime(),
      status.getAccessTime(),
      status.getPermission(),
      status.getOwner(),
      status.getGroup(),
      foreignPath);
  final CompletableFuture<Long> secondRead = fs.openFile(file)
      .withFileStatus(foreignStatus)
      .build()
      .thenApply(ContractTestUtils::readStream);
  final long secondCount = secondRead.get();
  assertEquals("Wrong number of bytes read value",
      len,
      secondCount);
}
/**
 * Chain a callback onto the openFile() future and verify that it was
 * invoked and that it read the whole file. The length option is passed
 * as a mandatory double with a very large value; the byte count read
 * must still equal the real file length.
 */
@Test
public void testOpenFileApplyAsyncRead() throws Throwable {
  describe("verify that async accept callbacks are evaluated");
  final Path file = path("testOpenFileApplyAsyncRead");
  final FileSystem fs = getFileSystem();
  final int len = 512;
  final byte[] data = dataset(len, 0x40, 0x80);
  createFile(fs, file, true, data);
  final CompletableFuture<FSDataInputStream> opening = fs.openFile(file)
      .mustDouble(FS_OPTION_OPENFILE_LENGTH, 43.2e60) // pass in a double
      .build();
  // flipped by the callback so its invocation can be asserted afterwards
  final AtomicBoolean invoked = new AtomicBoolean(false);
  final CompletableFuture<Long> counting = opening.thenApply(stream -> {
    invoked.set(true);
    return ContractTestUtils.readStream(stream);
  });
  final Long bytes = counting.get();
  assertTrue("async accept operation not invoked",
      invoked.get());
  Assertions.assertThat(bytes)
      .describedAs("bytes read from stream")
      .isEqualTo(len);
}
/**
 * Open a file with a null file status while supplying the read policy,
 * buffer size and file length as opt() options.
 * The file is then read in full and the bytes must match the data
 * which was written.
 * The options are set with opt() rather than must() so that integration
 * testing with external filesystem connectors will downgrade if an
 * option is not recognized.
 */
@Test
public void testOpenFileNullStatusButFileLength() throws Throwable {
  describe("use openFile() with a null status and expect the status to be"
      + " ignored. block size, fadvise and length are passed in as"
      + " opt() options");
  final Path file = path("testOpenFileNullStatus");
  final FileSystem fs = getFileSystem();
  final int len = 4;
  final byte[] expected = dataset(len, 0x40, 0x80);
  createFile(fs, file, true, expected);
  final CompletableFuture<FSDataInputStream> opening = fs.openFile(file)
      .withFileStatus(null)
      .opt(FS_OPTION_OPENFILE_READ_POLICY,
          "unknown, sequential, random")
      .optLong(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
      .optLong(FS_OPTION_OPENFILE_LENGTH, len)
      .build();
  // read the whole file into a buffer of exactly the file length
  final byte[] actual = new byte[len];
  try (FSDataInputStream in = opening.get()) {
    in.readFully(actual);
  }
  compareByteArrays(expected, actual, len);
}
/**
 * Open a file whose length is supplied as a mandatory option set
 * through mustDouble(), then read it to the end: the number of bytes
 * read must equal the file length. Verifies resilience of the parser.
 */
@Test
public void testFloatingPointLength() throws Throwable {
  describe("Open file with a length");
  final Path file = methodPath();
  final FileSystem fs = getFileSystem();
  final int len = 4096;
  final byte[] data = dataset(len, 0x40, 0x80);
  createFile(fs, file, true, data);
  // the int length is passed where a double is expected
  final CompletableFuture<Long> counting = fs.openFile(file)
      .mustDouble(FS_OPTION_OPENFILE_LENGTH, len)
      .build()
      .thenApply(ContractTestUtils::readStream);
  final Long bytesRead = counting.get();
  Assertions.assertThat(bytesRead)
      .describedAs("bytes read from file %s", file)
      .isEqualTo(len);
}
}