// ITestAzureBlobFileSystemCreate.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.FileNotFoundException;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
import org.apache.hadoop.fs.azurebfs.utils.DirectoryStateHelper;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.test.ReflectionUtils;
import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_MKDIR_OVERWRITE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.mockAddClientTransactionIdToHeader;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_RECOVERY;
import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test create operation.
*/
public class ITestAzureBlobFileSystemCreate extends
AbstractAbfsIntegrationTest {
// Relative path of the file used by the simple create tests.
private static final Path TEST_FILE_PATH = new Path("testfile");
// Name of the directory used as the parent in createNonRecursive tests.
private static final String TEST_FOLDER_PATH = "testFolder";
// Name of the child file created inside TEST_FOLDER_PATH.
private static final String TEST_CHILD_FILE = "childFile";
/**
 * Default constructor; delegates account and configuration setup to the
 * abstract integration-test base class.
 *
 * @throws Exception if base test setup fails
 */
public ITestAzureBlobFileSystemCreate() throws Exception {
super();
}
/**
 * Verifies that a created file is visible on the store immediately, both
 * while the output stream is still open and after it has been closed.
 *
 * @throws Exception in case of failure
 */
@Test
public void testEnsureFileCreatedImmediately() throws Exception {
  final AzureBlobFileSystem fs = getFileSystem();
  // try-with-resources guarantees the stream is closed even if the
  // in-flight assertion fails (the original try/finally could mask the
  // assertion failure with a close() failure).
  try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
    // File must exist while the stream is still open.
    assertIsFile(fs, TEST_FILE_PATH);
  }
  // File must still exist after the stream is closed.
  assertIsFile(fs, TEST_FILE_PATH);
}
/**
 * createNonRecursive must fail with {@link FileNotFoundException} while the
 * parent directory is missing, and succeed once the parent is created.
 * Also validates the tracing header emitted by the mkdirs call.
 *
 * @throws Exception in case of failure
 */
@Test
@SuppressWarnings("deprecation")
public void testCreateNonRecursive() throws Exception {
  final AzureBlobFileSystem fs = getFileSystem();
  Path testFolderPath = path(TEST_FOLDER_PATH);
  Path testFile = new Path(testFolderPath, TEST_CHILD_FILE);
  // Parent does not exist yet: createNonRecursive must throw.
  // (intercept replaces the older try/fail/catch pattern used elsewhere.)
  intercept(FileNotFoundException.class,
      () -> fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024,
          null));
  // Validate the tracing header of the mkdirs operation.
  fs.registerListener(new TracingHeaderValidator(
      fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
      fs.getFileSystemId(), FSOperationType.MKDIR, false, 0));
  fs.mkdirs(testFolderPath);
  fs.registerListener(null);
  // With the parent present, the same call must now succeed.
  fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
      .close();
  assertIsFile(fs, testFile);
}
/**
 * Variant of {@link #testCreateNonRecursive()} using the
 * {@code CreateFlag}-based overload for the failing call.
 *
 * @throws Exception in case of failure
 */
@Test
@SuppressWarnings("deprecation")
public void testCreateNonRecursive1() throws Exception {
  final AzureBlobFileSystem fs = getFileSystem();
  Path testFolderPath = path(TEST_FOLDER_PATH);
  Path testFile = new Path(testFolderPath, TEST_CHILD_FILE);
  // Parent does not exist yet: the CreateFlag overload must throw.
  intercept(FileNotFoundException.class,
      () -> fs.createNonRecursive(testFile, FsPermission.getDefault(),
          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 1024,
          (short) 1, 1024, null));
  fs.mkdirs(testFolderPath);
  // With the parent present, creation must succeed.
  fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
      .close();
  assertIsFile(fs, testFile);
}
/**
 * Variant of {@link #testCreateNonRecursive()} using the
 * permission/overwrite-flag overload for the failing call.
 *
 * @throws Exception in case of failure
 */
@Test
@SuppressWarnings("deprecation")
public void testCreateNonRecursive2() throws Exception {
  final AzureBlobFileSystem fs = getFileSystem();
  Path testFolderPath = path(TEST_FOLDER_PATH);
  Path testFile = new Path(testFolderPath, TEST_CHILD_FILE);
  // Parent does not exist yet: createNonRecursive must throw.
  intercept(FileNotFoundException.class,
      () -> fs.createNonRecursive(testFile, FsPermission.getDefault(), false,
          1024, (short) 1, 1024, null));
  fs.mkdirs(testFolderPath);
  // With the parent present, creation must succeed.
  fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
      .close();
  assertIsFile(fs, testFile);
}
/**
 * Test createNonRecursive when the parent directory exists: the child file
 * must be created successfully.
 *
 * @throws Exception in case of failure
 */
@Test
public void testCreateNonRecursiveWhenParentExist() throws Exception {
  AzureBlobFileSystem fs = getFileSystem();
  assumeBlobServiceType();
  fs.setWorkingDirectory(new Path(ROOT_PATH));
  Path createDirectoryPath = new Path("hbase/A");
  fs.mkdirs(createDirectoryPath);
  // Close the returned stream; the original code leaked it, leaving an
  // open output stream behind for the rest of the test run.
  fs.createNonRecursive(new Path(createDirectoryPath, "B"),
      FsPermission.getDefault(), false, 1024, (short) 1, 1024, null)
      .close();
  Assertions.assertThat(fs.exists(new Path(createDirectoryPath, "B")))
      .describedAs("File should be created").isTrue();
  fs.close();
}
/**
 * Test createNonRecursive when the parent directory does not exist: the
 * call must fail with {@link FileNotFoundException} and no file may be
 * created at the target path.
 *
 * @throws Exception in case of failure
 */
@Test
public void testCreateNonRecursiveWhenParentNotExist() throws Exception {
  AzureBlobFileSystem fs = getFileSystem();
  assumeBlobServiceType();
  fs.setWorkingDirectory(new Path(ROOT_PATH));
  // Only "A" exists; the intermediate directory "A/B" is never created.
  Path existingDir = new Path("A/");
  fs.mkdirs(existingDir);
  Path missingChild = new Path("A/B/C");
  intercept(FileNotFoundException.class,
      () -> fs.createNonRecursive(missingChild,
          FsPermission.getDefault(), false, 1024, (short) 1, 1024, null));
  // The failed call must not have left a file behind.
  Assertions.assertThat(fs.exists(missingChild))
      .describedAs("New File should not be created.").isFalse();
  fs.close();
}
/**
 * Helper method to create a "-RenamePending.json" file for an in-progress
 * atomic rename of {@code path} to {@code /hbase/test4}, using a spied
 * filesystem/store/client so the rename machinery can be driven directly.
 *
 * @param path parent path (the rename source)
 * @param renameJson rename json path to be created
 *
 * @return the spied file system with the pending-rename json in place
 * @throws IOException in case of failure
 */
private AzureBlobFileSystem createJsonFile(Path path, Path renameJson) throws IOException {
final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
assumeBlobServiceType();
// Wire spies so store/client calls made by RenameAtomicity are observable.
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
doReturn(store).when(fs).getAbfsStore();
AbfsClient client = Mockito.spy(store.getClient());
doReturn(client).when(store).getClient();
fs.setWorkingDirectory(new Path(ROOT_PATH));
// Ensure the source directory is non-empty so it is a real directory.
fs.mkdirs(new Path(path, "test3"));
VersionedFileStatus fileStatus
= (VersionedFileStatus) fs.getFileStatus(path);
// preRename() writes the pending-rename json without completing the rename.
new RenameAtomicity(path,
new Path("/hbase/test4"), renameJson,
getTestTracingContext(fs, true), fileStatus.getEtag(),
client).preRename();
Assertions.assertThat(fs.exists(renameJson))
.describedAs("Rename Pending Json file should exist.")
.isTrue();
return fs;
}
/**
 * Test createNonRecursive when parent does not exist and rename pending exists.
 * Rename redo should fail.
 * Json file should be deleted.
 * No new File creation.
 *
 * @throws Exception in case of failure
 */
@Test
public void testCreateNonRecursiveWhenParentNotExistAndRenamePendingExist() throws Exception {
AzureBlobFileSystem fs = null;
try {
Path path = new Path("/hbase/test1/test2");
Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
fs = createJsonFile(path, renameJson);
// Delete the rename source so the pending rename can no longer be redone.
fs.delete(path, true);
Assertions.assertThat(fs.exists(renameJson)).isTrue();
AzureBlobFileSystem finalFs = fs;
// createNonRecursive triggers the rename redo, which fails because the
// source is gone; the call then fails with FileNotFoundException.
intercept(FileNotFoundException.class,
() -> finalFs.createNonRecursive(new Path(path, "test4"), FsPermission
.getDefault(), false, 1024, (short) 1, 1024, null));
Assertions.assertThat(finalFs.exists(new Path(path, "test4")))
.describedAs("New File should not be created.")
.isFalse();
// The stale pending-rename json must have been cleaned up.
Assertions.assertThat(finalFs.exists(renameJson))
.describedAs("Rename Pending Json file should be deleted.")
.isFalse();
} finally {
if (fs != null) {
fs.close();
}
}
}
/**
 * Test createNonRecursive when parent and rename pending exist.
 * Rename redo should be successful.
 * Json file should be deleted.
 * No file should be created.
 *
 * @throws Exception in case of failure
 */
