AbfsMetricsManager.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.Closeable;
import java.io.IOException;
import java.net.URL;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_NUMBER_OF_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClient.LOG;
/**
* AbfsMetricsManager is responsible for managing metrics collection
* and emission for an AbfsClient instance.
*/
public class AbfsMetricsManager implements Closeable {

  /** Name of the daemon timer thread used for idle-time metric emission. */
  public static final String ABFS_CLIENT_TIMER_THREAD_NAME
      = "abfs-timer-client";

  /** Name of the daemon thread backing the metrics emit scheduler. */
  private static final String METRICS_EMIT_SCHEDULER_THREAD_NAME
      = "abfs-metrics-emit-scheduler";

  /**
   * Timer scheduling idle-time emission tasks. Null when idle-time emission
   * is not configured and after {@link #close()}.
   */
  private volatile Timer timer;

  /** URL used as the destination for metrics. */
  private URL metricUrl;

  /** Shared key credentials of the separate metric account, if any. */
  private SharedKeyCredentials metricSharedkeyCredentials = null;

  /** Most recently created idle-time timer task. */
  private volatile TimerTask runningTimerTask;

  /** Metric analysis period, compared against elapsed milliseconds. */
  private final int metricAnalysisPeriod;

  /** Delay and period, in milliseconds, of the idle-time timer task. */
  private final int metricIdlePeriod;

  /** Whether a separate metric account (name and key) is configured. */
  private boolean hasSeparateMetricAccount = false;

  /** Whether metric collection is enabled. */
  private final AtomicBoolean isMetricCollectionEnabled
      = new AtomicBoolean(false);

  /** Format in which metrics are initialized. */
  private MetricFormat metricFormat;

  /** Whether the idle-time timer task is currently suspended. */
  private final AtomicBoolean isMetricCollectionStopped;

  /** Receiver of the metrics emitted by this manager. */
  private final AggregateMetricsManager aggregateMetricsManager;

  /** Scheduler for the threshold-based and interval-based emission tasks. */
  private volatile ScheduledExecutorService metricsEmitScheduler = null;

  /** Configuration all metric settings are read from. */
  private final AbfsConfiguration abfsConfiguration;

  /** Counters holding the metrics to emit. */
  private final AbfsCounters abfsCounters;

  /** File system ID included in the emitted metrics data. */
  private final String fileSystemId;

  /** Storage account name the metrics are recorded under. */
  private final String accountName;

