// ObjectStorage.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.tosfs.object;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.tosfs.object.exceptions.InvalidObjectKeyException;
import org.apache.hadoop.fs.tosfs.util.LazyReload;
import org.apache.hadoop.fs.tosfs.object.exceptions.NotAppendableException;
import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Contract for accessing an object storage. An instance is bound to one bucket through
 * {@link #initialize(Configuration, String)} and then serves reads, writes, deletions, listings,
 * multipart uploads, copies, renames and tagging for the objects of that bucket.
 */
public interface ObjectStorage extends Closeable {
  /**
   * The empty delimiter, used by {@link #list(String, String, int)} when building its request.
   */
  String EMPTY_DELIMITER = "";

  /**
   * @return the scheme of the object storage.
   */
  String scheme();

  /**
   * @return the bucket information, or null if the bucket doesn't exist.
   */
  BucketInfo bucket();

  /**
   * Initialize the object storage according to the given properties.
   *
   * @param conf   the configuration used to initialize the {@link ObjectStorage}.
   * @param bucket the corresponding bucket name, each object store has one bucket.
   */
  void initialize(Configuration conf, String bucket);

  /**
   * @return the configuration of the storage.
   */
  Configuration conf();

  /**
   * Get the data of the object specified by key. Delegates to {@link #get(String, long, long)}
   * with offset 0 and limit -1.
   *
   * @param key the object key.
   * @return {@link ObjectContent} to read the object content.
   */
  default ObjectContent get(String key) {
    return this.get(key, 0L, -1L);
  }

  /**
   * Get the data of the object specified by key.
   * Throw {@link RuntimeException} if the object key doesn't exist.
   * Throw {@link RuntimeException} if the object key is null or empty.
   *
   * @param key    the object key.
   * @param offset the offset to start reading from.
   * @param limit  the max length to read.
   * @return {@link ObjectContent} to read the object content.
   */
  ObjectContent get(String key, long offset, long limit);

  /**
   * Put the whole byte array to the object specified by key. Delegates to
   * {@link #put(String, byte[], int, int)} with offset 0 and the array length.
   *
   * @param key  for the object.
   * @param data the bytes to write.
   * @return the checksum of the uploaded object.
   */
  default byte[] put(String key, byte[] data) {
    final int length = data.length;
    return put(key, data, 0, length);
  }

  /**
   * Put a slice of the byte array to the object specified by key. The slice is exposed through a
   * {@link ByteArrayInputStream} and handed to {@link #put(String, InputStreamProvider, long)}
   * with {@code len} as the content length.
   *
   * @param key  for the object.
   * @param data the array holding the bytes to write.
   * @param off  the offset in the array of the first byte to write.
   * @param len  the number of bytes to write.
   * @return the checksum of the uploaded object.
   */
  default byte[] put(String key, byte[] data, int off, int len) {
    InputStreamProvider provider = () -> new ByteArrayInputStream(data, off, len);
    return put(key, provider, len);
  }

  /**
   * Put the data read from a stream to the object specified by key. The implementation must
   * ensure that the stream created by the stream provider is closed once the stream operation
   * has finished.
   * Throw {@link RuntimeException} if the object key is null or empty.
   *
   * @param key            for the object.
   * @param streamProvider the provider that creates the binary {@link InputStream} to write.
   * @param contentLength  the content length. If the actual data is bigger than the content
   *                       length, the object can be created but its data will be truncated to the
   *                       given content length. If the actual data is smaller than the content
   *                       length, creating the object will fail with an unexpected end
   *                       IOException.
   * @return the checksum of the uploaded object.
   */
  byte[] put(String key, InputStreamProvider streamProvider, long contentLength);

  /**
   * Append the whole byte array to the object specified by key. Delegates to
   * {@link #append(String, byte[], int, int)} with offset 0 and the array length.
   *
   * @param key  for the object.
   * @param data the bytes to append.
   * @return the checksum of the appended object.
   */
  default byte[] append(String key, byte[] data) {
    final int length = data.length;
    return append(key, data, 0, length);
  }

  /**
   * Append a slice of the byte array to the object specified by key. The slice is exposed through
   * a {@link ByteArrayInputStream} and handed to
   * {@link #append(String, InputStreamProvider, long)} with {@code len} as the content length.
   *
   * @param key  for the object.
   * @param data the array holding the bytes to append.
   * @param off  the offset in the array of the first byte to append.
   * @param len  the number of bytes to append.
   * @return the checksum of the appended object.
   */
  default byte[] append(String key, byte[] data, int off, int len) {
    InputStreamProvider provider = () -> new ByteArrayInputStream(data, off, len);
    return append(key, provider, len);
  }

  /**
   * Append the data read from a stream to the object specified by key. If the object exists, the
   * data will be appended to its tail. Otherwise, the object will be created and the data will be
   * written to it. The content length could be zero if the object exists. If the object doesn't
   * exist and the content length is zero, a {@link NotAppendableException} will be thrown.
   * <p>
   * The first one wins if there are concurrent appends.
   * <p>
   * The implementation must ensure that the stream created by the stream provider is closed once
   * the stream operation has finished.
   * Throw {@link RuntimeException} if the object key is null or empty.
   *
   * @param key            for the object.
   * @param streamProvider the provider that creates the binary input stream to append.
   * @param contentLength  the appended content length. If the actual appended data is bigger than
   *                       the content length, the object can be appended but the data to append
   *                       will be truncated to the given content length. If the actual data is
   *                       smaller than the content length, appending will fail with an unexpected
   *                       end IOException.
   * @return the checksum of the appended object.
   * @throws NotAppendableException if the object already exists and is not appendable, or the
   *                                object doesn't exist and the content length is zero.
   */
  byte[] append(String key, InputStreamProvider streamProvider, long contentLength);

  /**
   * Delete an object.
   * No exception is thrown if the object key doesn't exist.
   * Throw {@link RuntimeException} if the object key is null or empty.
   *
   * @param key the given object key to be deleted.
   */
  void delete(String key);

  /**
   * Delete multiple keys. A key that doesn't exist is treated as deleted successfully and won't be
   * included in the returned list.
   *
   * @param keys the given object keys to be deleted.
   * @return the keys that failed to be deleted.
   */
  List<String> batchDelete(List<String> keys);

  /**
   * Delete all objects with the given prefix (including the prefix itself if the corresponding
   * object exists).
   *
   * @param prefix the prefix key.
   */
  void deleteAll(String prefix);

  /**
   * Head returns some information about the object, or null if it is not found.
   * Throw {@link RuntimeException} if the object key is null or empty.
   * There are some differences between directory bucket and general purpose bucket:
   * <ul>
   *   <li>Assume a file object 'a/b' exists, only head("a/b") will get the meta of object 'a/b'
   *   for both general purpose bucket and directory bucket</li>
   *   <li>Assume a dir object 'a/b/' exists, regarding general purpose bucket, only head("a/b/")
   *   will get the meta of object 'a/b/', but for directory bucket, both head("a/b") and
   *   head("a/b/") will get the meta of object 'a/b/'</li>
   * </ul>
   *
   * @param key for the specified object.
   * @return {@link ObjectInfo}, null if the object does not exist.
   * @throws InvalidObjectKeyException if the object is located under an existing file in a
   *                                   directory bucket, which is not allowed.
   */
  ObjectInfo head(String key);

  /**
   * List objects according to the given {@link ListObjectsRequest}.
   *
   * @param request {@link ListObjectsRequest}
   * @return the iterable of {@link ListObjectsResponse} which contains objects and common prefixes
   */
  Iterable<ListObjectsResponse> list(ListObjectsRequest request);

  /**
   * List limited objects in a given bucket. The request is built with the
   * {@link #EMPTY_DELIMITER} and the returned iterable pulls the responses of
   * {@link #list(ListObjectsRequest)} one at a time, only when more objects are asked for.
   *
   * @param prefix     Limits the response to keys that begin with the specified prefix.
   * @param startAfter StartAfter is where you want the object storage to start listing from.
   *                   The object storage starts listing after this specified key.
   *                   StartAfter can be any key in the bucket.
   * @param limit      Limit the maximum number of response objects.
   * @return {@link ObjectInfo} the object list with matched prefix key
   */
  default Iterable<ObjectInfo> list(String prefix, String startAfter, int limit) {
    ListObjectsRequest request = ListObjectsRequest.builder()
        .prefix(prefix)
        .startAfter(startAfter)
        .maxKeys(limit)
        .delimiter(EMPTY_DELIMITER)
        .build();

    return new LazyReload<>(() -> {
      // The underlying listing is only started once the reload is first requested.
      Iterator<ListObjectsResponse> responses = list(request).iterator();
      return buf -> {
        if (responses.hasNext()) {
          // Buffer the objects of the next response, then report whether it was the last one.
          buf.addAll(responses.next().objects());
          return !responses.hasNext();
        }
        // Nothing left to load.
        return true;
      };
    });
  }

  /**
   * List all objects in a given bucket. Delegates to {@link #list(String, String, int)} with a
   * limit of -1.
   *
   * @param prefix     Limits the response to keys that begin with the specified prefix.
   * @param startAfter StartAfter is where you want the object storage to start listing from.
   *                   The object storage starts listing after this specified key.
   *                   StartAfter can be any key in the bucket.
   * @return {@link ObjectInfo} Iterable to iterate over the objects with matched prefix key
   *                            and StartAfter
   */
  default Iterable<ObjectInfo> listAll(String prefix, String startAfter) {
    final int noLimit = -1;
    return list(prefix, startAfter, noLimit);
  }

  /**
   * CreateMultipartUpload starts to upload a large object part by part.
   *
   * @param key for the specified object.
   * @return {@link MultipartUpload}.
   */
  MultipartUpload createMultipartUpload(String key);

  /**
   * UploadPart uploads a part of an object. The implementation must ensure that the stream
   * created by the stream provider is closed once the stream operation has finished.
   *
   * @param key            for the specified object.
   * @param uploadId       for the multipart upload id.
   * @param partNum        upload part number.
   * @param streamProvider the stream provider to provide the part stream.
   * @param contentLength  the content length. If the actual data is bigger than the content
   *                       length, the object can be created but its data will be truncated to the
   *                       given content length. If the actual data is smaller than the content
   *                       length, creating the object will fail with an unexpected end
   *                       IOException.
   * @return the uploaded part.
   */
  Part uploadPart(String key, String uploadId, int partNum, InputStreamProvider streamProvider,
      long contentLength);

  /**
   * Complete the multipart upload with the given object key and upload id.
   *
   * @param key         for the specified object.
   * @param uploadId    id of the multipart upload.
   * @param uploadParts parts to upload.
   * @return the checksum of the uploaded object.
   */
  byte[] completeUpload(String key, String uploadId, List<Part> uploadParts);

  /**
   * Abort a multipart upload.
   *
   * @param key      object key.
   * @param uploadId multipart upload id.
   */
  void abortMultipartUpload(String key, String uploadId);

  /**
   * List multipart uploads under a path.
   *
   * @param prefix the prefix of the uploads to list.
   * @return Iterable to iterate over multipart uploads.
   */
  Iterable<MultipartUpload> listUploads(String prefix);

  /**
   * Upload part copy with a multipart upload id.
   *
   * @param srcKey               source object key
   * @param dstKey               dest object key
   * @param uploadId             id of the multipart upload copy
   * @param partNum              part num of the multipart upload copy
   * @param copySourceRangeStart copy source range start of source object
   * @param copySourceRangeEnd   copy source range end of source object
   * @return {@link Part}.
   */
  Part uploadPartCopy(
      String srcKey, String dstKey, String uploadId, int partNum, long copySourceRangeStart,
      long copySourceRangeEnd);

  /**
   * Copy binary content from one object to another object.
   *
   * @param srcKey source object key
   * @param dstKey dest object key
   */
  void copy(String srcKey, String dstKey);

  /**
   * Atomically rename the source object to the dest object without any data copying.
   * Will overwrite the dest object if it exists.
   *
   * @param srcKey source object key
   * @param dstKey dest object key
   * @throws RuntimeException if rename failed, e.g. srcKey is equal to dstKey or the source object
   *                          doesn't exist.
   */
  void rename(String srcKey, String dstKey);

  /**
   * Attach tags to the specified object. This method will overwrite all existing tags with the
   * new tags, and removes all existing tags if the new tags are empty. The maximum number of tags
   * is 10.
   * <p>
   * The default implementation always throws {@link UnsupportedOperationException}.
   *
   * @param key     the key of the object.
   * @param newTags the new tags to put.
   * @throws RuntimeException if key doesn't exist.
   * @throws UnsupportedOperationException if the implementation doesn't override this method.
   */
  default void putTags(String key, Map<String, String> newTags) {
    String storageName = this.getClass().getSimpleName();
    throw new UnsupportedOperationException(storageName + " doesn't support putObjectTagging.");
  }

  /**
   * Get all attached tags of the object.
   * <p>
   * The default implementation always throws {@link UnsupportedOperationException}.
   *
   * @param key the key of the object.
   * @return map containing all tags.
   * @throws RuntimeException if key doesn't exist.
   * @throws UnsupportedOperationException if the implementation doesn't override this method.
   */
  default Map<String, String> getTags(String key) {
    String storageName = this.getClass().getSimpleName();
    throw new UnsupportedOperationException(storageName + " doesn't support getObjectTagging.");
  }

  /**
   * Gets the object status for the given key.
   * It's different from {@link ObjectStorage#head(String)}: it returns the object info if the key
   * exists or if a prefix with the value of key exists.
   * <p>
   * There are three kinds of implementations:
   * <ul>
   *   <li>Uses the headObject API if the object storage supports directory bucket and the
   *   requested bucket is a directory bucket, the object storage will return the object directly
   *   if the file or dir exists, otherwise return null</li>
   *   <li>Uses the getFileStatus API if the object storage supports it, e.g. TOS. The object
   *   storage will return the object directly if the key or prefix exists, otherwise return
   *   null.</li>
   *   <li>If the object storage supports neither of the above, you have to try headObject(key)
   *   first; if the object doesn't exist, then headObject(key + "/") if the key doesn't end with
   *   '/'; and if that key doesn't exist either, then use the listObjects API to check whether
   *   the prefix/key exists.</li>
   * </ul>
   *
   * @param key the object
   * @return object info if the key or prefix exists, otherwise return null.
   * @throws InvalidObjectKeyException if the object is located under an existing file in a
   *                                   directory bucket, which is not allowed.
   */
  ObjectInfo objectStatus(String key);

  /**
   * Get the object storage checksum information, including checksum algorithm name,
   * checksum type, etc.
   *
   * @return checksum information of this storage.
   */
  ChecksumInfo checksumInfo();
}