ITestSmallWriteOptimization.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;

import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_APPENDBLOB_ENABLED;
/**
 * Test combination for small writes with flush and close operations.
 * This test class formulates an append test flow to assert on various scenarios.
 * Test stages:
 * 1. Pre-create the test file with the required size. The size is determined
 *    by the startingFileSize parameter. If it is 0, pre-creation is skipped.
 *
 * 2. Formulate an append loop or iteration. An iteration does N writes
 *    (determined by the numOfClientWrites parameter), each writing X bytes
 *    (determined by the recurringClientWriteSize parameter).
 *
 * 3. Determine the total number of append iterations needed by a test.
 *    If the intention is to close the outputStream right after the appends,
 *    setting the directCloseTest parameter results in 1 append test iteration
 *    with an ending close.
 *    Else, TEST_FLUSH_ITERATION test iterations are executed, each doing
 *    appends followed by hflush, with a close after the last iteration.
 *
 * 4. Execute test iterations with asserts on number of store requests made and
 *    validating file content.
 */
@RunWith(Parameterized.class)
public class ITestSmallWriteOptimization extends AbstractAbfsScaleTest {
  private static final int ONE_MB = 1024 * 1024;
  private static final int TWO_MB = 2 * ONE_MB;

  /** Write buffer size configured on the file system under test. */
  private static final int TEST_BUFFER_SIZE = TWO_MB;
  private static final int HALF_TEST_BUFFER_SIZE = TEST_BUFFER_SIZE / 2;
  private static final int QUARTER_TEST_BUFFER_SIZE = TEST_BUFFER_SIZE / 4;

  /** Number of append + hflush iterations when the test is not a direct close test. */
  private static final int TEST_FLUSH_ITERATION = 2;

  /** Scenario name; used as the parameterized test display name. */
  @Parameterized.Parameter
  public String testScenario;

  /** Value set for the small write optimization config before the test runs. */
  @Parameterized.Parameter(1)
  public boolean enableSmallWriteOptimization;

  /**
   * If true, will initiate close after appends. (That is, no explicit hflush or
   * hsync calls will be made from client app.)
   */
  @Parameterized.Parameter(2)
  public boolean directCloseTest;

  /**
   * If non-zero, test file should be created as pre-requisite with this size.
   */
  @Parameterized.Parameter(3)
  public Integer startingFileSize;

  /**
   * Determines the write sizes to be issued by client app.
   */
  @Parameterized.Parameter(4)
  public Integer recurringClientWriteSize;

  /**
   * Determines the number of Client writes to make.
   */
  @Parameterized.Parameter(5)
  public Integer numOfClientWrites;

  /**
   * True, if the small write optimization is supposed to be effective in
   * the scenario.
   */
  @Parameterized.Parameter(6)
  public boolean flushExpectedToBeMergedWithAppend;

