TestStreamFactories.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl.streams;

import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.hadoop.fs.s3a.Statistic;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.VectoredIOContext;
import org.apache.hadoop.fs.s3a.prefetch.PrefetchingInputStreamFactory;
import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_CUSTOM_FACTORY;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE_CLASSIC;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE_CUSTOM;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE_PREFETCH;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements.Requirements.RequiresFuturePool;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.DEFAULT_STREAM_TYPE;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.E_EMPTY_CUSTOM_CLASSNAME;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.E_INVALID_STREAM_TYPE;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.factoryFromConfig;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Unit test for stream factory creation.
 * Verifies mapping of name to type, default handling,
 * legacy prefetch switch and failure handling.
 */
public class TestStreamFactories extends AbstractHadoopTestBase {

  /**
   * Both the empty string and the {@code INPUT_STREAM_TYPE_DEFAULT} name
   * must resolve to the default stream type, served by the classic factory.
   */
  @Test
  public void testDefaultFactoryCreation() throws Throwable {
    final String[] defaultNames = {"", INPUT_STREAM_TYPE_DEFAULT};
    for (String streamName : defaultNames) {
      load(streamName, DEFAULT_STREAM_TYPE,
          ClassicObjectInputStreamFactory.class);
    }
  }

  /**
   * The classic stream name loads the classic factory, whose requirements
   * neither ask for a future pool nor expect unaudited GET requests.
   */
  @Test
  public void testClassicFactoryCreation() throws Throwable {
    final ClassicObjectInputStreamFactory classicFactory = load(
        INPUT_STREAM_TYPE_CLASSIC,
        DEFAULT_STREAM_TYPE,
        ClassicObjectInputStreamFactory.class);
    final StreamFactoryRequirements needs = classicFactory.factoryRequirements();
    final boolean wantsFuturePool = needs.requiresFuturePool();
    assertThat(wantsFuturePool)
        .describedAs("requires future pool of %s", needs)
        .isFalse();
    assertRequirement(needs, ExpectUnauditedGetRequests, false);
  }

  /**
   * Assert that a requirements instance does, or does not, declare
   * a specific requirement.
   * @param requirements requirements instance to examine.
   * @param probe requirement to probe for.
   * @param shouldMatch true if the requirement must be declared;
   * false if it must be absent.
   */
  private static void assertRequirement(
      final StreamFactoryRequirements requirements,
      final StreamFactoryRequirements.Requirements probe,
      final boolean shouldMatch) {
    final boolean declared = requirements.requires(probe);
    assertThat(declared)
        .describedAs("%s of %s", probe, requirements)
        .isEqualTo(shouldMatch);
  }

  /**
   * The prefetch stream name loads the prefetching factory, whose
   * requirements ask for a future pool but do not expect unaudited
   * GET requests.
   */
  @Test
  public void testPrefetchFactoryCreation() throws Throwable {
    // the factory is selected purely by the stream type option
    final PrefetchingInputStreamFactory prefetchFactory = load(
        INPUT_STREAM_TYPE_PREFETCH,
        InputStreamType.Prefetch,
        PrefetchingInputStreamFactory.class);
    final StreamFactoryRequirements needs = prefetchFactory.factoryRequirements();
    final boolean wantsFuturePool = needs.requiresFuturePool();
    assertThat(wantsFuturePool)
        .describedAs("requires future pool of %s", needs)
        .isTrue();
    assertRequirement(needs, ExpectUnauditedGetRequests, false);
    assertRequirement(needs, RequiresFuturePool, true);
  }

  /**
   * Prefetch factory via the prefetch enabled flag.
   * The prefetching factory is returned before any attempt is made to
   * instantiate the stream type option, so the stream type value
   * set here is never resolved.
   */
  @Test
  public void testPrefetchEnabledFlag() throws Throwable {
    // declare a stream type name which is not one of the named stream types
    final Configuration conf = configWithStream("undefined");
    // then switch on prefetching through the boolean option
    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
    final ObjectInputStreamFactory prefetching = factoryFromConfig(conf);
    assertFactorySatisfies(
        prefetching,
        INPUT_STREAM_TYPE_PREFETCH,
        InputStreamType.Prefetch,
        PrefetchingInputStreamFactory.class);
  }

