AwsStatisticsCollector.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.statistics.impl;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.SdkMetric;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
/**
* Collect statistics from the AWS SDK and forward to an instance of
* {@link StatisticsFromAwsSdk} and thence into the S3A statistics.
* <p>
* See {@code com.facebook.presto.hive.s3.PrestoS3FileSystemMetricCollector}
* for the inspiration for this.
* <p>
* See {@code software.amazon.awssdk.core.metrics.CoreMetric} for metric names.
*/
public class AwsStatisticsCollector implements MetricPublisher {
/**
* final destination of updates.
*/
private final StatisticsFromAwsSdk collector;
/**
* Instantiate.
* @param collector final destination of updates
*/
public AwsStatisticsCollector(final StatisticsFromAwsSdk collector) {
this.collector = collector;
}
/**
* This is the callback from the AWS SDK where metrics
* can be collected.
* @param metricCollection metrics collection
*/
@Override
public void publish(MetricCollection metricCollection) {
// MetricCollections are nested, so we need to traverse through their
// "children" to collect the desired metrics. E.g.:
//
// ApiCall
// ���������������������������������������������������������������������������������������������������������������������������������
// ��� MarshallingDuration=PT0.002808333S ���
// ��� RetryCount=0 ���
// ��� ApiCallSuccessful=true ���
// ��� OperationName=DeleteObject ���
// ��� ApiCallDuration=PT0.079801458S ���
// ��� CredentialsFetchDuration=PT0.000007083S ���
// ��� ServiceId=S3 ���
// ���������������������������������������������������������������������������������������������������������������������������������
// ApiCallAttempt
// ���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
// ��� SigningDuration=PT0.000319375S ���
// ��� ServiceCallDuration=PT0.078908584S ���
// ��� AwsExtendedRequestId=Kmvb2Sz8NuDgIFJPKzLLBhuHgQGmpAjVYBMrSHDvy= ���
// ��� HttpStatusCode=204 ���
// ��� BackoffDelayDuration=PT0S ���
// ��� AwsRequestId=KR0XZCSX ���
// ���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
// HttpClient
// ���������������������������������������������������������������������������������������������������������
// ��� AvailableConcurrency=1 ���
// ��� LeasedConcurrency=0 ���
// ��� ConcurrencyAcquireDuration=PT0S ���
// ��� PendingConcurrencyAcquires=0 ���
// ��� MaxConcurrency=96 ���
// ��� HttpClientName=Apache ���
// ���������������������������������������������������������������������������������������������������������
final long[] throttling = {0};
recurseThroughChildren(metricCollection)
.collect(Collectors.toList())
.forEach(m -> {
counter(m, CoreMetric.RETRY_COUNT, retries -> {
collector.updateAwsRetryCount(retries);
collector.updateAwsRequestCount(retries + 1);
});
counter(m, HttpMetric.HTTP_STATUS_CODE, statusCode -> {
if (statusCode == HttpStatusCode.THROTTLING) {
throttling[0] += 1;
}
});
timing(m, CoreMetric.API_CALL_DURATION,
collector::noteAwsClientExecuteTime);
timing(m, CoreMetric.SERVICE_CALL_DURATION,
collector::noteAwsRequestTime);
timing(m, CoreMetric.MARSHALLING_DURATION,
collector::noteRequestMarshallTime);
timing(m, CoreMetric.SIGNING_DURATION,
collector::noteRequestSigningTime);
timing(m, CoreMetric.UNMARSHALLING_DURATION,
collector::noteResponseProcessingTime);
});
collector.updateAwsThrottleExceptionsCount(throttling[0]);
}
@Override
public void close() {
}
/**
* Process a timing.
* @param collection metric collection
* @param metric metric
* @param durationConsumer consumer
*/
private void timing(
MetricCollection collection,
SdkMetric<Duration> metric,
Consumer<Duration> durationConsumer) {
collection
.metricValues(metric)
.forEach(v -> durationConsumer.accept(v));
}
/**
* Process a counter.
* @param collection metric collection
* @param metric metric
* @param consumer consumer
*/
private void counter(
MetricCollection collection,
SdkMetric<Integer> metric,
LongConsumer consumer) {
collection
.metricValues(metric)
.forEach(v -> consumer.accept(v.longValue()));
}
/**
* Metric collections can be nested. Exposes a stream of the given
* collection and its nested children.
* @param metrics initial collection
* @return a stream of all nested metric collections
*/
private static Stream<MetricCollection> recurseThroughChildren(
MetricCollection metrics) {
return Stream.concat(
Stream.of(metrics),
metrics.children().stream()
.flatMap(c -> recurseThroughChildren(c)));
}
}