ITestAzureBlobFileSystemRename.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.BlobRenameHandler;
import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityTestUtils;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.functional.FunctionRaisingIOE;
import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_PENDING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RESOURCE_TYPE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.mockAddClientTransactionIdToHeader;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY;
import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
import static org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils.createFiles;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test rename operation.
*/
public class ITestAzureBlobFileSystemRename extends
AbstractAbfsIntegrationTest {
// NOTE(review): none of these constants is referenced in the portion of the
// file reviewed here; presumably they are used by tests further down
// (an iteration bound, a blob count and a file count) - TODO confirm.
private static final int MAX_ITERATIONS = 20;
private static final int BLOB_COUNT = 11;
private static final int TOTAL_FILES = 25;
public ITestAzureBlobFileSystemRename() throws Exception {
super();
}
/**
 * Renames a freshly created file onto a destination that was deleted
 * beforehand, then checks that the destination is a file and the source
 * is gone.
 *
 * @throws Exception on any failure
 */
@Test
public void testEnsureFileIsRenamed() throws Exception {
  final AzureBlobFileSystem abfs = getFileSystem();

  final Path renameSource = path("testEnsureFileIsRenamed-src");
  touch(renameSource);

  final Path renameTarget = path("testEnsureFileIsRenamed-dest");
  // Remove any leftover destination so the rename has a free target.
  abfs.delete(renameTarget, true);

  assertRenameOutcome(abfs, renameSource, renameTarget, true);

  assertIsFile(abfs, renameTarget);
  assertPathDoesNotExist(abfs, "expected renamed", renameSource);
}
/**
 * Creates both a source and a destination file and expects the rename of
 * the source onto the existing destination to report {@code false}.
 *
 * @throws Exception on any failure
 */
@Test
public void testRenameWithPreExistingDestination() throws Exception {
  final AzureBlobFileSystem abfs = getFileSystem();

  final Path existingSource = path("renameSrc");
  touch(existingSource);

  final Path existingTarget = path("renameDest");
  touch(existingTarget);

  // The destination is already there, so the expected outcome is "false".
  assertRenameOutcome(abfs, existingSource, existingTarget, false);
}
/**
 * Renames a directory holding a single file and checks that the listing of
 * the destination directory starts with an entry carrying that file's name.
 *
 * @throws Exception on any failure
 */
@Test
public void testRenameFileUnderDir() throws Exception {
  final AzureBlobFileSystem fs = getFileSystem();
  Path sourceDir = path("/testSrc");
  assertMkdirs(fs, sourceDir);
  String filename = "file1";
  Path file1 = new Path(sourceDir, filename);
  touch(file1);

  Path destDir = path("/testDst");
  assertRenameOutcome(fs, sourceDir, destDir, true);

  FileStatus[] fileStatus = fs.listStatus(destDir);
  assertNotNull("Null file status", fileStatus);
  // Fail with a readable assertion instead of an
  // ArrayIndexOutOfBoundsException when the listing comes back empty.
  assertTrue("Empty listing of " + destDir, fileStatus.length > 0);
  FileStatus status = fileStatus[0];
  assertEquals("Wrong filename in " + status,
      filename, status.getPath().getName());
}
/**
 * Builds a nested directory chain below {@code test1}, renames
 * {@code test1} to the sibling name {@code test10} and checks that the
 * source directory no longer exists.
 *
 * @throws Exception on any failure
 */
@Test
public void testRenameDirectory() throws Exception {
  final AzureBlobFileSystem abfs = getFileSystem();

  final Path baseDir = path("testDir");
  abfs.mkdirs(baseDir);

  final Path renameSource = new Path(baseDir + "/test1");
  abfs.mkdirs(renameSource);
  abfs.mkdirs(new Path(baseDir + "/test1/test2"));
  abfs.mkdirs(new Path(baseDir + "/test1/test2/test3"));

  final Path renameTarget = new Path(baseDir + "/test10");
  assertRenameOutcome(abfs, renameSource, renameTarget, true);
  assertPathDoesNotExist(abfs, "rename source dir", renameSource);
}
/**
 * Creates 1000 files in one directory using a pool of 10 threads, renames
 * the directory, and checks that the destination lists all files while the
 * source directory is gone.
 *
 * @throws Exception on any failure, including a failed file creation task
 */
@Test
public void testRenameFirstLevelDirectory() throws Exception {
  final AzureBlobFileSystem fs = getFileSystem();
  final int fileCount = 1000;
  final List<Future<Void>> tasks = new ArrayList<>(fileCount);
  final Path source = path("/test");

  ExecutorService es = Executors.newFixedThreadPool(10);
  try {
    for (int i = 0; i < fileCount; i++) {
      final Path fileName = new Path(source + "/" + i);
      Callable<Void> callable = () -> {
        touch(fileName);
        return null;
      };
      tasks.add(es.submit(callable));
    }

    // Surface any creation failure before attempting the rename.
    for (Future<Void> task : tasks) {
      task.get();
    }
  } finally {
    // Always release the pool threads, even when a task failed.
    es.shutdownNow();
  }

  Path dest = path("/renamedDir");
  assertRenameOutcome(fs, source, dest, true);

  FileStatus[] files = fs.listStatus(dest);
  assertEquals("Wrong number of files in listing", fileCount, files.length);
  assertPathDoesNotExist(fs, "rename source dir", source);
}
/**
 * Expects a rename of the root directory to report {@code false}, both when
 * the root is addressed as {@code /} and when it is addressed through the
 * filesystem URI.
 *
 * @throws Exception on any failure
 */
@Test
public void testRenameRoot() throws Exception {
  final AzureBlobFileSystem abfs = getFileSystem();

  // Root addressed by a scheme-less path.
  final Path plainRoot = new Path("/");
  final Path plainTarget = new Path("/testRenameRoot");
  assertRenameOutcome(abfs, plainRoot, plainTarget, false);

  // Root addressed by its fully qualified URI.
  final String fsUri = abfs.getUri().toString();
  final Path qualifiedRoot = new Path(fsUri + "/");
  final Path qualifiedTarget = new Path(fsUri + "/s");
  assertRenameOutcome(abfs, qualifiedRoot, qualifiedTarget, false);
}
/**
 * Renames {@code test1/test2/test3} onto the existing directory
 * {@code test4} and expects the source to be moved below the destination
 * as {@code test4/test3}, leaving the remaining ancestors in place.
 *
 * @throws Exception on any failure
 */
@Test
public void testPosixRenameDirectory() throws Exception {
  final AzureBlobFileSystem abfs = this.getFileSystem();
  final Path baseDir = path("testDir2");

  final Path nestedSource = new Path(baseDir + "/test1/test2/test3");
  final Path existingTarget = new Path(baseDir + "/test4");
  abfs.mkdirs(nestedSource);
  abfs.mkdirs(existingTarget);

  assertTrue(abfs.rename(nestedSource, existingTarget));

  final Path sourceParent = new Path(baseDir + "/test1/test2");
  final Path movedDir = new Path(baseDir + "/test4/test3");
  assertPathExists(abfs, "This path should exist", baseDir);
  assertPathExists(abfs, "This path should exist", sourceParent);
  assertPathExists(abfs, "This path should exist", existingTarget);
  assertPathExists(abfs, "This path should exist", movedDir);
  assertPathDoesNotExist(abfs, "This path should not exist", nestedSource);
}
/**
 * Renames an existing file to a destination whose parent directory does not
 * exist, expects the rename to return {@code false}, and then checks the
 * number of rename attempts recorded in the filesystem IOStatistics: two
 * when the client is an {@link AbfsDfsClient}, one otherwise.
 *
 * @throws Exception on any failure
 */
@Test
public void testRenameWithNoDestinationParentDir() throws Exception {
  describe("Verifying the expected behaviour of ABFS rename when "
      + "destination parent Dir doesn't exist.");

  final AzureBlobFileSystem fs = getFileSystem();
  Path sourcePath = path(getMethodName());
  Path destPath = new Path("falseParent", "someChildFile");

  byte[] data = dataset(1024, 'a', 'z');
  writeDataset(fs, sourcePath, data, data.length, 1024, true);

  // Verify that renaming on a destination with no parent dir wasn't
  // successful.
  assertFalse("Rename result expected to be false with no Parent dir",
      fs.rename(sourcePath, destPath));

  // Verify that metadata was in an incomplete state after the rename
  // failure, and we retried the rename once more.
  IOStatistics ioStatistics = fs.getIOStatistics();
  AbfsClient client = fs.getAbfsStore().getClient();
  IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
          RENAME_PATH_ATTEMPTS.getStatName())
      .describedAs("For Dfs endpoint: There should be 2 rename "
          + "attempts if metadata incomplete state failure is hit. "
          + "For Blob endpoint: There would be only one rename attempt which "
          + "would have a failed precheck.")
      .isEqualTo(client instanceof AbfsDfsClient ? 2 : 1);
}
/**
 * Checks that a nested directory can be renamed onto the root directory.
 *
 * The directory {@code /src1/src2} is created and renamed to {@code /};
 * the rename must return {@code true} and {@code /src2} must exist
 * afterwards.
 *
 * @throws Exception if an error occurs during test execution
 */
@Test
public void testRenameToRoot() throws Exception {
  final AzureBlobFileSystem abfs = getFileSystem();
  final Path nestedDir = new Path("/src1/src2");
  abfs.mkdirs(nestedDir);

  final boolean renamed = abfs.rename(nestedDir, new Path("/"));
  assertTrue(renamed);

  final boolean presentUnderRoot = abfs.exists(new Path("/src2"));
  assertTrue(presentUnderRoot);
}
/**
 * Checks that renaming a path that was never created reports failure.
 *
 * The test renames {@code /file}, which it does not create, to the root
 * directory {@code /} and expects the call to return {@code false}.
 *
 * @throws Exception if an error occurs during test execution
 */
@Test
public void testRenameNotFoundBlobToEmptyRoot() throws Exception {
  final AzureBlobFileSystem abfs = getFileSystem();
  final Path missingSource = new Path("/file");
  final Path root = new Path("/");

  final boolean renamed = abfs.rename(missingSource, root);
  assertFalse(renamed);
}
/**
 * Tests renaming a source directory whose name contains a colon to a
 * destination without one.
 *
 * The test creates the file {@code /src:/file}, renames {@code /src:} to
 * {@code /dst} and expects the rename to return {@code true}.
 *
 * @throws Exception if an error occurs during test execution
 */
@Test
public void testRenameBlobToDstWithColonInSourcePath() throws Exception {
  AzureBlobFileSystem fs = getFileSystem();
  // Close the stream right away; only the existence of the file matters
  // and an open stream would otherwise be leaked.
  fs.create(new Path("/src:/file")).close();
  Assertions.assertThat(
          fs.rename(new Path("/src:"), new Path("/dst")))
      .describedAs("Rename should succeed")
      .isTrue();
}
/**
 * Tests renaming a source path to a destination whose name contains a colon.
 *
 * The test creates the file {@code /src}, renames it to {@code /dst:} and
 * expects the rename to return {@code true}.
 *
 * @throws Exception if an error occurs during test execution
 */
@Test
public void testRenameWithColonInDestinationPath() throws Exception {
  AzureBlobFileSystem fs = getFileSystem();
  // Close the stream right away; only the existence of the file matters
  // and an open stream would otherwise be leaked.
  fs.create(new Path("/src")).close();
  Assertions.assertThat(
          fs.rename(new Path("/src"), new Path("/dst:")))
      .describedAs("Rename should succeed")
      .isTrue();
}
/**
 * Renames a directory whose name contains a colon to a plain name and back
 * again, checking after each rename that a file inside it is visible only
 * under the current directory name.
 *
 * The directory also holds a nested directory with a colon in its name
 * ({@code Test:}); that nested path is created but not asserted on.
 *
 * @throws Exception if an error occurs during test execution
 */
@Test
public void testRenameWithColonInSourcePath() throws Exception {
  AzureBlobFileSystem fs = getFileSystem();
  String sourceDirectory = "/src:";
  String destinationDirectory = "/dst";
  String fileName = "file";
  // Close the streams right away; only the existence of the files matters
  // and open streams would otherwise be leaked.
  fs.create(new Path(sourceDirectory, fileName)).close();
  fs.create(new Path(sourceDirectory + "/Test:", fileName)).close();

  // Rename from source to destination
  Assertions.assertThat(
          fs.rename(new Path(sourceDirectory), new Path(destinationDirectory)))
      .describedAs("Rename should succeed")
      .isTrue();
  Assertions.assertThat(
          fs.exists(new Path(sourceDirectory, fileName)))
      .describedAs("Source directory should not exist after rename")
      .isFalse();
  Assertions.assertThat(
          fs.exists(new Path(destinationDirectory, fileName)))
      .describedAs("Destination directory should exist after rename")
      .isTrue();

  // Rename from destination to source
  Assertions.assertThat(
          fs.rename(new Path(destinationDirectory), new Path(sourceDirectory)))
      .describedAs("Rename should succeed").isTrue();
  Assertions.assertThat(
          fs.exists(new Path(sourceDirectory, fileName)))
      .describedAs("Destination directory should exist after rename")
      .isTrue();
  Assertions.assertThat(
          fs.exists(new Path(destinationDirectory, fileName)))
      .describedAs("Source directory should not exist after rename")
      .isFalse();
}
/**
 * Renames a directory onto its own parent after the directory's blob path
 * has been deleted through the blob client.
 *
 * The test creates {@code /srcDir/dir/file}, deletes the blob path
 * {@code /srcDir/dir} directly via {@link AbfsBlobClient}, then renames
 * {@code /srcDir/dir} to {@code /srcDir} and expects {@code true}.
 * Runs only when {@code assumeBlobServiceType()} lets it through.
 *
 * @throws Exception if an error occurs during test execution
 */
