DelegationClient.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.object.tos;
import com.volcengine.tos.TOSClientConfiguration;
import com.volcengine.tos.TOSV2ClientBuilder;
import com.volcengine.tos.TosClientException;
import com.volcengine.tos.TosException;
import com.volcengine.tos.TosServerException;
import com.volcengine.tos.auth.Credential;
import com.volcengine.tos.auth.Credentials;
import com.volcengine.tos.TOSV2;
import com.volcengine.tos.comm.HttpStatus;
import com.volcengine.tos.comm.common.ACLType;
import com.volcengine.tos.internal.RequestOptionsBuilder;
import com.volcengine.tos.model.acl.GetObjectAclOutput;
import com.volcengine.tos.model.acl.PutObjectAclInput;
import com.volcengine.tos.model.acl.PutObjectAclOutput;
import com.volcengine.tos.model.bucket.CreateBucketInput;
import com.volcengine.tos.model.bucket.CreateBucketOutput;
import com.volcengine.tos.model.bucket.CreateBucketV2Input;
import com.volcengine.tos.model.bucket.CreateBucketV2Output;
import com.volcengine.tos.model.bucket.DeleteBucketCORSInput;
import com.volcengine.tos.model.bucket.DeleteBucketCORSOutput;
import com.volcengine.tos.model.bucket.DeleteBucketCustomDomainInput;
import com.volcengine.tos.model.bucket.DeleteBucketCustomDomainOutput;
import com.volcengine.tos.model.bucket.DeleteBucketEncryptionInput;
import com.volcengine.tos.model.bucket.DeleteBucketEncryptionOutput;
import com.volcengine.tos.model.bucket.DeleteBucketInput;
import com.volcengine.tos.model.bucket.DeleteBucketInventoryInput;
import com.volcengine.tos.model.bucket.DeleteBucketInventoryOutput;
import com.volcengine.tos.model.bucket.DeleteBucketLifecycleInput;
import com.volcengine.tos.model.bucket.DeleteBucketLifecycleOutput;
import com.volcengine.tos.model.bucket.DeleteBucketMirrorBackInput;
import com.volcengine.tos.model.bucket.DeleteBucketMirrorBackOutput;
import com.volcengine.tos.model.bucket.DeleteBucketOutput;
import com.volcengine.tos.model.bucket.DeleteBucketPolicyInput;
import com.volcengine.tos.model.bucket.DeleteBucketPolicyOutput;
import com.volcengine.tos.model.bucket.DeleteBucketRealTimeLogInput;
import com.volcengine.tos.model.bucket.DeleteBucketRealTimeLogOutput;
import com.volcengine.tos.model.bucket.DeleteBucketRenameInput;
import com.volcengine.tos.model.bucket.DeleteBucketRenameOutput;
import com.volcengine.tos.model.bucket.DeleteBucketReplicationInput;
import com.volcengine.tos.model.bucket.DeleteBucketReplicationOutput;
import com.volcengine.tos.model.bucket.DeleteBucketTaggingInput;
import com.volcengine.tos.model.bucket.DeleteBucketTaggingOutput;
import com.volcengine.tos.model.bucket.DeleteBucketWebsiteInput;
import com.volcengine.tos.model.bucket.DeleteBucketWebsiteOutput;
import com.volcengine.tos.model.bucket.GetBucketACLInput;
import com.volcengine.tos.model.bucket.GetBucketACLOutput;
import com.volcengine.tos.model.bucket.GetBucketCORSInput;
import com.volcengine.tos.model.bucket.GetBucketCORSOutput;
import com.volcengine.tos.model.bucket.GetBucketEncryptionInput;
import com.volcengine.tos.model.bucket.GetBucketEncryptionOutput;
import com.volcengine.tos.model.bucket.GetBucketInventoryInput;
import com.volcengine.tos.model.bucket.GetBucketInventoryOutput;
import com.volcengine.tos.model.bucket.GetBucketLifecycleInput;
import com.volcengine.tos.model.bucket.GetBucketLifecycleOutput;
import com.volcengine.tos.model.bucket.GetBucketLocationInput;
import com.volcengine.tos.model.bucket.GetBucketLocationOutput;
import com.volcengine.tos.model.bucket.GetBucketMirrorBackInput;
import com.volcengine.tos.model.bucket.GetBucketMirrorBackOutput;
import com.volcengine.tos.model.bucket.GetBucketNotificationInput;
import com.volcengine.tos.model.bucket.GetBucketNotificationOutput;
import com.volcengine.tos.model.bucket.GetBucketNotificationType2Input;
import com.volcengine.tos.model.bucket.GetBucketNotificationType2Output;
import com.volcengine.tos.model.bucket.GetBucketPolicyInput;
import com.volcengine.tos.model.bucket.GetBucketPolicyOutput;
import com.volcengine.tos.model.bucket.GetBucketRealTimeLogInput;
import com.volcengine.tos.model.bucket.GetBucketRealTimeLogOutput;
import com.volcengine.tos.model.bucket.GetBucketRenameInput;
import com.volcengine.tos.model.bucket.GetBucketRenameOutput;
import com.volcengine.tos.model.bucket.GetBucketReplicationInput;
import com.volcengine.tos.model.bucket.GetBucketReplicationOutput;
import com.volcengine.tos.model.bucket.GetBucketTaggingInput;
import com.volcengine.tos.model.bucket.GetBucketTaggingOutput;
import com.volcengine.tos.model.bucket.GetBucketVersioningInput;
import com.volcengine.tos.model.bucket.GetBucketVersioningOutput;
import com.volcengine.tos.model.bucket.GetBucketWebsiteInput;
import com.volcengine.tos.model.bucket.GetBucketWebsiteOutput;
import com.volcengine.tos.model.bucket.HeadBucketOutput;
import com.volcengine.tos.model.bucket.HeadBucketV2Input;
import com.volcengine.tos.model.bucket.HeadBucketV2Output;
import com.volcengine.tos.model.bucket.ListBucketCustomDomainInput;
import com.volcengine.tos.model.bucket.ListBucketCustomDomainOutput;
import com.volcengine.tos.model.bucket.ListBucketInventoryInput;
import com.volcengine.tos.model.bucket.ListBucketInventoryOutput;
import com.volcengine.tos.model.bucket.ListBucketsInput;
import com.volcengine.tos.model.bucket.ListBucketsOutput;
import com.volcengine.tos.model.bucket.ListBucketsV2Input;
import com.volcengine.tos.model.bucket.ListBucketsV2Output;
import com.volcengine.tos.model.bucket.PutBucketACLInput;
import com.volcengine.tos.model.bucket.PutBucketACLOutput;
import com.volcengine.tos.model.bucket.PutBucketCORSInput;
import com.volcengine.tos.model.bucket.PutBucketCORSOutput;
import com.volcengine.tos.model.bucket.PutBucketCustomDomainInput;
import com.volcengine.tos.model.bucket.PutBucketCustomDomainOutput;
import com.volcengine.tos.model.bucket.PutBucketEncryptionInput;
import com.volcengine.tos.model.bucket.PutBucketEncryptionOutput;
import com.volcengine.tos.model.bucket.PutBucketInventoryInput;
import com.volcengine.tos.model.bucket.PutBucketInventoryOutput;
import com.volcengine.tos.model.bucket.PutBucketLifecycleInput;
import com.volcengine.tos.model.bucket.PutBucketLifecycleOutput;
import com.volcengine.tos.model.bucket.PutBucketMirrorBackInput;
import com.volcengine.tos.model.bucket.PutBucketMirrorBackOutput;
import com.volcengine.tos.model.bucket.PutBucketNotificationInput;
import com.volcengine.tos.model.bucket.PutBucketNotificationOutput;
import com.volcengine.tos.model.bucket.PutBucketNotificationType2Input;
import com.volcengine.tos.model.bucket.PutBucketNotificationType2Output;
import com.volcengine.tos.model.bucket.PutBucketPolicyInput;
import com.volcengine.tos.model.bucket.PutBucketPolicyOutput;
import com.volcengine.tos.model.bucket.PutBucketRealTimeLogInput;
import com.volcengine.tos.model.bucket.PutBucketRealTimeLogOutput;
import com.volcengine.tos.model.bucket.PutBucketRenameInput;
import com.volcengine.tos.model.bucket.PutBucketRenameOutput;
import com.volcengine.tos.model.bucket.PutBucketReplicationInput;
import com.volcengine.tos.model.bucket.PutBucketReplicationOutput;
import com.volcengine.tos.model.bucket.PutBucketStorageClassInput;
import com.volcengine.tos.model.bucket.PutBucketStorageClassOutput;
import com.volcengine.tos.model.bucket.PutBucketTaggingInput;
import com.volcengine.tos.model.bucket.PutBucketTaggingOutput;
import com.volcengine.tos.model.bucket.PutBucketVersioningInput;
import com.volcengine.tos.model.bucket.PutBucketVersioningOutput;
import com.volcengine.tos.model.bucket.PutBucketWebsiteInput;
import com.volcengine.tos.model.bucket.PutBucketWebsiteOutput;
import com.volcengine.tos.model.object.AbortMultipartUploadInput;
import com.volcengine.tos.model.object.AbortMultipartUploadOutput;
import com.volcengine.tos.model.object.AppendObjectInput;
import com.volcengine.tos.model.object.AppendObjectOutput;
import com.volcengine.tos.model.object.CompleteMultipartUploadInput;
import com.volcengine.tos.model.object.CompleteMultipartUploadOutput;
import com.volcengine.tos.model.object.CompleteMultipartUploadV2Input;
import com.volcengine.tos.model.object.CompleteMultipartUploadV2Output;
import com.volcengine.tos.model.object.CopyObjectOutput;
import com.volcengine.tos.model.object.CopyObjectV2Input;
import com.volcengine.tos.model.object.CopyObjectV2Output;
import com.volcengine.tos.model.object.CreateMultipartUploadInput;
import com.volcengine.tos.model.object.CreateMultipartUploadOutput;
import com.volcengine.tos.model.object.DeleteMultiObjectsInput;
import com.volcengine.tos.model.object.DeleteMultiObjectsOutput;
import com.volcengine.tos.model.object.DeleteMultiObjectsV2Input;
import com.volcengine.tos.model.object.DeleteMultiObjectsV2Output;
import com.volcengine.tos.model.object.DeleteObjectInput;
import com.volcengine.tos.model.object.DeleteObjectOutput;
import com.volcengine.tos.model.object.DeleteObjectTaggingInput;
import com.volcengine.tos.model.object.DeleteObjectTaggingOutput;
import com.volcengine.tos.model.object.DownloadFileInput;
import com.volcengine.tos.model.object.DownloadFileOutput;
import com.volcengine.tos.model.object.FetchObjectInput;
import com.volcengine.tos.model.object.FetchObjectOutput;
import com.volcengine.tos.model.object.GetFetchTaskInput;
import com.volcengine.tos.model.object.GetFetchTaskOutput;
import com.volcengine.tos.model.object.GetFileStatusInput;
import com.volcengine.tos.model.object.GetFileStatusOutput;
import com.volcengine.tos.model.object.GetObjectACLV2Input;
import com.volcengine.tos.model.object.GetObjectACLV2Output;
import com.volcengine.tos.model.object.GetObjectOutput;
import com.volcengine.tos.model.object.GetObjectTaggingInput;
import com.volcengine.tos.model.object.GetObjectTaggingOutput;
import com.volcengine.tos.model.object.GetObjectToFileInput;
import com.volcengine.tos.model.object.GetObjectToFileOutput;
import com.volcengine.tos.model.object.GetObjectV2Input;
import com.volcengine.tos.model.object.GetObjectV2Output;
import com.volcengine.tos.model.object.GetSymlinkInput;
import com.volcengine.tos.model.object.GetSymlinkOutput;
import com.volcengine.tos.model.object.HeadObjectOutput;
import com.volcengine.tos.model.object.HeadObjectV2Input;
import com.volcengine.tos.model.object.HeadObjectV2Output;
import com.volcengine.tos.model.object.ListMultipartUploadsInput;
import com.volcengine.tos.model.object.ListMultipartUploadsOutput;
import com.volcengine.tos.model.object.ListMultipartUploadsV2Input;
import com.volcengine.tos.model.object.ListMultipartUploadsV2Output;
import com.volcengine.tos.model.object.ListObjectVersionsInput;
import com.volcengine.tos.model.object.ListObjectVersionsOutput;
import com.volcengine.tos.model.object.ListObjectVersionsV2Input;
import com.volcengine.tos.model.object.ListObjectVersionsV2Output;
import com.volcengine.tos.model.object.ListObjectsInput;
import com.volcengine.tos.model.object.ListObjectsOutput;
import com.volcengine.tos.model.object.ListObjectsType2Input;
import com.volcengine.tos.model.object.ListObjectsType2Output;
import com.volcengine.tos.model.object.ListObjectsV2Input;
import com.volcengine.tos.model.object.ListObjectsV2Output;
import com.volcengine.tos.model.object.ListPartsInput;
import com.volcengine.tos.model.object.ListPartsOutput;
import com.volcengine.tos.model.object.ListUploadedPartsInput;
import com.volcengine.tos.model.object.ListUploadedPartsOutput;
import com.volcengine.tos.model.object.ObjectMetaRequestOptions;
import com.volcengine.tos.model.object.PreSignedPolicyURLInput;
import com.volcengine.tos.model.object.PreSignedPolicyURLOutput;
import com.volcengine.tos.model.object.PreSignedPostSignatureInput;
import com.volcengine.tos.model.object.PreSignedPostSignatureOutput;
import com.volcengine.tos.model.object.PreSignedURLInput;
import com.volcengine.tos.model.object.PreSignedURLOutput;
import com.volcengine.tos.model.object.PreSingedPolicyURLInput;
import com.volcengine.tos.model.object.PreSingedPolicyURLOutput;
import com.volcengine.tos.model.object.PutFetchTaskInput;
import com.volcengine.tos.model.object.PutFetchTaskOutput;
import com.volcengine.tos.model.object.PutObjectACLInput;
import com.volcengine.tos.model.object.PutObjectACLOutput;
import com.volcengine.tos.model.object.PutObjectFromFileInput;
import com.volcengine.tos.model.object.PutObjectFromFileOutput;
import com.volcengine.tos.model.object.PutObjectInput;
import com.volcengine.tos.model.object.PutObjectOutput;
import com.volcengine.tos.model.object.PutObjectTaggingInput;
import com.volcengine.tos.model.object.PutObjectTaggingOutput;
import com.volcengine.tos.model.object.PutSymlinkInput;
import com.volcengine.tos.model.object.PutSymlinkOutput;
import com.volcengine.tos.model.object.RenameObjectInput;
import com.volcengine.tos.model.object.RenameObjectOutput;
import com.volcengine.tos.model.object.RestoreObjectInput;
import com.volcengine.tos.model.object.RestoreObjectOutput;
import com.volcengine.tos.model.object.ResumableCopyObjectInput;
import com.volcengine.tos.model.object.ResumableCopyObjectOutput;
import com.volcengine.tos.model.object.SetObjectMetaInput;
import com.volcengine.tos.model.object.SetObjectMetaOutput;
import com.volcengine.tos.model.object.SetObjectTimeInput;
import com.volcengine.tos.model.object.SetObjectTimeOutput;
import com.volcengine.tos.model.object.UploadFileInput;
import com.volcengine.tos.model.object.UploadFileOutput;
import com.volcengine.tos.model.object.UploadFileV2Input;
import com.volcengine.tos.model.object.UploadFileV2Output;
import com.volcengine.tos.model.object.UploadPartCopyInput;
import com.volcengine.tos.model.object.UploadPartCopyOutput;
import com.volcengine.tos.model.object.UploadPartCopyV2Input;
import com.volcengine.tos.model.object.UploadPartCopyV2Output;
import com.volcengine.tos.model.object.UploadPartFromFileInput;
import com.volcengine.tos.model.object.UploadPartFromFileOutput;
import com.volcengine.tos.model.object.UploadPartInput;
import com.volcengine.tos.model.object.UploadPartOutput;
import com.volcengine.tos.model.object.UploadPartV2Input;
import com.volcengine.tos.model.object.UploadPartV2Output;
import com.volcengine.tos.transport.TransportConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.tosfs.object.InputStreamProvider;
import org.apache.hadoop.fs.tosfs.object.Part;
import org.apache.hadoop.fs.tosfs.util.RetryableUtils;
import org.apache.hadoop.thirdparty.com.google.common.base.Throwables;
import org.apache.hadoop.thirdparty.com.google.common.io.CountingInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import javax.net.ssl.SSLException;
public class DelegationClient implements TOSV2 {
// Logger bound to this class.
private static final Logger LOG = LoggerFactory.getLogger(DelegationClient.class);
// Credentials source obtained from the configuration in the constructor.
private final Credentials provider;
// Configuration the wrapped client was built from; exposed through config().
private final TOSClientConfiguration config;
// Not final: can be overridden through setMaxRetryTimes(int).
private int maxRetryTimes;
// The wrapped TOS client every call is forwarded to; replaceable through setClient(TOSV2).
private TOSV2 client;
// Credential read from the provider at construction time; volatile, exposed via usedCredential().
private volatile Credential usedCredential;
// Supplied by the constructor; its consumer is outside this part of the file.
// NOTE(review): presumably 409 error codes that must not be retried - confirm against retry logic.
private final List<String> nonRetryable409ErrorCodes;
protected DelegationClient(TOSClientConfiguration configuration, int maxRetryTimes,
List<String> nonRetryable409ErrorCodes) {
this.config = configuration;
this.maxRetryTimes = maxRetryTimes;
this.provider = configuration.getCredentials();
this.usedCredential = provider.credential();
this.client = new TOSV2ClientBuilder().build(configuration);
this.nonRetryable409ErrorCodes = nonRetryable409ErrorCodes;
}
/**
 * Replaces the wrapped client that all delegated calls are forwarded to.
 *
 * @param client the client to delegate to from now on
 */
@VisibleForTesting
void setClient(TOSV2 client) {
this.client = client;
}
/**
 * Exposes the wrapped client.
 *
 * @return the client currently held by this instance
 */
public TOSV2 client() {
  return this.client;
}
/**
 * Overrides the retry limit that was supplied to the constructor.
 *
 * @param maxRetryTimes the new value returned by {@code maxRetryTimes()}
 */
@VisibleForTesting
void setMaxRetryTimes(int maxRetryTimes) {
this.maxRetryTimes = maxRetryTimes;
}
/**
 * Reports the configured retry limit.
 *
 * @return the current value of the {@code maxRetryTimes} field
 */
public int maxRetryTimes() {
  return this.maxRetryTimes;
}
/**
 * Exposes the configuration this instance was constructed with.
 *
 * @return the stored {@link TOSClientConfiguration}
 */
public TOSClientConfiguration config() {
  return this.config;
}
/**
 * Exposes the credential recorded in the volatile {@code usedCredential} field.
 *
 * @return the credential currently recorded by this instance
 */
public Credential usedCredential() {
  return this.usedCredential;
}
/**
 * Runs {@code client.createBucket(bucket)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public CreateBucketV2Output createBucket(String bucket) throws TosException {
  return retry(() -> {
    return client.createBucket(bucket);
  });
}
/**
 * Runs {@code client.createBucket(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public CreateBucketV2Output createBucket(CreateBucketV2Input input) throws TosException {
  return retry(() -> {
    return client.createBucket(input);
  });
}
/**
 * Runs {@code client.headBucket(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public HeadBucketV2Output headBucket(HeadBucketV2Input input) throws TosException {
  return retry(() -> {
    return client.headBucket(input);
  });
}
/**
 * Runs {@code client.deleteBucket(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketOutput deleteBucket(DeleteBucketInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucket(input);
  });
}
/**
 * Runs {@code client.listBuckets(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public ListBucketsV2Output listBuckets(ListBucketsV2Input input) throws TosException {
  return retry(() -> {
    return client.listBuckets(input);
  });
}
/**
 * Runs {@code client.createBucket(input)} (the {@link CreateBucketInput} overload) inside the
 * {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public CreateBucketOutput createBucket(CreateBucketInput input) throws TosException {
  return retry(() -> {
    return client.createBucket(input);
  });
}
/**
 * Runs {@code client.headBucket(bucket)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public HeadBucketOutput headBucket(String bucket) throws TosException {
  return retry(() -> {
    return client.headBucket(bucket);
  });
}
/**
 * Runs {@code client.deleteBucket(bucket)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketOutput deleteBucket(String bucket) throws TosException {
  return retry(() -> {
    return client.deleteBucket(bucket);
  });
}
/**
 * Runs {@code client.listBuckets(input)} (the {@link ListBucketsInput} overload) inside the
 * {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public ListBucketsOutput listBuckets(ListBucketsInput input) throws TosException {
  return retry(() -> {
    return client.listBuckets(input);
  });
}
/**
 * Runs {@code client.putBucketPolicy(bucket, policy)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param policy forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketPolicyOutput putBucketPolicy(String bucket, String policy)
    throws TosException {
  return retry(() -> {
    return client.putBucketPolicy(bucket, policy);
  });
}
/**
 * Runs {@code client.putBucketPolicy(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketPolicyOutput putBucketPolicy(PutBucketPolicyInput input)
    throws TosException {
  return retry(() -> {
    return client.putBucketPolicy(input);
  });
}
/**
 * Runs {@code client.getBucketPolicy(bucket)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketPolicyOutput getBucketPolicy(String bucket) throws TosException {
  return retry(() -> {
    return client.getBucketPolicy(bucket);
  });
}
/**
 * Runs {@code client.getBucketPolicy(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketPolicyOutput getBucketPolicy(GetBucketPolicyInput input)
    throws TosException {
  return retry(() -> {
    return client.getBucketPolicy(input);
  });
}
/**
 * Runs {@code client.deleteBucketPolicy(bucket)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketPolicyOutput deleteBucketPolicy(String bucket) throws TosException {
  return retry(() -> {
    return client.deleteBucketPolicy(bucket);
  });
}
/**
 * Runs {@code client.getObject(bucket, objectKey, builders)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param objectKey forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetObjectOutput getObject(
    String bucket, String objectKey, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.getObject(bucket, objectKey, builders);
  });
}
/**
 * Runs {@code client.headObject(bucket, objectKey, builders)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param objectKey forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public HeadObjectOutput headObject(
    String bucket, String objectKey, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.headObject(bucket, objectKey, builders);
  });
}
/**
 * Runs {@code client.deleteObject(bucket, objectKey, builders)} inside the {@code retry}
 * helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param objectKey forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteObjectOutput deleteObject(
    String bucket, String objectKey, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.deleteObject(bucket, objectKey, builders);
  });
}
/**
 * Runs {@code client.deleteMultiObjects(bucket, input, builders)} inside the {@code retry}
 * helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param input forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteMultiObjectsOutput deleteMultiObjects(
    String bucket, DeleteMultiObjectsInput input, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.deleteMultiObjects(bucket, input, builders);
  });
}
/**
 * Not implemented by this class: always throws and never reaches the wrapped client.
 *
 * @param bucket ignored
 * @param objectKey ignored
 * @param inputStream ignored
 * @param builders ignored
 * @return never returns normally
 * @throws UnsupportedOperationException on every call
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutObjectOutput putObject(
    String bucket, String objectKey, InputStream inputStream, RequestOptionsBuilder... builders)
    throws TosException {
  UnsupportedOperationException unsupported = new UnsupportedOperationException("Not supported");
  throw unsupported;
}
/**
 * Runs {@code client.uploadFile(bucket, input, builders)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param input forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public UploadFileOutput uploadFile(
    String bucket, UploadFileInput input, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.uploadFile(bucket, input, builders);
  });
}
/**
 * Not implemented by this class: always throws and never reaches the wrapped client.
 *
 * @param bucket ignored
 * @param objectKey ignored
 * @param content ignored
 * @param offset ignored
 * @param builders ignored
 * @return never returns normally
 * @throws UnsupportedOperationException on every call
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public AppendObjectOutput appendObject(
    String bucket, String objectKey, InputStream content, long offset,
    RequestOptionsBuilder... builders) throws TosException {
  UnsupportedOperationException unsupported = new UnsupportedOperationException("Not supported");
  throw unsupported;
}
/**
 * Runs {@code client.setObjectMeta(bucket, objectKey, builders)} inside the {@code retry}
 * helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param objectKey forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public SetObjectMetaOutput setObjectMeta(
    String bucket, String objectKey, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.setObjectMeta(bucket, objectKey, builders);
  });
}
/**
 * Runs {@code client.listObjects(bucket, input)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public ListObjectsOutput listObjects(String bucket, ListObjectsInput input)
    throws TosException {
  return retry(() -> {
    return client.listObjects(bucket, input);
  });
}
/**
 * Runs {@code client.listObjectVersions(bucket, input)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public ListObjectVersionsOutput listObjectVersions(
    String bucket, ListObjectVersionsInput input) throws TosException {
  return retry(() -> {
    return client.listObjectVersions(bucket, input);
  });
}
/**
 * Runs {@code client.copyObject(bucket, srcObjectKey, dstObjectKey, builders)} inside the
 * {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param srcObjectKey forwarded unchanged to the wrapped client
 * @param dstObjectKey forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public CopyObjectOutput copyObject(
    String bucket, String srcObjectKey, String dstObjectKey, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.copyObject(bucket, srcObjectKey, dstObjectKey, builders);
  });
}
/**
 * Runs {@code client.copyObjectTo(bucket, dstBucket, dstObjectKey, srcObjectKey, builders)}
 * inside the {@code retry} helper. Arguments are passed in exactly the order declared here.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param dstBucket forwarded unchanged to the wrapped client
 * @param dstObjectKey forwarded unchanged to the wrapped client
 * @param srcObjectKey forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public CopyObjectOutput copyObjectTo(
    String bucket, String dstBucket, String dstObjectKey, String srcObjectKey,
    RequestOptionsBuilder... builders) throws TosException {
  return retry(() -> {
    return client.copyObjectTo(bucket, dstBucket, dstObjectKey, srcObjectKey, builders);
  });
}
/**
 * Runs {@code client.copyObjectFrom(bucket, srcBucket, srcObjectKey, dstObjectKey, builders)}
 * inside the {@code retry} helper. Arguments are passed in exactly the order declared here.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param srcBucket forwarded unchanged to the wrapped client
 * @param srcObjectKey forwarded unchanged to the wrapped client
 * @param dstObjectKey forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public CopyObjectOutput copyObjectFrom(
    String bucket, String srcBucket, String srcObjectKey, String dstObjectKey,
    RequestOptionsBuilder... builders) throws TosException {
  return retry(() -> {
    return client.copyObjectFrom(bucket, srcBucket, srcObjectKey, dstObjectKey, builders);
  });
}
/**
 * Runs {@code client.uploadPartCopy(bucket, input, builders)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param input forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public UploadPartCopyOutput uploadPartCopy(
    String bucket, UploadPartCopyInput input, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.uploadPartCopy(bucket, input, builders);
  });
}
/**
 * Runs {@code client.putObjectAcl(bucket, input)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutObjectAclOutput putObjectAcl(
    String bucket, PutObjectAclInput input) throws TosException {
  return retry(() -> {
    return client.putObjectAcl(bucket, input);
  });
}
/**
 * Runs {@code client.getObjectAcl(bucket, objectKey, builders)} inside the {@code retry}
 * helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param objectKey forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetObjectAclOutput getObjectAcl(
    String bucket, String objectKey, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.getObjectAcl(bucket, objectKey, builders);
  });
}
/**
 * Runs {@code client.createMultipartUpload(bucket, objectKey, builders)} inside the
 * {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param objectKey forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public CreateMultipartUploadOutput createMultipartUpload(
    String bucket, String objectKey, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.createMultipartUpload(bucket, objectKey, builders);
  });
}
/**
 * Not implemented by this class: always throws and never reaches the wrapped client.
 *
 * @param bucket ignored
 * @param input ignored
 * @param builders ignored
 * @return never returns normally
 * @throws UnsupportedOperationException on every call
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public UploadPartOutput uploadPart(
    String bucket, UploadPartInput input, RequestOptionsBuilder... builders)
    throws TosException {
  UnsupportedOperationException unsupported = new UnsupportedOperationException("Not supported");
  throw unsupported;
}
/**
 * Runs {@code client.completeMultipartUpload(bucket, input)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public CompleteMultipartUploadOutput completeMultipartUpload(
    String bucket, CompleteMultipartUploadInput input) throws TosException {
  return retry(() -> {
    return client.completeMultipartUpload(bucket, input);
  });
}
/**
 * Runs {@code client.abortMultipartUpload(bucket, input)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public AbortMultipartUploadOutput abortMultipartUpload(
    String bucket, AbortMultipartUploadInput input) throws TosException {
  return retry(() -> {
    return client.abortMultipartUpload(bucket, input);
  });
}
/**
 * Runs {@code client.listUploadedParts(bucket, input, builders)} inside the {@code retry}
 * helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param input forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public ListUploadedPartsOutput listUploadedParts(
    String bucket, ListUploadedPartsInput input, RequestOptionsBuilder... builders)
    throws TosException {
  return retry(() -> {
    return client.listUploadedParts(bucket, input, builders);
  });
}
/**
 * Runs {@code client.listMultipartUploads(bucket, input)} inside the {@code retry} helper.
 *
 * @param bucket forwarded unchanged to the wrapped client
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public ListMultipartUploadsOutput listMultipartUploads(
    String bucket, ListMultipartUploadsInput input) throws TosException {
  return retry(() -> {
    return client.listMultipartUploads(bucket, input);
  });
}
/**
 * Runs {@code client.preSignedURL(httpMethod, bucket, objectKey, ttl, builders)} inside the
 * {@code retry} helper.
 *
 * @param httpMethod forwarded unchanged to the wrapped client
 * @param bucket forwarded unchanged to the wrapped client
 * @param objectKey forwarded unchanged to the wrapped client
 * @param ttl forwarded unchanged to the wrapped client
 * @param builders forwarded unchanged to the wrapped client
 * @return the string handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public String preSignedURL(
    String httpMethod, String bucket, String objectKey, Duration ttl,
    RequestOptionsBuilder... builders) throws TosException {
  return retry(() -> {
    return client.preSignedURL(httpMethod, bucket, objectKey, ttl, builders);
  });
}
/**
 * Runs {@code client.deleteBucketPolicy(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketPolicyOutput deleteBucketPolicy(
    DeleteBucketPolicyInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucketPolicy(input);
  });
}
/**
 * Runs {@code client.putBucketCORS(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketCORSOutput putBucketCORS(PutBucketCORSInput input) throws TosException {
  return retry(() -> {
    return client.putBucketCORS(input);
  });
}
/**
 * Runs {@code client.getBucketCORS(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketCORSOutput getBucketCORS(GetBucketCORSInput input) throws TosException {
  return retry(() -> {
    return client.getBucketCORS(input);
  });
}
/**
 * Runs {@code client.deleteBucketCORS(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketCORSOutput deleteBucketCORS(
    DeleteBucketCORSInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucketCORS(input);
  });
}
/**
 * Runs {@code client.putBucketStorageClass(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketStorageClassOutput putBucketStorageClass(
    PutBucketStorageClassInput input) throws TosException {
  return retry(() -> {
    return client.putBucketStorageClass(input);
  });
}
/**
 * Runs {@code client.getBucketLocation(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketLocationOutput getBucketLocation(
    GetBucketLocationInput input) throws TosException {
  return retry(() -> {
    return client.getBucketLocation(input);
  });
}
/**
 * Runs {@code client.putBucketLifecycle(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketLifecycleOutput putBucketLifecycle(
    PutBucketLifecycleInput input) throws TosException {
  return retry(() -> {
    return client.putBucketLifecycle(input);
  });
}
/**
 * Runs {@code client.getBucketLifecycle(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketLifecycleOutput getBucketLifecycle(
    GetBucketLifecycleInput input) throws TosException {
  return retry(() -> {
    return client.getBucketLifecycle(input);
  });
}
/**
 * Runs {@code client.deleteBucketLifecycle(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketLifecycleOutput deleteBucketLifecycle(
    DeleteBucketLifecycleInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucketLifecycle(input);
  });
}
/**
 * Runs {@code client.putBucketMirrorBack(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketMirrorBackOutput putBucketMirrorBack(
    PutBucketMirrorBackInput input) throws TosException {
  return retry(() -> {
    return client.putBucketMirrorBack(input);
  });
}
/**
 * Runs {@code client.getBucketMirrorBack(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketMirrorBackOutput getBucketMirrorBack(
    GetBucketMirrorBackInput input) throws TosException {
  return retry(() -> {
    return client.getBucketMirrorBack(input);
  });
}
/**
 * Runs {@code client.deleteBucketMirrorBack(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketMirrorBackOutput deleteBucketMirrorBack(
    DeleteBucketMirrorBackInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucketMirrorBack(input);
  });
}
/**
 * Runs {@code client.putBucketReplication(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketReplicationOutput putBucketReplication(
    PutBucketReplicationInput input) throws TosException {
  return retry(() -> {
    return client.putBucketReplication(input);
  });
}
/**
 * Runs {@code client.getBucketReplication(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketReplicationOutput getBucketReplication(
    GetBucketReplicationInput input) throws TosException {
  return retry(() -> {
    return client.getBucketReplication(input);
  });
}
/**
 * Runs {@code client.deleteBucketReplication(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketReplicationOutput deleteBucketReplication(
    DeleteBucketReplicationInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucketReplication(input);
  });
}
/**
 * Runs {@code client.putBucketVersioning(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketVersioningOutput putBucketVersioning(
    PutBucketVersioningInput input) throws TosException {
  return retry(() -> {
    return client.putBucketVersioning(input);
  });
}
/**
 * Runs {@code client.getBucketVersioning(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketVersioningOutput getBucketVersioning(
    GetBucketVersioningInput input) throws TosException {
  return retry(() -> {
    return client.getBucketVersioning(input);
  });
}
/**
 * Runs {@code client.putBucketWebsite(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketWebsiteOutput putBucketWebsite(
    PutBucketWebsiteInput input) throws TosException {
  return retry(() -> {
    return client.putBucketWebsite(input);
  });
}
/**
 * Runs {@code client.getBucketWebsite(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketWebsiteOutput getBucketWebsite(
    GetBucketWebsiteInput input) throws TosException {
  return retry(() -> {
    return client.getBucketWebsite(input);
  });
}
/**
 * Runs {@code client.deleteBucketWebsite(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketWebsiteOutput deleteBucketWebsite(
    DeleteBucketWebsiteInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucketWebsite(input);
  });
}
/**
 * Runs {@code client.putBucketNotification(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketNotificationOutput putBucketNotification(
    PutBucketNotificationInput input) throws TosException {
  return retry(() -> {
    return client.putBucketNotification(input);
  });
}
/**
 * Runs {@code client.getBucketNotification(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketNotificationOutput getBucketNotification(
    GetBucketNotificationInput input) throws TosException {
  return retry(() -> {
    return client.getBucketNotification(input);
  });
}
/**
 * Runs {@code client.putBucketNotificationType2(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketNotificationType2Output putBucketNotificationType2(
    PutBucketNotificationType2Input input)
    throws TosException {
  return retry(() -> {
    return client.putBucketNotificationType2(input);
  });
}
/**
 * Runs {@code client.getBucketNotificationType2(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketNotificationType2Output getBucketNotificationType2(
    GetBucketNotificationType2Input input)
    throws TosException {
  return retry(() -> {
    return client.getBucketNotificationType2(input);
  });
}
/**
 * Runs {@code client.putBucketCustomDomain(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketCustomDomainOutput putBucketCustomDomain(
    PutBucketCustomDomainInput input) throws TosException {
  return retry(() -> {
    return client.putBucketCustomDomain(input);
  });
}
/**
 * Runs {@code client.listBucketCustomDomain(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public ListBucketCustomDomainOutput listBucketCustomDomain(
    ListBucketCustomDomainInput input) throws TosException {
  return retry(() -> {
    return client.listBucketCustomDomain(input);
  });
}
/**
 * Runs {@code client.deleteBucketCustomDomain(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketCustomDomainOutput deleteBucketCustomDomain(
    DeleteBucketCustomDomainInput input)
    throws TosException {
  return retry(() -> {
    return client.deleteBucketCustomDomain(input);
  });
}
/**
 * Runs {@code client.putBucketRealTimeLog(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketRealTimeLogOutput putBucketRealTimeLog(
    PutBucketRealTimeLogInput input) throws TosException {
  return retry(() -> {
    return client.putBucketRealTimeLog(input);
  });
}
/**
 * Runs {@code client.getBucketRealTimeLog(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketRealTimeLogOutput getBucketRealTimeLog(
    GetBucketRealTimeLogInput input) throws TosException {
  return retry(() -> {
    return client.getBucketRealTimeLog(input);
  });
}
/**
 * Runs {@code client.deleteBucketRealTimeLog(input)} inside the {@code retry} helper.
 *
 * <p>The lambda must target the wrapped {@code client}. Calling this method's own name from
 * the lambda would re-enter this wrapper on every invocation and never reach the delegate.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketRealTimeLogOutput deleteBucketRealTimeLog(DeleteBucketRealTimeLogInput input)
    throws TosException {
  return retry(() -> client.deleteBucketRealTimeLog(input));
}
/**
 * Runs {@code client.putBucketACL(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketACLOutput putBucketACL(PutBucketACLInput input) throws TosException {
  return retry(() -> {
    return client.putBucketACL(input);
  });
}
/**
 * Runs {@code client.getBucketACL(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketACLOutput getBucketACL(GetBucketACLInput input) throws TosException {
  return retry(() -> {
    return client.getBucketACL(input);
  });
}
/**
 * Runs {@code client.putBucketRename(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketRenameOutput putBucketRename(PutBucketRenameInput input)
    throws TosException {
  return retry(() -> {
    return client.putBucketRename(input);
  });
}
/**
 * Runs {@code client.getBucketRename(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketRenameOutput getBucketRename(GetBucketRenameInput input)
    throws TosException {
  return retry(() -> {
    return client.getBucketRename(input);
  });
}
/**
 * Runs {@code client.deleteBucketRename(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketRenameOutput deleteBucketRename(
    DeleteBucketRenameInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucketRename(input);
  });
}
/**
 * Runs {@code client.putBucketEncryption(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketEncryptionOutput putBucketEncryption(
    PutBucketEncryptionInput input) throws TosException {
  return retry(() -> {
    return client.putBucketEncryption(input);
  });
}
/**
 * Runs {@code client.getBucketEncryption(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketEncryptionOutput getBucketEncryption(
    GetBucketEncryptionInput input) throws TosException {
  return retry(() -> {
    return client.getBucketEncryption(input);
  });
}
/**
 * Runs {@code client.deleteBucketEncryption(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketEncryptionOutput deleteBucketEncryption(
    DeleteBucketEncryptionInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucketEncryption(input);
  });
}
/**
 * Runs {@code client.putBucketTagging(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketTaggingOutput putBucketTagging(PutBucketTaggingInput input)
    throws TosException {
  return retry(() -> {
    return client.putBucketTagging(input);
  });
}
/**
 * Runs {@code client.getBucketTagging(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketTaggingOutput getBucketTagging(GetBucketTaggingInput input)
    throws TosException {
  return retry(() -> {
    return client.getBucketTagging(input);
  });
}
/**
 * Runs {@code client.deleteBucketTagging(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketTaggingOutput deleteBucketTagging(
    DeleteBucketTaggingInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucketTagging(input);
  });
}
/**
 * Runs {@code client.putBucketInventory(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public PutBucketInventoryOutput putBucketInventory(
    PutBucketInventoryInput input) throws TosException {
  return retry(() -> {
    return client.putBucketInventory(input);
  });
}
/**
 * Runs {@code client.getBucketInventory(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetBucketInventoryOutput getBucketInventory(
    GetBucketInventoryInput input) throws TosException {
  return retry(() -> {
    return client.getBucketInventory(input);
  });
}
/**
 * Runs {@code client.listBucketInventory(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public ListBucketInventoryOutput listBucketInventory(
    ListBucketInventoryInput input) throws TosException {
  return retry(() -> {
    return client.listBucketInventory(input);
  });
}
/**
 * Runs {@code client.deleteBucketInventory(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public DeleteBucketInventoryOutput deleteBucketInventory(
    DeleteBucketInventoryInput input) throws TosException {
  return retry(() -> {
    return client.deleteBucketInventory(input);
  });
}
/**
 * Runs {@code client.getObject(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetObjectV2Output getObject(GetObjectV2Input input) throws TosException {
  return retry(() -> {
    return client.getObject(input);
  });
}
/**
 * Runs {@code client.getObjectToFile(input)} inside the {@code retry} helper.
 *
 * @param input forwarded unchanged to the wrapped client
 * @return the result handed back by {@code retry}
 * @throws TosException declared by the overridden {@link TOSV2} method
 */
@Override
public GetObjectToFileOutput getObjectToFile(GetObjectToFileInput input)
    throws TosException {
  return retry(() -> {
    return client.getObjectToFile(input);
  });
}
/**
 * Forwards {@code input} to {@code client.getFileStatus} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public GetFileStatusOutput getFileStatus(GetFileStatusInput input) throws TosException {
return retry(() -> client.getFileStatus(input));
}
/**
 * Forwards {@code input} to {@code client.uploadFile} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public UploadFileV2Output uploadFile(UploadFileV2Input input) throws TosException {
return retry(() -> client.uploadFile(input));
}
/**
 * Forwards {@code input} to {@code client.downloadFile} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public DownloadFileOutput downloadFile(DownloadFileInput input) throws TosException {
return retry(() -> client.downloadFile(input));
}
/**
 * Forwards {@code input} to {@code client.resumableCopyObject} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public ResumableCopyObjectOutput resumableCopyObject(ResumableCopyObjectInput input)
throws TosException {
return retry(() -> client.resumableCopyObject(input));
}
/**
 * Forwards {@code input} to {@code client.headObject} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public HeadObjectV2Output headObject(HeadObjectV2Input input) throws TosException {
return retry(() -> client.headObject(input));
}
/**
 * Forwards {@code input} to {@code client.deleteObject} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public DeleteObjectOutput deleteObject(DeleteObjectInput input) throws TosException {
return retry(() -> client.deleteObject(input));
}
/**
 * Forwards {@code input} to {@code client.deleteMultiObjects} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public DeleteMultiObjectsV2Output deleteMultiObjects(DeleteMultiObjectsV2Input input)
throws TosException {
return retry(() -> client.deleteMultiObjects(input));
}
/**
 * Uploads an object with {@code client.putObject} inside {@code retry}.
 *
 * <p>The request is rebuilt on every attempt, so each attempt obtains a new stream from
 * {@code streamProvider}.
 *
 * @param bucket the bucket set on the request.
 * @param key the object key set on the request.
 * @param streamProvider supplies the content stream for each attempt.
 * @param contentLength the content length set on the request.
 * @param aclType the ACL type set on the request options.
 * @return the result of the wrapped client call.
 */
public PutObjectOutput put(
    String bucket, String key, InputStreamProvider streamProvider,
    long contentLength, ACLType aclType) {
  return retry(() -> {
    // Built inside the callable so that a retried attempt does not reuse a consumed stream.
    PutObjectInput request =
        newPutObjectRequest(bucket, key, streamProvider, contentLength, aclType);
    return client.putObject(request);
  });
}
/**
 * Builds a {@code PutObjectInput} whose content is a newly opened stream from
 * {@code streamProvider}.
 *
 * @param bucket the bucket set on the request.
 * @param key the object key set on the request.
 * @param streamProvider supplies the content stream via {@code newStream()}.
 * @param contentLength the content length set on the request.
 * @param aclType the ACL type set on the request options.
 * @return the built request.
 */
private PutObjectInput newPutObjectRequest(
    String bucket,
    String key,
    InputStreamProvider streamProvider,
    long contentLength,
    ACLType aclType) {
  InputStream content = streamProvider.newStream();
  return PutObjectInput.builder()
      .bucket(bucket)
      .key(key)
      .content(content)
      .contentLength(contentLength)
      .options(new ObjectMetaRequestOptions().setAclType(aclType))
      .build();
}
/**
 * Appends data to an object with {@code client.appendObject} inside {@code retry}.
 *
 * <p>The request is rebuilt on every attempt, so each attempt obtains a new stream from
 * {@code streamProvider}.
 *
 * @param bucket the bucket set on the request.
 * @param key the object key set on the request.
 * @param streamProvider supplies the content stream for each attempt.
 * @param offset the offset set on the request.
 * @param contentLength the content length set on the request.
 * @param originalCrc64 the object's crc64 checksum if the object exists, and null if the object
 *     doesn't exist; it is needed when appending data to an object.
 * @param aclType the ACL type set on the request options.
 * @return the result of the wrapped client call.
 */
public AppendObjectOutput appendObject(String bucket, String key,
    InputStreamProvider streamProvider, long offset, long contentLength, String originalCrc64,
    ACLType aclType) {
  return retry(() -> {
    AppendObjectInput request = newAppendObjectRequest(
        bucket, key, streamProvider, offset, contentLength, originalCrc64, aclType);
    return client.appendObject(request);
  });
}
/**
 * Builds an {@code AppendObjectInput} whose content is a newly opened stream from
 * {@code streamProvider}.
 *
 * @param bucket the bucket set on the request.
 * @param key the object key set on the request.
 * @param streamProvider supplies the content stream via {@code newStream()}.
 * @param offset the offset set on the request.
 * @param contentLength the content length set on the request.
 * @param preCrc64ecma the value passed to {@code preHashCrc64ecma} on the builder.
 * @param aclType the ACL type set on the request options.
 * @return the built request.
 */
private AppendObjectInput newAppendObjectRequest(
    String bucket,
    String key,
    InputStreamProvider streamProvider,
    long offset,
    long contentLength,
    String preCrc64ecma,
    ACLType aclType) {
  InputStream content = streamProvider.newStream();
  return AppendObjectInput.builder()
      .bucket(bucket)
      .key(key)
      .content(content)
      .offset(offset)
      .contentLength(contentLength)
      .preHashCrc64ecma(preCrc64ecma)
      .options(new ObjectMetaRequestOptions().setAclType(aclType))
      .build();
}
/**
 * Not supported by this client; always throws.
 *
 * <p>This class also declares
 * {@code put(bucket, key, streamProvider, contentLength, aclType)}, which builds its own
 * {@code PutObjectInput} for every retry attempt.
 *
 * @param input ignored.
 * @return never returns normally.
 * @throws UnsupportedOperationException always.
 */
@Override
public PutObjectOutput putObject(PutObjectInput input) throws TosException {
throw new UnsupportedOperationException("Not supported");
}
/**
 * Forwards {@code input} to {@code client.putObjectFromFile} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public PutObjectFromFileOutput putObjectFromFile(PutObjectFromFileInput input)
throws TosException {
return retry(() -> client.putObjectFromFile(input));
}
/**
 * Not supported by this client; always throws.
 *
 * <p>This class also declares an {@code appendObject} overload taking a bucket, key and
 * {@code InputStreamProvider}, which builds its own {@code AppendObjectInput} for every retry
 * attempt.
 *
 * @param input ignored.
 * @return never returns normally.
 * @throws UnsupportedOperationException always.
 */
@Override
public AppendObjectOutput appendObject(AppendObjectInput input)
throws TosException {
throw new UnsupportedOperationException("Not supported");
}
/**
 * Forwards {@code input} to {@code client.setObjectMeta} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public SetObjectMetaOutput setObjectMeta(SetObjectMetaInput input)
throws TosException {
return retry(() -> client.setObjectMeta(input));
}
/**
 * Forwards {@code input} to {@code client.setObjectTime} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public SetObjectTimeOutput setObjectTime(SetObjectTimeInput input) throws TosException {
return retry(() -> client.setObjectTime(input));
}
/**
 * Forwards {@code input} to {@code client.listObjects} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public ListObjectsV2Output listObjects(ListObjectsV2Input input)
throws TosException {
return retry(() -> client.listObjects(input));
}
/**
 * Forwards {@code input} to {@code client.listObjectsType2} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public ListObjectsType2Output listObjectsType2(ListObjectsType2Input input)
throws TosException {
return retry(() -> client.listObjectsType2(input));
}
/**
 * Forwards {@code input} to {@code client.listObjectVersions} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public ListObjectVersionsV2Output listObjectVersions(ListObjectVersionsV2Input input)
throws TosException {
return retry(() -> client.listObjectVersions(input));
}
/**
 * Forwards {@code input} to {@code client.copyObject} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public CopyObjectV2Output copyObject(CopyObjectV2Input input)
throws TosException {
return retry(() -> client.copyObject(input));
}
/**
 * Forwards {@code input} to {@code client.uploadPartCopy} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public UploadPartCopyV2Output uploadPartCopy(UploadPartCopyV2Input input)
throws TosException {
return retry(() -> client.uploadPartCopy(input));
}
/**
 * Forwards {@code input} to {@code client.putObjectAcl} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public PutObjectACLOutput putObjectAcl(PutObjectACLInput input)
throws TosException {
return retry(() -> client.putObjectAcl(input));
}
/**
 * Forwards {@code input} to {@code client.getObjectAcl} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public GetObjectACLV2Output getObjectAcl(GetObjectACLV2Input input)
throws TosException {
return retry(() -> client.getObjectAcl(input));
}
/**
 * Forwards {@code input} to {@code client.putObjectTagging} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public PutObjectTaggingOutput putObjectTagging(PutObjectTaggingInput input)
throws TosException {
return retry(() -> client.putObjectTagging(input));
}
/**
 * Forwards {@code input} to {@code client.getObjectTagging} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public GetObjectTaggingOutput getObjectTagging(GetObjectTaggingInput input)
throws TosException {
return retry(() -> client.getObjectTagging(input));
}
/**
 * Forwards {@code input} to {@code client.deleteObjectTagging} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public DeleteObjectTaggingOutput deleteObjectTagging(DeleteObjectTaggingInput input)
throws TosException {
return retry(() -> client.deleteObjectTagging(input));
}
/**
 * Forwards {@code input} to {@code client.fetchObject} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public FetchObjectOutput fetchObject(FetchObjectInput input) throws TosException {
return retry(() -> client.fetchObject(input));
}
/**
 * Forwards {@code input} to {@code client.putFetchTask} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public PutFetchTaskOutput putFetchTask(PutFetchTaskInput input) throws TosException {
return retry(() -> client.putFetchTask(input));
}
/**
 * Forwards {@code input} to {@code client.getFetchTask} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public GetFetchTaskOutput getFetchTask(GetFetchTaskInput input) throws TosException {
return retry(() -> client.getFetchTask(input));
}
/**
 * Forwards {@code input} to {@code client.createMultipartUpload} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public CreateMultipartUploadOutput createMultipartUpload(CreateMultipartUploadInput input)
throws TosException {
return retry(() -> client.createMultipartUpload(input));
}
/**
 * Uploads one part of a multipart upload with {@code client.uploadPart} inside {@code retry}.
 *
 * <p>Every attempt opens a new stream from {@code streamProvider} and wraps it in a
 * {@code CountingInputStream}; the count reported by that wrapper becomes the second argument
 * of the returned {@code Part}.
 *
 * @param bucket the bucket set on the request.
 * @param key the object key set on the request.
 * @param uploadId the multipart upload id set on the request.
 * @param partNum the part number set on the request.
 * @param streamProvider supplies the content stream for each attempt.
 * @param contentLength the content length set on the request.
 * @param aclType the ACL type set on the request options.
 * @return a {@code Part} built from the response part number, the counted bytes and the
 *     response ETag.
 */
public Part uploadPart(
    String bucket,
    String key,
    String uploadId,
    int partNum,
    InputStreamProvider streamProvider,
    long contentLength,
    ACLType aclType) {
  return retry(() -> {
    CountingInputStream counted = new CountingInputStream(streamProvider.newStream());
    UploadPartV2Output result = client.uploadPart(
        UploadPartV2Input.builder()
            .bucket(bucket)
            .key(key)
            .partNumber(partNum)
            .uploadID(uploadId)
            .content(counted)
            .contentLength(contentLength)
            .options(new ObjectMetaRequestOptions().setAclType(aclType))
            .build());
    return new Part(result.getPartNumber(), counted.getCount(), result.getEtag());
  });
}
/**
 * Not supported by this client; always throws.
 *
 * <p>This class also declares an {@code uploadPart} overload taking a bucket, key, upload id,
 * part number and {@code InputStreamProvider}, which builds its own {@code UploadPartV2Input}
 * for every retry attempt.
 *
 * @param input ignored.
 * @return never returns normally.
 * @throws UnsupportedOperationException always.
 */
@Override
public UploadPartV2Output uploadPart(UploadPartV2Input input) throws TosException {
throw new UnsupportedOperationException("Not supported");
}
/**
 * Forwards {@code input} to {@code client.uploadPartFromFile} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public UploadPartFromFileOutput uploadPartFromFile(UploadPartFromFileInput input)
throws TosException {
return retry(() -> client.uploadPartFromFile(input));
}
/**
 * Forwards {@code input} to {@code client.completeMultipartUpload} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public CompleteMultipartUploadV2Output completeMultipartUpload(
CompleteMultipartUploadV2Input input) throws TosException {
return retry(() -> client.completeMultipartUpload(input));
}
/**
 * Forwards {@code input} to {@code client.abortMultipartUpload} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public AbortMultipartUploadOutput abortMultipartUpload(AbortMultipartUploadInput input)
throws TosException {
return retry(() -> client.abortMultipartUpload(input));
}
/**
 * Forwards {@code input} to {@code client.listParts} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public ListPartsOutput listParts(ListPartsInput input) throws TosException {
return retry(() -> client.listParts(input));
}
/**
 * Forwards {@code input} to {@code client.listMultipartUploads} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public ListMultipartUploadsV2Output listMultipartUploads(ListMultipartUploadsV2Input input)
throws TosException {
return retry(() -> client.listMultipartUploads(input));
}
/**
 * Forwards {@code input} to {@code client.renameObject} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public RenameObjectOutput renameObject(RenameObjectInput input) throws TosException {
return retry(() -> client.renameObject(input));
}
/**
 * Forwards {@code input} to {@code client.restoreObject} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public RestoreObjectOutput restoreObject(RestoreObjectInput input) throws TosException {
return retry(() -> client.restoreObject(input));
}
/**
 * Forwards {@code input} to {@code client.putSymlink} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public PutSymlinkOutput putSymlink(PutSymlinkInput input) throws TosException {
return retry(() -> client.putSymlink(input));
}
/**
 * Forwards {@code input} to {@code client.getSymlink} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public GetSymlinkOutput getSymlink(GetSymlinkInput input) throws TosException {
return retry(() -> client.getSymlink(input));
}
/**
 * Forwards {@code input} to {@code client.preSignedURL} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public PreSignedURLOutput preSignedURL(PreSignedURLInput input) throws TosException {
return retry(() -> client.preSignedURL(input));
}
/**
 * Forwards {@code input} to {@code client.preSignedPostSignature} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public PreSignedPostSignatureOutput preSignedPostSignature(PreSignedPostSignatureInput input)
throws TosException {
return retry(() -> client.preSignedPostSignature(input));
}
/**
 * Forwards {@code input} to {@code client.preSingedPolicyURL} inside {@code retry}.
 *
 * <p>The "PreSinged" spelling matches the overridden method and its input/output types, so it
 * cannot be corrected here; a separately named {@code preSignedPolicyURL} is also overridden
 * in this class.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public PreSingedPolicyURLOutput preSingedPolicyURL(PreSingedPolicyURLInput input)
throws TosException {
return retry(() -> client.preSingedPolicyURL(input));
}
/**
 * Forwards {@code input} to {@code client.preSignedPolicyURL} inside {@code retry}.
 *
 * @param input the request passed unchanged to the wrapped client.
 * @return the result of the wrapped client call.
 * @throws TosException if the call fails with an error that is not retried, or retries are
 *     exhausted.
 */
@Override
public PreSignedPolicyURLOutput preSignedPolicyURL(PreSignedPolicyURLInput input)
throws TosException {
return retry(() -> client.preSignedPolicyURL(input));
}
/**
 * Calls {@code client.changeCredentials(credentials)} inside {@code retry}.
 *
 * <p>NOTE(review): {@code retry} runs {@code refresh()} before the call, and {@code refresh()}
 * may itself call {@code client.changeCredentials(provider)}; confirm that a later refresh
 * does not silently replace the credentials set here.
 *
 * @param credentials the credentials object handed to the wrapped client.
 */
@Override
public void changeCredentials(Credentials credentials) {
retry(() -> {
client.changeCredentials(credentials);
// The callable must return a value; there is no result to report.
return null;
});
}
/**
 * Calls {@code client.changeRegionAndEndpoint(region, endpoint)} inside {@code retry}.
 *
 * @param region the region handed to the wrapped client.
 * @param endpoint the endpoint handed to the wrapped client.
 */
@Override
public void changeRegionAndEndpoint(String region, String endpoint) {
retry(() -> {
client.changeRegionAndEndpoint(region, endpoint);
// The callable must return a value; there is no result to report.
return null;
});
}
/**
 * Calls {@code client.changeTransportConfig(conf)} inside {@code retry}.
 *
 * @param conf the transport configuration handed to the wrapped client.
 */
@Override
public void changeTransportConfig(TransportConfig conf) {
retry(() -> {
client.changeTransportConfig(conf);
// The callable must return a value; there is no result to report.
return null;
});
}
/**
 * Forwards both arguments to {@code client.refreshEndpointRegion} inside {@code retry}.
 *
 * <p>The call must target the wrapped {@code client}: invoking this method on {@code this}
 * from inside the callable would recurse without bound and end in a
 * {@code StackOverflowError}.
 *
 * @param s the first argument, passed unchanged to the wrapped client.
 * @param s1 the second argument, passed unchanged to the wrapped client.
 * @return the value returned by the wrapped client.
 */
@Override
public boolean refreshEndpointRegion(String s, String s1) {
  return retry(() -> client.refreshEndpointRegion(s, s1));
}
/**
 * Forwards all arguments to {@code client.refreshCredentials} inside {@code retry}.
 *
 * <p>The call must target the wrapped {@code client}: invoking this method on {@code this}
 * from inside the callable would recurse without bound and end in a
 * {@code StackOverflowError}.
 *
 * @param s the first argument, passed unchanged to the wrapped client.
 * @param s1 the second argument, passed unchanged to the wrapped client.
 * @param s2 the third argument, passed unchanged to the wrapped client.
 * @return the value returned by the wrapped client.
 */
@Override
public boolean refreshCredentials(String s, String s1, String s2) {
  return retry(() -> client.refreshCredentials(s, s1, s2));
}
/**
 * Closes the wrapped client. The call is made directly, without {@code retry}.
 *
 * @throws IOException if {@code client.close()} throws it.
 */
@Override
public void close() throws IOException {
client.close();
}
/**
 * Hands the credential provider to the wrapped client when the provider's current credential
 * differs from the one recorded in {@code usedCredential}, then records the new credential.
 *
 * @throws TosException declared by this method; presumably raised by the provider or by
 *     {@code client.changeCredentials} (verify against the SDK).
 */
private void refresh() throws TosException {
Credential credential = provider.credential();
// First comparison is made without holding the lock.
if (credentialIsChanged(credential)) {
synchronized (this) {
// Compare again under the lock: usedCredential may have been updated in the meantime
// by another thread that entered this block first.
if (credentialIsChanged(credential)) {
client.changeCredentials(provider);
usedCredential = credential;
}
}
}
}
/**
 * Tells whether {@code credential} differs from {@code usedCredential}.
 *
 * <p>The access key id, access key secret and security token are compared in that order with
 * {@code Objects.equals}; the first mismatch ends the comparison.
 *
 * @param credential the credential to compare against {@code usedCredential}.
 * @return {@code true} if any of the three compared values differs, {@code false} otherwise.
 */
private boolean credentialIsChanged(Credential credential) {
  boolean unchanged =
      Objects.equals(credential.getAccessKeyId(), usedCredential.getAccessKeyId())
          && Objects.equals(credential.getAccessKeySecret(), usedCredential.getAccessKeySecret())
          && Objects.equals(credential.getSecurityToken(), usedCredential.getSecurityToken());
  return !unchanged;
}
/**
 * Runs {@code callable}, refreshing credentials before every attempt and retrying when a
 * retryable {@link TosException} is thrown.
 *
 * <p>At most {@code maxRetryTimes} attempts are made. Between attempts the thread sleeps for
 * {@code RetryableUtils.backoff(attempt)} milliseconds. If the sleep is interrupted, the
 * thread's interrupt status is restored before the failure is reported, so callers further up
 * the stack can still observe the interruption.
 *
 * @param callable the operation to run; it is invoked again on each retry.
 * @param <T> the result type of the operation.
 * @return the value returned by the first successful invocation of {@code callable}.
 * @throws TosException if the attempts are exhausted or the failure is not retryable.
 * @throws TosClientException if the backoff sleep is interrupted.
 * @throws RuntimeException wrapping any other exception thrown by {@code callable}.
 */
private <T> T retry(Callable<T> callable) {
  int attempt = 0;
  while (true) {
    attempt++;
    try {
      refresh();
      return callable.call();
    } catch (TosException e) {
      // The final attempt rethrows here, so no backoff sleep happens after it.
      if (attempt >= maxRetryTimes) {
        LOG.error("Retry exhausted after {} times.", maxRetryTimes);
        throw e;
      }
      if (isRetryableException(e, nonRetryable409ErrorCodes)) {
        LOG.warn("Retry TOS request in the {} times, error: {}", attempt,
            Throwables.getRootCause(e).getMessage());
        try {
          Thread.sleep(RetryableUtils.backoff(attempt));
        } catch (InterruptedException ex) {
          // Thread.sleep clears the interrupt flag when it throws; set it again so the
          // interruption is not lost once it is converted into a TosClientException.
          Thread.currentThread().interrupt();
          throw new TosClientException("tos: request interrupted.", ex);
        }
      } else {
        throw e;
      }
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Same reasoning as above for an interruption surfaced by the callable itself.
        Thread.currentThread().interrupt();
      }
      throw new RuntimeException(e);
    }
  }
}
/**
 * Decides whether a failed request should be attempted again.
 *
 * <p>An exception is retryable when any of the following holds, checked in this order:
 * its status code is at least {@code HttpStatus.INTERNAL_SERVER_ERROR} or equals
 * {@code HttpStatus.TOO_MANY_REQUESTS}; its direct cause is a {@code SocketException},
 * {@code UnknownHostException}, {@code SSLException}, {@code SocketTimeoutException} or
 * {@code InterruptedException}; it is a retryable client exception; or it is a retryable
 * server conflict.
 *
 * @param e the exception to classify.
 * @param nonRetryable409ErrorCodes error codes for which a conflict response is not retried.
 * @return {@code true} if the request may be retried.
 */
