RenameAtomicity.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_LENGTH;
import static org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient.generateBlockListXml;
/**
* For a directory enabled for atomic-rename, before rename starts, a file with
* -RenamePending.json suffix is created. In this file, the states required for the
* rename operation are given. This file is created by {@link #preRename()} method.
* This is important in case the JVM process crashes during rename, the atomicity
* will be maintained, when the job calls {@link AzureBlobFileSystem#listStatus(Path)}
* or {@link AzureBlobFileSystem#getFileStatus(Path)}. On these API calls to filesystem,
* it will be checked if there is any RenamePending JSON file. If yes, the crashed rename
* operation would be resumed as per the file.
*/
public class RenameAtomicity {

  /** Tracing context passed to every client call made by this instance. */
  private final TracingContext tracingContext;

  /** Source path of the rename; set by the constructor or by {@link #redo()}. */
  private Path src;

  /** Destination path of the rename; set by the constructor or by {@link #redo()}. */
  private Path dst;

  /** ETag of the source directory; overwritten by {@link #redo()} from the JSON. */
  private String srcEtag;

  private final AbfsBlobClient abfsClient;

  /** Path of the -RenamePending.json file this instance creates, reads and deletes. */
  private final Path renameJsonPath;

  public static final String SUFFIX = "-RenamePending.json";

  /** Number of retriable failures seen by {@link #preRename()}; only one retry is allowed. */
  private int preRenameRetryCount = 0;

  /** Number of bytes to read from the JSON file in {@link #redo()}. */
  private int renamePendingJsonLen;

  private final ObjectMapper objectMapper = new ObjectMapper();

  private static final Random RANDOM = new Random();

  /**
   * Creates an instance for starting a new atomic rename. The
   * -RenamePending.json file itself is only written when {@link #preRename()}
   * is called.
   *
   * @param src Source path
   * @param dst Destination path
   * @param renameJsonPath Path of the JSON file to be created
   * @param tracingContext Tracing context
   * @param srcEtag ETag of the source directory
   * @param abfsClient AbfsClient instance; must be an {@link AbfsBlobClient}
   */
  public RenameAtomicity(final Path src, final Path dst,
      final Path renameJsonPath,
      TracingContext tracingContext,
      final String srcEtag,
      final AbfsClient abfsClient) {
    this.src = src;
    this.dst = dst;
    this.abfsClient = (AbfsBlobClient) abfsClient;
    this.renameJsonPath = renameJsonPath;
    this.tracingContext = tracingContext;
    this.srcEtag = srcEtag;
  }

  /**
   * Creates an instance for resuming a rename from an existing JSON file.
   * Source and destination are populated from the file by {@link #redo()}.
   *
   * @param renameJsonPath Path of the JSON file
   * @param renamePendingJsonFileLen Length of the JSON file
   * @param tracingContext Tracing context
   * @param srcEtag ETag of the source directory
   * @param abfsClient AbfsClient instance; must be an {@link AbfsBlobClient}
   */
  public RenameAtomicity(final Path renameJsonPath,
      final int renamePendingJsonFileLen,
      TracingContext tracingContext,
      final String srcEtag,
      final AbfsClient abfsClient) {
    this.abfsClient = (AbfsBlobClient) abfsClient;
    this.renameJsonPath = renameJsonPath;
    this.tracingContext = tracingContext;
    this.srcEtag = srcEtag;
    this.renamePendingJsonLen = renamePendingJsonFileLen;
  }

