AbfsClientHandler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.net.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromBlobToDfs;
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromDfsToBlob;
/**
 * AbfsClientHandler builds one {@link AbfsDfsClient} and one
 * {@link AbfsBlobClient} from the same base URL and credentials, and hands
 * out whichever of the two matches a requested {@link AbfsServiceType}.
 *
 * <p>Both clients and the default service type are assigned exactly once in
 * the constructor and are never reassigned by this class.
 */
public class AbfsClientHandler {
  public static final Logger LOG = LoggerFactory.getLogger(AbfsClientHandler.class);

  /** Service type used by {@link #getClient()}; read from the configuration at construction. */
  private final AbfsServiceType defaultServiceType;

  /** Client created against the URL returned by {@code changeUrlFromBlobToDfs}. */
  private final AbfsDfsClient dfsAbfsClient;

  /** Client created against the URL returned by {@code changeUrlFromDfsToBlob}. */
  private final AbfsBlobClient blobAbfsClient;

  /**
   * Create a handler whose clients are built with an access token provider.
   * The SAS token provider slot is passed as {@code null}.
   * @param baseUrl URL used to configure the file system.
   * @param sharedKeyCredentials SharedKeyCredentials.
   * @param abfsConfiguration AbfsConfiguration; also supplies the default service type.
   * @param tokenProvider AccessTokenProvider.
   * @param encryptionContextProvider EncryptionContextProvider.
   * @param abfsClientContext AbfsClientContext.
   * @throws IOException propagated from the creation of either client.
   */
  public AbfsClientHandler(final URL baseUrl,
      final SharedKeyCredentials sharedKeyCredentials,
      final AbfsConfiguration abfsConfiguration,
      final AccessTokenProvider tokenProvider,
      final EncryptionContextProvider encryptionContextProvider,
      final AbfsClientContext abfsClientContext) throws IOException {
    this(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, null,
        encryptionContextProvider, abfsClientContext);
  }

  /**
   * Create a handler whose clients are built with a SAS token provider.
   * The access token provider slot is passed as {@code null}.
   * @param baseUrl URL used to configure the file system.
   * @param sharedKeyCredentials SharedKeyCredentials.
   * @param abfsConfiguration AbfsConfiguration; also supplies the default service type.
   * @param sasTokenProvider SASTokenProvider.
   * @param encryptionContextProvider EncryptionContextProvider.
   * @param abfsClientContext AbfsClientContext.
   * @throws IOException propagated from the creation of either client.
   */
  public AbfsClientHandler(final URL baseUrl,
      final SharedKeyCredentials sharedKeyCredentials,
      final AbfsConfiguration abfsConfiguration,
      final SASTokenProvider sasTokenProvider,
      final EncryptionContextProvider encryptionContextProvider,
      final AbfsClientContext abfsClientContext) throws IOException {
    this(baseUrl, sharedKeyCredentials, abfsConfiguration, null, sasTokenProvider,
        encryptionContextProvider, abfsClientContext);
  }