@Test
public void testCreateNonRecursiveWhenParentAndRenamePendingExist() throws Exception {
AzureBlobFileSystem fs = null;
try {
Path path = new Path("/hbase/test1/test2");
Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
fs = createJsonFile(path, renameJson);
AzureBlobFileSystem finalFs = fs;
// createNonRecursive first redoes the pending rename (moving the parent
// away), after which the parent no longer exists and the create fails.
intercept(FileNotFoundException.class,
() -> finalFs.createNonRecursive(new Path(path, "test4"), FsPermission
.getDefault(), false, 1024, (short) 1, 1024, null));
Assertions.assertThat(finalFs.exists(path))
.describedAs("Old path should be deleted.")
.isFalse();
Assertions.assertThat(finalFs.exists(new Path(path, "test4")))
.describedAs("New File should not be created.")
.isFalse();
// The pending-rename json is consumed by the successful redo.
Assertions.assertThat(finalFs.exists(renameJson))
.describedAs("Rename Pending Json file should be deleted.")
.isFalse();
// Destination of the redone rename must now exist.
Assertions.assertThat(finalFs.exists(new Path("/hbase/test4")))
.describedAs("Rename should be successful.")
.isTrue();
} finally {
if (fs != null) {
fs.close();
}
}
}
/**
 * Creating a file directly on the container root must be rejected by the
 * service with HTTP 409 (Conflict), for both create and createNonRecursive.
 *
 * @throws Exception in case of failure
 */
@Test
public void testCreateOnRoot() throws Exception {
  final AzureBlobFileSystem fs = getFileSystem();
  Path testFile = path(AbfsHttpConstants.ROOT_PATH);
  AbfsRestOperationException ex = intercept(AbfsRestOperationException.class,
      () -> fs.create(testFile, true));
  if (HTTP_CONFLICT != ex.getStatusCode()) {
    // Anything other than the expected 409 is a genuine failure; rethrow
    // the original exception to preserve its diagnostics.
    throw ex;
  }
  ex = intercept(AbfsRestOperationException.class,
      () -> fs.createNonRecursive(testFile, FsPermission.getDefault(), false,
          1024, (short) 1, 1024, null));
  if (HTTP_CONFLICT != ex.getStatusCode()) {
    // Request should have failed with 409.
    throw ex;
  }
}
/**
 * Attempts to use the ABFS stream after it is closed: writes must raise
 * IOException, while flush() and a second close() are tolerated.
 */
@Test
public void testWriteAfterClose() throws Throwable {
final AzureBlobFileSystem fs = getFileSystem();
Path testFolderPath = path(TEST_FOLDER_PATH);
Path testPath = new Path(testFolderPath, TEST_CHILD_FILE);
FSDataOutputStream out = fs.create(testPath);
out.close();
// Both write overloads must fail once the stream is closed.
intercept(IOException.class, () -> out.write('a'));
intercept(IOException.class, () -> out.write(new byte[]{'a'}));
// hsync is not ignored on a closed stream
// out.hsync();
// flush() and a repeated close() must be benign no-ops.
out.flush();
out.close();
}
/**
 * Attempts to double close an ABFS output stream from within a
 * FilterOutputStream.
 * That class handles a double failure on close badly if the second
 * exception rethrows the first.
 */
@Test
public void testTryWithResources() throws Throwable {
final AzureBlobFileSystem fs = getFileSystem();
Path testFolderPath = path(TEST_FOLDER_PATH);
Path testPath = new Path(testFolderPath, TEST_CHILD_FILE);
try (FSDataOutputStream out = fs.create(testPath)) {
out.write('1');
out.hsync();
// this will cause the next write to failAll
fs.delete(testPath, false);
out.write('2');
out.hsync();
fail("Expected a failure");
} catch (IOException fnfe) {
//appendblob outputStream does not generate suppressed exception on close as it is
//single threaded code
if (!fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testPath).toString())) {
// the exception raised in close() must be in the caught exception's
// suppressed list
Throwable[] suppressed = fnfe.getSuppressed();
Assertions.assertThat(suppressed.length)
.describedAs("suppressed count should be 1").isEqualTo(1);
Throwable inner = suppressed[0];
// Only an IOException is an acceptable suppressed failure here.
if (!(inner instanceof IOException)) {
throw inner;
}
GenericTestUtils.assertExceptionContains(fnfe.getMessage(), inner);
}
}
}
/**
 * Attempts to write to the azure stream after it is closed will raise
 * an IOException.
 */
@Test
public void testFilterFSWriteAfterClose() throws Throwable {
try (AzureBlobFileSystem fs = getFileSystem()) {
Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
FSDataOutputStream out = fs.create(testPath);
intercept(IOException.class,
() -> {
// Wrap in FilterOutputStream so its close() path is exercised after
// the underlying blob has been deleted.
try (FilterOutputStream fos = new FilterOutputStream(out)) {
byte[] bytes = new byte[8 * ONE_MB];
fos.write(bytes);
fos.write(bytes);
fos.flush();
out.hsync();
fs.delete(testPath, false);
// trigger the first failure
throw intercept(IOException.class,
() -> {
fos.write('b');
out.hsync();
return "hsync didn't raise an IOE";
});
}
});
}
}
/**
 * Tests if the number of connections made for:
 * 1. create overwrite=false of a file that doesnt pre-exist
 * 2. create overwrite=false of a file that pre-exists
 * 3. create overwrite=true of a file that doesnt pre-exist
 * 4. create overwrite=true of a file that pre-exists
 * matches the expectation when run against both combinations of
 * fs.azure.enable.conditional.create.overwrite=true and
 * fs.azure.enable.conditional.create.overwrite=false
 * @throws Throwable in case of failure
 */
@Test
public void testDefaultCreateOverwriteFileTest() throws Throwable {
// Run the same scenario with conditional create-overwrite enabled and
// then disabled; expected request counts differ between the two modes.
testCreateFileOverwrite(true);
testCreateFileOverwrite(false);
}
/**
 * Drives the four create cases (overwrite true/false x file pre-exists or
 * not) against a fresh filesystem instance and asserts the exact number of
 * server connections made for each case via the CONNECTIONS_MADE statistic.
 * Expected counts depend on the endpoint (Blob vs DFS), namespace support,
 * and whether conditional create-overwrite is enabled.
 *
 * @param enableConditionalCreateOverwrite config value under test
 * @throws Throwable in case of failure
 */
public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
throws Throwable {
try (AzureBlobFileSystem currentFs = getFileSystem()) {
Configuration config = new Configuration(this.getRawConfiguration());
config.set("fs.azure.enable.conditional.create.overwrite",
Boolean.toString(enableConditionalCreateOverwrite));
AzureBlobFileSystemStore store = currentFs.getAbfsStore();
AbfsClient client = store.getClientHandler().getIngressClient();
try (AzureBlobFileSystem fs =
(AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
config)) {
// Baseline connection count; all assertions below are deltas from it.
long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
.get(CONNECTIONS_MADE.getStatName());
int createRequestCount = 0;
final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_"
+ UUID.randomUUID().toString());
// Case 1: Not Overwrite - File does not pre-exist
// create should be successful
fs.create(nonOverwriteFile, false);
// One request to server to create path should be issued
// two calls added for -
// 1. getFileStatus on DFS endpoint : 1
// getFileStatus on Blob endpoint: 1 ListBlobcall
// 2. actual create call: 1
createRequestCount += (
client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)
? 2
: 1);
assertAbfsStatistics(
CONNECTIONS_MADE,
totalConnectionMadeBeforeTest + createRequestCount,
fs.getInstrumentationMap());
// Case 2: Not Overwrite - File pre-exists
fs.registerListener(new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.CREATE, false, 0));
intercept(FileAlreadyExistsException.class,
() -> fs.create(nonOverwriteFile, false));
fs.registerListener(null);
// One request to server to create path should be issued
// Only single tryGetFileStatus should happen
// 1. getFileStatus on DFS endpoint : 1
// getFileStatus on Blob endpoint: 1 (No Additional List blob call as file exists)
createRequestCount += (
client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)
? 2
: 1);
assertAbfsStatistics(
CONNECTIONS_MADE,
totalConnectionMadeBeforeTest + createRequestCount,
fs.getInstrumentationMap());
final Path overwriteFilePath = new Path("/OverwriteTest_FileName_"
+ UUID.randomUUID().toString());
// Case 3: Overwrite - File does not pre-exist
// create should be successful
fs.create(overwriteFilePath, true);
/// One request to server to create path should be issued
// two calls added for -
// 1. getFileStatus on DFS endpoint : 1
// getFileStatus on Blob endpoint: 1 ListBlobCall + 1 GPS
// 2. actual create call: 1
// 1 extra call when conditional overwrite is not enabled to check for empty directory
createRequestCount += (client instanceof AbfsBlobClient
&& !getIsNamespaceEnabled(fs))
? (enableConditionalCreateOverwrite ? 2 : 3)
: 1;
assertAbfsStatistics(
CONNECTIONS_MADE,
totalConnectionMadeBeforeTest + createRequestCount,
fs.getInstrumentationMap());
// Case 4: Overwrite - File pre-exists
fs.registerListener(new TracingHeaderValidator(
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.CREATE, true, 0));
fs.create(overwriteFilePath, true);
fs.registerListener(null);
createRequestCount += (
client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)
? 1
: 0);
// Second actual create call will hap
if (enableConditionalCreateOverwrite) {
// Three requests will be sent to server to create path,
// 1. create without overwrite
// 2. GetFileStatus to get eTag
// 3. create with overwrite
createRequestCount += 3;
} else {
createRequestCount += (client instanceof AbfsBlobClient
&& !getIsNamespaceEnabled(fs)) ? 2 : 1;
}
assertAbfsStatistics(
CONNECTIONS_MADE,
totalConnectionMadeBeforeTest + createRequestCount,
fs.getInstrumentationMap());
}
}
}
/**
 * Test negative scenarios with Create overwrite=false as default
 * With create overwrite=true ending in 3 calls:
 * A. Create overwrite=false
 * B. GFS
 * C. Create overwrite=true
 *
 * Scn1: A fails with HTTP409, leading to B which fails with HTTP404,
 * detect parallel access
 * Scn2: A fails with HTTP409, leading to B which fails with HTTP500,
 * fail create with HTTP500
 * Scn3: A fails with HTTP409, leading to B and then C,
 * which fails with HTTP412, detect parallel access
 * Scn4: A fails with HTTP409, leading to B and then C,
 * which fails with HTTP500, fail create with HTTP500
 * Scn5: A fails with HTTP500, fail create with HTTP500
 */