@Test
public void testRenameBlobInSameDirectoryWithNoMarker() throws Exception {
  final AzureBlobFileSystem abfs = getFileSystem();
  assumeBlobServiceType();
  final AbfsBlobClient blobClient
      = (AbfsBlobClient) abfs.getAbfsStore().getClient();

  final Path parentDir = new Path("/srcDir");
  final Path markerlessDir = new Path("/srcDir/dir");
  abfs.create(new Path("/srcDir/dir/file"));

  // Drop the directory's own blob path while leaving its child in place.
  blobClient.deleteBlobPath(markerlessDir, null,
      getTestTracingContext(abfs, true));

  assertTrue(abfs.rename(markerlessDir, parentDir));
}
/**
 * <pre>
 * Checks rename when a directory with the source's name already exists
 * below the destination.
 *
 * testDir2/test1/test2/test3 contains (file)
 * testDir2/test4/test3 already exists
 * Operation: rename(testDir2/test1/test2/test3, testDir2/test4)
 * </pre>
 *
 * Expectations asserted by the test:<ol>
 * <li>The rename returns {@code false}.</li>
 * <li>All directories involved still exist.</li>
 * <li>When the namespace is enabled or the client is an
 * {@link AbfsBlobClient}, the file stays at the source and is absent at
 * the destination; otherwise the file is expected at the destination and
 * absent at the source.</li>
 * </ol>
 */
@Test
public void testPosixRenameDirectoryWhereDirectoryAlreadyThereOnDestination()
    throws Exception {
  final AzureBlobFileSystem abfs = this.getFileSystem();

  final Path sourceDir = new Path("testDir2/test1/test2/test3");
  final Path sourceFile = new Path("testDir2/test1/test2/test3/file");
  final Path destParent = new Path("testDir2/test4");
  final Path clashingDir = new Path("testDir2/test4/test3");
  final Path destFile = new Path("testDir2/test4/test3/file");

  abfs.mkdirs(sourceDir);
  abfs.create(sourceFile);
  abfs.mkdirs(clashingDir);
  assertTrue(abfs.exists(sourceFile));

  assertFalse(abfs.rename(sourceDir, destParent));

  assertTrue(abfs.exists(new Path("testDir2")));
  assertTrue(abfs.exists(new Path("testDir2/test1/test2")));
  assertTrue(abfs.exists(destParent));
  assertTrue(abfs.exists(sourceDir));

  final boolean fileStaysAtSource = getIsNamespaceEnabled(abfs)
      || abfs.getAbfsClient() instanceof AbfsBlobClient;
  if (fileStaysAtSource) {
    assertFalse(abfs.exists(destFile));
    assertTrue(abfs.exists(sourceFile));
  } else {
    assertTrue(abfs.exists(destFile));
    assertFalse(abfs.exists(sourceFile));
  }
}
/**
 * <pre>
 * Checks rename of a directory into an existing destination directory that
 * already holds a file with the same name as one of the source's children.
 *
 * testDir2/test1/test2/test3 contains (file, file1)
 * testDir2/test4 contains (file1)
 * Operation: rename(testDir2/test1/test2/test3, testDir2/test4)
 * </pre>
 *
 * Expectations asserted by the test:<ol>
 * <li>The rename returns {@code true}.</li>
 * <li>The source directory and its files no longer exist at the source.</li>
 * <li>Both source files exist under testDir2/test4/test3.</li>
 * <li>The pre-existing testDir2/test4/file1 is still there and no
 * testDir2/test4/file has appeared.</li>
 * </ol>
 */
@Test
public void testPosixRenameDirectoryWherePartAlreadyThereOnDestination()
    throws Exception {
  final AzureBlobFileSystem fs = this.getFileSystem();
  fs.mkdirs(new Path("testDir2/test1/test2/test3"));
  fs.create(new Path("testDir2/test1/test2/test3/file"));
  fs.create(new Path("testDir2/test1/test2/test3/file1"));
  fs.mkdirs(new Path("testDir2/test4/"));
  fs.create(new Path("testDir2/test4/file1"));
  assertTrue(fs.exists(new Path("testDir2/test1/test2/test3/file")));
  assertTrue(fs.exists(new Path("testDir2/test1/test2/test3/file1")));

  assertTrue(fs.rename(new Path("testDir2/test1/test2/test3"),
      new Path("testDir2/test4")));

  // Ancestors and the destination directory are untouched.
  assertTrue(fs.exists(new Path("testDir2")));
  assertTrue(fs.exists(new Path("testDir2/test1/test2")));
  assertTrue(fs.exists(new Path("testDir2/test4")));

  // The source directory was moved as a whole below the destination.
  assertFalse(fs.exists(new Path("testDir2/test1/test2/test3")));
  assertFalse(fs.exists(new Path("testDir2/test4/file")));
  assertTrue(fs.exists(new Path("testDir2/test4/file1")));
  assertTrue(fs.exists(new Path("testDir2/test4/test3/file")));
  assertTrue(fs.exists(new Path("testDir2/test4/test3/file1")));
  assertFalse(fs.exists(new Path("testDir2/test1/test2/test3/file")));
  assertFalse(fs.exists(new Path("testDir2/test1/test2/test3/file1")));
}
/**
 * Checks that a successful rename of a directory under {@code /hbase}
 * asks the blob client to delete the rename-pending JSON path
 * ({@code <source> + SUFFIX}).
 *
 * {@code deleteBlobPath} on the spied client is stubbed: it records whether
 * the expected JSON path was requested and returns {@code null} without
 * calling the real method.
 */
@Test
public void testRenamePendingJsonIsRemovedPostSuccessfulRename()
    throws Exception {
  final AzureBlobFileSystem spiedFs = Mockito.spy(this.getFileSystem());
  assumeBlobServiceType();
  final AbfsBlobClient spiedClient
      = (AbfsBlobClient) addSpyHooksOnClient(spiedFs);

  spiedFs.setWorkingDirectory(new Path("/"));
  spiedFs.mkdirs(new Path("hbase/test1/test2/test3"));
  spiedFs.create(new Path("hbase/test1/test2/test3/file"));
  spiedFs.create(new Path("hbase/test1/test2/test3/file1"));
  spiedFs.mkdirs(new Path("hbase/test4/"));
  spiedFs.create(new Path("hbase/test4/file1"));

  final String expectedJsonPath = "/hbase/test1/test2/test3" + SUFFIX;
  // Stays 0 until a delete for the expected JSON path is seen, then 1.
  final AtomicInteger jsonDeleteSeen = new AtomicInteger(0);
  Mockito.doAnswer(invocation -> {
        final Path deleted = invocation.getArgument(0);
        if (expectedJsonPath.equals(deleted.toUri().getPath())) {
          jsonDeleteSeen.set(1);
        }
        return null;
      })
      .when(spiedClient)
      .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));

  assertTrue(spiedFs.rename(new Path("hbase/test1/test2/test3"),
      new Path("hbase/test4")));
  assertEquals("RenamePendingJson should be deleted",
      1,
      jsonDeleteSeen.get());
}
/**
 * Failed-rename recovery for a directory under {@code hbase/}, triggered by
 * a ListStatus call.<br>
 * The scenario itself is driven by
 * {@code setupAndTestHBaseFailedRenameRecovery}, defined elsewhere in this
 * class; per the original description it simulates a process crash by
 * failing the copy of one blob with a 403. This test supplies the recovery
 * trigger: {@code listStatus} on the parent of the rename source, which is
 * expected to make {@link AzureBlobFileSystem} resume the paused rename.
 */
@Test
public void testHBaseHandlingForFailedRenameWithListRecovery()
    throws Exception {
  final AzureBlobFileSystem spiedFs = Mockito.spy(this.getFileSystem());
  assumeBlobServiceType();
  final AbfsBlobClient spiedClient
      = (AbfsBlobClient) addSpyHooksOnClient(spiedFs);

  final String renameSource = "hbase/test1/test2";
  final String copyFailurePath = renameSource + "/test3/file1";

  setupAndTestHBaseFailedRenameRecovery(spiedFs, spiedClient, renameSource,
      copyFailurePath,
      recoveringFs -> {
        final Path sourceParent = new Path(renameSource).getParent();
        recoveringFs.listStatus(sourceParent);
        return null;
      });
}
/**
 * Failed-rename recovery for a nested directory under {@code hbase/},
 * triggered by a GetFileStatus-style call.<br>
 * The scenario itself is driven by
 * {@code setupAndTestHBaseFailedRenameRecovery}, defined elsewhere in this
 * class; per the original description it simulates a process crash by
 * failing the copy of one blob with a 403. This test supplies the recovery
 * trigger: {@code exists} on the rename source, which is expected to make
 * {@link AzureBlobFileSystem} resume the paused rename.
 */
@Test
public void testHBaseHandlingForFailedRenameWithGetFileStatusRecovery()
    throws Exception {
  final AzureBlobFileSystem spiedFs = Mockito.spy(this.getFileSystem());
  assumeBlobServiceType();
  final AbfsBlobClient spiedClient
      = (AbfsBlobClient) addSpyHooksOnClient(spiedFs);

  final String renameSource = "hbase/test1/test2";
  final String copyFailurePath = renameSource + "/test3/file1";

  setupAndTestHBaseFailedRenameRecovery(spiedFs, spiedClient, renameSource,
      copyFailurePath,
      recoveringFs -> {
        recoveringFs.exists(new Path(renameSource));
        return null;
      });
}
/**
 * Scenario from the original description: HMaster in HBase starts up and
 * runs listStatus on a directory that another machine is renaming. The
 * RenamePending JSON has been created but not yet appended to when HMaster
 * opens it; HMaster deletes it, and the renaming machine has to recreate
 * the JSON file.
 * ref: <a href="https://issues.apache.org/jira/browse/HADOOP-12678">issue</a>
 * <p>
 * The test body only prepares a spied filesystem rooted at
 * {@code ROOT_PATH} and hands it to two helpers defined elsewhere in this
 * class, {@code testRenamePreRenameFailureResolution} and
 * {@code testAtomicityRedoInvalidFile}.
 */
@Test
public void testHbaseListStatusBeforeRenamePendingFileAppendedWithIngressOnBlob()
    throws Exception {
  final AzureBlobFileSystem spiedFs = Mockito.spy(this.getFileSystem());
  assumeBlobServiceType();

  final Path workingDir = new Path(ROOT_PATH);
  spiedFs.setWorkingDirectory(workingDir);

  testRenamePreRenameFailureResolution(spiedFs);
  testAtomicityRedoInvalidFile(spiedFs);
}
/**
 * Exercises {@link RenameAtomicity} when the rename-pending JSON has
 * already been removed by the time the client's delete runs.
 * <p>
 * A rename-pending JSON containing {@code {}} is written next to
 * {@code /hbase/test1/test2}. {@code deleteBlobPath} on the spied client is
 * stubbed to first delete the requested path through a separate filesystem
 * handle and only then call the real method. A {@link RenameAtomicity} is
 * then constructed for the JSON file.
 * <p>
 * NOTE(review): the number of deletes seen for the JSON path is counted but
 * never asserted, so the test passes as long as no exception is thrown.
 *
 * @throws Exception on any failure
 */
@Test
public void testRenameJsonDeletedBeforeRenameAtomicityCanDelete()
    throws Exception {
  final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
  fs.setWorkingDirectory(new Path(ROOT_PATH));

  Path path = new Path("/hbase/test1/test2");
  fs.mkdirs(new Path(path, "test3"));
  Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);

  // try-with-resources closes the stream even if the write fails.
  try (OutputStream os = fs.create(renameJson)) {
    os.write("{}".getBytes(StandardCharsets.UTF_8));
  }

  int[] renameJsonDeleteCounter = new int[1];
  Mockito.doAnswer(deleteAnswer -> {
        Path ansPath = deleteAnswer.getArgument(0);
        if (renameJson.toUri()
            .getPath()
            .equalsIgnoreCase(ansPath.toUri().getPath())) {
          renameJsonDeleteCounter[0]++;
        }
        // Remove the path out-of-band before the real delete is attempted.
        getFileSystem().delete(ansPath, true);
        return deleteAnswer.callRealMethod();
      })
      .when(client)
      .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));

  new RenameAtomicity(renameJson, 2,
      getTestTracingContext(fs, true), null, client);
}
/**
 * Checks that a redo of rename atomicity fails cleanly when the source path
 * disappears before the redo acts on it.
 * <p>
 * A rename-pending JSON is produced via {@code preRename()} for a rename of
 * {@code /hbase/test1/test2} to {@code /hbase/test4}. A second, spied
 * {@link RenameAtomicity} is created for the redo; its read callback is
 * hooked so that the source path is deleted right after the JSON bytes have
 * been read.
 * <p>
 * {@code redo()} is expected to throw an
 * {@link AbfsRestOperationException} whose status code and error code match
 * {@code SOURCE_PATH_NOT_FOUND}.
 *
 * @throws Exception If an error occurs during file system operations.
 */
