AbstractKafkaBinderHealthIndicator.java

/*
 * Copyright 2023-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.common;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.health.actuate.endpoint.StatusAggregator;
import org.springframework.boot.health.contributor.AbstractHealthIndicator;
import org.springframework.boot.health.contributor.Health;
import org.springframework.boot.health.contributor.Status;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.util.Assert;

/**
 * Base class that abstracts the common health indicator details for the various Kafka binder flavors.
 *
 * @author Soby Chacko
 * @since 4.1.0
 */
public abstract class AbstractKafkaBinderHealthIndicator extends AbstractHealthIndicator implements DisposableBean {

	private static final int DEFAULT_TIMEOUT = 60;

	protected int timeout = DEFAULT_TIMEOUT;

	private final ExecutorService executor;

	protected Consumer<?, ?> metadataConsumer;

	// if the binder detects that a partition for the topic
	// is without a leader, mark the binder health as DOWN.
	protected boolean considerDownWhenAnyPartitionHasNoLeader = true;

	private final ConsumerFactory<?, ?> consumerFactory;

	public AbstractKafkaBinderHealthIndicator(ConsumerFactory<?, ?> consumerFactory) {
		this.consumerFactory = consumerFactory;
		this.executor = createHealthBinderExecutorService();
		Assert.notNull(this.executor, "The health indicator executor service must not be null");
	}

	protected abstract Map<String, TopicInformation> getTopicsInUse();

	protected abstract Health buildBinderSpecificHealthDetails();

	protected  abstract ExecutorService createHealthBinderExecutorService();

	private void initMetadataConsumer() {
		if (this.metadataConsumer == null) {
			this.metadataConsumer = this.consumerFactory.createConsumer();
		}
	}

	@Override
	public void destroy() {
		executor.shutdown();
		if (this.metadataConsumer != null) {
			this.metadataConsumer.close();
		}
	}

	@Override
	protected void doHealthCheck(Health.Builder builder) throws Exception {
		Health topicsHealth = safelyBuildTopicsHealth();
		Health listenerContainersHealth = buildBinderSpecificHealthDetails();
		merge(topicsHealth, listenerContainersHealth, builder);
	}

	protected Health safelyBuildTopicsHealth() {
		Future<Health> future = executor.submit(this::buildTopicsHealth);
		try {
			return future.get(this.timeout, TimeUnit.SECONDS);
		}
		catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			return Health.down()
				.withDetail("Interrupted while waiting for partition information in",
					this.timeout + " seconds")
				.build();
		}
		catch (ExecutionException ex) {
			return Health.down(ex).build();
		}
		catch (TimeoutException ex) {
			return Health.down().withDetail("Failed to retrieve partition information in",
				this.timeout + " seconds").build();
		}
	}

	private Health buildTopicsHealth() {
		try {
			initMetadataConsumer();
			Set<String> downMessages = new HashSet<>();
			Set<String> checkedTopics = new HashSet<>();
			final Map<String, TopicInformation> topicsInUse = getTopicsInUse();
			if (topicsInUse.isEmpty()) {
				try {
					this.metadataConsumer.listTopics(Duration.ofSeconds(this.timeout));
				}
				catch (Exception e) {
					return Health.down().withDetail("No topic information available",
						"Kafka broker is not reachable").build();
				}
				return Health.unknown().withDetail("No bindings found",
					"Kafka binder may not be bound to destinations on the broker").build();
			}
			else {
				for (String topic : topicsInUse.keySet()) {
					TopicInformation topicInformation = topicsInUse
						.get(topic);
					if (!topicInformation.isTopicPattern()) {
						List<PartitionInfo> partitionInfos = this.metadataConsumer
							.partitionsFor(topic);
						for (PartitionInfo partitionInfo : partitionInfos) {
							if (topicInformation.partitionInfos()
								.contains(partitionInfo)
								&& partitionInfo.leader() == null ||
								(partitionInfo.leader() != null && partitionInfo.leader().id() == -1)) {
								downMessages.add(partitionInfo.toString());
							}
							else if (this.considerDownWhenAnyPartitionHasNoLeader &&
								partitionInfo.leader() == null || (partitionInfo.leader() != null && partitionInfo.leader().id() == -1)) {
								downMessages.add(partitionInfo.toString());
							}
						}
						checkedTopics.add(topic);
					}
					else {
						try {
							// Since destination is a pattern, all we are doing is just to make sure that
							// we can connect to the cluster and query the topics.
							this.metadataConsumer.listTopics(Duration.ofSeconds(this.timeout));
						}
						catch (Exception ex) {
							return Health.down()
								.withDetail("Cluster not connected",
									"Destination provided is a pattern, but cannot connect to the cluster for any verification")
								.build();
						}
					}
				}
			}
			if (downMessages.isEmpty()) {
				return Health.up().withDetail("topicsInUse", checkedTopics).build();
			}
			else {
				return Health.down()
					.withDetail("Following partitions in use have no leaders: ",
						downMessages.toString())
					.build();
			}
		}
		catch (Exception ex) {
			return Health.down(ex).build();
		}
	}

	private void merge(Health topicsHealth, Health listenerContainersHealth, Health.Builder builder) {
		Status aggregatedStatus = StatusAggregator.getDefault()
			.getAggregateStatus(topicsHealth.getStatus(), listenerContainersHealth.getStatus());
		Map<String, Object> aggregatedDetails = new HashMap<>();
		aggregatedDetails.putAll(topicsHealth.getDetails());
		aggregatedDetails.putAll(listenerContainersHealth.getDetails());
		builder.status(aggregatedStatus).withDetails(aggregatedDetails);
	}

	/**
	 * Set the timeout in seconds to retrieve health information.
	 *
	 * @param timeout the timeout - default 60.
	 */
	public void setTimeout(int timeout) {
		this.timeout = timeout;
	}

	public void setConsiderDownWhenAnyPartitionHasNoLeader(boolean considerDownWhenAnyPartitionHasNoLeader) {
		this.considerDownWhenAnyPartitionHasNoLeader = considerDownWhenAnyPartitionHasNoLeader;
	}
}