@Test
public void testNegativeScenariosForCreateOverwriteDisabled()
throws Throwable {
try (AzureBlobFileSystem currentFs = getFileSystem()) {
Configuration config = new Configuration(this.getRawConfiguration());
config.set("fs.azure.enable.conditional.create.overwrite",
Boolean.toString(true));
try (AzureBlobFileSystem fs =
(AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
config)) {
// Get mock AbfsClient with current config
AbfsClient
mockClient
= ITestAbfsClient.getMockAbfsClient(
fs.getAbfsStore().getClient(),
fs.getAbfsStore().getAbfsConfiguration());
AbfsClientHandler clientHandler = Mockito.mock(AbfsClientHandler.class);
when(clientHandler.getIngressClient()).thenReturn(mockClient);
when(clientHandler.getClient(Mockito.any())).thenReturn(mockClient);
AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
// Inject the mock client into the store's final fields so every create
// exercised below goes through the stubbed client.
ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore,
"clientHandler", clientHandler);
ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore,
"client", mockClient);
AbfsRestOperation successOp = mock(
AbfsRestOperation.class);
AbfsHttpOperation http200Op = mock(
AbfsHttpOperation.class);
when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
when(successOp.getResult()).thenReturn(http200Op);
AbfsRestOperationException conflictResponseEx
= getMockAbfsRestOperationException(HTTP_CONFLICT);
AbfsRestOperationException serverErrorResponseEx
= getMockAbfsRestOperationException(HTTP_INTERNAL_ERROR);
AbfsRestOperationException fileNotFoundResponseEx
= getMockAbfsRestOperationException(HTTP_NOT_FOUND);
AbfsRestOperationException preConditionResponseEx
= getMockAbfsRestOperationException(HTTP_PRECON_FAILED);
// Run the real conditional-create-overwrite logic; only the low-level
// createPath/getPathStatus calls are stubbed below, and the stub chains
// rely on the exact invocation order of the five scenarios.
doCallRealMethod().when(mockClient)
.conditionalCreateOverwriteFile(anyString(),
Mockito.nullable(FileSystem.Statistics.class),
Mockito.nullable(AzureBlobFileSystemStore.Permissions.class),
anyBoolean(),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.nullable(TracingContext.class));
// mock for overwrite=false
doThrow(conflictResponseEx) // Scn1: GFS fails with Http404
.doThrow(conflictResponseEx) // Scn2: GFS fails with Http500
.doThrow(
conflictResponseEx) // Scn3: create overwrite=true fails with Http412
.doThrow(
conflictResponseEx) // Scn4: create overwrite=true fails with Http500
.doThrow(
serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500
.when(mockClient)
.createPath(any(String.class), eq(true), eq(false),
any(AzureBlobFileSystemStore.Permissions.class),
any(boolean.class), eq(null), any(),
any(TracingContext.class));
doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404
.doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500
.doReturn(
successOp) // Scn3: create overwrite=true fails with Http412
.doReturn(
successOp) // Scn4: create overwrite=true fails with Http500
.when(mockClient)
.getPathStatus(any(String.class), eq(false),
any(TracingContext.class), nullable(
ContextEncryptionAdapter.class));
// mock for overwrite=true
doThrow(
preConditionResponseEx) // Scn3: create overwrite=true fails with Http412
.doThrow(
serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500
.when(mockClient)
.createPath(any(String.class), eq(true), eq(true),
any(AzureBlobFileSystemStore.Permissions.class),
any(boolean.class), eq(null), any(),
any(TracingContext.class));
if (mockClient instanceof AbfsBlobClient) {
// Blob endpoint uses different entry points; mirror the same ordered
// stubbing for the blob-specific client methods.
doReturn(false).when((AbfsBlobClient) mockClient)
.isNonEmptyDirectory(anyString(),
Mockito.nullable(TracingContext.class));
doNothing().when((AbfsBlobClient) mockClient)
.tryMarkerCreation(anyString(),
anyBoolean(),
Mockito.nullable(String.class),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.nullable(TracingContext.class));
// mock for overwrite=true
doThrow(
preConditionResponseEx) // Scn3: create overwrite=true fails with Http412
.doThrow(
serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500
.when((AbfsBlobClient) mockClient)
.createPathRestOp(any(String.class), eq(true), eq(true),
any(boolean.class), eq(null), any(),
any(TracingContext.class));
// mock for overwrite=false
doThrow(conflictResponseEx) // Scn1: GFS fails with Http404
.doThrow(conflictResponseEx) // Scn2: GFS fails with Http500
.doThrow(
conflictResponseEx) // Scn3: create overwrite=true fails with Http412
.doThrow(
conflictResponseEx) // Scn4: create overwrite=true fails with Http500
.doThrow(
serverErrorResponseEx)
// Scn5: create overwrite=false fails with Http500
.when((AbfsBlobClient) mockClient)
.createPathRestOp(any(String.class), eq(true), eq(false),
any(boolean.class), eq(null), any(),
any(TracingContext.class));
doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404
.doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500
.doReturn(
successOp) // Scn3: create overwrite=true fails with Http412
.doReturn(
successOp) // Scn4: create overwrite=true fails with Http500
.when((AbfsBlobClient) mockClient)
.getPathStatus(any(String.class), any(TracingContext.class),
nullable(
ContextEncryptionAdapter.class), eq(false));
}
// Scn1: GFS fails with Http404
// Sequence of events expected:
// 1. create overwrite=false - fail with conflict
// 2. GFS - fail with File Not found
// Create will fail with ConcurrentWriteOperationDetectedException
validateCreateFileException(
ConcurrentWriteOperationDetectedException.class,
abfsStore);
// Scn2: GFS fails with Http500
// Sequence of events expected:
// 1. create overwrite=false - fail with conflict
// 2. GFS - fail with Server error
// Create will fail with 500
validateCreateFileException(AbfsRestOperationException.class,
abfsStore);
// Scn3: create overwrite=true fails with Http412
// Sequence of events expected:
// 1. create overwrite=false - fail with conflict
// 2. GFS - pass
// 3. create overwrite=true - fail with Pre-Condition
// Create will fail with ConcurrentWriteOperationDetectedException
validateCreateFileException(
ConcurrentWriteOperationDetectedException.class,
abfsStore);
// Scn4: create overwrite=true fails with Http500
// Sequence of events expected:
// 1. create overwrite=false - fail with conflict
// 2. GFS - pass
// 3. create overwrite=true - fail with Server error
// Create will fail with 500
validateCreateFileException(AbfsRestOperationException.class,
abfsStore);
// Scn5: create overwrite=false fails with Http500
// Sequence of events expected:
// 1. create overwrite=false - fail with server error
// Create will fail with 500
validateCreateFileException(AbfsRestOperationException.class,
abfsStore);
}
}
}
/**
 * Verifies that a failure while creating a parent directory marker blob
 * during create is swallowed and the file creation itself still succeeds
 * (Blob endpoint only).
 *
 * @throws Throwable in case of failure
 */
@Test
public void testCreateMarkerFailExceptionIsSwallowed() throws Throwable {
assumeBlobServiceType();
try (AzureBlobFileSystem currentFs = getFileSystem()) {
Configuration config = new Configuration(this.getRawConfiguration());
config.set("fs.azure.enable.conditional.create.overwrite", Boolean.toString(true));
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), config)) {
AbfsClient mockClient = Mockito.spy(fs.getAbfsClient());
AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
spiedStore.setClient(mockClient);
AbfsClientHandler clientHandler = Mockito.mock(AbfsClientHandler.class);
when(clientHandler.getIngressClient()).thenReturn(mockClient);
when(clientHandler.getClient(Mockito.any())).thenReturn(mockClient);
Path testFolder = new Path("/dir1");
// Create the parent as an implicit directory (no marker blob yet).
createAzCopyFolder(testFolder);
AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
// Inject the spied client into the store's final fields.
ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore, "clientHandler", clientHandler);
ReflectionUtils.setFinalField(AzureBlobFileSystemStore.class, abfsStore, "client", mockClient);
AbfsRestOperation successOp = mock(AbfsRestOperation.class);
AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
when(successOp.getResult()).thenReturn(http200Op);
AbfsRestOperationException preConditionResponseEx = getMockAbfsRestOperationException(HTTP_PRECON_FAILED);
// Run the real create and marker-creation logic; only lower-level calls
// are stubbed below.
doCallRealMethod().when(mockClient)
.conditionalCreateOverwriteFile(anyString(),
Mockito.nullable(FileSystem.Statistics.class),
Mockito.nullable(AzureBlobFileSystemStore.Permissions.class),
anyBoolean(),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.nullable(TracingContext.class));
doCallRealMethod().when((AbfsBlobClient) mockClient)
.tryMarkerCreation(anyString(), anyBoolean(), Mockito.nullable(String.class),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.nullable(TracingContext.class));
// Force exactly one marker path so a single marker creation is attempted.
Mockito.doReturn(new ArrayList<>(Collections.singletonList(testFolder)))
.when((AbfsBlobClient) mockClient)
.getMarkerPathsTobeCreated(any(Path.class), Mockito.nullable(TracingContext.class));
doReturn(false).when((AbfsBlobClient) mockClient)
.isNonEmptyDirectory(anyString(), Mockito.nullable(TracingContext.class));
// First createPathRestOp call (the marker creation) throws 412; all
// subsequent calls run the real method so the file create can succeed.
doAnswer(new Answer<Void>() {
private boolean firstCall = true;
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
if (firstCall) {
firstCall = false;
throw preConditionResponseEx;
}
return null;
}
}).doCallRealMethod()
.when((AbfsBlobClient) mockClient)
.createPathRestOp(anyString(), anyBoolean(), anyBoolean(),
anyBoolean(), Mockito.nullable(String.class),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.nullable(TracingContext.class));
AbfsClientTestUtil.hookOnRestOpsForTracingContextSingularity(mockClient);
doReturn(successOp)
.when((AbfsBlobClient) mockClient)
.getPathStatus(any(String.class), any(TracingContext.class),
nullable(ContextEncryptionAdapter.class), eq(false));
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE, FsAction.NONE);
Path testPath = new Path("/dir1/testFile");
// The marker failure must be swallowed: createFile should not throw.
abfsStore.createFile(testPath, null, true, permission, umask,
getTestTracingContext(getFileSystem(), true));
Assertions.assertThat(fs.exists(testPath))
.describedAs("File not created when marker creation failed.")
.isTrue();
}
}
}
/**
 * Asserts that {@code AzureBlobFileSystemStore.createFile} on a fixed test
 * path fails with the given exception type.
 *
 * @param exceptionClass expected exception type
 * @param abfsStore store under test (with stubbed client)
 * @param <E> expected exception type parameter
 * @throws Exception on unexpected failure
 */
