OperationCostValidator.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.performance;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.assertj.core.api.Assumptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.statistics.StatisticTypeEnum;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.metrics2.lib.MutableCounter;
import org.apache.hadoop.metrics2.lib.MutableMetric;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_METADATA_REQUESTS;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Support for declarative assertions about operation cost.
* <p></p>
* Usage: A builder is used to declare the set of statistics
* to be monitored in the filesystem.
* <p></p>
* A call to {@link #exec(Callable, ExpectedProbe...)}
* executes the callable if 1+ probe is enabled; after
* invocation the probes are validated.
* The result of the callable is returned.
* <p></p>
* A call of {@link #intercepting(Class, String, Callable, ExpectedProbe...)}
* Invokes the callable if 1+ probe is enabled, expects an exception
* to be raised and then verifies metrics declared in the probes.
* <p></p>
* Probes are built up from the static method to create probes
* for metrics:
* <ul>
* <li>{@link #probe(boolean, Statistic, int)} </li>
* <li>{@link #probe(Statistic, int)} </li>
* <li>{@link #probes(boolean, ExpectedProbe...)} (Statistic, int)} </li>
* <li>{@link #always()}</li>
* </ul>
* If any probe evaluates to false, an assertion is raised.
* <p></p>
* When this happens: look in the logs!
* The logs will contain the whole set of metrics, the probe details
* and the result of the call.
*/
public final class OperationCostValidator {
private static final Logger LOG =
LoggerFactory.getLogger(OperationCostValidator.class);
/**
* The empty probe: declared as disabled.
*/
private static final ExpectedProbe EMPTY_PROBE =
new EmptyProbe("empty", false);
/**
* A probe which is always enabled.
*/
private static final ExpectedProbe ALWAYS_PROBE =
new EmptyProbe("always", true);
/**
* The map of metric diffs to track.
*/
private final Map<String, S3ATestUtils.MetricDiff> metricDiffs
= new TreeMap<>();
/**
* Instrumentation's IO Statistics.
*/
private final IOStatisticsStore ioStatistics;
/**
* Build the instance.
* @param builder builder containing all options.
*/
private OperationCostValidator(Builder builder) {
S3AFileSystem fs = builder.filesystem;
S3AInstrumentation instrumentation = fs.getInstrumentation();
for (Statistic stat : builder.metrics) {
String symbol = stat.getSymbol();
MutableMetric metric = instrumentation.lookupMetric(symbol);
if (metric instanceof MutableCounter) {
// only counters are used in the cost tracking;
// other statistics are ignored.
metricDiffs.put(symbol,
new S3ATestUtils.MetricDiff(fs, stat));
}
}
builder.metrics.clear();
ioStatistics = instrumentation.getIOStatistics();
}
/**
* Reset all the metrics being tracked.
*/
public void resetMetricDiffs() {
metricDiffs.values().forEach(S3ATestUtils.MetricDiff::reset);
}
/**
* Get the diff of a statistic.
* @param stat statistic to look up
* @return the value
* @throws NullPointerException if there is no match
*/
public S3ATestUtils.MetricDiff get(Statistic stat) {
S3ATestUtils.MetricDiff diff =
requireNonNull(metricDiffs.get(stat.getSymbol()),
() -> "No metric tracking for " + stat);
return diff;
}
/**
* Execute a closure and verify the metrics.
* <p></p>
* If no probes are active, the operation will
* raise an Assumption exception for the test to be skipped.
* @param eval closure to evaluate
* @param expected varargs list of expected diffs
* @param <T> return type.
* @return the result of the evaluation
*/
public <T> T exec(
Callable<T> eval,
ExpectedProbe... expectedA) throws Exception {
List<ExpectedProbe> expected = Arrays.asList(expectedA);
resetMetricDiffs();
// verify that 1+ probe is enabled
assumeProbesEnabled(expected);
// if we get here, then yes.
// evaluate it
T r = eval.call();
// build the text for errors
String text =
"operation returning "
+ (r != null ? r.toString() : "null");
LOG.info("{}", text);
LOG.info("state {}", this.toString());
LOG.info("probes {}", expected);
LOG.info("IOStatistics {}", ioStatisticsToPrettyString(ioStatistics));
for (ExpectedProbe ed : expected) {
ed.verify(this, text);
}
return r;
}
/**
* Scan all probes for being enabled.
* <p></p>
* If none of them are enabled, the evaluation will be skipped.
* @param expected list of expected probes
*/
private void assumeProbesEnabled(List<ExpectedProbe> expected) {
boolean enabled = false;
for (ExpectedProbe ed : expected) {
enabled |= ed.isEnabled();
}
String pstr = expected.stream()
.map(Object::toString)
.collect(Collectors.joining(", "));
Assumptions.assumeThat(enabled)
.describedAs("metrics to probe for are not enabled in %s", pstr)
.isTrue();
}
/**
* Execute a closure, expecting an exception.
* Verify the metrics after the exception has been caught and
* validated.
* @param clazz type of exception
* @param text text to look for in exception (optional)
* @param eval closure to evaluate
* @param expected varargs list of expected diffs
* @param <T> return type of closure
* @param <E> exception type
* @return the exception caught.
* @throws Exception any other exception
*/
public <T, E extends Throwable> E intercepting(
Class<E> clazz,
String text,
Callable<T> eval,
ExpectedProbe... expected) throws Exception {
return exec(() ->
intercept(clazz, text, eval),
expected);
}
@Override
public String toString() {
return metricDiffs.values().stream()
.map(S3ATestUtils.MetricDiff::toString)
.collect(Collectors.joining(", "));
}
/**
* Create a builder for the cost checker.
*
* @param fs filesystem.
* @return builder.
*/
public static Builder builder(S3AFileSystem fs) {
return new Builder(fs);
}
/**
* builder.
*/
public static final class Builder {
/**
* Filesystem.
*/
private final S3AFileSystem filesystem;
/**
* Metrics to create.
*/
private final List<Statistic> metrics = new ArrayList<>();
/**
* Create with a required filesystem.
* @param filesystem monitored filesystem
*/
public Builder(final S3AFileSystem filesystem) {
this.filesystem = requireNonNull(filesystem);
}
/**
* Add a single metric.
* @param statistic statistic to monitor.
* @return this
*/
public Builder withMetric(Statistic statistic) {
metrics.add(statistic);
return this;
}
/**
* Add a varargs list of metrics.
* @param stats statistics to monitor.
* @return this.
*/
public Builder withMetrics(Statistic...stats) {
metrics.addAll(Arrays.asList(stats));
return this;
}
/**
* Add all counters and duration types to the
* metrics which can be asserted over.
* @return this.
*/
public Builder withAllCounters() {
EnumSet.allOf(Statistic.class).stream()
.filter(s ->
s.getType() == StatisticTypeEnum.TYPE_COUNTER
|| s.getType() == StatisticTypeEnum.TYPE_DURATION)
.forEach(metrics::add);
return this;
}
/**
* Instantiate.
* @return the validator.
*/
public OperationCostValidator build() {
return new OperationCostValidator(this);
}
}
/**
* Get the "always" probe.
* @return a probe which always triggers execution.
*/
public static ExpectedProbe always() {
return ALWAYS_PROBE;
}
/**
* Create a probe of a statistic which is enabled whenever the expected
* value is greater than zero.
* @param statistic statistic to check.
* @param expected expected value.
* @return a probe.
*/
public static ExpectedProbe probe(
final Statistic statistic,
final int expected) {
return probe(expected >= 0, statistic, expected);
}
/**
* Create a probe of a statistic which is conditionally enabled.
* @param enabled is the probe enabled?
* @param statistic statistic to check.
* @param expected expected value.
* @return a probe.
*/
public static ExpectedProbe probe(
final boolean enabled,
final Statistic statistic,
final int expected) {
return enabled
? new ExpectSingleStatistic(statistic, expected)
: EMPTY_PROBE;
}
/**
* Create an aggregate probe from a vararges list of probes.
* @param enabled should the probes be enabled?
* @param plist probe list
* @return a probe
*/
public static ExpectedProbe probes(
final boolean enabled,
final ExpectedProbe...plist) {
return enabled
? new ProbeList(Arrays.asList(plist))
: EMPTY_PROBE;
}
/**
* Expect the exact head and list requests of the operation
* cost supplied.
* @param enabled is the probe enabled?
* @param cost expected cost.
* @return a probe.
*/
public static ExpectedProbe expect(
boolean enabled, OperationCost cost) {
return probes(enabled,
probe(OBJECT_METADATA_REQUESTS, cost.head()),
probe(OBJECT_LIST_REQUEST, cost.list()));
}
/**
* An expected probe to verify given criteria to trigger an eval.
* <p></p>
* Probes can be conditional, in which case they are only evaluated
* when true.
*/
public interface ExpectedProbe {
/**
* Verify a diff if the FS instance is compatible.
* @param message message to print; metric name is appended
*/
void verify(OperationCostValidator diffs, String message);
boolean isEnabled();
}
/**
* Simple probe is a single statistic.
*/
public static final class ExpectSingleStatistic implements ExpectedProbe {
private final Statistic statistic;
private final int expected;
/**
* Create.
* @param statistic statistic
* @param expected expected value.
*/
private ExpectSingleStatistic(final Statistic statistic,
final int expected) {
this.statistic = statistic;
this.expected = expected;
}
/**
* Verify a diff if the FS instance is compatible.
* @param message message to print; metric name is appended
*/
@Override
public void verify(OperationCostValidator diffs, String message) {
diffs.get(statistic).assertDiffEquals(message, expected);
}
public Statistic getStatistic() {
return statistic;
}
public int getExpected() {
return expected;
}
@Override
public boolean isEnabled() {
return true;
}
@Override
public String toString() {
String sb = "ExpectSingleStatistic{"
+ statistic
+ ", expected=" + expected
+ ", enabled=" + isEnabled()
+ '}';
return sb;
}
}
/**
* A list of probes; the verify operation
* verifies them all.
*/
public static class ProbeList implements ExpectedProbe {
/**
* Probe list.
*/
private final List<ExpectedProbe> probes;
/**
* Constructor.
* @param probes probe list.
*/
public ProbeList(final List<ExpectedProbe> probes) {
this.probes = probes;
}
@Override
public void verify(final OperationCostValidator diffs,
final String message) {
probes.forEach(p -> p.verify(diffs, message));
}
/**
* Enabled if 1+ probe is enabled.
* @return true if enabled.
*/
@Override
public boolean isEnabled() {
boolean enabled = false;
for (ExpectedProbe probe : probes) {
enabled |= probe.isEnabled();
}
return enabled;
}
@Override
public String toString() {
String pstr = probes.stream()
.map(Object::toString)
.collect(Collectors.joining(", "));
return "ProbeList{" + pstr + '}';
}
}
/**
* The empty probe always runs; it can be used to force
* a verification to execute.
*/
private static final class EmptyProbe implements ExpectedProbe {
private final String name;
private final boolean enabled;
private EmptyProbe(final String name, boolean enabled) {
this.name = name;
this.enabled = enabled;
}
@Override
public void verify(final OperationCostValidator diffs,
final String message) {
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public String toString() {
return name;
}
}
}