KafkaStreamsBinderHealthIndicator.java
/*
* Copyright 2019-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.health.contributor.AbstractHealthIndicator;
import org.springframework.boot.health.contributor.Health;
import org.springframework.boot.health.contributor.Status;
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
/**
* Health indicator for Kafka Streams.
*
* @author Arnaud Jardin��
* @author Soby Chacko
*/
public class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator implements DisposableBean {
/**
* Static initialization for detecting whether the application is using Kafka client 2.5 vs lower versions.
*/
private static final ClassLoader CLASS_LOADER = KafkaStreamsBinderHealthIndicator.class.getClassLoader();
private static boolean isKafkaStreams25 = true;
private static Method methodForIsRunning;
static {
try {
Class<?> KAFKA_STREAMS_STATE_CLASS = CLASS_LOADER.loadClass("org.apache.kafka.streams.KafkaStreams$State");
Method[] declaredMethods = KAFKA_STREAMS_STATE_CLASS.getDeclaredMethods();
for (Method m : declaredMethods) {
if (m.getName().equals("isRunning")) {
isKafkaStreams25 = false;
methodForIsRunning = m;
}
}
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("KafkaStreams$State class not found", e);
}
}
private final KafkaStreamsRegistry kafkaStreamsRegistry;
private final KafkaStreamsBinderConfigurationProperties configurationProperties;
private final Map<String, Object> adminClientProperties;
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
private AdminClient adminClient;
private final Lock lock = new ReentrantLock();
KafkaStreamsBinderHealthIndicator(KafkaStreamsRegistry kafkaStreamsRegistry,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
KafkaProperties kafkaProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
super("Kafka-streams health check failed");
kafkaProperties.buildAdminProperties();
this.configurationProperties = kafkaStreamsBinderConfigurationProperties;
this.adminClientProperties = kafkaProperties.buildAdminProperties();
KafkaTopicProvisioner.normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties,
kafkaStreamsBinderConfigurationProperties);
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
}
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
try {
this.lock.lock();
if (this.adminClient == null) {
this.adminClient = AdminClient.create(this.adminClientProperties);
}
final ListTopicsResult listTopicsResult = this.adminClient.listTopics();
listTopicsResult.listings().get(this.configurationProperties.getHealthTimeout(), TimeUnit.SECONDS);
if (this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeans().isEmpty()) {
builder.withDetail("No Kafka Streams bindings have been established", "Kafka Streams binder did not detect any processors");
builder.status(Status.UNKNOWN);
}
else {
boolean up = true;
final Set<KafkaStreams> kafkaStreams = kafkaStreamsRegistry.getKafkaStreams();
Set<KafkaStreams> allKafkaStreams = new HashSet<>(kafkaStreams);
if (this.configurationProperties.isIncludeStoppedProcessorsForHealthCheck()) {
allKafkaStreams.addAll(kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().values());
}
for (KafkaStreams kStream : allKafkaStreams) {
if (isKafkaStreams25) {
up &= kStream.state().isRunningOrRebalancing();
}
else {
// if Kafka client version is lower than 2.5, then call the method reflectively.
final boolean isRunningInvokedResult = (boolean) methodForIsRunning.invoke(kStream.state());
up &= isRunningInvokedResult;
}
builder.withDetails(buildDetails(kStream));
}
builder.status(up ? Status.UP : Status.DOWN);
}
}
catch (Exception e) {
builder.withDetail("No topic information available", "Kafka broker is not reachable");
builder.status(Status.DOWN);
builder.withException(e);
}
finally {
this.lock.unlock();
}
}
private Map<String, Object> buildDetails(KafkaStreams kafkaStreams) throws Exception {
final Map<String, Object> details = new HashMap<>();
final Map<String, Object> perAppdIdDetails = new HashMap<>();
boolean isRunningResult;
if (isKafkaStreams25) {
isRunningResult = kafkaStreams.state().isRunningOrRebalancing();
}
else {
// if Kafka client version is lower than 2.5, then call the method reflectively.
isRunningResult = (boolean) methodForIsRunning.invoke(kafkaStreams.state());
}
if (isRunningResult) {
final Set<ThreadMetadata> threadMetadata = kafkaStreams.metadataForLocalThreads();
final Map<String, Object> threadDetails = new HashMap<>();
for (ThreadMetadata metadata : threadMetadata) {
final Map<String, Object> threadDetail = new HashMap<>();
threadDetail.put("threadName", metadata.threadName());
threadDetail.put("threadState", metadata.threadState());
threadDetail.put("adminClientId", metadata.adminClientId());
threadDetail.put("consumerClientId", metadata.consumerClientId());
threadDetail.put("restoreConsumerClientId", metadata.restoreConsumerClientId());
threadDetail.put("producerClientIds", metadata.producerClientIds());
threadDetail.put("activeTasks", taskDetails(metadata.activeTasks()));
threadDetail.put("standbyTasks", taskDetails(metadata.standbyTasks()));
threadDetails.put(metadata.threadName(), threadDetail);
}
perAppdIdDetails.put("threadDetails", threadDetails);
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamBuilderFactoryBean(kafkaStreams);
final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
details.put(applicationId, perAppdIdDetails);
}
else {
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamBuilderFactoryBean(kafkaStreams);
String applicationId = null;
if (streamsBuilderFactoryBean != null) {
applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
}
else {
final Map<String, KafkaStreams> stoppedKafkaStreamsPerBinding = kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams();
for (String appId : stoppedKafkaStreamsPerBinding.keySet()) {
if (stoppedKafkaStreamsPerBinding.get(appId).equals(kafkaStreams)) {
applicationId = appId;
}
}
}
details.put(applicationId, String.format("The processor with application.id %s is down. Current state: %s", applicationId, kafkaStreams.state()));
}
return details;
}
private static Map<String, Object> taskDetails(Set<TaskMetadata> taskMetadata) {
final Map<String, Object> details = new HashMap<>();
for (TaskMetadata metadata : taskMetadata) {
details.put("taskId", metadata.taskId());
if (details.containsKey("partitions")) {
@SuppressWarnings("unchecked")
List<String> partitionsInfo = (List<String>) details.get("partitions");
partitionsInfo.addAll(addPartitionsInfo(metadata));
}
else {
details.put("partitions",
addPartitionsInfo(metadata));
}
}
return details;
}
private static List<String> addPartitionsInfo(TaskMetadata metadata) {
return metadata.topicPartitions().stream().map(
p -> "partition=" + p.partition() + ", topic=" + p.topic())
.collect(Collectors.toList());
}
@Override
public void destroy() throws Exception {
if (adminClient != null) {
adminClient.close(Duration.ofSeconds(0));
}
}
}