private <E extends Throwable> void validateCreateFileException(final Class<E> exceptionClass, final AzureBlobFileSystemStore abfsStore)
    throws Exception {
  // Full access for the file itself, empty umask.
  final FsPermission fullAccess =
      new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
  final FsPermission emptyUmask =
      new FsPermission(FsAction.NONE, FsAction.NONE, FsAction.NONE);
  final Path testPath = new Path("/testFile");
  intercept(exceptionClass,
      () -> abfsStore.createFile(testPath, null, true, fullAccess, emptyUmask,
          getTestTracingContext(getFileSystem(), true)));
}
/**
 * Builds an {@link AbfsRestOperationException} carrying the given HTTP
 * status code, with empty error code/message and a generic cause, for use
 * as a stubbed client failure.
 *
 * @param status HTTP status code to embed in the exception
 * @return the exception to be thrown by mocked client calls
 */
private AbfsRestOperationException getMockAbfsRestOperationException(int status) {
return new AbfsRestOperationException(status, "", "", new Exception());
}
/**
 * Verifies that hsync() can be invoked repeatedly on the same open stream,
 * interleaved with writes, without error.
 */
@Test
public void testMultipleFlush() throws Throwable {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
    try (FSDataOutputStream stream = fs.create(testPath)) {
      // Write then sync, twice; every hsync must succeed.
      stream.write('1');
      stream.hsync();
      stream.write('2');
      stream.hsync();
    }
  }
}
/**
 * Delete the blob before flush and verify that an exception should be thrown.
 */
@Test
public void testDeleteBeforeFlush() throws Throwable {
try (AzureBlobFileSystem fs = getFileSystem()) {
Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
try (FSDataOutputStream out = fs.create(testPath)) {
out.write('1');
// Deleting the backing blob makes the subsequent hsync fail.
fs.delete(testPath, false);
out.hsync();
// this will cause the next write to failAll
} catch (IOException fnfe) {
//appendblob outputStream does not generate suppressed exception on close as it is
//single threaded code
if (!fs.getAbfsStore()
.isAppendBlobKey(fs.makeQualified(testPath).toString())) {
// the exception raised in close() must be in the caught exception's
// suppressed list
Throwable[] suppressed = fnfe.getSuppressed();
assertEquals("suppressed count", 1, suppressed.length);
Throwable inner = suppressed[0];
// Only an IOException is an acceptable suppressed failure here.
if (!(inner instanceof IOException)) {
throw inner;
}
GenericTestUtils.assertExceptionContains(fnfe.getMessage(),
inner.getCause(), inner.getCause().getMessage());
}
}
}
}
/**
 * Creating subdirectory on existing file path should fail.
 * @throws Exception
 */
@Test
public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(new Path("a/b/c")).close();
    fs.mkdirs(new Path("a/b/d"));
    // mkdirs under a file path must be rejected.
    intercept(IOException.class, () -> fs.mkdirs(new Path("a/b/c/d/e")));
    Assertions.assertThat(fs.exists(new Path("a/b/c"))).isTrue();
    Assertions.assertThat(fs.exists(new Path("a/b/d"))).isTrue();
    // Asserting directory created still exists as explicit.
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(new Path("a/b/d"), fs,
            getTestTracingContext(fs, true)))
        .describedAs("Path is not an explicit directory")
        .isTrue();
  }
}
/**
 * mkdir on a directory that already exists implicitly must succeed.
 * @throws Exception
 */
@Test
public void testMkdirSameFolder() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path implicitDir = new Path("a/b/d");
    createAzCopyFolder(implicitDir);
    // Re-creating the same folder via mkdirs must not fail.
    fs.mkdirs(implicitDir);
  }
}
/**
 * Creating a file at a path that already exists as a directory must fail,
 * and the directory must survive the failed attempt as explicit.
 * @throws Exception
 */
@Test
public void testCreateDirectoryAndFile() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path dirPath = new Path("a/b/c");
    fs.mkdirs(dirPath);
    Assertions.assertThat(fs.exists(dirPath)).isTrue();
    // A file create over an existing directory is rejected.
    intercept(IOException.class, () -> fs.create(dirPath));
    // Asserting that directory still exists as explicit
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(dirPath,
            fs, getTestTracingContext(fs, true)))
        .describedAs("Path is not an explicit directory")
        .isTrue();
  }
}
/**
 * Creating same file without specifying overwrite.
 * @throws Exception
 */
@Test
public void testCreateSameFile() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    // Fix: close the streams returned by create() instead of leaking them.
    fs.create(new Path("a/b/c")).close();
    // Default create() overwrites, so the second call must succeed too.
    fs.create(new Path("a/b/c")).close();
    Assertions.assertThat(fs.exists(new Path("a/b/c")))
        .describedAs("Path does not exist")
        .isTrue();
  }
}
/**
 * With `fs.azure.enable.conditional.create.overwrite` disabled, attempting
 * to create a file (overwrite=true) on a path that is already a directory
 * must raise an IOException.
 *
 * @throws Exception if any exception occurs during the test execution
 */
@Test
public void testCreationWithoutConditionalOverwrite()
    throws Exception {
  try (AzureBlobFileSystem currentFs = getFileSystem()) {
    final Configuration conf = new Configuration(this.getRawConfiguration());
    conf.setBoolean("fs.azure.enable.conditional.create.overwrite", false);
    try (AzureBlobFileSystem fs =
        (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
            conf)) {
      fs.mkdirs(new Path("a/b/c"));
      // File create over an existing directory must fail.
      intercept(IOException.class,
          () -> fs.create(new Path("a/b/c"), true));
    }
  }
}
/**
 * With `fs.azure.enable.conditional.create.overwrite` disabled, attempting
 * to create a file (overwrite=false) on a path that is already a directory
 * must raise an IOException.
 *
 * @throws Exception if any exception occurs during the test execution
 */
@Test
public void testCreationOverwriteFalseWithoutConditionalOverwrite() throws Exception {
  try (AzureBlobFileSystem currentFs = getFileSystem()) {
    final Configuration conf = new Configuration(this.getRawConfiguration());
    conf.setBoolean("fs.azure.enable.conditional.create.overwrite", false);
    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
        currentFs.getUri(), conf)) {
      fs.mkdirs(new Path("a/b/c"));
      // Non-overwriting create over an existing directory must fail.
      intercept(IOException.class,
          () -> fs.create(new Path("a/b/c"), false));
    }
  }
}
/**
 * Creating same file with overwrite flag set to false.
 * @throws Exception
 */
@Test
public void testCreateSameFileWithOverwriteFalse() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(new Path("a/b/c")).close();
    Assertions.assertThat(fs.exists(new Path("a/b/c")))
        .describedAs("Path does not exist")
        .isTrue();
    // Re-creating with overwrite=false must fail on the existing file.
    intercept(IOException.class,
        () -> fs.create(new Path("a/b/c"), false));
  }
}
/**
 * Creation of already existing subpath should fail.
 * @throws Exception
 */
@Test
public void testCreateSubPath() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(new Path("a/b/c")).close();
    Assertions.assertThat(fs.exists(new Path("a/b/c")))
        .describedAs("Path does not exist")
        .isTrue();
    // "a/b" is now a (parent) directory of an existing file; creating a
    // file there must fail.
    intercept(IOException.class,
        () -> fs.create(new Path("a/b")));
  }
}
/**
 * Test create path in parallel with overwrite false.
 * Three threads race to create the same path with overwrite=false; exactly
 * one create wins and the other two must fail.
 **/
@Test
public void testParallelCreateOverwriteFalse()
    throws Exception {
  Configuration configuration = getRawConfiguration();
  configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
  try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
      configuration)) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    try {
      List<Future<?>> futures = new ArrayList<>();
      final Path filePath = path("/testPath");
      // Fix: removed the unused random byte buffer (dead code) and
      // deduplicated the three identical submissions into a loop.
      for (int i = 0; i < 3; i++) {
        futures.add(executorService.submit(() -> {
          try {
            fs.create(filePath, false);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }));
      }
      // Exactly two of the three non-overwriting creates must fail.
      checkFuturesForExceptions(futures, 2);
    } finally {
      // Fix: shut the executor down; it was previously leaked.
      executorService.shutdown();
    }
  }
}
/**
 * Test create path in parallel with overwrite true.
 * Three threads create the same path with default (overwrite) semantics;
 * all of them must succeed.
 **/