  /**
   * Requirements built without any requirement flags report both probed
   * flags as absent and return the thread counts passed to the constructor.
   */
  @Test
  public void testRequirementFlagsNoElements() throws Throwable {
    final VectoredIOContext vectoredContext = new VectoredIOContext();

    // constructed with no requirement flags at all
    final StreamFactoryRequirements noFlags =
        new StreamFactoryRequirements(1, 2, vectoredContext);
    assertRequirement(noFlags, ExpectUnauditedGetRequests, false);
    assertRequirement(noFlags, RequiresFuturePool, false);
    final boolean wantsFuturePool = noFlags.requiresFuturePool();
    assertThat(wantsFuturePool)
        .describedAs("requiresFuturePool() %s", noFlags)
        .isFalse();
    assertThat(noFlags)
        .describedAs("%s", noFlags)
        .matches(req -> !req.requiresFuturePool(), "requiresFuturePool")
        .satisfies(req ->
            assertThat(req.sharedThreads()).isEqualTo(1))
        .satisfies(req ->
            assertThat(req.streamThreads()).isEqualTo(2));
  }

  /**
   * Requirements built with only the future pool flag report that flag,
   * through both the generic probe and the dedicated accessor, and
   * do not report the unaudited GET flag.
   */
  @Test
  public void testRequirementFlagsFutures() throws Throwable {
    final VectoredIOContext vectoredContext = new VectoredIOContext();

    final StreamFactoryRequirements futurePoolOnly = new StreamFactoryRequirements(
        1, 2, vectoredContext, RequiresFuturePool);
    assertRequirement(futurePoolOnly, ExpectUnauditedGetRequests, false);
    assertRequirement(futurePoolOnly, RequiresFuturePool, true);
    final boolean wantsFuturePool = futurePoolOnly.requiresFuturePool();
    assertThat(wantsFuturePool)
        .describedAs("requiresFuturePool() %s", futurePoolOnly)
        .isTrue();
  }

  /**
   * Requirements built with only the unaudited GET flag report that flag
   * and do not report the future pool flag.
   */
  @Test
  public void testRequirementFlagsUnaudited() throws Throwable {
    final VectoredIOContext vectoredContext = new VectoredIOContext();

    final StreamFactoryRequirements unauditedOnly = new StreamFactoryRequirements(
        1, 2, vectoredContext, ExpectUnauditedGetRequests);
    assertRequirement(unauditedOnly, ExpectUnauditedGetRequests, true);
    assertRequirement(unauditedOnly, RequiresFuturePool, false);
  }


  /**
   * Create the factory for a stream type name, assert that it satisfies
   * the expectations, then initialize and bind it.
   * @param name stream type name: set as the input stream type option
   * and also used in assertion messages.
   * @param type expected stream type.
   * @param clazz expected class.
   * @param <T> class to expect
   * @return the initialized and bound factory, cast to the expected class.
   * @throws IOException declared for failures while setting up the factory.
   */
  private static <T extends ObjectInputStreamFactory> T load(
      String name,
      InputStreamType type,
      Class<T> clazz) throws IOException {

    final ObjectInputStreamFactory factory = factory(name);
    // verify class and stream type before touching the factory lifecycle
    assertFactorySatisfies(factory, name, type, clazz);
    factory.init(new Configuration(false));
    factory.bind(new FactoryBindingParameters(new Callbacks()));
    // checked cast through the class token: no unchecked (T) cast warning
    return clazz.cast(factory);
  }

  /**
   * Verify that a factory is an instance of the expected class and
   * reports the expected stream type.
   * @param factory factory to examine.
   * @param name stream name: only used for assertion messages.
   * @param type stream type the factory must report.
   * @param clazz class the factory must be an instance of.
   * @param <T> class to expect
   */
  private static <T extends ObjectInputStreamFactory> void assertFactorySatisfies(
      final ObjectInputStreamFactory factory,
      final String name,
      final InputStreamType type,
      final Class<T> clazz) {
    assertThat(factory)
        .describedAs("Factory for stream %s", name)
        .isInstanceOf(clazz)
        // the object under assertion is the factory itself
        .satisfies(f ->
            assertThat(f.streamType()).isEqualTo(type));
  }

  /**
   * A stream type name which is not recognized must be rejected with an
   * {@code IllegalArgumentException} containing the invalid stream type text.
   */
  @Test
  public void testUnknownStreamType() throws Throwable {
    intercept(
        IllegalArgumentException.class,
        E_INVALID_STREAM_TYPE,
        () -> factory("unknown"));
  }

  /**
   * Build a factory from a configuration whose input stream type option
   * is set to the supplied name.
   * @param name stream name.
   * @return the factory
   */
  private static ObjectInputStreamFactory factory(final String name) {
    final Configuration streamConf = configWithStream(name);
    return factoryFromConfig(streamConf);
  }

