BlobRenameHandler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.enums.BlobCopyProgress;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_SUCCESS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_ID;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS_DESCRIPTION;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
/**
* Orchestrator for rename over the Blob endpoint. Handles both directory and
* file renames. The Blob endpoint does not expose a rename API, so this class
* is responsible for copying the blobs to the destination and then deleting
* the source blobs.
* <p>
* For a directory rename, it recursively lists the blobs in the source
* directory and copies them to the destination directory.
*/
public class BlobRenameHandler extends ListActionTaker {
/** Logger for this handler (named after this class, not AbfsClient). */
public static final Logger LOG = LoggerFactory.getLogger(BlobRenameHandler.class);

/** ETag of the source path as supplied by the caller; null when the caller does not know it. */
private final String srcEtag;

/** Source and destination paths of the rename. */
private final Path src, dst;

/** Whether the rename must be atomic, and whether this run resumes an earlier atomic rename. */
private final boolean isAtomicRename, isAtomicRenameRecovery;

/** Tracing context attached to every server call made by this handler. */
private final TracingContext tracingContext;

/** Lease held on the source path during an atomic rename; null until taken. */
private AbfsLease srcAbfsLease;

/** Lease id of {@link #srcAbfsLease}; null until the lease is taken. */
private String srcLeaseId;

/*
 * Every lease created through takeLease() is recorded here. takeLease() is
 * reached from takeAction(), which is expected to run on several consumer
 * threads (see getMaxConsumptionParallelism()), so additions must be
 * thread-safe; a plain ArrayList is not.
 */
private final List<AbfsLease> leases
    = Collections.synchronizedList(new ArrayList<>());

/** Number of blobs renamed so far; updated from renameInternal(). */
private final AtomicInteger operatedBlobCount = new AtomicInteger(0);
/**
 * Creates a handler that renames {@code src} to {@code dst} over the Blob
 * endpoint.
 *
 * @param src source path
 * @param dst destination path
 * @param abfsClient AbfsBlobClient used to issue the server calls
 * @param srcEtag eTag of the source path
 * @param isAtomicRename true if the rename operation is atomic
 * @param isAtomicRenameRecovery true if this rename resumes a previously
 * failed atomic rename operation
 * @param tracingContext tracing context used for the server calls
 */
public BlobRenameHandler(final String src,
    final String dst,
    final AbfsBlobClient abfsClient,
    final String srcEtag,
    final boolean isAtomicRename,
    final boolean isAtomicRenameRecovery,
    final TracingContext tracingContext) {
  super(new Path(src), abfsClient, tracingContext);
  // Paths first, then the flags and collaborators that qualify the rename.
  this.src = new Path(src);
  this.dst = new Path(dst);
  this.srcEtag = srcEtag;
  this.isAtomicRename = isAtomicRename;
  this.isAtomicRenameRecovery = isAtomicRenameRecovery;
  this.tracingContext = tracingContext;
}
/**
 * {@inheritDoc}
 * <p>
 * The value is read from the client configuration's
 * blob-rename directory consumption parallelism setting.
 */
@Override
int getMaxConsumptionParallelism() {
  final int configuredParallelism = getAbfsClient()
      .getAbfsConfiguration()
      .getBlobRenameDirConsumptionParallelism();
  return configuredParallelism;
}
/**
 * Orchestrates the rename operation: fetches the source path information,
 * runs the pre-checks, takes a lease on the source when the rename is atomic,
 * renames the file or the directory contents, and finally retires or frees
 * the source lease.
 *
 * @param isRenameRecovery true if the rename operation is a recovery of a
 * previous failed atomic rename operation
 *
 * @return true if the rename completed successfully, false otherwise
 * @throws AzureBlobFileSystemException if a pre-check fails or a server call fails
 */
public boolean execute(final boolean isRenameRecovery) throws AzureBlobFileSystemException {
  PathInformation pathInformation = getPathInformation(src, tracingContext);
  boolean result = false;
  if (preCheck(src, dst, pathInformation, isRenameRecovery)) {
    RenameAtomicity renameAtomicity = null;
    if (pathInformation.getIsDirectory()
        && pathInformation.getIsImplicit()) {
      // Implicit directory: try to create a marker blob and adopt its eTag.
      // A failure here is tolerated; the rename proceeds without the marker.
      try {
        AbfsRestOperation createMarkerOp = getAbfsClient().createMarkerAtPath(
            src.toUri().getPath(), null, null, tracingContext);
        pathInformation.setETag(
            extractEtagHeader(createMarkerOp.getResult()));
      } catch (AbfsRestOperationException ex) {
        // Pass the exception so the cause of the failure is not lost.
        LOG.debug("Marker creation failed for src path {} ",
            src.toUri().getPath(), ex);
      }
    }
    try {
      if (isAtomicRename) {
        /*
         * Conditionally get a lease on the source blob to prevent other writers
         * from changing it. This is used for correctness in HBase when log files
         * are renamed. When the HBase master renames a log file folder, the lease
         * locks out other writers. This prevents a region server that the master
         * thinks is dead, but is still alive, from committing additional updates.
         * This is different than when HBase runs on HDFS, where the region server
         * recovers the lease on a log file, to gain exclusive access to it, before
         * it splits it.
         */
        getAbfsClient().getAbfsCounters()
            .incrementCounter(AbfsStatistic.ATOMIC_RENAME_PATH_ATTEMPTS, 1);
        if (srcAbfsLease == null) {
          srcAbfsLease = takeLease(src, srcEtag);
        }
        srcLeaseId = srcAbfsLease.getLeaseID();
        if (!isAtomicRenameRecovery && pathInformation.getIsDirectory()) {
          /*
           * if it is not a resume of a previous failed atomic rename operation,
           * create renameJson file.
           */
          renameAtomicity = getRenameAtomicity(pathInformation);
          renameAtomicity.preRename();
        }
      }
      if (pathInformation.getIsDirectory()) {
        // Rename every listed blob first, then the directory path itself.
        result = listRecursiveAndTakeAction() && finalSrcRename();
      } else {
        result = renameInternal(src, dst);
      }
    } finally {
      if (srcAbfsLease != null) {
        // If the operation is successful, cancel the timer and no need to release
        // the lease as rename on the blob-path has taken place.
        if (result) {
          srcAbfsLease.cancelTimer();
        } else {
          srcAbfsLease.free();
        }
      }
    }
    if (result && renameAtomicity != null) {
      renameAtomicity.postRename();
    }
  }
  return result;
}
/**
 * Renames the source directory path itself once its contents have been
 * processed. While the call is in flight, the tracing context carries the
 * count of blobs operated on (including this one); the count is cleared
 * afterwards in all cases.
 *
 * @return true if rename is successful
 * @throws AzureBlobFileSystemException if server call fails
 */
private boolean finalSrcRename() throws AzureBlobFileSystemException {
  final int blobCountIncludingSrc = operatedBlobCount.get() + 1;
  tracingContext.setOperatedBlobCount(blobCountIncludingSrc);
  try {
    return renameInternal(src, dst);
  } catch (AbfsRestOperationException e) {
    if (e.getStatusCode() != HttpURLConnection.HTTP_CONFLICT) {
      throw e;
    }
    // If the destination path already exists, then delete the source path.
    getAbfsClient().deleteBlobPath(src, null, tracingContext);
    return true;
  } finally {
    tracingContext.setOperatedBlobCount(null);
  }
}
/**
 * Builds the {@link RenameAtomicity} helper for this rename. The helper is
 * given a path that sits next to the source: the source's parent plus the
 * source name with {@code RenameAtomicity.SUFFIX} appended.
 *
 * @param pathInformation object containing the path information of the source path
 *
 * @return RenameAtomicity object
 */
@VisibleForTesting
public RenameAtomicity getRenameAtomicity(final PathInformation pathInformation) {
  final String renameJsonName = src.getName() + RenameAtomicity.SUFFIX;
  final Path renameJsonPath = new Path(src.getParent(), renameJsonName);
  return new RenameAtomicity(src, dst, renameJsonPath, tracingContext,
      pathInformation.getETag(), getAbfsClient());
}
/**
 * Creates a lease on the given path and records it in the list of leases
 * taken by this handler.
 *
 * @param path path on which the lease is to be taken
 * @param eTag eTag of the path
 *
 * @return object containing the lease information
 * @throws AzureBlobFileSystemException if server call fails
 */
private AbfsLease takeLease(final Path path, final String eTag)
    throws AzureBlobFileSystemException {
  final String leasePath = path.toUri().getPath();
  final AbfsLease newLease = new AbfsLease(
      getAbfsClient(),
      leasePath,
      false,
      getAbfsClient().getAbfsConfiguration().getAtomicRenameLeaseRefreshDuration(),
      eTag,
      tracingContext);
  leases.add(newLease);
  return newLease;
}
/**
 * Tells whether the path component of the given path has a colon in it.
 *
 * @param p path to check
 *
 * @return true if the path contains a colon
 */
private boolean containsColon(Path p) {
  final String rawPath = p.toUri().getPath();
  return rawPath.contains(COLON);
}
/**
 * Client-side enforcement of the HDFS rename contract. The server has no
 * rename API and therefore cannot check the contract itself, so each rule is
 * validated here before any blob is touched. Every validator reports a
 * violation by throwing; if none throws, the pre-check has passed.
 *
 * @param src source path
 * @param dst destination path
 * @param pathInformation path information of the source path
 * @param isRenameRecovery true if the rename operation is a recovery of a
 * previous failed atomic rename operation
 *
 * @return true if the pre-checks pass
 * @throws AzureBlobFileSystemException if server call fails or given paths are invalid.
 */
private boolean preCheck(final Path src, final Path dst,
    final PathInformation pathInformation, final boolean isRenameRecovery)
    throws AzureBlobFileSystemException {
  // 1. The destination must not live underneath the source.
  validateDestinationIsNotSubDir(src, dst);
  // 2. The source must exist (and match the known eTag, if any).
  validateSourcePath(pathInformation);
  // 3. The destination must not already exist.
  validateDestinationPathNotExist(src, dst, pathInformation, isRenameRecovery);
  // 4. The destination's parent must exist.
  validateDestinationParentExist(src, dst, pathInformation);
  return true;
}
/**
 * Validate that the destination path is not located inside the source path,
 * i.e. that the destination's parent is neither the source itself nor a
 * descendant of it.
 * <p>
 * The comparison is made on whole path components. A plain string-prefix
 * test would wrongly reject a sibling whose name merely starts with the
 * source name (for example src {@code /a/b} and dst {@code /a/bc/d}).
 *
 * @param src source path
 * @param dst destination path
 *
 * @throws AbfsRestOperationException with HTTP 409 and
 * {@code INVALID_RENAME_SOURCE_PATH} if the destination is inside the source
 */
private void validateDestinationIsNotSubDir(final Path src,
    final Path dst) throws AbfsRestOperationException {
  LOG.debug("Check if the destination is subDirectory");
  final Path nestedDstParent = dst.getParent();
  if (nestedDstParent == null) {
    // Destination has no parent, so it cannot be nested under the source.
    return;
  }
  final String srcPath = src.toUri().getPath();
  final String dstParentPath = nestedDstParent.toUri().getPath();
  // Source path terminated by a separator, so that only complete path
  // components can match as a prefix.
  final String srcPathAsPrefix = srcPath.endsWith(Path.SEPARATOR)
      ? srcPath
      : srcPath + Path.SEPARATOR;
  if (dstParentPath.equals(srcPath)
      || dstParentPath.startsWith(srcPathAsPrefix)) {
    LOG.info("Rename src: {} dst: {} failed as dst is subDir of src",
        src, dst);
    throw new AbfsRestOperationException(HttpURLConnection.HTTP_CONFLICT,
        AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorCode(),
        AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorMessage(),
        new Exception(
            AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorCode()));
  }
}
/**
 * Checks the source side of the rename: the source must exist, and when the
 * caller supplied an eTag for it, that eTag must equal the one recorded in
 * the fetched path information.
 *
 * @param pathInformation object containing the path information of the source path
 *
 * @throws AbfsRestOperationException if the source path is not found or if the ETag of the source
 * path does not match with the server.
 */
private void validateSourcePath(final PathInformation pathInformation)
    throws AzureBlobFileSystemException {
  final boolean sourceMissing = !pathInformation.getPathExists();
  if (sourceMissing) {
    final String notFoundCode
        = AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode();
    throw new AbfsRestOperationException(
        HttpURLConnection.HTTP_NOT_FOUND, notFoundCode, null,
        new Exception(notFoundCode));
  }
  final boolean etagMismatch
      = srcEtag != null && !srcEtag.equals(pathInformation.getETag());
  if (etagMismatch) {
    final String conflictCode
        = AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode();
    throw new AbfsRestOperationException(
        HttpURLConnection.HTTP_CONFLICT, conflictCode, null,
        new Exception(conflictCode));
  }
}
/**
 * Ensures the destination does not already exist, for the one case this class
 * is responsible for: a non-recovery directory rename whose destination name
 * equals the source name.
 *
 * @param src source path
 * @param dst destination path
 * @param pathInformation object containing the path information of the source path
 * @param isRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation
 *
 * @throws AbfsRestOperationException if the destination path already exists
 */
private void validateDestinationPathNotExist(final Path src,
    final Path dst, final PathInformation pathInformation,
    final boolean isRenameRecovery) throws AzureBlobFileSystemException {
  /*
   * Destination path name can be same to that of source path name only in the
   * case of a directory rename.
   *
   * In case the directory is being renamed to some other name, the destination
   * check would happen on the AzureBlobFileSystem#rename method.
   */
  if (isRenameRecovery
      || !pathInformation.getIsDirectory()
      || !dst.getName().equals(src.getName())) {
    return;
  }
  final PathInformation dstInfo = getPathInformation(dst, tracingContext);
  if (!dstInfo.getPathExists()) {
    return;
  }
  LOG.info(
      "Rename src: {} dst: {} failed as qualifiedDst already exists",
      src, dst);
  throw new AbfsRestOperationException(
      HttpURLConnection.HTTP_CONFLICT,
      AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode(), null,
      null);
}
/**
 * Ensures the parent of the destination exists and is a directory. The check
 * is skipped when the destination or its parent is the root, when the
 * destination has no parent, and when a directory is renamed to a destination
 * carrying the same name as the source.
 *
 * @param src source path
 * @param dst destination path
 * @param pathInformation object containing the path information of the source path
 *
 * @throws AbfsRestOperationException if the parent of the destination path does not exist
 */
private void validateDestinationParentExist(final Path src,
    final Path dst,
    final PathInformation pathInformation)
    throws AzureBlobFileSystemException {
  final Path dstParent = dst.getParent();
  if (dst.isRoot() || dstParent == null || dstParent.isRoot()) {
    return;
  }
  if (pathInformation.getIsDirectory()
      && dst.getName().equals(src.getName())) {
    return;
  }
  final PathInformation dstParentInfo = getPathInformation(dstParent,
      tracingContext);
  if (dstParentInfo.getPathExists() && dstParentInfo.getIsDirectory()) {
    return;
  }
  final String parentNotFoundCode
      = RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode();
  throw new AbfsRestOperationException(
      HttpURLConnection.HTTP_NOT_FOUND, parentNotFoundCode, null,
      new Exception(parentNotFoundCode));
}
/**
 * {@inheritDoc}
 * <p>
 * Here the action is a rename of the given blob to its translated location
 * under the destination directory.
 */
@Override
boolean takeAction(final Path path) throws AzureBlobFileSystemException {
  final Path blobDestination = getDstPathForBlob(dst, path, src);
  return renameInternal(path, blobDestination);
}
/**
 * Renames a single path by copying it to the destination and then deleting
 * the original. For an atomic rename the path is leased for the duration:
 * the already-held source lease is reused for the source path, any other path
 * gets a fresh lease.
 *
 * @param path source path
 * @param destinationPathForBlobPartOfRenameSrcDir destination path
 *
 * @return true if rename is successful
 * @throws AzureBlobFileSystemException if server call fails
 */
private boolean renameInternal(final Path path,
    final Path destinationPathForBlobPartOfRenameSrcDir)
    throws AzureBlobFileSystemException {
  AbfsLease pathLease = null;
  String pathLeaseId = null;
  if (isAtomicRename) {
    /*
     * To maintain atomicity of rename of the path, lease is taken on the path.
     */
    if (path.equals(src)) {
      pathLease = srcAbfsLease;
      pathLeaseId = srcLeaseId;
    } else {
      pathLease = takeLease(path, null);
      pathLeaseId = pathLease.getLeaseID();
    }
  }
  boolean copiedAndDeleted = false;
  try {
    copyPath(path, destinationPathForBlobPartOfRenameSrcDir, pathLeaseId);
    getAbfsClient().deleteBlobPath(path, pathLeaseId, tracingContext);
    copiedAndDeleted = true;
  } finally {
    if (pathLease != null) {
      if (copiedAndDeleted) {
        // The blob is gone, so the lease needs no release; just stop the timer.
        pathLease.cancelTimer();
      } else {
        pathLease.free();
      }
    }
  }
  operatedBlobCount.incrementAndGet();
  return true;
}
/**
 * Copies the source path to the destination path and waits until the copy is
 * no longer pending.
 * <p>
 * If the copy call itself reports success, the method returns immediately.
 * If the copy call fails with HTTP 409 and the destination's recorded copy
 * source equals this source, the copy is treated as already done. Otherwise
 * the destination is polled until the copy leaves the pending state or the
 * configured maximum wait elapses.
 * <p>
 * An interrupt received while sleeping between polls does not abort the wait,
 * but the thread's interrupt status is restored before this method returns so
 * that callers can still observe it.
 *
 * @param src source path
 * @param dst destination path
 * @param leaseId lease id for the source path
 *
 * @throws AzureBlobFileSystemException if server call fails, if the copy is
 * reported as failed or aborted, or if the maximum wait time is exceeded
 */
private void copyPath(final Path src, final Path dst, final String leaseId)
    throws AzureBlobFileSystemException {
  String copyId;
  try {
    AbfsRestOperation copyPathOp = getAbfsClient().copyBlob(src, dst, leaseId,
        tracingContext);
    final String progress = copyPathOp.getResult()
        .getResponseHeader(X_MS_COPY_STATUS);
    if (COPY_STATUS_SUCCESS.equalsIgnoreCase(progress)) {
      return;
    }
    copyId = copyPathOp.getResult()
        .getResponseHeader(X_MS_COPY_ID);
  } catch (AbfsRestOperationException ex) {
    if (ex.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
      // Conflict on copy: check whether the destination was itself produced
      // by a copy from this very source; if so there is nothing left to do.
      AbfsRestOperation dstPathStatus = getAbfsClient().getPathStatus(
          dst.toUri().getPath(),
          tracingContext, null, false);
      final String srcCopyPath = ROOT_PATH + getAbfsClient().getFileSystem()
          + src.toUri().getPath();
      if (dstPathStatus != null && dstPathStatus.getResult() != null
          && (srcCopyPath.equals(getDstSource(dstPathStatus)))) {
        return;
      }
    }
    throw ex;
  }
  final long pollWait = getAbfsClient().getAbfsConfiguration()
      .getBlobCopyProgressPollWaitMillis();
  final long maxWait = getAbfsClient().getAbfsConfiguration()
      .getBlobCopyProgressMaxWaitMillis();
  final long startTime = System.currentTimeMillis();
  boolean interrupted = false;
  try {
    while (handleCopyInProgress(dst, tracingContext, copyId)
        == BlobCopyProgress.PENDING) {
      if (System.currentTimeMillis() - startTime > maxWait) {
        throw new TimeoutException(
            String.format("Blob copy progress wait time exceeded "
                + "for source: %s and destination: %s", src, dst));
      }
      try {
        Thread.sleep(pollWait);
      } catch (InterruptedException e) {
        // Keep polling as before, but remember the interrupt so it can be
        // re-asserted once the wait is over instead of being lost.
        interrupted = true;
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Reads the copy-source header from a destination path-status response and
 * returns only the path component of that URL.
 *
 * @param dstPathStatus server response for the GetBlobProperties API on the
 * destination path.
 *
 * @return source path of the copy operation, or null when the response
 * carries no copy-source header
 */
private String getDstSource(final AbfsRestOperation dstPathStatus) {
  try {
    final String copySourceUrl = dstPathStatus.getResult()
        .getResponseHeader(X_MS_COPY_SOURCE);
    return copySourceUrl == null
        ? null
        : new URL(copySourceUrl).toURI().getPath();
  } catch (URISyntaxException | MalformedURLException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Queries the destination's properties and reports where the copy identified
 * by {@code copyId} stands. Anything other than a definite success, failure
 * or abort for that copy id is reported as still pending.
 *
 * @param dstPath path of the destination for the copying
 * @param tracingContext object of tracingContext used for the tracing of the
 * server calls.
 * @param copyId id returned by server on the copy server-call. This id gets
 * attached to blob and is returned by GetBlobProperties API on the destination.
 *
 * @return BlobCopyProgress indicating the status of the copy operation
 *
 * @throws AzureBlobFileSystemException exception returned in making server call
 * for GetBlobProperties on the path. It can be thrown if the copyStatus is failure
 * or is aborted.
 */
@VisibleForTesting
public BlobCopyProgress handleCopyInProgress(final Path dstPath,
    final TracingContext tracingContext,
    final String copyId) throws AzureBlobFileSystemException {
  final String dstPathStr = dstPath.toUri().getPath();
  AbfsRestOperation op = getAbfsClient().getPathStatus(dstPathStr,
      tracingContext, null, false);
  // Only a response that carries our own copy id says anything about our copy.
  if (op.getResult() == null || copyId == null
      || !copyId.equals(op.getResult().getResponseHeader(X_MS_COPY_ID))) {
    return BlobCopyProgress.PENDING;
  }
  final String copyStatus = op.getResult()
      .getResponseHeader(X_MS_COPY_STATUS);
  if (COPY_STATUS_SUCCESS.equalsIgnoreCase(copyStatus)) {
    return BlobCopyProgress.SUCCESS;
  }
  if (COPY_STATUS_FAILED.equalsIgnoreCase(copyStatus)) {
    final String failureReason = op.getResult()
        .getResponseHeader(X_MS_COPY_STATUS_DESCRIPTION);
    throw new AbfsRestOperationException(
        COPY_BLOB_FAILED.getStatusCode(), COPY_BLOB_FAILED.getErrorCode(),
        String.format("copy to path %s failed due to: %s",
            dstPath.toUri().getPath(), failureReason),
        new Exception(COPY_BLOB_FAILED.getErrorCode()));
  }
  if (COPY_STATUS_ABORTED.equalsIgnoreCase(copyStatus)) {
    throw new AbfsRestOperationException(
        COPY_BLOB_ABORTED.getStatusCode(), COPY_BLOB_ABORTED.getErrorCode(),
        String.format("copy to path %s aborted", dstPath.toUri().getPath()),
        new Exception(COPY_BLOB_ABORTED.getErrorCode()));
  }
  return BlobCopyProgress.PENDING;
}
/**
 * Maps a blob that lives under the source directory to its location under the
 * destination directory. The source directory itself maps to the destination
 * directory; any other blob keeps its path relative to the source directory.
 *
 * @param destinationDir destination directory for the rename operation
 * @param blobPath path of blob inside sourceDir being renamed.
 * @param sourceDir source directory for the rename operation
 *
 * @return translated path for the blob
 */
private Path getDstPathForBlob(final Path destinationDir,
    final Path blobPath, final Path sourceDir) {
  final String sourceDirStr = sourceDir.toUri().getPath();
  final String blobPathStr = blobPath.toUri().getPath();
  if (sourceDirStr.equals(blobPathStr)) {
    return destinationDir;
  }
  // Portion of the blob path that follows the source directory prefix.
  final String pathBelowSource = blobPathStr.substring(sourceDirStr.length());
  return new Path(
      destinationDir.toUri().getPath() + ROOT_PATH + pathBelowSource);
}
/**
 * Fetches the status of a path and condenses it into a
 * {@link PathInformation}: whether it exists, whether it is a directory, its
 * eTag, and whether the result is of the fixed-result type
 * {@code AbfsHttpOperationWithFixedResultForGetFileStatus}.
 * <p>
 * The server is addressed with the path component of {@code path}
 * ({@code path.toUri().getPath()}), the same form used by the other server
 * calls in this class, so a scheme or authority on the {@code Path} never
 * leaks into the request.
 *
 * @param path path for which the path information is to be fetched
 * @param tracingContext object of tracingContext used for the tracing of the
 * server calls.
 *
 * @return object containing the path information; for an HTTP 404 response an
 * object describing a non-existent path
 * @throws AzureBlobFileSystemException if server call fails for any reason
 * other than the path not being found
 */
private PathInformation getPathInformation(Path path,
    TracingContext tracingContext)
    throws AzureBlobFileSystemException {
  try {
    AbfsRestOperation op = getAbfsClient().getPathStatus(
        path.toUri().getPath(),
        tracingContext, null, true);
    return new PathInformation(true,
        getAbfsClient().checkIsDir(op.getResult()),
        extractEtagHeader(op.getResult()),
        op.getResult() instanceof AbfsHttpOperation.AbfsHttpOperationWithFixedResultForGetFileStatus);
  } catch (AbfsRestOperationException e) {
    if (e.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
      // A missing path is an expected outcome, not an error.
      return new PathInformation(false, false, null, false);
    }
    throw e;
  }
}
/**
 * Exposes the leases recorded by this handler, for tests.
 *
 * @return the handler's own list of leases (not a copy)
 */
@VisibleForTesting
public List<AbfsLease> getLeases() {
  return this.leases;
}
}