@Test
public void testParallelCreateOverwriteTrue()
    throws Exception {
  Configuration configuration = getRawConfiguration();
  configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
  try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
      configuration)) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    try {
      List<Future<?>> futures = new ArrayList<>();
      final Path filePath = path("/testPath");
      // Fix: removed the unused random byte buffer (dead code) and
      // deduplicated the three identical submissions into a loop.
      for (int i = 0; i < 3; i++) {
        futures.add(executorService.submit(() -> {
          try {
            fs.create(filePath);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }));
      }
      // Overwriting creates must all succeed: zero expected failures.
      checkFuturesForExceptions(futures, 0);
    } finally {
      // Fix: shut the executor down; it was previously leaked.
      executorService.shutdown();
    }
  }
}
/**
 * Creating path with parent explicit.
 */
@Test
public void testCreatePathParentExplicit() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    fs.mkdirs(new Path("a/b/c"));
    Assertions.assertThat(fs.exists(new Path("a/b/c")))
        .describedAs("Path does not exist")
        .isTrue();
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(new Path("a/b/c/d")).close();
    Assertions.assertThat(fs.exists(new Path("a/b/c/d")))
        .describedAs("Path does not exist")
        .isTrue();
    // asserting that parent stays explicit
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c"),
            fs, getTestTracingContext(fs, true)))
        .describedAs("Path is not an explicit directory")
        .isTrue();
  }
}
// Creation with append blob should succeed for blob endpoint
@Test
public void testCreateWithAppendBlobEnabled()
    throws IOException, NoSuchFieldException, IllegalAccessException {
  Configuration conf = getRawConfiguration();
  try (AzureBlobFileSystem fs = Mockito.spy(
      (AzureBlobFileSystem) FileSystem.newInstance(conf))) {
    // Spy the store so every key is reported as an append-blob key.
    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
    doReturn(true).when(store).isAppendBlobKey(anyString());
    // Set abfsStore as our mocked value.
    // (The field is private, so reflection is required to inject the spy.)
    Field privateField = AzureBlobFileSystem.class.getDeclaredField(
        "abfsStore");
    privateField.setAccessible(true);
    privateField.set(fs, store);
    Path testPath = path("/testPath");
    AzureBlobFileSystemStore.Permissions permissions
        = new AzureBlobFileSystemStore.Permissions(false,
        FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
    // Issue the create directly against the blob client; the call must
    // succeed with isAppendBlob=true (last boolean-style argument pair:
    // file=true, overwrite=false, appendBlob=true).
    fs.getAbfsStore().getClientHandler().getBlobClient().
        createPath(makeQualified(testPath).toUri().getPath(), true, false,
            permissions, true, null,
            null, getTestTracingContext(fs, true));
  }
}
/**
 * Create over an implicit directory whose parent is explicit must fail for
 * both overwrite modes, leaving both directory states untouched.
 * @throws Exception
 */
@Test
public void testParentExplicitPathImplicit() throws Exception {
  assumeBlobServiceType();
  try (AzureBlobFileSystem fs = getFileSystem()) {
    fs.mkdirs(new Path("/explicitParent"));
    final Path implicitDir = new Path("/explicitParent/implicitDir");
    createAzCopyFolder(implicitDir);
    // Both overwrite modes must refuse to create a file over a directory.
    intercept(IOException.class, () ->
        fs.create(implicitDir, true));
    intercept(IOException.class, () ->
        fs.create(implicitDir, false));
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(implicitDir.getParent(), fs,
            getTestTracingContext(fs, true)))
        .describedAs("Parent directory should be explicit.")
        .isTrue();
    Assertions.assertThat(
        DirectoryStateHelper.isImplicitDirectory(implicitDir, fs,
            getTestTracingContext(fs, true)))
        .describedAs("Path should be implicit.")
        .isTrue();
  }
}
/**
 * Create over an implicit directory whose parent is also implicit must fail
 * for both overwrite modes, leaving both paths implicit.
 * @throws Exception
 */
@Test
public void testParentImplicitPathImplicit() throws Exception {
  assumeBlobServiceType();
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path implicitParent = new Path("/implicitParent");
    final Path implicitChild = new Path("/implicitParent/implicitDir");
    createAzCopyFolder(implicitParent);
    createAzCopyFolder(implicitChild);
    // Both overwrite modes must refuse to create a file over a directory.
    intercept(IOException.class, () ->
        fs.create(implicitChild, true));
    intercept(IOException.class, () ->
        fs.create(implicitChild, false));
    Assertions.assertThat(
        DirectoryStateHelper.isImplicitDirectory(implicitParent, fs,
            getTestTracingContext(fs, true)))
        .describedAs("Parent directory is implicit.")
        .isTrue();
    Assertions.assertThat(
        DirectoryStateHelper.isImplicitDirectory(implicitChild, fs,
            getTestTracingContext(fs, true)))
        .describedAs("Path should also be implicit.")
        .isTrue();
  }
}
/**
 * Tests create file when file exists already and parent is implicit
 * Verifies using eTag for overwrite = true/false
 */
@Test
public void testCreateFileExistsImplicitParent() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    String parentPathName = "/implicitParent";
    Path parentPath = new Path(parentPathName);
    createAzCopyFolder(parentPath);
    String fileName = "/implicitParent/testFile";
    Path filePath = new Path(fileName);
    // Fix: close the streams returned by create() instead of leaking them;
    // eTags are read only after each stream is fully closed.
    fs.create(filePath).close();
    String eTag = extractFileEtag(fileName);
    // testing createFile on already existing file path
    fs.create(filePath, true).close();
    String eTagAfterCreateOverwrite = extractFileEtag(fileName);
    Assertions.assertThat(eTag.equals(eTagAfterCreateOverwrite))
        .describedAs(
            "New file eTag after create overwrite should be different from old")
        .isFalse();
    // overwrite=false on an existing file must fail and not touch the blob.
    intercept(IOException.class, () ->
        fs.create(filePath, false));
    String eTagAfterCreate = extractFileEtag(fileName);
    Assertions.assertThat(eTagAfterCreateOverwrite.equals(eTagAfterCreate))
        .describedAs("File eTag should not change as creation fails")
        .isTrue();
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(parentPath, fs,
            getTestTracingContext(fs, true)))
        .describedAs("Parent path should also change to explicit.")
        .isTrue();
  }
}
/**
 * Tests create file when file exists already and parent is explicit
 * Verifies using eTag for overwrite = true/false
 */
@Test
public void testCreateFileExistsExplicitParent() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    String parentPathName = "/explicitParent";
    Path parentPath = new Path(parentPathName);
    fs.mkdirs(parentPath);
    String fileName = "/explicitParent/testFile";
    Path filePath = new Path(fileName);
    // Fix: close the streams returned by create() instead of leaking them;
    // eTags are read only after each stream is fully closed.
    fs.create(filePath).close();
    String eTag = extractFileEtag(fileName);
    // testing createFile on already existing file path
    fs.create(filePath, true).close();
    String eTagAfterCreateOverwrite = extractFileEtag(fileName);
    Assertions.assertThat(eTag.equals(eTagAfterCreateOverwrite))
        .describedAs(
            "New file eTag after create overwrite should be different from old")
        .isFalse();
    // overwrite=false on an existing file must fail and not touch the blob.
    intercept(IOException.class, () ->
        fs.create(filePath, false));
    String eTagAfterCreate = extractFileEtag(fileName);
    Assertions.assertThat(eTagAfterCreateOverwrite.equals(eTagAfterCreate))
        .describedAs("File eTag should not change as creation fails")
        .isTrue();
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(parentPath, fs,
            getTestTracingContext(fs, true)))
        .describedAs("Parent path should also change to explicit.")
        .isTrue();
  }
}
/**
 * Tests create file when the parent is an existing file
 * should fail.
 * @throws Exception FileAlreadyExists for blob and IOException for dfs.
 */
@Test
public void testCreateFileParentFile() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    String parentName = "/testParentFile";
    Path parent = new Path(parentName);
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(parent).close();
    String childName = "/testParentFile/testChildFile";
    Path child = new Path(childName);
    // Fix: the intercepted exception was previously captured into an
    // unused local; drop the dead variable.
    intercept(IOException.class, () ->
        fs.create(child, false));
    // Parent must remain a file after the failed child creation.
    FileStatus status = fs.getAbfsStore()
        .getFileStatus(fs.makeQualified(new Path(parentName)),
            new TracingContext(getTestTracingContext(fs, true)));
    Assertions.assertThat(status.isDirectory())
        .describedAs("Parent path should remain a file")
        .isFalse();
  }
}
/**
 * Creating directory on existing file path should fail.
 * @throws Exception
 */
@Test
public void testCreateMkdirs() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(new Path("a/b/c")).close();
    // mkdirs below an existing file path must be rejected.
    intercept(IOException.class,
        () -> fs.mkdirs(new Path("a/b/c/d")));
  }
}
/**
 * Creates several nested directories and verifies each both exists and is
 * recorded as an explicit directory.
 * @throws Exception
 */
@Test
public void testMkdirs() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final String[] dirs = {"a/b", "a/b/c/d", "a/b/c/e"};
    for (String dir : dirs) {
      fs.mkdirs(new Path(dir));
    }
    for (String dir : dirs) {
      Assertions.assertThat(fs.exists(new Path(dir)))
          .describedAs("Path %s does not exist", dir)
          .isTrue();
    }
    // Asserting that directories created as explicit
    for (String dir : dirs) {
      FileStatus dirStatus = fs.getAbfsStore()
          .getFileStatus(fs.makeQualified(new Path(dir)),
              new TracingContext(getTestTracingContext(fs, true)));
      Assertions.assertThat(dirStatus.isDirectory())
          .describedAs("Path %s is not an explicit directory", dir)
          .isTrue();
    }
  }
}
/**
 * Creating a file on a prefix of an existing directory path should fail,
 * and the directory must remain explicit.
 * @throws Exception
 */