@Test
public void testRenameCompleteBeforeRenameAtomicityRedo() throws Exception {
  final AzureBlobFileSystem spiedFs = Mockito.spy(this.getFileSystem());
  assumeBlobServiceType();
  final AbfsBlobClient spiedClient
      = (AbfsBlobClient) addSpyHooksOnClient(spiedFs);
  spiedFs.setWorkingDirectory(new Path(ROOT_PATH));

  final Path renameSource = new Path("/hbase/test1/test2");
  spiedFs.mkdirs(new Path(renameSource, "test3"));
  final Path renameJson = new Path(renameSource.getParent(),
      renameSource.getName() + SUFFIX);

  // Write the rename-pending JSON and remember its length for the redo.
  final VersionedFileStatus sourceStatus
      = (VersionedFileStatus) spiedFs.getFileStatus(renameSource);
  final RenameAtomicity preRenameAtomicity = new RenameAtomicity(
      renameSource,
      new Path("/hbase/test4"), renameJson,
      getTestTracingContext(spiedFs, true), sourceStatus.getEtag(),
      spiedClient);
  final int jsonLength = preRenameAtomicity.preRename();

  final RenameAtomicity redoAtomicity = Mockito.spy(
      new RenameAtomicity(renameJson, jsonLength,
          getTestTracingContext(spiedFs, true), null, spiedClient));
  RenameAtomicityTestUtils.addReadPathMock(redoAtomicity,
      readInvocation -> {
        final byte[] jsonBytes = (byte[]) readInvocation.callRealMethod();
        // Remove the source once the JSON has been read by the redo.
        spiedFs.delete(renameSource, true);
        return jsonBytes;
      });

  final AbfsRestOperationException failure = intercept(
      AbfsRestOperationException.class,
      redoAtomicity::redo);
  Assertions.assertThat(failure.getStatusCode())
      .describedAs("RenameAtomicity redo should fail with 404")
      .isEqualTo(SOURCE_PATH_NOT_FOUND.getStatusCode());
  Assertions.assertThat(failure.getErrorCode())
      .describedAs("RenameAtomicity redo should fail with 404")
      .isEqualTo(SOURCE_PATH_NOT_FOUND);
}
/**
 * Tests the idempotency of the copyBlob operation during a rename when the
 * destination has already been written by an identical copy.
 * <p>
 * {@code copyBlob} on the spied client is stubbed so that, before the real
 * call runs, the same copy (same source, destination, lease id and tracing
 * context) is issued through the client of a separate filesystem handle.
 * <p>
 * The rename is expected to return {@code true}; afterwards the source must
 * be gone and the destination must exist.
 *
 * @throws Exception If an error occurs during file system operations.
 */
@Test
public void testCopyBlobIdempotency() throws Exception {
  final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
  fs.setWorkingDirectory(new Path(ROOT_PATH));
  Path src = new Path("/srcDir/src");
  Path dst = new Path("/dst");
  fs.create(src);

  Mockito.doAnswer(answer -> {
    Path srcCopy = answer.getArgument(0);
    Path dstCopy = answer.getArgument(1);
    String leaseId = answer.getArgument(2);
    TracingContext tracingContext = answer.getArgument(3);
    /*
     * To fail copyBlob with idempotency issue, making a copy of the source to destination
     * before the invoked copy
     */
    ((AbfsBlobClient) getFileSystem().getAbfsClient()).copyBlob(srcCopy,
        dstCopy, leaseId, tracingContext);
    return answer.callRealMethod();
  }).when(client).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
      Mockito.nullable(String.class),
      Mockito.any(TracingContext.class));

  Assertions.assertThat(fs.rename(src, dst))
      .describedAs("Rename should be successful and copyBlob should "
          + "be able to handle idempotency issue")
      .isTrue();

  Assertions.assertThat(fs.exists(src))
      .describedAs("Source should not exist after rename")
      .isFalse();
  Assertions.assertThat(fs.exists(dst))
      .describedAs("Destination should exist after rename")
      .isTrue();
}
/**
 * Tests rename when the destination is created by another actor between
 * the start of the rename and the blob copy.
 * <p>
 * {@code copyBlob} on the spied client is stubbed so that the destination
 * path is created through the filesystem right before the real copy runs.
 * <p>
 * The rename is expected to return {@code false} and the source must still
 * exist afterwards.
 *
 * @throws IOException If an error occurs during file system operations.
 */
@Test
public void testRenameBlobIdempotencyWhereDstIsCreatedFromSomeOtherProcess()
    throws IOException {
  final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
  fs.setWorkingDirectory(new Path(ROOT_PATH));
  Path src = new Path("/src");
  Path dst = new Path("/dst");
  fs.create(src);

  Mockito.doAnswer(answer -> {
    Path dstCopy = answer.getArgument(1);
    // Simulate another process creating the destination first.
    fs.create(dstCopy);
    return answer.callRealMethod();
  }).when(client).copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
      Mockito.nullable(String.class),
      Mockito.any(TracingContext.class));

  Assertions.assertThat(fs.rename(src, dst))
      .describedAs("Rename should fail as the destination is created "
          + "by another process before the blob copy")
      .isFalse();

  Assertions.assertThat(fs.exists(src))
      .describedAs("Source should exist after rename failure")
      .isTrue();
}
/**
 * Renames a directory into a destination directory whose own blob path has
 * been deleted through the blob client.
 * <p>
 * {@code /test1/test2/test3/file} is created together with its ancestors,
 * then the blob path {@code /test1/test2} is deleted directly via
 * {@link AbfsBlobClient}. After creating {@code /test4/test5}, the test
 * renames {@code /test4} to {@code /test1/test2} and checks that
 * {@code /test1/test2/test4/test5} exists. The rename's return value is not
 * checked.
 *
 * @throws Exception If an error occurs during the file system operations or assertions.
 */
@Test
public void testRenameDirWhenMarkerBlobIsAbsentOnDstDir() throws Exception {
  final AzureBlobFileSystem abfs = getFileSystem();
  assumeBlobServiceType();

  final Path markerlessDst = new Path("/test1/test2");
  abfs.mkdirs(new Path("/test1"));
  abfs.mkdirs(markerlessDst);
  abfs.mkdirs(new Path("/test1/test2/test3"));
  abfs.create(new Path("/test1/test2/test3/file"));

  // Drop the destination directory's own blob path, keeping its children.
  final AbfsBlobClient blobClient = (AbfsBlobClient) abfs.getAbfsClient();
  blobClient.deleteBlobPath(markerlessDst,
      null, getTestTracingContext(abfs, true));

  abfs.mkdirs(new Path("/test4/test5"));
  abfs.rename(new Path("/test4"), markerlessDst);

  final boolean movedBelowDst
      = abfs.exists(new Path("/test1/test2/test4/test5"));
  assertTrue(movedBelowDst);
}
/**
 * Renames a source directory whose own blob path has been deleted through
 * the blob client.
 * <p>
 * {@code /test1/test2/file1} is created, then the blob path {@code /test1}
 * is deleted directly via {@link AbfsBlobClient}. After creating
 * {@code /test2}, the test renames {@code /test1} to {@code /test2} and
 * checks that {@code /test2/test1} exists. The rename's return value is not
 * checked.
 *
 * @throws Exception If an error occurs during the file system operations or assertions.
 */