@VisibleForTesting
static boolean isRetryableException(TosException e, List<String> nonRetryable409ErrorCodes) {
  if (e.getStatusCode() >= HttpStatus.INTERNAL_SERVER_ERROR
      || e.getStatusCode() == HttpStatus.TOO_MANY_REQUESTS) {
    return true;
  }
  Throwable cause = e.getCause();
  if (cause instanceof SocketException
      || cause instanceof UnknownHostException
      || cause instanceof SSLException
      || cause instanceof SocketTimeoutException
      || cause instanceof InterruptedException) {
    return true;
  }
  if (isRetryableTosClientException(e)) {
    return true;
  }
  return isRetryableTosServerException(e, nonRetryable409ErrorCodes);
}
/**
 * Tells whether {@code e} is a {@code TosClientException} whose direct cause is an
 * {@code IOException} other than an {@code EOFException}.
 *
 * @param e the exception to classify.
 * @return {@code true} if the exception matches the description above.
 */
private static boolean isRetryableTosClientException(TosException e) {
  if (!(e instanceof TosClientException)) {
    return false;
  }
  Throwable cause = e.getCause();
  return cause instanceof IOException && !(cause instanceof EOFException);
}
/**
 * Tells whether {@code e} is a {@code TosServerException} with status
 * {@code HttpStatus.CONFLICT} whose error code is not excluded from retrying.
 *
 * @param e the exception to classify.
 * @param nonRetryable409ErrorCodes error codes for which a conflict response is not retried.
 * @return {@code true} if the conflict may be retried, {@code false} for any other exception.
 */
private static boolean isRetryableTosServerException(TosException e,
    List<String> nonRetryable409ErrorCodes) {
  if (e instanceof TosServerException && e.getStatusCode() == HttpStatus.CONFLICT) {
    return isRetryableTosConflictException((TosServerException) e, nonRetryable409ErrorCodes);
  }
  return false;
}
/**
 * Tells whether a conflict response may be retried, based on the value of {@code e.getEc()}.
 *
 * @param e the server exception carrying the error code.
 * @param nonRetryableCodes error codes that must not be retried.
 * @return {@code true} if the error code is empty or is not listed in
 *     {@code nonRetryableCodes}.
 */
private static boolean isRetryableTosConflictException(TosServerException e,
    List<String> nonRetryableCodes) {
  String errorCode = e.getEc();
  if (StringUtils.isEmpty(errorCode)) {
    // No error code to match against the exclusion list, so the conflict is retried.
    return true;
  }
  return !nonRetryableCodes.contains(errorCode);
}
}