@Test
public void testMkdirsCreateSubPath() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path leaf = new Path("a/b/c");
    fs.mkdirs(leaf);
    Assertions.assertThat(fs.exists(leaf))
        .describedAs("Path a/b/c does not exist")
        .isTrue();
    // "a/b" is a parent directory; a file create there must fail.
    intercept(IOException.class, () -> fs.create(new Path("a/b")));
    // The leaf directory must survive as explicit.
    FileStatus leafStatus = fs.getAbfsStore()
        .getFileStatus(fs.makeQualified(leaf),
            new TracingContext(getTestTracingContext(fs, true)));
    Assertions.assertThat(leafStatus.isDirectory())
        .describedAs("Path a/b/c is not an explicit directory")
        .isTrue();
  }
}
/**
 * Creates directories level by level and verifies each level exists and is
 * an explicit directory.
 * @throws Exception
 */
@Test
public void testMkdirsByLevel() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final String[] levels = {"a", "a/b/c", "a/b/c/d/e"};
    for (String level : levels) {
      fs.mkdirs(new Path(level));
    }
    for (String level : levels) {
      Assertions.assertThat(fs.exists(new Path(level)))
          .describedAs("Path %s does not exist", level)
          .isTrue();
    }
    // Asserting that directories created as explicit.  The first lookup
    // deliberately uses a trailing slash, as in the original test.
    final String[] statusPaths = {"a/", "a/b/c", "a/b/c/d/e"};
    final String[] labels = {"a", "a/b/c", "a/b/c/d/e"};
    for (int i = 0; i < statusPaths.length; i++) {
      FileStatus levelStatus = fs.getAbfsStore()
          .getFileStatus(fs.makeQualified(new Path(statusPaths[i])),
              new TracingContext(getTestTracingContext(fs, true)));
      Assertions.assertThat(levelStatus.isDirectory())
          .describedAs("Path %s is not an explicit directory", labels[i])
          .isTrue();
    }
  }
}
/*
 Delete part of a path and validate sub path exists.
 */
@Test
public void testMkdirsWithDelete() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    fs.mkdirs(new Path("a/b"));
    fs.mkdirs(new Path("a/b/c/d"));
    // Fix: the single-argument FileSystem#delete(Path) is deprecated;
    // it is defined as delete(f, true), so pass recursive=true explicitly.
    fs.delete(new Path("a/b/c/d"), true);
    fs.getFileStatus(new Path("a/b/c"));
    Assertions.assertThat(fs.exists(new Path("a/b/c")))
        .describedAs("Path a/b/c does not exist")
        .isTrue();
  }
}
/**
 * Verify mkdir and rename of parent.
 */
@Test
public void testMkdirsWithRename() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    fs.mkdirs(new Path("a/b/c/d"));
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(new Path("e/file")).close();
    // Fix: the single-argument FileSystem#delete(Path) is deprecated;
    // it is defined as delete(f, true), so pass recursive=true explicitly.
    fs.delete(new Path("a/b/c/d"), true);
    Assertions.assertThat(fs.rename(new Path("e"), new Path("a/b/c/d")))
        .describedAs("Failed to rename path e to a/b/c/d")
        .isTrue();
    Assertions.assertThat(fs.exists(new Path("a/b/c/d/file")))
        .describedAs("Path a/b/c/d/file does not exist")
        .isTrue();
  }
}
/**
 * Create a file with name /dir1 and then mkdirs for /dir1/dir2 should fail.
 * @throws Exception
 */
@Test
public void testFileCreateMkdirsRoot() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    fs.setWorkingDirectory(new Path("/"));
    final Path p1 = new Path("dir1");
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(p1).close();
    intercept(IOException.class,
        () -> fs.mkdirs(new Path("dir1/dir2")));
  }
}
/**
 * Create a file with name /dir1 and then mkdirs for /dir1/dir2 should fail.
 * @throws Exception
 */
@Test
public void testFileCreateMkdirsNonRoot() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path p1 = new Path("dir1");
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(p1).close();
    intercept(IOException.class, () -> fs.mkdirs(new Path("dir1/dir2")));
  }
}
/**
 * Re-creating an existing directory without overwrite must succeed and
 * leave the directory explicit.
 * @throws Exception
 */
@Test
public void testCreateSameDirectory() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path dir = new Path("a/b/c");
    fs.mkdirs(dir);
    // A second mkdirs on the same path must be a no-op success.
    fs.mkdirs(dir);
    Assertions.assertThat(fs.exists(dir))
        .describedAs("Path a/b/c does not exist")
        .isTrue();
    // Asserting that directories created as explicit
    FileStatus dirStatus = fs.getAbfsStore()
        .getFileStatus(fs.makeQualified(dir),
            new TracingContext(getTestTracingContext(fs, true)));
    Assertions.assertThat(dirStatus.isDirectory())
        .describedAs("Path a/b/c is not an explicit directory")
        .isTrue();
  }
}
/**
 * Creating a directory over a path that already exists as a file should
 * fail. (Javadoc fixed: it previously described the unrelated
 * "same directory" case by copy-paste.)
 * @throws Exception
 */
@Test
public void testCreateSamePathDirectory() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(new Path("a")).close();
    intercept(IOException.class, () -> fs.mkdirs(new Path("a")));
  }
}
/**
 * mkdir directly under the root must create an explicit directory.
 */
@Test
public void testMkdirOnRootAsParent() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path path = new Path("a");
    fs.setWorkingDirectory(new Path("/"));
    fs.mkdirs(path);
    // Asserting that the directory created by mkdir exists as explicit.
    FileStatus created = fs.getAbfsStore()
        .getFileStatus(fs.makeQualified(path),
            new TracingContext(getTestTracingContext(fs, true)));
    Assertions.assertThat(created.isDirectory())
        .describedAs("Path a is not an explicit directory")
        .isTrue();
  }
}
/**
 * mkdir on the root itself must succeed and the root must report as a
 * directory.
 */
@Test
public void testMkdirOnRoot() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path root = new Path("/");
    fs.setWorkingDirectory(root);
    fs.mkdirs(root);
    FileStatus rootStatus = fs.getAbfsStore()
        .getFileStatus(fs.makeQualified(root),
            new TracingContext(getTestTracingContext(fs, true)));
    Assertions.assertThat(rootStatus.isDirectory())
        .describedAs("Path is not an explicit directory")
        .isTrue();
  }
}
/**
 * Creation of file on path with unicode chars
 */
@Test
public void testCreateUnicode() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path path = new Path("/file\u0031");
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(path).close();
    Assertions.assertThat(fs.exists(path))
        .describedAs("Path with unicode does not exist")
        .isTrue();
  }
}
/**
 * mkdir on a path containing unicode characters must create an explicit
 * directory.
 */
@Test
public void testMkdirUnicode() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path unicodeDir = new Path("/dir\u0031");
    fs.mkdirs(unicodeDir);
    // Asserting that the directory created by mkdir exists as explicit.
    FileStatus created = fs.getAbfsStore()
        .getFileStatus(fs.makeQualified(unicodeDir),
            new TracingContext(getTestTracingContext(fs, true)));
    Assertions.assertThat(created.isDirectory())
        .describedAs("Path is not an explicit directory")
        .isTrue();
  }
}
/**
 * Creation of directory on same path with parallel threads.
 */
@Test
public void testMkdirParallelRequests() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path path = new Path("/dir1");
    ExecutorService es = Executors.newFixedThreadPool(3);
    try {
      List<CompletableFuture<Void>> tasks = new ArrayList<>();
      for (int i = 0; i < 3; i++) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
          try {
            fs.mkdirs(path);
          } catch (IOException e) {
            throw new CompletionException(e);
          }
        }, es);
        tasks.add(future);
      }
      // Wait for all the tasks to complete
      CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
    } finally {
      // Fix: shut the thread pool down; it was previously leaked.
      es.shutdown();
    }
    // Assert that the directory created by mkdir exists as explicit
    FileStatus status = fs.getAbfsStore()
        .getFileStatus(fs.makeQualified(path),
            new TracingContext(getTestTracingContext(fs, true)));
    Assertions.assertThat(status.isDirectory())
        .describedAs("Path is not an explicit directory")
        .isTrue();
  }
}
/**
 * With FS_AZURE_ENABLE_MKDIR_OVERWRITE disabled, re-creating an existing
 * directory must still succeed (per DFS semantics) and leave it explicit.
 * @throws Exception
 */
@Test
public void testCreateSameDirectoryOverwriteFalse() throws Exception {
  Configuration configuration = getRawConfiguration();
  configuration.setBoolean(FS_AZURE_ENABLE_MKDIR_OVERWRITE, false);
  try (AzureBlobFileSystem fs1 = (AzureBlobFileSystem) FileSystem.newInstance(configuration)) {
    final Path dir = new Path("a/b/c");
    fs1.mkdirs(dir);
    // Second mkdirs on the same path must not fail.
    fs1.mkdirs(dir);
    // Asserting that directories created as explicit
    FileStatus dirStatus = fs1.getAbfsStore()
        .getFileStatus(fs1.makeQualified(dir),
            new TracingContext(getTestTracingContext(fs1, true)));
    Assertions.assertThat(dirStatus.isDirectory())
        .describedAs("Path is not an explicit directory")
        .isTrue();
  }
}
/**
 * Try creating directory same as an existing file.
 */
@Test
public void testCreateDirectoryAndFileRecreation() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    fs.mkdirs(new Path("a/b/c"));
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(new Path("a/b/c/d")).close();
    Assertions.assertThat(fs.exists(new Path("a/b/c")))
        .describedAs("Directory a/b/c does not exist")
        .isTrue();
    Assertions.assertThat(fs.exists(new Path("a/b/c/d")))
        .describedAs("File a/b/c/d does not exist")
        .isTrue();
    // mkdirs over the existing file path must fail.
    intercept(IOException.class,
        () -> fs.mkdirs(new Path("a/b/c/d")));
  }
}
/**
 * Builder-based create inside an atomic-rename ("/hbase") directory must
 * succeed with overwrite disabled.
 */