@Test
public void testBlobRenameSrcDirHasNoMarker() throws Exception {
  final AzureBlobFileSystem abfs = getFileSystem();
  assumeBlobServiceType();

  final Path markerlessSrc = new Path("/test1");
  final Path existingDst = new Path("/test2");
  abfs.create(new Path("/test1/test2/file1"));

  // Drop the source directory's own blob path, keeping its children.
  final AbfsBlobClient blobClient
      = (AbfsBlobClient) abfs.getAbfsStore().getClient();
  blobClient.deleteBlobPath(markerlessSrc, null,
      getTestTracingContext(abfs, true));

  abfs.mkdirs(existingDst);
  abfs.rename(markerlessSrc, existingDst);

  final boolean movedBelowDst = abfs.exists(new Path("/test2/test1"));
  assertTrue(movedBelowDst);
}
/**
 * Checks a rename whose underlying blob copy does not complete immediately.
 * <ul>
 *   <li>The copy operation is mocked to report an in-progress status.</li>
 *   <li>{@code /test1/file} is renamed to {@code /test1/file2}.</li>
 *   <li>The destination must exist afterwards and the rename handler's
 *   {@code handleCopyInProgress} must have been invoked exactly once.</li>
 * </ul>
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testCopyBlobTakeTime() throws Exception {
  AzureBlobFileSystem fileSystem = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient spiedClient
      = (AbfsBlobClient) addSpyHooksOnClient(fileSystem);
  addMockForProgressStatusOnCopyOperation(spiedClient);
  final Path source = new Path("/test1/file");
  final Path target = new Path("/test1/file2");
  fileSystem.create(source);
  // One-slot holder so the callback can hand the handler back to the test.
  final BlobRenameHandler[] capturedHandler = new BlobRenameHandler[1];
  AbfsClientTestUtil.mockGetRenameBlobHandler(spiedClient, handler -> {
    capturedHandler[0] = handler;
    return null;
  });
  fileSystem.rename(source, target);
  assertTrue(fileSystem.exists(target));
  Mockito.verify(capturedHandler[0], Mockito.times(1))
      .handleCopyInProgress(
          Mockito.any(Path.class),
          Mockito.any(TracingContext.class),
          Mockito.any(String.class));
}
/**
 * Checks a rename whose blob copy stays in progress and finally fails.
 * <ul>
 *   <li>The copy is mocked to report progress and then the final status
 *   {@code COPY_STATUS_FAILED}.</li>
 *   <li>The rename must throw an {@link AbfsRestOperationException}.</li>
 *   <li>The exception must carry the {@code COPY_BLOB_FAILED} error code and
 *   its status code.</li>
 * </ul>
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testCopyBlobTakeTimeAndEventuallyFail() throws Exception {
  AzureBlobFileSystem fileSystem = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient spiedClient
      = (AbfsBlobClient) addSpyHooksOnClient(fileSystem);
  addMockForProgressStatusOnCopyOperation(spiedClient);
  final Path source = new Path("/test1/file");
  final Path target = new Path("/test1/file2");
  fileSystem.create(source);
  addMockForCopyOperationFinalStatus(spiedClient, COPY_STATUS_FAILED);
  AbfsRestOperationException failure = intercept(
      AbfsRestOperationException.class,
      () -> {
        fileSystem.rename(source, target);
      });
  Assertions.assertThat(failure.getStatusCode())
      .describedAs("Expecting COPY_FAILED status code")
      .isEqualTo(COPY_BLOB_FAILED.getStatusCode());
  Assertions.assertThat(failure.getErrorCode())
      .describedAs("Expecting COPY_FAILED error code")
      .isEqualTo(COPY_BLOB_FAILED);
}
/**
 * Checks a rename whose blob copy stays in progress and is finally aborted.
 * <ul>
 *   <li>The copy is mocked to report progress and then the final status
 *   {@code COPY_STATUS_ABORTED}.</li>
 *   <li>The rename must throw an {@link AbfsRestOperationException}.</li>
 *   <li>The exception must carry the {@code COPY_BLOB_ABORTED} error code and
 *   its status code.</li>
 * </ul>
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testCopyBlobTakeTimeAndEventuallyAborted() throws Exception {
  AzureBlobFileSystem fileSystem = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient spiedClient
      = (AbfsBlobClient) addSpyHooksOnClient(fileSystem);
  addMockForProgressStatusOnCopyOperation(spiedClient);
  final Path source = new Path("/test1/file");
  final Path target = new Path("/test1/file2");
  fileSystem.create(source);
  addMockForCopyOperationFinalStatus(spiedClient, COPY_STATUS_ABORTED);
  AbfsRestOperationException failure = intercept(
      AbfsRestOperationException.class,
      () -> {
        fileSystem.rename(source, target);
      });
  Assertions.assertThat(failure.getStatusCode())
      .describedAs("Expecting COPY_ABORTED status code")
      .isEqualTo(COPY_BLOB_ABORTED.getStatusCode());
  Assertions.assertThat(failure.getErrorCode())
      .describedAs("Expecting COPY_ABORTED error code")
      .isEqualTo(COPY_BLOB_ABORTED);
}
/**
 * Checks a rename whose copy is reported as pending while the destination
 * blob is deleted underneath it.
 * <ul>
 *   <li>{@code copyBlob} is stubbed to run the real copy, delete the
 *   destination file, and then report {@code COPY_STATUS_PENDING} in the
 *   copy-status response header.</li>
 *   <li>The rename of {@code /test1/file} to {@code /test1/file2} must
 *   return {@code false}.</li>
 *   <li>The destination must not exist afterwards.</li>
 * </ul>
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testCopyBlobTakeTimeAndBlobIsDeleted() throws Exception {
  AzureBlobFileSystem fileSystem = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient spiedClient
      = (AbfsBlobClient) addSpyHooksOnClient(fileSystem);
  final Path source = new Path("/test1/file");
  final Path destination = new Path("/test1/file2");
  Mockito.doAnswer(invocation -> {
    // Perform the real copy, then remove the destination behind its back.
    AbfsRestOperation copyOp = Mockito.spy(
        (AbfsRestOperation) invocation.callRealMethod());
    fileSystem.delete(destination, false);
    // Make the returned operation claim the copy is still pending.
    AbfsHttpOperation pendingResult = Mockito.spy(copyOp.getResult());
    Mockito.doReturn(COPY_STATUS_PENDING)
        .when(pendingResult)
        .getResponseHeader(HttpHeaderConfigurations.X_MS_COPY_STATUS);
    Mockito.doReturn(pendingResult).when(copyOp).getResult();
    return copyOp;
  })
      .when(spiedClient)
      .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
          Mockito.nullable(String.class), Mockito.any(TracingContext.class));
  fileSystem.create(source);
  final boolean renamed = fileSystem.rename(source, destination);
  assertFalse(renamed);
  final boolean destinationExists = fileSystem.exists(destination);
  assertFalse(destinationExists);
}
/**
 * Verifies that copying a blob whose source has already been deleted fails
 * with a 'not found' error.
 * <ul>
 *   <li>A source blob {@code /src} is created and then deleted through the
 *   blob client.</li>
 *   <li>A direct {@code copyBlob} from {@code /src} to {@code /dst} must throw
 *   an {@link AbfsRestOperationException}.</li>
 *   <li>The exception's status code must be {@code HTTP_NOT_FOUND}.</li>
 * </ul>
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testCopyAfterSourceHasBeenDeleted() throws Exception {
  AzureBlobFileSystem fs = getFileSystem();
  assumeBlobServiceType();
  AbfsBlobClient client = (AbfsBlobClient) fs.getAbfsClient();
  final Path src = new Path("/src");
  fs.create(src);
  // Delete the source so the copy below has nothing to read from.
  client.deleteBlobPath(src, null, getTestTracingContext(fs, true));
  AbfsRestOperationException ex = intercept(AbfsRestOperationException.class,
      () -> {
        client.copyBlob(src, new Path("/dst"),
            null, getTestTracingContext(fs, true));
      });
  Assertions.assertThat(ex.getStatusCode())
      .describedAs("Source has to be not found at copy")
      .isEqualTo(HTTP_NOT_FOUND);
}
/**
 * Verifies that a second, parallel rename of the same directory fails while
 * the first rename is holding its lease.
 * <ul>
 *   <li>{@code /hbase/src} is created on a dedicated file system instance.</li>
 *   <li>{@code acquireLease} is stubbed so that, after the real lease call
 *   returns, the first rename blocks until the parallel thread has finished.</li>
 *   <li>The parallel thread waits for the lease to be taken and then attempts
 *   the same rename; it records success only if the failure is a
 *   {@code LeaseException} caused by an {@link AbfsRestOperationException}
 *   with status {@code HTTP_CONFLICT}.</li>
 *   <li>The test asserts that this conflict was observed.</li>
 * </ul>
 * The file system created via {@code FileSystem.newInstance} is closed when
 * the test ends, including when the service-type assumption skips the test.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testParallelRenameForAtomicRenameShouldFail() throws Exception {
  Configuration config = getRawConfiguration();
  config.set(FS_AZURE_LEASE_THREADS, "2");
  // newInstance bypasses the FileSystem cache, so this test owns the instance
  // and must close it.
  try (AzureBlobFileSystem fs = Mockito.spy(
      (AzureBlobFileSystem) FileSystem.newInstance(config))) {
    assumeBlobServiceType();
    fs.setWorkingDirectory(new Path(ROOT_PATH));
    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
    Path src = new Path("/hbase/src");
    Path dst = new Path("/hbase/dst");
    fs.mkdirs(src);
    AtomicBoolean leaseAcquired = new AtomicBoolean(false);
    AtomicBoolean exceptionOnParallelRename = new AtomicBoolean(false);
    AtomicBoolean parallelThreadDone = new AtomicBoolean(false);
    Mockito.doAnswer(answer -> {
      AbfsRestOperation op = (AbfsRestOperation) answer.callRealMethod();
      leaseAcquired.set(true);
      // Hold the first rename here until the competing rename has been tried.
      while (!parallelThreadDone.get()) {}
      return op;
    })
        .when(client)
        .acquireLease(Mockito.anyString(), Mockito.anyInt(),
            Mockito.nullable(String.class),
            Mockito.any(TracingContext.class));
    new Thread(() -> {
      // Start only once the first rename owns the lease.
      while (!leaseAcquired.get()) {}
      try {
        fs.rename(src, dst);
      } catch (Exception e) {
        if (e.getCause() instanceof AbfsLease.LeaseException
            && e.getCause().getCause() instanceof AbfsRestOperationException
            && ((AbfsRestOperationException) e.getCause()
            .getCause()).getStatusCode() == HTTP_CONFLICT) {
          exceptionOnParallelRename.set(true);
        }
      } finally {
        parallelThreadDone.set(true);
      }
    }).start();
    fs.rename(src, dst);
    // The parallel thread must be finished before the file system is closed.
    while (!parallelThreadDone.get()) {}
    Assertions.assertThat(exceptionOnParallelRename.get())
        .describedAs("Parallel rename should fail")
        .isTrue();
  }
}
/**
 * Checks that writing to a file fails while that file is being renamed.
 * <ul>
 *   <li>An output stream is opened on {@code /hbase/src}.</li>
 *   <li>{@code copyBlob} is stubbed to signal that the copy has begun and to
 *   wait until the writer thread is done before running the real copy.</li>
 *   <li>The writer thread writes one byte and closes the stream once the copy
 *   has begun, recording whether an {@link IOException} was thrown.</li>
 *   <li>The test asserts that the write/close failed.</li>
 * </ul>
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testAppendAtomicBlobDuringRename() throws Exception {
  AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
  final Path src = new Path("/hbase/src");
  final Path dst = new Path("/hbase/dst");
  final FSDataOutputStream srcStream = fs.create(src);
  final AtomicBoolean copyStarted = new AtomicBoolean(false);
  final AtomicBoolean writerFinished = new AtomicBoolean(false);
  final AtomicBoolean writeRejected = new AtomicBoolean(false);
  Mockito.doAnswer(invocation -> {
    copyStarted.set(true);
    // Park the copy until the writer thread has had its attempt.
    while (!writerFinished.get()) {
    }
    return invocation.callRealMethod();
  })
      .when(client)
      .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
          Mockito.nullable(String.class), Mockito.any(TracingContext.class));
  Thread writer = new Thread(() -> {
    // Write only after the rename has reached the copy step.
    while (!copyStarted.get()) {
    }
    try {
      srcStream.write(1);
      srcStream.close();
    } catch (IOException e) {
      writeRejected.set(true);
    } finally {
      writerFinished.set(true);
    }
  });
  writer.start();
  fs.rename(src, dst);
  Assertions.assertThat(writeRejected.get())
      .describedAs("Append should fail")
      .isTrue();
}
/**
 * Checks that renaming a directory does not touch a sibling directory whose
 * name merely starts with the same prefix.
 * <ul>
 *   <li>{@code /testDir/dir} and {@code /testDir/dirSamePrefix} are created,
 *   each holding {@code file1} and {@code file2}.</li>
 *   <li>{@code /testDir/dir} is renamed to {@code /testDir/dir2}.</li>
 *   <li>{@code /testDir/dirSamePrefix/file1} must still exist, while
 *   {@code /testDir/dir}, {@code /testDir/dir/file1} and
 *   {@code /testDir/dir/file2} must be gone.</li>
 * </ul>
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testBlobRenameOfDirectoryHavingNeighborWithSamePrefix()
    throws Exception {
  AzureBlobFileSystem fs = getFileSystem();
  assumeBlobServiceType();
  final Path renamedDir = new Path("/testDir/dir");
  final Path neighbourDir = new Path("/testDir/dirSamePrefix");
  final Path renamedFile1 = new Path("/testDir/dir/file1");
  final Path renamedFile2 = new Path("/testDir/dir/file2");
  final Path neighbourFile1 = new Path("/testDir/dirSamePrefix/file1");
  fs.mkdirs(renamedDir);
  fs.mkdirs(neighbourDir);
  fs.create(renamedFile1);
  fs.create(renamedFile2);
  fs.create(neighbourFile1);
  fs.create(new Path("/testDir/dirSamePrefix/file2"));
  fs.rename(renamedDir, new Path("/testDir/dir2"));
  // The neighbour with the shared prefix must be untouched.
  final boolean neighbourKept = fs.exists(neighbourFile1);
  Assertions.assertThat(neighbourKept).isTrue();
  // Everything under the old source path must have gone away.
  final boolean file1StillAtSource = fs.exists(renamedFile1);
  Assertions.assertThat(file1StillAtSource).isFalse();
  final boolean file2StillAtSource = fs.exists(renamedFile2);
  Assertions.assertThat(file2StillAtSource).isFalse();
  final boolean sourceDirStillThere = fs.exists(new Path("/testDir/dir/"));
  Assertions.assertThat(sourceDirStillThere).isFalse();
}
/**
 * Checks a directory rename when every listing page contains a single entry.
 * <ul>
 *   <li>{@code /testDir/dir1} is created with ten files.</li>
 *   <li>{@code listPath} on the spied client is redirected to the real client
 *   with a page size of 1, keeping the caller's path, recursion flag,
 *   continuation token and tracing context.</li>
 *   <li>{@code /testDir/dir1} is renamed to {@code /testDir/dir2}.</li>
 *   <li>Each of the ten files must exist under {@code /testDir/dir2}.</li>
 * </ul>
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testBlobRenameWithListGivingPaginatedResultWithOneObjectPerList()
    throws Exception {
  AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient spiedClient = (AbfsBlobClient) addSpyHooksOnClient(fs);
  final int fileCount = 10;
  fs.mkdirs(new Path("/testDir/dir1"));
  int created = 0;
  while (created < fileCount) {
    fs.create(new Path("/testDir/dir1/file" + created));
    created++;
  }
  Mockito.doAnswer(invocation -> {
    String listedPath = invocation.getArgument(0);
    boolean isRecursive = invocation.getArgument(1);
    String continuationToken = invocation.getArgument(3);
    TracingContext tracing = invocation.getArgument(4);
    // Force a page size of one, whatever the caller asked for.
    return getFileSystem().getAbfsClient()
        .listPath(listedPath, isRecursive, 1, continuationToken, tracing,
            null);
  })
      .when(spiedClient)
      .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(),
          Mockito.nullable(String.class),
          Mockito.any(TracingContext.class), Mockito.nullable(URI.class));
  fs.rename(new Path("/testDir/dir1"), new Path("/testDir/dir2"));
  for (int index = 0; index < fileCount; index++) {
    final boolean moved = fs.exists(new Path("/testDir/dir2/file" + index));
    Assertions.assertThat(moved)
        .describedAs("File " + index + " should exist in /testDir/dir2")
        .isTrue();
  }
}
/**
 * Verifies that listing of the source directory stops once the rename fails
 * with an access-denied ({@code HTTP_FORBIDDEN}) error.
 * <ul>
 *   <li>{@code MAX_ITERATIONS} files are created concurrently under
 *   {@code /src}; a failed creation fails the test through
 *   {@code Future.get()}.</li>
 *   <li>{@code copyBlob} is stubbed to always throw an
 *   {@link AbfsRestOperationException} with {@code HTTP_FORBIDDEN}.</li>
 *   <li>The rename handler's {@code listRecursiveAndTakeAction} is wrapped so
 *   that forbidden failures are counted and then rethrown.</li>
 *   <li>{@code listPath} on {@code /src} is forced to a page size of 1; the
 *   second listing call waits until a copy failure has been counted.</li>
 *   <li>The rename must surface as {@link AccessDeniedException} and
 *   {@code /src} must have been listed at most twice.</li>
 * </ul>
 * The counters are {@link AtomicInteger}s because they are written and
 * spin-read inside callbacks that may run on different threads.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testProducerStopOnRenameFailure() throws Exception {
  AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  fs.mkdirs(new Path("/src"));
  ExecutorService executorService = Executors.newFixedThreadPool(10);
  try {
    List<Future<?>> futureList = new ArrayList<>();
    for (int i = 0; i < MAX_ITERATIONS; i++) {
      final int iter = i;
      // Callable form: an IOException from create() reaches future.get().
      Future<?> future = executorService.submit(() ->
          fs.create(new Path("/src/file" + iter)));
      futureList.add(future);
    }
    for (Future<?> future : futureList) {
      future.get();
    }
  } finally {
    // Release the pool threads whether or not the setup succeeded.
    executorService.shutdown();
  }
  AbfsBlobClient client = (AbfsBlobClient) fs.getAbfsClient();
  AbfsBlobClient spiedClient = Mockito.spy(client);
  AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
  store.setClient(spiedClient);
  Mockito.doReturn(store).when(fs).getAbfsStore();
  final AtomicInteger copyCallInvocation = new AtomicInteger(0);
  Mockito.doAnswer(answer -> {
    throw new AbfsRestOperationException(HTTP_FORBIDDEN, "", "",
        new Exception());
  }).when(spiedClient)
      .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
          Mockito.nullable(String.class), Mockito.any(TracingContext.class));
  AbfsClientTestUtil.mockGetRenameBlobHandler(spiedClient,
      (blobRenameHandler) -> {
        Mockito.doAnswer(answer -> {
          try {
            answer.callRealMethod();
          } catch (AbfsRestOperationException ex) {
            if (ex.getStatusCode() == HTTP_FORBIDDEN) {
              copyCallInvocation.incrementAndGet();
            }
            throw ex;
          }
          throw new AssertionError("List Consumption should have failed");
        })
            .when(blobRenameHandler).listRecursiveAndTakeAction();
        return null;
      });
  final AtomicInteger listCallInvocation = new AtomicInteger(0);
  Mockito.doAnswer(answer -> {
    if (answer.getArgument(0).equals("/src")) {
      if (listCallInvocation.get() == 1) {
        // Hold the second listing until the copy failure has been recorded.
        while (copyCallInvocation.get() == 0) {}
      }
      listCallInvocation.incrementAndGet();
      return getFileSystem().getAbfsClient().listPath(answer.getArgument(0),
          answer.getArgument(1), 1,
          answer.getArgument(3), answer.getArgument(4), answer.getArgument(5));
    }
    return answer.callRealMethod();
  })
      .when(spiedClient)
      .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(),
          Mockito.nullable(String.class), Mockito.any(TracingContext.class), Mockito.nullable(URI.class));
  intercept(AccessDeniedException.class,
      () -> {
        fs.rename(new Path("/src"), new Path("/dst"));
      });
  Assertions.assertThat(listCallInvocation.get())
      .describedAs("List on src should have been invoked at-most twice."
          + "One before consumption and the other after consumption has starting."
          + "Once consumption fails, listing would be stopped.")
      .isLessThanOrEqualTo(2);
}
/**
 * Checks the listing of a parent directory after a crashed rename whose
 * source directory has since been deleted.
 * <ul>
 *   <li>{@code hbase/test1} and a file inside it are created, and
 *   {@code crashRename} is invoked on the source path.</li>
 *   <li>The source directory is deleted recursively.</li>
 *   <li>{@code hbase} is listed while {@code copyBlob} calls are counted.</li>
 *   <li>No {@code copyBlob} call may have happened.</li>
 * </ul>
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testRenameResumeThroughListStatusWithSrcDirDeletedJustBeforeResume()
    throws Exception {
  AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
  fs.setWorkingDirectory(new Path(ROOT_PATH));
  final Path sourceDir = new Path("hbase/test1/");
  final Path fileInSource = new Path(sourceDir, "file");
  fs.mkdirs(sourceDir);
  fs.create(fileInSource);
  final String sourceDirPath = sourceDir.toUri().getPath();
  crashRename(fs, client, sourceDirPath);
  fs.delete(sourceDir, true);
  // Count every copy attempted from here on.
  final AtomicInteger copyCalls = new AtomicInteger(0);
  Mockito.doAnswer(invocation -> {
    copyCalls.incrementAndGet();
    return invocation.callRealMethod();
  })
      .when(client)
      .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
          Mockito.nullable(String.class), Mockito.any(TracingContext.class));
  fs.listStatus(new Path("hbase"));
  Assertions.assertThat(copyCalls.get())
      .describedAs("No Copy on resume")
      .isEqualTo(0);
}
/**
 * Checks the listing of a parent directory after a crashed rename whose
 * source directory has been deleted and re-created in the meantime.
 * <ul>
 *   <li>{@code hbase/test1} and a file inside it are created, and
 *   {@code crashRename} is invoked on the source path.</li>
 *   <li>The source directory is deleted recursively and created again.</li>
 *   <li>{@code /hbase} is listed while {@code copyBlob} calls and
 *   {@code deleteBlobPath} calls on paths ending with {@code SUFFIX} are
 *   counted.</li>
 *   <li>No copy may have happened, and exactly one delete of a path ending
 *   with {@code SUFFIX} must have been issued.</li>
 * </ul>
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testRenameResumeThroughListStatusWithSrcDirETagChangedJustBeforeResume()
    throws Exception {
  AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
  fs.setWorkingDirectory(new Path(ROOT_PATH));
  final Path sourceDir = new Path("hbase/test1/");
  final Path fileInSource = new Path(sourceDir, "file");
  fs.mkdirs(sourceDir);
  fs.create(fileInSource);
  final String sourceDirPath = sourceDir.toUri().getPath();
  crashRename(fs, client, sourceDirPath);
  // Re-create the source so it is a different directory than the crashed one.
  fs.delete(sourceDir, true);
  fs.mkdirs(sourceDir);
  final AtomicInteger copyCalls = new AtomicInteger(0);
  Mockito.doAnswer(invocation -> {
    copyCalls.incrementAndGet();
    return invocation.callRealMethod();
  })
      .when(client)
      .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
          Mockito.nullable(String.class), Mockito.any(TracingContext.class));
  final AtomicInteger pendingJsonDeletes = new AtomicInteger(0);
  Mockito.doAnswer(invocation -> {
    Path deleted = invocation.getArgument(0);
    final boolean isPendingJson = deleted.toUri().getPath().endsWith(SUFFIX);
    if (isPendingJson) {
      pendingJsonDeletes.incrementAndGet();
    }
    return invocation.callRealMethod();
  })
      .when(client)
      .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));
  fs.listStatus(new Path("/hbase"));
  Assertions.assertThat(copyCalls.get())
      .describedAs("No Copy on resume")
      .isEqualTo(0);
  Assertions.assertThat(pendingJsonDeletes.get())
      .describedAs("RenamePendingJson should be deleted")
      .isEqualTo(1);
}
/**
 * Checks an existence check on the source directory after a crashed rename
 * whose source has been deleted and re-created in the meantime.
 * <ul>
 *   <li>{@code hbase/test1} and a file inside it are created, and
 *   {@code crashRename} is invoked on the source path.</li>
 *   <li>The source directory is deleted recursively and created again.</li>
 *   <li>{@code fs.exists} is called on the source while {@code copyBlob}
 *   calls and {@code deleteBlobPath} calls on paths ending with
 *   {@code SUFFIX} are counted.</li>
 *   <li>The source must exist, no copy may have happened, and exactly one
 *   delete of a path ending with {@code SUFFIX} must have been issued.</li>
 * </ul>
 *
 * @throws Exception if any errors occur during the test execution
 */
