TestAuditIntegration.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.audit;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.util.List;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.core.interceptor.InterceptorContext;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import org.junit.jupiter.api.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
import org.apache.hadoop.fs.s3a.audit.impl.NoopAuditor;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
import static org.apache.hadoop.fs.s3a.audit.AuditIntegration.attachSpanToRequest;
import static org.apache.hadoop.fs.s3a.audit.AuditIntegration.retrieveAttachedSpan;
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.createIOStatisticsStoreForAuditing;
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditConfig;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_EXECUTION_INTERCEPTORS;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_SERVICE_CLASSNAME;
import static org.apache.hadoop.fs.s3a.audit.impl.S3AInternalAuditConstants.AUDIT_SPAN_EXECUTION_ATTRIBUTE;
import static org.apache.hadoop.service.ServiceAssert.assertServiceStateStarted;
import static org.apache.hadoop.service.ServiceAssert.assertServiceStateStopped;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Unit tests for auditing.
*/
public class TestAuditIntegration extends AbstractHadoopTestBase {
private final IOStatisticsStore ioStatistics =
createIOStatisticsStoreForAuditing();
/**
* AuditFailureException is mapped to AccessDeniedException.
*/
@Test
public void testExceptionTranslation() throws Throwable {
intercept(AccessDeniedException.class,
() -> {
throw translateException("test", "/",
new AuditFailureException("should be translated"));
});
}
/**
* UnsupportedRequest mapping and fail fast outcome.
*/
@Test
public void testUnsupportedExceptionTranslation() throws Throwable {
final UnsupportedRequestException ex = intercept(UnsupportedRequestException.class, () -> {
throw translateException("test", "/",
new AuditOperationRejectedException("not supported"));
});
final S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration(false));
final RetryPolicy.RetryAction action = retryPolicy.shouldRetry(ex, 0, 0, true);
assertThat(action.action)
.describedAs("retry policy %s for %s", action, ex)
.isEqualTo(RetryPolicy.RetryAction.RetryDecision.FAIL);
}
/**
* Create a no-op auditor.
*/
@Test
public void testNoOpAuditorInstantiation() throws Throwable {
OperationAuditor auditor = createAndStartNoopAuditor(
ioStatistics);
assertThat(auditor)
.describedAs("No-op auditor")
.isInstanceOf(NoopAuditor.class)
.satisfies(o -> o.isInState(Service.STATE.STARTED));
}
/**
* Create a no-op auditor through AuditIntegration, just as
* the audit manager does.
* @param store stats store.
* @return a started auditor
*/
private NoopAuditor createAndStartNoopAuditor(
final IOStatisticsStore store)
throws IOException {
Configuration conf = noopAuditConfig();
OperationAuditorOptions options =
OperationAuditorOptions.builder()
.withConfiguration(conf)
.withIoStatisticsStore(store);
OperationAuditor auditor =
AuditIntegration.createAndInitAuditor(conf,
AUDIT_SERVICE_CLASSNAME,
options);
assertThat(auditor)
.describedAs("No-op auditor")
.isInstanceOf(NoopAuditor.class)
.satisfies(o -> o.isInState(Service.STATE.INITED));
auditor.start();
return (NoopAuditor) auditor;
}
/**
* The auditor class has to exist.
*/
@Test
public void testCreateNonexistentAuditor() throws Throwable {
final Configuration conf = new Configuration();
OperationAuditorOptions options =
OperationAuditorOptions.builder()
.withConfiguration(conf)
.withIoStatisticsStore(ioStatistics);
conf.set(AUDIT_SERVICE_CLASSNAME, "not.a.known.class");
intercept(RuntimeException.class, () ->
AuditIntegration.createAndInitAuditor(conf,
AUDIT_SERVICE_CLASSNAME,
options));
}
/**
* The audit manager creates the auditor the config tells it to;
* this will have the same lifecycle as the manager.
*/
@Test
public void testAuditManagerLifecycle() throws Throwable {
AuditManagerS3A manager = AuditIntegration.createAndStartAuditManager(
noopAuditConfig(),
ioStatistics);
OperationAuditor auditor = manager.getAuditor();
assertServiceStateStarted(auditor);
manager.close();
assertServiceStateStopped(auditor);
}
@Test
public void testSingleExecutionInterceptor() throws Throwable {
AuditManagerS3A manager = AuditIntegration.createAndStartAuditManager(
noopAuditConfig(),
ioStatistics);
List<ExecutionInterceptor> interceptors
= manager.createExecutionInterceptors();
assertThat(interceptors)
.hasSize(1);
ExecutionInterceptor interceptor = interceptors.get(0);
RequestFactory requestFactory = RequestFactoryImpl.builder()
.withBucket("bucket")
.build();
HeadObjectRequest.Builder requestBuilder =
requestFactory.newHeadObjectRequestBuilder("/");
assertThat(interceptor instanceof AWSAuditEventCallbacks).isTrue();
((AWSAuditEventCallbacks)interceptor).requestCreated(requestBuilder);
HeadObjectRequest request = requestBuilder.build();
SdkHttpRequest httpRequest = SdkHttpRequest.builder()
.protocol("https")
.host("test")
.method(SdkHttpMethod.HEAD)
.build();
ExecutionAttributes attributes = ExecutionAttributes.builder().build();
InterceptorContext context = InterceptorContext.builder()
.request(request)
.httpRequest(httpRequest)
.build();
// test the basic pre-request sequence while avoiding
// the complexity of recreating the full sequence
// (and probably getting it wrong)
interceptor.beforeExecution(context, attributes);
interceptor.modifyRequest(context, attributes);
interceptor.beforeMarshalling(context, attributes);
interceptor.afterMarshalling(context, attributes);
interceptor.modifyHttpRequest(context, attributes);
interceptor.beforeTransmission(context, attributes);
AuditSpanS3A span = attributes.getAttribute(AUDIT_SPAN_EXECUTION_ATTRIBUTE);
assertThat(span).isNotNull();
assertThat(span.isValidSpan()).isFalse();
}
/**
* Register a second handler, verify it makes it to the list.
*/
@Test
public void testRequestHandlerLoading() throws Throwable {
Configuration conf = noopAuditConfig();
conf.setClassLoader(this.getClass().getClassLoader());
conf.set(AUDIT_EXECUTION_INTERCEPTORS,
SimpleAWSExecutionInterceptor.CLASS);
AuditManagerS3A manager = AuditIntegration.createAndStartAuditManager(
conf,
ioStatistics);
assertThat(manager.createExecutionInterceptors())
.hasSize(2)
.hasAtLeastOneElementOfType(SimpleAWSExecutionInterceptor.class);
}
@Test
public void testLoggingAuditorBinding() throws Throwable {
AuditManagerS3A manager = AuditIntegration.createAndStartAuditManager(
AuditTestSupport.loggingAuditConfig(),
ioStatistics);
OperationAuditor auditor = manager.getAuditor();
assertServiceStateStarted(auditor);
manager.close();
assertServiceStateStopped(auditor);
}
@Test
public void testNoopAuditManager() throws Throwable {
AuditManagerS3A manager = AuditIntegration.stubAuditManager();
assertThat(manager.createTransferListener())
.describedAs("transfer listener")
.isNotNull();
}
@Test
public void testSpanAttachAndRetrieve() throws Throwable {
AuditManagerS3A manager = AuditIntegration.stubAuditManager();
AuditSpanS3A span = manager.createSpan("op", null, null);
ExecutionAttributes attributes = ExecutionAttributes.builder().build();
attachSpanToRequest(attributes, span);
AuditSpanS3A retrievedSpan = retrieveAttachedSpan(attributes);
assertThat(retrievedSpan).isSameAs(span);
}
}