TestDelegationClientBuilder.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.tosfs.object.tos;

import com.volcengine.tos.TOSV2;
import com.volcengine.tos.TOSV2ClientBuilder;
import com.volcengine.tos.TosClientException;
import com.volcengine.tos.TosException;
import com.volcengine.tos.TosServerException;
import com.volcengine.tos.auth.Credential;
import com.volcengine.tos.auth.StaticCredentials;
import com.volcengine.tos.comm.HttpStatus;
import com.volcengine.tos.model.object.DeleteObjectInput;
import com.volcengine.tos.model.object.HeadObjectV2Input;
import com.volcengine.tos.model.object.HeadObjectV2Output;
import com.volcengine.tos.model.object.ListObjectsV2Input;
import com.volcengine.tos.model.object.ListObjectsV2Output;
import com.volcengine.tos.model.object.PutObjectInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.tosfs.TestEnv;
import org.apache.hadoop.fs.tosfs.common.Tasks;
import org.apache.hadoop.fs.tosfs.common.ThreadPools;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
import org.apache.hadoop.fs.tosfs.conf.TosKeys;
import org.apache.hadoop.fs.tosfs.object.tos.auth.EnvironmentCredentialsProvider;
import org.apache.hadoop.fs.tosfs.object.tos.auth.SimpleCredentialsProvider;
import org.apache.hadoop.fs.tosfs.util.ParseUtils;
import org.apache.hadoop.fs.tosfs.util.TestUtility;
import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestReporter;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import javax.net.ssl.SSLException;