  /**
   * Builds the scenario matrix.
   *
   * @return one row per scenario, in the parameter order documented inside
   * the method.
   */
  @Parameterized.Parameters(name = "{0}")
  public static Iterable<Object[]> params() {
    return Arrays.asList(
        // Parameter Order :
        // testScenario,
        // enableSmallWriteOptimization, directCloseTest, startingFileSize,
        // recurringClientWriteSize, numOfClientWrites, flushExpectedToBeMergedWithAppend
        new Object[][]{
            // Buffer Size Write tests
            {"OptmON_FlushCloseTest_EmptyFile_BufferSizeWrite",
                true, false, 0, TEST_BUFFER_SIZE, 1, false
            },
            {"OptmON_FlushCloseTest_NonEmptyFile_BufferSizeWrite",
                true, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
            },
            {"OptmON_CloseTest_EmptyFile_BufferSizeWrite",
                true, true, 0, TEST_BUFFER_SIZE, 1, false
            },
            {"OptmON_CloseTest_NonEmptyFile_BufferSizeWrite",
                true, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
            },
            {"OptmOFF_FlushCloseTest_EmptyFile_BufferSizeWrite",
                false, false, 0, TEST_BUFFER_SIZE, 1, false
            },
            {"OptmOFF_FlushCloseTest_NonEmptyFile_BufferSizeWrite",
                false, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
            },
            {"OptmOFF_CloseTest_EmptyFile_BufferSizeWrite",
                false, true, 0, TEST_BUFFER_SIZE, 1, false
            },
            {"OptmOFF_CloseTest_NonEmptyFile_BufferSizeWrite",
                false, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
            },
            // Less than buffer size write tests
            {"OptmON_FlushCloseTest_EmptyFile_LessThanBufferSizeWrite",
                true, false, 0, HALF_TEST_BUFFER_SIZE, 1, true
            },
            {"OptmON_FlushCloseTest_NonEmptyFile_LessThanBufferSizeWrite",
                true, false, 2 * TEST_BUFFER_SIZE,
                HALF_TEST_BUFFER_SIZE, 1, true
            },
            {"OptmON_CloseTest_EmptyFile_LessThanBufferSizeWrite",
                true, true, 0, HALF_TEST_BUFFER_SIZE, 1, true
            },
            {"OptmON_CloseTest_NonEmptyFile_LessThanBufferSizeWrite",
                true, true, 2 * TEST_BUFFER_SIZE,
                HALF_TEST_BUFFER_SIZE, 1, true
            },
            {"OptmOFF_FlushCloseTest_EmptyFile_LessThanBufferSizeWrite",
                false, false, 0, HALF_TEST_BUFFER_SIZE, 1, false
            },
            {"OptmOFF_FlushCloseTest_NonEmptyFile_LessThanBufferSizeWrite",
                false, false, 2 * TEST_BUFFER_SIZE,
                HALF_TEST_BUFFER_SIZE, 1, false
            },
            {"OptmOFF_CloseTest_EmptyFile_LessThanBufferSizeWrite",
                false, true, 0, HALF_TEST_BUFFER_SIZE, 1, false
            },
            {"OptmOFF_CloseTest_NonEmptyFile_LessThanBufferSizeWrite",
                false, true, 2 * TEST_BUFFER_SIZE,
                HALF_TEST_BUFFER_SIZE, 1, false
            },
            // Multiple small writes still less than buffer size
            {"OptmON_FlushCloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
                true, false, 0, QUARTER_TEST_BUFFER_SIZE, 3, true
            },
            {"OptmON_FlushCloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
                true, false, 2 * TEST_BUFFER_SIZE,
                QUARTER_TEST_BUFFER_SIZE, 3, true
            },
            {"OptmON_CloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
                true, true, 0, QUARTER_TEST_BUFFER_SIZE, 3, true
            },
            {"OptmON_CloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
                true, true, 2 * TEST_BUFFER_SIZE,
                QUARTER_TEST_BUFFER_SIZE, 3, true
            },
            {"OptmOFF_FlushCloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
                false, false, 0, QUARTER_TEST_BUFFER_SIZE, 3, false
            },
            {"OptmOFF_FlushCloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
                false, false, 2 * TEST_BUFFER_SIZE,
                QUARTER_TEST_BUFFER_SIZE, 3, false
            },
            {"OptmOFF_CloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
                false, true, 0, QUARTER_TEST_BUFFER_SIZE, 3, false
            },
            {"OptmOFF_CloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
                false, true, 2 * TEST_BUFFER_SIZE,
                QUARTER_TEST_BUFFER_SIZE, 3, false
            },
            // Multiple full buffer writes
            {"OptmON_FlushCloseTest_EmptyFile_MultiBufferSizeWrite",
                true, false, 0, TEST_BUFFER_SIZE, 3, false
            },
            {"OptmON_FlushCloseTest_NonEmptyFile_MultiBufferSizeWrite",
                true, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
            },
            {"OptmON_CloseTest_EmptyFile_MultiBufferSizeWrite",
                true, true, 0, TEST_BUFFER_SIZE, 3, false
            },
            {"OptmON_CloseTest_NonEmptyFile_MultiBufferSizeWrite",
                true, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
            },
            {"OptmOFF_FlushCloseTest_EmptyFile_MultiBufferSizeWrite",
                false, false, 0, TEST_BUFFER_SIZE, 3, false
            },
            {"OptmOFF_FlushCloseTest_NonEmptyFile_MultiBufferSizeWrite",
                false, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
            },
            {"OptmOFF_CloseTest_EmptyFile_MultiBufferSizeWrite",
                false, true, 0, TEST_BUFFER_SIZE, 3, false
            },
            {"OptmOFF_CloseTest_NonEmptyFile_MultiBufferSizeWrite",
                false, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
            },
            // Multiple full buffers triggered and data less than buffer size pending
            {"OptmON_FlushCloseTest_EmptyFile_BufferAndExtraWrite",
                true, false, 0,
                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
                3, false
            },
            {"OptmON_FlushCloseTest_NonEmptyFile_BufferAndExtraWrite",
                true, false, 2 * TEST_BUFFER_SIZE,
                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
                3, false
            },
            {"OptmON_CloseTest_EmptyFile__BufferAndExtraWrite",
                true, true, 0,
                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
                3, false
            },
            {"OptmON_CloseTest_NonEmptyFile_BufferAndExtraWrite",
                true, true, 2 * TEST_BUFFER_SIZE,
                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
                3, false
            },
            {"OptmOFF_FlushCloseTest_EmptyFile_BufferAndExtraWrite",
                false, false, 0,
                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
                3, false
            },
            {"OptmOFF_FlushCloseTest_NonEmptyFile_BufferAndExtraWrite",
                false, false, 2 * TEST_BUFFER_SIZE,
                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
                3, false
            },
            {"OptmOFF_CloseTest_EmptyFile_BufferAndExtraWrite",
                false, true, 0,
                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
                3, false
            },
            {"OptmOFF_CloseTest_NonEmptyFile_BufferAndExtraWrite",
                false, true, 2 * TEST_BUFFER_SIZE,
                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
                3, false
            },
            // 0 byte tests
            {"OptmON_FlushCloseTest_EmptyFile_0ByteWrite",
                true, false, 0, 0, 1, false
            },
            {"OptmON_FlushCloseTest_NonEmptyFile_0ByteWrite",
                true, false, 2 * TEST_BUFFER_SIZE, 0, 1, false
            },
            {"OptmON_CloseTest_EmptyFile_0ByteWrite",
                true, true, 0, 0, 1, false
            },
            {"OptmON_CloseTest_NonEmptyFile_0ByteWrite",
                true, true, 2 * TEST_BUFFER_SIZE, 0, 1, false
            },
            {"OptmOFF_FlushCloseTest_EmptyFile_0ByteWrite",
                false, false, 0, 0, 1, false
            },
            {"OptmOFF_FlushCloseTest_NonEmptyFile_0ByteWrite",
                false, false, 2 * TEST_BUFFER_SIZE, 0, 1, false
            },
            {"OptmOFF_CloseTest_EmptyFile_0ByteWrite",
                false, true, 0, 0, 1, false
            },
            {"OptmOFF_CloseTest_NonEmptyFile_0ByteWrite",
                false, true, 2 * TEST_BUFFER_SIZE, 0, 1, false
            },
        });
  }

