TestTOSObjectStorage.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.tosfs.object.tos;

import com.volcengine.tos.internal.model.CRC64Checksum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.tosfs.TestEnv;
import org.apache.hadoop.fs.tosfs.common.Bytes;
import org.apache.hadoop.fs.tosfs.conf.TosKeys;
import org.apache.hadoop.fs.tosfs.object.ChecksumType;
import org.apache.hadoop.fs.tosfs.object.Constants;
import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
import org.apache.hadoop.fs.tosfs.object.Part;
import org.apache.hadoop.fs.tosfs.object.exceptions.NotAppendableException;
import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
import org.apache.hadoop.fs.tosfs.util.CommonUtils;
import org.apache.hadoop.fs.tosfs.util.TestUtility;
import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
import org.apache.hadoop.util.PureJavaCrc32C;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import java.util.zip.Checksum;

import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

public class TestTOSObjectStorage {
  public static Stream<Arguments> provideArguments() {
    assumeTrue(TestEnv.checkTestEnabled());

    List<Arguments> values = new ArrayList<>();

    Configuration conf = new Configuration();
    conf.set(TosKeys.FS_TOS_CHECKSUM_TYPE, ChecksumType.CRC64ECMA.name());
    values.add(Arguments.of(
        ObjectStorageFactory.createWithPrefix(String.format("tos-%s/", UUIDUtils.random()),
            TOS_SCHEME, TestUtility.bucket(), conf),
        new CRC64Checksum(),
        ChecksumType.CRC64ECMA));

    conf = new Configuration();
    conf.set(TosKeys.FS_TOS_CHECKSUM_TYPE, ChecksumType.CRC32C.name());
    values.add(Arguments.of(
        ObjectStorageFactory.createWithPrefix(String.format("tos-%s/", UUIDUtils.random()),
            TOS_SCHEME, TestUtility.bucket(), conf),
        new PureJavaCrc32C(),
        ChecksumType.CRC32C));

    return values.stream();
  }

  private ObjectStorage tos;
  private ChecksumType type;

  private void setEnv(ObjectStorage objectStore, ChecksumType csType) {
    this.tos = objectStore;
    this.type = csType;
  }

  @AfterEach
  public void tearDown() throws Exception {
    CommonUtils.runQuietly(() -> tos.deleteAll(""));
    for (MultipartUpload upload : tos.listUploads("")) {
      tos.abortMultipartUpload(upload.key(), upload.uploadId());
    }
    tos.close();
  }

  @ParameterizedTest
  @MethodSource("provideArguments")
  public void testHeadObj(ObjectStorage objectStore, Checksum ckmer, ChecksumType csType) {
    setEnv(objectStore, csType);

    String key = "testPutChecksum";
    byte[] data = TestUtility.rand(1024);
    ckmer.update(data, 0, data.length);
    assertEquals(ckmer.getValue(), parseChecksum(objectStore.put(key, data)));

    ObjectInfo objInfo = objectStore.head(key);
    assertEquals(ckmer.getValue(), parseChecksum(objInfo.checksum()));
  }

  @ParameterizedTest
  @MethodSource("provideArguments")
  public void testGetFileStatus(ObjectStorage objectStore, Checksum ckmer, ChecksumType csType) {
    setEnv(objectStore, csType);
    assumeFalse(objectStore.bucket().isDirectory());

    Configuration conf = new Configuration(objectStore.conf());
    conf.setBoolean(TosKeys.FS_TOS_GET_FILE_STATUS_ENABLED, true);
    objectStore.initialize(conf, objectStore.bucket().name());

    String key = "testFileStatus";
    byte[] data = TestUtility.rand(256);
    byte[] checksum = objectStore.put(key, data);

    ObjectInfo obj1 = objectStore.objectStatus(key);
    assertArrayEquals(checksum, obj1.checksum());
    assertEquals(key, obj1.key());
    assertEquals(obj1, objectStore.head(key));

    ObjectInfo obj2 = objectStore.objectStatus(key + "/");
    assertNull(obj2);

    String dirKey = "testDirStatus/";
    checksum = objectStore.put(dirKey, new byte[0]);

    ObjectInfo obj3 = objectStore.objectStatus("testDirStatus");
    assertArrayEquals(checksum, obj3.checksum());
    assertEquals(dirKey, obj3.key());
    assertEquals(obj3, objectStore.head(dirKey));
    assertNull(objectStore.head("testDirStatus"));
    ObjectInfo obj4 = objectStore.objectStatus(dirKey);
    assertArrayEquals(checksum, obj4.checksum());
    assertEquals(dirKey, obj4.key());
    assertEquals(obj4, objectStore.head(dirKey));

    String prefix = "testPrefix/";
    objectStore.put(prefix + "subfile", data);
    ObjectInfo obj5 = objectStore.objectStatus(prefix);
    assertEquals(prefix, obj5.key());
    assertArrayEquals(Constants.MAGIC_CHECKSUM, obj5.checksum());
    assertNull(objectStore.head(prefix));
    ObjectInfo obj6 = objectStore.objectStatus("testPrefix");
    assertEquals(prefix, obj6.key());
    assertArrayEquals(Constants.MAGIC_CHECKSUM, obj6.checksum());
    assertNull(objectStore.head("testPrefix"));
  }