@Test
public void testCreateNonRecursiveForAtomicDirectoryFile() throws Exception {
  try (AzureBlobFileSystem fileSystem = getFileSystem()) {
    fileSystem.setWorkingDirectory(new Path("/"));
    fileSystem.mkdirs(new Path("/hbase/dir"));
    // Fix: the stream returned by build() was previously discarded
    // without being closed; close it via try-with-resources.
    try (FSDataOutputStream out = fileSystem.createFile(new Path("/hbase/dir/file"))
        .overwrite(false)
        .replication((short) 1)
        .bufferSize(1024)
        .blockSize(1024)
        .build()) {
      // No data written; the create itself is what is under test.
    }
    Assertions.assertThat(fileSystem.exists(new Path("/hbase/dir/file")))
        .describedAs("File /hbase/dir/file does not exist")
        .isTrue();
  }
}
/**
 * Test creating a file on a non-existing path with an implicit parent directory.
 * This test creates an implicit directory, then creates a file
 * inside this implicit directory and asserts that it gets created.
 *
 * @throws Exception if any exception occurs during the test execution
 */
@Test
public void testCreateOnNonExistingPathWithImplicitParentDir() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path implicitPath = new Path("dir1");
    final Path path = new Path("dir1/dir2");
    createAzCopyFolder(implicitPath);
    // Creating a file on a non-existing path inside an implicit directory.
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(path).close();
    Assertions.assertThat(fs.exists(path))
        .describedAs("File dir1/dir2 does not exist")
        .isTrue();
  }
}
/**
 * mkdir on a non-existing path under an implicit parent must succeed and
 * turn both the parent and the new directory into explicit directories.
 */
@Test
public void testMkdirOnNonExistingPathWithImplicitParentDir() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path parent = new Path("dir1");
    final Path child = new Path("dir1/dir2");
    createAzCopyFolder(parent);
    // Creating a directory on non-existing path inside an implicit directory
    fs.mkdirs(child);
    // The azcopy-created parent must now be explicit.
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(parent,
            fs, getTestTracingContext(fs, true)))
        .describedAs("Path created by azcopy did not become explicit")
        .isTrue();
    // The mkdir-created directory must be explicit as well.
    Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(child,
        fs, getTestTracingContext(fs, true)))
        .describedAs("Directory created by mkdir does not exist as explicit")
        .isTrue();
  }
}
/**
 * Creation of directory with parent directory existing as implicit.
 * And the directory to be created existing as explicit directory
 * @throws Exception
 */
@Test
public void testMkdirOnExistingExplicitDirWithImplicitParentDir() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path parent = new Path("dir1");
    final Path child = new Path("dir1/dir2");
    // Implicit parent, explicit child, then a repeat mkdirs on the child.
    createAzCopyFolder(parent);
    fs.mkdirs(child);
    fs.mkdirs(child);
    // The azcopy-created parent must have become explicit.
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(parent,
            fs, getTestTracingContext(fs, true)))
        .describedAs("Path created by azcopy did not become explicit")
        .isTrue();
    // The mkdir-created directory must remain explicit.
    Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(child,
        fs, getTestTracingContext(fs, true)))
        .describedAs("Directory created by mkdir does not exist as explicit")
        .isTrue();
  }
}
/**
 * Creation of directory with parent directory existing as explicit.
 * And the directory to be created existing as implicit directory
 * @throws Exception
 */
@Test
public void testMkdirOnExistingImplicitDirWithExplicitParentDir() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path parent = new Path("dir1");
    final Path child = new Path("dir1/dir2");
    // Explicit parent, implicit child, then mkdirs on the implicit child.
    fs.mkdirs(parent);
    createAzCopyFolder(child);
    fs.mkdirs(child);
    // The explicit parent must stay explicit.
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(parent,
            fs, getTestTracingContext(fs, true)))
        .describedAs("Explicit parent directory does not exist as explicit")
        .isTrue();
    // mkdirs must not have materialized a marker for the implicit child.
    Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(child,
        fs, getTestTracingContext(fs, true)))
        .describedAs("Mkdir created explicit directory")
        .isTrue();
  }
}
/**
 * Creation of directory with parent directory existing as implicit.
 * And the directory to be created existing as implicit directory
 * @throws Exception
 */
@Test
public void testMkdirOnExistingImplicitDirWithImplicitParentDir() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path parent = new Path("dir3");
    final Path child = new Path("dir3/dir4");
    // Both levels created implicitly, then mkdirs on the child.
    createAzCopyFolder(parent);
    createAzCopyFolder(child);
    fs.mkdirs(child);
    // mkdirs must not have created a marker for the implicit parent.
    Assertions.assertThat(
        DirectoryStateHelper.isImplicitDirectory(parent,
            fs, getTestTracingContext(fs, true)))
        .describedAs("Marker is present for path created by azcopy")
        .isTrue();
    // Asserting that the mkdir didn't create markers for existing directory.
    Assertions.assertThat(DirectoryStateHelper.isImplicitDirectory(child,
        fs, getTestTracingContext(fs, true)))
        .describedAs("Marker is present for existing directory")
        .isTrue();
  }
}
/**
 * Creation of directory with parent directory existing as implicit.
 * And the directory to be created existing as file
 * @throws Exception
 */
@Test
public void testMkdirOnExistingFileWithImplicitParentDir() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    final Path implicitPath = new Path("dir1");
    final Path path = new Path("dir1/dir2");
    createAzCopyFolder(implicitPath);
    // Creating a file on path.
    // Fix: close the stream returned by create() instead of leaking it.
    fs.create(path).close();
    // Creating a directory on existing file inside an implicit directory
    // Asserting that the mkdir fails
    LambdaTestUtils.intercept(FileAlreadyExistsException.class, () -> {
      fs.mkdirs(path);
    });
    // Asserting that the file still exists at path.
    Assertions.assertThat(DirectoryStateHelper.isExplicitDirectory(path,
        fs, getTestTracingContext(fs, true)))
        .describedAs("File still exists at path")
        .isFalse();
  }
}
/**
 * 1. a/b/c as implicit.
 * 2. Create marker for b.
 * 3. Do mkdir on a/b/c/d.
 * 4. Verify all b,c,d have marker but a is implicit.
 */
@Test
public void testImplicitExplicitFolder() throws Exception {
  Configuration configuration = Mockito.spy(getRawConfiguration());
  try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration)) {
    final Path implicitPath = new Path("a/b/c");
    createAzCopyFolder(implicitPath);
    Path path = makeQualified(new Path("a/b"));
    AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsStore()
        .getClient(AbfsServiceType.BLOB);
    // Create a directory marker blob for "a/b" directly via the REST API,
    // bypassing the filesystem layer (which would also mark ancestors).
    blobClient.createPathRestOp(path.toUri().getPath(), false, true,
        false, null, null, getTestTracingContext(fs, true));
    // mkdirs for "a/b/c/d" should create markers for c and d, but must not
    // touch "a" since "a/b" already carries a marker.
    fs.mkdirs(new Path("a/b/c/d"));
    Assertions.assertThat(
        DirectoryStateHelper.isImplicitDirectory(new Path("a"),
            fs, getTestTracingContext(fs, true)))
        .describedAs("Directory 'a' should be implicit")
        .isTrue();
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(new Path("a/b"),
            fs, getTestTracingContext(fs, true)))
        .describedAs("Directory 'a/b' should be explicit")
        .isTrue();
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c"),
            fs, getTestTracingContext(fs, true)))
        .describedAs("Directory 'a/b/c' should be explicit")
        .isTrue();
    Assertions.assertThat(
        DirectoryStateHelper.isExplicitDirectory(new Path("a/b/c/d"),
            fs, getTestTracingContext(fs, true)))
        .describedAs("Directory 'a/b/c/d' should be explicit")
        .isTrue();
  }
}
/**
 * 1. a/b/c implicit.
 * 2. Marker for a and c.
 * 3. mkdir on a/b/c/d.
 * 4. Verify a,c,d are explicit but b is implicit.
 */
@Test
public void testImplicitExplicitFolder1() throws Exception {
  Configuration configuration = Mockito.spy(getRawConfiguration());
  try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration)) {
    // Seed a/b/c as an implicit (azcopy-style) directory tree.
    createAzCopyFolder(new Path("a/b/c"));
    AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsStore()
        .getClient(AbfsServiceType.BLOB);
    // Explicit marker blobs for 'a' and 'a/b/c' only; 'a/b' stays implicit.
    for (String markerDir : new String[] {"a", "a/b/c"}) {
      Path qualified = makeQualified(new Path(markerDir));
      blobClient.createPathRestOp(qualified.toUri().getPath(), false, true,
          false, null, null, getTestTracingContext(fs, true));
    }
    fs.mkdirs(new Path("a/b/c/d"));
    // 'a/b' never received a marker, so it must remain implicit.
    Assertions.assertThat(
        DirectoryStateHelper.isImplicitDirectory(new Path("a/b"),
            fs, getTestTracingContext(fs, true)))
        .describedAs("Directory 'a/b' should be implicit")
        .isTrue();
    // Marked directories and the mkdir-created one must be explicit.
    for (String dir : new String[] {"a", "a/b/c", "a/b/c/d"}) {
      Assertions.assertThat(
          DirectoryStateHelper.isExplicitDirectory(new Path(dir),
              fs, getTestTracingContext(fs, true)))
          .describedAs("Directory '" + dir + "' should be explicit")
          .isTrue();
    }
  }
}
/**
 * Extracts the eTag for an existing file.
 *
 * @param fileName file Path in String from container root
 * @return String etag for the file
 * @throws IOException if the getPathStatus call fails
 */
