// ITestAbfsOutputStream.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ProtocolException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.MessageDigest;
import java.util.Arrays;

import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.http.HttpResponse;

import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
 * Tests for {@link AbfsOutputStream}: write concurrency/queue configuration,
 * expect-100-continue failure handling on append, and blob-endpoint
 * append/flush network-call behaviour. Parameterized over the networking
 * library (JDK HttpURLConnection vs Apache HttpClient).
 */
@RunWith(Parameterized.class)
public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {

  private static final int TEST_EXECUTION_TIMEOUT = 2 * 60 * 1000;
  private static final String TEST_FILE_PATH = "testfile";
  private static final int TEN = 10;

  /** Networking library under test; supplied by the parameterized runner. */
  @Parameterized.Parameter
  public HttpOperationType httpOperationType;

  /**
   * @return one parameter set per supported networking library, so every
   * test runs against both the JDK and the Apache HTTP clients.
   */
  @Parameterized.Parameters(name = "{0}")
  public static Iterable<Object[]> params() {
    return Arrays.asList(new Object[][]{
        {HttpOperationType.JDK_HTTP_URL_CONNECTION},
        {HttpOperationType.APACHE_HTTP_CLIENT}
    });
  }


  public ITestAbfsOutputStream() throws Exception {
    super();
  }

  /**
   * Creates a fresh filesystem instance with the parameterized networking
   * library set, so the stream under test uses the selected HTTP client.
   *
   * @param configuration base configuration to copy.
   * @return a new {@link AzureBlobFileSystem} instance (caller closes it).
   * @throws Exception on initialization failure.
   */
  @Override
  public AzureBlobFileSystem getFileSystem(final Configuration configuration)
      throws Exception {
    Configuration conf = new Configuration(configuration);
    conf.set(ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY, httpOperationType.toString());
    return (AzureBlobFileSystem) FileSystem.newInstance(conf);
  }

  /**
   * Verify that the default max-concurrent-request count and the default
   * write-request queue capacity from the configuration are propagated to
   * a newly created {@link AbfsOutputStream}.
   */
  @Test
  public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
    Configuration conf = getRawConfiguration();
    final AzureBlobFileSystem fs = getFileSystem(conf);
    try (FSDataOutputStream out = fs.create(path(TEST_FILE_PATH))) {
      AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();

      int maxConcurrentRequests
          = getConfiguration().getWriteMaxConcurrentRequestCount();
      if (stream.isAppendBlobStream()) {
        // Append blob streams only ever keep one write in flight.
        maxConcurrentRequests = 1;
      }

      Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
          "maxConcurrentRequests should be " + maxConcurrentRequests)
          .isEqualTo(maxConcurrentRequests);
      Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs(
          "maxRequestsToQueue should be " + getConfiguration()
              .getMaxWriteRequestsToQueue())
          .isEqualTo(getConfiguration().getMaxWriteRequestsToQueue());
    }
  }

  /**
   * Verify that explicitly configured max-concurrent-request and
   * max-requests-to-queue values override the defaults on the stream.
   */
  @Test
  public void testMaxRequestsAndQueueCapacity() throws Exception {
    Configuration conf = getRawConfiguration();
    int maxConcurrentRequests = 6;
    int maxRequestsToQueue = 10;
    conf.set(ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
        "" + maxConcurrentRequests);
    conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
        "" + maxRequestsToQueue);
    final AzureBlobFileSystem fs = getFileSystem(conf);
    try (FSDataOutputStream out = fs.create(path(TEST_FILE_PATH))) {
      AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();

      if (stream.isAppendBlobStream()) {
        // Append blob streams only ever keep one write in flight.
        maxConcurrentRequests = 1;
      }

      Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
          "maxConcurrentRequests should be " + maxConcurrentRequests).isEqualTo(maxConcurrentRequests);
      Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
          .isEqualTo(maxRequestsToQueue);
    }
  }

  /**
   * Verify the passing of AzureBlobFileSystem reference to AbfsOutputStream
   * to make sure that the FS instance is not eligible for GC while writing.
   */
  @Test(timeout = TEST_EXECUTION_TIMEOUT)
  public void testAzureBlobFileSystemBackReferenceInOutputStream()
      throws Exception {
    byte[] testBytes = new byte[5 * 1024];
    // Creating an output stream using a FS in a separate method to make the
    // FS instance used eligible for GC. Since when a method is popped from
    // the stack frame, it's variables become anonymous, this creates higher
    // chance of getting Garbage collected.
    try (AbfsOutputStream out = getStream()) {
      // Every 5KB block written is flushed and a GC is hinted, if the
      // executor service is shut down in between, the test should fail
      // indicating premature shutdown while writing.
      for (int i = 0; i < 5; i++) {
        out.write(testBytes);
        out.flush();
        System.gc();
        Assertions.assertThat(
            out.getExecutorService().isShutdown() || out.getExecutorService()
                .isTerminated())
            .describedAs("Executor Service should not be closed before "
                + "OutputStream while writing")
            .isFalse();
        Assertions.assertThat(out.getFsBackRef().isNull())
            .describedAs("BackReference in output stream should not be null")
            .isFalse();
      }
    }
  }

  /**
   * Verify AbfsOutputStream close() behaviour of throwing a PathIOE when the
   * FS instance is closed before the stream.
   */
  @Test
  public void testAbfsOutputStreamClosingFsBeforeStream()
      throws Exception {
    AzureBlobFileSystem fs = new AzureBlobFileSystem();
    fs.initialize(new URI(getTestUrl()), new Configuration());
    Path pathFs = path(getMethodName());
    byte[] inputBytes = new byte[5 * 1024];
    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
        pathFs)) {
      out.write(inputBytes);
      fs.close();
      // verify that output stream close after fs.close() would raise a
      // pathIOE containing the path being written to.
      intercept(PathIOException.class, getMethodName(), out::close);
    }
  }

  /**
   * Verify that when the server rejects the expect-100-continue handshake on
   * append, the first attempt fails with the expect-100 error and the retry
   * (sent without the Expect header) does not hit that error again.
   */
  @Test
  public void testExpect100ContinueFailureInAppend() throws Exception {
    // NOTE(review): the append-blob assumption is only evaluated for
    // non-HNS accounts here — confirm this guard is intentional.
    if (!getIsNamespaceEnabled(getFileSystem())) {
      Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
    }
    Configuration configuration = new Configuration(getRawConfiguration());
    configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true");
    AzureBlobFileSystem fs = getFileSystem(configuration);
    Path path = new Path("/testFile");
    AbfsOutputStream os = Mockito.spy(
        (AbfsOutputStream) fs.create(path).getWrappedStream());
    AzureIngressHandler ingressHandler = Mockito.spy(
        os.getIngressHandler());
    Mockito.doReturn(ingressHandler).when(os).getIngressHandler();

    AbfsClient spiedClient = Mockito.spy(ingressHandler.getClient());
    Mockito.doReturn(spiedClient).when(ingressHandler).getClient();
    AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
    mockSetupForAppend(httpOpForAppendTest, spiedClient);
    Mockito.doReturn(spiedClient).when(os).getClient();
    // Delete the file so the append on close surfaces a failure.
    fs.delete(path, true);
    os.write(1);
    if (spiedClient instanceof AbfsDfsClient) {
      intercept(FileNotFoundException.class, os::close);
    } else {
      IOException ex = intercept(IOException.class, os::close);
      Assertions.assertThat(ex.getCause().getCause()).isInstanceOf(
          AbfsRestOperationException.class);
    }
    Assertions.assertThat(httpOpForAppendTest[0].getConnectionDisconnectedOnError())
        .describedAs("First try from AbfsClient will have expect-100 "
            + "header and should fail with expect-100 error.").isTrue();
    if (httpOpForAppendTest[0] instanceof AbfsJdkHttpOperation) {
      Mockito.verify((AbfsJdkHttpOperation) httpOpForAppendTest[0],
              Mockito.times(0))
          .processConnHeadersAndInputStreams(Mockito.any(byte[].class),
              Mockito.anyInt(), Mockito.anyInt());
    }

    Assertions.assertThat(httpOpForAppendTest[1].getConnectionDisconnectedOnError())
        .describedAs("The retried operation from AbfsClient should not "
            + "fail with expect-100 error. The retried operation does not have"
            + "expect-100 header.").isFalse();
    if (httpOpForAppendTest[1] instanceof AbfsJdkHttpOperation) {
      Mockito.verify((AbfsJdkHttpOperation) httpOpForAppendTest[1],
              Mockito.times(1))
          .processConnHeadersAndInputStreams(Mockito.any(byte[].class),
              Mockito.anyInt(), Mockito.anyInt());
    }
  }

  /**
   * Wires spies into the client's rest-operation factory so that any append
   * request carrying the Expect header fails with the client-specific
   * expect-100 error, and records each created http operation (first attempt
   * at index 0, retry at index 1) for later verification.
   *
   * @param httpOpForAppendTest output array filled with the spied operations.
   * @param spiedClient client spy whose getAbfsRestOperation is intercepted.
   */
  private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest,
      final AbfsClient spiedClient) {
    // Single-element array so the lambda below can mutate the counter.
    int[] index = new int[1];
    index[0] = 0;
    Mockito.doAnswer(abfsRestOpAppendGetInvocation -> {
          AbfsRestOperation op = Mockito.spy(
              (AbfsRestOperation) abfsRestOpAppendGetInvocation.callRealMethod());
          boolean[] isExpectCall = new boolean[1];
          // Only requests that carry the Expect header get the failure.
          for (AbfsHttpHeader header : op.getRequestHeaders()) {
            if (header.getName().equals(EXPECT)) {
              isExpectCall[0] = true;
            }
          }
          Mockito.doAnswer(createHttpOpInvocation -> {
            httpOpForAppendTest[index[0]] = Mockito.spy(
                (AbfsHttpOperation) createHttpOpInvocation.callRealMethod());
            if (isExpectCall[0]) {
              if (httpOpForAppendTest[index[0]] instanceof AbfsJdkHttpOperation) {
                // JDK client surfaces a rejected expect-100 handshake as a
                // ProtocolException from the connection's output stream.
                Mockito.doAnswer(invocation -> {
                      OutputStream os = (OutputStream) invocation.callRealMethod();
                      os.write(1);
                      os.close();
                      throw new ProtocolException(EXPECT_100_JDK_ERROR);
                    })
                    .when((AbfsJdkHttpOperation) httpOpForAppendTest[index[0]])
                    .getConnOutputStream();
              } else {
                // Apache client surfaces it as a dedicated exception type.
                Mockito.doAnswer(invocation -> {
                      throw new AbfsApacheHttpExpect100Exception(
                          (HttpResponse) invocation.callRealMethod());
                    })
                    .when((AbfsAHCHttpOperation) httpOpForAppendTest[index[0]])
                    .executeRequest();
              }
            }
            return httpOpForAppendTest[index[0]++];
          }).when(op).createHttpOperation();
          return op;
        })
        .when(spiedClient)
        .getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class),
            Mockito.anyString(), Mockito.any(
                URL.class), Mockito.anyList(), Mockito.any(byte[].class),
            Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class));
  }

  /**
   * Separate method to create an outputStream using a local FS instance so
   * that once this method has returned, the FS instance can be eligible for GC.
   *
   * @return AbfsOutputStream used for writing.
   */
  private AbfsOutputStream getStream() throws URISyntaxException, IOException {
    AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
    fs1.initialize(new URI(getTestUrl()), new Configuration());
    Path pathFs1 = path(getMethodName() + "1");

    return createAbfsOutputStreamWithFlushEnabled(fs1, pathFs1);
  }

  /**
   * Verify that if getBlockList throws exception append should fail.
   */
  @Test
  public void testValidateGetBlockList() throws Exception {
    AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
    Assume.assumeTrue(!getIsNamespaceEnabled(fs));
    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
    assumeBlobServiceType();

    // Mock the clientHandler to return the blobClient when getBlobClient is called
    AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
    AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());

    Mockito.doReturn(clientHandler).when(store).getClientHandler();
    Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
    Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

    Mockito.doReturn(store).when(fs).getAbfsStore();
    Path testFilePath = new Path("/testFile");
    AbfsOutputStream os = Mockito.spy((AbfsOutputStream) fs.create(testFilePath).getWrappedStream());

    Mockito.doReturn(clientHandler).when(os).getClientHandler();
    Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();

    AbfsRestOperationException exception = getMockAbfsRestOperationException(HTTP_CONFLICT);
    // Throw exception when getBlockList is called
    Mockito.doThrow(exception).when(blobClient).getBlockList(Mockito.anyString(), Mockito.any(TracingContext.class));

    // Create a non-empty file
    os.write(TEN);
    os.hsync();
    os.close();

    Mockito.doCallRealMethod().when(store).openFileForWrite(Mockito.any(Path.class), Mockito.any(), Mockito.anyBoolean(), Mockito.any(TracingContext.class));
    intercept(AzureBlobFileSystemException.class, () -> store
        .openFileForWrite(testFilePath, null, false, getTestTracingContext(fs, true)));
  }

  /**
   * Verify that for flush without append no network calls are made for blob endpoint.
   **/
  @Test
  public void testNoNetworkCallsForFlush() throws Exception {
    AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
    Assume.assumeTrue(!getIsNamespaceEnabled(fs));
    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
    assumeBlobServiceType();

    // Mock the clientHandler to return the blobClient when getBlobClient is called
    AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
    AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());

    Mockito.doReturn(clientHandler).when(store).getClientHandler();
    Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
    Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

    Mockito.doReturn(store).when(fs).getAbfsStore();
    Path testFilePath = new Path("/testFile");
    AbfsOutputStream os = Mockito.spy((AbfsOutputStream) fs.create(testFilePath).getWrappedStream());
    AzureIngressHandler ingressHandler = Mockito.spy(os.getIngressHandler());
    Mockito.doReturn(ingressHandler).when(os).getIngressHandler();
    Mockito.doReturn(blobClient).when(ingressHandler).getClient();

    Mockito.doReturn(clientHandler).when(os).getClientHandler();
    Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();

    // Nothing was written, so the flush must be a pure no-op on the wire.
    os.hsync();

    Mockito.verify(blobClient, Mockito.times(0))
        .append(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
            Mockito.any(TracingContext.class));
    Mockito.verify(blobClient, Mockito.times(0)).
        flush(Mockito.any(byte[].class), Mockito.anyString(), Mockito.anyBoolean(),
            Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any(),
            Mockito.any(TracingContext.class), Mockito.anyString());
  }

  /**
   * Builds a mock {@link AbfsRestOperationException} with the given HTTP
   * status, empty error code/message, and a dummy cause.
   *
   * @param status HTTP status code for the exception.
   * @return the mock exception.
   */
  private AbfsRestOperationException getMockAbfsRestOperationException(int status) {
    return new AbfsRestOperationException(status, "", "", new Exception());
  }

  /**
   * Verify that a second flush with no intervening append makes no extra
   * network calls to the blob endpoint: after write + hsync + close, exactly
   * one append and one flush must have been issued in total.
   **/
  @Test
  public void testNoNetworkCallsForSecondFlush() throws Exception {
    AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
    Assume.assumeTrue(!getIsNamespaceEnabled(fs));
    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
    assumeBlobServiceType();
    Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());

    // Step 2: Mock the clientHandler to return the blobClient when getBlobClient is called
    AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
    AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());

    Mockito.doReturn(clientHandler).when(store).getClientHandler();
    Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
    Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

    Mockito.doReturn(store).when(fs).getAbfsStore();
    Path testFilePath = new Path("/testFile");
    AbfsOutputStream os = Mockito.spy((AbfsOutputStream) fs.create(testFilePath).getWrappedStream());
    AzureIngressHandler ingressHandler = Mockito.spy(os.getIngressHandler());
    Mockito.doReturn(ingressHandler).when(os).getIngressHandler();
    Mockito.doReturn(blobClient).when(ingressHandler).getClient();

    Mockito.doReturn(clientHandler).when(os).getClientHandler();
    Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();

    // hsync() flushes the written byte; close() must not re-flush it.
    os.write(TEN);
    os.hsync();
    os.close();

    Mockito.verify(blobClient, Mockito.times(1))
        .append(Mockito.anyString(), Mockito.any(byte[].class), Mockito.any(
                AppendRequestParameters.class), Mockito.any(), Mockito.any(),
            Mockito.any(TracingContext.class));
    Mockito.verify(blobClient, Mockito.times(1)).
        flush(Mockito.any(byte[].class), Mockito.anyString(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any(),
            Mockito.any(TracingContext.class), Mockito.anyString());
  }

  /**
   * Tests that the message digest is reset when an exception occurs during remote flush.
   * Simulates a failure in the flush operation and verifies reset is called on MessageDigest.
   */
  @Test
  public void testResetCalledOnExceptionInRemoteFlush() throws Exception {
    AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
    Assume.assumeTrue(!getIsNamespaceEnabled(fs));
    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
    assumeBlobServiceType();
    Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());

    // Create a file and spy on AbfsOutputStream
    Path path = new Path("/testFile");
    AbfsOutputStream realOs = (AbfsOutputStream) fs.create(path).getWrappedStream();
    AbfsOutputStream os = Mockito.spy(realOs);
    AzureIngressHandler ingressHandler = Mockito.spy(os.getIngressHandler());
    Mockito.doReturn(ingressHandler).when(os).getIngressHandler();
    AbfsClient spiedClient = Mockito.spy(ingressHandler.getClient());
    Mockito.doReturn(spiedClient).when(ingressHandler).getClient();
    AzureBlobBlockManager blockManager = Mockito.spy((AzureBlobBlockManager) os.getBlockManager());
    Mockito.doReturn(blockManager).when(ingressHandler).getBlockManager();
    // Pretend there is a pending block so remoteFlush attempts the commit.
    Mockito.doReturn(true).when(blockManager).hasBlocksToCommit();
    Mockito.doReturn("dummy-block-id").when(blockManager).getBlockIdToCommit();

    MessageDigest mockMessageDigest = Mockito.mock(MessageDigest.class);
    Mockito.doReturn(mockMessageDigest).when(os).getFullBlobContentMd5();
    Mockito.doReturn(os).when(ingressHandler).getAbfsOutputStream();
    Mockito.doReturn("dummyMd5").when(ingressHandler).computeFullBlobMd5();

    // Simulating the exception in client flush call
    Mockito.doThrow(
            new AbfsRestOperationException(HTTP_UNAVAILABLE, "", "", new Exception()))
        .when(spiedClient).flush(
            Mockito.any(byte[].class),
            Mockito.anyString(),
            Mockito.anyBoolean(),
            Mockito.nullable(String.class),
            Mockito.nullable(String.class),
            Mockito.anyString(),
            Mockito.nullable(ContextEncryptionAdapter.class),
            Mockito.any(TracingContext.class), Mockito.nullable(String.class));

    // Triggering the flush to simulate exception
    try {
      ingressHandler.remoteFlush(0, false, false, null,
          getTestTracingContext(fs, true));
    } catch (AzureBlobFileSystemException e) {
      //expected exception
    }
    // Verify that reset was called on the message digest
    Mockito.verify(mockMessageDigest, Mockito.times(1)).reset();
  }
}