AbstractOperationAuditor.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.audit.impl;
import java.util.EnumSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.audit.AuditIntegration;
import org.apache.hadoop.fs.s3a.audit.AuditorFlags;
import org.apache.hadoop.fs.s3a.audit.OperationAuditor;
import org.apache.hadoop.fs.s3a.audit.OperationAuditorOptions;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.service.AbstractService;
import static java.util.Objects.requireNonNull;
/**
* This is a long-lived service which is created in S3A FS initialize
* (make it fast!) which provides context for tracking operations made to S3.
* An IOStatisticsStore is passed in -in production this is expected to
* be the S3AFileSystem instrumentation, which will have the
* {@code AUDIT_SPAN_START} statistic configured for counting durations.
*/
public abstract class AbstractOperationAuditor extends AbstractService
implements OperationAuditor {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractOperationAuditor.class);
/**
* Base of IDs is a UUID.
*/
public static final String BASE = UUID.randomUUID().toString();
/**
* Counter to create unique auditor IDs.
*/
private static final AtomicLong SPAN_ID_COUNTER = new AtomicLong(1);
/**
* Destination for recording statistics, especially duration/count of
* operations.
* Set in {@link #init(OperationAuditorOptions)}.
*/
private IOStatisticsStore iostatistics;
/**
* Options: set in {@link #init(OperationAuditorOptions)}.
*/
private OperationAuditorOptions options;
/**
* Should out of span requests be rejected?
*/
private AtomicBoolean rejectOutOfSpan = new AtomicBoolean(false);
/**
* Auditor ID as a UUID.
*/
private final UUID auditorUUID = UUID.randomUUID();
/**
* ID of the auditor, which becomes that of the filesystem
* in request contexts.
*/
private final String auditorID = auditorUUID.toString();
/**
* Audit flags which can be passed down to subclasses.
*/
private EnumSet<AuditorFlags> auditorFlags;
/**
* Construct.
* @param name name
*
*/
protected AbstractOperationAuditor(final String name) {
super(name);
}
/**
* Sets the IOStats and then calls init().
* @param opts options to initialize with.
*/
@Override
public void init(final OperationAuditorOptions opts) {
this.options = opts;
this.iostatistics = requireNonNull(opts.getIoStatisticsStore());
init(opts.getConfiguration());
}
@Override
protected void serviceInit(final Configuration conf) throws Exception {
super.serviceInit(conf);
setRejectOutOfSpan(AuditIntegration.isRejectOutOfSpan(conf));
LOG.debug("{}: Out of span operations will be {}",
getName(),
isRejectOutOfSpan() ? "rejected" : "ignored");
}
@Override
public String getAuditorId() {
return auditorID;
}
/**
* Get the IOStatistics Store.
* @return the IOStatistics store updated with statistics.
*/
public IOStatisticsStore getIOStatistics() {
return iostatistics;
}
/**
* Get the options this auditor was initialized with.
* @return options.
*/
protected OperationAuditorOptions getOptions() {
return options;
}
/**
* Create a span ID.
* @return a unique span ID.
*/
protected final String createSpanID() {
return String.format("%s-%08d",
auditorID, SPAN_ID_COUNTER.incrementAndGet());
}
/**
* Should out of scope ops be rejected?
* @return true if out of span calls should be rejected.
*/
protected boolean isRejectOutOfSpan() {
return rejectOutOfSpan.get();
}
/**
* Enable/disable out of span rejection.
* @param rejectOutOfSpan new value.
*/
protected void setRejectOutOfSpan(boolean rejectOutOfSpan) {
this.rejectOutOfSpan.set(rejectOutOfSpan);
}
/**
* Update Auditor flags.
* Calls {@link #auditorFlagsChanged(EnumSet)} after the update.
* @param flags audit flags.
*/
@Override
public void setAuditFlags(final EnumSet<AuditorFlags> flags) {
auditorFlags = flags;
auditorFlagsChanged(flags);
}
/**
* Get the current set of auditor flags.
*
* @return the current set of auditor flags.
*/
public EnumSet<AuditorFlags> getAuditorFlags() {
return auditorFlags;
}
/**
* Notification that the auditor flags have been updated.
* @param flags audit flags.
*/
protected void auditorFlagsChanged(EnumSet<AuditorFlags> flags) {
// if out of band operations are allowed, configuration settings are overridden
if (flags.contains(AuditorFlags.PermitOutOfBandOperations)) {
LOG.debug("Out of span operations are required by the stream factory");
setRejectOutOfSpan(false);
}
}
}