ITestAbfsHttpClientRequestExecutor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;

import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.http.HttpClientConnection;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.protocol.HttpClientContext;

import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY;
import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

public class ITestAbfsHttpClientRequestExecutor extends
    AbstractAbfsIntegrationTest {

  public ITestAbfsHttpClientRequestExecutor() throws Exception {
    super();
  }

  /**
   * Verify the correctness of expect 100 continue handling by ApacheHttpClient
   * with AbfsManagedHttpRequestExecutor.
   */
  @Test
  public void testExpect100ContinueHandling() throws Exception {
    AzureBlobFileSystem fs = getFileSystem();
    Path path = new Path("/testExpect100ContinueHandling");
    if (isAppendBlobEnabled()) {
      Assume.assumeFalse("Not valid for AppendBlob with blob endpoint",
          getIngressServiceType() == AbfsServiceType.BLOB);
    }

    Configuration conf = new Configuration(fs.getConf());
    conf.set(FS_AZURE_NETWORKING_LIBRARY, APACHE_HTTP_CLIENT.toString());
    AzureBlobFileSystem fs2 = Mockito.spy(
        (AzureBlobFileSystem) FileSystem.newInstance(conf));

    AzureBlobFileSystemStore store = Mockito.spy(fs2.getAbfsStore());
    Mockito.doReturn(store).when(fs2).getAbfsStore();

    AbfsClient client = Mockito.spy(store.getClient());
    Mockito.doReturn(client).when(store).getClient();

    final int[] invocation = {0};
    Mockito.doAnswer(answer -> {
      AbfsRestOperation op = Mockito.spy(
          (AbfsRestOperation) answer.callRealMethod());
      final ConnectionInfo connectionInfo = new ConnectionInfo();

      /*
       * Assert that correct actions are taking place over the connection to handle
       * expect100 assertions, failure and success.
       *
       * The test would make two calls to the server. The first two calls would
       * be because of attempt to write in a non-existing file. The first call would have
       * expect100 header, and the server would respond with 404. The second call would
       * be a retry from AbfsOutputStream, and would not have expect100 header.
       *
       * The third call would be because of attempt to write in an existing file. The call
       * would have expect100 assertion pass and would send the data.
       *
       * Following is the expectation from the first attempt:
       * 1. sendHeaders should be called once. This is for expect100 assertion invocation.
       * 2. receiveResponse should be called once. This is to receive expect100 assertion.
       * 2. sendBody should not be called.
       *
       * Following is the expectation from the second attempt:
       * 1. sendHeaders should be called once. This is not for expect100 assertion invocation.
       * 2. sendBody should be called once. It will not have any expect100 assertion.
       * Once headers are sent, body is sent.
       * 3. receiveResponse should be called once. This is to receive the response from the server.
       *
       * Following is the expectation from the third attempt:
       * 1. sendHeaders should be called once. This is for expect100 assertion invocation.
       * 2. receiveResponse should be called. This is to receive the response from the server for expect100 assertion.
       * 3. sendBody called as expect100 assertion is pass.
       * 4. receiveResponse should be called. This is to receive the response from the server.
       */
      mockHttpOperationBehavior(connectionInfo, op);
      Mockito.doAnswer(executeAnswer -> {
        invocation[0]++;
        final Throwable throwable;
        if (invocation[0] == 3) {
          executeAnswer.callRealMethod();
          throwable = null;
        } else {
          throwable = intercept(IOException.class, () -> {
            try {
              executeAnswer.callRealMethod();
            } catch (IOException ex) {
              //This exception is expected to be thrown by the op.execute() method.
              throw ex;
            } catch (Throwable interceptedAssertedThrowable) {
              //Any other throwable thrown by Mockito's callRealMethod would be
              //considered as an assertion error.
            }
          });
        }
        /*
         * The first call would be with expect headers, and expect 100 continue assertion has to happen which would fail.
         * For expect100 assertion to happen, header IO happens before body IO. If assertion fails, no body IO happens.
         * The second call would not be using expect headers.
         *
         * The third call would be with expect headers, and expect 100 continue assertion has to happen which would pass.
         */
        if (invocation[0] == 1) {
          Assertions.assertThat(connectionInfo.getSendHeaderInvocation())
              .isEqualTo(1);
          Assertions.assertThat(connectionInfo.getSendBodyInvocation())
              .isEqualTo(0);
          Assertions.assertThat(connectionInfo.getReceiveResponseInvocation())
              .isEqualTo(1);
          Assertions.assertThat(
                  connectionInfo.getReceiveResponseBodyInvocation())
              .isEqualTo(1);
        }
        if (invocation[0] == 2) {
          Assertions.assertThat(connectionInfo.getSendHeaderInvocation())
              .isEqualTo(1);
          Assertions.assertThat(connectionInfo.getSendBodyInvocation())
              .isEqualTo(1);
          Assertions.assertThat(connectionInfo.getReceiveResponseInvocation())
              .isEqualTo(1);
          Assertions.assertThat(
                  connectionInfo.getReceiveResponseBodyInvocation())
              .isEqualTo(1);
        }
        if (invocation[0] == 3) {
          Assertions.assertThat(connectionInfo.getSendHeaderInvocation())
              .isEqualTo(1);
          Assertions.assertThat(connectionInfo.getSendBodyInvocation())
              .isEqualTo(1);
          Assertions.assertThat(connectionInfo.getReceiveResponseInvocation())
              .isEqualTo(2);
          Assertions.assertThat(
                  connectionInfo.getReceiveResponseBodyInvocation())
              .isEqualTo(1);
        }
        Assertions.assertThat(invocation[0]).isLessThanOrEqualTo(3);
        if (throwable != null) {
          throw throwable;
        }
        return null;
      }).when(op).execute(Mockito.any(TracingContext.class));
      return op;
    }).when(client).getAbfsRestOperation(
        Mockito.any(AbfsRestOperationType.class),
        Mockito.anyString(),
        Mockito.any(URL.class),
        Mockito.anyList(),
        Mockito.any(byte[].class),
        Mockito.anyInt(),
        Mockito.anyInt(),
        Mockito.nullable(String.class));

    final OutputStream os = fs2.create(path);
    fs.delete(path, true);
    AbfsOutputStream innerOs
        = (AbfsOutputStream) ((FSDataOutputStream) os).getWrappedStream();
    if (innerOs.getClientHandler()
        .getIngressClient() instanceof AbfsDfsClient) {
      intercept(FileNotFoundException.class, () -> {
        /*
         * This would lead to two server calls.
         * First call would be with expect headers, and expect 100 continue
         *  assertion has to happen which would fail with 404.
         * Second call would be a retry from AbfsOutputStream, and would not be using expect headers.
         */
        os.write(1);
        os.close();
      });
    } else {
      AbfsRestOperationException ex = intercept(
          AbfsRestOperationException.class, () -> {
            /*
             * This would lead to two server calls.
             * First call would be with expect headers, and expect 100 continue
             *  assertion has to happen which would fail with 404.
             * Second call would be a retry from AbfsOutputStream, and would not be using expect headers.
             */
            try {
              os.write(1);
              os.close();
            } catch (IOException e) {
              throw (IOException) e.getCause().getCause();
            }
          });
      Assertions.assertThat(ex.getStatusCode()).isEqualTo(HTTP_PRECON_FAILED);
    }
  }

  /**
   * Creates a mock of HttpOperation that would be returned for AbfsRestOperation
   * to use to execute server call. To make call via ApacheHttpClient, an object
   * of {@link HttpClientContext} is required. This method would create a mock
   * of HttpClientContext that would be able to register the actions taken on
   * {@link HttpClientConnection} object. This would help in asserting the
   * order of actions taken on the connection object for making an append call with
   * expect100 header.
   */
  private void mockHttpOperationBehavior(final ConnectionInfo connectionInfo,
      final AbfsRestOperation op) throws IOException {
    Mockito.doAnswer(httpOpCreationAnswer -> {
      AbfsAHCHttpOperation httpOperation = Mockito.spy(
          (AbfsAHCHttpOperation) httpOpCreationAnswer.callRealMethod());

      Mockito.doAnswer(createContextAnswer -> {
            AbfsManagedHttpClientContext context = Mockito.spy(
                (AbfsManagedHttpClientContext) createContextAnswer.callRealMethod());
            Mockito.doAnswer(connectionSpyIntercept -> {
              return interceptedConn(connectionInfo,
                  (HttpClientConnection) connectionSpyIntercept.getArgument(0));
            }).when(context).interceptConnectionActivity(Mockito.any(
                HttpClientConnection.class));
            return context;
          })
          .when(httpOperation).getHttpClientContext();
      return httpOperation;
    }).when(op).createHttpOperation();
  }

  private HttpClientConnection interceptedConn(final ConnectionInfo connectionInfo,
      final HttpClientConnection connection) throws IOException, HttpException {
    HttpClientConnection interceptedConn = Mockito.spy(connection);

    Mockito.doAnswer(answer -> {
      connectionInfo.incrementSendHeaderInvocation();
      long start = System.currentTimeMillis();
      Object result = answer.callRealMethod();
      connectionInfo.addSendTime(System.currentTimeMillis() - start);
      return result;
    }).when(interceptedConn).sendRequestHeader(Mockito.any(HttpRequest.class));

    Mockito.doAnswer(answer -> {
      connectionInfo.incrementSendBodyInvocation();
      long start = System.currentTimeMillis();
      Object result = answer.callRealMethod();
      connectionInfo.addSendTime(System.currentTimeMillis() - start);
      return result;
    }).when(interceptedConn).sendRequestEntity(Mockito.any(
        HttpEntityEnclosingRequest.class));

    Mockito.doAnswer(answer -> {
      connectionInfo.incrementReceiveResponseInvocation();
      long start = System.currentTimeMillis();
      Object result = answer.callRealMethod();
      connectionInfo.addReadTime(System.currentTimeMillis() - start);
      return result;
    }).when(interceptedConn).receiveResponseHeader();

    Mockito.doAnswer(answer -> {
      connectionInfo.incrementReceiveResponseBodyInvocation();
      long start = System.currentTimeMillis();
      Object result = answer.callRealMethod();
      connectionInfo.addReadTime(System.currentTimeMillis() - start);
      return result;
    }).when(interceptedConn).receiveResponseEntity(Mockito.any(
        HttpResponse.class));
    return interceptedConn;
  }

  @Test
  public void testConnectionReadRecords() throws Exception {
    AzureBlobFileSystem fs = getFileSystem();
    Path path = new Path("/testConnectionRecords");

    Configuration conf = new Configuration(fs.getConf());
    conf.set(FS_AZURE_NETWORKING_LIBRARY, APACHE_HTTP_CLIENT.toString());
    AzureBlobFileSystem fs2 = Mockito.spy(
        (AzureBlobFileSystem) FileSystem.newInstance(conf));

    AzureBlobFileSystemStore store = Mockito.spy(fs2.getAbfsStore());
    Mockito.doReturn(store).when(fs2).getAbfsStore();

    AbfsClient client = Mockito.spy(store.getClient());
    Mockito.doReturn(client).when(store).getClient();

    try (OutputStream os = fs.create(path)) {
      os.write(1);
    }

    InputStream is = fs2.open(path);

    Mockito.doAnswer(answer -> {
      AbfsRestOperation op = Mockito.spy(
          (AbfsRestOperation) answer.callRealMethod());
      final ConnectionInfo connectionInfo = new ConnectionInfo();
      mockHttpOperationBehavior(connectionInfo, op);
      Mockito.doAnswer(executeAnswer -> {
        executeAnswer.callRealMethod();
        Assertions.assertThat(connectionInfo.getSendHeaderInvocation())
            .isEqualTo(1);
        Assertions.assertThat(connectionInfo.getSendBodyInvocation())
            .isEqualTo(0);
        Assertions.assertThat(connectionInfo.getReceiveResponseInvocation())
            .isEqualTo(1);
        Assertions.assertThat(connectionInfo.getReceiveResponseBodyInvocation())
            .isEqualTo(1);
        return null;
      }).when(op).execute(Mockito.any(TracingContext.class));
      return op;
    }).when(client).getAbfsRestOperation(
        Mockito.any(AbfsRestOperationType.class),
        Mockito.anyString(),
        Mockito.any(URL.class),
        Mockito.anyList(),
        Mockito.any(byte[].class),
        Mockito.anyInt(),
        Mockito.anyInt(),
        Mockito.nullable(String.class));

    is.read();
    is.close();
  }

  private static class ConnectionInfo {

    private long connectTime;

    private long readTime;

    private long sendTime;

    private int sendHeaderInvocation;

    private int sendBodyInvocation;

    private int receiveResponseInvocation;

    private int receiveResponseBodyInvocation;

    private void incrementSendHeaderInvocation() {
      sendHeaderInvocation++;
    }

    private void incrementSendBodyInvocation() {
      sendBodyInvocation++;
    }

    private void incrementReceiveResponseInvocation() {
      receiveResponseInvocation++;
    }

    private void incrementReceiveResponseBodyInvocation() {
      receiveResponseBodyInvocation++;
    }

    private void addConnectTime(long connectTime) {
      this.connectTime += connectTime;
    }

    private void addReadTime(long readTime) {
      this.readTime += readTime;
    }

    private void addSendTime(long sendTime) {
      this.sendTime += sendTime;
    }

    private long getConnectTime() {
      return connectTime;
    }

    private long getReadTime() {
      return readTime;
    }

    private long getSendTime() {
      return sendTime;
    }

    private int getSendHeaderInvocation() {
      return sendHeaderInvocation;
    }

    private int getSendBodyInvocation() {
      return sendBodyInvocation;
    }

    private int getReceiveResponseInvocation() {
      return receiveResponseInvocation;
    }

    private int getReceiveResponseBodyInvocation() {
      return receiveResponseBodyInvocation;
    }
  }
}