ITestAzureBlobFileSystemFlush.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsNot;
import org.junit.Test;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_APPEND_BLOB_KEY;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasStreamCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksStreamCapabilities;
/**
 * Test flush operation.
 * This class cannot be run in parallel test mode; see the comments on
 * testWriteHeavyBytesToFileSyncFlush().
 */
public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
  private static final int BASE_SIZE = 1024;
  private static final int ONE_THOUSAND = 1000;
  // 3000 KB, sized to support append blobs too.
  private static final int TEST_BUFFER_SIZE = 3 * ONE_THOUSAND * BASE_SIZE;
  private static final int ONE_MB = 1024 * 1024;
  private static final int FLUSH_TIMES = 200;
  private static final int THREAD_SLEEP_TIME = 1000;
  private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
  /** Number of threads used by the concurrent heavy-write tests. */
  private static final int WRITER_THREADS = 10;

  public ITestAzureBlobFileSystemFlush() throws Exception {
    super();
  }

  /**
   * Writes the same random buffer twice, calling flush() {@code FLUSH_TIMES}
   * times after each write, then verifies that the file contains exactly
   * those two copies and nothing more.
   */
  @Test
  public void testAbfsOutputStreamAsyncFlushWithRetainUncommittedData()
      throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    final Path testFilePath = path(methodName.getMethodName());
    final int writeIterations = 2;
    final byte[] b = new byte[TEST_BUFFER_SIZE];
    new Random().nextBytes(b);
    try (FSDataOutputStream stream = fs.create(testFilePath)) {
      for (int i = 0; i < writeIterations; i++) {
        stream.write(b);
        for (int j = 0; j < FLUSH_TIMES; j++) {
          stream.flush();
          Thread.sleep(10);
        }
      }
    }
    final byte[] r = new byte[TEST_BUFFER_SIZE];
    try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) {
      for (int i = 0; i < writeIterations; i++) {
        // readFully() fills the whole buffer or throws EOFException, so an
        // empty or truncated file fails instead of passing silently.
        inputStream.readFully(r);
        assertArrayEquals("buffer read from stream", b, r);
      }
      assertEquals("unexpected bytes after the written content",
          -1, inputStream.read());
    }
  }

  /**
   * Writes one random buffer, issues hsync()/hflush() {@code FLUSH_TIMES}
   * times, then verifies the file content matches what was written.
   */
  @Test
  public void testAbfsOutputStreamSyncFlush() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    final Path testFilePath = path(methodName.getMethodName());
    final byte[] b = new byte[TEST_BUFFER_SIZE];
    new Random().nextBytes(b);
    try (FSDataOutputStream stream = fs.create(testFilePath)) {
      stream.write(b);
      for (int i = 0; i < FLUSH_TIMES; i++) {
        stream.hsync();
        stream.hflush();
        Thread.sleep(10);
      }
    }
    final byte[] r = new byte[TEST_BUFFER_SIZE];
    try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) {
      inputStream.readFully(r);
      assertArrayEquals("buffer read from stream", b, r);
    }
  }

  /**
   * Writes {@code FLUSH_TIMES * TEST_BUFFER_SIZE} bytes (about 600 MB) to a
   * single stream from {@code WRITER_THREADS} concurrent writer tasks. While
   * any writer is still running, it calls hsync() about once every
   * {@code THREAD_SLEEP_TIME} ms. It then checks the final file length.
   * NOTE(review): the class-level restriction on parallel test mode is
   * presumably due to this data volume; confirm.
   */
  @Test
  public void testWriteHeavyBytesToFileSyncFlush() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    final Path testFilePath = path(methodName.getMethodName());
    final ExecutorService es = Executors.newFixedThreadPool(WRITER_THREADS);
    try {
      try (FSDataOutputStream stream = fs.create(testFilePath)) {
        final byte[] b = new byte[TEST_BUFFER_SIZE];
        new Random().nextBytes(b);
        final List<Future<Void>> tasks = submitWrites(es, stream, b);
        // Keep syncing while any writer is still running.
        while (!allDone(tasks)) {
          stream.hsync();
          Thread.sleep(THREAD_SLEEP_TIME);
        }
        // Rethrow any failure from a writer task.
        awaitAll(tasks);
      }
    } finally {
      // Release the pool even when the body fails.
      es.shutdownNow();
    }
    FileStatus fileStatus = fs.getFileStatus(testFilePath);
    long expectedWrites = (long) TEST_BUFFER_SIZE * FLUSH_TIMES;
    assertEquals("Wrong file length in " + testFilePath, expectedWrites,
        fileStatus.getLen());
  }

  /**
   * Same write load as the sync-flush variant, but calls flush()
   * continuously (without pausing) while any writer is still running, then
   * checks the final file length.
   */
  @Test
  public void testWriteHeavyBytesToFileAsyncFlush() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    final Path testFilePath = path(methodName.getMethodName());
    final ExecutorService es = Executors.newFixedThreadPool(WRITER_THREADS);
    try {
      try (FSDataOutputStream stream = fs.create(testFilePath)) {
        final byte[] b = new byte[TEST_BUFFER_SIZE];
        new Random().nextBytes(b);
        final List<Future<Void>> tasks = submitWrites(es, stream, b);
        while (!allDone(tasks)) {
          stream.flush();
        }
        // Rethrow any failure from a writer task.
        awaitAll(tasks);
        // Pause before the stream is closed.
        // NOTE(review): intent of this pause is not evident here; confirm
        // whether it is still needed.
        Thread.sleep(THREAD_SLEEP_TIME);
      }
    } finally {
      es.shutdownNow();
    }
    FileStatus fileStatus = fs.getFileStatus(testFilePath);
    assertEquals("Wrong file length in " + testFilePath,
        (long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
  }

  @Test
  public void testFlushWithOutputStreamFlushEnabled() throws Exception {
    testFlush(false);
  }

  @Test
  public void testFlushWithOutputStreamFlushDisabled() throws Exception {
    testFlush(true);
  }

  /**
   * Writes a buffer, waits for pending uploads, calls flush(), then checks
   * readability.
   * @param disableOutputStreamFlush value passed to
   *     setDisableOutputStreamFlush() before the file is created
   */
  private void testFlush(boolean disableOutputStreamFlush) throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    // Simulate setting "fs.azure.disable.outputstream.flush" to true or false
    fs.getAbfsStore().getAbfsConfiguration()
        .setDisableOutputStreamFlush(disableOutputStreamFlush);
    final Path testFilePath = path(methodName.getMethodName());
    byte[] buffer = getRandomBytesArray();
    // The test case must write "fs.azure.write.request.size" bytes
    // to the stream in order for the data to be uploaded to storage.
    assertTrue("write buffer size exceeds the test buffer length",
        fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize()
            <= buffer.length);
    final boolean isAppendBlob = fs.getAbfsStore()
        .isAppendBlobKey(fs.makeQualified(testFilePath).toString());
    try (FSDataOutputStream stream = fs.create(testFilePath)) {
      stream.write(buffer);
      // Write asynchronously uploads data, so we must wait for completion
      AbfsOutputStream abfsStream = (AbfsOutputStream) stream
          .getWrappedStream();
      abfsStream.waitForPendingUploads();
      // Flush commits the data so it can be read.
      stream.flush();
      // Verify that the data can be read if disableOutputStreamFlush is
      // false; and otherwise cannot be read.
      // For append blobs, flush is not needed to update data on the server.
      validate(fs.open(testFilePath), buffer,
          !disableOutputStreamFlush || isAppendBlob);
    }
  }

  /** With flush enabled, hflush() must make the written data readable. */
  @Test
  public void testHflushWithFlushEnabled() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    byte[] buffer = getRandomBytesArray();
    String fileName = UUID.randomUUID().toString();
    final Path testFilePath = path(fileName);
    try (FSDataOutputStream stream =
             getStreamAfterWrite(fs, testFilePath, buffer, true)) {
      stream.hflush();
      validate(fs, testFilePath, buffer, true);
    }
  }

  /**
   * With flush disabled, hflush() must not make data readable, except for
   * append blobs.
   */
  @Test
  public void testHflushWithFlushDisabled() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    byte[] buffer = getRandomBytesArray();
    final Path testFilePath = path(methodName.getMethodName());
    final boolean isAppendBlob = fs.getAbfsStore()
        .isAppendBlobKey(fs.makeQualified(testFilePath).toString());
    try (FSDataOutputStream stream =
             getStreamAfterWrite(fs, testFilePath, buffer, false)) {
      stream.hflush();
      // For append blobs, flush is not needed to update data on the server.
      validate(fs, testFilePath, buffer, isAppendBlob);
    }
  }

  /** With flush enabled, hsync() must make the written data readable. */
  @Test
  public void testHsyncWithFlushEnabled() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    byte[] buffer = getRandomBytesArray();
    final Path testFilePath = path(methodName.getMethodName());
    try (FSDataOutputStream stream =
             getStreamAfterWrite(fs, testFilePath, buffer, true)) {
      stream.hsync();
      validate(fs, testFilePath, buffer, true);
    }
  }

  /**
   * Validates tracing headers on a write + hsync to a file created through a
   * filesystem configured for append blobs.
   */
  @Test
  public void testTracingHeaderForAppendBlob() throws Exception {
    Configuration config = new Configuration(this.getRawConfiguration());
    config.set(FS_AZURE_APPEND_BLOB_KEY, "abfss:/");
    config.set(TestConfigurationKeys.FS_AZURE_TEST_APPENDBLOB_ENABLED, "true");
    byte[] buf = new byte[10];
    new Random().nextBytes(buf);
    // FileSystem.newInstance() returns a new, uncached instance owned by the
    // caller, so it is closed here (after the stream).
    try (AzureBlobFileSystem fs =
             (AzureBlobFileSystem) FileSystem.newInstance(config);
         FSDataOutputStream out = fs.create(new Path("/testFile"))) {
      AbfsOutputStream abfsOut = (AbfsOutputStream) out.getWrappedStream();
      abfsOut.registerListener(new TracingHeaderValidator(
          fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
          fs.getFileSystemId(), FSOperationType.WRITE, false, 0,
          abfsOut.getStreamID()));
      out.write(buf);
      out.hsync();
    }
  }

  /** With flush disabled, the stream must not advertise hflush/hsync. */
  @Test
  public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    byte[] buffer = getRandomBytesArray();
    final Path testFilePath = path(methodName.getMethodName());
    try (FSDataOutputStream stream =
             getStreamAfterWrite(fs, testFilePath, buffer, false)) {
      assertLacksStreamCapabilities(stream,
          StreamCapabilities.HFLUSH,
          StreamCapabilities.HSYNC,
          StreamCapabilities.DROPBEHIND,
          StreamCapabilities.READAHEAD,
          StreamCapabilities.UNBUFFER);
    }
  }

  /** With flush enabled, the stream must advertise exactly hflush/hsync. */
  @Test
  public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    byte[] buffer = getRandomBytesArray();
    final Path testFilePath = path(methodName.getMethodName());
    try (FSDataOutputStream stream =
             getStreamAfterWrite(fs, testFilePath, buffer, true)) {
      assertHasStreamCapabilities(stream,
          StreamCapabilities.HFLUSH,
          StreamCapabilities.HSYNC);
      assertLacksStreamCapabilities(stream,
          StreamCapabilities.DROPBEHIND,
          StreamCapabilities.READAHEAD,
          StreamCapabilities.UNBUFFER);
    }
  }

  /**
   * With flush disabled, hsync() must not make data readable, except for
   * append blobs.
   */
  @Test
  public void testHsyncWithFlushDisabled() throws Exception {
    final AzureBlobFileSystem fs = this.getFileSystem();
    byte[] buffer = getRandomBytesArray();
    final Path testFilePath = path(methodName.getMethodName());
    final boolean isAppendBlob = fs.getAbfsStore()
        .isAppendBlobKey(fs.makeQualified(testFilePath).toString());
    try (FSDataOutputStream stream =
             getStreamAfterWrite(fs, testFilePath, buffer, false)) {
      stream.hsync();
      // For append blobs, flush is not needed to update data on the server.
      validate(fs, testFilePath, buffer, isAppendBlob);
    }
  }

  /** Returns a new array of {@code TEST_FILE_LENGTH} random bytes. */
  private byte[] getRandomBytesArray() {
    final byte[] b = new byte[TEST_FILE_LENGTH];
    new Random().nextBytes(b);
    return b;
  }

  /**
   * Sets the store's enable-flush option, creates {@code path} and writes
   * {@code buffer} to it.
   * @return the still-open stream; the caller must close it
   * @throws IOException if create or write fails (the stream is closed first
   *     when the write fails)
   */
  private FSDataOutputStream getStreamAfterWrite(AzureBlobFileSystem fs,
      Path path, byte[] buffer, boolean enableFlush) throws IOException {
    fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(enableFlush);
    FSDataOutputStream stream = fs.create(path);
    try {
      stream.write(buffer);
    } catch (IOException | RuntimeException e) {
      // Do not leak the stream when the write fails.
      try {
        stream.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
    return stream;
  }

  /**
   * Submits {@code FLUSH_TIMES} tasks, each writing {@code data} once to
   * {@code stream}.
   * @return the futures of the submitted tasks
   */
  private static List<Future<Void>> submitWrites(ExecutorService executor,
      final FSDataOutputStream stream, final byte[] data) {
    final Callable<Void> writer = () -> {
      stream.write(data);
      return null;
    };
    final List<Future<Void>> tasks = new ArrayList<>(FLUSH_TIMES);
    for (int i = 0; i < FLUSH_TIMES; i++) {
      tasks.add(executor.submit(writer));
    }
    return tasks;
  }

  /** Returns true when every task in {@code tasks} has completed. */
  private static boolean allDone(List<Future<Void>> tasks) {
    for (Future<Void> task : tasks) {
      if (!task.isDone()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Waits for every task, rethrowing (wrapped in ExecutionException) the
   * first failure encountered.
   */
  private static void awaitAll(List<Future<Void>> tasks) throws Exception {
    for (Future<Void> task : tasks) {
      task.get();
    }
  }

  /**
   * Reads from {@code stream} until {@code buffer} is full or end of stream
   * is reached; a single read() call may return fewer bytes than requested.
   * @return the number of bytes placed into {@code buffer}
   */
  private static int readUpTo(InputStream stream, byte[] buffer)
      throws IOException {
    int total = 0;
    while (total < buffer.length) {
      int n = stream.read(buffer, total, buffer.length - total);
      if (n <= 0) {
        break;
      }
      total += n;
    }
    return total;
  }

  /**
   * Reads {@code writeBuffer.length} bytes (or until EOF) from
   * {@code stream} and asserts that they do, or do not, match
   * {@code writeBuffer}. Always closes {@code stream}.
   */
  private void validate(InputStream stream, byte[] writeBuffer, boolean isEqual)
      throws IOException {
    try (InputStream in = stream) {
      byte[] readBuffer = new byte[writeBuffer.length];
      int numBytesRead = readUpTo(in, readBuffer);
      if (isEqual) {
        assertEquals("Number of bytes read", writeBuffer.length, numBytesRead);
        assertArrayEquals(
            "Bytes read do not match bytes written.",
            writeBuffer,
            readBuffer);
      } else {
        assertThat(
            "Bytes read unexpectedly match bytes written.",
            readBuffer,
            IsNot.not(IsEqual.equalTo(writeBuffer)));
      }
    }
  }

  /**
   * Opens {@code path} on {@code fs} and asserts that its first
   * {@code writeBuffer.length} bytes do, or do not, match
   * {@code writeBuffer}.
   */
  private void validate(FileSystem fs, Path path, byte[] writeBuffer,
      boolean isEqual) throws IOException {
    String filePath = path.toUri().toString();
    try (FSDataInputStream inputStream = fs.open(path)) {
      byte[] readBuffer = new byte[writeBuffer.length];
      readUpTo(inputStream, readBuffer);
      if (isEqual) {
        assertArrayEquals(
            String.format("Bytes read do not match bytes written to %1$s",
                filePath),
            writeBuffer, readBuffer);
      } else {
        assertThat(
            String.format("Bytes read unexpectedly match bytes written to %1$s",
                filePath),
            readBuffer,
            IsNot.not(IsEqual.equalTo(writeBuffer)));
      }
    }
  }
}