// KafkaBinderMetrics.java

/*
 * Copyright 2016-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToDoubleFunction;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.noop.NoopGauge;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.context.Lifecycle;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;

/**
 * Metrics for Kafka binder.
 *
 * <p>For every consumer topic reported by the binder, a {@link Gauge} named
 * {@link #OFFSET_LAG_METRIC_NAME} is registered, tagged with {@code group} and
 * {@code topic}. Its value is the number of unconsumed messages (end offset minus
 * committed offset, summed over all partitions of the topic).
 *
 * <p>The lag is recomputed periodically on an internal scheduler and cached. Depending
 * on the binder metrics configuration, the gauge either returns the cached value or
 * triggers a fresh computation bounded by {@link #setTimeout(int) timeout} seconds.
 *
 * @author Henryk Konsek
 * @author Soby Chacko
 * @author Artem Bilan
 * @author Oleg Zhurakousky
 * @author Jon Schneider
 * @author Thomas Cheyney
 * @author Gary Russell
 * @author Lars Bilger
 * @author Tomek Szmytka
 * @author Nico Heller
 * @author Kurt Hong
 * @author Omer Celik
 */
public class KafkaBinderMetrics
		implements MeterBinder, ApplicationListener<BindingCreatedEvent>, AutoCloseable, Lifecycle {

	/** Default time, in seconds, a gauge waits for an on-demand lag computation. */
	private static final int DEFAULT_TIMEOUT = 5;

	private static final Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);

	/**
	 * Offset lag micrometer metric name. This can be used for meter filtering.
	 */
	public static final String OFFSET_LAG_METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";

	private final KafkaMessageChannelBinder binder;

	private final KafkaBinderConfigurationProperties binderConfigurationProperties;

	// volatile: read outside the lock by the double-checked initialization in createConsumerFactory().
	private volatile ConsumerFactory<?, ?> defaultConsumerFactory;

	private final MeterRegistry meterRegistry;

	/** Metadata consumers keyed by consumer group; created lazily and closed in {@link #close()}. */
	private final Map<String, Consumer<?, ?>> metadataConsumers;

	private int timeout = DEFAULT_TIMEOUT;

	/** Last successfully computed lag, keyed by {@code topic + "-" + group}. */
	private final Map<String, Long> lastUnconsumedMessagesValues = new ConcurrentHashMap<>();

	// volatile: replaced by bindTo(), cleared by close() and read by gauge callbacks on other threads.
	volatile ScheduledExecutorService scheduler;

	private final ReentrantLock consumerFactoryLock = new ReentrantLock();

	private final AtomicBoolean running = new AtomicBoolean();

	/**
	 * Create the metrics binder.
	 * @param binder the binder whose topics in use are monitored
	 * @param binderConfigurationProperties binder configuration (metrics settings and
	 * consumer configuration for the metadata consumers)
	 * @param defaultConsumerFactory factory for the metadata consumers; when
	 * {@code null} one is built lazily from the binder configuration
	 * @param meterRegistry registry used when re-binding on {@link BindingCreatedEvent}
	 * and for meter removal on {@link #close()}; may be {@code null}
	 */
	public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
							KafkaBinderConfigurationProperties binderConfigurationProperties,
							@Nullable ConsumerFactory<?, ?> defaultConsumerFactory,
							@Nullable MeterRegistry meterRegistry) {

		this.binder = binder;
		this.binderConfigurationProperties = binderConfigurationProperties;
		this.defaultConsumerFactory = defaultConsumerFactory;
		this.meterRegistry = meterRegistry;
		this.metadataConsumers = new ConcurrentHashMap<>();
	}

	/**
	 * Create the metrics binder without a consumer factory or a meter registry.
	 * @param binder the binder whose topics in use are monitored
	 * @param binderConfigurationProperties binder configuration
	 */
	public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
							KafkaBinderConfigurationProperties binderConfigurationProperties) {

		this(binder, binderConfigurationProperties, null, null);
	}

	/**
	 * Set how long a gauge waits for an on-demand lag computation before falling back
	 * to the last known value.
	 * @param timeout the timeout in seconds
	 */
	public void setTimeout(int timeout) {
		this.timeout = timeout;
	}

	/**
	 * Register one lag gauge per consumer topic and schedule its periodic computation.
	 * A previously created scheduler is shut down first and replaced by a new one.
	 * @param registry the registry to register the gauges with
	 */
	@Override
	public void bindTo(MeterRegistry registry) {
		/*
		 * We can't just replace one scheduler with another.
		 * Before and even after the old one is gathered by GC, its threads still exist and
		 * consume memory and CPU resources to switch contexts.
		 * Theoretically, as a result of processing n topics, there would be about (1+n)*n/2
		 * threads alive at the same time.
		 */
		ScheduledExecutorService previous = this.scheduler;
		if (previous != null) {
			// The field is only typed as ScheduledExecutorService, so do not cast blindly.
			if (previous instanceof ScheduledThreadPoolExecutor) {
				LOG.info("Try to shutdown the old scheduler with "
						+ ((ScheduledThreadPoolExecutor) previous).getPoolSize() + " threads");
			}
			previous.shutdownNow();
		}

		Map<String, TopicInformation> topicsInUse = this.binder.getTopicsInUse();
		// Schedule on a local reference: a concurrent close() may null the field at any time.
		ScheduledExecutorService executor = Executors.newScheduledThreadPool(topicsInUse.size());
		this.scheduler = executor;

		for (Map.Entry<String, TopicInformation> topicInfo : topicsInUse.entrySet()) {

			if (!topicInfo.getValue().isConsumerTopic()) {
				continue;
			}

			String topic = topicInfo.getKey();
			String group = topicInfo.getValue().consumerGroup();

			ToDoubleFunction<KafkaBinderMetrics> offsetComputation = computeOffsetComputationFunction(topic, group);
			final Gauge register = Gauge.builder(OFFSET_LAG_METRIC_NAME, this, offsetComputation)
				.tag("group", group)
				.tag("topic", topic)
				.description("Unconsumed messages for a particular group and topic")
				.register(registry);

			if (!(register instanceof NoopGauge)) {
				// bindTo() runs on every BindingCreatedEvent: keep an already computed lag
				// instead of resetting it to zero each time.
				this.lastUnconsumedMessagesValues.putIfAbsent(lagKey(topic, group), 0L);
				// scheduleWithFixedDelay rejects a non-positive delay; a sub-second interval
				// truncates to 0 seconds, so clamp to the smallest valid value.
				long intervalSeconds = Math.max(1L,
						this.binderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval().toSeconds());
				executor.scheduleWithFixedDelay(
					() -> computeUnconsumedMessages(topic, group),
					1,
					intervalSeconds,
					TimeUnit.SECONDS
				);
			}
		}
	}

	/**
	 * Build the cache key under which the lag of a topic/group pair is stored.
	 */
	private static String lagKey(String topic, String group) {
		return topic + "-" + group;
	}

	/**
	 * Return the last successfully computed lag, or {@code 0} when none is recorded yet.
	 */
	private long lastKnownLag(String topic, String group) {
		return this.lastUnconsumedMessagesValues.getOrDefault(lagKey(topic, group), 0L);
	}

	/**
	 * Select the gauge value function: an on-demand computation with timeout when the
	 * default offset lag metrics are enabled, otherwise the cached value maintained by
	 * the scheduled task.
	 */
	private ToDoubleFunction<KafkaBinderMetrics> computeOffsetComputationFunction(String topic, String group) {
		if (this.binderConfigurationProperties.getMetrics().isDefaultOffsetLagMetricsEnabled()) {
			return (o) -> computeAndGetUnconsumedMessagesWithTimeout(topic, group);
		}
		return (o) -> lastKnownLag(topic, group);
	}

	/**
	 * Compute the lag on the scheduler and wait at most {@link #timeout} seconds for it.
	 * Falls back to the last known value when the scheduler is gone or shut down, the
	 * computation fails or times out, or the calling thread is interrupted.
	 */
	private long computeAndGetUnconsumedMessagesWithTimeout(String topic, String group) {
		ScheduledExecutorService executor = this.scheduler;
		if (executor == null) {
			// close() has been called; nothing can compute a fresh value.
			return lastKnownLag(topic, group);
		}
		Future<Long> future;
		try {
			future = executor.submit(() -> computeUnconsumedMessages(topic, group));
		}
		catch (RejectedExecutionException ex) {
			// The scheduler was shut down by a concurrent bindTo() or close().
			return lastKnownLag(topic, group);
		}
		try {
			return future.get(this.timeout, TimeUnit.SECONDS);
		}
		catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			return lastKnownLag(topic, group);
		}
		catch (ExecutionException | TimeoutException ex) {
			return lastKnownLag(topic, group);
		}
	}

	/**
	 * Compute the current lag and cache it. On failure the error is logged at debug
	 * level, the cached value is left untouched and {@code 0} is returned.
	 */
	private long computeUnconsumedMessages(String topic, String group) {
		long lag = 0;
		try {
			lag = findTotalTopicGroupLag(topic, group, this.metadataConsumers);
			this.lastUnconsumedMessagesValues.put(lagKey(topic, group), lag);
		}
		catch (Exception ex) {
			LOG.debug("Cannot generate metric for topic: " + topic, ex);
		}
		return lag;
	}

	/**
	 * Sum, over all partitions of the topic, the distance between the end offset and
	 * the group's committed offset. When a partition has no committed offset, the
	 * beginning offset is used instead.
	 * @param topic the topic to inspect
	 * @param group the consumer group whose committed offsets are read
	 * @param metadataConsumers cache of metadata consumers keyed by group
	 * @return the total lag
	 */
	private long findTotalTopicGroupLag(String topic, String group, Map<String, Consumer<?, ?>> metadataConsumers) {
		Consumer<?, ?> metadataConsumer = metadataConsumers.get(group);
		if (metadataConsumer == null) {
			// Resolve the factory BEFORE entering computeIfAbsent: doing it inside the
			// mapping function would acquire consumerFactoryLock while holding the map's
			// internal lock, the reverse of the order used by close().
			ConsumerFactory<?, ?> consumerFactory = createConsumerFactory();
			metadataConsumer = metadataConsumers.computeIfAbsent(
					group,
					(g) -> consumerFactory.createConsumer(g, "monitoring"));
		}

		// One consumer is shared by all topics of a group while each topic has its own
		// scheduled task; Kafka consumers must not be used by several threads at once.
		synchronized (metadataConsumer) {
			List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
			List<TopicPartition> topicPartitions = new LinkedList<>();
			for (PartitionInfo partitionInfo : partitionInfos) {
				topicPartitions.add(new TopicPartition(partitionInfo.topic(),
						partitionInfo.partition()));
			}

			Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);

			final Map<TopicPartition, OffsetAndMetadata> committedOffsets =
					metadataConsumer.committed(endOffsets.keySet());
			final Map<TopicPartition, Long> beginningOffsets =
					metadataConsumer.beginningOffsets(endOffsets.keySet());

			long lag = 0;
			for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
				OffsetAndMetadata current = committedOffsets.get(endOffset.getKey());
				Long beginningOffset = beginningOffsets.get(endOffset.getKey());
				lag += endOffset.getValue();
				if (current != null) {
					lag -= current.offset();
				}
				else if (beginningOffset != null) {
					lag -= beginningOffset;
				}
			}
			return lag;
		}
	}

	/**
	 * Return the consumer factory, building it from the binder configuration on first
	 * use when none was supplied. Double-checked locking on a volatile field avoids
	 * locking once the factory exists.
	 */
	private ConsumerFactory<?, ?> createConsumerFactory() {
		ConsumerFactory<?, ?> factory = this.defaultConsumerFactory;
		if (factory != null) {
			return factory;
		}
		// Acquire outside the try so that unlock() only runs when the lock is held.
		this.consumerFactoryLock.lock();
		try {
			factory = this.defaultConsumerFactory;
			if (factory == null) {
				Map<String, Object> props = new HashMap<>();
				props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
					ByteArrayDeserializer.class);
				props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
					ByteArrayDeserializer.class);
				Map<String, Object> mergedConfig = this.binderConfigurationProperties
					.mergedConsumerConfiguration();
				if (!ObjectUtils.isEmpty(mergedConfig)) {
					props.putAll(mergedConfig);
				}
				if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
					props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
						this.binderConfigurationProperties
							.getKafkaConnectionString());
				}
				factory = new DefaultKafkaConsumerFactory<>(props);
				this.defaultConsumerFactory = factory;
			}
			return factory;
		}
		finally {
			this.consumerFactoryLock.unlock();
		}
	}

	@Override
	public void onApplicationEvent(BindingCreatedEvent event) {
		if (this.meterRegistry != null) {
			// It is safe to call bindTo multiple times, since meters are idempotent when called with the same arguments
			this.bindTo(this.meterRegistry);
		}
	}

	/**
	 * Remove the lag meters from the configured registry (if any), shut the scheduler
	 * down, wait up to the configured metrics interval for running tasks and close all
	 * metadata consumers. Does nothing when no scheduler is active.
	 */
	@Override
	public void close() {
		if (this.scheduler == null) {
			return;
		}
		this.consumerFactoryLock.lock();
		try {
			try {
				if (this.meterRegistry != null) {
					this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove);
				}
				// Re-read under the lock: another close() may have finished while we waited.
				ScheduledExecutorService executor = this.scheduler;
				if (executor != null) {
					executor.shutdownNow();
					try {
						executor.awaitTermination(
							this.binderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval().toSeconds(),
							TimeUnit.SECONDS);
					}
					catch (InterruptedException ex) {
						Thread.currentThread().interrupt();
						ReflectionUtils.rethrowRuntimeException(ex);
					}
				}
			}
			finally {
				this.scheduler = null;
				closeMetadataConsumers();
			}
		}
		finally {
			// Always released, even when meter removal or consumer cleanup fails.
			this.consumerFactoryLock.unlock();
		}
	}

	/**
	 * Close every cached metadata consumer and empty the cache. A failure to close one
	 * consumer is logged and does not prevent the remaining ones from being closed.
	 */
	private void closeMetadataConsumers() {
		for (Consumer<?, ?> consumer : this.metadataConsumers.values()) {
			try {
				consumer.close();
			}
			catch (RuntimeException ex) {
				LOG.warn("Failed to close a metadata consumer", ex);
			}
		}
		this.metadataConsumers.clear();
	}

	@Override
	public void start() {
		this.running.set(true);
		// Nothing else to do here. The 'bindTo()' is called from the 'onApplicationEvent()',
		// which, in turn, is emitted from the 'AbstractBindingLifecycle.start()' logic.
	}

	@Override
	public void stop() {
		if (this.running.compareAndSet(true, false)) {
			close();
		}
	}

	@Override
	public boolean isRunning() {
		return this.running.get();
	}

}