ITestAuditManager.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.audit;
import java.nio.file.AccessDeniedException;
import java.util.EnumSet;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatistics;
import static org.apache.hadoop.fs.s3a.Statistic.AUDIT_FAILURE;
import static org.apache.hadoop.fs.s3a.Statistic.AUDIT_REQUEST_EXECUTION;
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.enableLoggingAuditor;
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.requireOutOfSpanOperationsRejected;
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.resetAuditOptions;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_EXECUTION_INTERCEPTORS;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_REQUEST_HANDLERS;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.UNAUDITED_OPERATION;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test audit manager invocation by making assertions
* about the statistics of audit request execution
* {@link org.apache.hadoop.fs.s3a.Statistic#AUDIT_REQUEST_EXECUTION}
* and
* {@link org.apache.hadoop.fs.s3a.Statistic#AUDIT_FAILURE}.
*/
public class ITestAuditManager extends AbstractS3ACostTest {
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
resetAuditOptions(conf);
enableLoggingAuditor(conf);
conf.set(AUDIT_EXECUTION_INTERCEPTORS,
SimpleAWSExecutionInterceptor.CLASS);
conf.set(AUDIT_REQUEST_HANDLERS, "not-valid-class");
return conf;
}
/**
* Get the FS IOStatistics.
* @return the FS live IOSTats.
*/
private IOStatistics iostats() {
return getFileSystem().getIOStatistics();
}
/**
* Verify that operations outside a span are rejected
* by ensuring that the thread is outside a span, create
* a write operation helper, then
* reject it.
*/
@Test
public void testInvokeOutOfSpanRejected() throws Throwable {
describe("Operations against S3 will be rejected outside of a span");
final S3AFileSystem fs = getFileSystem();
requireOutOfSpanOperationsRejected(fs);
final long failures0 = lookupCounterStatistic(iostats(),
AUDIT_FAILURE.getSymbol());
final long exec0 = lookupCounterStatistic(iostats(),
AUDIT_REQUEST_EXECUTION.getSymbol());
// API call
// create and close a span, so the FS is not in a span.
fs.createSpan("span", null, null).close();
// this will be out of span
final WriteOperationHelper writer
= fs.getWriteOperationHelper();
// which can be verified
Assertions.assertThat(writer.getAuditSpan())
.matches(s -> !s.isValidSpan(), "Span is not valid");
// an S3 API call will fail and be mapped to access denial.
final AccessDeniedException ex = intercept(
AccessDeniedException.class, UNAUDITED_OPERATION, () ->
writer.listMultipartUploads("/"));
// verify the type of the inner cause, throwing the outer ex
// if it is null or a different class
if (!(ex.getCause() instanceof AuditFailureException)) {
throw ex;
}
assertThatStatisticCounter(iostats(), AUDIT_REQUEST_EXECUTION.getSymbol())
.isGreaterThan(exec0);
assertThatStatisticCounter(iostats(), AUDIT_FAILURE.getSymbol())
.isGreaterThan(failures0);
// stop rejecting out of span requests
fs.getAuditManager().setAuditFlags(EnumSet.of(AuditorFlags.PermitOutOfBandOperations));
writer.listMultipartUploads("/");
}
@Test
public void testExecutionInterceptorBinding() throws Throwable {
describe("Verify that extra ExecutionInterceptor can be added and that they"
+ " will be invoked during request execution");
final long baseCount = SimpleAWSExecutionInterceptor.getInvocationCount();
final S3AFileSystem fs = getFileSystem();
final long exec0 = lookupCounterStatistic(iostats(),
AUDIT_REQUEST_EXECUTION.getSymbol());
// API call to a known path, `getBucketLocation()` does not always result in an API call.
fs.listStatus(path("/"));
// which MUST have ended up calling the extension request handler
Assertions.assertThat(SimpleAWSExecutionInterceptor.getInvocationCount())
.describedAs("Invocation count of plugged in request handler")
.isGreaterThan(baseCount);
assertThatStatisticCounter(iostats(), AUDIT_REQUEST_EXECUTION.getSymbol())
.isGreaterThan(exec0);
assertThatStatisticCounter(iostats(), AUDIT_FAILURE.getSymbol())
.isZero();
Assertions.assertThat(SimpleAWSExecutionInterceptor.getStaticConf())
.describedAs("configuratin of SimpleAWSExecutionInterceptor")
.isNotNull()
.isSameAs(fs.getConf());
}
}