@Test
public void testRenameResumeThroughGetStatusWithSrcDirETagChangedJustBeforeResume()
    throws Exception {
  AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
  fs.setWorkingDirectory(new Path(ROOT_PATH));
  final Path sourceDir = new Path("hbase/test1/");
  final Path fileInSource = new Path(sourceDir, "file");
  fs.mkdirs(sourceDir);
  fs.create(fileInSource);
  final String sourceDirPath = sourceDir.toUri().getPath();
  crashRename(fs, client, sourceDirPath);
  // Re-create the source so it is a different directory than the crashed one.
  fs.delete(sourceDir, true);
  fs.mkdirs(sourceDir);
  final AtomicInteger copyCalls = new AtomicInteger(0);
  Mockito.doAnswer(invocation -> {
    copyCalls.incrementAndGet();
    return invocation.callRealMethod();
  })
      .when(client)
      .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
          Mockito.nullable(String.class), Mockito.any(TracingContext.class));
  final AtomicInteger pendingJsonDeletes = new AtomicInteger(0);
  Mockito.doAnswer(invocation -> {
    Path deleted = invocation.getArgument(0);
    final boolean isPendingJson = deleted.toUri().getPath().endsWith(SUFFIX);
    if (isPendingJson) {
      pendingJsonDeletes.incrementAndGet();
    }
    return invocation.callRealMethod();
  })
      .when(client)
      .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));
  final boolean sourceExists = fs.exists(sourceDir);
  Assertions.assertThat(sourceExists)
      .describedAs("Source should exist")
      .isTrue();
  Assertions.assertThat(copyCalls.get())
      .describedAs("No Copy on resume")
      .isEqualTo(0);
  Assertions.assertThat(pendingJsonDeletes.get())
      .describedAs("RenamePendingJson should be deleted")
      .isEqualTo(1);
}
/**
 * Renames a directory of ten files with a {@code TracingHeaderValidator}
 * registered as listener for {@code FSOperationType.RENAME}.
 * <p>
 * While the source directory's own blob is copied, the validator's operated
 * blob count is set to {@code BLOB_COUNT}; it is reset to {@code null} after
 * the source directory's own blob has been deleted. Copy and delete calls on
 * any other path pass straight through to the real client.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId()
    throws Exception {
  AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
  assumeBlobServiceType();
  AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
  final String dirPathStr = "/testDir/dir1";
  fs.mkdirs(new Path(dirPathStr));
  // Populate the directory from a small thread pool.
  ExecutorService pool = Executors.newFixedThreadPool(5);
  List<Future<?>> pendingCreates = new ArrayList<>();
  for (int i = 0; i < 10; i++) {
    final int fileIndex = i;
    Future<?> pending = pool.submit(() ->
        fs.create(new Path("/testDir/dir1/file" + fileIndex)));
    pendingCreates.add(pending);
  }
  for (Future<?> pending : pendingCreates) {
    pending.get();
  }
  pool.shutdown();
  final TracingHeaderValidator validator = new TracingHeaderValidator(
      fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
      fs.getFileSystemId(), FSOperationType.RENAME, true, 0);
  fs.registerListener(validator);
  Mockito.doAnswer(invocation -> {
    Path copied = invocation.getArgument(0);
    if (dirPathStr.equalsIgnoreCase(copied.toUri().getPath())) {
      // Copy of the source directory itself: expect the blob count from now on.
      validator.setOperatedBlobCount(BLOB_COUNT);
    }
    return invocation.callRealMethod();
  })
      .when(client)
      .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
          Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));
  Mockito.doAnswer(invocation -> {
    Path deleted = invocation.getArgument(0);
    final boolean isSourceDir
        = dirPathStr.equalsIgnoreCase(deleted.toUri().getPath());
    Object outcome = invocation.callRealMethod();
    if (isSourceDir) {
      // Source directory blob is gone: stop expecting a blob count.
      validator.setOperatedBlobCount(null);
    }
    return outcome;
  })
      .when(client)
      .deleteBlobPath(Mockito.any(Path.class),
          Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));
  fs.rename(new Path(dirPathStr), new Path("/dst/"));
}
/**
 * Renames a directory back and forth through {@code renameDir} with different
 * configuration values, then checks that an invalid combination is rejected.
 * <p>
 * The third {@code renameDir} call must throw a {@link PathIOException} whose
 * message reports an invalid value for
 * {@code fs.azure.blob.dir.list.consumer.max.lag}.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testRenameDirWithDifferentParallelismConfig() throws Exception {
  try (AzureBlobFileSystem currentFs = getFileSystem()) {
    assumeBlobServiceType();
    final Path src = new Path("/hbase/A1/A2");
    final Path dst = new Path("/hbase/A1/A3");
    // Seed the source directory.
    createFiles(currentFs, src, TOTAL_FILES);
    // Two accepted configurations: there and back again.
    renameDir(currentFs, "10", "5", "2", src, dst);
    renameDir(currentFs, "100", "5", "2", dst, src);
    // A rejected configuration must raise PathIOException.
    PathIOException rejected = intercept(PathIOException.class,
        () -> renameDir(currentFs, "50", "50", "5", src, dst));
    String errorMessage = rejected.getMessage();
    Assertions.assertThat(errorMessage)
        .describedAs("maxConsumptionLag should be lesser than maxSize")
        .contains(
            "Invalid configuration value detected for \"fs.azure.blob.dir.list.consumer.max.lag\". "
                + "maxConsumptionLag should be lesser than maxSize");
  }
}
/**
 * Renames a directory to a destination whose name contains a colon
 * ({@code ":"}).
 * <p>
 * A file named {@code file} is created under {@code /test1/}; the rename to
 * {@code /test1:/} and all checks on its outcome are delegated to
 * {@code performRenameAndValidate}.
 *
 * @throws Exception if an error occurs during file system operations
 */
@Test
public void testRenameWhenDestinationPathContainsColon() throws Exception {
  AzureBlobFileSystem fs = getFileSystem();
  fs.setWorkingDirectory(new Path(ROOT_PATH));
  final String childName = "file";
  final Path sourceDir = new Path("/test1/");
  final Path colonDestination = new Path("/test1:/");
  // Give the source directory some content to move.
  Path childFile = new Path(sourceDir, childName);
  fs.create(childFile);
  // Rename and verify through the shared helper.
  performRenameAndValidate(fs, sourceDir, colonDestination, childName);
}
/**
 * Checks the atomic rename key lookup for the path {@code /hbase} on the
 * default test file system: {@code isAtomicRenameKey("/hbase")} must return
 * {@code false}.
 *
 * @throws Exception if an error occurs during the atomic rename key check
 */
@Test
public void testGetAtomicRenameKeyForRootFolder() throws Exception {
  AzureBlobFileSystem fs = getFileSystem();
  assumeBlobServiceType();
  AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsClient();
  final boolean isAtomicKey = blobClient.isAtomicRenameKey("/hbase");
  Assertions.assertThat(isAtomicKey)
      .describedAs("Atomic rename key should return false for Root folder")
      .isFalse();
}
/**
 * Tests the atomic rename key lookup for paths below configured directories.
 * <p>
 * A dedicated file system is created with the atomic rename key set to
 * {@code /hbase,/a,/b}. Paths under {@code /hbase} and {@code /a} must be
 * reported as atomic rename keys, while {@code /hbase1/test} (prefix match
 * only) and {@code /test/a} (key not at the start) must not.
 * <p>
 * The dedicated file system comes from {@code FileSystem.newInstance} and is
 * closed when the test ends, including when the service-type assumption
 * skips the test.
 *
 * @throws Exception if an error occurs during the atomic rename key check
 */