private String extractFileEtag(String fileName) throws IOException {
  final AzureBlobFileSystem fs = getFileSystem();
  final AbfsClient client = fs.getAbfsClient();
  // Fetch the path status and pull the eTag header from its HTTP result.
  final AbfsRestOperation statusOp = client.getPathStatus(
      fileName, true, getTestTracingContext(fs, false), null);
  return AzureBlobFileSystemStore.extractEtagHeader(statusOp.getResult());
}
/**
 * Tests the idempotency of creating a path with retries by simulating
 * a conflict response (HTTP 409) from the Azure Blob File System client.
 * The method ensures that the path creation operation retries correctly
 * with the proper transaction ID headers, verifying idempotency during
 * failure recovery.
 *
 * @throws Exception if any error occurs during the operation.
 */
@Test
public void testCreatePathRetryIdempotency() throws Exception {
  Configuration configuration = new Configuration(getRawConfiguration());
  configuration.set(FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID, "true");
  try (AzureBlobFileSystem fs = getFileSystem(configuration)) {
    assumeRecoveryThroughClientTransactionID(true);
    AbfsDfsClient abfsClient = mockIngressClientHandler(fs);
    final Path nonOverwriteFile = new Path(
        "/NonOverwriteTest_FileName_" + UUID.randomUUID());
    // Captures the request headers of the first (failing, HTTP 409) attempt.
    final List<AbfsHttpHeader> headers = new ArrayList<>();
    mockRetriedRequest(abfsClient, headers);
    // Recovery consults getPathStatus; echo back the client transaction id
    // that was sent on the failed request so recovery sees a match.
    AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class);
    AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
    Mockito.doAnswer(answer -> {
      String requiredHeader = null;
      for (AbfsHttpHeader httpHeader : headers) {
        if (X_MS_CLIENT_TRANSACTION_ID.equalsIgnoreCase(
            httpHeader.getName())) {
          requiredHeader = httpHeader.getValue();
          break;
        }
      }
      return requiredHeader;
    }).when(op).getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
    Mockito.doReturn(true).when(getPathRestOp).hasResult();
    Mockito.doReturn(op).when(getPathRestOp).getResult();
    Mockito.doReturn(getPathRestOp).when(abfsClient).getPathStatus(
        Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
        Mockito.nullable(TracingContext.class),
        Mockito.nullable(ContextEncryptionAdapter.class));
    // Create must succeed through the recovery path; close the stream so
    // the output stream is not leaked by the test.
    fs.create(nonOverwriteFile, false).close();
  }
}
/**
 * Test to verify that the client transaction ID is included in the response header
 * during the creation of a new file in Azure Blob Storage.
 *
 * This test ensures that when a new file is created, the Azure Blob FileSystem client
 * correctly includes the client transaction ID in the response header for the created file.
 * The test uses a configuration where client transaction ID is enabled and verifies
 * its presence after the file creation operation.
 *
 * @throws Exception if any error occurs during test execution
 */
@Test
public void testGetClientTransactionIdAfterCreate() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    assumeRecoveryThroughClientTransactionID(true);
    // Holds the transaction id the mocked client attaches to the request.
    final String[] clientTransactionId = new String[1];
    AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs);
    mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
    final Path nonOverwriteFile = new Path(
        "/NonOverwriteTest_FileName_" + UUID.randomUUID());
    // Close the stream so the output stream is not leaked by the test.
    fs.create(nonOverwriteFile, false).close();
    final AbfsHttpOperation getPathStatusOp =
        abfsDfsClient.getPathStatus(nonOverwriteFile.toUri().getPath(), false,
            getTestTracingContext(fs, true), null).getResult();
    // Read the header once and assert on the same value.
    final String responseTransactionId =
        getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
    Assertions.assertThat(responseTransactionId)
        .describedAs("Client transaction ID should be set during create")
        .isNotNull();
    Assertions.assertThat(responseTransactionId)
        .describedAs("Client transaction ID should be equal to the one set in the header")
        .isEqualTo(clientTransactionId[0]);
  }
}
/**
 * Test to verify that the client transaction ID is included in the response header
 * after two consecutive create operations on the same file in Azure Blob Storage.
 *
 * This test ensures that even after performing two create operations (with overwrite)
 * on the same file, the Azure Blob FileSystem client includes the client transaction ID
 * in the response header for the created file. The test checks for the presence of
 * the client transaction ID in the response after the second create call.
 *
 * @throws Exception if any error occurs during test execution
 */
@Test
public void testClientTransactionIdAfterTwoCreateCalls() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    assumeRecoveryThroughClientTransactionID(true);
    // Holds the transaction id the mocked client attaches to the request.
    final String[] clientTransactionId = new String[1];
    AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs);
    mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
    Path testPath = path("testfile");
    // Two consecutive creates on the same path; the second overwrites.
    // Close both streams so output streams are not leaked by the test.
    fs.create(testPath, false).close();
    fs.create(testPath, true).close();
    final AbfsHttpOperation getPathStatusOp =
        abfsDfsClient.getPathStatus(testPath.toUri().getPath(), false,
            getTestTracingContext(fs, true), null).getResult();
    // Read the header once and assert on the same value.
    final String responseTransactionId =
        getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
    Assertions.assertThat(responseTransactionId)
        .describedAs("Client transaction ID should be set during create")
        .isNotNull();
    Assertions.assertThat(responseTransactionId)
        .describedAs("Client transaction ID should be equal to the one set in the header")
        .isEqualTo(clientTransactionId[0]);
  }
}
/**
 * Test case to simulate a failure scenario during the recovery process while
 * creating a file in Azure Blob File System. This test verifies that when the
 * `getPathStatus` method encounters a timeout exception during recovery, it
 * triggers an appropriate failure response.
 *
 * The test mocks the `AbfsDfsClient` to simulate the failure behavior, including
 * a retry logic. It also verifies that an exception is correctly thrown and the
 * error message contains the expected details for recovery failure.
 *
 * @throws Exception If an error occurs during the test setup or execution.
 */
@Test
public void testFailureInGetPathStatusDuringCreateRecovery() throws Exception {
try (AzureBlobFileSystem fs = getFileSystem()) {
assumeRecoveryThroughClientTransactionID(true);
final String[] clientTransactionId = new String[1];
AbfsDfsClient abfsDfsClient = mockIngressClientHandler(fs);
mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
// First create attempt fails with HTTP 409, forcing the recovery path.
mockRetriedRequest(abfsDfsClient, new ArrayList<>());
// flag[0] records whether the failing getPathStatus call has happened yet,
// so only the first invocation (the recovery probe) throws.
boolean[] flag = new boolean[1];
Mockito.doAnswer(getPathStatus -> {
if (!flag[0]) {
flag[0] = true;
// Simulate a client-timeout failure during the recovery probe.
throw new AbfsRestOperationException(HTTP_CLIENT_TIMEOUT, "", "", new Exception());
}
return getPathStatus.callRealMethod();
}).when(abfsDfsClient).getPathStatus(
Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
Mockito.nullable(TracingContext.class),
Mockito.nullable(ContextEncryptionAdapter.class));
final Path nonOverwriteFile = new Path(
"/NonOverwriteTest_FileName_" + UUID.randomUUID());
// The create must surface a recovery failure rather than succeed.
String errorMessage = intercept(AbfsDriverException.class,
() -> fs.create(nonOverwriteFile, false)).getErrorMessage();
Assertions.assertThat(errorMessage)
.describedAs("getPathStatus should fail while recovering")
.contains(ERR_CREATE_RECOVERY);
}
}
/**
 * Mocks and returns an instance of {@link AbfsDfsClient} for the given AzureBlobFileSystem.
 * This method sets up the necessary mock behavior for the client handler and ingress client.
 *
 * @param fs The {@link AzureBlobFileSystem} instance for which the client handler will be mocked.
 * @return A mocked {@link AbfsDfsClient} instance associated with the provided file system.
 */
private AbfsDfsClient mockIngressClientHandler(AzureBlobFileSystem fs) {
  // Spy the store's handler and client, then wire the spies back into
  // the store so subsequent operations flow through them.
  AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
  AbfsClientHandler spiedHandler = Mockito.spy(spiedStore.getClientHandler());
  AbfsDfsClient spiedClient =
      (AbfsDfsClient) Mockito.spy(spiedHandler.getClient());
  fs.getAbfsStore().setClient(spiedClient);
  fs.getAbfsStore().setClientHandler(spiedHandler);
  Mockito.doReturn(spiedClient).when(spiedHandler).getIngressClient();
  return spiedClient;
}
/**
 * Mocks the retry behavior for an AbfsDfsClient request. The method intercepts
 * the Abfs operation and simulates an HTTP conflict (HTTP 409) error on the
 * first invocation. It creates a mock HTTP operation with a PUT method and
 * specific status codes and error messages.
 *
 * @param abfsDfsClient The AbfsDfsClient to mock operations for.
 * @param headers The list of HTTP headers to which request headers will be added.
 *
 * @throws Exception If an error occurs during mock creation or operation execution.
 */
private void mockRetriedRequest(AbfsDfsClient abfsDfsClient,
final List<AbfsHttpHeader> headers) throws Exception {
TestAbfsClient.mockAbfsOperationCreation(abfsDfsClient,
new MockIntercept<AbfsRestOperation>() {
// Tracks invocations so only the FIRST request fails; later
// retries proceed normally.
private int count = 0;
@Override
public void answer(final AbfsRestOperation mockedObj,
final InvocationOnMock answer)
throws AbfsRestOperationException {
if (count == 0) {
count = 1;
// Build a mock PUT result carrying an HTTP 409 (conflict) status.
AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod();
Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage();
Mockito.doReturn(true).when(mockedObj).hasResult();
Mockito.doReturn(op).when(mockedObj).getResult();
Mockito.doReturn(HTTP_CONFLICT).when(op).getStatusCode();
// Expose the failed request's headers to the caller (e.g. to
// inspect the client transaction id that was sent).
headers.addAll(mockedObj.getRequestHeaders());
throw new AbfsRestOperationException(HTTP_CONFLICT,
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(), EMPTY_STRING,
null, op);
}
}
}, 0);
}
}