  /**
   * Constructor for AbfsMetricsManager.
   * When metric collection is enabled, this resolves the metric URL (and the
   * credentials of a separate metric account when both its name and key are
   * configured), initializes the counters and starts the emission tasks.
   * Any failure during that setup is logged, disables metric collection and
   * stops whatever emission tasks had already been started; it is never
   * propagated to the caller.
   *
   * @param abfsConfiguration AbfsConfiguration object.
   * @param abfsCounters AbfsCounters object.
   * @param baseUrlString Base URL string of the AbfsClient.
   * @param indexLastForwardSlash Index of last forward slash in the base URL string.
   * @param accountName Storage account name.
   * @param fileSystemId File system ID.
   */
  public AbfsMetricsManager(final AbfsConfiguration abfsConfiguration,
      final AbfsCounters abfsCounters, final String baseUrlString,
      final int indexLastForwardSlash, final String accountName,
      final String fileSystemId) {
    this.abfsConfiguration = abfsConfiguration;
    this.abfsCounters = abfsCounters;
    this.fileSystemId = fileSystemId;
    this.accountName = accountName;
    this.isMetricCollectionEnabled.set(
        abfsConfiguration.isMetricsCollectionEnabled());
    this.isMetricCollectionStopped = new AtomicBoolean(false);
    this.aggregateMetricsManager = AggregateMetricsManager.getInstance(
        abfsConfiguration.getMetricsEmitIntervalInMins(),
        abfsConfiguration.getMaxMetricsCallsPerSecond());
    this.metricAnalysisPeriod = abfsConfiguration.getMetricAnalysisTimeout();
    this.metricIdlePeriod = abfsConfiguration.getMetricIdleTimeout();
    if (!isMetricCollectionEnabled()) {
      return;
    }
    try {
      String metricAccountName = abfsConfiguration.getMetricAccount();
      String metricAccountKey = abfsConfiguration.getMetricAccountKey();
      this.metricFormat = abfsConfiguration.getMetricFormat();
      if (isNotEmpty(metricAccountName) && isNotEmpty(metricAccountKey)) {
        int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT);
        if (dotIndex <= 0) {
          throw new InvalidUriException(
              metricAccountName + " - account name is not fully qualified.");
        }
        try {
          metricSharedkeyCredentials = new SharedKeyCredentials(
              metricAccountName.substring(0, dotIndex),
              metricAccountKey);
          hasSeparateMetricAccount = true;
          setMetricsUrl(metricAccountName.startsWith(HTTPS_SCHEME)
              ? metricAccountName
              : HTTPS_SCHEME + COLON
                  + FORWARD_SLASH + FORWARD_SLASH + metricAccountName);
        } catch (IllegalArgumentException e) {
          throw new IOException(
              "Exception while initializing metric credentials ", e);
        }
      } else {
        setMetricsUrl(baseUrlString.substring(0, indexLastForwardSlash + 1));
      }
      // Once the metric URL is set, initialize the metrics
      abfsCounters.initializeMetrics(metricFormat, abfsConfiguration);
      startMetricEmitters();
    } catch (Exception e) {
      LOG.error("Metrics disabled. Failed to initialize metrics for {}",
          baseUrlString, e);
      this.isMetricCollectionEnabled.set(false);
      // Do not leave a half-started scheduler or timer running once
      // metrics have been disabled.
      releaseSchedulingResources();
    }
  }

  /**
   * Starts the recurring emission tasks: a threshold check, a fixed-interval
   * emission and, when configured, the idle-time timer task.
   */
  private void startMetricEmitters() {
    // Daemon thread, like the idle timer below, so that a manager that is
    // never closed cannot keep the JVM alive.
    final ScheduledExecutorService scheduler
        = Executors.newSingleThreadScheduledExecutor(runnable -> {
          Thread thread
              = new Thread(runnable, METRICS_EMIT_SCHEDULER_THREAD_NAME);
          thread.setDaemon(true);
          return thread;
        });
    this.metricsEmitScheduler = scheduler;
    // Periodically check the request count and emit once it reaches the
    // configured threshold.
    scheduler.scheduleWithFixedDelay(
        () -> runGuarded(this::emitIfThresholdReached),
        abfsConfiguration.getMetricsEmitThresholdIntervalInSecs(),
        abfsConfiguration.getMetricsEmitThresholdIntervalInSecs(),
        TimeUnit.SECONDS);
    // Emit unconditionally every configured emit interval.
    scheduler.scheduleWithFixedDelay(
        () -> runGuarded(this::emitCollectedMetrics),
        abfsConfiguration.getMetricsEmitIntervalInMins(),
        abfsConfiguration.getMetricsEmitIntervalInMins(),
        TimeUnit.MINUTES);
    // Emit metrics based on idle time.
    if (abfsConfiguration.shouldEmitMetricsOnIdleTime()) {
      final Timer idleTimer = new Timer(ABFS_CLIENT_TIMER_THREAD_NAME, true);
      this.timer = idleTimer;
      idleTimer.schedule(new TimerTaskImpl(),
          metricIdlePeriod,
          metricIdlePeriod);
    }
  }

  /**
   * Emits the collected metrics if the total number of requests has reached
   * the configured emit threshold.
   */
  private void emitIfThresholdReached() {
    if (abfsCounters.getAbfsBackoffMetrics()
        .getMetricValue(TOTAL_NUMBER_OF_REQUESTS)
        >= abfsConfiguration.getMetricsEmitThreshold()) {
      emitCollectedMetrics();
    }
  }

  /**
   * Runs a recurring task and logs, rather than propagates, any runtime
   * exception. An exception escaping a task scheduled with
   * scheduleWithFixedDelay suppresses all of its subsequent executions, and
   * one escaping a TimerTask terminates the timer thread.
   *
   * @param task the task to run.
   */
  private void runGuarded(Runnable task) {
    try {
      task.run();
    } catch (RuntimeException e) {
      LOG.warn("Metric emission task failed for account {}", accountName, e);
    }
  }

  /**
   * Cancels the running timer task and the timer, and shuts down the metrics
   * emit scheduler. Synchronized so that it cannot interleave with a timer
   * suspend or resume in {@link #timerOrchestrator}.
   */
  private synchronized void releaseSchedulingResources() {
    if (runningTimerTask != null) {
      runningTimerTask.cancel();
      runningTimerTask = null;
    }
    if (timer != null) {
      timer.cancel();
      timer = null;
    }
    if (metricsEmitScheduler != null && !metricsEmitScheduler.isShutdown()) {
      metricsEmitScheduler.shutdownNow();
      metricsEmitScheduler = null;
    }
  }

  /**
   * Closes the metrics resources.
   * This method cancels any running timer task and the timer, shuts down the
   * metrics emission scheduler and then, if metric collection is enabled,
   * emits the metrics collected so far.
   */
  @Override
  public void close() {
    releaseSchedulingResources();
    if (isMetricCollectionEnabled()) {
      emitCollectedMetrics();
    }
  }

  /**
   * Builds the metrics payload and resets the counters.
   * The payload is the validated client correlation ID, the file system ID
   * and the string form of the counters, joined by colons. When the counters
   * produce an empty string nothing is reset and null is returned.
   *
   * @return the metrics payload, or null if there are no metrics to emit.
   */
  private synchronized String getMetricsData() {
    String metrics = abfsCounters.toString();
    if (StringUtils.isEmpty(metrics)) {
      return null;
    }
    // The snapshot has been taken; re-initialize for the next period.
    abfsCounters.initializeMetrics(metricFormat, abfsConfiguration);
    return TracingContext.validateClientCorrelationID(
        abfsConfiguration.getClientCorrelationId()) + COLON + fileSystemId
        + COLON + metrics;
  }

  /**
   * Suspends or resumes the idle-time timer task.
   * RESUME reschedules the task if collection is enabled and currently
   * suspended. SUSPEND cancels the given task if collection is enabled, not
   * yet suspended, and at least the metric analysis period has elapsed since
   * the last execution time reported by the counters. State changes happen
   * while holding this object's monitor.
   *
   * @param timerFunctionality resume or suspend.
   * @param timerTask the timer task to cancel on suspend; may be null.
   * @return true only if this call suspended the timer task, false otherwise.
   */
  boolean timerOrchestrator(TimerFunctionality timerFunctionality,
      TimerTask timerTask) {
    switch (timerFunctionality) {
    case RESUME:
      if (isMetricCollectionEnabled() && isMetricCollectionStopped.get()) {
        synchronized (this) {
          // Re-check under the lock: another thread may have resumed already.
          if (isMetricCollectionStopped.get()) {
            resumeTimer();
          }
        }
      }
      break;
    case SUSPEND: {
      long now = System.currentTimeMillis();
      long lastExecutionTime = abfsCounters.getLastExecutionTime().get();
      if (isMetricCollectionEnabled()
          && (now - lastExecutionTime >= metricAnalysisPeriod)) {
        synchronized (this) {
          if (!isMetricCollectionStopped.get()) {
            if (timerTask != null) {
              timerTask.cancel();
            }
            // The timer is null when idle-time emission is not configured
            // or the manager has been closed.
            final Timer activeTimer = timer;
            if (activeTimer != null) {
              activeTimer.purge();
            }
            isMetricCollectionStopped.set(true);
            return true;
          }
        }
      }
      break;
    }
    default:
      break;
    }
    return false;
  }

  /**
   * Resumes the timer for metric collection.
   * This method sets the isMetricCollectionStopped flag to false
   * and schedules a new TimerTaskImpl to run at fixed intervals
   * defined by the metricIdlePeriod. It does nothing when there is no timer,
   * i.e. idle-time emission is not configured or the manager was closed.
   * Called from {@link #timerOrchestrator} while holding this object's
   * monitor.
   */
  private void resumeTimer() {
    final Timer activeTimer = timer;
    if (activeTimer == null) {
      return;
    }
    isMetricCollectionStopped.set(false);
    activeTimer.schedule(new TimerTaskImpl(),
        metricIdlePeriod,
        metricIdlePeriod);
  }

  /**
   * Checks if metric collection is enabled.
   *
   * @return true if metric collection is enabled and a file system ID is
   * present, false otherwise.
   */
  public boolean isMetricCollectionEnabled() {
    return isMetricCollectionEnabled.get() && fileSystemId != null;
  }

  /**
   * Getter for metric URL.
   *
   * @return metricUrl
   */
  @VisibleForTesting
  public URL getMetricsUrl() {
    return metricUrl;
  }

  /**
   * Setter for metric URL.
   * The URL is passed through UriUtils.changeUrlFromBlobToDfs before being
   * stored.
   *
   * @param urlString to be set as metricUrl.
   * @throws IOException if URL is malformed.
   */
  private void setMetricsUrl(String urlString) throws IOException {
    metricUrl = UriUtils.changeUrlFromBlobToDfs(new URL(urlString));
  }

  /**
   * TimerTask implementation for emitting collected metrics based on idle
   * time. Each run asks {@link #timerOrchestrator} to suspend the timer; if
   * that call suspends it, the collected metrics are emitted.
   */
  class TimerTaskImpl extends TimerTask {
    TimerTaskImpl() {
      runningTimerTask = this;
    }

    @Override
    public void run() {
      // Guarded: an exception escaping run() would terminate the timer
      // thread and make every later schedule() call on the timer fail.
      runGuarded(() -> {
        if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
          emitCollectedMetrics();
        }
      });
    }
  }

  /**
   * Emits the collected metrics.
   * Does nothing when metric collection is disabled. Otherwise the current
   * metrics payload is built, which also re-initializes the counters when
   * there was something to emit, and handed to the AggregateMetricsManager
   * under this manager's account name.
   */
  public void emitCollectedMetrics() {
    if (!isMetricCollectionEnabled()) {
      return;
    }
    this.aggregateMetricsManager.recordMetric(accountName, getMetricsData());
  }

  /**
   * Getter for timer.
   *
   * @return the idle-time timer, or null if there is none.
   */
  @VisibleForTesting
  protected Timer getTimer() {
    return timer;
  }

  /**
   * Getter for metricsEmitScheduler.
   *
   * @return the metrics emit scheduler, or null if there is none.
   */
  @VisibleForTesting
  ScheduledExecutorService getMetricsEmitScheduler() {
    return metricsEmitScheduler;
  }

  /**
   * @return true if metric account name and key are different from storage account.
   */
  public boolean hasSeparateMetricAccount() {
    return hasSeparateMetricAccount;
  }

  /**
   * Getter for metric shared key credentials.
   *
   * @return the metric account credentials, or null if none were created.
   */
  public SharedKeyCredentials getMetricSharedkeyCredentials() {
    return metricSharedkeyCredentials;
  }

  /**
   * Getter for AggregateMetricsManager.
   *
   * @return the AggregateMetricsManager this manager records metrics to.
   */
  public AggregateMetricsManager getAggregateMetricsManager() {
    return aggregateMetricsManager;
  }
}