MetricsBucket.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.utils.SimpleRateLimiter;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLOSING_SQUARE_BRACKET;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OPENING_SQUARE_BRACKET;
/**
* MetricsBucket holds metrics for multiple AbfsClients and
* dispatches them in batches, respecting rate limits.
*/
final class MetricsBucket {
// Logger for the class.
private static final Logger LOG = LoggerFactory.getLogger(MetricsBucket.class);
// Rate limiter to control the rate of dispatching metrics.
private final SimpleRateLimiter rateLimiter;
// Buffer to hold metrics before sending.
private final AtomicReference<ConcurrentLinkedQueue<String>> metricsBuffer =
new AtomicReference<>(new ConcurrentLinkedQueue<>());
// Set of registered AbfsClients.
private final Set<AbfsClient> clients =
ConcurrentHashMap.newKeySet();
// Maximum size of metrics header in characters.
private static final long MAX_HEADER_SIZE = 1024;
// Constructor
MetricsBucket(SimpleRateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}
/**
* Register a new AbfsClient.
* @param client the AbfsClient to register
*/
public void registerClient(AbfsClient client) {
if (client != null) {
clients.add(client);
}
}
/**
* Deregister an AbfsClient. If this is the last client, drain and send
* any remaining metrics.
* @param client the AbfsClient to deregister
* @return true if the client was deregistered, false otherwise
*/
public boolean deregisterClient(AbfsClient client) {
if (client == null) {
return false;
}
ConcurrentLinkedQueue<String> batchToSend = null;
boolean isLastClient = false;
synchronized (this) {
if (!clients.contains(client)) {
return false;
}
if (clients.size() == 1) {
// This client is the last one ��� drain metrics now
batchToSend = metricsBuffer.getAndSet(new ConcurrentLinkedQueue<>());
isLastClient = true;
}
clients.remove(client);
}
if (isLastClient) {
sendMetrics(client, batchToSend);
}
return true;
}
/**
* Add a metric to the buffer.
* @param metric the metric to add
*/
void addRequest(String metric) {
if (metric != null) {
metricsBuffer.get().add(metric);
}
}
/**
* Drain the metrics buffer and send if there are registered clients.
*/
public void drainAndSendIfReady() {
AbfsClient client;
synchronized (this) {
if (clients.isEmpty()) {
return;
}
client = clients.iterator().next();
}
ConcurrentLinkedQueue<String> batch = metricsBuffer.getAndSet(
new ConcurrentLinkedQueue<>());
if (batch.isEmpty()) {
return;
}
sendMetrics(client, batch);
}
// Send metrics outside synchronized block
private void sendMetrics(AbfsClient client,
ConcurrentLinkedQueue<String> batchToSend) {
// Send outside synchronized block
if (client != null && batchToSend != null && !batchToSend.isEmpty()) {
for (String chunk : splitListBySize(batchToSend, MAX_HEADER_SIZE)) {
rateLimiter.acquire(5, TimeUnit.SECONDS); // Rate limiting
try {
client.getMetricCall(chunk);
} catch (IOException ignored) {
LOG.debug("Failed to send metrics: {}", ignored.getMessage());
}
}
}
}
// Check if there are no registered clients
public synchronized boolean isEmpty() {
return clients.isEmpty();
}
/**
* Split the list of metrics into chunks that fit within maxChars.
* Each metric is wrapped in square brackets and separated by colons.
*/
private static List<String> splitListBySize(
ConcurrentLinkedQueue<String> items, long maxChars) {
if (items.isEmpty()) {
return Collections.emptyList();
}
List<String> result = new ArrayList<>();
StringBuilder sb = new StringBuilder();
for (String s : items) {
String wrapped = OPENING_SQUARE_BRACKET + s + CLOSING_SQUARE_BRACKET;
int additional =
sb.length() == 0 ? wrapped.length()
: wrapped.length() + 1;
if (wrapped.length() > maxChars) {
if (sb.length() > 0) {
result.add(sb.toString());
sb.setLength(0);
}
result.add(wrapped);
continue;
}
if (sb.length() + additional > maxChars) {
result.add(sb.toString());
sb.setLength(0);
}
if (sb.length() > 0) {
sb.append(COLON);
}
sb.append(wrapped);
}
if (sb.length() > 0) {
result.add(sb.toString());
}
return result;
}
}