  public ITestSmallWriteOptimization() throws Exception {
    super();
  }

  /**
   * Configures a file system with the test write buffer size and the
   * scenario's small write optimization setting, then runs the append
   * pattern for the current parameter row.
   *
   * @throws IOException on any file system failure.
   */
  @Test
  public void testSmallWriteOptimization()
      throws IOException {
    boolean serviceDefaultOptmSettings = DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION;
    // Tests with Optimization should only run if service has the feature on by
    // default. Default settings will be turned on when server support is
    // available on all store prod regions.
    if (enableSmallWriteOptimization) {
      Assume.assumeTrue(serviceDefaultOptmSettings);
    }

    final AzureBlobFileSystem currentfs = this.getFileSystem();
    Configuration config = currentfs.getConf();
    // Read the flag by value. Comparing the configured string with == is a
    // reference comparison and would leave this assumption ineffective.
    boolean isAppendBlobTestSettingEnabled =
        config.getBoolean(FS_AZURE_TEST_APPENDBLOB_ENABLED, false);

    // This optimization doesn't take effect when append blob is on.
    Assume.assumeFalse(isAppendBlobTestSettingEnabled);

    config.set(ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE,
        Integer.toString(TEST_BUFFER_SIZE));
    config.set(ConfigurationKeys.AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION,
        Boolean.toString(enableSmallWriteOptimization));
    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(
        currentfs.getUri(), config);

    formulateSmallWriteTestAppendPattern(fs, startingFileSize,
        recurringClientWriteSize, numOfClientWrites,
        directCloseTest, flushExpectedToBeMergedWithAppend);
  }