  /**
   * Redo the rename operation from the JSON file.
   * <p>
   * The file is read and parsed. If it cannot be parsed, or any of the old
   * folder name, new folder name or eTag is missing or empty, no rename is
   * attempted. In every case in which the file could be read, the JSON file
   * is deleted before this method returns or throws.
   *
   * @throws AzureBlobFileSystemException If the redo operation fails.
   */
  public void redo() throws AzureBlobFileSystemException {
    byte[] buffer = readRenamePendingJson(renameJsonPath, renamePendingJsonLen);
    // The file is written as UTF-8 by preRename(), so it must be decoded as
    // UTF-8 here, independent of the JVM's default charset.
    String contents = new String(buffer, StandardCharsets.UTF_8);
    try {
      final RenamePendingJsonFormat renamePendingJsonFormatObj;
      try {
        renamePendingJsonFormatObj = objectMapper.readValue(contents,
            RenamePendingJsonFormat.class);
      } catch (JsonProcessingException ignored) {
        // Unparseable JSON: there is nothing to resume. The finally block
        // below still removes the unusable file.
        return;
      }
      if (renamePendingJsonFormatObj != null
          && StringUtils.isNotEmpty(
              renamePendingJsonFormatObj.getOldFolderName())
          && StringUtils.isNotEmpty(
              renamePendingJsonFormatObj.getNewFolderName())
          && StringUtils.isNotEmpty(renamePendingJsonFormatObj.getETag())) {
        this.src = new Path(renamePendingJsonFormatObj.getOldFolderName());
        this.dst = new Path(renamePendingJsonFormatObj.getNewFolderName());
        this.srcEtag = renamePendingJsonFormatObj.getETag();

        BlobRenameHandler blobRenameHandler = new BlobRenameHandler(
            this.src.toUri().getPath(), dst.toUri().getPath(),
            abfsClient, srcEtag, true,
            true, tracingContext);

        blobRenameHandler.execute(true);
      }
    } finally {
      deleteRenamePendingJson();
    }
  }

  /** Read the JSON file.
   *
   * @param path Path of the JSON file
   * @param len Length of the JSON file
   * @return Contents of the JSON file
   * @throws AzureBlobFileSystemException If the read operation fails.
   */
  @VisibleForTesting
  byte[] readRenamePendingJson(Path path, int len)
      throws AzureBlobFileSystemException {
    byte[] bytes = new byte[len];
    abfsClient.read(path.toUri().getPath(), 0, bytes, 0,
        len, null, null, null,
        tracingContext);
    return bytes;
  }

