ITestAzureBlobFileSystemAppend.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AzureBlobIngressHandler;
import org.apache.hadoop.fs.azurebfs.services.AzureDFSIngressHandler;
import org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.store.BlockUploadStatistics;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.test.LambdaTestUtils;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INFINITE_LEASE_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_SERVICE_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient.generateBlockListXml;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.anyString;
/**
* Test append operations.
*/
public class ITestAzureBlobFileSystemAppend extends
AbstractAbfsIntegrationTest {
private static final String TEST_FILE_PATH = "testfile";
private static final String TEST_FILE_PATH1 = "testfile1";
private static final String TEST_FOLDER_PATH = "testFolder";
private static final int TEN = 10;
private static final int TWENTY = 20;
private static final int THIRTY = 30;
private static final int HUNDRED = 100;
public ITestAzureBlobFileSystemAppend() throws Exception {
super();
}
@Test(expected = FileNotFoundException.class)
public void testAppendDirShouldFail() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path filePath = path(TEST_FILE_PATH);
fs.mkdirs(filePath);
fs.append(filePath, 0).close();
}
@Test
public void testAppendWithLength0() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
try (FSDataOutputStream stream = fs.create(path(TEST_FILE_PATH))) {
final byte[] b = new byte[1024];
new Random().nextBytes(b);
stream.write(b, 1000, 0);
assertEquals(0, stream.getPos());
}
}
@Test(expected = FileNotFoundException.class)
public void testAppendFileAfterDelete() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path filePath = path(TEST_FILE_PATH);
ContractTestUtils.touch(fs, filePath);
fs.delete(filePath, false);
fs.append(filePath).close();
}
@Test(expected = FileNotFoundException.class)
public void testAppendDirectory() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path folderPath = path(TEST_FOLDER_PATH);
fs.mkdirs(folderPath);
fs.append(folderPath).close();
}
@Test
public void testTracingForAppend() throws IOException {
AzureBlobFileSystem fs = getFileSystem();
Path testPath = path(TEST_FILE_PATH);
fs.create(testPath).close();
fs.registerListener(new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
fs.append(testPath, TEN);
}
@Test
public void testCloseOfDataBlockOnAppendComplete() throws Exception {
Set<String> blockBufferTypes = new HashSet<>();
blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
for (String blockBufferType : blockBufferTypes) {
Configuration configuration = new Configuration(getRawConfiguration());
configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(configuration))) {
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(store).when(fs).getAbfsStore();
DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
Mockito.doAnswer(getBlobFactoryInvocation -> {
DataBlocks.BlockFactory factory = Mockito.spy(
(DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod());
Mockito.doAnswer(factoryCreateInvocation -> {
dataBlock[0] = Mockito.spy(
(DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod());
return dataBlock[0];
})
.when(factory)
.create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
BlockUploadStatistics.class));
return factory;
}).when(store).getBlockFactory();
try (OutputStream os = fs.create(
new Path(getMethodName() + "_" + blockBufferType))) {
os.write(new byte[1]);
Assertions.assertThat(dataBlock[0].getState())
.describedAs(
"On write of data in outputStream, state should become Writing")
.isEqualTo(Writing);
os.close();
Mockito.verify(dataBlock[0], Mockito.times(1)).close();
Assertions.assertThat(dataBlock[0].getState())
.describedAs(
"On close of outputStream, state should become Closed")
.isEqualTo(Closed);
}
}
}
}
/**
* Creates a file over DFS and attempts to append over Blob.
* It should fallback to DFS when appending to the file fails.
*
* @throws IOException if an I/O error occurs.
*/
@Test
public void testCreateOverDfsAppendOverBlob() throws IOException {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
final AzureBlobFileSystem fs = getFileSystem();
Path testPath = path(TEST_FILE_PATH);
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.getAbfsStore().getClientHandler().getDfsClient().
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
fs.getAbfsStore()
.getAbfsConfiguration()
.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
FSDataOutputStream outputStream = fs.append(testPath);
AzureIngressHandler ingressHandler
= ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler();
AbfsClient client = ingressHandler.getClient();
Assertions.assertThat(client)
.as("Blob client was not used before fallback")
.isInstanceOf(AbfsBlobClient.class);
outputStream.write(TEN);
outputStream.hsync();
outputStream.write(TWENTY);
outputStream.hsync();
outputStream.write(THIRTY);
outputStream.hsync();
AzureIngressHandler ingressHandlerFallback
= ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler();
AbfsClient clientFallback = ingressHandlerFallback.getClient();
Assertions.assertThat(clientFallback)
.as("DFS client was not used after fallback")
.isInstanceOf(AbfsDfsClient.class);
}
/**
* This test verifies that if multiple appends qualify for switch, no appends should fail.
*/
@Test
public void testMultipleAppendsQualifyForSwitch() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
final AzureBlobFileSystem fs = getFileSystem();
Path testPath = path(TEST_FILE_PATH);
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.getAbfsStore().getClientHandler().getDfsClient().
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
fs.getAbfsStore()
.getAbfsConfiguration()
.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<?>> futures = new ArrayList<>();
// Create three output streams with different content length
final byte[] b1 = new byte[8 * ONE_MB];
new Random().nextBytes(b1);
FSDataOutputStream out1 = fs.append(testPath);
FSDataOutputStream out2 = fs.append(testPath);
FSDataOutputStream out3 = fs.append(testPath);
// Submit tasks to write to each output stream
futures.add(executorService.submit(() -> {
try {
out1.write(TEN);
out1.hsync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
futures.add(executorService.submit(() -> {
try {
out2.write(TWENTY);
out2.hsync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
futures.add(executorService.submit(() -> {
try {
out3.write(THIRTY);
out3.hsync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
checkFuturesForExceptions(futures, 0);
AzureIngressHandler ingressHandlerFallback
= ((AbfsOutputStream) out1.getWrappedStream()).getIngressHandler();
AbfsClient clientFallback = ingressHandlerFallback.getClient();
Assertions.assertThat(clientFallback)
.as("DFS client was not used after fallback")
.isInstanceOf(AbfsDfsClient.class);
}
/**
* This test verifies that parallel writes on dfs and blob endpoint should not fail.
*/
@Test
public void testParallelWritesOnDfsAndBlob() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
final AzureBlobFileSystem fs = getFileSystem();
Path testPath = path(TEST_FILE_PATH);
Path testPath1 = path(TEST_FILE_PATH1);
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.getAbfsStore().getClientHandler().getDfsClient().
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
fs.getAbfsStore()
.getAbfsConfiguration()
.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
FSDataOutputStream out1 = fs.create(testPath);
fs.getAbfsStore().getClientHandler().getDfsClient().
createPath(makeQualified(testPath1).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<?>> futures = new ArrayList<>();
// Create three output streams with different content length
final byte[] b1 = new byte[8 * ONE_MB];
new Random().nextBytes(b1);
FSDataOutputStream out2 = fs.append(testPath1);
// Submit tasks to write to each output stream
futures.add(executorService.submit(() -> {
try {
out1.write(TEN);
out1.hsync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
futures.add(executorService.submit(() -> {
try {
out2.write(TWENTY);
out2.hsync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
checkFuturesForExceptions(futures, 0);
}
/**
* Creates a file over Blob and attempts to append over DFS.
* It should fallback to Blob when appending to the file fails.
*
* @throws IOException if an I/O error occurs.
*/
@Test
public void testCreateOverBlobAppendOverDfs() throws IOException {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
assumeDfsServiceType();
Configuration conf = getRawConfiguration();
conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
String.valueOf(AbfsServiceType.DFS));
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
conf)) {
Path testPath = path(TEST_FILE_PATH);
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.getAbfsStore()
.getAbfsConfiguration()
.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
fs.getAbfsStore()
.getAbfsConfiguration()
.set(FS_AZURE_INGRESS_SERVICE_TYPE,
String.valueOf(AbfsServiceType.DFS));
fs.getAbfsStore().getClientHandler().getBlobClient().
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
FSDataOutputStream outputStream = fs.append(testPath);
outputStream.write(TEN);
outputStream.hsync();
outputStream.write(TWENTY);
outputStream.hsync();
outputStream.write(THIRTY);
outputStream.hsync();
}
}
/**
* Creates an Append Blob over Blob and attempts to append over DFS.
* It should fallback to Blob when appending to the file fails.
*
* @throws IOException if an I/O error occurs.
*/
@Test
public void testCreateAppendBlobOverBlobEndpointAppendOverDfs()
throws IOException, NoSuchFieldException, IllegalAccessException {
assumeDfsServiceType();
Configuration conf = getRawConfiguration();
conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
String.valueOf(AbfsServiceType.DFS));
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(conf))) {
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());
// Set abfsStore as our mocked value.
Field privateField = AzureBlobFileSystem.class.getDeclaredField(
"abfsStore");
privateField.setAccessible(true);
privateField.set(fs, store);
Path testPath = path(TEST_FILE_PATH);
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.getAbfsStore()
.getAbfsConfiguration()
.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
fs.getAbfsStore()
.getAbfsConfiguration()
.set(FS_AZURE_INGRESS_SERVICE_TYPE,
String.valueOf(AbfsServiceType.DFS));
fs.getAbfsStore().getClientHandler().getBlobClient().
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, true, null,
null, getTestTracingContext(fs, true));
FSDataOutputStream outputStream = fs.append(testPath);
outputStream.write(TEN);
outputStream.hsync();
outputStream.write(TWENTY);
outputStream.hsync();
outputStream.write(THIRTY);
outputStream.hsync();
}
}
/**
* Creates an append Blob over DFS and attempts to append over Blob.
* It should fallback to DFS when appending to the file fails.
*
* @throws IOException if an I/O error occurs.
*/
@Test
public void testCreateAppendBlobOverDfsEndpointAppendOverBlob()
throws IOException, NoSuchFieldException, IllegalAccessException {
assumeHnsEnabled("FNS does not support append blob creation for DFS endpoint");
final AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());
// Set abfsStore as our mocked value.
Field privateField = AzureBlobFileSystem.class.getDeclaredField(
"abfsStore");
privateField.setAccessible(true);
privateField.set(fs, store);
Path testPath = path(TEST_FILE_PATH);
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.getAbfsStore().getClientHandler().getDfsClient().
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, true, null,
null, getTestTracingContext(fs, true));
fs.getAbfsStore()
.getAbfsConfiguration()
.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
FSDataOutputStream outputStream = fs.append(testPath);
AzureIngressHandler ingressHandler
= ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler();
AbfsClient client = ingressHandler.getClient();
Assertions.assertThat(client)
.as("Blob client was not used before fallback")
.isInstanceOf(AbfsBlobClient.class);
outputStream.write(TEN);
outputStream.hsync();
outputStream.write(TWENTY);
outputStream.hsync();
outputStream.write(THIRTY);
outputStream.flush();
AzureIngressHandler ingressHandlerFallback
= ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler();
AbfsClient clientFallback = ingressHandlerFallback.getClient();
Assertions.assertThat(clientFallback)
.as("DFS client was not used after fallback")
.isInstanceOf(AbfsDfsClient.class);
}
/**
* Tests the correct retrieval of the AzureIngressHandler based on the configured ingress service type.
*
* @throws IOException if an I/O error occurs
*/
@Test
public void testValidateIngressHandler() throws IOException {
Configuration configuration = getRawConfiguration();
configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE,
AbfsServiceType.BLOB.name());
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
configuration)) {
Path testPath = path(TEST_FILE_PATH);
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.getAbfsStore().getClientHandler().getBlobClient().
createPath(makeQualified(testPath).toUri().getPath(), true,
false,
permissions, false, null,
null, getTestTracingContext(fs, true));
FSDataOutputStream outputStream = fs.append(testPath);
AzureIngressHandler ingressHandler
= ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler();
Assertions.assertThat(ingressHandler)
.as("Blob Ingress handler instance is not correct")
.isInstanceOf(AzureBlobIngressHandler.class);
AbfsClient client = ingressHandler.getClient();
Assertions.assertThat(client)
.as("Blob client was not used correctly")
.isInstanceOf(AbfsBlobClient.class);
Path testPath1 = new Path("testFile1");
fs.getAbfsStore().getClientHandler().getBlobClient().
createPath(makeQualified(testPath1).toUri().getPath(), true,
false,
permissions, false, null,
null, getTestTracingContext(fs, true));
fs.getAbfsStore()
.getAbfsConfiguration()
.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.DFS.name());
FSDataOutputStream outputStream1 = fs.append(testPath1);
AzureIngressHandler ingressHandler1
= ((AbfsOutputStream) outputStream1.getWrappedStream()).getIngressHandler();
Assertions.assertThat(ingressHandler1)
.as("DFS Ingress handler instance is not correct")
.isInstanceOf(AzureDFSIngressHandler.class);
AbfsClient client1 = ingressHandler1.getClient();
Assertions.assertThat(client1)
.as("Dfs client was not used correctly")
.isInstanceOf(AbfsDfsClient.class);
}
}
@Test(expected = FileNotFoundException.class)
public void testAppendImplicitDirectory() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path folderPath = new Path(TEST_FOLDER_PATH);
fs.mkdirs(folderPath);
fs.append(folderPath.getParent());
}
@Test(expected = FileNotFoundException.class)
public void testAppendFileNotExists() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final Path folderPath = new Path(TEST_FOLDER_PATH);
fs.append(folderPath);
}
/**
* Create directory over dfs endpoint and append over blob endpoint.
* Should return error as append is not supported for directory.
* **/
@Test(expected = IOException.class)
public void testCreateExplicitDirectoryOverDfsAppendOverBlob()
throws IOException {
final AzureBlobFileSystem fs = getFileSystem();
final Path folderPath = path(TEST_FOLDER_PATH);
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.getAbfsStore().getClientHandler().getDfsClient().
createPath(makeQualified(folderPath).toUri().getPath(), false, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
FSDataOutputStream outputStream = fs.append(folderPath);
outputStream.write(TEN);
outputStream.hsync();
}
/**
* Recreate file between append and flush. Etag mismatch happens.
**/
@Test(expected = IOException.class)
public void testRecreateAppendAndFlush() throws IOException {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
final AzureBlobFileSystem fs = getFileSystem();
final Path filePath = path(TEST_FILE_PATH);
fs.create(filePath);
Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB);
FSDataOutputStream outputStream = fs.append(filePath);
outputStream.write(TEN);
try (AzureBlobFileSystem fs1
= (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration());
FSDataOutputStream outputStream1 = fs1.create(filePath)) {
outputStream.hsync();
}
}
/**
* Recreate directory between append and flush. Etag mismatch happens.
**/
@Test(expected = IOException.class)
public void testRecreateDirectoryAppendAndFlush() throws IOException {
final AzureBlobFileSystem fs = getFileSystem();
final Path filePath = path(TEST_FILE_PATH);
fs.create(filePath);
FSDataOutputStream outputStream = fs.append(filePath);
outputStream.write(TEN);
try (AzureBlobFileSystem fs1
= (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration())) {
fs1.mkdirs(filePath);
outputStream.hsync();
}
}
/**
* Verify that parallel write with same offset from different output streams will not throw exception.
**/
@Test
public void testParallelWriteSameOffsetDifferentOutputStreams()
throws Exception {
Configuration configuration = getRawConfiguration();
configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
configuration)) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<?>> futures = new ArrayList<>();
final byte[] b = new byte[8 * ONE_MB];
new Random().nextBytes(b);
final Path filePath = path(TEST_FILE_PATH);
// Create three output streams
FSDataOutputStream out1 = fs.create(filePath);
FSDataOutputStream out2 = fs.append(filePath);
FSDataOutputStream out3 = fs.append(filePath);
// Submit tasks to write to each output stream with the same offset
futures.add(executorService.submit(() -> {
try {
out1.write(b, TEN, 2 * HUNDRED);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
futures.add(executorService.submit(() -> {
try {
out2.write(b, TEN, 2 * HUNDRED);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
futures.add(executorService.submit(() -> {
try {
out3.write(b, TEN, 2 * HUNDRED);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
checkFuturesForExceptions(futures, 0);
}
}
/**
* Verify that parallel write for different content length will not throw exception.
**/
@Test
public void testParallelWriteDifferentContentLength() throws Exception {
Configuration configuration = getRawConfiguration();
configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
try (FileSystem fs = FileSystem.newInstance(configuration)) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<?>> futures = new ArrayList<>();
final Path filePath = path(TEST_FILE_PATH);
// Create three output streams with different content length
FSDataOutputStream out1 = fs.create(filePath);
final byte[] b1 = new byte[8 * ONE_MB];
new Random().nextBytes(b1);
FSDataOutputStream out2 = fs.append(filePath);
FSDataOutputStream out3 = fs.append(filePath);
// Submit tasks to write to each output stream
futures.add(executorService.submit(() -> {
try {
out1.write(b1, TEN, 2 * HUNDRED);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
futures.add(executorService.submit(() -> {
try {
out2.write(b1, TWENTY, 3 * HUNDRED);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
futures.add(executorService.submit(() -> {
try {
out3.write(b1, THIRTY, 4 * HUNDRED);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
checkFuturesForExceptions(futures, 0);
}
}
/**
* Verify that parallel write for different content length will not throw exception.
**/
@Test
public void testParallelWriteOutputStreamClose() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
AzureBlobFileSystem fs = getFileSystem();
final Path secondarytestfile = new Path("secondarytestfile");
ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Future<?>> futures = new ArrayList<>();
FSDataOutputStream out1 = fs.create(secondarytestfile);
Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB);
AbfsOutputStream outputStream1 = (AbfsOutputStream) out1.getWrappedStream();
String fileETag = outputStream1.getIngressHandler().getETag();
final byte[] b1 = new byte[8 * ONE_MB];
new Random().nextBytes(b1);
final byte[] b2 = new byte[8 * ONE_MB];
new Random().nextBytes(b2);
FSDataOutputStream out2 = fs.append(secondarytestfile);
// Submit tasks to write to each output stream
futures.add(executorService.submit(() -> {
try {
out1.write(b1, 0, 2 * HUNDRED);
out1.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
futures.add(executorService.submit(() -> {
try {
out2.write(b2, 0, 4 * HUNDRED);
out2.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
checkFuturesForExceptions(futures, 1);
// Validate that the data written in the buffer is the same as what was read
final byte[] readBuffer = new byte[8 * ONE_MB];
int result;
FSDataInputStream inputStream = fs.open(secondarytestfile);
inputStream.seek(0);
AbfsOutputStream outputStream2 = (AbfsOutputStream) out1.getWrappedStream();
String out1Etag = outputStream2.getIngressHandler().getETag();
AbfsOutputStream outputStream3 = (AbfsOutputStream) out2.getWrappedStream();
String out2Etag = outputStream3.getIngressHandler().getETag();
if (!fileETag.equals(out1Etag)) {
result = inputStream.read(readBuffer, 0, 4 * ONE_MB);
assertEquals(result, 2 * HUNDRED); // Verify that the number of bytes read matches the number of bytes written
assertArrayEquals(
Arrays.copyOfRange(readBuffer, 0, result), Arrays.copyOfRange(b1, 0,
result)); // Verify that the data read matches the original data written
} else if (!fileETag.equals(out2Etag)) {
result = inputStream.read(readBuffer, 0, 4 * ONE_MB);
assertEquals(result, 4 * HUNDRED); // Verify that the number of bytes read matches the number of bytes written
assertArrayEquals(Arrays.copyOfRange(readBuffer, 0, result),
Arrays.copyOfRange(b2, 0,
result)); // Verify that the data read matches the original data written
} else {
fail("Neither out1 nor out2 was flushed successfully.");
}
}
/**
* Verify that once flushed etag changes.
**/
@Test
public void testEtagMismatch() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
AzureBlobFileSystem fs = getFileSystem();
final Path filePath = path(TEST_FILE_PATH);
FSDataOutputStream out1 = fs.create(filePath);
FSDataOutputStream out2 = fs.create(filePath);
Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB);
out2.write(TEN);
out2.hsync();
out1.write(TEN);
intercept(IOException.class, () -> out1.hsync());
}
@Test
public void testAppendWithLease() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()),
TEST_FILE_PATH);
final AzureBlobFileSystem fs = Mockito.spy(
getCustomFileSystem(testFilePath.getParent(), 1));
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL,
FsAction.ALL);
FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE,
FsAction.NONE);
AbfsOutputStream outputStream = (AbfsOutputStream) fs.getAbfsStore()
.createFile(testFilePath, null, true,
permission, umask, getTestTracingContext(fs, true));
outputStream.write(TEN);
outputStream.close();
assertNotNull(outputStream.getLeaseId());
}
private AzureBlobFileSystem getCustomFileSystem(Path infiniteLeaseDirs,
int numLeaseThreads) throws Exception {
Configuration conf = getRawConfiguration();
conf.setBoolean(String.format("fs.%s.impl.disable.cache", getAbfsScheme()),
true);
conf.set(FS_AZURE_INFINITE_LEASE_KEY, infiniteLeaseDirs.toUri().getPath());
conf.setInt(FS_AZURE_LEASE_THREADS, numLeaseThreads);
FileSystem fileSystem = FileSystem.newInstance(conf);
return (AzureBlobFileSystem) fileSystem;
}
@Test
public void testAppendImplicitDirectoryAzcopy() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
createAzCopyFolder(new Path("/src"));
createAzCopyFile(new Path("/src/file"));
intercept(FileNotFoundException.class, () -> fs.append(new Path("/src")));
}
/**
* If a write operation fails asynchronously, when the next write comes once failure is
* registered, that operation would fail with the exception caught on previous
* write operation.
* The next close, hsync, hflush would also fail for the last caught exception.
*/
@Test
public void testIntermittentAppendFailureToBeReported() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
assumeHnsDisabled();
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
assumeBlobServiceType();
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
Mockito.doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
Mockito.doThrow(
new AbfsRestOperationException(HTTP_UNAVAILABLE, "", "", new Exception()))
.when(blobClient)
.append(Mockito.anyString(), Mockito.any(byte[].class), Mockito.any(
AppendRequestParameters.class), Mockito.any(), Mockito.any(),
Mockito.any(TracingContext.class));
byte[] bytes = new byte[1024 * 1024 * 8];
new Random().nextBytes(bytes);
LambdaTestUtils.intercept(IOException.class, () -> {
try (FSDataOutputStream os = createMockedOutputStream(fs,
new Path("/test/file"), blobClient)) {
os.write(bytes);
}
});
LambdaTestUtils.intercept(IOException.class, () -> {
FSDataOutputStream os = createMockedOutputStream(fs,
new Path("/test/file/file1"), blobClient);
os.write(bytes);
os.close();
});
LambdaTestUtils.intercept(IOException.class, () -> {
FSDataOutputStream os = createMockedOutputStream(fs,
new Path("/test/file/file2"), blobClient);
os.write(bytes);
os.hsync();
});
LambdaTestUtils.intercept(IOException.class, () -> {
FSDataOutputStream os = createMockedOutputStream(fs,
new Path("/test/file/file3"), blobClient);
os.write(bytes);
os.hflush();
});
LambdaTestUtils.intercept(IOException.class, () -> {
AbfsOutputStream os = (AbfsOutputStream) createMockedOutputStream(fs,
new Path("/test/file/file4"), blobClient).getWrappedStream();
os.write(bytes);
while (!os.areWriteOperationsTasksDone()) {
// No operation inside the loop
}
os.write(bytes);
});
}
}
/**
* Creates a mocked FSDataOutputStream for testing purposes.
*
* This method creates a mocked FSDataOutputStream by wrapping an AbfsOutputStream
* and its associated AzureIngressHandler. The method uses Mockito to create spies
* for the AbfsOutputStream and AzureIngressHandler, and sets up the necessary
* interactions between them.
*
* @param fs The AzureBlobFileSystem instance used to create the output stream.
* @param path The Path where the output stream will be created.
* @param client The AbfsClient instance to be used by the AzureIngressHandler.
* @return A mocked FSDataOutputStream instance.
* @throws IOException If an I/O error occurs while creating the output stream.
*/
private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem fs,
Path path,
AbfsClient client) throws IOException {
AbfsOutputStream abfsOutputStream = Mockito.spy(
(AbfsOutputStream) fs.create(path).getWrappedStream());
AzureIngressHandler ingressHandler = Mockito.spy(
abfsOutputStream.getIngressHandler());
Mockito.doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler();
Mockito.doReturn(client).when(ingressHandler).getClient();
FSDataOutputStream fsDataOutputStream = Mockito.spy(
new FSDataOutputStream(abfsOutputStream, null));
return fsDataOutputStream;
}
/**
* Test to check when async write takes time, the close, hsync, hflush method
* wait to get async ops completed and then flush. If async ops fail, the methods
* will throw exception.
*/
@Test
public void testWriteAsyncOpFailedAfterCloseCalled() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
AbfsDfsClient dfsClient = Mockito.spy(clientHandler.getDfsClient());
AbfsClient client = clientHandler.getIngressClient();
if (clientHandler.getIngressClient() instanceof AbfsBlobClient) {
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
} else {
Mockito.doReturn(dfsClient).when(clientHandler).getDfsClient();
Mockito.doReturn(dfsClient).when(clientHandler).getIngressClient();
}
Mockito.doReturn(clientHandler).when(store).getClientHandler();
byte[] bytes = new byte[1024 * 1024 * 8];
new Random().nextBytes(bytes);
AtomicInteger count = new AtomicInteger(0);
Mockito.doAnswer(answer -> {
count.incrementAndGet();
while (count.get() < 2) {
// No operation inside the loop
}
Thread.sleep(10 * HUNDRED);
throw new AbfsRestOperationException(HTTP_UNAVAILABLE, "", "",
new Exception());
})
.when(client instanceof AbfsBlobClient ? blobClient : dfsClient)
.append(Mockito.anyString(), Mockito.any(byte[].class), Mockito.any(
AppendRequestParameters.class), Mockito.any(), Mockito.any(),
Mockito.any(TracingContext.class));
Mockito.doAnswer(answer -> {
count.incrementAndGet();
while (count.get() < 2) {
// No operation inside the loop
}
Thread.sleep(10 * HUNDRED);
throw new AbfsRestOperationException(HTTP_UNAVAILABLE, "", "",
new Exception());
})
.when(client instanceof AbfsBlobClient ? blobClient : dfsClient)
.append(Mockito.anyString(), Mockito.any(byte[].class), Mockito.any(
AppendRequestParameters.class), Mockito.any(), Mockito.any(),
Mockito.any(TracingContext.class));
FSDataOutputStream os = createMockedOutputStream(fs,
new Path("/test/file"),
client instanceof AbfsBlobClient ? blobClient : dfsClient);
os.write(bytes);
os.write(bytes);
LambdaTestUtils.intercept(IOException.class, os::close);
count.set(0);
FSDataOutputStream os1 = createMockedOutputStream(fs,
new Path("/test/file1"),
client instanceof AbfsBlobClient ? blobClient : dfsClient);
os1.write(bytes);
os1.write(bytes);
LambdaTestUtils.intercept(IOException.class, os1::hsync);
count.set(0);
FSDataOutputStream os2 = createMockedOutputStream(fs,
new Path("/test/file2"),
client instanceof AbfsBlobClient ? blobClient : dfsClient);
os2.write(bytes);
os2.write(bytes);
LambdaTestUtils.intercept(IOException.class, os2::hflush);
}
}
/**
* Test to simulate a successful flush operation followed by a connection reset
* on the response, triggering a retry.
*
* This test verifies that the flush operation is retried in the event of a
* connection reset during the response phase. The test creates a mock
* AzureBlobFileSystem and its associated components to simulate the flush
* operation and the connection reset. It then verifies that the flush
* operation is retried once before succeeding if the md5hash matches.
*
* @throws Exception if an error occurs during the test execution.
*/
@Test
public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
// Create a spy of AzureBlobFileSystem
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
assumeHnsDisabled();
// Create a spy of AzureBlobFileSystemStore
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
assumeBlobServiceType();
// Create spies for the client handler and blob client
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
// Set up the spies to return the mocked objects
Mockito.doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
AtomicInteger flushCount = new AtomicInteger(0);
FSDataOutputStream os = createMockedOutputStream(fs,
new Path("/test/file"), blobClient);
AbfsOutputStream out = (AbfsOutputStream) os.getWrappedStream();
String eTag = out.getIngressHandler().getETag();
byte[] bytes = new byte[1024 * 1024 * 4];
new Random().nextBytes(bytes);
// Write some bytes and attempt to flush, which should retry
out.write(bytes);
String blockId = out.getBlockManager().getActiveBlock().getBlockId();
String blockListXml = generateBlockListXml(blockId);
Mockito.doAnswer(answer -> {
// Set up the mock for the flush operation
AbfsClientTestUtil.setMockAbfsRestOperationForFlushOperation(blobClient,
eTag, blockListXml, out,
(httpOperation) -> {
Mockito.doAnswer(invocation -> {
// Call the real processResponse method
invocation.callRealMethod();
int currentCount = flushCount.incrementAndGet();
if (currentCount == 1) {
Mockito.when(httpOperation.getStatusCode())
.thenReturn(
HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error
Mockito.when(httpOperation.getStorageErrorMessage())
.thenReturn("CONNECTION_RESET"); // Error message
throw new IOException("Connection Reset");
}
return null;
}).when(httpOperation).processResponse(
Mockito.nullable(byte[].class),
Mockito.anyInt(),
Mockito.anyInt()
);
return httpOperation;
});
return answer.callRealMethod();
}).when(blobClient).flush(
Mockito.any(byte[].class),
Mockito.anyString(),
Mockito.anyBoolean(),
Mockito.nullable(String.class),
Mockito.nullable(String.class),
Mockito.anyString(),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.any(TracingContext.class), Mockito.nullable(String.class)
);
out.hsync();
out.close();
Mockito.verify(blobClient, Mockito.times(1)).flush(
Mockito.any(byte[].class),
Mockito.anyString(),
Mockito.anyBoolean(),
Mockito.nullable(String.class),
Mockito.nullable(String.class),
Mockito.anyString(),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.any(TracingContext.class), Mockito.nullable(String.class));
}
}
/**
* Test to simulate a successful flush operation followed by a connection reset
* on the response, triggering a retry.
*
* This test verifies that the flush operation is retried in the event of a
* connection reset during the response phase. The test creates a mock
* AzureBlobFileSystem and its associated components to simulate the flush
* operation and the connection reset. It then verifies that the flush
* operation is retried once before succeeding if the md5hash matches.
*
* @throws Exception if an error occurs during the test execution.
*/
@Test
public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
// Create a spy of AzureBlobFileSystem
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
assumeHnsDisabled();
// Create a spy of AzureBlobFileSystemStore
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
assumeBlobServiceType();
// Create spies for the client handler and blob client
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
// Set up the spies to return the mocked objects
Mockito.doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
AtomicInteger flushCount = new AtomicInteger(0);
FSDataOutputStream os = createMockedOutputStream(fs,
new Path("/test/file"), blobClient);
AbfsOutputStream out = (AbfsOutputStream) os.getWrappedStream();
String eTag = out.getIngressHandler().getETag();
byte[] bytes = new byte[1024 * 1024 * 4];
new Random().nextBytes(bytes);
// Write some bytes and attempt to flush, which should retry
out.write(bytes);
String blockId = out.getBlockManager().getActiveBlock().getBlockId();
String blockListXml = generateBlockListXml(blockId);
Mockito.doAnswer(answer -> {
// Set up the mock for the flush operation
AbfsClientTestUtil.setMockAbfsRestOperationForFlushOperation(blobClient,
eTag, blockListXml, out,
(httpOperation) -> {
Mockito.doAnswer(invocation -> {
// Call the real processResponse method
invocation.callRealMethod();
int currentCount = flushCount.incrementAndGet();
if (currentCount == 1) {
Mockito.when(httpOperation.getStatusCode())
.thenReturn(
HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error
Mockito.when(httpOperation.getStorageErrorMessage())
.thenReturn("CONNECTION_RESET"); // Error message
throw new IOException("Connection Reset");
} else if (currentCount == 2) {
Mockito.when(httpOperation.getStatusCode())
.thenReturn(HTTP_OK);
Mockito.when(httpOperation.getStorageErrorMessage())
.thenReturn("HTTP_OK");
}
return null;
}).when(httpOperation).processResponse(
Mockito.nullable(byte[].class),
Mockito.anyInt(),
Mockito.anyInt()
);
return httpOperation;
});
return answer.callRealMethod();
}).when(blobClient).flush(
Mockito.any(byte[].class),
Mockito.anyString(),
Mockito.anyBoolean(),
Mockito.nullable(String.class),
Mockito.nullable(String.class),
Mockito.anyString(),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.any(TracingContext.class), Mockito.nullable(String.class)
);
FSDataOutputStream os1 = createMockedOutputStream(fs,
new Path("/test/file"), blobClient);
AbfsOutputStream out1 = (AbfsOutputStream) os1.getWrappedStream();
byte[] bytes1 = new byte[1024 * 1024 * 8];
new Random().nextBytes(bytes1);
out1.write(bytes1);
//parallel flush call should lead to the first call failing because of md5 mismatch.
Thread parallelFlushThread = new Thread(() -> {
try {
out1.hsync();
} catch (IOException e) {
}
});
parallelFlushThread.start(); // Start the parallel flush operation
parallelFlushThread.join();
// Perform the first flush operation
intercept(IOException.class,
"The condition specified using HTTP conditional header(s) is not met.",
out::hsync
);
}
}
}