  /**
   * Runs the append pattern and asserts on the request statistics and on the
   * final file content.
   * if isDirectCloseTest == true, append + close is triggered
   * if isDirectCloseTest == false, append + flush runs are repeated over
   * iterations followed by close
   *
   * @param fs file system to run the pattern against
   * @param startingFileSize size of the file to pre-create; 0 skips pre-creation
   * @param recurringWriteSize number of bytes written by each client write
   * @param numOfWrites number of client writes per iteration
   * @param isDirectCloseTest true to close right after the appends, without
   * an explicit hflush
   * @param flushExpectedToBeMergedWithAppend whether the scenario expects the
   * flush to be merged with the append
   * @throws IOException on any file system failure
   */
  private void formulateSmallWriteTestAppendPattern(final AzureBlobFileSystem fs,
      int startingFileSize,
      int recurringWriteSize,
      int numOfWrites,
      boolean isDirectCloseTest,
      boolean flushExpectedToBeMergedWithAppend) throws IOException {

    final int dataWrittenPerIteration = numOfWrites * recurringWriteSize;
    // A direct close test runs a single append + close iteration.
    final int testIterations = isDirectCloseTest ? 1 : TEST_FLUSH_ITERATION;
    final int totalDataToBeAppended = testIterations * dataWrittenPerIteration;
    final int totalFileSize = totalDataToBeAppended + startingFileSize;

    // write buffer of file size created. This will be used as write
    // source and for file content validation
    final byte[] writeBuffer = new byte[totalFileSize];
    new Random().nextBytes(writeBuffer);
    int writeBufferCursor = 0;

    Path testPath = new Path(getMethodName() + UUID.randomUUID().toString());
    FSDataOutputStream opStream;

    if (startingFileSize > 0) {
      writeBufferCursor += createFileWithStartingTestSize(fs, writeBuffer,
          writeBufferCursor, testPath, startingFileSize);
      opStream = fs.append(testPath);
    } else {
      opStream = fs.create(testPath);
    }

    final int writeBufferSize = fs.getAbfsStore()
        .getAbfsConfiguration()
        .getWriteBufferSize();

    // Baseline statistics, captured after the stream has been opened.
    long expectedTotalRequestsMade = fs.getInstrumentationMap()
        .get(CONNECTIONS_MADE.getStatName());
    long expectedRequestsMadeWithData = fs.getInstrumentationMap()
        .get(SEND_REQUESTS.getStatName());
    long expectedBytesSent = fs.getInstrumentationMap()
        .get(BYTES_SENT.getStatName());
    AbfsClient client = fs.getAbfsStore().getClientHandler().getIngressClient();

    // Every iteration writes the same amount, so these are loop-invariant.
    // Integer division already rounds down to the number of full buffers.
    final int numOfBuffersWrittenToStore = dataWrittenPerIteration / writeBufferSize;
    final int dataSizeWrittenToStore = numOfBuffersWrittenToStore * writeBufferSize;
    final int pendingDataToStore = dataWrittenPerIteration - dataSizeWrittenToStore;
    final boolean wasDataPendingToBeWrittenToServer = (pendingDataToStore > 0);

    // Small write optimization will only work if
    // a. config for small write optimization is on
    // b. no buffer writes have been triggered since last flush
    // c. there is some pending data in buffer to write to store
    final boolean smallWriteOptimizationEnabled = fs.getAbfsStore()
        .getAbfsConfiguration()
        .isSmallWriteOptimizationEnabled();
    final boolean flushWillBeMergedWithAppend = smallWriteOptimizationEnabled
        && (numOfBuffersWrittenToStore == 0)
        && wasDataPendingToBeWrittenToServer;

    final int totalAppendFlushCalls;
    if (flushWillBeMergedWithAppend) {
      totalAppendFlushCalls = 1; // 1 append (with flush and close param)
    } else if (wasDataPendingToBeWrittenToServer) {
      totalAppendFlushCalls = 2; // 1 append + 1 flush (with close)
    } else if (recurringWriteSize == 0 && client instanceof AbfsBlobClient) {
      totalAppendFlushCalls = 0; // no flush or close on prefix mode blob
    } else {
      totalAppendFlushCalls = 1; // 1 flush (with close)
    }

    for (int iteration = 0; iteration < testIterations; iteration++) {
      // trigger recurringWriteSize appends over numOfWrites
      writeBufferCursor += executeWritePattern(opStream, writeBuffer,
          writeBufferCursor, numOfWrites, recurringWriteSize);

      // Each full buffer is accounted as one request carrying a buffer of data.
      expectedTotalRequestsMade += numOfBuffersWrittenToStore;
      expectedRequestsMadeWithData += numOfBuffersWrittenToStore;
      expectedBytesSent += dataSizeWrittenToStore;

      if (isDirectCloseTest) {
        opStream.close();
      } else {
        opStream.hflush();
      }

      Assertions.assertThat(flushWillBeMergedWithAppend)
          .describedAs(flushExpectedToBeMergedWithAppend
              ? "Flush was to be merged with Append"
              : "Flush should not have been merged with Append")
          .isEqualTo(flushExpectedToBeMergedWithAppend);

      expectedTotalRequestsMade += totalAppendFlushCalls;
      expectedRequestsMadeWithData += totalAppendFlushCalls;
      expectedBytesSent += wasDataPendingToBeWrittenToServer
          ? pendingDataToStore
          : 0;

      assertOpStats(fs.getInstrumentationMap(), expectedTotalRequestsMade,
          expectedRequestsMadeWithData, expectedBytesSent);

      if (isDirectCloseTest) {
        // stream already closed
        validateStoreAppends(fs, testPath, totalFileSize, writeBuffer);
        return;
      }
    }

    /**
     * Above test iteration loop executes one of the below two patterns
     * 1. Append + Close (triggers flush)
     * 2. Append + Flush
     * For both patterns PutBlockList is complete in the iteration loop itself
     * Hence with PrefixMode Blob, below close won't trigger any network call
     */
    opStream.close();
    if (client instanceof AbfsDfsClient) {
      expectedTotalRequestsMade += 1;
      expectedRequestsMadeWithData += 1;
    }
    // no change in expectedBytesSent
    assertOpStats(fs.getInstrumentationMap(), expectedTotalRequestsMade,
        expectedRequestsMadeWithData, expectedBytesSent);

    validateStoreAppends(fs, testPath, totalFileSize, writeBuffer);
  }

