TestApacheHttpClientFallback.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsTestWithTimeout;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;

import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_FALLBACK;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_IMPL;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES;
import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;


public class TestApacheHttpClientFallback extends AbstractAbfsTestWithTimeout {

  public TestApacheHttpClientFallback() throws Exception {
    super();
  }

  private TracingContext getSampleTracingContext(int[] jdkCallsRegister,
      int[] apacheCallsRegister) {
    String correlationId, fsId;
    TracingHeaderFormat format;
    correlationId = "test-corr-id";
    fsId = "test-filesystem-id";
    format = TracingHeaderFormat.ALL_ID_FORMAT;
    TracingContext tc = Mockito.spy(new TracingContext(correlationId, fsId,
        FSOperationType.TEST_OP, true, format, null));
    Mockito.doAnswer(answer -> {
          answer.callRealMethod();
          AbfsHttpOperation op = answer.getArgument(0);
          if (op instanceof AbfsAHCHttpOperation) {
            Assertions.assertThat(tc.getHeader()).endsWith(APACHE_IMPL);
            apacheCallsRegister[0]++;
          }
          if (op instanceof AbfsJdkHttpOperation) {
            jdkCallsRegister[0]++;
            if (AbfsApacheHttpClient.usable()) {
              Assertions.assertThat(tc.getHeader()).endsWith(JDK_IMPL);
            } else {
              Assertions.assertThat(tc.getHeader()).endsWith(JDK_FALLBACK);
            }
          }
          return null;
        })
        .when(tc)
        .constructHeader(Mockito.any(AbfsHttpOperation.class),
            Mockito.nullable(String.class), Mockito.nullable(String.class));
    return tc;
  }

  @Test
  public void testMultipleFailureLeadToFallback()
      throws Exception {
    int[] apacheCallsTest1 = {0};
    int[] jdkCallsTest1 = {0};
    TracingContext tcTest1 = getSampleTracingContext(jdkCallsTest1,
        apacheCallsTest1);
    int[] retryIterationTest1 = {0};
    intercept(IOException.class, () -> {
      getMockRestOperation(retryIterationTest1).execute(tcTest1);
    });
    Assertions.assertThat(apacheCallsTest1[0])
        .isEqualTo(DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES);
    Assertions.assertThat(jdkCallsTest1[0]).isEqualTo(1);

    int[] retryIteration1 = {0};
    int[] apacheCallsTest2 = {0};
    int[] jdkCallsTest2 = {0};
    TracingContext tcTest2 = getSampleTracingContext(jdkCallsTest2,
        apacheCallsTest2);
    intercept(IOException.class, () -> {
      getMockRestOperation(retryIteration1).execute(tcTest2);
    });
    Assertions.assertThat(apacheCallsTest2[0]).isEqualTo(0);
    Assertions.assertThat(jdkCallsTest2[0])
        .isEqualTo(DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES + 1);
  }

  private AbfsRestOperation getMockRestOperation(int[] retryIteration)
      throws IOException {
    AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
    Mockito.doReturn(APACHE_HTTP_CLIENT)
        .when(configuration)
        .getPreferredHttpOperationType();
    Mockito.doReturn(DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
        .when(configuration)
        .getMaxApacheHttpClientIoExceptionsRetries();
    AbfsClient client = Mockito.mock(AbfsClient.class);
    Mockito.doReturn(Mockito.mock(ExponentialRetryPolicy.class))
        .when(client)
        .getExponentialRetryPolicy();

    AbfsRetryPolicy retryPolicy = Mockito.mock(AbfsRetryPolicy.class);
    Mockito.doReturn(retryPolicy)
        .when(client)
        .getRetryPolicy(Mockito.nullable(String.class));

    Mockito.doAnswer(answer -> {
          if (retryIteration[0]
              < DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES) {
            retryIteration[0]++;
            return true;
          } else {
            return false;
          }
        })
        .when(retryPolicy)
        .shouldRetry(Mockito.anyInt(), Mockito.nullable(Integer.class));

    AbfsThrottlingIntercept abfsThrottlingIntercept = Mockito.mock(
        AbfsThrottlingIntercept.class);
    Mockito.doNothing()
        .when(abfsThrottlingIntercept)
        .updateMetrics(Mockito.any(AbfsRestOperationType.class),
            Mockito.any(AbfsHttpOperation.class));
    Mockito.doNothing()
        .when(abfsThrottlingIntercept)
        .sendingRequest(Mockito.any(AbfsRestOperationType.class),
            Mockito.nullable(AbfsCounters.class));
    Mockito.doReturn(abfsThrottlingIntercept).when(client).getIntercept();


    AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
        AbfsRestOperationType.ReadFile,
        client,
        AbfsHttpConstants.HTTP_METHOD_GET,
        new URL("http://localhost"),
        new ArrayList<>(),
        null,
        configuration
    ));

    Mockito.doReturn(null).when(op).getClientLatency();

    Mockito.doReturn(createApacheHttpOp())
        .when(op)
        .createAbfsHttpOperation();
    Mockito.doReturn(createAhcHttpOp())
        .when(op)
        .createAbfsAHCHttpOperation();

    Mockito.doAnswer(answer -> {
      return answer.getArgument(0);
    }).when(op).createNewTracingContext(Mockito.nullable(TracingContext.class));

    Mockito.doNothing()
        .when(op)
        .signRequest(Mockito.any(AbfsHttpOperation.class), Mockito.anyInt());

    Mockito.doAnswer(answer -> {
      AbfsHttpOperation operation = Mockito.spy(
          (AbfsHttpOperation) answer.callRealMethod());
      Assertions.assertThat(operation).isInstanceOf(
          (retryIteration[0]
              < DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES
              && AbfsApacheHttpClient.usable())
              ? AbfsAHCHttpOperation.class
              : AbfsJdkHttpOperation.class);
      Mockito.doReturn(HTTP_OK).when(operation).getStatusCode();
      Mockito.doThrow(new IOException("Test Exception"))
          .when(operation)
          .processResponse(Mockito.nullable(byte[].class), Mockito.anyInt(),
              Mockito.anyInt());
      Mockito.doCallRealMethod().when(operation).getTracingContextSuffix();
      return operation;
    }).when(op).createHttpOperation();
    return op;
  }

  private AbfsAHCHttpOperation createAhcHttpOp() {
    AbfsAHCHttpOperation ahcOp = Mockito.mock(AbfsAHCHttpOperation.class);
    Mockito.doCallRealMethod().when(ahcOp).getTracingContextSuffix();
    return ahcOp;
  }

  private AbfsJdkHttpOperation createApacheHttpOp() {
    AbfsJdkHttpOperation httpOperationMock = Mockito.mock(AbfsJdkHttpOperation.class);
    Mockito.doCallRealMethod()
        .when(httpOperationMock)
        .getTracingContextSuffix();
    return httpOperationMock;
  }

  @Test
  public void testTcHeaderOnJDKClientUse() {
    int[] jdkCallsRegister = {0};
    int[] apacheCallsRegister = {0};
    TracingContext tc = getSampleTracingContext(jdkCallsRegister,
        apacheCallsRegister);
    AbfsJdkHttpOperation op = Mockito.mock(AbfsJdkHttpOperation.class);
    Mockito.doCallRealMethod().when(op).getTracingContextSuffix();
    tc.constructHeader(op, null, null);
  }
}