// AbfsClient.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Hashtable;
import java.util.List;
import java.util.Locale;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILESYSTEM;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH_ENCODE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VENDOR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MD5;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_ARCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.PLUS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.PLUS_ENCODE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SEMICOLON;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.UTF_8;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType.BLOB;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT_CHARSET;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESOURCE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
/**
* AbfsClient is the abstract base client for performing REST operations
* against Azure Blob File System endpoints; service-specific subclasses
* implement the abstract operations declared here.
*/
public abstract class AbfsClient implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
public static final String HUNDRED_CONTINUE_USER_AGENT = SINGLE_WHITE_SPACE + HUNDRED_CONTINUE + SEMICOLON;
public static final String ABFS_CLIENT_TIMER_THREAD_NAME = "abfs-timer-client";
public static final String FNS_BLOB_USER_AGENT_IDENTIFIER = "FNS";
private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
private ApiVersion xMsVersion = ApiVersion.getCurrentVersion();
private final ExponentialRetryPolicy exponentialRetryPolicy;
private final StaticRetryPolicy staticRetryPolicy;
private final String filesystem;
private final AbfsConfiguration abfsConfiguration;
private final String userAgent;
private final AbfsPerfTracker abfsPerfTracker;
private String clientProvidedEncryptionKey = null;
private String clientProvidedEncryptionKeySHA = null;
private final IdentityTransformerInterface identityTransformer;
private final String userName;
private String primaryUserGroup;
private final String accountName;
private final AuthType authType;
private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;
private Timer timer;
private final String abfsMetricUrl;
private boolean isMetricCollectionEnabled = false;
private final MetricFormat metricFormat;
private final AtomicBoolean isMetricCollectionStopped;
private final int metricAnalysisPeriod;
private final int metricIdlePeriod;
private EncryptionContextProvider encryptionContextProvider = null;
private EncryptionType encryptionType = EncryptionType.NONE;
private final AbfsThrottlingIntercept intercept;
private final ListeningScheduledExecutorService executorService;
private boolean renameResilience;
private TimerTask runningTimerTask;
private boolean isSendMetricCall;
private SharedKeyCredentials metricSharedkeyCredentials = null;
private KeepAliveCache keepAliveCache;
private AbfsApacheHttpClient abfsApacheHttpClient;
/**
* Logs the rename failure at most once if metadata is in an incomplete state.
*/
protected static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = new LogExactlyOnce(LOG);
private AbfsClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
this.abfsConfiguration = abfsConfiguration;
this.exponentialRetryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.staticRetryPolicy = abfsClientContext.getStaticRetryPolicy();
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
this.renameResilience = abfsConfiguration.getRenameResilience();
if (encryptionContextProvider != null) {
this.encryptionContextProvider = encryptionContextProvider;
// Version update needed to support the x-ms-encryption-context header.
// See https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id
encryptionType = EncryptionType.ENCRYPTION_CONTEXT;
} else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
clientProvidedEncryptionKey =
abfsConfiguration.getEncodedClientProvidedEncryptionKey();
this.clientProvidedEncryptionKeySHA =
abfsConfiguration.getEncodedClientProvidedEncryptionKeySHA();
encryptionType = EncryptionType.GLOBAL_KEY;
}
String sslProviderName = null;
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
try {
LOG.trace("Initializing DelegatingSSLSocketFactory with {} SSL "
+ "Channel Mode", this.abfsConfiguration.getPreferredSSLFactoryOption());
DelegatingSSLSocketFactory.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption());
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();
} catch (IOException e) {
// Suppress the exception; failure to initialize DelegatingSSLSocketFactory has only a performance impact.
LOG.trace("NonCritFailure: DelegatingSSLSocketFactory Init failed : "
+ "{}", e.getMessage());
}
}
if (abfsConfiguration.getPreferredHttpOperationType()
== HttpOperationType.APACHE_HTTP_CLIENT) {
keepAliveCache = new KeepAliveCache(abfsConfiguration);
abfsApacheHttpClient = new AbfsApacheHttpClient(
DelegatingSSLSocketFactory.getDefaultFactory(),
abfsConfiguration.getHttpReadTimeout(),
keepAliveCache);
}
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.abfsPerfTracker = abfsClientContext.getAbfsPerfTracker();
this.abfsCounters = abfsClientContext.getAbfsCounters();
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build();
this.executorService = MoreExecutors.listeningDecorator(
HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf));
this.metricFormat = abfsConfiguration.getMetricFormat();
this.isMetricCollectionStopped = new AtomicBoolean(false);
this.metricAnalysisPeriod = abfsConfiguration.getMetricAnalysisTimeout();
this.metricIdlePeriod = abfsConfiguration.getMetricIdleTimeout();
if (StringUtils.isNotEmpty(metricFormat.toString())) {
String metricAccountName = abfsConfiguration.getMetricAccount();
String metricAccountKey = abfsConfiguration.getMetricAccountKey();
if (StringUtils.isNotEmpty(metricAccountName) && StringUtils.isNotEmpty(metricAccountKey)) {
isMetricCollectionEnabled = true;
abfsCounters.initializeMetrics(metricFormat);
int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT);
if (dotIndex <= 0) {
throw new InvalidUriException(
metricAccountName + " - account name is not fully qualified.");
}
try {
metricSharedkeyCredentials = new SharedKeyCredentials(
metricAccountName.substring(0, dotIndex),
metricAccountKey);
} catch (IllegalArgumentException e) {
throw new IOException("Exception while initializing metric credentials ", e);
}
}
}
if (isMetricCollectionEnabled) {
this.timer = new Timer(
ABFS_CLIENT_TIMER_THREAD_NAME, true);
timer.schedule(new TimerTaskImpl(),
metricIdlePeriod,
metricIdlePeriod);
}
this.abfsMetricUrl = abfsConfiguration.getMetricUri();
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
abfsConfiguration.getRawConfiguration().getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
IdentityTransformerInterface.class);
try {
this.identityTransformer = identityTransformerClass.getConstructor(
Configuration.class).newInstance(abfsConfiguration.getRawConfiguration());
} catch (IllegalAccessException | InstantiationException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException e) {
LOG.error("IdentityTransformer Init Falied", e);
throw new IOException(e);
}
LOG.trace("IdentityTransformer init complete");
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
this.userName = userGroupInformation.getShortUserName();
LOG.trace("UGI init complete");
if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
try {
this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
} catch (IOException ex) {
LOG.error("Failed to get primary group for {}, using user name as primary group name", userName);
this.primaryUserGroup = userName;
}
} else {
//Provide a default group name
this.primaryUserGroup = userName;
}
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
encryptionContextProvider, abfsClientContext);
this.tokenProvider = tokenProvider;
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
encryptionContextProvider, abfsClientContext);
this.sasTokenProvider = sasTokenProvider;
}
@Override
public void close() throws IOException {
if (isMetricCollectionEnabled && runningTimerTask != null) {
runningTimerTask.cancel();
timer.cancel();
}
if (keepAliveCache != null) {
keepAliveCache.close();
}
if (abfsApacheHttpClient != null) {
abfsApacheHttpClient.close();
}
if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG,
(Closeable) tokenProvider);
}
HadoopExecutors.shutdown(executorService, LOG, 0, TimeUnit.SECONDS);
}
public String getFileSystem() {
return filesystem;
}
protected AbfsPerfTracker getAbfsPerfTracker() {
return abfsPerfTracker;
}
ExponentialRetryPolicy getExponentialRetryPolicy() {
return exponentialRetryPolicy;
}
StaticRetryPolicy getStaticRetryPolicy() {
return staticRetryPolicy;
}
/**
* Returns the retry policy to be used for Abfs Rest Operation Failure.
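* <p>For example (assuming static retry on connection timeouts is enabled
* in configuration; the abbreviation constant is imported above):
* <pre>{@code
* getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION); // static retry policy
* getRetryPolicy(null);                            // exponential retry policy
* }</pre>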
* @param failureReason helps to decide which type of retryPolicy to be used.
* @return retry policy to be used.
*/
public AbfsRetryPolicy getRetryPolicy(final String failureReason) {
return CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason)
&& getAbfsConfiguration().getStaticRetryForConnectionTimeoutEnabled()
? getStaticRetryPolicy()
: getExponentialRetryPolicy();
}
SharedKeyCredentials getSharedKeyCredentials() {
return sharedKeyCredentials;
}
SharedKeyCredentials getMetricSharedkeyCredentials() {
return metricSharedkeyCredentials;
}
public void setEncryptionType(EncryptionType encryptionType) {
this.encryptionType = encryptionType;
}
public EncryptionType getEncryptionType() {
return encryptionType;
}
AbfsThrottlingIntercept getIntercept() {
return intercept;
}
/**
* Create request headers for Rest Operation using the current API version.
* @return default request headers
*/
@VisibleForTesting
protected abstract List<AbfsHttpHeader> createDefaultHeaders();
/**
* Create request headers for Rest Operation using the specified API version.
* @param xMsVersion Azure services API version to be used.
* @return default request headers
*/
@VisibleForTesting
public abstract List<AbfsHttpHeader> createDefaultHeaders(ApiVersion xMsVersion);
/**
* Create request headers common to both service endpoints.
* @param xMsVersion azure services API version to be used.
* @return common request headers
*/
protected List<AbfsHttpHeader> createCommonHeaders(ApiVersion xMsVersion) {
final List<AbfsHttpHeader> requestHeaders = new ArrayList<>();
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion.toString()));
requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET, UTF_8));
requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, EMPTY_STRING));
requestHeaders.add(new AbfsHttpHeader(USER_AGENT, getUserAgent()));
return requestHeaders;
}
/**
* This method adds the following headers:
* <ol>
* <li>X_MS_ENCRYPTION_KEY</li>
* <li>X_MS_ENCRYPTION_KEY_SHA256</li>
* <li>X_MS_ENCRYPTION_ALGORITHM</li>
* </ol>
* These headers have to be added for the following operations:
* <ol>
* <li>createPath</li>
* <li>append</li>
* <li>flush</li>
* <li>setPathProperties</li>
* <li>getPathStatus for fs.setXAttr and fs.getXAttr</li>
* <li>read</li>
* </ol>
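* <p>A minimal sketch of the effect under ENCRYPTION_CONTEXT (the path and
* the adapter instance here are hypothetical; the header names are the
* constants added by this method):
* <pre>{@code
* List<AbfsHttpHeader> headers = createDefaultHeaders();
* addEncryptionKeyRequestHeaders("/dir/file", headers, true,
*     contextEncryptionAdapter, tracingContext);
* // headers now additionally carry x-ms-encryption-context (create only),
* // x-ms-encryption-key, x-ms-encryption-key-sha256 and
* // x-ms-encryption-algorithm.
* }</pre>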
* @param path path of the file / directory to be created / overwritten.
* @param requestHeaders list of headers to be added to the request.
* @param isCreateFileRequest defines if file or directory has to be created / overwritten.
* @param contextEncryptionAdapter object that contains the encryptionContext and
* encryptionKey created from the developer provided implementation of {@link EncryptionContextProvider}
* @param tracingContext to trace service calls.
* @throws AzureBlobFileSystemException if namespace is not enabled.
*/
protected void addEncryptionKeyRequestHeaders(String path,
List<AbfsHttpHeader> requestHeaders, boolean isCreateFileRequest,
ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext)
throws AzureBlobFileSystemException {
String encodedKey, encodedKeySHA256;
switch (encryptionType) {
case GLOBAL_KEY:
encodedKey = clientProvidedEncryptionKey;
encodedKeySHA256 = clientProvidedEncryptionKeySHA;
break;
case ENCRYPTION_CONTEXT:
if (isCreateFileRequest) {
// get new context for create file request
requestHeaders.add(new AbfsHttpHeader(X_MS_ENCRYPTION_CONTEXT,
contextEncryptionAdapter.getEncodedContext()));
}
// else use cached encryption keys from input/output streams
encodedKey = contextEncryptionAdapter.getEncodedKey();
encodedKeySHA256 = contextEncryptionAdapter.getEncodedKeySHA();
break;
default: return; // no client-provided encryption keys
}
requestHeaders.add(new AbfsHttpHeader(X_MS_ENCRYPTION_KEY, encodedKey));
requestHeaders.add(
new AbfsHttpHeader(X_MS_ENCRYPTION_KEY_SHA256, encodedKeySHA256));
requestHeaders.add(new AbfsHttpHeader(X_MS_ENCRYPTION_ALGORITHM,
SERVER_SIDE_ENCRYPTION_ALGORITHM));
}
/**
* Creates an AbfsUriQueryBuilder with the default timeout query parameter.
* @return default AbfsUriQueryBuilder.
*/
protected AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT);
return abfsUriQueryBuilder;
}
/**
* Create a new filesystem using Azure REST API Service.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation createFilesystem(TracingContext tracingContext)
throws AzureBlobFileSystemException;
/**
* Sets user-defined metadata on filesystem.
* @param properties list of metadata key-value pairs.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation setFilesystemProperties(Hashtable<String, String> properties,
TracingContext tracingContext) throws AzureBlobFileSystemException;
/**
* List paths and their properties in the current filesystem.
* @param relativePath to return only blobs within this directory.
* @param recursive to return all blobs in the path, including those in subdirectories.
* @param listMaxResults maximum number of blobs to return.
* @param continuation marker to specify the continuation token.
* @param tracingContext for tracing the server calls.
* @param uri to be used for the path conversion.
* @return {@link ListResponseData} containing the listing response.
* @throws AzureBlobFileSystemException if rest operation or response parsing fails.
*/
public abstract ListResponseData listPath(String relativePath, boolean recursive,
int listMaxResults, String continuation, TracingContext tracingContext, URI uri) throws IOException;
/**
* Post-processing of the list operation.
* @param relativePath which is used to list the blobs.
* @param fileStatuses list of file statuses to be processed.
* @param tracingContext for tracing the server calls.
* @param uri to be used for the path conversion.
* @return list of file statuses to be returned.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract List<FileStatus> postListProcessing(String relativePath,
List<FileStatus> fileStatuses, TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException;
/**
* Retrieves user-defined metadata on filesystem.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
* */
public abstract AbfsRestOperation getFilesystemProperties(TracingContext tracingContext)
throws AzureBlobFileSystemException;
/**
* Deletes the filesystem using Azure REST API Service.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation deleteFilesystem(TracingContext tracingContext)
throws AzureBlobFileSystemException;
/**
* Method for calling createPath API to the backend. Method can be called from:
* <ol>
* <li>create new file</li>
* <li>overwrite file</li>
* <li>create new directory</li>
* </ol>
*
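* <p>Usage sketch (argument values and the client instance are hypothetical):
* <pre>{@code
* AbfsRestOperation op = client.createPath("/dir/file", true, false,
*     permissions, false, null, contextEncryptionAdapter, tracingContext);
* int status = op.getResult().getStatusCode();
* }</pre>
*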
* @param path path of the file / directory to be created / overwritten.
* @param isFile defines if file or directory has to be created / overwritten.
* @param overwrite defines if the file / directory is to be overwritten.
* @param permissions contains permission and umask.
* @param isAppendBlob defines if directory in the path is enabled for appendBlob.
* @param eTag required in case of overwrite of file / directory. Path would be
* overwritten only if the provided eTag is equal to the one present in backend for
* the path.
* @param contextEncryptionAdapter object that contains the encryptionContext and
* encryptionKey created from the developer provided implementation of
* {@link org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider}.
* @param tracingContext object of {@link org.apache.hadoop.fs.azurebfs.utils.TracingContext}
* correlating to the current fs.create() request.
* @return object of {@link AbfsRestOperation} which contain all the information
* about the communication with the server. The information is in
* {@link AbfsRestOperation#getResult()}
* @throws AzureBlobFileSystemException throws back the exception it receives from the
* {@link AbfsRestOperation#execute(TracingContext)} method call.
*/
public abstract AbfsRestOperation createPath(String path,
boolean isFile,
boolean overwrite,
Permissions permissions,
boolean isAppendBlob,
String eTag,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) throws AzureBlobFileSystemException;
/**
* Conditionally creates or overwrites a file at the specified relative path.
* This method ensures that the file is created or overwritten based on the provided parameters.
*
* @param relativePath The relative path of the file to be created or overwritten.
* @param statistics The file system statistics to be updated.
* @param permissions The permissions to be set on the file.
* @param isAppendBlob Specifies if the file is an append blob.
* @param contextEncryptionAdapter The encryption context adapter for handling encryption.
* @param tracingContext The tracing context for tracking the operation.
* @return An AbfsRestOperation object containing the result of the operation.
* @throws IOException If an I/O error occurs during the operation.
*/
public abstract AbfsRestOperation conditionalCreateOverwriteFile(String relativePath,
FileSystem.Statistics statistics,
Permissions permissions,
boolean isAppendBlob,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) throws IOException;
/**
* Performs a pre-check for a non-recursive create operation: verifies that
* parentPath exists.
*
* @param parentPath parent path of the file to be created.
* @param tracingContext trace context
*
* @throws IOException if parentPath does not exist or server error.
*/
public abstract void createNonRecursivePreCheck(Path parentPath,
TracingContext tracingContext)
throws IOException;
/**
* Acquire lease on specified path.
* @param path on which lease has to be acquired.
* @param duration for which lease has to be acquired.
* @param eTag required to acquire lease on the path.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation acquireLease(String path,
int duration,
String eTag,
TracingContext tracingContext) throws AzureBlobFileSystemException;
/**
* Renew lease on specified path.
* @param path on which lease has to be renewed.
* @param leaseId of the lease to be renewed.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation renewLease(String path, String leaseId,
TracingContext tracingContext) throws AzureBlobFileSystemException;
/**
* Release lease on specified path.
* @param path on which lease has to be released.
* @param leaseId of the lease to be released.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation releaseLease(String path, String leaseId,
TracingContext tracingContext) throws AzureBlobFileSystemException;
/**
* Break lease on specified path.
* @param path on which lease has to be broken.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation breakLease(String path,
TracingContext tracingContext) throws AzureBlobFileSystemException;
/**
* Rename a file or directory.
* If a source etag is passed in, the operation will attempt to recover
* from a missing source file by probing the destination for
* existence and comparing etags.
* The second value in the result will be true to indicate that this
* took place.
* As rename recovery is only attempted if the source etag is non-empty,
* in normal rename operations rename recovery will never happen.
*
* @param source path to source file
* @param destination destination of rename.
* @param continuation continuation.
* @param tracingContext trace context
* @param sourceEtag etag of source file. may be null or empty
* @param isMetadataIncompleteState was there a rename failure due to
* incomplete metadata state?
* @return AbfsClientRenameResult result of rename operation indicating the
* AbfsRest operation, rename recovery and incomplete metadata state failure.
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
*/
public abstract AbfsClientRenameResult renamePath(
String source,
String destination,
String continuation,
TracingContext tracingContext,
String sourceEtag,
boolean isMetadataIncompleteState)
throws IOException;
/**
* Checks if the rest operation results indicate if the path is a directory.
* @param result executed rest operation containing response from server.
* @return True if the path is a directory, False otherwise.
*/
public abstract boolean checkIsDir(AbfsHttpOperation result);
/**
* Creates a rest operation for rename.
* @param url to be used for the operation.
* @param requestHeaders list of headers to be added to the request.
* @return un-executed rest operation.
*/
@VisibleForTesting
AbfsRestOperation createRenameRestOperation(URL url, List<AbfsHttpHeader> requestHeaders) {
AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.RenamePath,
HTTP_METHOD_PUT,
url,
requestHeaders);
return op;
}
/**
* Increments AbfsCounters for rename path attempts by 1.
* Will be called each time a rename path operation is attempted.
*/
protected void incrementAbfsRenamePath() {
abfsCounters.incrementCounter(RENAME_PATH_ATTEMPTS, 1);
}
/**
* Check if the rename request failure is post a retry and if earlier rename
* request might have succeeded at back-end.
*
* If a source etag was passed in, and the error was 404, get the
* etag of any file at the destination.
* If it matches the source etag, then the rename is considered
* a success.
* Exceptions raised in the probe of the destination are swallowed,
* so that they do not interfere with the original rename failures.
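* <p>Recovery sketch (paths and operation instance are hypothetical): a
* retried rename that failed with 404 probes the destination for the
* source etag:
* <pre>{@code
* boolean recovered = renameIdempotencyCheckOp("/src", sourceEtag,
*     failedOp, "/dst", tracingContext);
* // true only if GetPathStatus("/dst") returns 200 and its etag equals
* // the supplied source etag.
* }</pre>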
* @param source source path
* @param op Rename request REST operation response with non-null HTTP response
* @param destination rename destination path
* @param sourceEtag etag of source file. may be null or empty
* @param tracingContext Tracks identifiers for request header
* @return true if the file was successfully copied
*/
public boolean renameIdempotencyCheckOp(
final String source,
final String sourceEtag,
final AbfsRestOperation op,
final String destination,
TracingContext tracingContext) {
Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");
// removing isDir from debug logs as it can be misleading
LOG.debug("rename({}, {}) failure {}; retry={} etag {}",
source, destination, op.getResult().getStatusCode(), op.isARetriedRequest(), sourceEtag);
if (!(op.isARetriedRequest()
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND))) {
// only attempt recovery if the failure was a 404 on a retried rename request.
return false;
}
if (isNotEmpty(sourceEtag)) {
// Server has returned HTTP 404; we have an etag, so check
// whether the rename actually took place.
LOG.info("rename {} to {} failed, checking etag of destination",
source, destination);
try {
final AbfsRestOperation destStatusOp = getPathStatus(destination,
false, tracingContext, null);
final AbfsHttpOperation result = destStatusOp.getResult();
final boolean recovered = result.getStatusCode() == HttpURLConnection.HTTP_OK
&& sourceEtag.equals(extractEtagHeader(result));
LOG.info("File rename has taken place: recovery {}",
recovered ? "succeeded" : "failed");
return recovered;
} catch (AzureBlobFileSystemException ex) {
// GetFileStatus on the destination failed, the rename did not take place
// or some other failure. log and swallow.
LOG.debug("Failed to get status of path {}", destination, ex);
}
} else {
LOG.debug("No source etag; unable to probe for the operation's success");
}
return false;
}
/**
* Uploads data to be appended to a file.
* @param path to which data has to be appended.
* @param buffer containing data to be appended.
* @param reqParams containing parameters for append operation like offset, length etc.
* @param cachedSasToken to be used for the authenticating operation.
* @param contextEncryptionAdapter to provide encryption context.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation append(String path, byte[] buffer,
AppendRequestParameters reqParams, String cachedSasToken,
ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext)
throws AzureBlobFileSystemException;
/**
* Returns true if the status code lies in the range of user error.
* @param responseStatusCode http response status code.
* @return True or False.
*/
public abstract boolean checkUserError(int responseStatusCode);
/**
* Checks whether the failure exception returned by the server is due to an MD5 mismatch.
* @param e exception returned by AbfsRestOperation.
* @return true if the exception is due to an MD5 mismatch, false otherwise.
*/
protected boolean isMd5ChecksumError(final AbfsRestOperationException e) {
AzureServiceErrorCode storageErrorCode = e.getErrorCode();
return storageErrorCode == AzureServiceErrorCode.MD5_MISMATCH;
}
/**
* For AppendBlob it is possible that the append succeeded in the backend
* but the request failed. However, a retry would fail with an
* InvalidQueryParameterValue (as the current offset would be unacceptable).
* Hence, we pass/succeed the appendblob append call on a retry after
* checking the length of the file.
*/
public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path,
final long length, TracingContext tracingContext)
throws AzureBlobFileSystemException {
if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_BAD_REQUEST)) {
final AbfsRestOperation destStatusOp = getPathStatus(path, false, tracingContext, null);
if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
String fileLength = destStatusOp.getResult().getResponseHeader(
HttpHeaderConfigurations.CONTENT_LENGTH);
if (length <= Long.parseLong(fileLength)) {
LOG.debug("Returning success response from append blob idempotency code");
return true;
}
}
}
return false;
}
/**
* Flush previously uploaded data to a file.
* @param path on which data has to be flushed.
* @param position to which data has to be flushed.
* @param retainUncommittedData whether to retain uncommitted data after flush.
* @param isClose specify if this is the last flush to the file.
* @param cachedSasToken to be used for the authenticating operation.
* @param leaseId if there is an active lease on the path.
* @param contextEncryptionAdapter to provide encryption context.
* @param tracingContext for tracing the server calls.
* @param blobMd5 The Base64-encoded MD5 hash of the blob for data integrity validation.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation flush(String path, long position,
boolean retainUncommittedData, boolean isClose,
String cachedSasToken, String leaseId,
ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext, String blobMd5)
throws AzureBlobFileSystemException;
/**
* Flushes previously uploaded data to the specified path.
*
* @param buffer The buffer containing block IDs to be flushed.
* @param path The file path to which data should be flushed.
* @param isClose True if this is the final flush (i.e., the file is being closed).
* @param cachedSasToken SAS token used for authentication (if applicable).
* @param leaseId Lease ID, if a lease is active on the file.
* @param eTag ETag used for conditional request headers (e.g., If-Match).
* @param contextEncryptionAdapter Adapter to provide encryption context, if encryption is enabled.
* @param tracingContext Context for tracing the server calls.
* @param blobMd5 The Base64-encoded MD5 hash of the blob for data integrity validation.
* @return The executed {@link AbfsRestOperation} containing the server response.
*
* @throws AzureBlobFileSystemException if the flush operation fails.
*/
public abstract AbfsRestOperation flush(byte[] buffer,
String path,
boolean isClose,
String cachedSasToken,
String leaseId,
String eTag,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException;
/**
* Set the properties of a file or directory.
* @param path on which properties have to be set.
* @param properties list of metadata key-value pairs.
* @param tracingContext for tracing the server calls.
* @param contextEncryptionAdapter to provide encryption context.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation setPathProperties(String path, Hashtable<String, String> properties,
TracingContext tracingContext, ContextEncryptionAdapter contextEncryptionAdapter)
throws AzureBlobFileSystemException;
/**
* Get the properties of a file or directory.
* @param path of which properties have to be fetched.
* @param includeProperties to include user defined properties.
* @param tracingContext for tracing the server calls.
* @param contextEncryptionAdapter to provide encryption context.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation getPathStatus(String path,
boolean includeProperties, TracingContext tracingContext,
ContextEncryptionAdapter contextEncryptionAdapter)
throws AzureBlobFileSystemException;
/**
* Read the contents of the file at specified path.
* @param path of the file to be read.
* @param position in the file from where data has to be read.
* @param buffer to store the data read.
* @param bufferOffset offset in the buffer to start storing the data.
* @param bufferLength length of data to be read.
* @param eTag to specify conditional headers.
* @param cachedSasToken to be used for the authenticating operation.
* @param contextEncryptionAdapter to provide encryption context.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation read(String path,
long position,
byte[] buffer,
int bufferOffset,
int bufferLength,
String eTag,
String cachedSasToken,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) throws AzureBlobFileSystemException;
/**
* Delete the file or directory at specified path.
* @param path to be deleted.
* @param recursive if the path is a directory, delete recursively.
* @param continuation to specify continuation token.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation deletePath(String path, boolean recursive,
String continuation,
TracingContext tracingContext)
throws AzureBlobFileSystemException;
/**
* Check if the delete request failure is post a retry and if delete failure
* qualifies to be a success response assuming idempotency.
*
* The scenarios below describe how a delete could be incorrectly deduced as
* a success after a request retry:
* 1. Target was originally not existing and the initial delete request had
* to be re-tried.
* 2. A parallel delete was issued from another store interface rather than
* from this filesystem instance.
* These are a few corner cases, and usually returning a success at this
* stage should help the job to continue.
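* <p>Sketch of the effect (the operation instance is hypothetical):
* <pre>{@code
* AbfsRestOperation checked = deleteIdempotencyCheckOp(failedDeleteOp);
* // If the delete was a retried request that got 404, a synthetic success
* // operation is returned; otherwise the original op is passed through.
* }</pre>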
* @param op Delete request REST operation response with non-null HTTP response.
* @return REST operation response post idempotency check.
*/
public AbfsRestOperation deleteIdempotencyCheckOp(final AbfsRestOperation op) {
Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");
if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
&& DEFAULT_DELETE_CONSIDERED_IDEMPOTENT) {
// Server has returned HTTP 404, which means path no longer
// exists. Assuming delete result to be idempotent, return success.
LOG.debug("Returning success response from delete idempotency logic");
return getSuccessOp(
AbfsRestOperationType.DeletePath,
HTTP_METHOD_DELETE,
op.getUrl(),
op.getRequestHeaders());
}
return op;
}
/**
* Sets the owner on the path.
* @param path on which owner has to be set.
* @param owner to be set.
* @param group to be set.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation setOwner(String path, String owner, String group,
TracingContext tracingContext)
throws AzureBlobFileSystemException;
/**
* Sets the permission on the path.
* @param path on which permission has to be set.
* @param permission to be set.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation setPermission(String path, String permission,
TracingContext tracingContext)
throws AzureBlobFileSystemException;
/**
* Sets the ACL.
* @param path on which ACL has to be set.
* @param aclSpecString to be set.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public AbfsRestOperation setAcl(final String path, final String aclSpecString,
TracingContext tracingContext) throws AzureBlobFileSystemException {
return setAcl(path, aclSpecString, EMPTY_STRING, tracingContext);
}
/**
* Sets the ACL on the path that matches ETag.
* @param path on which ACL has to be set.
* @param aclSpecString to be set.
* @param eTag to specify conditional headers. Set only if etag matches.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation setAcl(String path, String aclSpecString, String eTag,
TracingContext tracingContext)
throws AzureBlobFileSystemException;
/**
* Retrieves the ACL properties of blob at specified path.
* @param path of which properties have to be fetched.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public AbfsRestOperation getAclStatus(final String path, TracingContext tracingContext)
throws AzureBlobFileSystemException {
return getAclStatus(path, abfsConfiguration.isUpnUsed(), tracingContext);
}
/**
* Retrieves the ACL properties of blob at specified path.
* @param path of which properties have to be fetched.
* @param useUPN whether to use UPN with rest operation.
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract AbfsRestOperation getAclStatus(String path, boolean useUPN,
TracingContext tracingContext) throws AzureBlobFileSystemException;
/**
* Talks to the server to check whether the permission specified in
* the rwx parameter is present for the path specified in the path parameter.
*
* @param path Path for which access check needs to be performed
* @param rwx The permission to be checked on the path
* @param tracingContext Tracks identifiers for request header
* @return The {@link AbfsRestOperation} object for the operation
* @throws AzureBlobFileSystemException in case of bad requests
*/
public abstract AbfsRestOperation checkAccess(String path, String rwx, TracingContext tracingContext)
throws AzureBlobFileSystemException;
/**
* Get the directory query parameter used by the List Paths REST API and used
* as the path in the continuation token. If the input path is null or the
* root path "/", empty string is returned. If the input path begins with '/',
* the return value is the substring beginning at offset 1. Otherwise, the
* input path is returned.
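* <p>Examples (derived directly from the rules above):
* <pre>{@code
* getDirectoryQueryParameter(null);       // ""
* getDirectoryQueryParameter("/");        // ""
* getDirectoryQueryParameter("/dir/sub"); // "dir/sub"
* getDirectoryQueryParameter("dir/sub");  // "dir/sub"
* }</pre>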
* @param path the path to be listed.
* @return the value of the directory query parameter
*/
public static String getDirectoryQueryParameter(final String path) {
String directory = path;
if (Strings.isNullOrEmpty(directory)) {
directory = EMPTY_STRING;
} else if (directory.charAt(0) == '/') {
directory = directory.substring(1);
}
return directory;
}
/**
* If configured for SAS AuthType, appends SAS token to queryBuilder.
* @param path for which SAS token is required.
* @param operation for which SAS token is required.
* @param queryBuilder to which SAS token is appended.
* @return sasToken - returned for optional re-use.
* @throws SASTokenProviderException if SAS token cannot be acquired.
*/
protected String appendSASTokenToQuery(String path, String operation, AbfsUriQueryBuilder queryBuilder) throws SASTokenProviderException {
return appendSASTokenToQuery(path, operation, queryBuilder, null);
}
/**
* If configured for SAS AuthType, appends SAS token to queryBuilder.
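* <p>Sketch for the SAS auth type (the path and operation name are
* illustrative):
* <pre>{@code
* AbfsUriQueryBuilder qb = createDefaultUriQueryBuilder();
* String sas = appendSASTokenToQuery("/dir/file",
*     SASTokenProvider.READ_OPERATION, qb, null);
* // qb now carries the SAS token; a leading '?' is stripped if present.
* }</pre>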
* @param path for which SAS token is required.
* @param operation for which SAS token is required.
* @param queryBuilder to which SAS token is appended.
* @param cachedSasToken - previously acquired SAS token to be reused.
* @return sasToken - returned for optional re-use.
* @throws SASTokenProviderException if SAS token cannot be acquired.
*/
protected String appendSASTokenToQuery(String path,
String operation,
AbfsUriQueryBuilder queryBuilder,
String cachedSasToken)
throws SASTokenProviderException {
String sasToken = null;
if (this.authType == AuthType.SAS) {
try {
LOG.trace("Fetch SAS token for {} on {}", operation, path);
if (cachedSasToken == null) {
sasToken = sasTokenProvider.getSASToken(this.accountName,
this.filesystem, path, operation);
if ((sasToken == null) || sasToken.isEmpty()) {
throw new UnsupportedOperationException("SASToken received is empty or null");
}
} else {
sasToken = cachedSasToken;
LOG.trace("Using cached SAS token.");
}
// if SAS Token contains a prefix of ?, it should be removed
if (sasToken.charAt(0) == '?') {
sasToken = sasToken.substring(1);
}
queryBuilder.setSASToken(sasToken);
LOG.trace("SAS token fetch complete for {} on {}", operation, path);
} catch (Exception ex) {
throw new SASTokenProviderException(String.format(
"Failed to acquire a SAS token for %s on %s due to %s", operation, path,
ex.toString()));
}
}
return sasToken;
}
/**
* Creates REST operation URL with empty path for the given query.
* @param query to be added to the URL.
* @return URL for the REST operation.
* @throws AzureBlobFileSystemException if URL creation fails.
*/
@VisibleForTesting
protected URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
return createRequestUrl(EMPTY_STRING, query);
}
/**
* Creates REST operation URL with given path and query.
* @param path for which URL has to be created.
* @param query to be added to the URL.
* @return URL for the REST operation.
* @throws AzureBlobFileSystemException if URL creation fails.
*/
@VisibleForTesting
protected URL createRequestUrl(final String path, final String query)
throws AzureBlobFileSystemException {
return createRequestUrl(baseUrl, path, query);
}
/**
* Creates REST operation URL with given baseUrl, path and query.
* @param baseUrl to be used for the operation.
* @param path for which URL has to be created.
* @param query to be added to the URL.
* @return URL for the REST operation.
* @throws AzureBlobFileSystemException if URL creation fails.
*/
@VisibleForTesting
protected URL createRequestUrl(final URL baseUrl, final String path, final String query)
throws AzureBlobFileSystemException {
String encodedPath = path;
try {
encodedPath = urlEncode(path);
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Unexpected error.", ex);
throw new InvalidUriException(path);
}
final StringBuilder sb = new StringBuilder();
if (baseUrl == null) {
throw new InvalidUriException("URL provided is null");
}
sb.append(baseUrl.toString());
sb.append(encodedPath);
sb.append(query);
final URL url;
try {
url = new URL(sb.toString());
} catch (MalformedURLException ex) {
throw new InvalidUriException("URL is malformed" + sb.toString());
}
return url;
}
/**
* Returns the URL-encoded string for a given value.
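* <p>Examples, assuming the standard constant values (a '+' produced by
* URLEncoder for spaces is rewritten to "%20", while encoded slashes are
* restored):
* <pre>{@code
* urlEncode("a b");      // "a%20b"
* urlEncode("a+b");      // "a%2Bb"
* urlEncode("dir/file"); // "dir/file"
* }</pre>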
* @param value to be encoded.
* @return url encoded string.
* @throws AzureBlobFileSystemException if encoding fails.
*/
public static String urlEncode(final String value) throws AzureBlobFileSystemException {
String encodedString;
try {
encodedString = URLEncoder.encode(value, UTF_8)
.replace(PLUS, PLUS_ENCODE)
.replace(FORWARD_SLASH_ENCODE, FORWARD_SLASH);
} catch (UnsupportedEncodingException ex) {
throw new InvalidUriException(value);
}
return encodedString;
}
public synchronized String getAccessToken() throws IOException {
if (tokenProvider != null) {
return "Bearer " + tokenProvider.getToken().getAccessToken();
} else {
return null;
}
}
protected Boolean getIsPaginatedDeleteEnabled() {
return abfsConfiguration.isPaginatedDeleteEnabled();
}
protected Boolean isPaginatedDelete(boolean isRecursiveDelete, boolean isNamespaceEnabled) {
return getIsPaginatedDeleteEnabled() && isNamespaceEnabled && isRecursiveDelete;
}
public AuthType getAuthType() {
return authType;
}
public EncryptionContextProvider getEncryptionContextProvider() {
return encryptionContextProvider;
}
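/**
* Builds the {@code User-Agent} value sent on every request. A
* representative (hypothetical) result, for illustration only:
* <pre>{@code
* APN/1.0 Azure Blob FS/3.4.0 (JavaJRE 1.8.0_372; Linux 5.15.0/amd64;
* openssl-1.0; JDK_HTTP_URL_CONNECTION; cluster-name/cluster-type)
* }</pre>
*/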
@VisibleForTesting
String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
final String sslProviderName) {
StringBuilder sb = new StringBuilder();
sb.append(APN_VERSION);
sb.append(SINGLE_WHITE_SPACE);
sb.append(CLIENT_VERSION);
sb.append(SINGLE_WHITE_SPACE);
sb.append("(");
sb.append(System.getProperty(JAVA_VENDOR)
.replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING));
sb.append(SINGLE_WHITE_SPACE);
sb.append("JavaJRE");
sb.append(SINGLE_WHITE_SPACE);
sb.append(System.getProperty(JAVA_VERSION));
sb.append(SEMICOLON);
sb.append(SINGLE_WHITE_SPACE);
sb.append(System.getProperty(OS_NAME)
.replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING));
sb.append(SINGLE_WHITE_SPACE);
sb.append(System.getProperty(OS_VERSION));
sb.append(FORWARD_SLASH);
sb.append(System.getProperty(OS_ARCH));
sb.append(SEMICOLON);
appendIfNotEmpty(sb, sslProviderName, true);
appendIfNotEmpty(sb,
ExtensionHelper.getUserAgentSuffix(tokenProvider, EMPTY_STRING), true);
if (abfsConfiguration.isExpectHeaderEnabled()) {
sb.append(SINGLE_WHITE_SPACE);
sb.append(HUNDRED_CONTINUE);
sb.append(SEMICOLON);
}
sb.append(SINGLE_WHITE_SPACE)
.append(abfsConfiguration.getPreferredHttpOperationType())
.append(SEMICOLON);
sb.append(SINGLE_WHITE_SPACE);
sb.append(abfsConfiguration.getClusterName());
sb.append(FORWARD_SLASH);
sb.append(abfsConfiguration.getClusterType());
// Add a unique identifier in FNS-Blob user agent string
// Current filesystem init restricts HNS-Blob combination
// so namespace check not required.
if (abfsConfiguration.getFsConfiguredServiceType() == BLOB) {
sb.append(SEMICOLON)
.append(SINGLE_WHITE_SPACE)
.append(FNS_BLOB_USER_AGENT_IDENTIFIER);
}
sb.append(")");
appendIfNotEmpty(sb, abfsConfiguration.getCustomUserAgentPrefix(), false);
return String.format(Locale.ROOT, sb.toString());
}
private void appendIfNotEmpty(StringBuilder sb, String regEx,
boolean shouldAppendSemiColon) {
if (regEx == null || regEx.trim().isEmpty()) {
return;
}
sb.append(SINGLE_WHITE_SPACE);
sb.append(regEx);
if (shouldAppendSemiColon) {
sb.append(SEMICOLON);
}
}
/**
* Add MD5 hash as request header to the append request.
*
* @param requestHeaders to be updated with checksum header
* @param reqParams for getting offset and length
*/
protected void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
final AppendRequestParameters reqParams) {
if (reqParams.getMd5() != null) {
requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, reqParams.getMd5()));
}
}
/**
* To verify the checksum information received from server for the data read.
* @param buffer stores the data received from server.
* @param result HTTP Operation Result.
* @param bufferOffset Position where data returned by server is saved in buffer.
* @throws AbfsRestOperationException if Md5Mismatch.
*/
protected void verifyCheckSumForRead(final byte[] buffer,
final AbfsHttpOperation result, final int bufferOffset)
throws AbfsRestOperationException {
// The server may return fewer bytes than the caller requested; any extra
// buffer bytes remain zero-initialized. The Content-MD5 returned by the
// server is computed over exactly the bytes it returned, so the client
// must compute its hash over the same range and compare the two values.
int numberOfBytesRead = (int) result.getBytesReceived();
if (numberOfBytesRead == 0) {
return;
}
String md5HashComputed = computeMD5Hash(buffer, bufferOffset,
numberOfBytesRead);
String md5HashActual = result.getResponseHeader(CONTENT_MD5);
if (!md5HashComputed.equals(md5HashActual)) {
LOG.debug("Md5 Mismatch Error in Read Operation. Server returned Md5: {}, Client computed Md5: {}", md5HashActual, md5HashComputed);
throw new AbfsInvalidChecksumException(result.getRequestId());
}
}
/**
* Checks the preconditions for checksum validation on a read operation.
* The server returns an MD5 hash of the payload only when both of the
* following hold; for details see
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read">Path - Read Azure Storage Rest API</a>.
* 1. The Range header must be present among the request headers.
* 2. The buffer length must be less than or equal to 4 MB.
* @param requestHeaders to be checked for the range header.
* @param rangeHeader must be present.
* @param bufferLength must be less than or equal to 4 MB.
* @return true if all conditions are met.
*/
protected boolean isChecksumValidationEnabled(List<AbfsHttpHeader> requestHeaders,
final AbfsHttpHeader rangeHeader, final int bufferLength) {
return getAbfsConfiguration().getIsChecksumValidationEnabled()
&& requestHeaders.contains(rangeHeader) && bufferLength <= 4 * ONE_MB;
}
/**
* Checks whether checksum support is enabled for write operations.
* The server validates the write if the client sends the MD5 hash as a
* request header. For the Azure Storage service documentation and more
* details refer to
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">Path - Update Azure Rest API</a>.
* @return true if checksum validation is enabled.
*/
protected boolean isChecksumValidationEnabled() {
return getAbfsConfiguration().getIsChecksumValidationEnabled();
}
/**
* Compute MD5Hash of the given byte array starting from given offset up to given length.
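*
* <p>A minimal usage sketch:</p>
* <pre>{@code
* byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
* String md5 = computeMD5Hash(data, 0, data.length);
* // md5 is the Base64-encoded MD5 digest, the representation carried
* // in the Content-MD5 header.
* }</pre>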
* @param data byte array from which data is fetched to compute MD5 Hash.
* @param off offset in the array from where actual data starts.
* @param len length of the data to be used to compute MD5Hash.
* @return MD5 Hash of the data as String.
* @throws AbfsRestOperationException if computation fails.
*/
@VisibleForTesting
public String computeMD5Hash(final byte[] data, final int off, final int len)
throws AbfsRestOperationException {
try {
MessageDigest md5Digest = MessageDigest.getInstance(MD5);
md5Digest.update(data, off, len);
byte[] md5Bytes = md5Digest.digest();
return Base64.getEncoder().encodeToString(md5Bytes);
} catch (NoSuchAlgorithmException ex) {
throw new AbfsDriverException(ex);
}
}
@VisibleForTesting
URL getBaseUrl() {
return baseUrl;
}
@VisibleForTesting
public SASTokenProvider getSasTokenProvider() {
return this.sasTokenProvider;
}
@VisibleForTesting
void setEncryptionContextProvider(EncryptionContextProvider provider) {
encryptionContextProvider = provider;
}
/**
* Getter for abfsCounters from AbfsClient.
* @return AbfsCounters instance.
*/
public AbfsCounters getAbfsCounters() {
return abfsCounters;
}
public ApiVersion getxMsVersion() {
return xMsVersion;
}
/**
* Getter for abfsConfiguration from AbfsClient.
* @return AbfsConfiguration instance
*/
protected AbfsConfiguration getAbfsConfiguration() {
return abfsConfiguration;
}
public int getNumLeaseThreads() {
return abfsConfiguration.getNumLeaseThreads();
}
public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit timeUnit) {
return executorService.schedule(callable, delay, timeUnit);
}
public ListenableFuture<?> submit(Runnable runnable) {
return executorService.submit(runnable);
}
public <V> void addCallback(ListenableFuture<V> future, FutureCallback<V> callback) {
Futures.addCallback(future, callback, executorService);
}
@VisibleForTesting
protected AccessTokenProvider getTokenProvider() {
return tokenProvider;
}
/**
* Retrieves a TracingContext object configured for metric tracking.
* This method creates a TracingContext object with the validated client correlation ID,
* the host name of the local machine (or "UnknownHost" if it cannot be determined),
* the file system operation type set to GET_ATTR, and additional configuration parameters
* for metric tracking.
* The TracingContext is intended for use in tracking metrics related to Azure Blob FileSystem (ABFS) operations.
*
* @return A TracingContext object configured for metric tracking.
*/
private TracingContext getMetricTracingContext() {
String hostName;
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
hostName = "UnknownHost";
}
return new TracingContext(TracingContext.validateClientCorrelationID(
abfsConfiguration.getClientCorrelationId()),
hostName, FSOperationType.GET_ATTR, true,
abfsConfiguration.getTracingHeaderFormat(),
null, abfsCounters.toString());
}
/**
* Synchronized method to suspend or resume the metric collection timer.
* @param timerFunctionality RESUME or SUSPEND.
* @param timerTask the timer task to cancel when suspending.
* @return true if metric collection was suspended by this call;
* false otherwise.
*/
boolean timerOrchestrator(TimerFunctionality timerFunctionality, TimerTask timerTask) {
switch (timerFunctionality) {
case RESUME:
if (isMetricCollectionEnabled && isMetricCollectionStopped.get()) {
synchronized (this) {
if (isMetricCollectionStopped.get()) {
resumeTimer();
}
}
}
break;
case SUSPEND:
long now = System.currentTimeMillis();
long lastExecutionTime = abfsCounters.getLastExecutionTime().get();
if (isMetricCollectionEnabled && (now - lastExecutionTime >= metricAnalysisPeriod)) {
synchronized (this) {
if (!isMetricCollectionStopped.get()) {
timerTask.cancel();
timer.purge();
isMetricCollectionStopped.set(true);
return true;
}
}
}
break;
default:
break;
}
return false;
}
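// A hedged sketch of the suspend/resume protocol implemented above:
// - TimerTaskImpl.run() calls timerOrchestrator(SUSPEND, this); once the
//   client has been idle for at least metricAnalysisPeriod ms, the task
//   cancels itself, the timer queue is purged and
//   isMetricCollectionStopped is set.
// - The request path later calls timerOrchestrator(RESUME, ...), which
//   re-arms a fresh TimerTaskImpl at metricIdlePeriod intervals via
//   resumeTimer().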
private void resumeTimer() {
isMetricCollectionStopped.set(false);
timer.schedule(new TimerTaskImpl(),
metricIdlePeriod,
metricIdlePeriod);
}
/**
* Initiates a metric call to the Azure Blob FileSystem (ABFS) for retrieving file system properties.
* This method performs a HEAD request to the specified metric URL, using default headers and query parameters.
*
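* <p>A hedged note on the flow: the HEAD response body is not consumed;
* the call exists so that the metrics carried in the tracing header
* (built from {@code abfsCounters} by {@code getMetricTracingContext()})
* reach the service.</p>
*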
* @param tracingContext The tracing context to be used for capturing tracing information.
* @throws IOException if the metric call fails.
*/
public void getMetricCall(TracingContext tracingContext) throws IOException {
this.isSendMetricCall = true;
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
// Construct the URL for the metric call
// In case of blob storage, the URL is changed to DFS URL
final URL url = UriUtils.changeUrlFromBlobToDfs(
createRequestUrl(new URL(abfsMetricUrl),
EMPTY_STRING, abfsUriQueryBuilder.toString()));
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.GetFileSystemProperties,
HTTP_METHOD_HEAD,
url,
requestHeaders);
try {
op.execute(tracingContext);
} finally {
this.isSendMetricCall = false;
}
}
public boolean isSendMetricCall() {
return isSendMetricCall;
}
public boolean isMetricCollectionEnabled() {
return isMetricCollectionEnabled;
}
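/**
* Periodic task driving metric emission: on each tick it asks
* {@code timerOrchestrator} to suspend; when suspension succeeds (the
* client has been idle long enough), it flushes accumulated metrics with
* a metric call and re-initializes the counters.
*/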
class TimerTaskImpl extends TimerTask {
TimerTaskImpl() {
runningTimerTask = this;
}
@Override
public void run() {
try {
if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
try {
getMetricCall(getMetricTracingContext());
} finally {
abfsCounters.initializeMetrics(metricFormat);
}
}
} catch (IOException e) {
// Metric emission is best-effort: a failure here must never disturb
// the main request path, so log at debug level and move on.
LOG.debug("Error while sending client metrics", e);
}
}
}
/**
* Creates an AbfsRestOperation with additional parameters for buffer and SAS token.
*
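* <p>A hedged usage sketch (operation type, method and token values are
* illustrative only):</p>
* <pre>{@code
* AbfsRestOperation op = getAbfsRestOperation(
*     AbfsRestOperationType.Append, HTTP_METHOD_PUT, url, requestHeaders,
*     buffer, 0, buffer.length, cachedSasToken);
* op.execute(tracingContext);
* }</pre>
*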
* @param operationType The type of the operation.
* @param httpMethod The HTTP method of the operation.
* @param url The URL associated with the operation.
* @param requestHeaders The list of HTTP headers for the request.
* @param buffer The byte buffer containing data for the operation.
* @param bufferOffset The offset within the buffer where the data starts.
* @param bufferLength The length of the data within the buffer.
* @param sasTokenForReuse The SAS token for reusing authentication.
* @return An AbfsRestOperation instance.
*/
AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType,
final String httpMethod,
final URL url,
final List<AbfsHttpHeader> requestHeaders,
final byte[] buffer,
final int bufferOffset,
final int bufferLength,
final String sasTokenForReuse) {
return new AbfsRestOperation(
operationType,
this,
httpMethod,
url,
requestHeaders,
buffer,
bufferOffset,
bufferLength,
sasTokenForReuse,
abfsConfiguration);
}
/**
* Creates an AbfsRestOperation with basic parameters and no buffer or SAS token.
*
* @param operationType The type of the operation.
* @param httpMethod The HTTP method of the operation.
* @param url The URL associated with the operation.
* @param requestHeaders The list of HTTP headers for the request.
* @return An AbfsRestOperation instance.
*/
@VisibleForTesting
public AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType,
final String httpMethod,
final URL url,
final List<AbfsHttpHeader> requestHeaders) {
return new AbfsRestOperation(
operationType,
this,
httpMethod,
url,
requestHeaders,
abfsConfiguration
);
}
/**
* Creates an AbfsRestOperation with parameters including request headers and SAS token.
*
* @param operationType The type of the operation.
* @param httpMethod The HTTP method of the operation.
* @param url The URL associated with the operation.
* @param requestHeaders The list of HTTP headers for the request.
* @param sasTokenForReuse The SAS token for reusing authentication.
* @return An AbfsRestOperation instance.
*/
AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType,
final String httpMethod,
final URL url,
final List<AbfsHttpHeader> requestHeaders,
final String sasTokenForReuse) {
return new AbfsRestOperation(
operationType,
this,
httpMethod,
url,
requestHeaders,
sasTokenForReuse,
abfsConfiguration);
}
@VisibleForTesting
AbfsApacheHttpClient getAbfsApacheHttpClient() {
return abfsApacheHttpClient;
}
@VisibleForTesting
KeepAliveCache getKeepAliveCache() {
return keepAliveCache;
}
@VisibleForTesting
protected Timer getTimer() {
return timer;
}
protected String getUserAgent() {
return userAgent;
}
/**
* Checks if the namespace is enabled.
* Filesystem init fails if the namespace is not correctly configured,
* so instead of swallowing the exception we rethrow it when the
* namespace setting cannot be resolved.
*
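* <p>The stored setting is tri-state (a {@code Trilean}): TRUE, FALSE or
* UNKNOWN. {@code toBoolean()} fails only for UNKNOWN, i.e. when the
* account type was never resolved, and that failure is rethrown here as
* a configuration error.</p>
*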
* @return True if the namespace is enabled, false otherwise.
* @throws AzureBlobFileSystemException if the conversion fails.
*/
public boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
try {
return getAbfsConfiguration().getIsNamespaceEnabledAccount().toBoolean();
} catch (TrileanConversionException ex) {
LOG.error("Failed to convert namespace enabled account property to boolean", ex);
throw new InvalidConfigurationValueException("Failed to determine account type", ex);
}
}
protected boolean isRenameResilience() {
return renameResilience;
}
/**
* Parses response of Listing API from server based on Endpoint used.
* @param result AbfsHttpOperation of list Operation.
* @param uri to be used for the path conversion.
* @return {@link ListResponseData} containing the list of entries.
* @throws IOException if parsing fails
*/
public abstract ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri) throws IOException;
/**
* Parses response of Get Block List from server based on Endpoint used.
* @param stream InputStream of the response
* @return List of block IDs
* @throws IOException if parsing fails
*/
public abstract List<String> parseBlockListResponse(InputStream stream) throws IOException;
/**
* Parses response from ErrorStream returned by server based on Endpoint used.
* @param stream InputStream of the response
* @return StorageErrorResponseSchema
* @throws IOException if parsing fails
*/
public abstract StorageErrorResponseSchema processStorageErrorResponse(InputStream stream) throws IOException;
/**
* Returns user-defined metadata from server response based on Endpoint used.
* @param result response from server
* @return user-defined metadata key-value pairs
* @throws InvalidFileSystemPropertyException if parsing fails
* @throws InvalidAbfsRestOperationException if parsing fails
*/
public abstract Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
throws InvalidFileSystemPropertyException,
InvalidAbfsRestOperationException;
/**
* Encode attribute with encoding based on Endpoint used.
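* <p>Hedged invariant sketch: for any value a concrete endpoint accepts,
* {@code decodeAttribute(encodeAttribute(v))} should round-trip back to
* {@code v}.</p>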
* @param value to be encoded
* @return encoded value
* @throws UnsupportedEncodingException if encoding fails
*/
public abstract byte[] encodeAttribute(String value) throws UnsupportedEncodingException;
/**
* Decode attribute with decoding based on Endpoint used.
* @param value to be decoded
* @return decoded value
* @throws UnsupportedEncodingException if decoding fails
*/
public abstract String decodeAttribute(byte[] value) throws UnsupportedEncodingException;
/**
* Gets a dummy operation whose result is hard-set to success (HTTP 200),
* for use where no request actually needs to be sent to the server.
* @param operationType type of the operation
* @param httpMethod http method
* @param url url to be used
* @param requestHeaders list of headers to be sent with the request
* @return operation with a pre-set successful result
*/
protected AbfsRestOperation getSuccessOp(final AbfsRestOperationType operationType,
final String httpMethod, final URL url,
final List<AbfsHttpHeader> requestHeaders) {
final AbfsRestOperation successOp = getAbfsRestOperation(
operationType, httpMethod, url, requestHeaders);
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
return successOp;
}
/**
* Creates a VersionedFileStatus object from the ListResultEntrySchema.
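* <p>Defaulting behaviour, from the body below: a missing permissions
* string maps to {@code rwxrwxrwx}, a missing content length to 0, and a
* missing directory flag to {@code false}.</p>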
* @param entry ListResultEntrySchema object.
* @param uri to be used for the path conversion.
* @return VersionedFileStatus object.
* @throws AzureBlobFileSystemException if transformation fails.
*/
protected VersionedFileStatus getVersionedFileStatusFromEntry(
ListResultEntrySchema entry, URI uri) throws AzureBlobFileSystemException {
long blockSize = abfsConfiguration.getAzureBlockSize();
String owner = null;
String group = null;
try {
if (identityTransformer != null) {
owner = identityTransformer.transformIdentityForGetRequest(entry.owner(),
true, userName);
group = identityTransformer.transformIdentityForGetRequest(entry.group(),
false, primaryUserGroup);
}
} catch (IOException ex) {
LOG.error("Failed to get owner/group for path {}", entry.name(), ex);
throw new AbfsDriverException(ex);
}
final String encryptionContext = entry.getXMsEncryptionContext();
final FsPermission fsPermission = entry.permissions() == null
? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
: AbfsPermission.valueOf(entry.permissions());
final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions());
long lastModifiedMillis = 0;
long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
boolean isDirectory = entry.isDirectory() != null && entry.isDirectory();
if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
lastModifiedMillis = DateTimeUtils.parseLastModifiedTime(
entry.lastModified());
}
Path entryPath = new Path(File.separator + entry.name());
if (uri != null) {
entryPath = entryPath.makeQualified(uri, entryPath);
}
return new VersionedFileStatus(
owner,
group,
fsPermission,
hasAcl,
contentLength,
isDirectory,
1,
blockSize,
lastModifiedMillis,
entryPath,
entry.eTag(),
encryptionContext);
}
}