  /**
   * Shared construction path for both public constructors.
   * Order is kept as DFS client, Blob client, then the default service type.
   * @param baseUrl URL used to configure the file system.
   * @param sharedKeyCredentials SharedKeyCredentials.
   * @param abfsConfiguration AbfsConfiguration.
   * @param tokenProvider AccessTokenProvider, or null.
   * @param sasTokenProvider SASTokenProvider, or null.
   * @param encryptionContextProvider EncryptionContextProvider.
   * @param abfsClientContext AbfsClientContext.
   * @throws IOException propagated from the creation of either client.
   */
  private AbfsClientHandler(final URL baseUrl,
      final SharedKeyCredentials sharedKeyCredentials,
      final AbfsConfiguration abfsConfiguration,
      final AccessTokenProvider tokenProvider,
      final SASTokenProvider sasTokenProvider,
      final EncryptionContextProvider encryptionContextProvider,
      final AbfsClientContext abfsClientContext) throws IOException {
    this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
        abfsConfiguration, tokenProvider, sasTokenProvider,
        encryptionContextProvider, abfsClientContext);
    this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
        abfsConfiguration, tokenProvider, sasTokenProvider,
        encryptionContextProvider, abfsClientContext);
    // Default service type is whatever the user configured for the file system.
    this.defaultServiceType = abfsConfiguration.getFsConfiguredServiceType();
  }

  /**
   * Get the AbfsClient based on the default service type.
   * @return AbfsClient
   */
  public AbfsClient getClient() {
    return getClient(defaultServiceType);
  }

  /**
   * Get the AbfsClient based on the service type.
   * Only {@link AbfsServiceType#DFS} selects the DFS client; every other
   * value, including {@code null}, yields the Blob client.
   * @param serviceType AbfsServiceType.
   * @return AbfsClient
   */
  public AbfsClient getClient(AbfsServiceType serviceType) {
    return serviceType == AbfsServiceType.DFS ? dfsAbfsClient : blobAbfsClient;
  }

  /**
   * Create the AbfsDfsClient using the url used to configure file system.
   * The URL is first passed through {@code changeUrlFromBlobToDfs}; the
   * access-token constructor is used when {@code tokenProvider} is non-null,
   * otherwise the SAS-token constructor is used with {@code sasTokenProvider}
   * as given (which may itself be null).
   * @param baseUrl URL.
   * @param creds SharedKeyCredentials.
   * @param abfsConfiguration AbfsConfiguration.
   * @param tokenProvider AccessTokenProvider.
   * @param sasTokenProvider SASTokenProvider.
   * @param encryptionContextProvider EncryptionContextProvider.
   * @param abfsClientContext AbfsClientContext.
   * @return AbfsDfsClient with DFS endpoint URL.
   * @throws IOException propagated from the URL conversion or the client constructor.
   */
  private AbfsDfsClient createDfsClient(final URL baseUrl,
      final SharedKeyCredentials creds,
      final AbfsConfiguration abfsConfiguration,
      final AccessTokenProvider tokenProvider,
      final SASTokenProvider sasTokenProvider,
      final EncryptionContextProvider encryptionContextProvider,
      final AbfsClientContext abfsClientContext) throws IOException {
    URL dfsUrl = changeUrlFromBlobToDfs(baseUrl);
    if (tokenProvider != null) {
      LOG.debug("Creating AbfsDfsClient with access token provider using the URL: {}", dfsUrl);
      return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
          tokenProvider, encryptionContextProvider,
          abfsClientContext);
    } else {
      LOG.debug("Creating AbfsDfsClient with SAS token provider using the URL: {}", dfsUrl);
      return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
          sasTokenProvider, encryptionContextProvider,
          abfsClientContext);
    }
  }

  /**
   * Create the AbfsBlobClient using the url used to configure file system.
   * The URL is first passed through {@code changeUrlFromDfsToBlob}; the
   * access-token constructor is used when {@code tokenProvider} is non-null,
   * otherwise the SAS-token constructor is used with {@code sasTokenProvider}
   * as given (which may itself be null).
   * @param baseUrl URL.
   * @param creds SharedKeyCredentials.
   * @param abfsConfiguration AbfsConfiguration.
   * @param tokenProvider AccessTokenProvider.
   * @param sasTokenProvider SASTokenProvider.
   * @param encryptionContextProvider EncryptionContextProvider.
   * @param abfsClientContext AbfsClientContext.
   * @return AbfsBlobClient with Blob endpoint URL.
   * @throws IOException propagated from the URL conversion or the client constructor.
   */
  private AbfsBlobClient createBlobClient(final URL baseUrl,
      final SharedKeyCredentials creds,
      final AbfsConfiguration abfsConfiguration,
      final AccessTokenProvider tokenProvider,
      final SASTokenProvider sasTokenProvider,
      final EncryptionContextProvider encryptionContextProvider,
      final AbfsClientContext abfsClientContext) throws IOException {
    URL blobUrl = changeUrlFromDfsToBlob(baseUrl);
    if (tokenProvider != null) {
      LOG.debug("Creating AbfsBlobClient with access token provider using the URL: {}", blobUrl);
      return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
          tokenProvider, encryptionContextProvider,
          abfsClientContext);
    } else {
      LOG.debug("Creating AbfsBlobClient with SAS token provider using the URL: {}", blobUrl);
      return new AbfsBlobClient(blobUrl, creds, abfsConfiguration,
          sasTokenProvider, encryptionContextProvider,
          abfsClientContext);
    }
  }
}