/*
 * ByteDance Volcengine EMR, Copyright 2022.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.tosfs.object.tos;

import com.volcengine.tos.TOSV2;
import com.volcengine.tos.TosException;
import com.volcengine.tos.TosServerException;
import com.volcengine.tos.comm.common.ACLType;
import com.volcengine.tos.comm.common.BucketType;
import com.volcengine.tos.internal.util.TypeConverter;
import com.volcengine.tos.model.bucket.HeadBucketV2Input;
import com.volcengine.tos.model.bucket.HeadBucketV2Output;
import com.volcengine.tos.model.bucket.Tag;
import com.volcengine.tos.model.object.AbortMultipartUploadInput;
import com.volcengine.tos.model.object.AppendObjectOutput;
import com.volcengine.tos.model.object.CompleteMultipartUploadV2Input;
import com.volcengine.tos.model.object.CopyObjectV2Input;
import com.volcengine.tos.model.object.CreateMultipartUploadInput;
import com.volcengine.tos.model.object.CreateMultipartUploadOutput;
import com.volcengine.tos.model.object.DeleteError;
import com.volcengine.tos.model.object.DeleteMultiObjectsV2Input;
import com.volcengine.tos.model.object.DeleteMultiObjectsV2Output;
import com.volcengine.tos.model.object.DeleteObjectInput;
import com.volcengine.tos.model.object.DeleteObjectTaggingInput;
import com.volcengine.tos.model.object.GetFileStatusInput;
import com.volcengine.tos.model.object.GetFileStatusOutput;
import com.volcengine.tos.model.object.GetObjectBasicOutput;
import com.volcengine.tos.model.object.GetObjectTaggingInput;
import com.volcengine.tos.model.object.GetObjectTaggingOutput;
import com.volcengine.tos.model.object.GetObjectV2Input;
import com.volcengine.tos.model.object.GetObjectV2Output;
import com.volcengine.tos.model.object.HeadObjectV2Input;
import com.volcengine.tos.model.object.HeadObjectV2Output;
import com.volcengine.tos.model.object.ListMultipartUploadsV2Input;
import com.volcengine.tos.model.object.ListMultipartUploadsV2Output;
import com.volcengine.tos.model.object.ListObjectsType2Input;
import com.volcengine.tos.model.object.ListObjectsType2Output;
import com.volcengine.tos.model.object.ListedCommonPrefix;
import com.volcengine.tos.model.object.ListedObjectV2;
import com.volcengine.tos.model.object.ListedUpload;
import com.volcengine.tos.model.object.ObjectMetaRequestOptions;
import com.volcengine.tos.model.object.ObjectTobeDeleted;
import com.volcengine.tos.model.object.PutObjectOutput;
import com.volcengine.tos.model.object.PutObjectTaggingInput;
import com.volcengine.tos.model.object.RenameObjectInput;
import com.volcengine.tos.model.object.TagSet;
import com.volcengine.tos.model.object.UploadPartCopyV2Input;
import com.volcengine.tos.model.object.UploadPartCopyV2Output;
import com.volcengine.tos.model.object.UploadedPartV2;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
import org.apache.hadoop.fs.tosfs.conf.TosKeys;
import org.apache.hadoop.fs.tosfs.object.BucketInfo;
import org.apache.hadoop.fs.tosfs.object.ChecksumInfo;
import org.apache.hadoop.fs.tosfs.object.ChecksumType;
import org.apache.hadoop.fs.tosfs.object.Constants;
import org.apache.hadoop.fs.tosfs.object.DirectoryStorage;
import org.apache.hadoop.fs.tosfs.object.InputStreamProvider;
import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
import org.apache.hadoop.fs.tosfs.object.ObjectConstants;
import org.apache.hadoop.fs.tosfs.object.ObjectContent;
import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.fs.tosfs.object.Part;
import org.apache.hadoop.fs.tosfs.object.exceptions.InvalidObjectKeyException;
import org.apache.hadoop.fs.tosfs.object.exceptions.NotAppendableException;
import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
import org.apache.hadoop.fs.tosfs.util.LazyReload;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.apache.hadoop.fs.tosfs.object.tos.TOSErrorCodes.APPEND_NOT_APPENDABLE;
import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.CHECKSUM_HEADER;
import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.appendable;
import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.crc64ecma;
import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.parseChecksum;

/**
 * {@link TOS} is initialized through {@link ObjectStorage#initialize(Configuration, String)}.
 */
public class TOS implements DirectoryStorage {

  private static final Logger LOG = LoggerFactory.getLogger(TOS.class);
  public static final String TOS_SCHEME = "tos";

  public static final String ENV_TOS_ACCESS_KEY_ID = "TOS_ACCESS_KEY_ID";
  public static final String ENV_TOS_SECRET_ACCESS_KEY = "TOS_SECRET_ACCESS_KEY";
  public static final String ENV_TOS_SESSION_TOKEN = "TOS_SESSION_TOKEN";
  public static final String ENV_TOS_ENDPOINT = "TOS_ENDPOINT";

  private static final int NOT_FOUND_CODE = 404;
  private static final int PATH_CONFLICT_CODE = 409;
  private static final int INVALID_RANGE_CODE = 416;

  private static final int MIN_PART_SIZE = 5 * 1024 * 1024;
  private static final int MAX_PART_COUNT = 10000;

  private static final InputStream EMPTY_STREAM = new ByteArrayInputStream(new byte[0]);

  private Configuration conf;
  private String bucket;
  private DelegationClient client;
  private long maxDrainBytes;
  private int batchDeleteMaxRetries;
  private List<String> batchDeleteRetryCodes;
  private long batchDeleteRetryInterval;
  private int maxDeleteObjectsCount;
  private int listObjectsCount;
  // The max number of retries while reading object content.
  private int maxInputStreamRetries;
  private ACLType defaultAcl;
  private ChecksumInfo checksumInfo;
  private BucketInfo bucketInfo;

  static {
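    // The TOS SDK log level can be overridden at JVM startup, e.g. -Dtos.log.level=DEBUG.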
    org.apache.log4j.Logger logger = LogManager.getLogger("com.volcengine.tos");
    String logLevel = System.getProperty("tos.log.level", "WARN");

    LOG.debug("Reset the log level of com.volcengine.tos with {} ", logLevel);
    logger.setLevel(Level.toLevel(logLevel.toUpperCase(), Level.WARN));
  }

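  /**
   * Initializes this storage with the given configuration and bucket. A minimal usage sketch
   * (the endpoint key and bucket name below are hypothetical; the real configuration keys live
   * in {@link TosKeys}, and credentials may also come from the TOS_* environment variables
   * declared above):
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.set("fs.tos.endpoint", "tos-cn-beijing.volces.com"); // hypothetical key and value
   * TOS storage = new TOS();
   * storage.initialize(conf, "my-bucket");
   * }</pre>
   */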
  @Override
  public void initialize(Configuration config, String bucketName) {
    this.conf = config;
    this.bucket = bucketName;
    client = new DelegationClientBuilder().conf(config).bucket(bucketName).build();
    maxDrainBytes =
        config.getLong(TosKeys.FS_TOS_MAX_DRAIN_BYTES, TosKeys.FS_TOS_MAX_DRAIN_BYTES_DEFAULT);
    batchDeleteMaxRetries = config.getInt(TosKeys.FS_TOS_BATCH_DELETE_MAX_RETRIES,
        TosKeys.FS_TOS_BATCH_DELETE_MAX_RETRIES_DEFAULT);
    batchDeleteRetryCodes = Arrays.asList(
        config.getTrimmedStrings(TosKeys.FS_TOS_BATCH_DELETE_RETRY_CODES,
            TosKeys.FS_TOS_BATCH_DELETE_RETRY_CODES_DEFAULT));
    batchDeleteRetryInterval = config.getLong(TosKeys.FS_TOS_BATCH_DELETE_RETRY_INTERVAL,
        TosKeys.FS_TOS_BATCH_DELETE_RETRY_INTERVAL_DEFAULT);
    maxDeleteObjectsCount = config.getInt(TosKeys.FS_TOS_DELETE_OBJECTS_COUNT,
        TosKeys.FS_TOS_DELETE_OBJECTS_COUNT_DEFAULT);
    listObjectsCount =
        config.getInt(TosKeys.FS_TOS_LIST_OBJECTS_COUNT, TosKeys.FS_TOS_LIST_OBJECTS_COUNT_DEFAULT);
    maxInputStreamRetries = config.getInt(TosKeys.FS_TOS_MAX_READ_OBJECT_RETRIES,
        TosKeys.FS_TOS_MAX_READ_OBJECT_RETRIES_DEFAULT);
    defaultAcl = TypeConverter.convertACLType(config.get(TosKeys.FS_TOS_ACL_DEFAULT));

    String algorithm =
        config.get(TosKeys.FS_TOS_CHECKSUM_ALGORITHM, TosKeys.FS_TOS_CHECKSUM_ALGORITHM_DEFAULT);
    ChecksumType checksumType = ChecksumType.valueOf(
        config.get(TosKeys.FS_TOS_CHECKSUM_TYPE, TosKeys.FS_TOS_CHECKSUM_TYPE_DEFAULT)
            .toUpperCase());
    Preconditions.checkArgument(CHECKSUM_HEADER.containsKey(checksumType),
        "Checksum type %s is not supported by TOS.", checksumType.name());
    checksumInfo = new ChecksumInfo(algorithm, checksumType);

    bucketInfo = getBucketInfo(bucketName);
  }

  @Override
  public String scheme() {
    return TOS_SCHEME;
  }

  @Override
  public Configuration conf() {
    return conf;
  }

  @Override
  public BucketInfo bucket() {
    return bucketInfo;
  }

  private BucketInfo getBucketInfo(String bucketName) {
    try {
      HeadBucketV2Output res =
          client.headBucket(HeadBucketV2Input.builder().bucket(bucketName).build());

      // BUCKET_TYPE_FNS is the general purpose (flat namespace) bucket, while BUCKET_TYPE_HNS
      // is the directory (hierarchical namespace) bucket.
      boolean directoryBucket = BucketType.BUCKET_TYPE_HNS.equals(res.getBucketType());

      return new BucketInfo(bucketName, directoryBucket);
    } catch (TosException e) {
      if (e.getStatusCode() == NOT_FOUND_CODE) {
        return null;
      }
      throw new RuntimeException(e);
    }
  }

  @VisibleForTesting
  void setClient(DelegationClient client) {
    this.client = client;
  }

  private void checkAvailableClient() {
    Preconditions.checkState(client != null,
        "Encountered uninitialized ObjectStorage, call initialize(..) please.");
  }

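  /**
   * Reads the object content within the requested range; a negative {@code limit} means reading
   * to the end of the object. A minimal usage sketch (the key is hypothetical, and
   * {@code ObjectContent#stream()} is assumed to be the accessor for the wrapped stream):
   * <pre>{@code
   * // Read 100 bytes starting at offset 10, i.e. the inclusive byte range [10, 109].
   * ObjectContent content = storage.get("user/data.txt", 10, 100);
   * try (InputStream in = content.stream()) {
   *   // consume the stream ...
   * }
   * }</pre>
   */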
  @Override
  public ObjectContent get(String key, long offset, long limit) {
    checkAvailableClient();
    Preconditions.checkArgument(offset >= 0, "offset is a negative number: %s", offset);

    if (limit == 0) {
      // Cannot simply return an empty stream when limit = 0, because the requested object
      // might not exist.
      if (head(key) != null) {
        return new ObjectContent(Constants.MAGIC_CHECKSUM, EMPTY_STREAM);
      } else {
        throw new RuntimeException(String.format("Object %s doesn't exit", key));
      }
    }

    long end = limit < 0 ? -1 : offset + limit - 1;
    GetObjectFactory factory = (k, startOff, endOff) -> getObject(key, startOff, endOff);
    ChainTOSInputStream chainStream =
        new ChainTOSInputStream(factory, key, offset, end, maxDrainBytes, maxInputStreamRetries);
    return new ObjectContent(chainStream.checksum(), chainStream);
  }

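  /**
   * Lists the objects and sub-directories under the given directory key, lazily paging through
   * the results. A minimal usage sketch (the key {@code "logs/"} is hypothetical):
   * <pre>{@code
   * for (ObjectInfo obj : storage.listDir("logs/", true)) {
   *   System.out.println(obj.key() + " -> " + obj.size());
   * }
   * }</pre>
   */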
  @Override
  public Iterable<ObjectInfo> listDir(String key, boolean recursive) {
    if (recursive) {
      if (bucket().isDirectory()) {
        // The directory bucket only supports listing objects with delimiter = '/', so to list a
        // directory recursively we have to walk each sub-directory level by level.
        return bfsListDir(key);
      } else {
        return listAll(key, key);
      }
    } else {
      return innerListDir(key, key, -1);
    }
  }

  private Iterable<ObjectInfo> bfsListDir(String key) {
    return new LazyReload<>(() -> {
      final Deque<String> dirQueue = new LinkedList<>();
      AtomicReference<String> continueToken = new AtomicReference<>("");
      AtomicReference<String> curDir = new AtomicReference<>(key);

      return buf -> {
        // No more objects once the current directory is done and the queue is drained.
        if (curDir.get() == null) {
          return true;
        }

        ListObjectsType2Input request =
            createListObjectsType2Input(curDir.get(), curDir.get(), listObjectsCount, "/",
                continueToken.get());
        ListObjectsType2Output response = client.listObjectsType2(request);

        if (response.getContents() != null) {
          for (ListedObjectV2 obj : response.getContents()) {
            buf.add(new ObjectInfo(obj.getKey(), obj.getSize(), obj.getLastModified(),
                parseChecksum(obj, checksumInfo)));
          }
        }

        if (response.getCommonPrefixes() != null) {
          for (ListedCommonPrefix prefix : response.getCommonPrefixes()) {
            buf.add(new ObjectInfo(prefix.getPrefix(), 0, new Date(), Constants.MAGIC_CHECKSUM));
            dirQueue.add(prefix.getPrefix());
          }
        }

        if (response.isTruncated()) {
          continueToken.set(response.getNextContinuationToken());
        } else {
          curDir.set(dirQueue.poll());
          continueToken.set("");
        }

        return curDir.get() == null;
      };
    });
  }

  private Iterable<ObjectInfo> innerListDir(String key, String startAfter, int limit) {
    return new LazyReload<>(() -> {
      AtomicReference<String> continueToken = new AtomicReference<>("");
      AtomicBoolean isTruncated = new AtomicBoolean(true);
      AtomicInteger remaining = new AtomicInteger(limit < 0 ? Integer.MAX_VALUE : limit);

      return buf -> {
        // No more objects when isTruncated is false.
        if (!isTruncated.get()) {
          return true;
        }

        int remainingKeys = remaining.get();
        int maxKeys = Math.min(listObjectsCount, remainingKeys);
        ListObjectsType2Input request =
            createListObjectsType2Input(key, startAfter, maxKeys, "/", continueToken.get());
        ListObjectsType2Output response = client.listObjectsType2(request);

        if (response.getContents() != null) {
          for (ListedObjectV2 obj : response.getContents()) {
            buf.add(new ObjectInfo(obj.getKey(), obj.getSize(), obj.getLastModified(),
                parseChecksum(obj, checksumInfo)));
          }
        }

        if (response.getCommonPrefixes() != null) {
          for (ListedCommonPrefix prefix : response.getCommonPrefixes()) {
            buf.add(new ObjectInfo(prefix.getPrefix(), 0, new Date(), Constants.MAGIC_CHECKSUM));
          }
        }

        isTruncated.set(response.isTruncated());
        remaining.compareAndSet(remainingKeys, remainingKeys - response.getKeyCount());
        continueToken.set(response.getNextContinuationToken());

        return !isTruncated.get();
      };
    });
  }

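  /**
   * Deletes a directory key. When {@code recursive} is set, the deletion is delegated to the
   * server if server-side or client-side recursive deletion is enabled; otherwise each child is
   * listed and deleted one by one. A minimal usage sketch (the key is hypothetical):
   * <pre>{@code
   * storage.deleteDir("tmp/", true); // removes 'tmp/' and everything beneath it
   * }</pre>
   */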
  @Override
  public void deleteDir(String key, boolean recursive) {
    checkAvailableClient();
    if (recursive) {
      if (conf.getBoolean(TosKeys.FS_TOS_RMR_SERVER_ENABLED,
          TosKeys.FS_FS_TOS_RMR_SERVER_ENABLED_DEFAULT)) {
        DeleteObjectInput request =
            DeleteObjectInput.builder().bucket(bucket).recursive(true).key(key).build();
        try {
          // It's an experimental feature; the TOS SDK doesn't expose the atomic recursive
          // delete capability yet, so enable it reflectively.
          Field f = DeleteObjectInput.class.getDeclaredField("recursiveByServer");
          f.setAccessible(true);
          f.setBoolean(request, true);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
        client.deleteObject(request);
      } else {
        if (conf.getBoolean(TosKeys.FS_TOS_RMR_CLIENT_ENABLE,
            TosKeys.FS_TOS_RMR_CLIENT_ENABLE_DEFAULT)) {
          client.deleteObject(
              DeleteObjectInput.builder().bucket(bucket).recursive(true).key(key).build());
        } else {
          recursiveDeleteDir(key);
        }
      }
    } else {
      delete(key);
    }
  }

  @Override
  public boolean isEmptyDir(String key) {
    checkAvailableClient();
    return !innerListDir(key, key, 1).iterator().hasNext();
  }

  public void recursiveDeleteDir(String key) {
    for (ObjectInfo obj : innerListDir(key, key, -1)) {
      if (obj.isDir()) {
        recursiveDeleteDir(obj.key());
      } else {
        delete(obj.key());
      }
    }
    delete(key);
  }

  interface GetObjectFactory {
    /**
     * Get object content for the given object key and range.
     *
     * @param key    The object key
     * @param offset The start offset of object content
     * @param end    The end offset of object content
     * @return {@link GetObjectOutput}
     */
    GetObjectOutput create(String key, long offset, long end);
  }

  public GetObjectOutput getObject(String key, long offset, long end) {
    checkAvailableClient();
    Preconditions.checkArgument(offset >= 0, "offset is a negative number: %s", offset);

    try {
      GetObjectV2Input request = GetObjectV2Input.builder().bucket(bucket).key(key)
          .options(ObjectMetaRequestOptions.builder().range(offset, end).build()).build();
      GetObjectV2Output output = client.getObject(request);

      byte[] checksum = parseChecksum(output.getRequestInfo().getHeader(), checksumInfo);
      return new GetObjectOutput(output, checksum);
    } catch (TosException e) {
      if (e instanceof TosServerException) {
        TosServerException tosException = (TosServerException) e;
        if (tosException.getStatusCode() == INVALID_RANGE_CODE) {
          ObjectInfo info = head(key);
          // If the object is empty or the requested offset equals the object size, return an
          // empty stream directly; otherwise, rethrow the exception.
          if (info.size() == 0 || offset == info.size()) {
            return new GetObjectOutput(
                new GetObjectV2Output(new GetObjectBasicOutput(), EMPTY_STREAM), info.checksum());
          } else {
            throw new RuntimeException(e);
          }
        }
      }
      throw new RuntimeException(e);
    }
  }

  @Override
  public byte[] put(String key, InputStreamProvider streamProvider, long contentLength) {
    checkAvailableClient();
    PutObjectOutput res = client.put(bucket, key, streamProvider, contentLength, defaultAcl);
    return ObjectInfo.isDir(key) ?
        Constants.MAGIC_CHECKSUM :
        parseChecksum(res.getRequestInfo().getHeader(), checksumInfo);
  }

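  /**
   * Appends data to the end of an object, pre-creating it when necessary; note that appending
   * zero bytes to a non-existent object is rejected. A minimal usage sketch (the key and
   * payload are hypothetical):
   * <pre>{@code
   * byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
   * byte[] checksum = storage.append("logs/app.log",
   *     () -> new ByteArrayInputStream(data), data.length);
   * }</pre>
   */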
  @Override
  public byte[] append(String key, InputStreamProvider streamProvider, long contentLength) {
    if (bucketInfo.isDirectory()) {
      return hnsAppend(key, streamProvider, contentLength);
    } else {
      return fnsAppend(key, streamProvider, contentLength);
    }
  }

  private byte[] hnsAppend(String key, InputStreamProvider streamProvider, long contentLength) {
    checkAvailableClient();

    long offset = 0;
    String preCrc64;

    TosObjectInfo obj = innerHead(key);
    if (obj == null) {
      if (contentLength == 0) {
        throw new NotAppendableException(String.format(
            "%s is not appendable because appending a non-existent object with "
                + "zero bytes is not supported.", key));
      }

      // In HNS, appending to a non-existent object is not allowed, so pre-create an empty
      // object before performing appendObject.
      PutObjectOutput res = client.put(bucket, key, () -> EMPTY_STREAM, 0, defaultAcl);
      preCrc64 = res.getHashCrc64ecma();
    } else {
      if (contentLength == 0) {
        return obj.checksum();
      }
      offset = obj.size();
      preCrc64 = obj.crc64ecma();
    }

    AppendObjectOutput res =
        client.appendObject(bucket, key, streamProvider, offset, contentLength, preCrc64,
            defaultAcl);
    return ObjectInfo.isDir(key) ? Constants.MAGIC_CHECKSUM :
        parseChecksum(res.getRequestInfo().getHeader(), checksumInfo);
  }

  private byte[] fnsAppend(String key, InputStreamProvider streamProvider, long contentLength) {
    checkAvailableClient();

    TosObjectInfo obj = innerHead(key);
    if (obj != null) {
      if (!obj.appendable()) {
        throw new NotAppendableException(String.format("%s is not appendable.", key));
      }
      if (contentLength == 0) {
        return obj.checksum();
      }
    } else if (contentLength == 0) {
      throw new NotAppendableException(String.format("%s is not appendable because append"
          + " non-existed object with zero byte is not supported.", key));
    }

    long offset = obj == null ? 0 : obj.size();
    String preCrc64 = obj == null ? null : obj.crc64ecma();
    AppendObjectOutput res;
    try {
      res = client.appendObject(bucket, key, streamProvider, offset, contentLength, preCrc64,
          defaultAcl);
    } catch (TosServerException e) {
      if (e.getStatusCode() == PATH_CONFLICT_CODE && APPEND_NOT_APPENDABLE.equals(e.getEc())) {
        throw new NotAppendableException(String.format("%s is not appendable.", key));
      }
      throw e;
    }

    return ObjectInfo.isDir(key) ?
        Constants.MAGIC_CHECKSUM :
        parseChecksum(res.getRequestInfo().getHeader(), checksumInfo);
  }

  @Override
  public void delete(String key) {
    checkAvailableClient();
    client.deleteObject(DeleteObjectInput.builder().bucket(bucket).key(key).build());
  }

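  /**
   * Deletes the given keys in a single batch request, retrying retryable failures up to
   * {@code batchDeleteMaxRetries} times, and returns the keys that still failed to be deleted
   * (an empty list means all keys were deleted). A minimal usage sketch with hypothetical keys:
   * <pre>{@code
   * List<String> failed = storage.batchDelete(Arrays.asList("tmp/a.txt", "tmp/b.txt"));
   * if (!failed.isEmpty()) {
   *   // handle the keys that could not be deleted
   * }
   * }</pre>
   */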
  @Override
  public List<String> batchDelete(List<String> keys) {
    checkAvailableClient();
    int totalKeyCnt = keys.size();

    Preconditions.checkArgument(totalKeyCnt <= maxDeleteObjectsCount,
        "The batch delete object count should <= %s", maxDeleteObjectsCount);


    List<DeleteError> failedKeys = innerBatchDelete(keys);
    for (int retry = 1; retry < batchDeleteMaxRetries && !failedKeys.isEmpty(); retry++) {
      if (isBatchDeleteRetryable(failedKeys)) {
        try {
          Thread.sleep(batchDeleteRetryInterval);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }

        failedKeys = innerBatchDelete(deleteErrorKeys(failedKeys));
      } else {
        LOG.warn("{} of {} objects deleted failed, and cannot be retried, detail: {}",
            failedKeys.size(),
            totalKeyCnt,
            Joiner.on(",\n").join(failedKeys));
        break;
      }
    }

    if (!failedKeys.isEmpty()) {
      LOG.warn("{} of {} objects deleted failed after retry {} times.",
          failedKeys.size(), totalKeyCnt, batchDeleteMaxRetries);
    }

    return deleteErrorKeys(failedKeys);
  }

  @Override
  public void deleteAll(String prefix) {
    if (bucket().isDirectory()) {
      deleteDir(prefix, true);
    } else {
      Iterable<ObjectInfo> objects = listAll(prefix, "");
      ObjectUtils.deleteAllObjects(this, objects,
          conf.getInt(ConfKeys.FS_BATCH_DELETE_SIZE.key(scheme()),
              ConfKeys.FS_BATCH_DELETE_SIZE_DEFAULT));
    }
  }

  private List<DeleteError> innerBatchDelete(List<String> keys) {
    List<ObjectTobeDeleted> toBeDeleted = Lists.newArrayList();
    for (String key : keys) {
      toBeDeleted.add(ObjectTobeDeleted.builder().key(key).build());
    }

    DeleteMultiObjectsV2Output deletedRes = client.deleteMultiObjects(DeleteMultiObjectsV2Input
        .builder()
        .bucket(bucket)
        .objects(toBeDeleted)
        .build());

    return deletedRes.getErrors() == null ? Lists.newArrayList() : deletedRes.getErrors();
  }

  private boolean isBatchDeleteRetryable(List<DeleteError> failedKeys) {
    for (DeleteError errorKey : failedKeys) {
      if (batchDeleteRetryCodes.contains(errorKey.getCode())) {
        LOG.warn("Failed to delete object, which might be deleted succeed after retry, detail: {}",
            errorKey);
      } else {
        return false;
      }
    }
    return true;
  }

  private static List<String> deleteErrorKeys(List<DeleteError> errorKeys) {
    List<String> keys = Lists.newArrayList();
    for (DeleteError error : errorKeys) {
      keys.add(error.getKey());
    }
    return keys;
  }

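  /**
   * Heads a single object and returns its info, or null if the key doesn't exist. A minimal
   * usage sketch (the key is hypothetical):
   * <pre>{@code
   * ObjectInfo info = storage.head("a/b");
   * long size = info == null ? -1 : info.size();
   * }</pre>
   */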
  @Override
  public ObjectInfo head(String key) {
    return innerHead(key);
  }

  private TosObjectInfo innerHead(String key) {
    checkAvailableClient();
    try {
      HeadObjectV2Input request = HeadObjectV2Input.builder().bucket(bucket).key(key).build();
      HeadObjectV2Output response = client.headObject(request);

      // Use crc64ecma/crc32c as the checksum for comparing object contents; don't use the eTag
      // since PUT & MPU operations produce different eTags for the same content.
      Map<String, String> headers = response.getRequestInfo().getHeader();
      byte[] checksum = parseChecksum(headers, checksumInfo);
      boolean isDir = bucket().isDirectory() ? response.isDirectory() : ObjectInfo.isDir(key);

      return new TosObjectInfo(key, response.getContentLength(), response.getLastModifiedInDate(),
          checksum, isDir,
          appendable(headers), crc64ecma(headers));
    } catch (TosException e) {
      if (e.getStatusCode() == NOT_FOUND_CODE) {
        return null;
      }

      if (e.getStatusCode() == PATH_CONFLICT_CODE) {
        // If a directory 'a/b/' exists in a directory bucket, both headObject('a/b') and
        // headObject('a/b/') will return the directory info, and the response key should be
        // 'a/b/'. But if a file 'a/b' exists in a directory bucket, only headObject('a/b') will
        // return the file info; headObject('a/b/') will get a 409 error.
        throw new InvalidObjectKeyException(e);
      }

      throw new RuntimeException(e);
    }
  }

  @Override
  public Iterable<ListObjectsResponse> list(ListObjectsRequest req) {
    return new LazyReload<>(() -> {
      AtomicReference<String> continueToken = new AtomicReference<>("");
      AtomicBoolean isTruncated = new AtomicBoolean(true);
      AtomicInteger remaining =
          new AtomicInteger(req.maxKeys() < 0 ? Integer.MAX_VALUE : req.maxKeys());

      return buf -> {
        // No more objects when isTruncated is false.
        if (!isTruncated.get()) {
          return true;
        }

        int remainingKeys = remaining.get();
        int maxKeys = Math.min(listObjectsCount, remainingKeys);
        ListObjectsType2Input request =
            createListObjectsType2Input(req.prefix(), req.startAfter(), maxKeys, req.delimiter(),
                continueToken.get());
        ListObjectsType2Output response = client.listObjectsType2(request);
        List<ObjectInfo> objects = listObjectsOutputToObjectInfos(response);
        List<String> commonPrefixes = listObjectsOutputToCommonPrefixes(response);
        buf.add(new ListObjectsResponse(objects, commonPrefixes));

        if (maxKeys < listObjectsCount) {
          isTruncated.set(false);
        } else {
          continueToken.set(response.getNextContinuationToken());
          remaining.compareAndSet(remainingKeys, remainingKeys - response.getKeyCount());
          if (remaining.get() == 0) {
            isTruncated.set(false);
          } else {
            isTruncated.set(response.isTruncated());
          }
        }
        return !isTruncated.get();
      };
    });
  }

  private List<String> listObjectsOutputToCommonPrefixes(ListObjectsType2Output listObjectsOutput) {
    if (listObjectsOutput.getCommonPrefixes() == null) {
      return Lists.newArrayList();
    }

    return listObjectsOutput.getCommonPrefixes()
        .stream()
        .map(ListedCommonPrefix::getPrefix)
        .collect(Collectors.toList());
  }

  private List<ObjectInfo> listObjectsOutputToObjectInfos(
      ListObjectsType2Output listObjectsOutput) {
    if (listObjectsOutput.getContents() == null) {
      return Lists.newArrayList();
    }
    return listObjectsOutput.getContents().stream()
        .map(obj -> new ObjectInfo(
            obj.getKey(),
            obj.getSize(),
            obj.getLastModified(),
            parseChecksum(obj, checksumInfo)))
        .collect(Collectors.toList());
  }

  private ListObjectsType2Input createListObjectsType2Input(
      String prefix, String startAfter, int maxKeys, String delimiter, String continueToken) {
    ListObjectsType2Input.ListObjectsType2InputBuilder builder = ListObjectsType2Input.builder()
        .bucket(bucket)
        .prefix(prefix)
        .startAfter(startAfter)
        .delimiter(delimiter)
        .maxKeys(maxKeys);

    if (!Strings.isNullOrEmpty(continueToken)) {
      builder.continuationToken(continueToken);
    }
    return builder.build();
  }

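  /**
   * Starts a multipart upload for the given key. A minimal sketch of the whole MPU flow,
   * assuming {@code MultipartUpload#uploadId()} as the upload-id accessor and a single 5 MiB
   * part (the minimum part size):
   * <pre>{@code
   * MultipartUpload mpu = storage.createMultipartUpload("big.bin");
   * byte[] data = new byte[5 * 1024 * 1024];
   * Part p1 = storage.uploadPart("big.bin", mpu.uploadId(), 1,
   *     () -> new ByteArrayInputStream(data), data.length);
   * byte[] checksum = storage.completeUpload("big.bin", mpu.uploadId(), Arrays.asList(p1));
   * }</pre>
   */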
  @Override
  public MultipartUpload createMultipartUpload(String key) {
    checkAvailableClient();
    CreateMultipartUploadInput input = CreateMultipartUploadInput.builder()
        .bucket(bucket)
        .key(key)
        .options(createMetaOptions())
        .build();
    CreateMultipartUploadOutput output = client.createMultipartUpload(input);
    return new MultipartUpload(output.getKey(), output.getUploadID(), MIN_PART_SIZE,
        MAX_PART_COUNT);
  }

  @Override
  public Part uploadPart(
      String key, String uploadId, int partNum,
      InputStreamProvider streamProvider, long contentLength) {
    checkAvailableClient();
    return client.uploadPart(bucket, key, uploadId, partNum, streamProvider, contentLength,
        defaultAcl);
  }

  @Override
  public byte[] completeUpload(String key, String uploadId, List<Part> uploadParts) {
    checkAvailableClient();
    List<UploadedPartV2> uploadedPartsV2 = uploadParts.stream().map(
        part -> UploadedPartV2.builder()
            .etag(part.eTag())
            .partNumber(part.num())
            .size(part.size())
            .build()
    ).collect(Collectors.toList());
    CompleteMultipartUploadV2Input input = CompleteMultipartUploadV2Input.builder()
        .bucket(bucket)
        .key(key)
        .uploadID(uploadId)
        .uploadedParts(uploadedPartsV2)
        .build();
    return parseChecksum(client.completeMultipartUpload(input).getRequestInfo().getHeader(),
        checksumInfo);
  }

  @Override
  public void abortMultipartUpload(String key, String uploadId) {
    checkAvailableClient();
    AbortMultipartUploadInput input = AbortMultipartUploadInput.builder()
        .bucket(bucket)
        .key(key)
        .uploadID(uploadId)
        .build();
    client.abortMultipartUpload(input);
  }

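  /**
   * Lists the in-progress multipart uploads under the given prefix, lazily paging through the
   * key and upload-id markers. A minimal sketch that aborts every pending upload, assuming
   * {@code MultipartUpload#key()} and {@code MultipartUpload#uploadId()} as accessors:
   * <pre>{@code
   * for (MultipartUpload upload : storage.listUploads("tmp/")) {
   *   storage.abortMultipartUpload(upload.key(), upload.uploadId());
   * }
   * }</pre>
   */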
  @Override
  public Iterable<MultipartUpload> listUploads(String prefix) {
    checkAvailableClient();
    return new LazyReload<>(() -> {
      AtomicReference<String> nextKeyMarker = new AtomicReference<>("");
      AtomicReference<String> nextUploadIdMarker = new AtomicReference<>("");
      AtomicBoolean isTruncated = new AtomicBoolean(true);
      return buf -> {
        // No more uploads when isTruncated is false.
        if (!isTruncated.get()) {
          return true;
        }
        ListMultipartUploadsV2Input input = ListMultipartUploadsV2Input.builder()
            .bucket(bucket)
            .prefix(prefix)
            .keyMarker(nextKeyMarker.get())
            .uploadIDMarker(nextUploadIdMarker.get())
            .build();
        ListMultipartUploadsV2Output output = client.listMultipartUploads(input);
        isTruncated.set(output.isTruncated());
        if (output.getUploads() != null) {
          // Fill the reloaded uploads into buffer.
          for (ListedUpload upload : output.getUploads()) {
            buf.add(new MultipartUpload(upload.getKey(), upload.getUploadID(),
                ObjectConstants.MIN_PART_SIZE, ObjectConstants.MAX_PART_COUNT));
          }
          LOG.info("Retrieve {} uploads with prefix: {}, marker: {}",
              output.getUploads().size(), nextKeyMarker.get(), nextUploadIdMarker.get());
        }
        // Refresh nextKeyMarker and nextUploadIdMarker for the next reload.
        nextKeyMarker.set(output.getNextKeyMarker());
        nextUploadIdMarker.set(output.getNextUploadIdMarker());

        return !isTruncated.get();
      };
    });
  }

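  /**
   * Copies a byte range of an existing object into one part of an in-progress multipart
   * upload; both range ends are inclusive. A minimal sketch copying the first MiB of a
   * hypothetical source object:
   * <pre>{@code
   * Part p1 = storage.uploadPartCopy("src.bin", "dst.bin", uploadId, 1, 0, 1024 * 1024 - 1);
   * }</pre>
   */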
  @Override
  public Part uploadPartCopy(
      String srcKey, String dstKey, String uploadId, int partNum, long copySourceRangeStart,
      long copySourceRangeEnd) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(srcKey), "Source key should not be empty.");
    Preconditions.checkArgument(!Strings.isNullOrEmpty(dstKey), "Dest key should not be empty.");
    Preconditions.checkArgument(!Strings.isNullOrEmpty(uploadId), "Upload ID should not be empty.");
    Preconditions.checkArgument(copySourceRangeStart >= 0, "CopySourceRangeStart must be >= 0.");
    Preconditions.checkArgument(copySourceRangeEnd >= 0, "CopySourceRangeEnd must be >= 0.");
    Preconditions.checkArgument(copySourceRangeEnd >= copySourceRangeStart,
        "CopySourceRangeEnd must be >= copySourceRangeStart.");
    checkAvailableClient();
    UploadPartCopyV2Input input = UploadPartCopyV2Input.builder()
        .bucket(bucket)
        .key(dstKey)
        .uploadID(uploadId)
        .sourceBucket(bucket)
        .sourceKey(srcKey)
        .partNumber(partNum)
        .copySourceRange(copySourceRangeStart, copySourceRangeEnd)
        .options(createMetaOptions())
        .build();
    UploadPartCopyV2Output output = client.uploadPartCopy(input);
    return new Part(output.getPartNumber(), copySourceRangeEnd - copySourceRangeStart + 1,
        output.getEtag());
  }

  @Override
  public void copy(String srcKey, String dstKey) {
    checkAvailableClient();
    CopyObjectV2Input input = CopyObjectV2Input.builder()
        .bucket(bucket)
        .key(dstKey)
        .srcBucket(bucket)
        .srcKey(srcKey)
        .options(createMetaOptions())
        .build();
    client.copyObject(input);
  }

  private ObjectMetaRequestOptions createMetaOptions() {
    return new ObjectMetaRequestOptions().setAclType(defaultAcl);
  }

  @Override
  public void rename(String srcKey, String dstKey) {
    checkAvailableClient();
    Preconditions.checkArgument(!Objects.equals(srcKey, dstKey),
        "Cannot rename to the same object");

    RenameObjectInput request = RenameObjectInput.builder()
        .bucket(bucket)
        .key(srcKey)
        .newKey(dstKey)
        .build();
    client.renameObject(request);
  }

  // Both TOS and AWS S3 allow up to 10 tags per object.
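  /**
   * Replaces the complete tag set of the object; passing an empty map clears all tags. A
   * minimal usage sketch with a hypothetical key and tag:
   * <pre>{@code
   * Map<String, String> tags = new HashMap<>();
   * tags.put("owner", "data-team");
   * storage.putTags("logs/app.log", tags);
   * }</pre>
   */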
  @Override
  public void putTags(String key, Map<String, String> newTags) {
    checkAvailableClient();
    List<Tag> tags = newTags.entrySet().stream()
        .map(e -> new Tag().setKey(e.getKey()).setValue(e.getValue()))
        .collect(Collectors.toList());

    if (!tags.isEmpty()) {
      client.putObjectTagging(createPutTagInput(bucket, key, tags));
    } else {
      client.deleteObjectTagging(createDeleteTagInput(bucket, key));
    }
  }

  @Override
  public Map<String, String> getTags(String key) {
    Map<String, String> result = new HashMap<>();
    for (Tag tag : getObjectTaggingList(key)) {
      result.put(tag.getKey(), tag.getValue());
    }
    return result;
  }

  private List<Tag> getObjectTaggingList(String key) {
    checkAvailableClient();

    GetObjectTaggingInput input = GetObjectTaggingInput.builder()
        .bucket(bucket)
        .key(key)
        .build();
    GetObjectTaggingOutput output = client.getObjectTagging(input);

    TagSet tagSet = output.getTagSet();
    if (tagSet == null || tagSet.getTags() == null) {
      return new ArrayList<>();
    }
    return tagSet.getTags();
  }

  private static PutObjectTaggingInput createPutTagInput(String bucket, String key,
      List<Tag> tags) {
    return PutObjectTaggingInput.builder()
        .bucket(bucket)
        .key(key)
        .tagSet(TagSet.builder().tags(tags).build())
        .build();
  }

  private static DeleteObjectTaggingInput createDeleteTagInput(String bucket, String key) {
    return DeleteObjectTaggingInput.builder()
        .bucket(bucket)
        .key(key)
        .build();
  }

  /**
   * Implement Hadoop FileSystem.getFileStatus semantics through
   * {@link TOSV2#getFileStatus(GetFileStatusInput)}. <br>
   *
   * The detailed behavior is as follows:
   * <ul>
   *   <li>Assume object 'a/b' exists in TOS: getFileStatus("a/b") succeeds with object 'a/b',
   *   while getFileStatus("a/b/") gets a 404.</li>
   *   <li>Assume object 'a/b/' exists in TOS: both getFileStatus("a/b") and getFileStatus("a/b/")
   *   succeed with object 'a/b/'.</li>
   *   <li>Assume object 'a/b/c' exists in TOS: both getFileStatus("a/b") and getFileStatus("a/b/")
   *   succeed with object 'a/b/'.</li>
   * </ul>
   * <p>
   * And the following is the logic of {@link TOSV2#getFileStatus(GetFileStatusInput)}: <br>
   * Step 1: Head the specified key; if the head operation succeeds, the response is filled with
   * the actual object. <br>
   * Step 2: Append the suffix '/' to the key and perform a list operation; if the listing
   * succeeds, the response is filled with the <strong>first object from the listing results
   * </strong>; if there are no objects, return 404. <br>
   *
   * @param key the object key.
   * @return the object info, or null if neither the object nor the prefix exists.
   */
  private ObjectInfo getFileStatus(String key) {
    checkAvailableClient();
    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "key should not be empty.");

    GetFileStatusInput input = GetFileStatusInput.builder()
        .bucket(bucket)
        .key(key)
        .build();
    try {
      GetFileStatusOutput output = client.getFileStatus(input);
      if (key.equals(output.getKey()) && !ObjectInfo.isDir(output.getKey())) {
        return new ObjectInfo(key, output.getSize(), output.getLastModifiedInDate(),
            parseChecksum(output, checksumInfo));
      } else {
        String dirKey = ObjectInfo.isDir(key) ? key : key + '/';

        // If only the prefix exists but the dir object key doesn't exist, use the current date
        // as the modified date.
        Date lastModifiedInDate =
            dirKey.equals(output.getKey()) ? output.getLastModifiedInDate() : new Date();
        return new ObjectInfo(dirKey, 0, lastModifiedInDate, Constants.MAGIC_CHECKSUM, true);
      }
    } catch (TosException e) {
      // the specified object does not exist.
      if (e.getStatusCode() == NOT_FOUND_CODE) {
        return null;
      }

      if (e.getStatusCode() == PATH_CONFLICT_CODE) {
        throw new InvalidObjectKeyException(e);
      }
      throw new RuntimeException(e);
    }
  }

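  /**
   * Resolves the status of a file or directory key. For general purpose buckets this falls
   * back from head(key) to head(key + "/"), and finally to a one-key prefix listing. A minimal
   * usage sketch (the key is hypothetical):
   * <pre>{@code
   * ObjectInfo status = storage.objectStatus("a/b");
   * if (status != null && status.isDir()) {
   *   // 'a/b/' exists as a directory object or as a common prefix
   * }
   * }</pre>
   */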
  @Override
  public ObjectInfo objectStatus(String key) {
    if (bucket().isDirectory()) {
      return head(key);
    } else if (conf.getBoolean(TosKeys.FS_TOS_GET_FILE_STATUS_ENABLED,
        TosKeys.FS_TOS_GET_FILE_STATUS_ENABLED_DEFAULT)) {
      return getFileStatus(key);
    } else {
      ObjectInfo obj = head(key);
      if (obj == null && !ObjectInfo.isDir(key)) {
        key = key + '/';
        obj = head(key);
      }

      if (obj == null) {
        Iterable<ObjectInfo> objs = list(key, null, 1);
        if (objs.iterator().hasNext()) {
          obj = new ObjectInfo(key, 0, new Date(0), Constants.MAGIC_CHECKSUM, true);
        }
      }

      return obj;
    }
  }

  @Override
  public ChecksumInfo checksumInfo() {
    return checksumInfo;
  }

  @Override
  public void close() throws IOException {
    client.close();
  }
}