KafkaStreamsBinderMetrics.java
/*
* Copyright 2019-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToDoubleFunction;
import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
/**
* Kafka Streams binder metrics implementation that exports the metrics available
* through {@link KafkaStreams#metrics()} into a micrometer {@link io.micrometer.core.instrument.MeterRegistry}.
*
* Boot 2.2 users need to rely on this class for the metrics instead of direct support from Micrometer.
* Micrometer added Kafka Streams metrics support in 1.4.0 which Boot 2.3 includes.
* Therefore, the users who are on Boot 2.2, need to rely on these metrics.
* For users who are on 2.3 Boot, this class won't be activated (See the configuration for the various
* conditionals used).
*
* For the most part, this class is a copy of the Micrometer Kafka Streams support that was added in version 1.4.0.
* We will keep this class, as long as we support Boot 2.2.x.
*
* @author Soby Chacko
* @author Omer Celik
* @since 3.0.0
*/
public class KafkaStreamsBinderMetrics {
static final String DEFAULT_VALUE = "unknown";
static final String CLIENT_ID_TAG_NAME = "client-id";
static final String METRIC_GROUP_APP_INFO = "app-info";
static final String VERSION_METRIC_NAME = "version";
static final String START_TIME_METRIC_NAME = "start-time-ms";
static final String KAFKA_VERSION_TAG_NAME = "kafka-version";
static final String METRIC_NAME_PREFIX = "kafka.";
static final String METRIC_GROUP_METRICS_COUNT = "kafka-metrics-count";
private String kafkaVersion = DEFAULT_VALUE;
private String clientId = DEFAULT_VALUE;
private final MeterRegistry meterRegistry;
private MeterBinder meterBinder;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private volatile Set<MetricName> currentMeters = new HashSet<>();
private static final ReentrantLock metricsLock = new ReentrantLock();
public KafkaStreamsBinderMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
public void bindTo(Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans) {
if (this.meterBinder == null) {
this.meterBinder = registry -> {
if (streamsBuilderFactoryBeans != null) {
for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) {
if (streamsBuilderFactoryBean.isRunning()) {
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
final Map<MetricName, ? extends Metric> metrics = Objects.requireNonNull(kafkaStreams).metrics();
prepareToBindMetrics(registry, metrics);
checkAndBindMetrics(registry, metrics);
}
}
}
};
}
this.meterBinder.bindTo(this.meterRegistry);
}
public void addMetrics(Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans) {
try {
metricsLock.lock();
this.bindTo(streamsBuilderFactoryBeans);
}
finally {
metricsLock.unlock();
}
}
void prepareToBindMetrics(MeterRegistry registry, Map<MetricName, ? extends Metric> metrics) {
Metric startTime = null;
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
MetricName name = entry.getKey();
if (clientId.equals(DEFAULT_VALUE) && name.tags().get(CLIENT_ID_TAG_NAME) != null) {
clientId = name.tags().get(CLIENT_ID_TAG_NAME);
}
if (METRIC_GROUP_APP_INFO.equals(name.group())) {
if (VERSION_METRIC_NAME.equals(name.name())) {
kafkaVersion = (String) entry.getValue().metricValue();
}
else if (START_TIME_METRIC_NAME.equals(name.name())) {
startTime = entry.getValue();
}
}
}
if (startTime != null) {
bindMeter(registry, startTime, meterName(startTime), meterTags(startTime));
}
}
private void bindMeter(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
if (name.endsWith("total") || name.endsWith("count")) {
registerCounter(registry, metric, name, tags);
}
else {
registerGauge(registry, metric, name, tags);
}
}
private void registerCounter(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
FunctionCounter.builder(name, metric, toMetricValue())
.tags(tags)
.description(metric.metricName().description())
.register(registry);
}
private ToDoubleFunction<Metric> toMetricValue() {
return metric -> ((Number) metric.metricValue()).doubleValue();
}
private void registerGauge(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
Gauge.builder(name, metric, toMetricValue())
.tags(tags)
.description(metric.metricName().description())
.register(registry);
}
private List<Tag> meterTags(Metric metric) {
return meterTags(metric, false);
}
private String meterName(Metric metric) {
String name = METRIC_NAME_PREFIX + metric.metricName().group() + "." + metric.metricName().name();
return name.replaceAll("-metrics", "").replaceAll("-", ".");
}
private List<Tag> meterTags(Metric metric, boolean includeCommonTags) {
List<Tag> tags = new ArrayList<>();
metric.metricName().tags().forEach((key, value) -> tags.add(Tag.of(key, value)));
tags.add(Tag.of(KAFKA_VERSION_TAG_NAME, kafkaVersion));
return tags;
}
private boolean differentClient(List<Tag> tags) {
for (Tag tag : tags) {
if (tag.getKey().equals(CLIENT_ID_TAG_NAME)) {
if (!clientId.equals(tag.getValue())) {
return true;
}
}
}
return false;
}
void checkAndBindMetrics(MeterRegistry registry, Map<MetricName, ? extends Metric> metrics) {
if (!currentMeters.equals(metrics.keySet())) {
currentMeters = new HashSet<>(metrics.keySet());
metrics.forEach((name, metric) -> {
//Filter out non-numeric values
if (!(metric.metricValue() instanceof Number)) {
return;
}
//Filter out metrics from groups that include metadata
if (METRIC_GROUP_APP_INFO.equals(name.group())) {
return;
}
if (METRIC_GROUP_METRICS_COUNT.equals(name.group())) {
return;
}
String meterName = meterName(metric);
List<Tag> meterTagsWithCommonTags = meterTags(metric, true);
//Kafka has metrics with lower number of tags (e.g. with/without topic or partition tag)
//Remove meters with lower number of tags
boolean hasLessTags = false;
for (Meter other : registry.find(meterName).meters()) {
List<Tag> tags = other.getId().getTags();
// Only consider meters from the same client before filtering
if (differentClient(tags)) {
break;
}
if (tags.size() < meterTagsWithCommonTags.size()) {
registry.remove(other);
}
// Check if already exists
else if (tags.size() == meterTagsWithCommonTags.size()) {
if (tags.equals(meterTagsWithCommonTags)) {
return;
}
else {
break;
}
}
else {
hasLessTags = true;
}
}
if (hasLessTags) {
return;
}
bindMeter(registry, metric, meterName, meterTags(metric));
});
}
}
}