  /** Generate a random block ID.
   *
   * @return Base64 encoding of {@code BLOCK_ID_LENGTH} random bytes
   */
  public static String generateBlockId() {
    byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH];
    RANDOM.nextBytes(blockIdByteArray);
    return new String(Base64.encodeBase64(blockIdByteArray),
        StandardCharsets.UTF_8);
  }

  /** Create the JSON file with the contents.
   * <p>
   * The blob is created first, then the contents are appended as a single
   * block, and finally the block list is flushed using the eTag returned by
   * the create call.
   *
   * @param path Path of the JSON file
   * @param bytes Contents of the JSON file
   * @throws AzureBlobFileSystemException If the create operation fails.
   */
  @VisibleForTesting
  void createRenamePendingJson(Path path, byte[] bytes)
      throws AzureBlobFileSystemException {
    final String jsonPath = path.toUri().getPath();

    // PutBlob on the path.
    AbfsRestOperation putBlobOp = abfsClient.createPath(jsonPath,
        true,
        true,
        null,
        false,
        null,
        null,
        tracingContext);
    String eTag = extractEtagHeader(putBlobOp.getResult());

    String blockId = generateBlockId();
    String blockList = generateBlockListXml(blockId);
    byte[] buffer = blockList.getBytes(StandardCharsets.UTF_8);
    String computedMd5 = abfsClient.computeMD5Hash(buffer, 0, buffer.length);

    // Append the JSON contents as one block identified by blockId.
    AppendRequestParameters appendRequestParameters
        = new AppendRequestParameters(0, 0,
        bytes.length, AppendRequestParameters.Mode.APPEND_MODE, false, null,
        abfsClient.getAbfsConfiguration().isExpectHeaderEnabled(),
        new BlobAppendRequestParameters(blockId, eTag),
        abfsClient.computeMD5Hash(bytes, 0, bytes.length));
    abfsClient.append(jsonPath, bytes,
        appendRequestParameters, null, null, tracingContext);

    // Flush the block list, passing the eTag obtained from the create call.
    abfsClient.flush(buffer,
        jsonPath, true, null, null, eTag, null, tracingContext, computedMd5);
  }

  /**
   * Before starting the atomic rename, create a file with -RenamePending.json
   * suffix in the source parent directory. This file contains the states
   * required source, destination, and source-eTag for the rename operation.
   * <p>
   * If the path that is getting renamed is a /sourcePath, then the JSON file
   * will be /sourcePath-RenamePending.json.
   *
   * @return Length of the JSON file, in bytes.
   * @throws AzureBlobFileSystemException If the pre-rename operation fails.
   */
  @VisibleForTesting
  public int preRename() throws AzureBlobFileSystemException {
    String makeRenamePendingFileContents = makeRenamePendingFileContents(
        srcEtag);
    // Encode once: these exact bytes are written, and their count (not the
    // number of chars in the String) is the length of the file.
    byte[] contentBytes
        = makeRenamePendingFileContents.getBytes(StandardCharsets.UTF_8);

    try {
      createRenamePendingJson(renameJsonPath, contentBytes);
      return contentBytes.length;
    } catch (AzureBlobFileSystemException e) {
      /*
       * Scenario: file has been deleted by parallel thread before the RenameJSON
       * could be written and flushed. In such case, there has to be one retry of
       * preRename.
       * ref: https://issues.apache.org/jira/browse/HADOOP-12678
       * On DFS endpoint, flush API is called. If file is not there, server returns
       * 404.
       * On blob endpoint, flush API is not there. PutBlockList is called with
       * if-match header. If file is not there, the conditional header will fail,
       * the server will return 412.
       */
      if (isPreRenameRetriableException(e)) {
        preRenameRetryCount++;
        if (preRenameRetryCount == 1) {
          return preRename();
        }
      }
      throw e;
    }
  }

  /** Check if the exception is retryable for pre-rename operation.
   * <p>
   * The cause chain is searched for the first
   * {@link AbfsRestOperationException}; the result depends on whether its
   * status code is 404 or 412.
   *
   * @param e Exception to be checked
   * @return true if the exception is retryable, false otherwise
   */
  private boolean isPreRenameRetriableException(IOException e) {
    // Walk the chain as Throwable: a cause is not guaranteed to be an
    // IOException, and casting it would throw ClassCastException.
    for (Throwable cause = e; cause != null; cause = cause.getCause()) {
      if (cause instanceof AbfsRestOperationException) {
        int statusCode = ((AbfsRestOperationException) cause).getStatusCode();
        return statusCode == HTTP_NOT_FOUND
            || statusCode == HTTP_PRECON_FAILED;
      }
    }
    return false;
  }

  /** Delete the JSON file after rename is done.
   * @throws AzureBlobFileSystemException If the delete operation fails.
   */
  public void postRename() throws AzureBlobFileSystemException {
    deleteRenamePendingJson();
  }

  /** Delete the JSON file. A 404 response is treated as success, since the
   * file is already absent.
   *
   * @throws AzureBlobFileSystemException If the delete operation fails.
   */
  private void deleteRenamePendingJson() throws AzureBlobFileSystemException {
    try {
      abfsClient.deleteBlobPath(renameJsonPath, null,
          tracingContext);
    } catch (AzureBlobFileSystemException e) {
      if (e instanceof AbfsRestOperationException
          && ((AbfsRestOperationException) e).getStatusCode()
          == HTTP_NOT_FOUND) {
        return;
      }
      throw e;
    }
  }

  /**
   * Return the contents of the JSON file to represent the operations
   * to be performed for a folder rename.
   *
   * @param eTag ETag to record in the JSON
   * @return JSON string which represents the operation.
   * @throws AzureBlobFileSystemException If the JSON cannot be serialized.
   */
  private String makeRenamePendingFileContents(String eTag) throws
      AzureBlobFileSystemException {
    final RenamePendingJsonFormat renamePendingJsonFormat
        = new RenamePendingJsonFormat();
    renamePendingJsonFormat.setOldFolderName(src.toUri().getPath());
    renamePendingJsonFormat.setNewFolderName(dst.toUri().getPath());
    renamePendingJsonFormat.setETag(eTag);
    try {
      return objectMapper.writeValueAsString(renamePendingJsonFormat);
    } catch (JsonProcessingException e) {
      throw new AbfsDriverException(e);
    }
  }
}