AbstractAuditingTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.audit;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;


import software.amazon.awssdk.awscore.AwsExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.InterceptorContext;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_GET_FILE_STATUS;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.UNAUDITED_OPERATION;
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.createIOStatisticsStoreForAuditing;
import static org.apache.hadoop.service.ServiceOperations.stopQuietly;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Abstract class for auditor unit tests.
 */
public abstract class AbstractAuditingTest extends AbstractHadoopTestBase {

  protected static final String OPERATION
      = INVOCATION_GET_FILE_STATUS.getSymbol();

  /**
   * Logging.
   */
  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractAuditingTest.class);

  public static final String PATH_1 = "/path1";

  public static final String PATH_2 = "/path2";

  /**
   * Statistics store with the auditor counters wired up.
   */
  private final IOStatisticsStore ioStatistics =
      createIOStatisticsStoreForAuditing();

  private RequestFactory requestFactory;

  private AuditManagerS3A manager;

  @BeforeEach
  public void setup() throws Exception {
    requestFactory = RequestFactoryImpl.builder()
        .withBucket("bucket")
        .build();
    manager = AuditIntegration.createAndStartAuditManager(
        createConfig(),
        ioStatistics);
  }

  /**
   * Create config.
   * @return config to use when creating a manager
   */
  protected abstract Configuration createConfig();

  @AfterEach
  public void teardown() {
    stopQuietly(manager);
  }

  protected IOStatisticsStore getIOStatistics() {
    return ioStatistics;
  }

  protected RequestFactory getRequestFactory() {
    return requestFactory;
  }

  protected AuditManagerS3A getManager() {
    return manager;
  }

  /**
   * Assert that a specific span is active.
   * This matches on the wrapped spans.
   * @param span span to assert over.
   */
  protected void assertActiveSpan(final AuditSpan span) {
    assertThat(activeSpan())
        .isSameAs(span);
  }

  /**
   * Assert a span is unbound/invalid.
   * @param span span to assert over.
   */
  protected void assertUnbondedSpan(final AuditSpan span) {
    assertThat(span.isValidSpan())
        .describedAs("Validity of %s", span)
        .isFalse();
  }

  protected AuditSpanS3A activeSpan() {
    return manager.getActiveAuditSpan();
  }

  /**
   * Create a head request and pass it through the manager's beforeExecution()
   * callback.
   *
   * @return a processed request.
   */
  protected SdkHttpRequest head() {
    HeadObjectRequest.Builder headObjectRequestBuilder =
        requestFactory.newHeadObjectRequestBuilder("/");
    manager.requestCreated(headObjectRequestBuilder);
    HeadObjectRequest headObjectRequest = headObjectRequestBuilder.build();
    ExecutionAttributes executionAttributes = ExecutionAttributes.builder().build();
    InterceptorContext context = InterceptorContext.builder()
        .request(headObjectRequest)
        .httpRequest(SdkHttpRequest.builder()
            .uri(URI.create("https://test"))
            .method(SdkHttpMethod.HEAD)
            .build())
        .build();
    manager.beforeExecution(context, executionAttributes);
    return manager.modifyHttpRequest(context, executionAttributes);
  }

  /**
   * Create a get request and pass it through the manager's beforeExecution()
   * callback.
   *
   * @return a processed request.
   */
  protected SdkHttpRequest get(String range) {
    GetObjectRequest.Builder getObjectRequestBuilder =
        requestFactory.newGetObjectRequestBuilder("/");

    SdkHttpRequest.Builder httpRequestBuilder =
        SdkHttpRequest.builder().uri(URI.create("https://test")).method(SdkHttpMethod.GET);

    if (!range.isEmpty()) {
      getObjectRequestBuilder.range(range);
      List<String> rangeHeader = new ArrayList<>();
      rangeHeader.add(range);
      Map<String, List<String>> headers = new HashMap<>();
      headers.put("Range", rangeHeader);
      httpRequestBuilder.headers(headers);
    }

    manager.requestCreated(getObjectRequestBuilder);
    GetObjectRequest getObjectRequest = getObjectRequestBuilder.build();
    ExecutionAttributes executionAttributes = ExecutionAttributes.builder().build().putAttribute(
        AwsExecutionAttribute.OPERATION_NAME, "GetObject");
    InterceptorContext context = InterceptorContext.builder()
        .request(getObjectRequest)
        .httpRequest(httpRequestBuilder.build())
        .build();
    manager.beforeExecution(context, executionAttributes);
    return manager.modifyHttpRequest(context, executionAttributes);
  }

  /**
   * Assert a head request fails as there is no
   * active span.
   */
  protected void assertHeadUnaudited() throws Exception {
    intercept(AuditFailureException.class,
        UNAUDITED_OPERATION, this::head);
  }

  /**
   * Assert that the audit failure is of a given value.
   * Returns the value to assist in chaining,
   * @param expected expected value
   * @return the expected value.
   */
  protected long verifyAuditFailureCount(
      final long expected) {
    return verifyCounter(Statistic.AUDIT_FAILURE, expected);
  }

  /**
   * Assert that the audit execution count
   * is of a given value.
   * Returns the value to assist in chaining,
   * @param expected expected value
   * @return the expected value.
   */
  protected long verifyAuditExecutionCount(
      final long expected) {
    return verifyCounter(Statistic.AUDIT_REQUEST_EXECUTION, expected);
  }

  /**
   * Assert that a statistic counter is of a given value.
   * Returns the value to assist in chaining,
   * @param statistic statistic to check
   * @param expected expected value
   * @return the expected value.
   */
  protected long verifyCounter(final Statistic statistic,
      final long expected) {
    IOStatisticAssertions.assertThatStatisticCounter(
        ioStatistics,
        statistic.getSymbol())
        .isEqualTo(expected);
    return expected;
  }

  /**
   * Create and switch to a span.
   * @return a span
   */
  protected AuditSpanS3A span() throws IOException {
    AuditSpanS3A span = manager.createSpan(OPERATION, PATH_1, PATH_2);
    assertThat(span)
        .matches(AuditSpan::isValidSpan);
    return span;
  }

  /**
   * Assert the map contains the expected (key, value).
   * @param params map of params
   * @param key key
   * @param expected expected value.
   */
  protected void assertMapContains(final Map<String, String> params,
      final String key, final String expected) {
    assertThat(params.get(key))
        .describedAs(key)
        .isEqualTo(expected);
  }

  /**
   * Assert the map does not contain the key, i.e, it is null.
   * @param params map of params
   * @param key key
   */
  protected void assertMapNotContains(final Map<String, String> params, final String key) {
    assertThat(params.get(key))
            .describedAs(key)
            .isNull();
  }

  /**
   * Create head request for bulk delete and pass it through beforeExecution of the manager.
   *
   * @param keys keys to be provided in the bulk delete request.
   * @return a processed request.
   */
  protected SdkHttpRequest headForBulkDelete(String... keys) {
    if (keys == null || keys.length == 0) {
      return null;
    }

    List<ObjectIdentifier> keysToDelete = Arrays
        .stream(keys)
        .map(key -> ObjectIdentifier.builder().key(key).build())
        .collect(Collectors.toList());

    ExecutionAttributes executionAttributes = ExecutionAttributes.builder().build();

    SdkHttpRequest.Builder httpRequestBuilder =
        SdkHttpRequest.builder().uri(URI.create("https://test")).method(SdkHttpMethod.POST);

    DeleteObjectsRequest deleteObjectsRequest =
        requestFactory.newBulkDeleteRequestBuilder(keysToDelete).build();

    InterceptorContext context = InterceptorContext.builder()
        .request(deleteObjectsRequest)
        .httpRequest(httpRequestBuilder.build())
        .build();

    manager.beforeExecution(context, executionAttributes);
    return manager.modifyHttpRequest(context, executionAttributes);
  }

}