  @ParameterizedTest
  @MethodSource("provideArguments")
  public void testObjectStatus(ObjectStorage objectStore, Checksum checksum, ChecksumType csType) {
    setEnv(objectStore, csType);
    assumeFalse(objectStore.bucket().isDirectory());

    String key = "testObjectStatus";
    byte[] data = TestUtility.rand(1024);
    checksum.update(data, 0, data.length);
    assertEquals(checksum.getValue(), parseChecksum(objectStore.put(key, data)));

    ObjectInfo objInfo = objectStore.objectStatus(key);
    assertEquals(checksum.getValue(), parseChecksum(objInfo.checksum()));

    objInfo = objectStore.head(key);
    assertEquals(checksum.getValue(), parseChecksum(objInfo.checksum()));

    String dir = key + "/";
    objectStore.put(dir, new byte[0]);
    objInfo = objectStore.objectStatus(dir);
    assertEquals(Constants.MAGIC_CHECKSUM, objInfo.checksum());

    objInfo = objectStore.head(dir);
    assertEquals(Constants.MAGIC_CHECKSUM, objInfo.checksum());
  }

  @ParameterizedTest
  @MethodSource("provideArguments")
  public void testListObjs(ObjectStorage objectStore, Checksum checksum, ChecksumType csType) {
    setEnv(objectStore, csType);

    String key = "testListObjs";
    byte[] data = TestUtility.rand(1024);
    checksum.update(data, 0, data.length);
    for (int i = 0; i < 5; i++) {
      assertEquals(checksum.getValue(), parseChecksum(objectStore.put(key, data)));
    }

    ListObjectsRequest request =
        ListObjectsRequest.builder().prefix(key).startAfter(null).maxKeys(-1).delimiter("/")
            .build();
    Iterator<ListObjectsResponse> iter = objectStore.list(request).iterator();
    while (iter.hasNext()) {
      List<ObjectInfo> objs = iter.next().objects();
      for (ObjectInfo obj : objs) {
        assertEquals(checksum.getValue(), parseChecksum(obj.checksum()));
      }
    }
  }

  @ParameterizedTest
  @MethodSource("provideArguments")
  public void testPutChecksum(ObjectStorage objectStore, Checksum checksum, ChecksumType csType) {
    setEnv(objectStore, csType);

    String key = "testPutChecksum";
    byte[] data = TestUtility.rand(1024);
    checksum.update(data, 0, data.length);

    byte[] checksumStr = objectStore.put(key, data);

    assertEquals(checksum.getValue(), parseChecksum(checksumStr));
  }

  @ParameterizedTest
  @MethodSource("provideArguments")
  public void testMPUChecksum(ObjectStorage objectStore, Checksum checksum, ChecksumType csType) {
    setEnv(objectStore, csType);

    int partNumber = 2;
    String key = "testMPUChecksum";
    MultipartUpload mpu = objectStore.createMultipartUpload(key);
    byte[] data = TestUtility.rand(mpu.minPartSize() * partNumber);
    checksum.update(data, 0, data.length);

    List<Part> parts = new ArrayList<>();
    for (int i = 0; i < partNumber; i++) {
      final int index = i;
      Part part = objectStore.uploadPart(key, mpu.uploadId(), index + 1,
          () -> new ByteArrayInputStream(data, index * mpu.minPartSize(), mpu.minPartSize()),
          mpu.minPartSize());
      parts.add(part);
    }

    byte[] checksumStr = objectStore.completeUpload(key, mpu.uploadId(), parts);
    assertEquals(checksum.getValue(), parseChecksum(checksumStr));
  }

  @ParameterizedTest
  @MethodSource("provideArguments")
  public void testAppendable(ObjectStorage objectStore, Checksum checksum, ChecksumType csType) {
    setEnv(objectStore, csType);
    assumeFalse(objectStore.bucket().isDirectory());

    // Test create object with append then append.
    byte[] data = TestUtility.rand(256);
    String prefix = "a/testAppendable/";
    String key = prefix + "object.txt";
    objectStore.append(key, data);

    objectStore.append(key, new byte[0]);

    // Test create object with put then append.
    data = TestUtility.rand(256);
    objectStore.put(key, data);

    assertThrows(NotAppendableException.class, () -> objectStore.append(key, new byte[0]),
        "Expect not appendable.");

    objectStore.delete(key);
  }

  @ParameterizedTest
  @MethodSource("provideArguments")
  public void testDirectoryBucketAppendable(ObjectStorage objectStore, Checksum checksum,
      ChecksumType csType) {
    setEnv(objectStore, csType);
    assumeTrue(objectStore.bucket().isDirectory());

    byte[] data = TestUtility.rand(256);
    String prefix = "a/testAppendable/";
    String key = prefix + "object.txt";
    objectStore.put(key, data);

    objectStore.append(key, new byte[1024]);

    objectStore.delete(key);
  }

  private long parseChecksum(byte[] checksum) {
    switch (type) {
    case CRC32C:
    case CRC64ECMA:
      return Bytes.toLong(checksum);
    default:
      throw new IllegalArgumentException(
          String.format("Checksum type %s is not supported by TOS.", type.name()));
    }
  }
}