// ITestAbfsFileSystemContractSeek.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.contract;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
/**
* Contract test for seek operation.
*/
public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest {

  /** Secure-mode flag reported by the binding; handed to the contract. */
  private final boolean isSecure;

  /** Test binding that supplies the raw configuration and its own setup. */
  private final ABFSContractTestBinding binding;

  /**
   * Reference data (100 * 1024 bytes) written to every test file; reads
   * are verified against it by offset.
   */
  private static final byte[] BLOCK = dataset(100 * 1024, 0, 255);

  /**
   * Create the test binding and cache its secure-mode flag.
   * @throws Exception if the binding cannot be created.
   */
  public ITestAbfsFileSystemContractSeek() throws Exception {
    binding = new ABFSContractTestBinding();
    this.isSecure = binding.isSecureMode();
  }

  /**
   * Set up the binding first, then the base contract test.
   * @throws Exception on any setup failure.
   */
  @Override
  public void setup() throws Exception {
    binding.setup();
    super.setup();
  }

  /**
   * @return the raw configuration provided by the binding.
   */
  @Override
  protected Configuration createConfiguration() {
    return binding.getRawConfiguration();
  }

  /**
   * Build the ABFS contract. Both the read ahead range and the read buffer
   * size are set to {@code MIN_BUFFER_SIZE} before the contract is created,
   * and {@code disableFilesystemCaching} is applied to the configuration.
   * @param conf configuration to update and pass to the contract.
   * @return the ABFS filesystem contract.
   */
  @Override
  protected AbstractFSContract createContract(final Configuration conf) {
    conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
    conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
    disableFilesystemCaching(conf);
    return new AbfsFileSystemContract(conf, isSecure);
  }

  /**
   * Test verifies if the data is read correctly
   * when {@code ConfigurationKeys#AZURE_READ_AHEAD_RANGE} is set.
   * Reason for not breaking this test into smaller parts is we
   * really want to simulate lot of forward and backward seeks
   * similar to real production use case.
   * After every step the test checks the reported position, the byte(s)
   * read, the seek-in-buffer counter and whether the remote read counter
   * moved.
   */
  @Test
  public void testSeekAndReadWithReadAhead() throws IOException {
    describe(" Testing seek and read with read ahead "
        + "enabled for random reads");
    Path testSeekFile = path(getMethodName() + "bigseekfile.txt");
    createDataSet(testSeekFile);
    try (FSDataInputStream in = getFileSystem().open(testSeekFile)) {
      AbfsInputStream inStream = ((AbfsInputStream) in.getWrappedStream());
      AbfsInputStreamStatisticsImpl streamStatistics =
          (AbfsInputStreamStatisticsImpl) inStream.getStreamStatistics();
      assertEquals(String.format("Value of %s is not set correctly", AZURE_READ_AHEAD_RANGE),
          MIN_BUFFER_SIZE, inStream.getReadAheadRange());

      long remoteReadOperationsOldVal = streamStatistics.getRemoteReadOperations();
      Assertions.assertThat(remoteReadOperationsOldVal)
          .describedAs("Number of remote read ops should be 0 "
              + "before any read call is made")
          .isEqualTo(0);

      // Test read at first position. Remote read.
      Assertions.assertThat(inStream.getPos())
          .describedAs("First call to getPos() should return 0")
          .isEqualTo(0);
      assertDataAtPos(0, (byte) in.read());
      assertSeekBufferStats(0, streamStatistics.getSeekInBuffer());
      long remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
      assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
          remoteReadOperationsNewVal);
      remoteReadOperationsOldVal = remoteReadOperationsNewVal;

      // Seeking just before read ahead range. Read from buffer.
      int newSeek = inStream.getReadAheadRange() - 1;
      in.seek(newSeek);
      assertGetPosition(newSeek, in.getPos());
      assertDataAtPos(newSeek, (byte) in.read());
      assertSeekBufferStats(1, streamStatistics.getSeekInBuffer());
      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
          remoteReadOperationsNewVal);
      remoteReadOperationsOldVal = remoteReadOperationsNewVal;

      // Seeking boundary of read ahead range. Read from buffer manager.
      newSeek = inStream.getReadAheadRange();
      inStream.seek(newSeek);
      assertGetPosition(newSeek, in.getPos());
      assertDataAtPos(newSeek, (byte) in.read());
      assertSeekBufferStats(1, streamStatistics.getSeekInBuffer());
      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
          remoteReadOperationsNewVal);
      remoteReadOperationsOldVal = remoteReadOperationsNewVal;

      // Seeking just after read ahead range. Read from buffer.
      newSeek = inStream.getReadAheadRange() + 1;
      in.seek(newSeek);
      assertGetPosition(newSeek, in.getPos());
      assertDataAtPos(newSeek, (byte) in.read());
      assertSeekBufferStats(2, streamStatistics.getSeekInBuffer());
      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
          remoteReadOperationsNewVal);
      remoteReadOperationsOldVal = remoteReadOperationsNewVal;

      // Seeking just 10 more bytes such that data is read from buffer.
      newSeek += 10;
      in.seek(newSeek);
      assertGetPosition(newSeek, in.getPos());
      assertDataAtPos(newSeek, (byte) in.read());
      assertSeekBufferStats(3, streamStatistics.getSeekInBuffer());
      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
          remoteReadOperationsNewVal);
      remoteReadOperationsOldVal = remoteReadOperationsNewVal;

      // Seek backward such that data is read from remote.
      newSeek -= 106;
      in.seek(newSeek);
      assertGetPosition(newSeek, in.getPos());
      assertDataAtPos(newSeek, (byte) in.read());
      assertSeekBufferStats(3, streamStatistics.getSeekInBuffer());
      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
      assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
          remoteReadOperationsNewVal);
      remoteReadOperationsOldVal = remoteReadOperationsNewVal;

      // Seeking just 10 more bytes such that data is read from buffer.
      newSeek += 10;
      in.seek(newSeek);
      assertGetPosition(newSeek, in.getPos());
      assertDataAtPos(newSeek, (byte) in.read());
      assertSeekBufferStats(4, streamStatistics.getSeekInBuffer());
      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
      assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal,
          remoteReadOperationsNewVal);
      remoteReadOperationsOldVal = remoteReadOperationsNewVal;

      // Read multiple bytes across read ahead range. Remote read.
      long oldSeek = newSeek;
      newSeek = 2 * inStream.getReadAheadRange() - 1;
      byte[] bytes = new byte[5];
      in.readFully(newSeek, bytes);
      // With readFully getPos should return oldSeek pos.
      // Adding one as one byte is already read
      // after the last seek is done.
      assertGetPosition(oldSeek + 1, in.getPos());
      assertSeekBufferStats(4, streamStatistics.getSeekInBuffer());
      assertDatasetEquals(newSeek, "Read across read ahead ",
          bytes, bytes.length);
      remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations();
      assertIncrementInRemoteReadOps(remoteReadOperationsOldVal,
          remoteReadOperationsNewVal);
    }
  }

  /**
   * Test to validate the getPos() when a seek is done
   * post {@code AbfsInputStream#unbuffer} call is made.
   * Also using optimised builder api to open file.
   */
  @Test
  public void testSeekAfterUnbuffer() throws IOException {
    describe("Test to make sure that seeking in AbfsInputStream after "
        + "unbuffer() call is not doing anyIO.");
    Path testFile = path(getMethodName() + ".txt");
    createDataSet(testFile);
    final CompletableFuture<FSDataInputStream> future =
        getFileSystem().openFile(testFile)
            .build();
    try (FSDataInputStream inputStream = awaitFuture(future)) {
      AbfsInputStream abfsInputStream = (AbfsInputStream) inputStream.getWrappedStream();
      AbfsInputStreamStatisticsImpl streamStatistics =
          (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics();
      int readAheadRange = abfsInputStream.getReadAheadRange();
      long seekPos = readAheadRange;
      inputStream.seek(seekPos);
      assertDataAtPos(readAheadRange, (byte) inputStream.read());
      long currentRemoteReadOps = streamStatistics.getRemoteReadOperations();
      // The first read after opening must have gone to the remote store.
      assertIncrementInRemoteReadOps(0, currentRemoteReadOps);
      inputStream.unbuffer();
      seekPos -= 10;
      inputStream.seek(seekPos);
      // Seek backwards shouldn't do any IO
      assertNoIncrementInRemoteReadOps(currentRemoteReadOps, streamStatistics.getRemoteReadOperations());
      assertGetPosition(seekPos, inputStream.getPos());
    }
  }

  /**
   * Create (overwriting if present) a file containing {@link #BLOCK}.
   * @param path path of the file to create.
   * @throws IOException on any failure to create the file.
   */
  private void createDataSet(Path path) throws IOException {
    createFile(getFileSystem(), path, true, BLOCK);
  }

  /**
   * Assert that the position reported by the stream is the expected one.
   * @param expected position the stream should be at.
   * @param actual position returned by {@code getPos()}.
   */
  private void assertGetPosition(long expected, long actual) {
    final String seekPosErrorMsg = "getPos() should return %s";
    // Compare against the expected position; comparing the actual value
    // with itself would make this assertion vacuous.
    Assertions.assertThat(actual)
        .describedAs(seekPosErrorMsg, expected)
        .isEqualTo(expected);
  }

  /**
   * Assert that a byte read from the stream matches {@link #BLOCK}
   * at the given offset.
   * @param pos offset in the file the byte was read from.
   * @param actualData byte returned by the read.
   */
  private void assertDataAtPos(int pos, byte actualData) {
    final String dataErrorMsg = "Mismatch in data@%s";
    Assertions.assertThat(actualData)
        .describedAs(dataErrorMsg, pos)
        .isEqualTo(BLOCK[pos]);
  }

  /**
   * Assert the value of the seek-in-buffer statistic.
   * @param expected expected counter value.
   * @param actual counter value read from the stream statistics.
   */
  private void assertSeekBufferStats(long expected, long actual) {
    final String statsErrorMsg = "Mismatch in seekInBuffer counts";
    Assertions.assertThat(actual)
        .describedAs(statsErrorMsg)
        .isEqualTo(expected);
  }

  /**
   * Assert that the remote read counter did not change.
   * @param oldVal counter value before the operation.
   * @param newVal counter value after the operation.
   */
  private void assertNoIncrementInRemoteReadOps(long oldVal, long newVal) {
    final String incrementErrorMsg = "Number of remote read ops shouldn't increase";
    Assertions.assertThat(newVal)
        .describedAs(incrementErrorMsg)
        .isEqualTo(oldVal);
  }

  /**
   * Assert that the remote read counter grew.
   * @param oldVal counter value before the operation.
   * @param newVal counter value after the operation.
   */
  private void assertIncrementInRemoteReadOps(long oldVal, long newVal) {
    final String incrementErrorMsg = "Number of remote read ops should increase";
    Assertions.assertThat(newVal)
        .describedAs(incrementErrorMsg)
        .isGreaterThan(oldVal);
  }

  /**
   * Assert that the data read matches the dataset at the given offset.
   * This helps verify that the seek process is moving the read pointer
   * to the correct location in the file.
   * @param readOffset the offset in the file where the read began.
   * @param operation operation name for the assertion.
   * @param data data read in.
   * @param length length of data to check.
   */
  private void assertDatasetEquals(
      final int readOffset,
      final String operation,
      final byte[] data,
      int length) {
    for (int i = 0; i < length; i++) {
      int o = readOffset + i;
      Assertions.assertThat(data[i])
          .describedAs(operation + "with read offset " + readOffset
              + ": data[" + i + "] != actualData[" + o + "]")
          .isEqualTo(BLOCK[o]);
    }
  }
}