  /**
   * Creates the test file and fills it with startingFileSize bytes taken from
   * writeBuffer, starting at writeBufferCursor.
   *
   * @param fs file system to create the file on
   * @param writeBuffer source of the bytes to write
   * @param writeBufferCursor offset in writeBuffer to start writing from
   * @param testPath path of the file to create
   * @param startingFileSize number of bytes to write
   * @return number of bytes written, i.e. how far the caller should advance
   * its cursor
   * @throws IOException on any file system failure
   */
  private int createFileWithStartingTestSize(AzureBlobFileSystem fs, byte[] writeBuffer,
      int writeBufferCursor, Path testPath, int startingFileSize)
      throws IOException {
    final int bytesWritten;
    // The stream is closed before the length check, even if the write fails.
    try (FSDataOutputStream opStream = fs.create(testPath)) {
      bytesWritten = executeWritePattern(opStream,
          writeBuffer,
          writeBufferCursor,
          1,
          startingFileSize);
    }

    Assertions.assertThat(fs.getFileStatus(testPath).getLen())
        .describedAs("File should be of size %d at the start of test.",
            startingFileSize)
        .isEqualTo(startingFileSize);

    return bytesWritten;
  }

  /**
   * Asserts that the file has the expected length and that its content is
   * equal to the bytes written by the test.
   *
   * @param fs file system holding the file
   * @param testPath path of the file to validate
   * @param totalFileSize expected file length in bytes
   * @param bufferWritten expected file content
   * @throws IOException on any file system failure, including the file being
   * shorter than totalFileSize while reading
   */
  private void validateStoreAppends(AzureBlobFileSystem fs,
      Path testPath,
      int totalFileSize,
      byte[] bufferWritten)
      throws IOException {
    // Final validation
    Assertions.assertThat(fs.getFileStatus(testPath).getLen())
        .describedAs("File should be of size %d at the end of test.",
            totalFileSize)
        .isEqualTo(totalFileSize);

    byte[] fileReadFromStore = new byte[totalFileSize];
    // A single read() may return fewer bytes than requested; readFully keeps
    // reading until the array is filled. The stream is closed afterwards.
    try (DataInputStream inputStream = fs.open(testPath)) {
      inputStream.readFully(fileReadFromStore, 0, totalFileSize);
    }

    assertArrayEquals("Test file content incorrect", bufferWritten,
        fileReadFromStore);
  }

