DelegationClientBuilder.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.object.tos;
import com.volcengine.tos.TOSClientConfiguration;
import com.volcengine.tos.TosException;
import com.volcengine.tos.transport.TransportConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
import org.apache.hadoop.fs.tosfs.conf.TosKeys;
import org.apache.hadoop.fs.tosfs.object.Constants;
import org.apache.hadoop.fs.tosfs.object.tos.auth.CredentialsProvider;
import org.apache.hadoop.fs.tosfs.util.ParseUtils;
import org.apache.hadoop.fs.tosfs.util.TOSClientContextUtils;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.VersionInfo;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME;
public class DelegationClientBuilder {
public static final int DISABLE_TOS_RETRY_VALUE = -1;
private static final String TOS_ENDPOINT_KEY =
ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME);
private static final String TOS_REGION_KEY = ConfKeys.FS_OBJECT_STORAGE_REGION.key(TOS_SCHEME);
@VisibleForTesting
static final Map<String, DelegationClient> CACHE = new ConcurrentHashMap<>();
private String bucket;
private Configuration conf;
public DelegationClientBuilder bucket(String bucketInput) {
this.bucket = bucketInput;
return this;
}
public DelegationClientBuilder conf(Configuration confInput) {
this.conf = confInput;
return this;
}
public DelegationClient build() throws TosException {
Preconditions.checkNotNull(bucket, "Bucket cannot be null");
Preconditions.checkNotNull(conf, "Conf cannot be null");
String endpoint = getAndCheckEndpoint(conf);
String region = getAndCheckRegion(conf, endpoint);
if (conf.getBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE,
TosKeys.FS_TOS_DISABLE_CLIENT_CACHE_DEFAULT)) {
return createNewClient(conf, endpoint, region, bucket, false);
}
return CACHE.computeIfAbsent(bucket,
client -> createNewClient(conf, endpoint, region, bucket, true));
}
private DelegationClient createNewClient(Configuration config, String endpoint, String region,
String bucketName, boolean cached) {
CredentialsProvider provider = createProvider(config, bucketName);
TOSClientConfiguration clientConfiguration = TOSClientConfiguration.builder()
.region(region)
.endpoint(endpoint)
.credentials(provider)
.enableCrc(config.getBoolean(
TosKeys.FS_TOS_CRC_CHECK_ENABLED, TosKeys.FS_TOS_CRC_CHECK_ENABLED_DEFAULT))
.transportConfig(createTransportConfig(config))
.userAgentProductName(config.get(
TosKeys.FS_TOS_USER_AGENT_PREFIX, TosKeys.FS_TOS_USER_AGENT_PREFIX_DEFAULT))
.userAgentSoftName(Constants.TOS_FS)
.userAgentSoftVersion(VersionInfo.getVersion())
.build();
int maxRetryTimes = config.getInt(TosKeys.FS_TOS_REQUEST_MAX_RETRY_TIMES,
TosKeys.FS_TOS_REQUEST_MAX_RETRY_TIMES_DEFAULT);
List<String> nonRetryable409ErrorCodes = Arrays.asList(
config.getTrimmedStrings(TosKeys.FS_TOS_FAST_FAILURE_409_ERROR_CODES,
TosKeys.FS_TOS_FAST_FAILURE_409_ERROR_CODES_DEFAULT));
if (cached) {
return new CachedClient(clientConfiguration, maxRetryTimes, nonRetryable409ErrorCodes);
} else {
return new DelegationClient(clientConfiguration, maxRetryTimes, nonRetryable409ErrorCodes);
}
}
private CredentialsProvider createProvider(Configuration config, String bucketName) {
try {
CredentialsProvider provider = (CredentialsProvider) Class.forName(
config.get(TosKeys.FS_TOS_CREDENTIALS_PROVIDER,
TosKeys.FS_TOS_CREDENTIALS_PROVIDER_DEFAULT))
.getDeclaredConstructor()
.newInstance();
provider.initialize(config, bucketName);
return provider;
} catch (ClassNotFoundException |
InstantiationException |
IllegalAccessException |
InvocationTargetException |
NoSuchMethodException e) {
throw new TosException(e);
}
}
private String getAndCheckEndpoint(Configuration config) {
String endpoint = config.get(TOS_ENDPOINT_KEY);
if (StringUtils.isBlank(endpoint)) {
endpoint = ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT);
}
Preconditions.checkNotNull(endpoint, "%s cannot be null", TOS_ENDPOINT_KEY);
return endpoint.trim();
}
private String getAndCheckRegion(Configuration config, String endpoint) {
String region = config.get(TOS_REGION_KEY);
if (StringUtils.isNotBlank(region)) {
return region.trim();
}
region = TOSClientContextUtils.parseRegion(endpoint);
Preconditions.checkNotNull(region, "%s cannot be null", TOS_REGION_KEY);
return region.trim();
}
private TransportConfig createTransportConfig(Configuration config) {
TransportConfig.TransportConfigBuilder builder = TransportConfig.builder();
// Disable tos sdk retry with negative number since we have set retry strategy above TOS SDK,
// which cannot support retry all input streams via mark & reset API.
// It's hard to use it as there are some restrictions.
// the TOS SDK will reset the max retry count with 3 if the configured count equal to 0.
builder.maxRetryCount(DISABLE_TOS_RETRY_VALUE);
builder.maxConnections(config.getInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS,
TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS_DEFAULT));
builder.idleConnectionTimeMills(config.getInt(TosKeys.FS_TOS_HTTP_IDLE_CONNECTION_TIME_MILLS,
TosKeys.FS_TOS_HTTP_IDLE_CONNECTION_TIME_MILLS_DEFAULT));
builder.connectTimeoutMills(config.getInt(TosKeys.FS_TOS_HTTP_CONNECT_TIMEOUT_MILLS,
TosKeys.FS_TOS_HTTP_CONNECT_TIMEOUT_MILLS_DEFAULT));
builder.readTimeoutMills(config.getInt(TosKeys.FS_TOS_HTTP_READ_TIMEOUT_MILLS,
TosKeys.FS_TOS_HTTP_READ_TIMEOUT_MILLS_DEFAULT));
builder.writeTimeoutMills(config.getInt(TosKeys.FS_TOS_HTTP_WRITE_TIMEOUT_MILLS,
TosKeys.FS_TOS_HTTP_WRITE_TIMEOUT_MILLS_DEFAULT));
builder.enableVerifySSL(config.getBoolean(TosKeys.FS_TOS_HTTP_ENABLE_VERIFY_SSL,
TosKeys.FS_TOS_HTTP_ENABLE_VERIFY_SSL_DEFAULT));
builder.dnsCacheTimeMinutes(config.getInt(TosKeys.FS_TOS_HTTP_DNS_CACHE_TIME_MINUTES,
TosKeys.FS_TOS_HTTP_DNS_CACHE_TIME_MINUTES_DEFAULT));
return builder.build();
}
static class CachedClient extends DelegationClient {
protected CachedClient(TOSClientConfiguration configuration, int maxRetryTimes,
List<String> nonRetryable409ErrorCodes) {
super(configuration, maxRetryTimes, nonRetryable409ErrorCodes);
}
@Override
public void close() {
// do nothing as this client may be shared by multiple upper-layer instances
}
}
}