import static org.apache.hadoop.fs.tosfs.object.tos.DelegationClient.isRetryableException;
import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TestDelegationClientBuilder {

  private static final String TEST_KEY = UUIDUtils.random();
  private static final String TEST_DATA = "1234567890";
  private static String envAccessKey;
  private static String envSecretKey;
  private static String envEndpoint;

  // Maximum retry times of the tos http client.
  public static final String MAX_RETRY_COUNT_KEY = "fs.tos.http.maxRetryCount";

  @BeforeAll
  public static void before() {
    assumeTrue(TestEnv.checkTestEnabled());

    envAccessKey =
        ParseUtils.envAsString(TOS.ENV_TOS_ACCESS_KEY_ID, false);
    envSecretKey =
        ParseUtils.envAsString(TOS.ENV_TOS_SECRET_ACCESS_KEY, false);
    envEndpoint = ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT, false);
  }

  @BeforeEach
  public void setUp() {
    TOSV2 tosSdkClientV2 =
        new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(),
            new StaticCredentials(envAccessKey, envSecretKey));
    try (ByteArrayInputStream stream = new ByteArrayInputStream(TEST_DATA.getBytes())) {
      PutObjectInput putObjectInput =
          new PutObjectInput().setBucket(TestUtility.bucket()).setKey(TEST_KEY).setContent(stream);
      tosSdkClientV2.putObject(putObjectInput);
    } catch (IOException e) {
      fail(e.getMessage());
    }
  }

  @Test
  public void testHeadApiRetry() throws IOException {
    Configuration conf = new Configuration();
    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME),
        "https://test.tos-cn-beijing.ivolces.com");
    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false);
    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY");
    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY");

    DelegationClient tosV2 = new DelegationClientBuilder().bucket("test").conf(conf).build();
    TOSV2 mockClient = mock(TOSV2.class);
    tosV2.setClient(mockClient);
    tosV2.setMaxRetryTimes(5);

    HeadObjectV2Input input = HeadObjectV2Input.builder().bucket("test").build();
    when(tosV2.headObject(input)).thenThrow(
            new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR),
            new TosServerException(HttpStatus.TOO_MANY_REQUESTS),
            new TosClientException("fake toe", new IOException("fake ioe")),
            new TosException(new SocketException("fake msg")),
            new TosException(new UnknownHostException("fake msg")),
            new TosException(new SSLException("fake msg")),
            new TosException(new InterruptedException("fake msg")),
            new TosException(new InterruptedException("fake msg")))
        .thenReturn(new HeadObjectV2Output());

    RuntimeException exception =
        assertThrows(RuntimeException.class, () -> tosV2.headObject(input));
    assertTrue(exception instanceof TosException);
    assertTrue(exception.getCause() instanceof UnknownHostException);
    verify(tosV2.client(), times(5)).headObject(input);

    HeadObjectV2Input inputOneTime = HeadObjectV2Input.builder().bucket("inputOneTime").build();
    HeadObjectV2Output output = new HeadObjectV2Output();
    when(tosV2.headObject(inputOneTime)).thenReturn(output);
    HeadObjectV2Output headObject = tosV2.headObject(inputOneTime);
    assertEquals(headObject, output);
    verify(tosV2.client(), times(1)).headObject(inputOneTime);
    tosV2.close();

    DelegationClient newClient = new DelegationClientBuilder().bucket("test").conf(conf).build();
    mockClient = mock(TOSV2.class);
    newClient.setClient(mockClient);
    newClient.setMaxRetryTimes(5);
    when(newClient.headObject(input)).thenThrow(
        new TosClientException("fake toe", new EOFException("fake eof")),
        new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR),
        new TosServerException(HttpStatus.TOO_MANY_REQUESTS)).thenReturn(new HeadObjectV2Output());

    exception = assertThrows(RuntimeException.class, () -> newClient.headObject(input));
    assertTrue(exception instanceof TosClientException);
    assertTrue(exception.getCause() instanceof EOFException);
    verify(newClient.client(), times(1)).headObject(input);
    newClient.close();
  }

  @Test
  public void testEnableCrcCheck(TestInfo testInfo, TestReporter testReporter) throws IOException {
    String bucket = testInfo.getTestMethod().map(method -> method.getName()).orElse("Unknown");
    Configuration conf = new Configuration();
    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME),
        "https://test.tos-cn-beijing.ivolces.com");
    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true);
    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), "ACCESS_KEY");
    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(bucket), "SECRET_KEY");

    DelegationClient tosV2 = new DelegationClientBuilder().bucket(bucket).conf(conf).build();
    assertTrue(tosV2.config().isEnableCrc());

    conf.setBoolean(TosKeys.FS_TOS_CRC_CHECK_ENABLED, false);
    tosV2 = new DelegationClientBuilder().bucket(bucket).conf(conf).build();
    assertFalse(tosV2.config().isEnableCrc());

    tosV2.close();
  }

  @Test
  public void testClientCache(TestInfo testInfo, TestReporter testReporter) throws IOException {
    String bucket = testInfo.getTestMethod().map(method -> method.getName()).orElse("Unknown");
    Configuration conf = new Configuration();
    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME),
        "https://test.tos-cn-beijing.ivolces.com");
    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false);
    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), "ACCESS_KEY_A");
    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(bucket), "SECRET_KEY_A");

    DelegationClient tosV2 = new DelegationClientBuilder().bucket(bucket).conf(conf).build();
    DelegationClient tosV2Cached = new DelegationClientBuilder().bucket(bucket).conf(conf).build();
    assertEquals(tosV2Cached, tosV2, "client must be load in cache");
    assertEquals("ACCESS_KEY_A", tosV2.usedCredential().getAccessKeyId());
    tosV2Cached.close();

    String newBucket = "new-test-bucket";
    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(newBucket), "ACCESS_KEY_B");
    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(newBucket), "SECRET_KEY_B");
    DelegationClient changeBucketClient =
        new DelegationClientBuilder().bucket(newBucket).conf(conf).build();
    assertNotEquals(changeBucketClient, tosV2, "client should be created entirely new");
    assertEquals("ACCESS_KEY_B", changeBucketClient.usedCredential().getAccessKeyId());
    changeBucketClient.close();

    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true); // disable cache: true
    DelegationClient tosV2NotCached =
        new DelegationClientBuilder().bucket(bucket).conf(conf).build();
    assertNotEquals(tosV2NotCached, tosV2, "client should be created entirely new");
    assertEquals("ACCESS_KEY_A", tosV2NotCached.usedCredential().getAccessKeyId());
    tosV2NotCached.close();

    tosV2.close();
  }

  @Test
  public void testOverwriteHttpConfig() throws IOException {
    Configuration conf = new Configuration();
    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME),
        "https://tos-cn-beijing.ivolces.com");
    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY");
    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY");
    conf.setInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS, 24);
    conf.setInt(MAX_RETRY_COUNT_KEY, 24);
    conf.setInt(TosKeys.FS_TOS_REQUEST_MAX_RETRY_TIMES, 24);
    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true);

    DelegationClient tosV2 = new DelegationClientBuilder().bucket("test").conf(conf).build();
    assertEquals("ACCESS_KEY", tosV2.usedCredential().getAccessKeyId());
    assertEquals(24, tosV2.config().getTransportConfig().getMaxConnections(),
        "http max connection overwrite to 24 from 1024, must be 24");
    assertEquals(DelegationClientBuilder.DISABLE_TOS_RETRY_VALUE,
        tosV2.config().getTransportConfig().getMaxRetryCount(),
        "tos maxRetryCount disabled, must be -1");
    assertEquals(24, tosV2.maxRetryTimes(), "maxRetryTimes must be 24");
    assertEquals("https://tos-cn-beijing.ivolces.com", tosV2.config().getEndpoint(),
        "endpoint must be equals to https://tos-cn-beijing.ivolces.com");

    tosV2.close();
  }

  @Test
  public void testDynamicRefreshAkSk() throws IOException {
    Configuration conf = new Configuration();
    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), envEndpoint);
    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(TestUtility.bucket()), envAccessKey);
    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(TestUtility.bucket()), envSecretKey);
    conf.setInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS, 24);
    conf.setInt(MAX_RETRY_COUNT_KEY, 24);

    TOSV2 tosSdkClientV2 =
        new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(),
            new StaticCredentials("a", "b"));
    DelegationClient delegationClientV2 =
        new DelegationClientBuilder().bucket(TestUtility.bucket()).conf(conf).build();

    ListObjectsV2Input inputV2 =
        ListObjectsV2Input.builder().bucket(TestUtility.bucket()).prefix(TEST_KEY).marker("")
            .maxKeys(10).build();

    assertThrows(TosServerException.class, () -> tosSdkClientV2.listObjects(inputV2));

    tosSdkClientV2.changeCredentials(new StaticCredentials(envAccessKey, envSecretKey));

    ListObjectsV2Output tosSdkOutput = tosSdkClientV2.listObjects(inputV2);
    ListObjectsV2Output delegateOutput = delegationClientV2.listObjects(inputV2);
    int nativeContentSize =
        tosSdkOutput.getContents() == null ? -1 : tosSdkOutput.getContents().size();
    int delegateContentSize =
        delegateOutput.getContents() == null ? -1 : delegateOutput.getContents().size();

    assertEquals(nativeContentSize, delegateContentSize,
        "delegation client must same as native client");
    assertEquals(envAccessKey, delegationClientV2.usedCredential().getAccessKeyId());

    delegationClientV2.close();
  }

  @Test
  public void testCreateClientWithEnvironmentCredentials() throws IOException {
    Configuration conf = new Configuration();
    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), envEndpoint);
    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, EnvironmentCredentialsProvider.NAME);

    DelegationClient tosV2 =
        new DelegationClientBuilder().bucket(TestUtility.bucket()).conf(conf).build();
    Credential cred = tosV2.usedCredential();

    String assertMsg =
        String.format("expect %s, but got %s", envAccessKey, cred.getAccessKeyId());
    assertEquals(cred.getAccessKeyId(), envAccessKey, assertMsg);
    assertMsg = String.format("expect %s, but got %s", envSecretKey, cred.getAccessKeySecret());
    assertEquals(cred.getAccessKeySecret(), envSecretKey, assertMsg);

    tosV2.close();
  }

  @Test
  public void testCreateClientWithSimpleCredentials() throws IOException {
    Configuration conf = new Configuration();
    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), envEndpoint);
    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(TestUtility.bucket()), envAccessKey);
    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(TestUtility.bucket()), envSecretKey);
    conf.setInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS, 24);
    conf.setInt(MAX_RETRY_COUNT_KEY, 24);

    ListObjectsV2Input input =
        ListObjectsV2Input.builder().bucket(TestUtility.bucket()).prefix(TEST_KEY).marker("")
            .maxKeys(10).build();

    TOSV2 v2 = new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(),
        new StaticCredentials(envAccessKey, envSecretKey));
    ListObjectsV2Output outputV2 = v2.listObjects(input);

    DelegationClient tosV2 =
        new DelegationClientBuilder().bucket(TestUtility.bucket()).conf(conf).build();

    ListObjectsV2Output output = tosV2.listObjects(input);
    assertEquals(outputV2.getContents().size(), output.getContents().size(),
        "delegation client must be same as native client");

    tosV2.close();
  }

  @Test
  public void testCachedConcurrently(TestInfo testInfo, TestReporter testReporter) {
    String bucketName = testInfo.getTestMethod().map(method -> method.getName()).orElse("Unknown");

    Function<String, Configuration> commonConf = bucket -> {
      Configuration conf = new Configuration();
      conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), envEndpoint);
      conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
      conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), envAccessKey);
      conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(bucket), envSecretKey);
      return conf;
    };

    // enable cache
    Function<String, Configuration> enableCachedConf = bucket -> {
      Configuration conf = commonConf.apply(bucket);
      conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false);
      return conf;
    };

    ExecutorService es = ThreadPools.newWorkerPool("testCachedConcurrently", 32);
    int bucketCount = 5;
    int taskCount = 10000;

    AtomicInteger success = new AtomicInteger(0);
    AtomicInteger failure = new AtomicInteger(0);
    Tasks.foreach(IntStream.range(0, taskCount).boxed().map(i -> bucketName + (i % bucketCount)))
        .executeWith(es).run(bucket -> {
          try {
            Configuration conf = enableCachedConf.apply(bucket);
            DelegationClient client =
                new DelegationClientBuilder().bucket(bucket).conf(conf).build();
            client.close();
            success.incrementAndGet();
          } catch (Exception e) {
            failure.incrementAndGet();
          }
        });

    assertEquals(bucketCount, DelegationClientBuilder.CACHE.size());
    assertEquals(taskCount, success.get());
    assertEquals(0, failure.get());

    // clear cache
    DelegationClientBuilder.CACHE.clear();

    // disable cache
    Function<String, Configuration> disableCachedConf = bucket -> {
      Configuration conf = commonConf.apply(bucket);
      conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true);
      return conf;
    };

    success.set(0);
    failure.set(0);
    Tasks.foreach(IntStream.range(0, taskCount).boxed().map(i -> bucketName + (i % bucketCount)))
        .executeWith(es).run(bucket -> {
          try {
            Configuration conf = disableCachedConf.apply(bucket);
            DelegationClient client =
                new DelegationClientBuilder().bucket(bucket).conf(conf).build();
            client.close();
            success.incrementAndGet();
          } catch (Exception e) {
            failure.incrementAndGet();
          }
        });

    assertTrue(DelegationClientBuilder.CACHE.isEmpty());
    assertEquals(taskCount, success.get());
    assertEquals(0, failure.get());

    es.shutdown();
  }

  @AfterEach
  public void deleteAllTestData() throws IOException {
    TOSV2 tosSdkClientV2 =
        new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(),
            new StaticCredentials(envAccessKey, envSecretKey));
    tosSdkClientV2.deleteObject(
        DeleteObjectInput.builder().bucket(TestUtility.bucket()).key(TEST_KEY).build());

    tosSdkClientV2.close();
    DelegationClientBuilder.CACHE.clear();
  }

  @Test
  public void testRetryableException() {
    assertTrue(retryableException(new TosServerException(500)));
    assertTrue(retryableException(new TosServerException(501)));
    assertTrue(retryableException(new TosServerException(429)));
    assertFalse(retryableException(new TosServerException(404)));

    assertTrue(retryableException(new TosException(new SocketException())));
    assertTrue(retryableException(new TosException(new UnknownHostException())));
    assertTrue(retryableException(new TosException(new SSLException("fake ssl"))));
    assertTrue(retryableException(new TosException(new SocketTimeoutException())));
    assertTrue(retryableException(new TosException(new InterruptedException())));

    assertTrue(retryableException(new TosClientException("fake ioe", new IOException())));
    assertFalse(retryableException(new TosClientException("fake eof", new EOFException())));

    assertTrue(retryableException(new TosServerException(409)));
    assertTrue(
        retryableException(new TosServerException(409).setEc(TOSErrorCodes.PATH_LOCK_CONFLICT)));
    assertFalse(
        retryableException(new TosServerException(409).setEc(TOSErrorCodes.DELETE_NON_EMPTY_DIR)));
    assertFalse(
        retryableException(new TosServerException(409).setEc(TOSErrorCodes.LOCATED_UNDER_A_FILE)));
    assertFalse(retryableException(
        new TosServerException(409).setEc(TOSErrorCodes.COPY_BETWEEN_DIR_AND_FILE)));
    assertFalse(retryableException(
        new TosServerException(409).setEc(TOSErrorCodes.RENAME_TO_AN_EXISTED_DIR)));
    assertFalse(
        retryableException(new TosServerException(409).setEc(TOSErrorCodes.RENAME_TO_SUB_DIR)));
    assertFalse(retryableException(
        new TosServerException(409).setEc(TOSErrorCodes.RENAME_BETWEEN_DIR_AND_FILE)));
  }

  private boolean retryableException(TosException e) {
    return isRetryableException(e,
        Arrays.asList(TOSErrorCodes.FAST_FAILURE_CONFLICT_ERROR_CODES.split(",")));
  }
}