@Test
public void testGetAtomicRenameKeyForNonRootFolder() throws Exception {
  final AzureBlobFileSystem currentFs = getFileSystem();
  Configuration config = new Configuration(this.getRawConfiguration());
  config.set(FS_AZURE_ATOMIC_RENAME_KEY, "/hbase,/a,/b");
  // newInstance returns an instance owned by this test, so close it here.
  try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
      currentFs.getUri(), config)) {
    assumeBlobServiceType();
    AbfsBlobClient abfsBlobClient = (AbfsBlobClient) fs.getAbfsClient();
    // Test for various paths
    validateAtomicRenameKey(abfsBlobClient, "/hbase1/test", false);
    validateAtomicRenameKey(abfsBlobClient, "/hbase/test", true);
    validateAtomicRenameKey(abfsBlobClient, "/a/b/c", true);
    validateAtomicRenameKey(abfsBlobClient, "/test/a", false);
  }
}
/**
 * Checks {@code getPathStatus} on a directory that has no rename-pending JSON
 * file next to it.
 * <p>
 * Two files are created under {@code /hbase/A1/A2} and the directory's status
 * is fetched through the spied blob client. The response must be
 * {@code HTTP_OK}, the extracted ETag must be non-null, and
 * {@code getRedoRenameAtomicity} must never have been called.
 *
 * @throws Exception if any error occurs during the test execution
 */
@Test
public void testGetPathStatusWithoutPendingJsonFile() throws Exception {
  try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
    assumeBlobServiceType();
    final Path dir = new Path("/hbase/A1/A2");
    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
    fs.create(new Path(dir, "file1.txt"));
    fs.create(new Path(dir, "file2.txt"));
    AbfsConfiguration abfsConf = fs.getAbfsStore().getAbfsConfiguration();
    // Count every attempt to obtain a redo-rename helper.
    final AtomicInteger redoRenameRequests = new AtomicInteger(0);
    Mockito.doAnswer(invocation -> {
      redoRenameRequests.incrementAndGet();
      return invocation.callRealMethod();
    })
        .when(client)
        .getRedoRenameAtomicity(
            Mockito.any(Path.class), Mockito.anyInt(),
            Mockito.any(TracingContext.class));
    TracingContext statusTracing = new TracingContext(
        abfsConf.getClientCorrelationId(), fs.getFileSystemId(),
        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
        null);
    AbfsRestOperation statusOp = client.getPathStatus(
        dir.toUri().getPath(), true, statusTracing, null);
    AbfsHttpOperation statusResult = statusOp.getResult();
    Assertions.assertThat(statusResult.getStatusCode())
        .describedAs("Path should be found.")
        .isEqualTo(HTTP_OK);
    Assertions.assertThat(extractEtagHeader(statusResult))
        .describedAs("Etag should be present.")
        .isNotNull();
    Assertions.assertThat(redoRenameRequests.get())
        .describedAs("There should be no redo rename call.")
        .isEqualTo(0);
  }
}
/**
 * Checks {@code getPathStatus} on a directory that has a sibling
 * <em>directory</em> named like its rename-pending JSON file.
 * <p>
 * Two files are created under {@code /hbase/A1/A2}, and a directory named
 * {@code A2 + SUFFIX} is created next to it. The status of
 * {@code /hbase/A1/A2} must come back as {@code HTTP_OK} with a non-null
 * ETag, {@code getRedoRenameAtomicity} must never have been called, and the
 * suffixed directory must still exist afterwards.
 *
 * @throws Exception if any error occurs during the test execution
 */
@Test
public void testGetPathStatusWithPendingJsonDir() throws Exception {
  try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
    assumeBlobServiceType();
    final Path dir = new Path("/hbase/A1/A2");
    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
    fs.create(new Path(dir, "file1.txt"));
    fs.create(new Path(dir, "file2.txt"));
    // A directory, not a file, carrying the rename-pending suffix.
    final Path pendingJsonDir
        = new Path(dir.getParent(), dir.getName() + SUFFIX);
    fs.mkdirs(pendingJsonDir);
    AbfsConfiguration abfsConf = fs.getAbfsStore().getAbfsConfiguration();
    final AtomicInteger redoRenameRequests = new AtomicInteger(0);
    Mockito.doAnswer(invocation -> {
      redoRenameRequests.incrementAndGet();
      return invocation.callRealMethod();
    })
        .when(client)
        .getRedoRenameAtomicity(Mockito.any(Path.class),
            Mockito.anyInt(), Mockito.any(TracingContext.class));
    TracingContext statusTracing = new TracingContext(
        abfsConf.getClientCorrelationId(), fs.getFileSystemId(),
        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
        null);
    AbfsRestOperation statusOp = client.getPathStatus(
        dir.toUri().getPath(), true, statusTracing, null);
    AbfsHttpOperation statusResult = statusOp.getResult();
    Assertions.assertThat(statusResult.getStatusCode())
        .describedAs("Path should be found.")
        .isEqualTo(HTTP_OK);
    Assertions.assertThat(extractEtagHeader(statusResult))
        .describedAs("Etag should be present.")
        .isNotNull();
    Assertions.assertThat(redoRenameRequests.get())
        .describedAs("There should be no redo rename call.")
        .isEqualTo(0);
    final boolean pendingJsonDirExists = fs.exists(
        new Path(dir.getParent(), dir.getName() + SUFFIX));
    Assertions.assertThat(pendingJsonDirExists)
        .describedAs("Directory with suffix -RenamePending.json should exist.")
        .isTrue();
  }
}
/**
 * Test to verify the idempotency of the {@code rename} operation when the request is retried.
 * <p>
 * The DFS client is replaced by a spy, a retried request is set up through
 * {@code mockRetriedRequest}, and {@code getPathStatus} is stubbed to return a mocked
 * operation whose {@code X_MS_CLIENT_TRANSACTION_ID} response header echoes the value found
 * in the captured request headers and whose resource type is {@code DIRECTORY}.
 * The test then asserts that renaming {@code file1} to {@code file2} returns {@code true}.
 * <p>
 * Presumably {@code mockRetriedRequest} simulates a "path not found" (HTTP 404) failure on
 * the retry; that helper is defined elsewhere, so confirm against its implementation.
 *
 * @throws Exception if an error occurs during the file system operations or mocking
 */
@Test
public void testRenamePathRetryIdempotency() throws Exception {
// NOTE(review): this configuration enables the client transaction id but is
// never handed to the file system obtained below; confirm whether the file
// system was meant to be created from it.
Configuration configuration = new Configuration(getRawConfiguration());
configuration.set(FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID, "true");
try (AzureBlobFileSystem fs = getFileSystem()) {
assumeRecoveryThroughClientTransactionID(false);
// Swap the store's client for a spy so individual calls can be stubbed.
AbfsDfsClient abfsClient = (AbfsDfsClient) Mockito.spy(fs.getAbfsClient());
fs.getAbfsStore().setClient(abfsClient);
Path sourceDir = path("/testSrc");
assertMkdirs(fs, sourceDir);
String filename = "file1";
Path sourceFilePath = new Path(sourceDir, filename);
touch(sourceFilePath);
Path destFilePath = new Path(sourceDir, "file2");
// Filled by mockRetriedRequest; read back by the header stub below.
final List<AbfsHttpHeader> headers = new ArrayList<>();
mockRetriedRequest(abfsClient, headers, 0);
AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class);
AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
// Answer the transaction-id response header with the value found in the
// captured request headers, or null when no such header was captured.
Mockito.doAnswer(answer -> {
String requiredHeader = null;
for (AbfsHttpHeader httpHeader : headers) {
if (X_MS_CLIENT_TRANSACTION_ID.equalsIgnoreCase(
httpHeader.getName())) {
requiredHeader = httpHeader.getValue();
break;
}
}
return requiredHeader;
}).when(op).getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
Mockito.doReturn(true).when(getPathRestOp).hasResult();
Mockito.doReturn(op).when(getPathRestOp).getResult();
Mockito.doReturn(DIRECTORY)
.when(op)
.getResponseHeader(X_MS_RESOURCE_TYPE);
// Every getPathStatus call on the spied client now returns the mocked operation.
Mockito.doReturn(getPathRestOp).when(abfsClient).getPathStatus(
Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
Mockito.nullable(TracingContext.class),
Mockito.nullable(ContextEncryptionAdapter.class));
Assertions.assertThat(fs.rename(sourceFilePath, destFilePath))
.describedAs("Rename should succeed.")
.isTrue();
}
}
/**
 * Verifies that, after a successful rename through the DFS client, a
 * path-status lookup on the destination returns the client transaction ID
 * response header, and that its value equals the ID captured by
 * {@code mockAddClientTransactionIdToHeader}.
 *
 * @throws Exception if any error occurs during test execution
 */
@Test
public void testGetClientTransactionIdAfterRename() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    assumeRecoveryThroughClientTransactionID(false);
    AbfsDfsClient abfsDfsClient = (AbfsDfsClient) Mockito.spy(fs.getAbfsClient());
    fs.getAbfsStore().setClient(abfsDfsClient);
    // Single-element holder that the mock helper fills with the generated ID.
    final String[] clientTransactionId = new String[1];
    mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
    Path sourceDir = path("/testSrc");
    assertMkdirs(fs, sourceDir);
    String filename = "file1";
    Path sourceFilePath = new Path(sourceDir, filename);
    touch(sourceFilePath);
    Path destFilePath = new Path(sourceDir, "file2");
    // Fail here, with a clear message, if the rename itself did not succeed
    // rather than later on a missing header.
    Assertions.assertThat(fs.rename(sourceFilePath, destFilePath))
        .describedAs("Rename should succeed.")
        .isTrue();
    final AbfsHttpOperation getPathStatusOp =
        abfsDfsClient.getPathStatus(destFilePath.toUri().getPath(), false,
            getTestTracingContext(fs, true), null).getResult();
    final String returnedTransactionId =
        getPathStatusOp.getResponseHeader(X_MS_CLIENT_TRANSACTION_ID);
    Assertions.assertThat(returnedTransactionId)
        .describedAs("Client transaction id should be present in dest file")
        .isNotNull();
    Assertions.assertThat(returnedTransactionId)
        .describedAs("Client transaction ID should be equal to the one set in the header")
        .isEqualTo(clientTransactionId[0]);
  }
}
/**
 * Verifies the error surfaced by a rename when the path-status lookup fails
 * part-way through: the second {@code getPathStatus} invocation on the spied
 * client throws an {@link AbfsRestOperationException} carrying
 * {@code HTTP_CLIENT_TIMEOUT}, and the rename is expected to fail with an
 * {@link AbfsDriverException} whose message contains {@code ERR_RENAME_RECOVERY}.
 *
 * @throws Exception If an error occurs during the test setup or execution.
 */
