/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.EOFException;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.UUID;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsInputStream;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ETAG;
/**
* Test random read operation.
*/
public class ITestAzureBlobFileSystemRandomRead extends
AbstractAbfsScaleTest {
private static final int BYTE = 1;
private static final int THREE_BYTES = 3;
private static final int FIVE_BYTES = 5;
private static final int TWENTY_BYTES = 20;
private static final int THIRTY_BYTES = 30;
private static final int KILOBYTE = 1024;
private static final int MEGABYTE = KILOBYTE * KILOBYTE;
private static final int FOUR_MB = 4 * MEGABYTE;
private static final int NINE_MB = 9 * MEGABYTE;
private static final long TEST_FILE_SIZE = 8 * MEGABYTE;
private static final int MAX_ELAPSEDTIMEMS = 20;
private static final int SEQUENTIAL_READ_BUFFER_SIZE = 16 * KILOBYTE;
  private static final int SEEK_POSITION_ONE = 2 * KILOBYTE;
private static final int SEEK_POSITION_TWO = 5 * KILOBYTE;
private static final int SEEK_POSITION_THREE = 10 * KILOBYTE;
private static final int SEEK_POSITION_FOUR = 4100 * KILOBYTE;
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * MEGABYTE;
private static final int DISABLED_READAHEAD_DEPTH = 0;
private static final String TEST_FILE_PREFIX = "/TestRandomRead";
private static final String WASB = "WASB";
private static final String ABFS = "ABFS";
private static final Logger LOG =
LoggerFactory.getLogger(ITestAzureBlobFileSystemRandomRead.class);
public ITestAzureBlobFileSystemRandomRead() throws Exception {
super();
}
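  /**
   * Validates that forward and reverse seeks on the ABFS stream are followed
   * by reads returning the requested number of bytes.
   * @throws Exception
   */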
@Test
public void testBasicRead() throws Exception {
Path testPath = path(TEST_FILE_PREFIX + "_testBasicRead");
assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
byte[] buffer = new byte[3 * MEGABYTE];
      // forward seek and read a kilobyte into the first kilobyte of buffer
inputStream.seek(5 * MEGABYTE);
int numBytesRead = inputStream.read(buffer, 0, KILOBYTE);
assertEquals("Wrong number of bytes read", KILOBYTE, numBytesRead);
int len = MEGABYTE;
int offset = buffer.length - len;
      // reverse seek and read a megabyte into the last megabyte of buffer
inputStream.seek(3 * MEGABYTE);
numBytesRead = inputStream.read(buffer, offset, len);
assertEquals("Wrong number of bytes read after seek", len, numBytesRead);
}
}
  /**
   * Validates the implementation of random read in ABFS by comparing the
   * data read against the equivalent WASB stream.
   * @throws Exception
   */
@Test
public void testRandomRead() throws Exception {
Assume.assumeFalse("This test does not support namespace enabled account",
getIsNamespaceEnabled(getFileSystem()));
Path testPath = path(TEST_FILE_PREFIX + "_testRandomRead");
assumeHugeFileExists(testPath);
try (
FSDataInputStream inputStreamV1
= this.getFileSystem().open(testPath);
FSDataInputStream inputStreamV2
= this.getWasbFileSystem().open(testPath);
) {
final int bufferSize = 4 * KILOBYTE;
byte[] bufferV1 = new byte[bufferSize];
byte[] bufferV2 = new byte[bufferV1.length];
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
inputStreamV1.seek(0);
inputStreamV2.seek(0);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
inputStreamV1.seek(SEEK_POSITION_ONE);
inputStreamV2.seek(SEEK_POSITION_ONE);
inputStreamV1.seek(0);
inputStreamV2.seek(0);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
inputStreamV1.seek(SEEK_POSITION_TWO);
inputStreamV2.seek(SEEK_POSITION_TWO);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
inputStreamV1.seek(SEEK_POSITION_THREE);
inputStreamV2.seek(SEEK_POSITION_THREE);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
inputStreamV1.seek(SEEK_POSITION_FOUR);
inputStreamV2.seek(SEEK_POSITION_FOUR);
verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
}
}
  /**
   * Validates the implementation of Seekable.seekToNewSource.
   * @throws Exception
   */
@Test
public void testSeekToNewSource() throws Exception {
Path testPath = path(TEST_FILE_PREFIX + "_testSeekToNewSource");
assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
assertFalse(inputStream.seekToNewSource(0));
}
}
  /**
   * Validates the implementation of InputStream.skip and ensures there is no
   * network I/O for the AbfsInputStream.
   * @throws Exception
   */
@Test
public void testSkipBounds() throws Exception {
Path testPath = path(TEST_FILE_PREFIX + "_testSkipBounds");
long testFileLength = assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
long skipped = inputStream.skip(-1);
assertEquals(0, skipped);
skipped = inputStream.skip(0);
assertEquals(0, skipped);
assertTrue(testFileLength > 0);
skipped = inputStream.skip(testFileLength);
assertEquals(testFileLength, skipped);
intercept(EOFException.class,
new Callable<Long>() {
@Override
public Long call() throws Exception {
return inputStream.skip(1);
}
}
);
long elapsedTimeMs = timer.elapsedTimeMs();
assertTrue(
String.format(
"There should not be any network I/O (elapsedTimeMs=%1$d).",
elapsedTimeMs),
elapsedTimeMs < MAX_ELAPSEDTIMEMS);
}
}
/**
* Validates the implementation of Seekable.seek and ensures there is no
* network I/O for forward seek.
* @throws Exception
*/
@Test
public void testValidateSeekBounds() throws Exception {
Path testPath = path(TEST_FILE_PREFIX + "_testValidateSeekBounds");
long testFileLength = assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
inputStream.seek(0);
assertEquals(0, inputStream.getPos());
intercept(EOFException.class,
FSExceptionMessages.NEGATIVE_SEEK,
new Callable<FSDataInputStream>() {
@Override
public FSDataInputStream call() throws Exception {
inputStream.seek(-1);
return inputStream;
}
}
);
assertTrue("Test file length only " + testFileLength, testFileLength > 0);
inputStream.seek(testFileLength);
assertEquals(testFileLength, inputStream.getPos());
intercept(EOFException.class,
FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
new Callable<FSDataInputStream>() {
@Override
public FSDataInputStream call() throws Exception {
inputStream.seek(testFileLength + 1);
return inputStream;
}
}
);
long elapsedTimeMs = timer.elapsedTimeMs();
assertTrue(
String.format(
"There should not be any network I/O (elapsedTimeMs=%1$d).",
elapsedTimeMs),
elapsedTimeMs < MAX_ELAPSEDTIMEMS);
}
}
/**
* Validates the implementation of Seekable.seek, Seekable.getPos,
* and InputStream.available.
* @throws Exception
*/
@Test
public void testSeekAndAvailableAndPosition() throws Exception {
Path testPath = path(TEST_FILE_PREFIX + "_testSeekAndAvailableAndPosition");
long testFileLength = assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
byte[] buffer = new byte[3];
int bytesRead = inputStream.read(buffer);
assertEquals(buffer.length, bytesRead);
assertArrayEquals(expected1, buffer);
assertEquals(buffer.length, inputStream.getPos());
assertEquals(testFileLength - inputStream.getPos(),
inputStream.available());
bytesRead = inputStream.read(buffer);
assertEquals(buffer.length, bytesRead);
assertArrayEquals(expected2, buffer);
assertEquals(2 * buffer.length, inputStream.getPos());
assertEquals(testFileLength - inputStream.getPos(),
inputStream.available());
// reverse seek
int seekPos = 0;
inputStream.seek(seekPos);
bytesRead = inputStream.read(buffer);
assertEquals(buffer.length, bytesRead);
assertArrayEquals(expected1, buffer);
assertEquals(buffer.length + seekPos, inputStream.getPos());
assertEquals(testFileLength - inputStream.getPos(),
inputStream.available());
// reverse seek
seekPos = 1;
inputStream.seek(seekPos);
bytesRead = inputStream.read(buffer);
assertEquals(buffer.length, bytesRead);
assertArrayEquals(expected3, buffer);
assertEquals(buffer.length + seekPos, inputStream.getPos());
assertEquals(testFileLength - inputStream.getPos(),
inputStream.available());
// forward seek
seekPos = 6;
inputStream.seek(seekPos);
bytesRead = inputStream.read(buffer);
assertEquals(buffer.length, bytesRead);
assertArrayEquals(expected4, buffer);
assertEquals(buffer.length + seekPos, inputStream.getPos());
assertEquals(testFileLength - inputStream.getPos(),
inputStream.available());
}
}
  /**
   * Validates the implementation of InputStream.skip, Seekable.getPos,
   * and InputStream.available.
   * @throws Exception
   */
@Test
public void testSkipAndAvailableAndPosition() throws Exception {
Path testPath = path(TEST_FILE_PREFIX + "_testSkipAndAvailableAndPosition");
long testFileLength = assumeHugeFileExists(testPath);
try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
assertEquals(testFileLength, inputStream.available());
assertEquals(0, inputStream.getPos());
int n = 3;
long skipped = inputStream.skip(n);
assertEquals(skipped, inputStream.getPos());
assertEquals(testFileLength - inputStream.getPos(),
inputStream.available());
      assertEquals(n, skipped);
byte[] buffer = new byte[3];
int bytesRead = inputStream.read(buffer);
assertEquals(buffer.length, bytesRead);
assertArrayEquals(expected2, buffer);
assertEquals(buffer.length + skipped, inputStream.getPos());
assertEquals(testFileLength - inputStream.getPos(),
inputStream.available());
// does skip still work after seek?
int seekPos = 1;
inputStream.seek(seekPos);
bytesRead = inputStream.read(buffer);
assertEquals(buffer.length, bytesRead);
assertArrayEquals(expected3, buffer);
assertEquals(buffer.length + seekPos, inputStream.getPos());
assertEquals(testFileLength - inputStream.getPos(),
inputStream.available());
long currentPosition = inputStream.getPos();
n = 2;
skipped = inputStream.skip(n);
assertEquals(currentPosition + skipped, inputStream.getPos());
assertEquals(testFileLength - inputStream.getPos(),
inputStream.available());
      assertEquals(n, skipped);
bytesRead = inputStream.read(buffer);
assertEquals(buffer.length, bytesRead);
assertArrayEquals(expected4, buffer);
assertEquals(buffer.length + skipped + currentPosition,
inputStream.getPos());
assertEquals(testFileLength - inputStream.getPos(),
inputStream.available());
}
}
  /**
   * Ensures parity in the performance of sequential reads after a reverse
   * seek on the AbfsInputStream.
   * @throws Exception
   */
@Test
public void testSequentialReadAfterReverseSeekPerformance()
throws Exception {
Path testPath = path(
TEST_FILE_PREFIX + "_testSequentialReadAfterReverseSeekPerformance");
assumeHugeFileExists(testPath);
final int maxAttempts = 10;
final double maxAcceptableRatio = 1.01;
double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0;
double ratio = Double.MAX_VALUE;
for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
beforeSeekElapsedMs = sequentialRead(ABFS, testPath,
this.getFileSystem(), false);
afterSeekElapsedMs = sequentialRead(ABFS, testPath,
this.getFileSystem(), true);
ratio = afterSeekElapsedMs / beforeSeekElapsedMs;
      LOG.info(String.format(
          "beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d, ratio=%3$.2f",
          (long) beforeSeekElapsedMs,
          (long) afterSeekElapsedMs,
          ratio));
}
assertTrue(String.format(
"Performance of ABFS stream after reverse seek is not acceptable:"
+ " beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d,"
+ " ratio=%3$.2f",
(long) beforeSeekElapsedMs,
(long) afterSeekElapsedMs,
ratio),
ratio < maxAcceptableRatio);
}
@Test
@Ignore("HADOOP-16915")
public void testRandomReadPerformance() throws Exception {
Assume.assumeFalse("This test does not support namespace enabled account",
getIsNamespaceEnabled(getFileSystem()));
Path testPath = path(TEST_FILE_PREFIX + "_testRandomReadPerformance");
assumeHugeFileExists(testPath);
final AzureBlobFileSystem abFs = this.getFileSystem();
final NativeAzureFileSystem wasbFs = this.getWasbFileSystem();
final int maxAttempts = 10;
final double maxAcceptableRatio = 1.025;
double v1ElapsedMs = 0, v2ElapsedMs = 0;
double ratio = Double.MAX_VALUE;
for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
v1ElapsedMs = randomRead(1, testPath, wasbFs);
v2ElapsedMs = randomRead(2, testPath, abFs);
ratio = v2ElapsedMs / v1ElapsedMs;
LOG.info(String.format(
"v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
(long) v1ElapsedMs,
(long) v2ElapsedMs,
ratio));
}
assertTrue(String.format(
"Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
+ " v2ElapsedMs=%2$d, ratio=%3$.2f",
(long) v1ElapsedMs,
(long) v2ElapsedMs,
ratio),
ratio < maxAcceptableRatio);
}
  /**
   * With this test we should see a full buffer read being triggered when
   * alwaysReadBufferSize is on, else only the requested buffer size is read.
   * Hence a seek done a few bytes away from the last read position will
   * trigger a network read when alwaysReadBufferSize is off, whereas it will
   * be served from the internal buffer when it is on.
   * Reading a full buffer size is the Gen1 behaviour.
   * @throws Throwable
   */
@Test
public void testAlwaysReadBufferSizeConfig() throws Throwable {
testAlwaysReadBufferSizeConfig(false);
testAlwaysReadBufferSizeConfig(true);
}
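  /**
   * Exercises sequential, random and near-sequential reads over a freshly
   * created file and verifies the GET_RESPONSES and BYTES_RECEIVED statistics
   * expected for the given alwaysReadBufferSize setting.
   * @param alwaysReadBufferSizeConfigValue value to set for
   *        fs.azure.read.alwaysReadBufferSize
   * @throws Throwable
   */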
public void testAlwaysReadBufferSizeConfig(boolean alwaysReadBufferSizeConfigValue)
throws Throwable {
final AzureBlobFileSystem currentFs = getFileSystem();
Configuration config = new Configuration(this.getRawConfiguration());
config.set("fs.azure.readaheadqueue.depth", "0");
config.set("fs.azure.read.alwaysReadBufferSize",
Boolean.toString(alwaysReadBufferSizeConfigValue));
final Path testFile = new Path("/FileName_"
+ UUID.randomUUID().toString());
    final AzureBlobFileSystem fs = createTestFile(testFile,
        ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, MEGABYTE, config);
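    // fetch the ETag of the file just created; it is needed to construct
    // the AbfsInputStream below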
String eTag = fs.getAbfsClient()
.getPathStatus(testFile.toUri().getPath(), false,
getTestTracingContext(fs, false), null)
.getResult()
.getResponseHeader(ETAG);
TestAbfsInputStream testInputStream = new TestAbfsInputStream();
AbfsInputStream inputStream = testInputStream.getAbfsInputStream(
fs.getAbfsClient(),
testFile.getName(), ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, eTag,
DISABLED_READAHEAD_DEPTH, FOUR_MB,
alwaysReadBufferSizeConfigValue, FOUR_MB);
long connectionsAtStart = fs.getInstrumentationMap()
.get(GET_RESPONSES.getStatName());
    long dataSizeReadStatAtStart = fs.getInstrumentationMap()
.get(BYTES_RECEIVED.getStatName());
long newReqCount = 0;
long newDataSizeRead = 0;
byte[] buffer20b = new byte[TWENTY_BYTES];
byte[] buffer30b = new byte[THIRTY_BYTES];
byte[] byteBuffer5 = new byte[FIVE_BYTES];
    // first read
    // this is detected as a sequential read, so a full buffer is fetched
    // whether or not alwaysReadBufferSize is on
inputStream.read(byteBuffer5, 0, FIVE_BYTES);
newReqCount++;
newDataSizeRead += FOUR_MB;
assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount,
fs.getInstrumentationMap());
assertAbfsStatistics(BYTES_RECEIVED,
        dataSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
    // second read, beyond what the buffer holds
    // if alwaysReadBufferSize is off, this is a random read and only the
    // incoming buffer size is fetched
    // else, a full buffer size is fetched
inputStream.seek(NINE_MB);
inputStream.read(buffer20b, 0, BYTE);
newReqCount++;
if (alwaysReadBufferSizeConfigValue) {
newDataSizeRead += FOUR_MB;
} else {
newDataSizeRead += TWENTY_BYTES;
}
    assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount,
        fs.getInstrumentationMap());
assertAbfsStatistics(BYTES_RECEIVED,
        dataSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
    // third read, adjacent to the second but not exactly sequential
    // if alwaysReadBufferSize is off, this is another random read
    // else the second read's full buffer already covers this range
inputStream.seek(NINE_MB + TWENTY_BYTES + THREE_BYTES);
inputStream.read(buffer30b, 0, THREE_BYTES);
if (!alwaysReadBufferSizeConfigValue) {
newReqCount++;
newDataSizeRead += THIRTY_BYTES;
}
    assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount,
        fs.getInstrumentationMap());
    assertAbfsStatistics(BYTES_RECEIVED,
        dataSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
}
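  /**
   * Reads the test file sequentially and returns the elapsed time in
   * milliseconds. When afterReverseSeek is true, a few megabytes are read
   * first and the stream is rewound to the start, so the timed pass measures
   * sequential read performance after a reverse seek.
   */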
private long sequentialRead(String version,
Path testPath,
FileSystem fs,
boolean afterReverseSeek) throws IOException {
byte[] buffer = new byte[SEQUENTIAL_READ_BUFFER_SIZE];
long totalBytesRead = 0;
long bytesRead = 0;
long testFileLength = fs.getFileStatus(testPath).getLen();
try(FSDataInputStream inputStream = fs.open(testPath)) {
      if (afterReverseSeek) {
        // read a few megabytes forward, then rewind, so the timed pass below
        // really measures a sequential read after a reverse seek
        do {
          bytesRead = inputStream.read(buffer);
          totalBytesRead += bytesRead;
        } while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE);
        totalBytesRead = 0;
        inputStream.seek(0);
      }
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
while ((bytesRead = inputStream.read(buffer)) > 0) {
totalBytesRead += bytesRead;
}
long elapsedTimeMs = timer.elapsedTimeMs();
LOG.info(String.format(
"v%1$s: bytesRead=%2$d, elapsedMs=%3$d, Mbps=%4$.2f,"
+ " afterReverseSeek=%5$s",
version,
totalBytesRead,
elapsedTimeMs,
toMbps(totalBytesRead, elapsedTimeMs),
afterReverseSeek));
assertEquals(testFileLength, totalBytesRead);
inputStream.close();
return elapsedTimeMs;
}
}
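  /**
   * Reads from random positions in the test file until at least two megabytes
   * have been read and returns the elapsed time in milliseconds.
   */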
private long randomRead(int version, Path testPath, FileSystem fs) throws Exception {
assumeHugeFileExists(testPath);
final long minBytesToRead = 2 * MEGABYTE;
Random random = new Random();
byte[] buffer = new byte[8 * KILOBYTE];
long totalBytesRead = 0;
long bytesRead = 0;
try(FSDataInputStream inputStream = fs.open(testPath)) {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
do {
bytesRead = inputStream.read(buffer);
totalBytesRead += bytesRead;
inputStream.seek(random.nextInt(
(int) (TEST_FILE_SIZE - buffer.length)));
} while (bytesRead > 0 && totalBytesRead < minBytesToRead);
long elapsedTimeMs = timer.elapsedTimeMs();
inputStream.close();
LOG.info(String.format(
"v%1$d: totalBytesRead=%2$d, elapsedTimeMs=%3$d, Mbps=%4$.2f",
version,
totalBytesRead,
elapsedTimeMs,
toMbps(totalBytesRead, elapsedTimeMs)));
assertTrue(minBytesToRead <= totalBytesRead);
return elapsedTimeMs;
}
}
/**
* Calculate megabits per second from the specified values for bytes and
* milliseconds.
* @param bytes The number of bytes.
* @param milliseconds The number of milliseconds.
* @return The number of megabits per second.
*/
private static double toMbps(long bytes, long milliseconds) {
return bytes / 1000.0 * 8 / milliseconds;
}
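  /**
   * Creates the default test file of TEST_FILE_SIZE bytes at the given path
   * if it does not already exist.
   * @return the expected test file length in bytes.
   */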
private long createTestFile(Path testPath) throws Exception {
createTestFile(testPath,
TEST_FILE_SIZE,
MEGABYTE,
null);
return TEST_FILE_SIZE;
}
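  /**
   * Creates a test file of the requested size filled with a repeating
   * 'a'..'z' pattern, reusing an existing file of the same length when
   * present.
   * @return the file system instance used to create the file.
   */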
private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize,
int createBufferSize, Configuration config) throws Exception {
AzureBlobFileSystem fs;
if (config == null) {
config = this.getRawConfiguration();
}
final AzureBlobFileSystem currentFs = getFileSystem();
fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
config);
if (fs.exists(testFilePath)) {
FileStatus status = fs.getFileStatus(testFilePath);
if (status.getLen() == testFileSize) {
return fs;
}
}
byte[] buffer = new byte[createBufferSize];
char character = 'a';
for (int i = 0; i < buffer.length; i++) {
buffer[i] = (byte) character;
character = (character == 'z') ? 'a' : (char) ((int) character + 1);
}
    LOG.info("Creating test file {} of size: {}", testFilePath, testFileSize);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
try (FSDataOutputStream outputStream = fs.create(testFilePath)) {
String bufferContents = new String(buffer);
int bytesWritten = 0;
while (bytesWritten < testFileSize) {
outputStream.write(buffer);
bytesWritten += buffer.length;
}
LOG.info("Closing stream {}", outputStream);
ContractTestUtils.NanoTimer closeTimer
= new ContractTestUtils.NanoTimer();
outputStream.close();
closeTimer.end("time to close() output stream");
}
timer.end("time to write %d KB", testFileSize / 1024);
return fs;
}
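  /**
   * Ensures the huge test file exists with the expected length, creating it
   * if necessary.
   * @return the test file length in bytes.
   */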
  private long assumeHugeFileExists(Path testPath) throws Exception {
    long fileSize = createTestFile(testPath);
    FileSystem fs = this.getFileSystem();
    ContractTestUtils.assertPathExists(fs, "huge file not created", testPath);
    FileStatus status = fs.getFileStatus(testPath);
    ContractTestUtils.assertIsFile(testPath, status);
    assertEquals("File " + testPath + " is not of expected size " + fileSize,
        fileSize, status.getLen());
return fileSize;
}
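  /**
   * Reads the same number of bytes from both streams and asserts that the
   * data returned is identical.
   */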
private void verifyConsistentReads(FSDataInputStream inputStreamV1,
FSDataInputStream inputStreamV2,
byte[] bufferV1,
byte[] bufferV2) throws IOException {
int size = bufferV1.length;
final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size);
    assertEquals("Bytes read from abfs stream", size, numBytesReadV1);
final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size);
    assertEquals("Bytes read from wasb stream", size, numBytesReadV2);
assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
}
}