// KafkaStreamsVersionAgnosticTopologyInfoFacade.java

/*
 * Copyright 2022-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Objects;
import java.util.StringJoiner;

import org.apache.kafka.streams.KafkaStreams;

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;

/**
 * A facade to access topology info for a Kafka Streams application in a version agnostic
 * manner.
 * <p>Before kafka-streams 3.1 the topology exists at 'KafkaStreams.internalTopologyBuilder'.
 * Starting in kafka-streams 3.1 the topology exists at 'KafkaStreams.topologyMetadata'.
 *
 * @author Chris Bono
 * @author Soby Chacko
 * @since 3.2.6
 */
class KafkaStreamsVersionAgnosticTopologyInfoFacade {

	private static final LogAccessor logger = new LogAccessor(KafkaStreamsVersionAgnosticTopologyInfoFacade.class);

	/** Name of the reflective lookup method expected on the topology info type. */
	private static final String SOURCE_TOPICS_FOR_STORE_METHOD_NAME = "sourceTopicsForStore";

	/**
	 * The field on the root class that holds the topology info object, or {@code null}
	 * when no usable {@code sourceTopicsForStore} method could be resolved. Always
	 * {@code null} or non-null together with {@link #sourceTopicsForStoreMethod}.
	 */
	@Nullable
	private final Field topologyInfoField;

	/**
	 * The resolved {@code sourceTopicsForStore} method declared on the type of
	 * {@link #topologyInfoField}, or {@code null} when it could not be found.
	 */
	@Nullable
	private final Method sourceTopicsForStoreMethod;

	/**
	 * Whether the resolved method is the {@code (String, String)} variant. Only
	 * {@code true} when that variant was actually found.
	 */
	private final boolean sourceTopicsForStoreMethodHasTwoArgs;

	/**
	 * Creates a facade that reflects on {@link KafkaStreams}.
	 */
	KafkaStreamsVersionAgnosticTopologyInfoFacade() {
		this(KafkaStreams.class);
	}

	/**
	 * Creates a facade that reflects on the given root class.
	 * <p>The lookup is resolved into local variables first and the (final) fields are
	 * assigned exactly once at the end, so the field and the method always belong to
	 * the same topology info type and the two-arg flag is only set when the two-arg
	 * method really exists.
	 *
	 * @param rootClass the class expected to declare either an
	 * 	'internalTopologyBuilder' or a 'topologyMetadata' field
	 */
	KafkaStreamsVersionAgnosticTopologyInfoFacade(Class<?> rootClass) {
		Field infoField = null;
		Method lookupMethod = null;
		boolean twoArgs = false;

		// First look for KafkaStreams.internalTopologyBuilder (exists in kafka-streams <= 3.0)
		Field internalTopologyBuilderField = ReflectionUtils.findField(rootClass, "internalTopologyBuilder");
		if (internalTopologyBuilderField != null) {
			lookupMethod = ReflectionUtils.findMethod(internalTopologyBuilderField.getType(),
					SOURCE_TOPICS_FOR_STORE_METHOD_NAME, String.class);
			if (lookupMethod != null) {
				infoField = internalTopologyBuilderField;
			}
		}

		// Otherwise look for KafkaStreams.topologyMetadata (exists in kafka-streams >= 3.1)
		if (lookupMethod == null) {
			Field topologyMetadataField = ReflectionUtils.findField(rootClass, "topologyMetadata");
			if (topologyMetadataField != null) {
				Class<?> topologyMetadataType = topologyMetadataField.getType();
				lookupMethod = ReflectionUtils.findMethod(topologyMetadataType,
						SOURCE_TOPICS_FOR_STORE_METHOD_NAME, String.class);
				if (lookupMethod == null) {
					// The sourceTopicsForStore method has extra arg in kafka-streams >= 3.3
					lookupMethod = ReflectionUtils.findMethod(topologyMetadataType,
							SOURCE_TOPICS_FOR_STORE_METHOD_NAME, String.class, String.class);
					twoArgs = (lookupMethod != null);
				}
				if (lookupMethod != null) {
					infoField = topologyMetadataField;
				}
			}
		}

		if (lookupMethod != null) {
			// infoField is always set alongside lookupMethod by the branches above
			Objects.requireNonNull(infoField).setAccessible(true);
			lookupMethod.setAccessible(true);
			Method resolvedMethod = lookupMethod;
			logger.info(() -> "Using " + methodDescription(resolvedMethod));
		}
		else {
			logger.warn("Could not find 'topologyMetadata.sourceTopicsForStore' or 'internalTopologyBuilder.sourceTopicsForStore' " +
					"from KafkaStreams class - will be unable to reason about state stores.");
		}

		this.topologyInfoField = infoField;
		this.sourceTopicsForStoreMethod = lookupMethod;
		this.sourceTopicsForStoreMethodHasTwoArgs = twoArgs;
	}

	/**
	 * Determines if a state store is actually available to a KafkaStreams instance by
	 * querying the topology info source topics for the requested store.
	 *
	 * @param kafkaStreams the streams app
	 * @param storeName the name of the state store
	 * @return {@code true} if state store is available or {@code false} if the state store is
	 * 	not available or there was a problem reflecting on the topology info
	 */
	@SuppressWarnings("unchecked")
	boolean streamsAppActuallyHasStore(KafkaStreams kafkaStreams, String storeName) {
		Method lookupMethod = this.sourceTopicsForStoreMethod;
		Field infoField = this.topologyInfoField;
		// Both are null together (see constructor), so a single message covers either case
		if (lookupMethod == null || infoField == null) {
			logger.warn("Unable to reason about state store because sourceTopicsForStore method was not found - returning false");
			return false;
		}
		try {
			Object topologyInfo = ReflectionUtils.getField(infoField, kafkaStreams);
			if (topologyInfo == null) {
				logger.warn("Unable to reason about state store because topologyInfo field was null - returning false");
				return false;
			}
			// The two-arg variant is invoked with null as its second argument
			// NOTE(review): presumably that argument is a topology name and null selects the
			// default topology - confirm against the kafka-streams version in use
			Object[] args = this.sourceTopicsForStoreMethodHasTwoArgs ?
					new Object[] { storeName, null } : new Object[] { storeName };
			Collection<String> sourceTopicsForStore = (Collection<String>)
					ReflectionUtils.invokeMethod(lookupMethod, topologyInfo, args);
			return !CollectionUtils.isEmpty(sourceTopicsForStore);
		}
		catch (Exception ex) {
			// Best effort: any reflection problem is reported and treated as "store not available"
			logger.error(ex, () -> "Unable to reason about state store due to error: " + ex.getMessage() + " - returning false");
			return false;
		}
	}

	/**
	 * Builds a log-friendly description of a method in the form
	 * {@code method <declaring type>.<name>(<param type>,<param type>)}.
	 *
	 * @param method the method to describe
	 * @return the description
	 */
	private static String methodDescription(Method method) {
		StringJoiner sj = new StringJoiner(",", method.getName() + "(", ")");
		for (Class<?> parameterType : method.getParameterTypes()) {
			sj.add(parameterType.getTypeName());
		}
		return "method " + method.getDeclaringClass().getTypeName() + '.' + sj;
	}
}