  /**
   * Prepare a new configuration in which the input stream type option
   * is set to the supplied name.
   * @param name stream name.
   * @return the prepared configuration.
   */
  private static Configuration configWithStream(final String name) {
    final Configuration streamConf = new Configuration(false);
    // the only option set: the stream type under test
    streamConf.set(INPUT_STREAM_TYPE, name);
    return streamConf;
  }

  /**
   * Custom factory loading: the good path.
   * With the custom stream type selected and a factory classname supplied,
   * the loaded factory must be a {@link CustomFactory} reporting the
   * custom stream type.
   */
  @Test
  public void testCustomFactoryLoad() throws Throwable {
    final Configuration conf = configWithStream(INPUT_STREAM_TYPE_CUSTOM);
    conf.set(INPUT_STREAM_CUSTOM_FACTORY, CustomFactory.class.getName());
    // use the shared helper so that a failure names the stream under test
    assertFactorySatisfies(factoryFromConfig(conf),
        INPUT_STREAM_TYPE_CUSTOM,
        InputStreamType.Custom,
        CustomFactory.class);
  }

  /**
   * Selecting the custom stream type without declaring a factory classname
   * must be rejected with an {@code IllegalArgumentException}.
   */
  @Test
  public void testCustomFactoryUndefined() throws Throwable {
    // custom stream type requested; the factory classname option is left unset
    final Configuration conf = configWithStream(INPUT_STREAM_TYPE_CUSTOM);
    intercept(
        IllegalArgumentException.class,
        E_EMPTY_CUSTOM_CLASSNAME,
        () -> factoryFromConfig(conf));
  }

  /**
   * An exception raised in a custom factory constructor surfaces as a
   * {@code RuntimeException}, with the original exception two causes down.
   */
  @Test
  public void testCustomConstructorFailure() throws Throwable {
    final String failingFactory = FactoryFailsToInstantiate.class.getName();
    final Configuration conf = configWithStream(INPUT_STREAM_TYPE_CUSTOM);
    conf.set(INPUT_STREAM_CUSTOM_FACTORY, failingFactory);
    final RuntimeException ex = intercept(
        RuntimeException.class,
        "InvocationTargetException",
        () -> factoryFromConfig(conf));
    // unwrap two levels to reach the exception thrown by the constructor
    final Throwable firstCause = ex.getCause();
    final Throwable constructorFailure = firstCause.getCause();
    assertThat(constructorFailure)
        .describedAs("innermost exception")
        .isInstanceOf(UncheckedIOException.class);
  }

  /**
   * Minimal custom factory used to test factory loading.
   * It reports the custom stream type; it creates no streams and
   * declares no requirements, returning null for both.
   */
  public static class CustomFactory extends AbstractObjectInputStreamFactory {

    /**
     * Public no-argument constructor.
     */
    public CustomFactory() {
      super("custom");
    }

    /**
     * No requirements are declared by this stub.
     * @return null, always.
     */
    @Override
    public StreamFactoryRequirements factoryRequirements() {
      return null;
    }

    /**
     * No stream is ever created by this stub.
     * @param parameters read parameters; ignored.
     * @return null, always.
     */
    @Override
    public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
      return null;
    }

    /**
     * Get the stream type.
     * @return the custom stream type.
     */
    @Override
    public InputStreamType streamType() {
      return InputStreamType.Custom;
    }
  }

  /**
   * Custom factory whose constructor always fails, used to test
   * how instantiation failures are reported.
   */
  public static final class FactoryFailsToInstantiate extends CustomFactory {

    /**
     * Always fails.
     * @throws UncheckedIOException always, wrapping a new IOException.
     */
    public FactoryFailsToInstantiate() {
      final IOException cause = new IOException();
      throw new UncheckedIOException("failed to instantiate", cause);
    }

  }

  /**
   * Stub implementation of the callbacks offered to
   * {@link ObjectInputStreamFactory} instances.
   * Every callback is unsupported and fails if invoked.
   */
  private static final class Callbacks implements ObjectInputStreamFactory.StreamFactoryCallbacks {

    /** Message of the exception raised by every callback. */
    private static final String NOT_IMPLEMENTED = "not implemented";

    @Override
    public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
      throw new UnsupportedOperationException(NOT_IMPLEMENTED);
    }

    @Override
    public void incrementFactoryStatistic(Statistic statistic) {
      throw new UnsupportedOperationException(NOT_IMPLEMENTED);
    }
  }

}