@Test
public void testFailureInGetPathStatusDuringRenameRecovery() throws Exception {
  try (AzureBlobFileSystem fs = getFileSystem()) {
    assumeRecoveryThroughClientTransactionID(false);
    final AbfsDfsClient spiedClient = (AbfsDfsClient) Mockito.spy(fs.getAbfsClient());
    fs.getAbfsStore().setClient(spiedClient);
    final String[] transactionIdHolder = new String[1];
    mockAddClientTransactionIdToHeader(spiedClient, transactionIdHolder);
    mockRetriedRequest(spiedClient, new ArrayList<>(), 1);

    // Zero-based index of the getPathStatus invocation currently being served.
    final AtomicInteger getPathStatusCalls = new AtomicInteger(0);
    Mockito.doAnswer(invocation -> {
      final int callIndex = getPathStatusCalls.getAndIncrement();
      if (callIndex != 1) {
        return invocation.callRealMethod();
      }
      // Only the second invocation is turned into a client timeout.
      throw new AbfsRestOperationException(HTTP_CLIENT_TIMEOUT, "", "", new Exception());
    }).when(spiedClient).getPathStatus(
        Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
        Mockito.nullable(TracingContext.class),
        Mockito.nullable(ContextEncryptionAdapter.class));

    // The fixture is created after the mocks are installed, as the mocks
    // observe these calls too.
    final Path parentDir = path("/testSrc");
    assertMkdirs(fs, parentDir);
    final Path renameSource = new Path(parentDir, "file1");
    touch(renameSource);
    final Path renameTarget = new Path(parentDir, "file2");

    final AbfsDriverException failure = intercept(AbfsDriverException.class,
        () -> fs.rename(renameSource, renameTarget));
    Assertions.assertThat(failure.getErrorMessage())
        .describedAs("getPathStatus should fail while recovering")
        .contains(ERR_RENAME_RECOVERY);
  }
}
/**
 * Tests renaming a directory when the destination parent directory does not exist.
 * The test verifies that the rename call returns {@code false}.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testRenameWithDestParentNotExist() throws Exception {
  try (AzureBlobFileSystem fs = this.getFileSystem()) {
    Path src = new Path("/A1/A2");
    // "/A3" is never created, so the destination has no parent.
    Path dst = new Path("/A3/A4");
    // Only the empty file is needed; close the stream instead of leaking it.
    fs.create(new Path(src, "file.txt")).close();
    Assertions.assertThat(fs.rename(src, dst))
        .describedAs("Rename should fail as destination parent not exist.")
        .isFalse();
  }
}
/**
 * Tests renaming a directory when the destination parent directory is the root.
 * The test verifies that the rename call returns {@code true} and that the
 * file from the source directory is visible under the destination.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testRenameWithDestParentAsRoot() throws Exception {
  try (AzureBlobFileSystem fs = this.getFileSystem()) {
    Path src = new Path("/A1/A2");
    Path dst = new Path("/A3");
    // Only the empty file is needed; close the stream instead of leaking it.
    fs.create(new Path(src, "file.txt")).close();
    Assertions.assertThat(fs.rename(src, dst))
        .describedAs("Rename should succeed.")
        .isTrue();
    Assertions.assertThat(fs.exists(new Path(dst, "file.txt")))
        .describedAs("File should exist in destination directory.")
        .isTrue();
  }
}
/**
 * Tests renaming a file when the destination is root.
 * The test verifies that the rename call returns {@code true} and that the
 * file is then visible directly under root.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testFileRenameWithDestAsRoot() throws Exception {
  try (AzureBlobFileSystem fs = this.getFileSystem()) {
    Path src = new Path("/A1/A2/file.txt");
    Path dst = new Path("/");
    // Only the empty file is needed; close the stream instead of leaking it.
    fs.create(src).close();
    Assertions.assertThat(fs.rename(src, dst))
        .describedAs("Rename should succeed.")
        .isTrue();
    Assertions.assertThat(fs.exists(new Path(dst, "file.txt")))
        .describedAs("File should exist in root.")
        .isTrue();
  }
}
/**
 * Tests renaming a directory when the destination is root.
 * The test verifies that the rename call returns {@code true} and that a
 * path named after the source directory then exists directly under root.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testDirRenameWithDestAsRoot() throws Exception {
  try (AzureBlobFileSystem fs = this.getFileSystem()) {
    Path src = new Path("/A1/A2");
    Path dst = new Path("/");
    // Only the empty file is needed; close the stream instead of leaking it.
    fs.create(new Path(src, "file.txt")).close();
    Assertions.assertThat(fs.rename(src, dst))
        .describedAs("Rename should succeed.")
        .isTrue();
    Assertions.assertThat(fs.exists(new Path(dst, src.getName())))
        .describedAs("A2 directory should exist in root.")
        .isTrue();
  }
}
/**
 * Tests the rename operation with multiple directories in the source path.
 * The source tree holds two directories created through {@code mkdirs}
 * (one with ten files, one with a single file) and a third directory created
 * through {@code createAzCopyFolder}. The test verifies that renaming the
 * parent succeeds, that the old path is gone and that the new path exists.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testRenameWithMultipleDirsInSource() throws Exception {
  try (AzureBlobFileSystem fs = this.getFileSystem()) {
    assumeBlobServiceType();
    fs.mkdirs(new Path("/testDir/dir1"));
    for (int i = 0; i < 10; i++) {
      // Only the empty file is needed; close each stream instead of leaking it.
      fs.create(new Path("/testDir/dir1/file" + i)).close();
    }
    fs.mkdirs(new Path("/testDir/dir2"));
    fs.create(new Path("/testDir/dir2/file2")).close();
    createAzCopyFolder(new Path("/testDir/dir3"));
    Assertions.assertThat(fs.rename(new Path("/testDir"),
            new Path("/testDir2")))
        .describedAs("Rename should succeed.")
        .isTrue();
    Assertions.assertThat(fs.exists(new Path("/testDir")))
        .describedAs("Old directory should not exist.")
        .isFalse();
    Assertions.assertThat(fs.exists(new Path("/testDir2")))
        .describedAs("New directory should exist.")
        .isTrue();
  }
}
/**
 * Tests the rename operation when every directory and file of the source tree
 * was created through the {@code createAzCopyFolder} / {@code createAzCopyFile}
 * helpers. The test verifies that renaming the parent succeeds, that the old
 * path is gone and that the new path exists.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testRenameWithMultipleImplicitDirsInSource() throws Exception {
  try (AzureBlobFileSystem fs = this.getFileSystem()) {
    assumeBlobServiceType();

    // First directory with ten files.
    createAzCopyFolder(new Path("/testDir/dir1"));
    for (int fileIndex = 0; fileIndex < 10; fileIndex++) {
      final Path file = new Path("/testDir/dir1/file" + fileIndex);
      createAzCopyFile(file);
    }
    // Second directory with one file, third directory left empty.
    createAzCopyFolder(new Path("/testDir/dir2"));
    createAzCopyFile(new Path("/testDir/dir2/file2"));
    createAzCopyFolder(new Path("/testDir/dir3"));

    final boolean renamed = fs.rename(new Path("/testDir"),
        new Path("/testDir2"));
    Assertions.assertThat(renamed)
        .describedAs("Rename should succeed.")
        .isTrue();

    final boolean oldDirPresent = fs.exists(new Path("/testDir"));
    Assertions.assertThat(oldDirPresent)
        .describedAs("Old directory should not exist.")
        .isFalse();

    final boolean newDirPresent = fs.exists(new Path("/testDir2"));
    Assertions.assertThat(newDirPresent)
        .describedAs("New directory should exist.")
        .isTrue();
  }
}
/**
 * Tests renaming a directory whose content was created through
 * {@code fs.create}: two files under {@code /testDir/dir3}. The test verifies
 * that renaming {@code /testDir} succeeds, that the old path is gone and that
 * the new path exists.
 *
 * @throws Exception if an error occurs during the test execution
 */
@Test
public void testRenameWithExplicitDirInSource() throws Exception {
  try (AzureBlobFileSystem fs = this.getFileSystem()) {
    assumeBlobServiceType();
    // Only the empty files are needed; close each stream instead of leaking it.
    fs.create(new Path("/testDir/dir3/file2")).close();
    fs.create(new Path("/testDir/dir3/file1")).close();
    Assertions.assertThat(fs.rename(new Path("/testDir"),
            new Path("/testDir2")))
        .describedAs("Rename should succeed.")
        .isTrue();
    Assertions.assertThat(fs.exists(new Path("/testDir")))
        .describedAs("Old directory should not exist.")
        .isFalse();
    Assertions.assertThat(fs.exists(new Path("/testDir2")))
        .describedAs("New directory should exist.")
        .isTrue();
  }
}
/**
 * Wraps the store and the client of the given file system in Mockito spies
 * and wires them together: {@code fs.getAbfsStore()} is stubbed to return the
 * spied store and the spied store's {@code getClient()} is stubbed to return
 * the spied client. Because {@code fs} itself is stubbed, it has to be a
 * Mockito mock or spy.
 *
 * @param fs the (spied) AzureBlobFileSystem instance
 * @return the spied AbfsClient for stubbing and interaction verification
 */
private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
  final AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
  final AbfsClient spiedClient = Mockito.spy(spiedStore.getClient());
  Mockito.doReturn(spiedStore).when(fs).getAbfsStore();
  Mockito.doReturn(spiedClient).when(spiedStore).getClient();
  return spiedClient;
}
/**
 * Crashes a rename via {@link #crashRename}, then runs the supplied recovery
 * action against a second, spied file system and verifies two things: that
 * the blob at {@code ROOT_PATH + srcPath + SUFFIX} was passed to
 * {@code deleteBlobPath}, and that the fixed {@code hbase/test1/test2} tree
 * is gone while its content is present under {@code hbase/test4/test2}.
 *
 * @param fs the AzureBlobFileSystem instance on which the rename is crashed
 * @param client the AbfsBlobClient instance used to crash the rename
 * @param srcPath the source path for the rename operation
 * @param recoveryCallable the recovery action to perform on the second file system
 * @throws Exception if an error occurs during recovery or verification
 */
private void crashRenameAndRecover(final AzureBlobFileSystem fs,
    AbfsBlobClient client,
    final String srcPath,
    final FunctionRaisingIOE<AzureBlobFileSystem, Void> recoveryCallable)
    throws Exception {
  crashRename(fs, client, srcPath);

  final AzureBlobFileSystem recoveryFs = Mockito.spy(getFileSystem());
  recoveryFs.setWorkingDirectory(new Path(ROOT_PATH));
  final AbfsBlobClient recoveryClient
      = (AbfsBlobClient) addSpyHooksOnClient(recoveryFs);

  // Set to 1 once deleteBlobPath is invoked for the rename-pending JSON blob.
  final String pendingJsonPath = ROOT_PATH + srcPath + SUFFIX;
  final int[] pendingJsonDeleted = new int[1];
  Mockito.doAnswer(invocation -> {
    final Path deletedPath = invocation.getArgument(0);
    if (pendingJsonPath.equalsIgnoreCase(deletedPath.toUri().getPath())) {
      pendingJsonDeleted[0] = 1;
    }
    return invocation.callRealMethod();
  })
      .when(recoveryClient)
      .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));

  recoveryCallable.apply(recoveryFs);

  Assertions.assertThat(pendingJsonDeleted[0])
      .describedAs("RenamePendingJson should be deleted")
      .isEqualTo(1);

  // The recovery action is expected to have completed the rename, so the
  // source tree must be gone and its content present at the destination.
  assertFalse(recoveryFs.exists(new Path("hbase/test1/test2")));
  assertFalse(recoveryFs.exists(new Path("hbase/test1/test2/test3")));
  assertTrue(recoveryFs.exists(new Path("hbase/test4/test2/test3")));
  assertFalse(recoveryFs.exists(new Path("hbase/test1/test2/test3/file")));
  assertTrue(recoveryFs.exists(new Path("hbase/test4/test2/test3/file")));
  assertFalse(recoveryFs.exists(new Path("hbase/test1/test2/test3/file1")));
  assertTrue(recoveryFs.exists(new Path("hbase/test4/test2/test3/file1")));
}
/**
 * Forces a rename of {@code srcPath} to {@code hbase/test4} to fail: every
 * {@code copyBlob} call on the given client is stubbed to throw an
 * {@link AbfsRestOperationException} with {@code HTTP_FORBIDDEN}, and the
 * rename is expected to surface as an {@link AccessDeniedException}.
 * Afterwards every lease held by the captured {@code BlobRenameHandler} is freed.
 *
 * @param fs the AzureBlobFileSystem instance used for the rename operation
 * @param client the (spied) AbfsBlobClient instance on which the failure is stubbed
 * @param srcPath the source path for the rename operation
 * @throws Exception if an error occurs during the simulated failure or lease release
 */
private void crashRename(final AzureBlobFileSystem fs,
    final AbfsBlobClient client,
    final String srcPath) throws Exception {
  // Capture the rename handler so that its leases can be released later.
  final BlobRenameHandler[] capturedHandler = new BlobRenameHandler[1];
  AbfsClientTestUtil.mockGetRenameBlobHandler(client, handler -> {
    capturedHandler[0] = handler;
    return null;
  });

  // Every blob copy issued by the rename fails with HTTP 403.
  final AbfsRestOperationException forbidden = new AbfsRestOperationException(
      HTTP_FORBIDDEN, "", "", new Exception());
  Mockito.doThrow(forbidden)
      .when(client)
      .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
          Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));

  LambdaTestUtils.intercept(AccessDeniedException.class, () -> {
    fs.rename(new Path(srcPath), new Path("hbase/test4"));
  });

  // Free the leases over a copy of the handler's lease collection.
  final List<AbfsLease> heldLeases = new ArrayList<>(capturedHandler[0].getLeases());
  for (AbfsLease heldLease : heldLeases) {
    heldLease.free();
  }
}
/**
 * Builds the fixture shared by the failed-rename recovery tests and then
 * delegates to {@link #crashRenameAndRecover}. With the working directory set
 * to root, it creates {@code srcPath}, {@code srcPath/test3}, the file
 * {@code srcPath/test3/file}, the file at {@code failedCopyPath}, the
 * directory {@code hbase/test4} and the file {@code hbase/test4/file1}.
 *
 * @param fs the AzureBlobFileSystem instance to be used in the test
 * @param client the AbfsBlobClient instance to be used in the test
 * @param srcPath the source path for the rename operation
 * @param failedCopyPath path of an additional file created before the rename is crashed
 * @param recoveryAction the recovery action handed to {@code crashRenameAndRecover}
 *                       (e.g., listing directory status or checking path existence)
 * @throws Exception if any error occurs during setup or execution of the recovery action
 */
private void setupAndTestHBaseFailedRenameRecovery(
    final AzureBlobFileSystem fs,
    final AbfsBlobClient client,
    final String srcPath,
    final String failedCopyPath,
    final FunctionRaisingIOE<AzureBlobFileSystem, Void> recoveryAction)
    throws Exception {
  // Relative paths below (e.g. "hbase/test4/") resolve against root.
  fs.setWorkingDirectory(new Path("/"));
  fs.mkdirs(new Path(srcPath));
  fs.mkdirs(new Path(srcPath, "test3"));
  // Only the empty files are needed; close each stream instead of leaking it.
  fs.create(new Path(srcPath + "/test3/file")).close();
  fs.create(new Path(failedCopyPath)).close();
  fs.mkdirs(new Path("hbase/test4/"));
  fs.create(new Path("hbase/test4/file1")).close();
  crashRenameAndRecover(fs, client, srcPath, recoveryAction);
}
/**
 * Renames {@code hbase/test1/test2} to {@code hbase/test4} while sabotaging the
 * first write of the rename-pending JSON file, and verifies that
 * {@code client.flush} ends up being invoked exactly twice.
 * The sabotage works by deleting the just-created path right before the first
 * flush is forwarded to the real client; later flushes are forwarded untouched.
 *
 * @param fs the (spied) AzureBlobFileSystem instance for the test
 * @throws Exception if an error occurs during the test
 */
