// ITestAbfsOutputStream.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED;

/**
 * Integration tests for {@link AbfsOutputStream}. The suite is executed once
 * for every {@link HttpOperationType} supplied by {@link #params()}.
 */
@RunWith(Parameterized.class)
public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {

  /** Timeout for the back-reference test: two minutes in milliseconds. */
  private static final int TEST_EXECUTION_TIMEOUT = 2 * 60 * 1000;
  /** File name used by the request count / queue capacity tests. */
  private static final String TEST_FILE_PATH = "testfile";

  /** Networking library for the current run; set by the Parameterized runner. */
  @Parameterized.Parameter
  public HttpOperationType httpOperationType;

  /**
   * Supplies the networking libraries the suite is run against.
   *
   * @return one single-element parameter array per {@link HttpOperationType}.
   */
  @Parameterized.Parameters(name = "{0}")
  public static Iterable<Object[]> params() {
    return Arrays.asList(new Object[][]{
        {HttpOperationType.JDK_HTTP_URL_CONNECTION},
        {HttpOperationType.APACHE_HTTP_CLIENT}
    });
  }

  /**
   * Creates the test instance.
   *
   * @throws Exception propagated from the superclass constructor.
   */
  public ITestAbfsOutputStream() throws Exception {
    super();
  }

  /**
   * Creates a new file system instance from a copy of the supplied
   * configuration, with the networking library set to
   * {@link #httpOperationType}. The supplied configuration is not modified.
   * The caller owns the returned instance and should close it.
   *
   * @param configuration base configuration to copy.
   * @return a newly created {@link AzureBlobFileSystem}.
   * @throws Exception if the file system cannot be created.
   */
  @Override
  public AzureBlobFileSystem getFileSystem(final Configuration configuration)
      throws Exception {
    Configuration conf = new Configuration(configuration);
    conf.set(ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY,
        httpOperationType.toString());
    return (AzureBlobFileSystem) FileSystem.newInstance(conf);
  }

  /**
   * Verify that a stream created without overriding the write settings
   * reports the max concurrent request count and the max queued request
   * count of the test configuration. For an append blob stream the expected
   * concurrent request count is 1.
   */
  @Test
  public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
    Configuration conf = getRawConfiguration();
    // getFileSystem(conf) returns a brand new instance, so it is closed here
    // (after the stream, resources close in reverse order) instead of leaked.
    try (AzureBlobFileSystem fs = getFileSystem(conf);
        FSDataOutputStream out = fs.create(path(TEST_FILE_PATH))) {
      AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();

      int maxConcurrentRequests
          = getConfiguration().getWriteMaxConcurrentRequestCount();
      if (stream.isAppendBlobStream()) {
        maxConcurrentRequests = 1;
      }

      Assertions.assertThat(stream.getMaxConcurrentRequestCount())
          .describedAs(
              "maxConcurrentRequests should be " + maxConcurrentRequests)
          .isEqualTo(maxConcurrentRequests);
      Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
          .describedAs("maxRequestsToQueue should be "
              + getConfiguration().getMaxWriteRequestsToQueue())
          .isEqualTo(getConfiguration().getMaxWriteRequestsToQueue());
    }
  }

  /**
   * Verify that explicitly configured values for the max concurrent write
   * requests and the max write requests to queue are reported by the created
   * stream. For an append blob stream the expected concurrent request count
   * is 1.
   */
  @Test
  public void testMaxRequestsAndQueueCapacity() throws Exception {
    // Work on a copy so the overrides do not touch the shared raw
    // configuration, matching testExpect100ContinueFailureInAppend.
    Configuration conf = new Configuration(getRawConfiguration());
    int maxConcurrentRequests = 6;
    int maxRequestsToQueue = 10;
    conf.set(ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
        "" + maxConcurrentRequests);
    conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
        "" + maxRequestsToQueue);
    // The file system is a new instance owned by this test; close it once
    // the stream has been closed.
    try (AzureBlobFileSystem fs = getFileSystem(conf);
        FSDataOutputStream out = fs.create(path(TEST_FILE_PATH))) {
      AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();

      if (stream.isAppendBlobStream()) {
        maxConcurrentRequests = 1;
      }

      Assertions.assertThat(stream.getMaxConcurrentRequestCount())
          .describedAs(
              "maxConcurrentRequests should be " + maxConcurrentRequests)
          .isEqualTo(maxConcurrentRequests);
      Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
          .describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
          .isEqualTo(maxRequestsToQueue);
    }
  }

  /**
   * Verify the passing of AzureBlobFileSystem reference to AbfsOutputStream
   * to make sure that the FS instance is not eligible for GC while writing.
   */
  @Test(timeout = TEST_EXECUTION_TIMEOUT)
  public void testAzureBlobFileSystemBackReferenceInOutputStream()
      throws Exception {
    byte[] testBytes = new byte[5 * 1024];
    // Creating an output stream using a FS in a separate method to make the
    // FS instance used eligible for GC. Since when a method is popped from
    // the stack frame, its variables become anonymous, this creates higher
    // chance of getting Garbage collected.
    try (AbfsOutputStream out = getStream()) {
      // Every 5KB block written is flushed and a GC is hinted, if the
      // executor service is shut down in between, the test should fail
      // indicating premature shutdown while writing.
      for (int i = 0; i < 5; i++) {
        out.write(testBytes);
        out.flush();
        System.gc();
        Assertions.assertThat(
            out.getExecutorService().isShutdown() || out.getExecutorService()
                .isTerminated())
            .describedAs("Executor Service should not be closed before "
                + "OutputStream while writing")
            .isFalse();
        Assertions.assertThat(out.getFsBackRef().isNull())
            .describedAs("BackReference in output stream should not be null")
            .isFalse();
      }
    }
  }

  /**
   * Verify AbfsOutputStream close() behaviour of throwing a PathIOE when the
   * FS instance is closed before the stream.
   */
  @Test
  public void testAbfsOutputStreamClosingFsBeforeStream()
      throws Exception {
    AzureBlobFileSystem fs = new AzureBlobFileSystem();
    fs.initialize(new URI(getTestUrl()), new Configuration());
    Path pathFs = path(getMethodName());
    byte[] inputBytes = new byte[5 * 1024];
    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
        pathFs)) {
      out.write(inputBytes);
      // The FS is deliberately closed while the stream is still open.
      fs.close();
      // verify that output stream close after fs.close() would raise a
      // pathIOE containing the path being written to.
      LambdaTestUtils
          .intercept(PathIOException.class, getMethodName(), out::close);
    }
  }

  /**
   * Verify the append behaviour when the expect-100 header is enabled and
   * the target file is deleted after the stream has been created: closing
   * the stream must raise a {@link FileNotFoundException}, the first HTTP
   * operation must report that its connection was disconnected on error,
   * and the second (retried) operation must not. For JDK based operations
   * the response processing is additionally verified to be skipped for the
   * first operation and executed exactly once for the second.
   */
  @Test
  public void testExpect100ContinueFailureInAppend() throws Exception {
    Configuration configuration = new Configuration(getRawConfiguration());
    configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true");
    // The file system is a new instance owned by this test; try-with-resources
    // releases it even when one of the assertions below fails.
    try (AzureBlobFileSystem fs = getFileSystem(configuration)) {
      Path path = new Path("/testFile");
      AbfsOutputStream os = Mockito.spy(
          (AbfsOutputStream) fs.create(path).getWrappedStream());
      AbfsClient spiedClient = Mockito.spy(os.getClient());
      // Slot 0 receives the first HTTP operation, slot 1 the retried one.
      AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
      mockSetupForAppend(httpOpForAppendTest, spiedClient);
      Mockito.doReturn(spiedClient).when(os).getClient();
      // Remove the file so that the data written below cannot be appended.
      fs.delete(path, true);
      os.write(1);
      LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
        os.close();
      });

      Assertions.assertThat(
              httpOpForAppendTest[0].getConnectionDisconnectedOnError())
          .describedAs("First try from AbfsClient will have expect-100 "
              + "header and should fail with expect-100 error.").isTrue();
      if (httpOpForAppendTest[0] instanceof AbfsJdkHttpOperation) {
        Mockito.verify((AbfsJdkHttpOperation) httpOpForAppendTest[0],
                Mockito.times(0))
            .processConnHeadersAndInputStreams(Mockito.any(byte[].class),
                Mockito.anyInt(), Mockito.anyInt());
      }

      Assertions.assertThat(
              httpOpForAppendTest[1].getConnectionDisconnectedOnError())
          .describedAs("The retried operation from AbfsClient should not "
              + "fail with expect-100 error. The retried operation does not "
              + "have expect-100 header.").isFalse();
      if (httpOpForAppendTest[1] instanceof AbfsJdkHttpOperation) {
        Mockito.verify((AbfsJdkHttpOperation) httpOpForAppendTest[1],
                Mockito.times(1))
            .processConnHeadersAndInputStreams(Mockito.any(byte[].class),
                Mockito.anyInt(), Mockito.anyInt());
      }
    }
  }

  /**
   * Stubs the spied client so that every rest operation it hands out through
   * the stubbed {@code getAbfsRestOperation} overload is itself a spy whose
   * {@code createHttpOperation()} wraps the real HTTP operation in a spy and
   * records it in the supplied array, in creation order.
   *
   * @param httpOpForAppendTest array receiving the spied HTTP operations;
   *                            it must be large enough for every operation
   *                            created during the test.
   * @param spiedClient Mockito spy of the client used by the stream.
   */
  private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest,
      final AbfsClient spiedClient) {
    // Single element array: a counter that the nested lambdas can mutate.
    final int[] index = {0};
    Mockito.doAnswer(abfsRestOpAppendGetInvocation -> {
          AbfsRestOperation op = Mockito.spy(
              (AbfsRestOperation) abfsRestOpAppendGetInvocation.callRealMethod());
          Mockito.doAnswer(createHttpOpInvocation -> {
            httpOpForAppendTest[index[0]] = Mockito.spy(
                (AbfsHttpOperation) createHttpOpInvocation.callRealMethod());
            // Return the recorded spy, then advance to the next slot.
            return httpOpForAppendTest[index[0]++];
          }).when(op).createHttpOperation();
          return op;
        })
        .when(spiedClient)
        .getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class),
            Mockito.anyString(), Mockito.any(
                URL.class), Mockito.anyList(), Mockito.any(byte[].class),
            Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class));
  }

  /**
   * Separate method to create an outputStream using a local FS instance so
   * that once this method has returned, the FS instance can be eligible for GC.
   * The FS instance is intentionally neither returned nor closed here.
   *
   * @return AbfsOutputStream used for writing.
   * @throws URISyntaxException if the test URL is not a valid URI.
   * @throws IOException if the file system or the stream cannot be created.
   */
  private AbfsOutputStream getStream() throws URISyntaxException, IOException {
    AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
    fs1.initialize(new URI(getTestUrl()), new Configuration());
    Path pathFs1 = path(getMethodName() + "1");

    return createAbfsOutputStreamWithFlushEnabled(fs1, pathFs1);
  }

}