  /**
   * Asserts the three request statistics tracked by this test.
   *
   * @param metricMap statistics to check
   * @param expectedTotalRequestsMade expected CONNECTIONS_MADE value
   * @param expectedRequestsMadeWithData expected SEND_REQUESTS value
   * @param expectedBytesSent expected BYTES_SENT value
   */
  private void assertOpStats(Map<String, Long> metricMap,
      long expectedTotalRequestsMade,
      long expectedRequestsMadeWithData,
      long expectedBytesSent) {
    assertAbfsStatistics(CONNECTIONS_MADE, expectedTotalRequestsMade,
        metricMap);
    assertAbfsStatistics(SEND_REQUESTS, expectedRequestsMadeWithData,
        metricMap);
    assertAbfsStatistics(BYTES_SENT, expectedBytesSent, metricMap);
  }

  /**
   * Issues writeLoopCount consecutive writes of writeSize bytes each, reading
   * from buffer at increasing offsets that begin at startOffset.
   *
   * @param opStream stream to write to
   * @param buffer source of the bytes to write
   * @param startOffset offset in buffer of the first write
   * @param writeLoopCount number of writes to issue
   * @param writeSize number of bytes per write
   * @return total number of bytes handed to the stream
   * @throws IOException if a write fails
   */
  private int executeWritePattern(FSDataOutputStream opStream,
      byte[] buffer,
      int startOffset,
      int writeLoopCount,
      int writeSize)
      throws IOException {
    int offset = startOffset;
    for (int remaining = writeLoopCount; remaining > 0; remaining--) {
      opStream.write(buffer, offset, writeSize);
      offset += writeSize;
    }
    return offset - startOffset;
  }
}