/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator;
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContextBuilder;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientRenameResult;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.http.client.utils.URIBuilder;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_RECOVERY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.isKeyForDirectorySet;
/**
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class);
private AbfsClient client;
/**
* Holds the client handler that determines the operative client based on
* the configured service type.
* Initialized in {@link #initializeClient(URI, String, String, boolean)}.
*/
private AbfsClientHandler clientHandler;
private URI uri;
private String userName;
private String primaryUserGroup;
private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'";
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
private static final int GET_SET_AGGREGATE_COUNT = 2;
private final Map<AbfsLease, Object> leaseRefs;
private final AbfsConfiguration abfsConfiguration;
private Set<String> azureInfiniteLeaseDirSet;
private final AuthType authType;
private final UserGroupInformation userGroupInformation;
private final IdentityTransformerInterface identityTransformer;
private final AbfsPerfTracker abfsPerfTracker;
private final AbfsCounters abfsCounters;
/**
* The set of directories where we should store files as append blobs.
*/
private Set<String> appendBlobDirSet;
/** BlockFactory being used by this instance. */
private DataBlocks.BlockFactory blockFactory;
/** Number of active data blocks per AbfsOutputStream. */
private int blockOutputActiveBlocks;
/** Bounded ThreadPool for this instance. */
private ExecutorService boundedThreadPool;
/** ABFS instance reference to be held by the store to avoid GC close. */
private BackReference fsBackRef;
/**
* FileSystem store for {@link AzureBlobFileSystem}, backing its Abfs operations.
* Built using an {@link AzureBlobFileSystemStoreBuilder} carrying the
* required parameters.
* @param abfsStoreBuilder Builder for AzureBlobFileSystemStore.
* @throws IOException if construction fails.
*/
public AzureBlobFileSystemStore(
AzureBlobFileSystemStoreBuilder abfsStoreBuilder) throws IOException {
this.uri = abfsStoreBuilder.uri;
String[] authorityParts = authorityParts(uri);
final String fileSystemName = authorityParts[0];
final String accountName = authorityParts[1];
this.fsBackRef = abfsStoreBuilder.fsBackRef;
leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
try {
this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration,
accountName, fileSystemName, getAbfsServiceTypeFromUrl());
} catch (IllegalAccessException exception) {
throw new FileSystemOperationUnhandledException(exception);
}
LOG.trace("AbfsConfiguration init complete");
this.userGroupInformation = UserGroupInformation.getCurrentUser();
this.userName = userGroupInformation.getShortUserName();
LOG.trace("UGI init complete");
if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
try {
this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
} catch (IOException ex) {
LOG.error("Failed to get primary group for {}, using user name as primary group name", userName);
this.primaryUserGroup = userName;
}
} else {
//Provide a default group name
this.primaryUserGroup = userName;
}
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
updateInfiniteLeaseDirs();
this.authType = abfsConfiguration.getAuthType(accountName);
boolean usingOauth = (authType == AuthType.OAuth);
boolean useHttps = usingOauth || abfsConfiguration.isHttpsAlwaysUsed()
|| abfsStoreBuilder.isSecureScheme;
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
this.abfsCounters = abfsStoreBuilder.abfsCounters;
initializeClient(uri, fileSystemName, accountName, useHttps);
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
abfsStoreBuilder.configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
IdentityTransformerInterface.class);
try {
this.identityTransformer =
identityTransformerClass.getConstructor(Configuration.class).newInstance(abfsStoreBuilder.configuration);
} catch (IllegalAccessException | InstantiationException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException e) {
throw new IOException(e);
}
LOG.trace("IdentityTransformer init complete");
// Extract the directories that should contain append blobs
String appendBlobDirs = abfsConfiguration.getAppendBlobDirs();
if (appendBlobDirs.trim().isEmpty()) {
this.appendBlobDirSet = new HashSet<>();
} else {
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
appendBlobDirs.split(AbfsHttpConstants.COMMA)));
}
this.blockFactory = abfsStoreBuilder.blockFactory;
this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
abfsConfiguration.getWriteMaxConcurrentRequestCount(),
abfsConfiguration.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-bounded");
}
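// Illustrative sketch (not part of the class): the store is normally built by
// AzureBlobFileSystem#initialize via the nested builder. The setter names below
// are hypothetical; the builder fields actually read above are uri, configuration,
// fsBackRef, isSecureScheme, abfsCounters, blockFactory and blockOutputActiveBlocks.
//
//   AzureBlobFileSystemStore store = new AzureBlobFileSystemStore(
//       new AzureBlobFileSystemStoreBuilder()
//           .withUri(fsUri)
//           .withConfiguration(conf)
//           .withAbfsCounters(counters));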
/**
* Checks if the given key in Azure Storage should be stored as an append
* blob instead of a block blob.
* @param key The key to check.
* @return True if the key should be stored as an append blob, false otherwise.
*/
public boolean isAppendBlobKey(String key) {
return isKeyForDirectorySet(key, appendBlobDirSet);
}
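// Illustrative sketch: with "fs.azure.appendblob.directories" set to "/logs"
// (hypothetical value), keys under that directory resolve to append blobs:
//
//   store.isAppendBlobKey("abfs://container@account.dfs.core.windows.net/logs/app.log");
//   // => true, because the key falls under a directory in appendBlobDirSet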
/**
* @return local user name.
*/
public String getUser() {
return this.userName;
}
/**
* @return primary group that user belongs to.
*/
public String getPrimaryGroup() {
return this.primaryUserGroup;
}
@Override
public void close() throws IOException {
List<ListenableFuture<?>> futures = new ArrayList<>();
for (AbfsLease lease : leaseRefs.keySet()) {
if (lease == null) {
continue;
}
ListenableFuture<?> future = getClient().submit(() -> lease.free());
futures.add(future);
}
try {
Futures.allAsList(futures).get();
// shutdown the threadPool and set it to null.
HadoopExecutors.shutdown(boundedThreadPool, LOG,
30, TimeUnit.SECONDS);
boundedThreadPool = null;
} catch (InterruptedException e) {
LOG.error("Interrupted freeing leases", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
IOUtils.cleanupWithLogger(LOG, getClient());
}
}
byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
// The DFS client works with ISO-8859-1 encoding; the Blob client works with UTF-8.
return getClient().encodeAttribute(value);
}
String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
// The DFS client works with ISO-8859-1 encoding; the Blob client works with UTF-8.
return getClient().decodeAttribute(value);
}
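// Illustrative sketch: attributes round-trip through the active client's
// charset (ISO-8859-1 for the DFS client, UTF-8 for the Blob client):
//
//   byte[] raw = store.encodeAttribute("myValue");   // hypothetical value
//   String back = store.decodeAttribute(raw);        // "myValue"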
private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
final String authority = uri.getRawAuthority();
if (null == authority) {
throw new InvalidUriAuthorityException(uri.toString());
}
if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) {
throw new InvalidUriAuthorityException(uri.toString());
}
final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
if (authorityParts.length < 2
|| (authorityParts[0] != null && authorityParts[0].isEmpty())) {
final String errMsg = String
.format("'%s' has a malformed authority, expected container name. "
+ "Authority takes the form "
+ FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>",
uri.toString());
throw new InvalidUriException(errMsg);
}
return authorityParts;
}
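// Worked example: for the hypothetical URI
//   abfs://mycontainer@myaccount.dfs.core.windows.net/dir/file
// getRawAuthority() yields "mycontainer@myaccount.dfs.core.windows.net", so
// authorityParts[0] is the container/filesystem name ("mycontainer") and
// authorityParts[1] is the account host ("myaccount.dfs.core.windows.net").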
/**
* Resolves the namespace information of the filesystem from the state of {@link #isNamespaceEnabled()}.
* If the state is UNKNOWN, it is determined by making a GET_ACL request
* to the root of the filesystem. The GET_ACL call is synchronized to ensure a
* single call is made to determine the namespace information when multiple
* threads call this method at the same time. The resolved value is stored
* back via {@link #setNamespaceEnabled(boolean)}.
*
* @param tracingContext tracing context
* @return true if namespace is enabled, false otherwise.
* @throws AzureBlobFileSystemException server errors.
*/
public boolean getIsNamespaceEnabled(TracingContext tracingContext)
throws AzureBlobFileSystemException {
try {
return isNamespaceEnabled();
} catch (TrileanConversionException e) {
LOG.debug("isNamespaceEnabled is UNKNOWN; fall back and determine through"
+ " getAcl server call", e);
}
return getNamespaceEnabledInformationFromServer(tracingContext);
}
/**
* In case the namespace configuration is not set or is invalid, this method
* makes a call to the server to determine whether namespace is enabled.
* The method is synchronized to ensure that only one thread makes the call
* to the server to determine the namespace information.
*
* @param tracingContext tracing context
* @return true if namespace is enabled, false otherwise.
* @throws AzureBlobFileSystemException server errors.
*/
private synchronized boolean getNamespaceEnabledInformationFromServer(
final TracingContext tracingContext) throws AzureBlobFileSystemException {
if (getAbfsConfiguration().getIsNamespaceEnabledAccount() != Trilean.UNKNOWN) {
return isNamespaceEnabled();
}
try {
LOG.debug("Get root ACL status");
getClient(AbfsServiceType.DFS).getAclStatus(AbfsHttpConstants.ROOT_PATH, tracingContext);
// If getAcl succeeds, namespace is enabled.
setNamespaceEnabled(true);
} catch (AbfsRestOperationException ex) {
// Get ACL status is a HEAD request; its response doesn't contain an errorCode,
// so only the status code can be used to determine the account type.
if (HttpURLConnection.HTTP_BAD_REQUEST != ex.getStatusCode()) {
// If getAcl fails with anything other than 400, namespace is enabled.
setNamespaceEnabled(true);
// Continue to throw exception as earlier.
LOG.debug("Failed to get ACL status with non 400. Inferring namespace enabled", ex);
throw ex;
}
// If getAcl fails with 400, namespace is disabled.
LOG.debug("Failed to get ACL status with 400. "
+ "Inferring namespace disabled and ignoring error", ex);
setNamespaceEnabled(false);
}
return isNamespaceEnabled();
}
/**
* @return true if namespace is enabled, false otherwise.
* @throws TrileanConversionException if the namespaceEnabled information is UNKNOWN.
*/
@VisibleForTesting
boolean isNamespaceEnabled() throws TrileanConversionException {
return getAbfsConfiguration().getIsNamespaceEnabledAccount().toBoolean();
}
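// Illustrative sketch of the Trilean contract relied on above:
//   Trilean.TRUE.toBoolean()    -> true
//   Trilean.FALSE.toBoolean()   -> false
//   Trilean.UNKNOWN.toBoolean() -> throws TrileanConversionException,
// which is what makes getIsNamespaceEnabled() fall back to the server probe.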
@VisibleForTesting
URIBuilder getURIBuilder(final String hostName, boolean isSecure) {
String scheme = isSecure ? FileSystemUriSchemes.HTTPS_SCHEME : FileSystemUriSchemes.HTTP_SCHEME;
final URIBuilder uriBuilder = new URIBuilder();
uriBuilder.setScheme(scheme);
// For testing purposes, an IP address and port may be provided to override
// the host specified in the FileSystem URI. Also note that the format of
// the Azure Storage Service URI changes from
// http[s]://[account][domain-suffix]/[filesystem] to
// http[s]://[ip]:[port]/[account]/[filesystem].
String endPoint = abfsConfiguration.get(AZURE_ABFS_ENDPOINT);
if (endPoint == null || !endPoint.contains(AbfsHttpConstants.COLON)) {
uriBuilder.setHost(hostName);
return uriBuilder;
}
// Split IP and port
String[] data = endPoint.split(AbfsHttpConstants.COLON);
if (data.length != 2) {
throw new RuntimeException(String.format("ABFS endpoint is not set correctly: %s. "
+ "Do not specify a scheme when using {IP}:{PORT}", endPoint));
}
uriBuilder.setHost(data[0].trim());
uriBuilder.setPort(Integer.parseInt(data[1].trim()));
uriBuilder.setPath("/" + UriUtils.extractAccountNameFromHostName(hostName));
return uriBuilder;
}
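// Illustrative sketch: with "fs.azure.abfs.endpoint" set to "127.0.0.1:8080"
// (hypothetical test value) and account host "myaccount.dfs.core.windows.net",
// the builder produces http[s]://127.0.0.1:8080/myaccount instead of
// http[s]://myaccount.dfs.core.windows.net.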
public AbfsConfiguration getAbfsConfiguration() {
return this.abfsConfiguration;
}
public Hashtable<String, String> getFilesystemProperties(
TracingContext tracingContext) throws AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("getFilesystemProperties",
"getFilesystemProperties")) {
LOG.debug("getFilesystemProperties for filesystem: {}",
getClient().getFileSystem());
final Hashtable<String, String> parsedXmsProperties;
final AbfsRestOperation op = getClient()
.getFilesystemProperties(tracingContext);
perfInfo.registerResult(op.getResult());
// Handling difference in request headers formats between DFS and Blob Clients.
parsedXmsProperties = getClient().getXMSProperties(op.getResult());
perfInfo.registerSuccess(true);
return parsedXmsProperties;
}
}
public void setFilesystemProperties(
final Hashtable<String, String> properties, TracingContext tracingContext)
throws AzureBlobFileSystemException {
if (properties == null || properties.isEmpty()) {
LOG.trace("setFilesystemProperties no properties present");
return;
}
LOG.debug("setFilesystemProperties for filesystem: {} with properties: {}",
getClient().getFileSystem(),
properties);
try (AbfsPerfInfo perfInfo = startTracking("setFilesystemProperties",
"setFilesystemProperties")) {
final AbfsRestOperation op = getClient()
.setFilesystemProperties(properties, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
public Hashtable<String, String> getPathStatus(final Path path,
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("getPathStatus", "getPathStatus")){
LOG.debug("getPathStatus for filesystem: {} path: {}",
getClient().getFileSystem(),
path);
final Hashtable<String, String> parsedXmsProperties;
final String relativePath = getRelativePath(path);
final ContextEncryptionAdapter contextEncryptionAdapter
= createEncryptionAdapterFromServerStoreContext(relativePath,
tracingContext);
final AbfsRestOperation op = getClient()
.getPathStatus(relativePath, true, tracingContext,
contextEncryptionAdapter);
perfInfo.registerResult(op.getResult());
contextEncryptionAdapter.destroy();
// Handling difference in request headers formats between DFS and Blob Clients.
parsedXmsProperties = getClient().getXMSProperties(op.getResult());
perfInfo.registerSuccess(true);
return parsedXmsProperties;
}
}
/**
* Creates an object of {@link ContextEncryptionAdapter}
* from a file path. It calls the {@link org.apache.hadoop.fs.azurebfs.services.AbfsClient
* #getPathStatus(String, boolean, TracingContext, EncryptionAdapter)} method to get the
* contextValue (x-ms-encryption-context) from the server. The contextValue is passed
* to the constructor of EncryptionAdapter to create the required object of
* EncryptionAdapter.
* @param path Path of the file for which the object of EncryptionAdapter is required.
* @return <ul>
* <li>
* {@link NoContextEncryptionAdapter}: if encryptionType is not of type
* {@link org.apache.hadoop.fs.azurebfs.utils.EncryptionType#ENCRYPTION_CONTEXT}.
* </li>
* <li>
* a new {@link ContextProviderEncryptionAdapter} containing the required encryption keys for the given file:
* if encryptionType is of type {@link org.apache.hadoop.fs.azurebfs.utils.EncryptionType#ENCRYPTION_CONTEXT}.
* </li>
* </ul>
*/
private ContextEncryptionAdapter createEncryptionAdapterFromServerStoreContext(final String path,
final TracingContext tracingContext) throws IOException {
if (getClient().getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT) {
return NoContextEncryptionAdapter.getInstance();
}
final String responseHeaderEncryptionContext = getClient().getPathStatus(path,
false, tracingContext, null).getResult()
.getResponseHeader(X_MS_ENCRYPTION_CONTEXT);
if (responseHeaderEncryptionContext == null) {
throw new PathIOException(path,
"EncryptionContext not present in GetPathStatus response");
}
byte[] encryptionContext = responseHeaderEncryptionContext.getBytes(
StandardCharsets.UTF_8);
try {
return new ContextProviderEncryptionAdapter(getClient().getEncryptionContextProvider(),
new Path(path).toUri().getPath(), encryptionContext);
} catch (IOException e) {
LOG.debug("Could not initialize EncryptionAdapter");
throw e;
}
}
public void setPathProperties(final Path path,
final Hashtable<String, String> properties, TracingContext tracingContext)
throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("setPathProperties", "setPathProperties")){
LOG.debug("setPathProperties for filesystem: {} path: {} with properties: {}",
getClient().getFileSystem(),
path,
properties);
final String relativePath = getRelativePath(path);
final ContextEncryptionAdapter contextEncryptionAdapter
= createEncryptionAdapterFromServerStoreContext(relativePath,
tracingContext);
final AbfsRestOperation op = getClient()
.setPathProperties(getRelativePath(path), properties,
tracingContext, contextEncryptionAdapter);
contextEncryptionAdapter.destroy();
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
public void createFilesystem(TracingContext tracingContext)
throws AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("createFilesystem", "createFilesystem")){
LOG.debug("createFilesystem for filesystem: {}",
getClient().getFileSystem());
final AbfsRestOperation op = getClient().createFilesystem(tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
public void deleteFilesystem(TracingContext tracingContext)
throws AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("deleteFilesystem", "deleteFilesystem")) {
LOG.debug("deleteFilesystem for filesystem: {}",
getClient().getFileSystem());
final AbfsRestOperation op = getClient().deleteFilesystem(tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
/**
* Creates a file non-recursively, after first checking that the parent of
* the given path exists.
*
* @param path Path of the file to create.
* @param statistics FileSystem statistics.
* @param overwrite Overwrite flag.
* @param permission Permission of the path.
* @param umask Umask of the path.
* @param tracingContext tracing context
*
* @return OutputStream output stream of the created file.
* @throws IOException if there is an issue with the operation.
*/
public OutputStream createNonRecursive(final Path path,
final FileSystem.Statistics statistics, final boolean overwrite,
final FsPermission permission, final FsPermission umask,
TracingContext tracingContext)
throws IOException {
LOG.debug("CreateNonRecursive for filesystem: {} path: {} overwrite: {} permission: {} umask: {}",
getClient().getFileSystem(),
path,
overwrite,
permission,
umask);
getClient().createNonRecursivePreCheck(path.getParent(),
tracingContext);
return createFile(path, statistics, overwrite, permission,
umask, tracingContext);
}
public OutputStream createFile(final Path path,
final FileSystem.Statistics statistics, final boolean overwrite,
final FsPermission permission, final FsPermission umask,
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
createClient.getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);
String relativePath = getRelativePath(path);
boolean isAppendBlob = isAppendBlobKey(path.toString());
// if "fs.azure.enable.conditional.create.overwrite" is enabled and
// is a create request with overwrite=true, create will follow different
// flow.
boolean triggerConditionalCreateOverwrite = false;
if (overwrite
&& abfsConfiguration.isConditionalCreateOverwriteEnabled()) {
triggerConditionalCreateOverwrite = true;
}
final ContextEncryptionAdapter contextEncryptionAdapter;
if (createClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
createClient.getEncryptionContextProvider(), getRelativePath(path));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
}
AbfsRestOperation op;
if (triggerConditionalCreateOverwrite) {
op = conditionalCreateOverwriteFile(relativePath,
statistics,
new Permissions(isNamespaceEnabled, permission, umask),
isAppendBlob,
contextEncryptionAdapter,
tracingContext
);
} else {
op = createClient.createPath(relativePath, true,
overwrite,
new Permissions(isNamespaceEnabled, permission, umask),
isAppendBlob,
null,
contextEncryptionAdapter,
tracingContext);
}
perfInfo.registerResult(op.getResult()).registerSuccess(true);
AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
String eTag = extractEtagHeader(op.getResult());
return new AbfsOutputStream(
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClientHandler(),
statistics,
relativePath,
0,
eTag,
contextEncryptionAdapter,
tracingContext));
}
}
/**
* Conditional create-overwrite flow: ensures that the overwrite is performed
* only if the eTag of the existing file matches.
* @param relativePath relative path of the file to create.
* @param statistics FileSystem statistics.
* @param permissions contains permission and umask.
* @param isAppendBlob whether the file should be created as an append blob.
* @param contextEncryptionAdapter encryption adapter for the file.
* @param tracingContext tracing context.
* @return the executed AbfsRestOperation.
* @throws IOException failure during the create flow.
*/
private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
final FileSystem.Statistics statistics,
final Permissions permissions,
final boolean isAppendBlob,
final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws IOException {
AbfsClient createClient = getClientHandler().getIngressClient();
return createClient.conditionalCreateOverwriteFile(relativePath, statistics,
permissions, isAppendBlob, contextEncryptionAdapter, tracingContext);
}
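// Illustrative sketch of the conditional flow delegated to the client above
// (simplified; the actual logic lives in AbfsClient#conditionalCreateOverwriteFile):
//   1. attempt createPath(..., overwrite = false);
//   2. if that fails with HTTP 409 because the path already exists, fetch the
//      file's current eTag via getPathStatus;
//   3. retry the create with overwrite = true and that eTag as a precondition,
//      so a file replaced by another writer in between is not clobbered.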
/**
* Method to populate AbfsOutputStreamContext with different parameters to
* be used to construct {@link AbfsOutputStream}.
*
* @param isAppendBlob is Append blob support enabled?
* @param lease instance of AbfsLease for this AbfsOutputStream.
* @param clientHandler AbfsClientHandler.
* @param statistics FileSystem statistics.
* @param path Path for AbfsOutputStream.
* @param position Position or offset of the file being opened, set to 0
* when creating a new file, but needs to be set for APPEND
* calls on the same file.
* @param eTag eTag of the file.
* @param contextEncryptionAdapter encryption adapter used for the file.
* @param tracingContext instance of TracingContext for this AbfsOutputStream.
* @return AbfsOutputStreamContext instance with the desired parameters.
*/
private AbfsOutputStreamContext populateAbfsOutputStreamContext(
boolean isAppendBlob,
AbfsLease lease,
AbfsClientHandler clientHandler,
FileSystem.Statistics statistics,
String path,
long position,
String eTag,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
}
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withWriteBufferSize(bufferSize)
.enableExpectHeader(abfsConfiguration.isExpectHeaderEnabled())
.enableFlush(abfsConfiguration.isFlushEnabled())
.enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.withEncryptionAdapter(contextEncryptionAdapter)
.withBlockFactory(getBlockFactory())
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withClientHandler(clientHandler)
.withPosition(position)
.withFsStatistics(statistics)
.withPath(path)
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
.withIngressServiceType(abfsConfiguration.getIngressServiceType())
.withDFSToBlobFallbackEnabled(abfsConfiguration.isDfsToBlobFallbackEnabled())
.withETag(eTag)
.build();
}
/**
* Creates a directory.
*
* @param path Path of the directory to create.
* @param permission Permission of the directory.
* @param umask Umask of the directory.
* @param tracingContext tracing context
*
* @throws AzureBlobFileSystemException server error.
*/
public void createDirectory(final Path path, final FsPermission permission,
final FsPermission umask, TracingContext tracingContext)
throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}",
createClient.getFileSystem(),
path,
permission,
umask,
isNamespaceEnabled);
boolean overwrite =
!isNamespaceEnabled || abfsConfiguration.isEnabledMkdirOverwrite();
Permissions permissions = new Permissions(isNamespaceEnabled,
permission, umask);
final AbfsRestOperation op = createClient.createPath(getRelativePath(path),
false, overwrite, permissions, false, null, null, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
public AbfsInputStream openFileForRead(final Path path,
final FileSystem.Statistics statistics, TracingContext tracingContext)
throws IOException {
return openFileForRead(path, Optional.empty(), statistics,
tracingContext);
}
public AbfsInputStream openFileForRead(Path path,
final Optional<OpenFileParameters> parameters,
final FileSystem.Statistics statistics, TracingContext tracingContext)
throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForRead",
"getPathStatus")) {
LOG.debug("openFileForRead filesystem: {} path: {}",
getClient().getFileSystem(), path);
FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
.orElse(null);
String relativePath = getRelativePath(path);
String resourceType, eTag;
long contentLength;
ContextEncryptionAdapter contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
/*
* GetPathStatus API has to be called in case of:
* 1. fileStatus is null or not an object of VersionedFileStatus: as eTag
* would not be there in the fileStatus object.
* 2. fileStatus is an object of VersionedFileStatus and the object doesn't
* have encryptionContext field when client's encryptionType is
* ENCRYPTION_CONTEXT.
*/
if ((fileStatus instanceof VersionedFileStatus) && (
getClient().getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT
|| ((VersionedFileStatus) fileStatus).getEncryptionContext()
!= null)) {
path = path.makeQualified(this.uri, path);
Preconditions.checkArgument(fileStatus.getPath().equals(path),
String.format(
"Filestatus path [%s] does not match with given path [%s]",
fileStatus.getPath(), path));
resourceType = fileStatus.isFile() ? FILE : DIRECTORY;
contentLength = fileStatus.getLen();
eTag = ((VersionedFileStatus) fileStatus).getVersion();
final String encryptionContext
= ((VersionedFileStatus) fileStatus).getEncryptionContext();
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
}
} else {
AbfsHttpOperation op = getClient().getPathStatus(relativePath, false,
tracingContext, null).getResult();
resourceType = getClient().checkIsDir(op) ? DIRECTORY : FILE;
contentLength = extractContentLength(op);
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
/*
* For file created with ENCRYPTION_CONTEXT, client shall receive
* encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
*/
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
final String fileEncryptionContext = op.getResponseHeader(
HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
if (fileEncryptionContext == null) {
LOG.debug("EncryptionContext missing in GetPathStatus response");
throw new PathIOException(path.toString(),
"EncryptionContext not present in GetPathStatus response headers");
}
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path),
fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
}
}
if (parseIsDirectory(resourceType)) {
throw new AbfsRestOperationException(
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
"openFileForRead must be used with files and not directories",
null);
}
perfInfo.registerSuccess(true);
// Add statistics for InputStream
return new AbfsInputStream(getClient(), statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions),
contextEncryptionAdapter),
eTag, tracingContext);
}
}
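// Illustrative sketch: callers holding a VersionedFileStatus (e.g. from
// getFileStatus) can pass it through OpenFileParameters so the branch above
// skips the extra GetPathStatus call:
//
//   FileStatus st = fs.getFileStatus(path);       // VersionedFileStatus with eTag
//   fs.openFile(path).withFileStatus(st).build(); // no second HEAD request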
private AbfsInputStreamContext populateAbfsInputStreamContext(
Optional<Configuration> options, ContextEncryptionAdapter contextEncryptionAdapter) {
boolean bufferedPreadDisabled = options
.map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
.orElse(false);
int footerReadBufferSize = options.map(c -> c.getInt(
AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize()))
.orElse(getAbfsConfiguration().getFooterReadBufferSize());
return new AbfsInputStreamContext(getAbfsConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
.withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
.withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
.isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
.withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
.withFooterReadBufferSize(footerReadBufferSize)
.withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
getAbfsConfiguration().shouldReadBufferSizeAlways())
.withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
.withBufferedPreadDisabled(bufferedPreadDisabled)
.withEncryptionAdapter(contextEncryptionAdapter)
.withAbfsBackRef(fsBackRef)
.build();
}
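// Illustrative sketch: the two per-stream options read above can be overridden
// per open call through the openFile builder (values below are hypothetical):
//
//   fs.openFile(path)
//       .opt(FS_AZURE_BUFFERED_PREAD_DISABLE, true)
//       .opt(AZURE_FOOTER_READ_BUFFER_SIZE, 512 * 1024)
//       .build();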
public OutputStream openFileForWrite(final Path path,
final FileSystem.Statistics statistics, final boolean overwrite,
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
getClient().getFileSystem(),
path,
overwrite);
String relativePath = getRelativePath(path);
AbfsClient writeClient = getClientHandler().getIngressClient();
final AbfsRestOperation op = getClient()
.getPathStatus(relativePath, false, tracingContext, null);
perfInfo.registerResult(op.getResult());
if (getClient().checkIsDir(op.getResult())) {
throw new AbfsRestOperationException(
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
"openFileForWrite must be used with files and not directories",
null);
}
final long contentLength = extractContentLength(op.getResult());
final long offset = overwrite ? 0 : contentLength;
perfInfo.registerSuccess(true);
boolean isAppendBlob = isAppendBlobKey(path.toString());
AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
final String eTag = extractEtagHeader(op.getResult());
final ContextEncryptionAdapter contextEncryptionAdapter;
if (writeClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
final String encryptionContext = op.getResult()
.getResponseHeader(
HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
if (encryptionContext == null) {
throw new PathIOException(path.toString(),
"File doesn't have encryptionContext.");
}
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
writeClient.getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
}
return new AbfsOutputStream(
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClientHandler(),
statistics,
relativePath,
offset,
eTag,
contextEncryptionAdapter,
tracingContext));
}
}
/**
* Break any current lease on an ABFS file.
*
* @param path file name
* @param tracingContext TracingContext instance to track correlation IDs
* @throws AzureBlobFileSystemException on any exception while breaking the lease
*/
public void breakLease(final Path path, final TracingContext tracingContext) throws AzureBlobFileSystemException {
LOG.debug("lease path: {}", path);
getClient().breakLease(getRelativePath(path), tracingContext);
}
/**
* Rename a file or directory.
* If a source etag is passed in, the operation will attempt to recover
* from a missing source file by probing the destination for
* existence and comparing etags.
* @param source path to source file
* @param destination destination of rename.
* @param tracingContext trace context
* @param sourceEtag etag of source file. may be null or empty
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
* @return true if recovery was needed and succeeded.
*/
public boolean rename(final Path source,
final Path destination,
final TracingContext tracingContext,
final String sourceEtag) throws IOException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
long countAggregate = 0;
boolean shouldContinue;
LOG.debug("renameAsync filesystem: {} source: {} destination: {}",
getClient().getFileSystem(),
source,
destination);
String continuation = null;
String sourceRelativePath = getRelativePath(source);
String destinationRelativePath = getRelativePath(destination);
// was any operation recovered from?
boolean recovered = false;
do {
try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
final AbfsClientRenameResult abfsClientRenameResult =
getClient().renamePath(sourceRelativePath, destinationRelativePath,
continuation, tracingContext, sourceEtag, false);
AbfsRestOperation op = abfsClientRenameResult.getOp();
perfInfo.registerResult(op.getResult());
continuation = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue = continuation != null && !continuation.isEmpty();
// update the recovery flag.
recovered |= abfsClientRenameResult.isRenameRecovered();
populateRenameRecoveryStatistics(abfsClientRenameResult);
if (!shouldContinue) {
perfInfo.registerAggregates(startAggregate, countAggregate);
}
}
} while (shouldContinue);
return recovered;
}
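// Illustrative usage sketch (hypothetical caller, etag obtained earlier from a
// getFileStatus call): supplying the source etag enables rename recovery:
//
//   boolean recovered = store.rename(src, dst, tracingContext, sourceEtag);
//   // recovered is true only when the source vanished mid-operation but the
//   // destination's etag matched, i.e. the rename had already taken effect.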
public void delete(final Path path, final boolean recursive,
TracingContext tracingContext) throws AzureBlobFileSystemException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
long countAggregate = 0;
boolean shouldContinue = true;
LOG.debug("delete filesystem: {} path: {} recursive: {}",
getClient().getFileSystem(),
path,
String.valueOf(recursive));
String continuation = null;
String relativePath = getRelativePath(path);
do {
try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) {
AbfsRestOperation op = getClient().deletePath(relativePath, recursive,
continuation, tracingContext);
perfInfo.registerResult(op.getResult());
continuation = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue = continuation != null && !continuation.isEmpty();
if (!shouldContinue) {
perfInfo.registerAggregates(startAggregate, countAggregate);
}
}
} while (shouldContinue);
}
public FileStatus getFileStatus(final Path path,
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("getFileStatus", "undetermined")) {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("getFileStatus filesystem: {} path: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
path,
isNamespaceEnabled);
final AbfsRestOperation op;
if (path.isRoot()) {
if (isNamespaceEnabled) {
perfInfo.registerCallee("getAclStatus");
op = getClient().getAclStatus(getRelativePath(path), tracingContext);
} else {
perfInfo.registerCallee("getFilesystemProperties");
op = getClient().getFilesystemProperties(tracingContext);
}
} else {
perfInfo.registerCallee("getPathStatus");
op = getClient().getPathStatus(getRelativePath(path), false, tracingContext, null);
}
perfInfo.registerResult(op.getResult());
final long blockSize = abfsConfiguration.getAzureBlockSize();
final AbfsHttpOperation result = op.getResult();
String eTag = extractEtagHeader(result);
final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
final String encryptionContext = result.getResponseHeader(X_MS_ENCRYPTION_CONTEXT);
final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
final long contentLength;
final boolean resourceIsDir;
if (path.isRoot()) {
contentLength = 0;
resourceIsDir = true;
} else {
contentLength = extractContentLength(result);
resourceIsDir = getClient().checkIsDir(result);
}
final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
true,
userName);
final String transformedGroup = identityTransformer.transformIdentityForGetRequest(
result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
false,
primaryUserGroup);
perfInfo.registerSuccess(true);
return new VersionedFileStatus(
transformedOwner,
transformedGroup,
permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
: AbfsPermission.valueOf(permissions),
hasAcl,
contentLength,
resourceIsDir,
1,
blockSize,
DateTimeUtils.parseLastModifiedTime(lastModified),
path,
eTag,
encryptionContext);
}
}
/**
* @param path The list path.
* @param tracingContext Tracks identifiers for request header.
* @return the entries in the path.
*/
@Override
public FileStatus[] listStatus(final Path path, TracingContext tracingContext) throws IOException {
return listStatus(path, null, tracingContext);
}
/**
* @param path the list path.
* @param startFrom the entry name that list results should start with.
* For example, if folder "/folder" contains four files: "afile", "bfile", "hfile", "ifile",
* then listStatus(Path("/folder"), "hfile") will return "/folder/hfile" and "/folder/ifile".
* Notice that if startFrom is a non-existent entry name, then the list response contains
* all entries after this non-existent entry in lexical order:
* listStatus(Path("/folder"), "cfile") will return "/folder/hfile" and "/folder/ifile".
* @param tracingContext Tracks identifiers for request header.
* @return the entries in the path starting from "startFrom" in lexical order.
*/
@InterfaceStability.Unstable
@Override
public FileStatus[] listStatus(final Path path, final String startFrom, TracingContext tracingContext) throws IOException {
List<FileStatus> fileStatuses = new ArrayList<>();
listStatus(path, startFrom, fileStatuses, true, null, tracingContext);
return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
}
@Override
public String listStatus(final Path path, final String startFrom,
List<FileStatus> fileStatuses, final boolean fetchAll,
String continuation, TracingContext tracingContext) throws IOException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
long countAggregate = 0;
boolean shouldContinue = true;
LOG.debug("listStatus filesystem: {} path: {}, startFrom: {}",
getClient().getFileSystem(),
path,
startFrom);
final String relativePath = getRelativePath(path);
AbfsClient listingClient = getClient();
if (continuation == null || continuation.isEmpty()) {
// generate continuation token if a valid startFrom is provided.
if (startFrom != null && !startFrom.isEmpty()) {
/*
* The Blob endpoint does not support startFrom yet, so fall back to the
* DFS client. startFrom remains null for all HDFS APIs; it is used only
* in tests.
*/
listingClient = getClient(AbfsServiceType.DFS);
continuation = getIsNamespaceEnabled(tracingContext)
? generateContinuationTokenForXns(startFrom)
: generateContinuationTokenForNonXns(relativePath, startFrom);
}
}
List<FileStatus> fileStatusList = new ArrayList<>();
do {
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
ListResponseData listResponseData = listingClient.listPath(relativePath,
false, abfsConfiguration.getListMaxResults(), continuation,
tracingContext, this.uri);
AbfsRestOperation op = listResponseData.getOp();
perfInfo.registerResult(op.getResult());
continuation = listResponseData.getContinuationToken();
List<VersionedFileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
if (fileStatusListInCurrItr != null && !fileStatusListInCurrItr.isEmpty()) {
fileStatusList.addAll(fileStatusListInCurrItr);
}
perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue =
fetchAll && continuation != null && !continuation.isEmpty();
if (!shouldContinue) {
perfInfo.registerAggregates(startAggregate, countAggregate);
}
}
} while (shouldContinue);
fileStatuses.addAll(listingClient.postListProcessing(
relativePath, fileStatusList, tracingContext, uri));
return continuation;
}
// generate continuation token for xns account
private String generateContinuationTokenForXns(final String firstEntryName) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName)
&& !firstEntryName.startsWith(AbfsHttpConstants.ROOT_PATH),
"startFrom must be a dir/file name and it can not be a full path");
StringBuilder sb = new StringBuilder();
sb.append(firstEntryName).append("#$").append("0");
CRC64 crc64 = new CRC64();
StringBuilder token = new StringBuilder();
token.append(crc64.compute(sb.toString().getBytes(StandardCharsets.UTF_8)))
.append(SINGLE_WHITE_SPACE)
.append("0")
.append(SINGLE_WHITE_SPACE)
.append(firstEntryName);
return Base64.encode(token.toString().getBytes(StandardCharsets.UTF_8));
}
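// Worked example (hypothetical entry name): for firstEntryName == "hfile",
// the CRC64 is computed over "hfile#$0" and the pre-encoding token is
//   "<crc64> 0 hfile"
// whose Base64 encoding is handed to the service as the continuation token.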
// generate continuation token for non-xns account
private String generateContinuationTokenForNonXns(String path, final String firstEntryName) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName)
&& !firstEntryName.startsWith(AbfsHttpConstants.ROOT_PATH),
"startFrom must be a dir/file name and it can not be a full path");
// Notice: non-xns continuation token requires full path (first "/" is not included) for startFrom
path = AbfsClient.getDirectoryQueryParameter(path);
final String startFrom = (path.isEmpty() || path.equals(ROOT_PATH))
? firstEntryName
: path + ROOT_PATH + firstEntryName;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(TOKEN_DATE_PATTERN, Locale.US);
String date = simpleDateFormat.format(new Date());
String token = String.format("%06d!%s!%06d!%s!%06d!%s!",
path.length(), path, startFrom.length(), startFrom, date.length(), date);
String base64EncodedToken = Base64.encode(token.getBytes(StandardCharsets.UTF_8));
StringBuilder encodedTokenBuilder = new StringBuilder(base64EncodedToken.length() + 5);
encodedTokenBuilder.append(String.format("%s!%d!", TOKEN_VERSION, base64EncodedToken.length()));
for (int i = 0; i < base64EncodedToken.length(); i++) {
char current = base64EncodedToken.charAt(i);
if (CHAR_FORWARD_SLASH == current) {
current = CHAR_UNDERSCORE;
} else if (CHAR_PLUS == current) {
current = CHAR_STAR;
} else if (CHAR_EQUALS == current) {
current = CHAR_HYPHEN;
}
encodedTokenBuilder.append(current);
}
return encodedTokenBuilder.toString();
}
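// Worked example (hypothetical names): for directory query path "folder" and
// firstEntryName "cfile", startFrom becomes "folder/cfile" and the raw token is
//   "000006!folder!000012!folder/cfile!000028!<timestamp>!"
// (each field is preceded by its zero-padded length; the timestamp pattern
// above is 28 characters long). The token is then Base64-encoded, prefixed
// with "<TOKEN_VERSION>!<encoded length>!", and '/', '+' and '=' are remapped
// to '_', '*' and '-' to keep the token URL-safe.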
public void setOwner(final Path path, final String owner, final String group,
TracingContext tracingContext) throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
try (AbfsPerfInfo perfInfo = startTracking("setOwner", "setOwner")) {
LOG.debug(
"setOwner filesystem: {} path: {} owner: {} group: {}",
getClient().getFileSystem(),
path,
owner,
group);
final String transformedOwner = identityTransformer.transformUserOrGroupForSetRequest(owner);
final String transformedGroup = identityTransformer.transformUserOrGroupForSetRequest(group);
final AbfsRestOperation op = getClient().setOwner(getRelativePath(path),
transformedOwner,
transformedGroup,
tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
public void setPermission(final Path path, final FsPermission permission,
TracingContext tracingContext) throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
try (AbfsPerfInfo perfInfo = startTracking("setPermission", "setPermission")) {
LOG.debug(
"setPermission filesystem: {} path: {} permission: {}",
getClient().getFileSystem(),
path,
permission);
final AbfsRestOperation op = getClient().setPermission(getRelativePath(path),
String.format(AbfsHttpConstants.PERMISSION_FORMAT,
permission.toOctal()), tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec,
TracingContext tracingContext) throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
try (AbfsPerfInfo perfInfoGet = startTracking("modifyAclEntries", "getAclStatus")) {
LOG.debug(
"modifyAclEntries filesystem: {} path: {} aclSpec: {}",
getClient().getFileSystem(),
path,
AclEntry.aclSpecToString(aclSpec));
identityTransformer.transformAclEntriesForSetRequest(aclSpec);
final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries);
String relativePath = getRelativePath(path);
final AbfsRestOperation op = getClient()
.getAclStatus(relativePath, useUpn, tracingContext);
perfInfoGet.registerResult(op.getResult());
final String eTag = extractEtagHeader(op.getResult());
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
AbfsAclHelper.modifyAclEntriesInternal(aclEntries, modifyAclEntries);
perfInfoGet.registerSuccess(true).finishTracking();
try (AbfsPerfInfo perfInfoSet = startTracking("modifyAclEntries", "setAcl")) {
final AbfsRestOperation setAclOp = getClient()
.setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries),
eTag, tracingContext);
perfInfoSet.registerResult(setAclOp.getResult())
.registerSuccess(true)
.registerAggregates(perfInfoGet.getTrackingStart(), GET_SET_AGGREGATE_COUNT);
}
}
}
public void removeAclEntries(final Path path, final List<AclEntry> aclSpec,
TracingContext tracingContext) throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
try (AbfsPerfInfo perfInfoGet = startTracking("removeAclEntries", "getAclStatus")) {
LOG.debug(
"removeAclEntries filesystem: {} path: {} aclSpec: {}",
getClient().getFileSystem(),
path,
AclEntry.aclSpecToString(aclSpec));
identityTransformer.transformAclEntriesForSetRequest(aclSpec);
final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries);
String relativePath = getRelativePath(path);
final AbfsRestOperation op = getClient()
.getAclStatus(relativePath, isUpnFormat, tracingContext);
perfInfoGet.registerResult(op.getResult());
final String eTag = extractEtagHeader(op.getResult());
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
AbfsAclHelper.removeAclEntriesInternal(aclEntries, removeAclEntries);
perfInfoGet.registerSuccess(true).finishTracking();
try (AbfsPerfInfo perfInfoSet = startTracking("removeAclEntries", "setAcl")) {
final AbfsRestOperation setAclOp = getClient()
.setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries),
eTag, tracingContext);
perfInfoSet.registerResult(setAclOp.getResult())
.registerSuccess(true)
.registerAggregates(perfInfoGet.getTrackingStart(), GET_SET_AGGREGATE_COUNT);
}
}
}
public void removeDefaultAcl(final Path path, TracingContext tracingContext)
throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
try (AbfsPerfInfo perfInfoGet = startTracking("removeDefaultAcl", "getAclStatus")) {
LOG.debug(
"removeDefaultAcl filesystem: {} path: {}",
getClient().getFileSystem(),
path);
String relativePath = getRelativePath(path);
final AbfsRestOperation op = getClient()
.getAclStatus(relativePath, tracingContext);
perfInfoGet.registerResult(op.getResult());
final String eTag = extractEtagHeader(op.getResult());
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
final Map<String, String> defaultAclEntries = new HashMap<>();
for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
if (aclEntry.getKey().startsWith("default:")) {
defaultAclEntries.put(aclEntry.getKey(), aclEntry.getValue());
}
}
aclEntries.keySet().removeAll(defaultAclEntries.keySet());
perfInfoGet.registerSuccess(true).finishTracking();
try (AbfsPerfInfo perfInfoSet = startTracking("removeDefaultAcl", "setAcl")) {
final AbfsRestOperation setAclOp = getClient()
.setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries),
eTag, tracingContext);
perfInfoSet.registerResult(setAclOp.getResult())
.registerSuccess(true)
.registerAggregates(perfInfoGet.getTrackingStart(), GET_SET_AGGREGATE_COUNT);
}
}
}
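// Sketch of the filtering above on a representative ACL map:
//
//   before: {user::rwx, group::r-x, other::---,
//            default:user::rwx, default:group::r-x}
//   after:  {user::rwx, group::r-x, other::---}
//
// Only keys carrying the "default:" scope prefix are dropped; access
// entries pass through untouched.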
public void removeAcl(final Path path, TracingContext tracingContext)
throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
try (AbfsPerfInfo perfInfoGet = startTracking("removeAcl", "getAclStatus")) {
LOG.debug(
"removeAcl filesystem: {} path: {}",
getClient().getFileSystem(),
path);
String relativePath = getRelativePath(path);
final AbfsRestOperation op = getClient()
.getAclStatus(relativePath, tracingContext);
perfInfoGet.registerResult(op.getResult());
final String eTag = extractEtagHeader(op.getResult());
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
final Map<String, String> newAclEntries = new HashMap<>();
newAclEntries.put(AbfsHttpConstants.ACCESS_USER, aclEntries.get(AbfsHttpConstants.ACCESS_USER));
newAclEntries.put(AbfsHttpConstants.ACCESS_GROUP, aclEntries.get(AbfsHttpConstants.ACCESS_GROUP));
newAclEntries.put(AbfsHttpConstants.ACCESS_OTHER, aclEntries.get(AbfsHttpConstants.ACCESS_OTHER));
perfInfoGet.registerSuccess(true).finishTracking();
try (AbfsPerfInfo perfInfoSet = startTracking("removeAcl", "setAcl")) {
final AbfsRestOperation setAclOp = getClient()
.setAcl(relativePath, AbfsAclHelper.serializeAclSpec(newAclEntries),
eTag, tracingContext);
perfInfoSet.registerResult(setAclOp.getResult())
.registerSuccess(true)
.registerAggregates(perfInfoGet.getTrackingStart(), GET_SET_AGGREGATE_COUNT);
}
}
}
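// removeAcl is stricter than removeAclEntries: instead of subtracting the
// requested entries, it rebuilds the ACL from only the three base access
// entries, e.g. (representative values):
//
//   before: {user::rwx, user:alice:rw-, group::r-x, mask::rwx, other::---}
//   after:  {user::rwx, group::r-x, other::---}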
public void setAcl(final Path path, final List<AclEntry> aclSpec,
TracingContext tracingContext) throws AzureBlobFileSystemException {
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
try (AbfsPerfInfo perfInfoGet = startTracking("setAcl", "getAclStatus")) {
LOG.debug(
"setAcl filesystem: {} path: {} aclspec: {}",
getClient().getFileSystem(),
path,
AclEntry.aclSpecToString(aclSpec));
identityTransformer.transformAclEntriesForSetRequest(aclSpec);
final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
final boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(aclEntries);
String relativePath = getRelativePath(path);
final AbfsRestOperation op = getClient()
.getAclStatus(relativePath, isUpnFormat, tracingContext);
perfInfoGet.registerResult(op.getResult());
final String eTag = extractEtagHeader(op.getResult());
final Map<String, String> getAclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
AbfsAclHelper.setAclEntriesInternal(aclEntries, getAclEntries);
perfInfoGet.registerSuccess(true).finishTracking();
try (AbfsPerfInfo perfInfoSet = startTracking("setAcl", "setAcl")) {
final AbfsRestOperation setAclOp =
getClient().setAcl(relativePath,
AbfsAclHelper.serializeAclSpec(aclEntries), eTag, tracingContext);
perfInfoSet.registerResult(setAclOp.getResult())
.registerSuccess(true)
.registerAggregates(perfInfoGet.getTrackingStart(), GET_SET_AGGREGATE_COUNT);
}
}
}
public AclStatus getAclStatus(final Path path, TracingContext tracingContext)
throws IOException {
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"This operation is only valid for storage accounts with the hierarchical namespace enabled.");
}
try (AbfsPerfInfo perfInfo = startTracking("getAclStatus", "getAclStatus")) {
LOG.debug(
"getAclStatus filesystem: {} path: {}",
getClient().getFileSystem(),
path);
AbfsRestOperation op = getClient()
.getAclStatus(getRelativePath(path), tracingContext);
AbfsHttpOperation result = op.getResult();
perfInfo.registerResult(result);
final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
true,
userName);
final String transformedGroup = identityTransformer.transformIdentityForGetRequest(
result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
false,
primaryUserGroup);
final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
final List<AclEntry> aclEntries = AclEntry.parseAclSpec(AbfsAclHelper.processAclString(aclSpecString), true);
identityTransformer.transformAclEntriesForGetRequest(aclEntries, userName, primaryUserGroup);
final FsPermission fsPermission = permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
: AbfsPermission.valueOf(permissions);
final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
aclStatusBuilder.owner(transformedOwner);
aclStatusBuilder.group(transformedGroup);
aclStatusBuilder.setPermission(fsPermission);
aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
aclStatusBuilder.addEntries(aclEntries);
perfInfo.registerSuccess(true);
return aclStatusBuilder.build();
}
}
public void access(final Path path, final FsAction mode,
TracingContext tracingContext) throws AzureBlobFileSystemException {
LOG.debug("access for filesystem: {}, path: {}, mode: {}",
this.getClient().getFileSystem(), path, mode);
if (!this.abfsConfiguration.isCheckAccessEnabled()
|| !getIsNamespaceEnabled(tracingContext)) {
LOG.debug("Returning; either check access is not enabled or the account"
+ " used is not namespace enabled");
return;
}
try (AbfsPerfInfo perfInfo = startTracking("access", "checkAccess")) {
final AbfsRestOperation op = this.getClient()
.checkAccess(getRelativePath(path), mode.SYMBOL, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
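// mode.SYMBOL is the rwx-style string of the requested FsAction, so the
// CheckAccess call receives the familiar symbolic form, e.g.:
//
//   FsAction.READ_WRITE.SYMBOL   // "rw-"
//   FsAction.READ_EXECUTE.SYMBOL // "r-x"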
public boolean isInfiniteLeaseKey(String key) {
if (azureInfiniteLeaseDirSet.isEmpty()) {
return false;
}
return isKeyForDirectorySet(key, azureInfiniteLeaseDirSet);
}
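// Sketch of the expected matching, assuming the set was populated from a
// comma-separated directory list such as
// fs.azure.infinite-lease.directories = /hbase,/locked (key name shown for
// illustration only):
//
//   isInfiniteLeaseKey("/hbase")      // true: the directory itself
//   isInfiniteLeaseKey("/hbase/data") // true: a path under it
//   isInfiniteLeaseKey("/user")       // false: not in the configured set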
/**
* A one-off operation to initialize the AbfsClient for AzureBlobFileSystem
* operations. Subsequent calls are no-ops once a client exists.
*
* @param uri Uniform resource identifier for Abfs.
* @param fileSystemName Name of the fileSystem being used.
* @param accountName Name of the account being used to access the Azure
* data store.
* @param isSecure Whether https (true) or http (false) is in use.
* @throws IOException if credentials cannot be fetched or the client
* cannot be initialized.
*/
private void initializeClient(URI uri, String fileSystemName,
String accountName, boolean isSecure)
throws IOException {
if (this.getClient() != null) {
return;
}
final URIBuilder uriBuilder = getURIBuilder(accountName, isSecure);
final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName;
URL baseUrl;
try {
baseUrl = new URL(url);
} catch (MalformedURLException e) {
throw new InvalidUriException(uri.toString());
}
SharedKeyCredentials creds = null;
AccessTokenProvider tokenProvider = null;
SASTokenProvider sasTokenProvider = null;
if (authType == AuthType.OAuth) {
AzureADAuthenticator.init(abfsConfiguration);
}
if (authType == AuthType.SharedKey) {
LOG.trace("Fetching SharedKey credentials");
int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
if (dotIndex <= 0) {
throw new InvalidUriException(
uri.toString() + " - account name is not fully qualified.");
}
creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
abfsConfiguration.getStorageAccountKey());
} else if (authType == AuthType.SAS) {
LOG.trace("Fetching SAS Token Provider");
sasTokenProvider = abfsConfiguration.getSASTokenProvider();
} else {
LOG.trace("Fetching token provider");
tokenProvider = abfsConfiguration.getTokenProvider();
ExtensionHelper.bind(tokenProvider, uri,
abfsConfiguration.getRawConfiguration());
}
// Encryption setup
EncryptionContextProvider encryptionContextProvider = null;
if (isSecure) {
encryptionContextProvider =
abfsConfiguration.createEncryptionContextProvider();
if (encryptionContextProvider != null) {
if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
throw new PathIOException(uri.getPath(),
"Both global key and encryption context are set, only one allowed");
}
encryptionContextProvider.initialize(
abfsConfiguration.getRawConfiguration(), accountName,
fileSystemName);
} else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
if (abfsConfiguration.getEncodedClientProvidedEncryptionKeySHA() == null) {
throw new PathIOException(uri.getPath(),
"Encoded SHA256 hash must be provided for global encryption");
}
}
}
LOG.trace("Initializing AbfsClient for {}", baseUrl);
if (tokenProvider != null) {
this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration,
tokenProvider, encryptionContextProvider,
populateAbfsClientContext());
} else {
this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration,
sasTokenProvider, encryptionContextProvider,
populateAbfsClientContext());
}
this.setClient(getClientHandler().getClient());
LOG.trace("AbfsClient init complete");
}
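// For illustration (account and container names are placeholders), a secure
// scheme with a fully qualified account name yields a base URL of:
//
//   https://myaccount.dfs.core.windows.net/mycontainer
//
// SharedKey auth additionally needs the short account name, hence the
// substring up to the first dot above.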
private AbfsServiceType getAbfsServiceTypeFromUrl() {
if (uri.toString().contains(ABFS_BLOB_DOMAIN_NAME)) {
return AbfsServiceType.BLOB;
}
// For a DFS domain name, or any other custom endpoint, the service type
// defaults to DFS.
LOG.debug("Falling back to default service type DFS");
return AbfsServiceType.DFS;
}
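// Example of the dispatch above, assuming ABFS_BLOB_DOMAIN_NAME is the
// ".blob.core.windows.net" suffix:
//
//   abfs://data@myaccount.blob.core.windows.net/ -> AbfsServiceType.BLOB
//   abfs://data@myaccount.dfs.core.windows.net/  -> AbfsServiceType.DFS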
/**
* Populate a new AbfsClientContext instance with the desired properties.
*
* @return an instance of AbfsClientContext.
*/
private AbfsClientContext populateAbfsClientContext() {
return new AbfsClientContextBuilder()
.withExponentialRetryPolicy(
new ExponentialRetryPolicy(abfsConfiguration))
.withStaticRetryPolicy(
new StaticRetryPolicy(abfsConfiguration))
.withAbfsCounters(abfsCounters)
.withAbfsPerfTracker(abfsPerfTracker)
.build();
}
public String getRelativePath(final Path path) {
Preconditions.checkNotNull(path, "path");
String relPath = path.toUri().getPath();
if (relPath.isEmpty()) {
// The user passed the absolute root path without a trailing "/".
relPath = ROOT_PATH;
}
return relPath;
}
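// For illustration (names are placeholders):
//
//   getRelativePath(new Path(
//       "abfs://data@myaccount.dfs.core.windows.net/dir/file.txt"))
//   // -> "/dir/file.txt"
//   getRelativePath(new Path(
//       "abfs://data@myaccount.dfs.core.windows.net"))
//   // -> ROOT_PATH, since the URI path component is empty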
/**
* Extracts the content length from the HTTP operation's response headers.
*
* @param op The AbfsHttpOperation instance from which to extract the content length.
* This operation contains the HTTP response headers.
* @return The content length as a long value. If the Content-Length header is
* not present or is empty, returns 0.
*/
private long extractContentLength(AbfsHttpOperation op) {
long contentLength;
String contentLengthHeader = op.getResponseHeader(
HttpHeaderConfigurations.CONTENT_LENGTH);
if (!StringUtils.isEmpty(contentLengthHeader)) {
contentLength = Long.parseLong(contentLengthHeader);
} else {
contentLength = 0;
}
return contentLength;
}
private boolean parseIsDirectory(final String resourceType) {
return resourceType != null
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
}
private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws
InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
Hashtable<String, String> properties = new Hashtable<>();
final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder();
if (xMsProperties != null && !xMsProperties.isEmpty()) {
String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA);
if (userProperties.length == 0) {
return properties;
}
for (String property : userProperties) {
if (property.isEmpty()) {
throw new InvalidFileSystemPropertyException(xMsProperties);
}
String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2);
if (nameValue.length != 2) {
throw new InvalidFileSystemPropertyException(xMsProperties);
}
byte[] decodedValue = Base64.decode(nameValue[1]);
final String value;
try {
value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString();
} catch (CharacterCodingException ex) {
throw new InvalidAbfsRestOperationException(ex);
}
properties.put(nameValue[0], value);
}
}
return properties;
}
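// The x-ms-properties header parsed above is a comma-separated list of
// name=value pairs with Base64-encoded values, e.g. (representative, not
// from a live response):
//
//   hdi_isfolder=dHJ1ZQ==,owner=YWxpY2U=
//
// decodes to {hdi_isfolder=true, owner=alice}. A segment with no '=' or an
// empty segment fails fast with InvalidFileSystemPropertyException rather
// than being skipped.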
private AbfsPerfInfo startTracking(String callerName, String calleeName) {
return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName);
}
/**
* Permissions holds the provided permission and umask in octal notation.
* If the object is created for a namespace-disabled account, the permission
* and umask are null.
*/
public static final class Permissions {
private final String permission;
private final String umask;
Permissions(boolean isNamespaceEnabled, FsPermission permission,
FsPermission umask) {
if (isNamespaceEnabled) {
this.permission = getOctalNotation(permission);
this.umask = getOctalNotation(umask);
} else {
this.permission = null;
this.umask = null;
}
}
private String getOctalNotation(FsPermission fsPermission) {
Preconditions.checkNotNull(fsPermission, "fsPermission");
return String.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
}
public Boolean hasPermission() {
return permission != null && !permission.isEmpty();
}
public Boolean hasUmask() {
return umask != null && !umask.isEmpty();
}
public String getPermission() {
return permission;
}
public String getUmask() {
return umask;
}
@Override
public String toString() {
return String.format("{\"permission\":%s, \"umask\":%s}", permission,
umask);
}
}
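// Illustrative construction (the real call sites live elsewhere in this
// class; FsPermission.getUMask reads fs.permissions.umask-mode from the
// configuration):
//
//   Permissions permissions = new Permissions(isNamespaceEnabled,
//       FsPermission.getDefault(), FsPermission.getUMask(conf));
//   // namespace enabled:  hasPermission() == true, e.g. "0777" and "0022"
//   // namespace disabled: getPermission() and getUmask() return null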
/**
* A builder class for AzureBlobFileSystemStore.
*/
public static final class AzureBlobFileSystemStoreBuilder {
private URI uri;
private boolean isSecureScheme;
private Configuration configuration;
private AbfsCounters abfsCounters;
private DataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks;
private BackReference fsBackRef;
public AzureBlobFileSystemStoreBuilder withUri(URI value) {
this.uri = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withSecureScheme(boolean value) {
this.isSecureScheme = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withConfiguration(
Configuration value) {
this.configuration = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withAbfsCounters(
AbfsCounters value) {
this.abfsCounters = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withBlockFactory(
DataBlocks.BlockFactory value) {
this.blockFactory = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks(
int value) {
this.blockOutputActiveBlocks = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withBackReference(
BackReference fsBackRef) {
this.fsBackRef = fsBackRef;
return this;
}
public AzureBlobFileSystemStoreBuilder build() {
return this;
}
}
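// Illustrative wiring (values are placeholders). Note that build() returns
// the builder itself; the AzureBlobFileSystemStore constructor is expected
// to consume the collected fields:
//
//   AzureBlobFileSystemStore store = new AzureBlobFileSystemStore(
//       new AzureBlobFileSystemStoreBuilder()
//           .withUri(uri)
//           .withSecureScheme(true)
//           .withConfiguration(configuration)
//           .withAbfsCounters(abfsCounters)
//           .build());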
@VisibleForTesting
public AbfsClient getClient() {
return this.client;
}
@VisibleForTesting
public AbfsClient getClient(AbfsServiceType serviceType) {
return getClientHandler().getClient(serviceType);
}
@VisibleForTesting
public AbfsClientHandler getClientHandler() {
return this.clientHandler;
}
@VisibleForTesting
void setClient(AbfsClient client) {
this.client = client;
}
@VisibleForTesting
void setClientHandler(AbfsClientHandler clientHandler) {
this.clientHandler = clientHandler;
}
@VisibleForTesting
DataBlocks.BlockFactory getBlockFactory() {
return blockFactory;
}
void setNamespaceEnabled(boolean isNamespaceEnabled) {
getAbfsConfiguration().setIsNamespaceEnabledAccount(isNamespaceEnabled);
}
@VisibleForTesting
public URI getUri() {
return this.uri;
}
private void updateInfiniteLeaseDirs() {
this.azureInfiniteLeaseDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA)));
// Remove the empty string, since isKeyForDirectorySet returns true for empty
// keys and we don't want infinite-lease directories enabled by default.
this.azureInfiniteLeaseDirSet.remove("");
}
private AbfsLease maybeCreateLease(String relativePath, TracingContext tracingContext)
throws AzureBlobFileSystemException {
boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath);
if (!enableInfiniteLease) {
return null;
}
AbfsLease lease = new AbfsLease(getClient(), relativePath, true,
INFINITE_LEASE_DURATION, null, tracingContext);
leaseRefs.put(lease, null);
return lease;
}
@VisibleForTesting
boolean areLeasesFreed() {
for (AbfsLease lease : leaseRefs.keySet()) {
if (lease != null && !lease.isFreed()) {
return false;
}
}
return true;
}
/**
* Get the etag header from a response, stripping any surrounding quotes.
* see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
* @param result response to process.
* @return the quote-unwrapped etag.
*/
public static String extractEtagHeader(AbfsHttpOperation result) {
String etag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
if (etag != null) {
// strip out any wrapper "" quotes which come back, for consistency with
// list calls
if (etag.startsWith("W/\"")) {
// Weak etag
etag = etag.substring(3);
} else if (etag.startsWith("\"")) {
// strong etag
etag = etag.substring(1);
}
if (etag.endsWith("\"")) {
// trailing quote
etag = etag.substring(0, etag.length() - 1);
}
}
return etag;
}
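// Examples of the normalization above (etag values are representative):
//
//   "\"0x8D9ABC\""   -> "0x8D9ABC"  // strong etag, quotes stripped
//   "W/\"0x8D9ABC\"" -> "0x8D9ABC"  // weak etag, W/" prefix stripped
//   null             -> null        // header absent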
/**
* Increment rename recovery based counters in IOStatistics.
*
* @param abfsClientRenameResult Result of an ABFS rename operation.
*/
private void populateRenameRecoveryStatistics(
AbfsClientRenameResult abfsClientRenameResult) {
if (abfsClientRenameResult.isRenameRecovered()) {
abfsCounters.incrementCounter(RENAME_RECOVERY, 1);
}
if (abfsClientRenameResult.isIncompleteMetadataState()) {
abfsCounters.incrementCounter(METADATA_INCOMPLETE_RENAME_FAILURES, 1);
}
}
}