private void testRenamePreRenameFailureResolution(final AzureBlobFileSystem fs)
throws Exception {
AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
Path src = new Path("hbase/test1/test2");
Path dest = new Path("hbase/test4");
fs.mkdirs(src);
fs.mkdirs(new Path(src, "test3"));
// Number of flush invocations seen so far; shared with the nested answer.
final int[] renamePendingJsonWriteCounter = new int[1];
/*
 * Answer for the create-path hook: each time it fires, flush on the client is
 * (re-)stubbed so that the very first flush deletes the created path before
 * the real flush runs. Presumably addCreatePathMock applies this answer only
 * to the RenamePendingJson path - confirm in RenameAtomicityTestUtils.
 */
Answer renamePendingJsonCreateAns = createAnswer -> {
Path path = createAnswer.getArgument(0);
Mockito.doAnswer(clientFlushAns -> {
// Post-increment: the delete happens only while the counter is still 0.
if (renamePendingJsonWriteCounter[0]++ == 0) {
fs.delete(path, true);
}
return clientFlushAns.callRealMethod();
})
.when(client)
.flush(Mockito.any(byte[].class), Mockito.anyString(),
Mockito.anyBoolean(), Mockito.nullable(String.class),
Mockito.nullable(String.class), Mockito.anyString(),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.any(TracingContext.class), Mockito.anyString());
return createAnswer.callRealMethod();
};
RenameAtomicityTestUtils.addCreatePathMock(client,
renamePendingJsonCreateAns);
// The rename result is not checked; only the number of flush attempts is.
fs.rename(src, dest);
Assertions.assertThat(renamePendingJsonWriteCounter[0])
.describedAs("Creation of RenamePendingJson should be attempted twice")
.isEqualTo(2);
}
/**
 * Tests the behavior of the redo operation when an invalid "RenamePendingJson" file exists.
 * The file is written with the single character "{" so that it is not valid JSON.
 * The test then runs {@code RenameAtomicity.redo()} on it and verifies that
 * {@code deleteBlobPath} was called exactly once for that file and that
 * {@code copyBlob} was never called.
 *
 * @param fs the (spied) AzureBlobFileSystem instance for the test
 * @throws Exception if an error occurs during the test
 */
private void testAtomicityRedoInvalidFile(final AzureBlobFileSystem fs)
    throws Exception {
  AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
  Path path = new Path("/hbase/test1/test2");
  fs.mkdirs(new Path(path, "test3"));
  Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
  // try-with-resources closes the stream even if the write fails.
  try (OutputStream os = fs.create(renameJson)) {
    os.write("{".getBytes(StandardCharsets.UTF_8));
  }
  // Number of deleteBlobPath calls that targeted the rename-pending JSON file.
  int[] renameJsonDeleteCounter = new int[1];
  Mockito.doAnswer(deleteAnswer -> {
    Path ansPath = deleteAnswer.getArgument(0);
    if (renameJson.toUri()
        .getPath()
        .equalsIgnoreCase(ansPath.toUri().getPath())) {
      renameJsonDeleteCounter[0]++;
    }
    return deleteAnswer.callRealMethod();
  })
      .when(client)
      .deleteBlobPath(Mockito.any(Path.class), Mockito.nullable(String.class),
          Mockito.any(TracingContext.class));
  new RenameAtomicity(renameJson, 1,
      getTestTracingContext(fs, true), null, client).redo();
  Assertions.assertThat(renameJsonDeleteCounter[0])
      .describedAs("RenamePendingJson should be deleted")
      .isEqualTo(1);
  Mockito.verify(client, Mockito.times(0)).copyBlob(Mockito.any(Path.class),
      Mockito.any(Path.class), Mockito.nullable(String.class),
      Mockito.any(TracingContext.class));
}
/**
 * Makes every {@code copyBlob} call on the spied client look as if the copy
 * were still in progress. The real {@code copyBlob} is executed, and its
 * result is wrapped in spies so that the {@code X_MS_COPY_STATUS} response
 * header reads {@code COPY_STATUS_PENDING}.
 *
 * @param spiedClient The {@link AbfsBlobClient} instance that is being spied on.
 * @throws AzureBlobFileSystemException if the mock setup fails.
 */
private void addMockForProgressStatusOnCopyOperation(final AbfsBlobClient spiedClient)
    throws AzureBlobFileSystemException {
  Mockito.doAnswer(copyInvocation -> {
    // Run the real copy first, then overlay the pending status on its result.
    final AbfsRestOperation realResult
        = (AbfsRestOperation) copyInvocation.callRealMethod();
    final AbfsRestOperation spiedRestOp = Mockito.spy(realResult);
    final AbfsHttpOperation spiedHttpOp = Mockito.spy(spiedRestOp.getResult());
    Mockito.doReturn(COPY_STATUS_PENDING)
        .when(spiedHttpOp)
        .getResponseHeader(HttpHeaderConfigurations.X_MS_COPY_STATUS);
    Mockito.doReturn(spiedHttpOp).when(spiedRestOp).getResult();
    return spiedRestOp;
  })
      .when(spiedClient)
      .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
          Mockito.nullable(String.class), Mockito.any(TracingContext.class));
}
/**
 * Mocks the final status of a blob copy operation.
 * A hook is registered through {@code AbfsClientTestUtil.mockGetRenameBlobHandler};
 * for each rename handler passed to it, {@code handleCopyInProgress} is stubbed
 * so that, before the real method runs, {@code getPathStatus} for the handled
 * path is stubbed to report the given copy status in the
 * {@code X_MS_COPY_STATUS} response header.
 *
 * @param spiedClient The spied Azure Blob client to apply the mock behavior.
 * @param requiredCopyFinalStatus The final status of the copy operation to be returned
 * (e.g., COPY_STATUS_FAILED, COPY_STATUS_ABORTED).
 */
private void addMockForCopyOperationFinalStatus(final AbfsBlobClient spiedClient,
final String requiredCopyFinalStatus) {
AbfsClientTestUtil.mockGetRenameBlobHandler(spiedClient,
blobRenameHandler -> {
// Level 1: intercept handleCopyInProgress on the handler.
Mockito.doAnswer(onHandleCopyInProgress -> {
Path handlePath = onHandleCopyInProgress.getArgument(0);
TracingContext tracingContext = onHandleCopyInProgress.getArgument(
1);
// Level 2: for exactly this path and tracing context, make the status
// check return the real response with the copy-status header overridden.
Mockito.doAnswer(onStatusCheck -> {
AbfsRestOperation op = Mockito.spy(
(AbfsRestOperation) onStatusCheck.callRealMethod());
AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
Mockito.doReturn(requiredCopyFinalStatus)
.when(httpOp)
.getResponseHeader(
HttpHeaderConfigurations.X_MS_COPY_STATUS);
Mockito.doReturn(httpOp).when(op).getResult();
return op;
})
.when(spiedClient)
.getPathStatus(handlePath.toUri().getPath(),
tracingContext, null, false);
// With the status stub in place, let the real in-progress handling run.
return onHandleCopyInProgress.callRealMethod();
})
.when(blobRenameHandler)
.handleCopyInProgress(Mockito.any(Path.class),
Mockito.any(TracingContext.class), Mockito.any(String.class));
return null;
});
}
/**
 * Renames {@code src} to {@code dst} on a freshly created file system instance
 * that targets the same URI as {@code currentFs} but carries the given rename
 * tuning values, and asserts that the rename returns {@code true}. The new
 * instance is closed before the method returns.
 *
 * @param currentFs The AzureBlobFileSystem whose URI the new instance is created for.
 * @param producerQueueSize Value set for {@code FS_AZURE_PRODUCER_QUEUE_MAX_SIZE}.
 * @param consumerMaxLag Value set for {@code FS_AZURE_CONSUMER_MAX_LAG}.
 * @param maxThread Value set for {@code FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD}.
 * @param src The source path of the directory to rename.
 * @param dst The destination path of the renamed directory.
 * @throws Exception If the file system cannot be created or the rename fails.
 */
private void renameDir(AzureBlobFileSystem currentFs, String producerQueueSize,
    String consumerMaxLag, String maxThread, Path src, Path dst)
    throws Exception {
  final Configuration tunedConf = new Configuration(this.getRawConfiguration());
  tunedConf.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, producerQueueSize);
  tunedConf.set(FS_AZURE_CONSUMER_MAX_LAG, consumerMaxLag);
  tunedConf.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, maxThread);

  final URI fsUri = currentFs.getUri();
  try (AzureBlobFileSystem fs
      = (AzureBlobFileSystem) FileSystem.newInstance(fsUri, tunedConf)) {
    final boolean renamed = fs.rename(src, dst);
    Assertions.assertThat(renamed)
        .describedAs("Rename should succeed.")
        .isTrue();
  }
}
/**
 * Renames {@code src} to {@code dst} and checks the state around it: the
 * source must exist beforehand, the rename must return {@code true}, the
 * named file must exist under the destination, the source must be gone and
 * the destination itself must exist.
 *
 * @param fs the AzureBlobFileSystem instance
 * @param src the source path to be renamed
 * @param dst the destination path for the rename
 * @param fileName the name of the file expected under the destination
 * @throws IOException if a file system call fails
 */
private void performRenameAndValidate(AzureBlobFileSystem fs, Path src, Path dst, String fileName)
    throws IOException {
  // Precondition: the source is there.
  final boolean srcExistsBefore = fs.exists(src);
  Assertions.assertThat(srcExistsBefore)
      .describedAs("Old directory should exist before rename")
      .isTrue();

  final boolean renamed = fs.rename(src, dst);
  Assertions.assertThat(renamed)
      .describedAs("Rename should succeed.")
      .isTrue();

  // Postconditions, probed in this order: moved file, source, destination.
  final boolean movedFileExists = fs.exists(new Path(dst, fileName));
  Assertions.assertThat(movedFileExists)
      .describedAs("Rename should be successful")
      .isTrue();

  final boolean srcExistsAfter = fs.exists(src);
  Assertions.assertThat(srcExistsAfter)
      .describedAs("Old directory should not exist")
      .isFalse();

  final boolean dstExists = fs.exists(dst);
  Assertions.assertThat(dstExists)
      .describedAs("New directory should exist")
      .isTrue();
}
/**
 * Asserts that {@code abfsBlobClient.isAtomicRenameKey(path)} yields the
 * expected value.
 *
 * @param abfsBlobClient the AbfsBlobClient instance
 * @param path the path to check for atomic rename key
 * @param expected the expected value (true or false)
 */
private void validateAtomicRenameKey(AbfsBlobClient abfsBlobClient, String path, boolean expected) {
  final String description = "Atomic rename key check for path: " + path;
  Assertions.assertThat(abfsBlobClient.isAtomicRenameKey(path))
      .describedAs(description)
      .isEqualTo(expected);
}
/**
 * Installs an interceptor on the creation of Abfs operations for the given
 * client. The first time the interceptor fires, it records the request headers
 * of the intercepted operation, stubs the operation to expose a mocked PUT
 * result with status HTTP 404 and the {@code SOURCE_PATH_NOT_FOUND} error
 * code, and throws a matching {@link AbfsRestOperationException}. Later
 * invocations pass through untouched.
 *
 * @param abfsDfsClient The AbfsDfsClient to mock operations for.
 * @param headers The list to which the request headers of the failed operation are added.
 * @param failedCall Forwarded unchanged to
 *                   {@code TestAbfsClient.mockAbfsOperationCreation}; presumably
 *                   selects which call is intercepted (TODO confirm there).
 *
 * @throws Exception If an error occurs during mock creation or operation execution.
 */
private void mockRetriedRequest(AbfsDfsClient abfsDfsClient,
    final List<AbfsHttpHeader> headers, int failedCall) throws Exception {
  TestAbfsClient.mockAbfsOperationCreation(abfsDfsClient,
      new MockIntercept<AbfsRestOperation>() {
        // Flipped on the first interception so the failure is injected once.
        private boolean failureInjected = false;

        @Override
        public void answer(final AbfsRestOperation mockedObj,
            final InvocationOnMock answer)
            throws AbfsRestOperationException {
          if (failureInjected) {
            return;
          }
          failureInjected = true;

          // Mocked HTTP result describing a 404 "source path not found" on PUT.
          final AbfsHttpOperation failedHttpOp = Mockito.mock(AbfsHttpOperation.class);
          Mockito.doReturn(HTTP_METHOD_PUT).when(failedHttpOp).getMethod();
          Mockito.doReturn(EMPTY_STRING).when(failedHttpOp).getStorageErrorMessage();
          Mockito.doReturn(SOURCE_PATH_NOT_FOUND.getErrorCode())
              .when(failedHttpOp)
              .getStorageErrorCode();
          Mockito.doReturn(HTTP_NOT_FOUND).when(failedHttpOp).getStatusCode();

          // Expose that result through the intercepted operation.
          Mockito.doReturn(true).when(mockedObj).hasResult();
          Mockito.doReturn(failedHttpOp).when(mockedObj).getResult();

          headers.addAll(mockedObj.getRequestHeaders());
          throw new AbfsRestOperationException(HTTP_NOT_FOUND,
              SOURCE_PATH_NOT_FOUND.getErrorCode(), EMPTY_STRING, null, failedHttpOp);
        }
      }, failedCall);
}
}