KafkaStreamsFunctionProcessor.java
/*
* Copyright 2019-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.FunctionConstants;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
* @author Soby Chacko
* @author Byungjun You
* @author Georg Friedrich
* @author Omer Celik
* @since 2.2.0
*/
public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderProcessor implements BeanFactoryAware {
private static final Log LOG = LogFactory.getLog(KafkaStreamsFunctionProcessor.class);
private static final String OUTBOUND = "outbound";
private final BindingServiceProperties bindingServiceProperties;
private final Map<String, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<>();
private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
private final KeyValueSerdeResolver keyValueSerdeResolver;
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
private BeanFactory beanFactory;
private final StreamFunctionProperties streamFunctionProperties;
private final KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties;
StreamsBuilderFactoryBeanConfigurer customizer;
ConfigurableEnvironment environment;
public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
CleanupConfig cleanupConfig,
StreamFunctionProperties streamFunctionProperties,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
StreamsBuilderFactoryBeanConfigurer customizer, ConfigurableEnvironment environment) {
super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties,
keyValueSerdeResolver, cleanupConfig);
this.bindingServiceProperties = bindingServiceProperties;
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
this.keyValueSerdeResolver = keyValueSerdeResolver;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
this.streamFunctionProperties = streamFunctionProperties;
this.kafkaStreamsBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;
this.customizer = customizer;
this.environment = environment;
}
private Map<String, ResolvableType> buildTypeMap(ResolvableType resolvableType,
KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory,
Method method, String functionName) {
Map<String, ResolvableType> resolvableTypeMap = new LinkedHashMap<>();
if (method != null) { // Component functional bean.
final ResolvableType firstMethodParameter = ResolvableType.forMethodParameter(method, 0);
ResolvableType currentOutputGeneric = ResolvableType.forMethodReturnType(method);
final Set<String> inputs = new LinkedHashSet<>(kafkaStreamsBindableProxyFactory.getInputs());
final Iterator<String> iterator = inputs.iterator();
populateResolvableTypeMap(firstMethodParameter, resolvableTypeMap, iterator, method, functionName);
traverseReturnTypeForComponentBeans(resolvableTypeMap, currentOutputGeneric, inputs, iterator);
}
else if (resolvableType != null && resolvableType.getRawClass() != null) {
int inputCount = 1;
ResolvableType currentOutputGeneric;
if (resolvableType.getRawClass().isAssignableFrom(BiFunction.class) ||
resolvableType.getRawClass().isAssignableFrom(BiConsumer.class)) {
inputCount = 2;
currentOutputGeneric = resolvableType.getGeneric(2);
}
else {
currentOutputGeneric = resolvableType.getGeneric(1);
}
while (currentOutputGeneric.getRawClass() != null && functionOrConsumerFound(currentOutputGeneric)) {
inputCount++;
currentOutputGeneric = currentOutputGeneric.getGeneric(1);
}
final Set<String> inputs = new LinkedHashSet<>(kafkaStreamsBindableProxyFactory.getInputs());
final Iterator<String> iterator = inputs.iterator();
populateResolvableTypeMap(resolvableType, resolvableTypeMap, iterator);
ResolvableType iterableResType = resolvableType;
int i = resolvableType.getRawClass().isAssignableFrom(BiFunction.class) ||
resolvableType.getRawClass().isAssignableFrom(BiConsumer.class) ? 2 : 1;
ResolvableType outboundResolvableType;
if (i == inputCount) {
outboundResolvableType = iterableResType.getGeneric(i);
}
else {
while (i < inputCount && iterator.hasNext()) {
iterableResType = iterableResType.getGeneric(1);
if (iterableResType.getRawClass() != null &&
functionOrConsumerFound(iterableResType)) {
populateResolvableTypeMap(iterableResType, resolvableTypeMap, iterator);
}
i++;
}
outboundResolvableType = iterableResType.getGeneric(1);
}
resolvableTypeMap.put(OUTBOUND, outboundResolvableType);
}
return resolvableTypeMap;
}
private void traverseReturnTypeForComponentBeans(Map<String, ResolvableType> resolvableTypeMap, ResolvableType currentOutputGeneric,
Set<String> inputs, Iterator<String> iterator) {
final Class<?> outputRawclass = currentOutputGeneric.getRawClass();
if (outputRawclass != null && !outputRawclass.equals(Void.TYPE) || currentOutputGeneric.isArray()) {
ResolvableType iterableResType = currentOutputGeneric;
int i = 1;
// Traverse through the return signature.
while (i < inputs.size() && iterator.hasNext()) {
if (iterableResType.getRawClass() != null &&
functionOrConsumerFound(iterableResType)) {
populateResolvableTypeMap(iterableResType, resolvableTypeMap, iterator);
}
iterableResType = iterableResType.getGeneric(1);
i++;
}
if (iterableResType.getRawClass() != null && KStream.class.isAssignableFrom(iterableResType.getRawClass())
|| iterableResType.isArray() && iterableResType.getComponentType().getRawClass() != null
&& KStream.class.isAssignableFrom(iterableResType.getComponentType().getRawClass())) {
resolvableTypeMap.put(OUTBOUND, iterableResType);
}
}
}
private boolean functionOrConsumerFound(ResolvableType iterableResType) {
return Objects.requireNonNull(iterableResType.getRawClass()).equals(Function.class) ||
iterableResType.getRawClass().equals(Consumer.class);
}
private void populateResolvableTypeMap(ResolvableType resolvableType, Map<String, ResolvableType> resolvableTypeMap,
Iterator<String> iterator) {
final String next = iterator.next();
resolvableTypeMap.put(next, resolvableType.getGeneric(0));
if (resolvableType.getRawClass() != null &&
(resolvableType.getRawClass().isAssignableFrom(BiFunction.class) ||
resolvableType.getRawClass().isAssignableFrom(BiConsumer.class))
&& iterator.hasNext()) {
resolvableTypeMap.put(iterator.next(), resolvableType.getGeneric(1));
}
}
private void populateResolvableTypeMap(ResolvableType resolvableType, Map<String, ResolvableType> resolvableTypeMap,
Iterator<String> iterator, Method method, String functionName) {
final String next = iterator.next();
resolvableTypeMap.put(next, resolvableType);
if (method != null) {
final Object bean = beanFactory.getBean(functionName);
if (BiFunction.class.isAssignableFrom(bean.getClass()) || BiConsumer.class.isAssignableFrom(bean.getClass())) {
resolvableTypeMap.put(iterator.next(), ResolvableType.forMethodParameter(method, 1));
}
}
}
private ResolvableType checkOutboundForComposedFunctions(
ResolvableType outputResolvableType) {
ResolvableType currentOutputGeneric;
if (outputResolvableType.getRawClass() != null && outputResolvableType.getRawClass().isAssignableFrom(BiFunction.class)) {
currentOutputGeneric = outputResolvableType.getGeneric(2);
}
else {
currentOutputGeneric = outputResolvableType.getGeneric(1);
}
while (currentOutputGeneric.getRawClass() != null && functionOrConsumerFound(currentOutputGeneric)) {
currentOutputGeneric = currentOutputGeneric.getGeneric(1);
}
return currentOutputGeneric;
}
/**
* This method must be kept stateless. In the case of multiple function beans in an application,
* isolated {@link KafkaStreamsBindableProxyFactory} instances are passed in separately for those functions. If the
* state is shared between invocations, that will create potential race conditions. Hence, invocations of this method
* should not be dependent on state modified by a previous invocation.
*
* @param resolvableType type of the binding
* @param functionName bean name of the function
* @param kafkaStreamsBindableProxyFactory bindable proxy factory for the Kafka Streams type
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public void setupFunctionInvokerForKafkaStreams(ResolvableType resolvableType, String functionName,
KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory, Method method,
ResolvableType outputResolvableType,
String... composedFunctionNames) {
Map<String, ResolvableType> resolvableTypes;
if (method != null && composedFunctionNames.length > 0) { // composed component methods
resolvableTypes = buildTypeMap(resolvableType,
kafkaStreamsBindableProxyFactory, method, composedFunctionNames[0]);
}
else if (method != null) { // non-composed component beans
resolvableTypes = buildTypeMap(resolvableType,
kafkaStreamsBindableProxyFactory, method, functionName);
}
else { // all other cases
resolvableTypes = buildTypeMap(resolvableType, kafkaStreamsBindableProxyFactory, null, functionName);
}
ResolvableType outboundResolvableType;
if (outputResolvableType != null) {
outboundResolvableType = checkOutboundForComposedFunctions(outputResolvableType);
resolvableTypes.remove(OUTBOUND);
}
else {
outboundResolvableType = resolvableTypes.remove(OUTBOUND);
}
Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(resolvableTypes, functionName);
try {
if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(Consumer.class)) {
Consumer<Object> consumer = (Consumer) this.beanFactory.getBean(functionName);
consumer.accept(adaptedInboundArguments[0]);
}
else if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(BiConsumer.class)) {
BiConsumer<Object, Object> biConsumer = (BiConsumer) this.beanFactory.getBean(functionName);
biConsumer.accept(adaptedInboundArguments[0], adaptedInboundArguments[1]);
}
else if (method != null) { // Handling component functional beans
final Object bean = composedFunctionNames.length > 0 ? beanFactory.getBean(composedFunctionNames[0])
: beanFactory.getBean(functionName);
if (Consumer.class.isAssignableFrom(bean.getClass())) {
((Consumer) bean).accept(adaptedInboundArguments[0]);
}
else if (BiConsumer.class.isAssignableFrom(bean.getClass())) {
((BiConsumer) bean).accept(adaptedInboundArguments[0], adaptedInboundArguments[1]);
}
else if (Function.class.isAssignableFrom(bean.getClass()) || BiFunction.class.isAssignableFrom(bean.getClass())) {
Object result;
if (composedFunctionNames.length > 0) {
result = handleComposedFunctions(adaptedInboundArguments, null, composedFunctionNames);
}
else {
if (BiFunction.class.isAssignableFrom(bean.getClass())) {
result = ((BiFunction) bean).apply(adaptedInboundArguments[0], adaptedInboundArguments[1]);
}
else {
result = ((Function) bean).apply(adaptedInboundArguments[0]);
}
result = handleCurriedFunctions(adaptedInboundArguments, result);
}
if (result != null) {
final Set<String> outputs = new TreeSet<>(kafkaStreamsBindableProxyFactory.getOutputs());
final Iterator<String> outboundDefinitionIterator = outputs.iterator();
if (result.getClass().isArray()) {
final String initialInput = resolvableTypes.keySet().iterator().next();
final StreamsBuilderFactoryBean streamsBuilderFactoryBean =
this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeanPerBinding().get(initialInput);
handleKStreamArrayOutbound(resolvableType, functionName, kafkaStreamsBindableProxyFactory,
outboundResolvableType, (Object[]) result, streamsBuilderFactoryBean);
}
else {
if (KTable.class.isAssignableFrom(result.getClass())) {
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
outboundResolvableType : resolvableType.getGeneric(1), ((KTable) result).toStream(), outboundDefinitionIterator);
}
else {
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType, (KStream) result, outboundDefinitionIterator);
}
}
}
}
}
else {
Object result = null;
if (resolvableType.getRawClass() != null && resolvableType.getRawClass().equals(BiFunction.class)) {
if (composedFunctionNames != null && composedFunctionNames.length > 0) {
result = handleComposedFunctions(adaptedInboundArguments, result, composedFunctionNames);
}
else {
BiFunction<Object, Object, Object> biFunction = (BiFunction) beanFactory.getBean(functionName);
result = biFunction.apply(adaptedInboundArguments[0], adaptedInboundArguments[1]);
result = handleCurriedFunctions(adaptedInboundArguments, result);
}
}
else {
if (composedFunctionNames != null && composedFunctionNames.length > 0) {
result = handleComposedFunctions(adaptedInboundArguments, result, composedFunctionNames);
}
else {
Function<Object, Object> function = (Function) beanFactory.getBean(functionName);
result = function.apply(adaptedInboundArguments[0]);
result = handleCurriedFunctions(adaptedInboundArguments, result);
}
}
if (result != null) {
final Set<String> outputs = new TreeSet<>(kafkaStreamsBindableProxyFactory.getOutputs());
final Iterator<String> outboundDefinitionIterator = outputs.iterator();
if (result.getClass().isArray()) {
final String initialInput = resolvableTypes.keySet().iterator().next();
final StreamsBuilderFactoryBean streamsBuilderFactoryBean =
this.kafkaStreamsBindingInformationCatalogue.getStreamsBuilderFactoryBeanPerBinding().get(initialInput);
handleKStreamArrayOutbound(resolvableType, functionName, kafkaStreamsBindableProxyFactory,
outboundResolvableType, (Object[]) result, streamsBuilderFactoryBean);
}
else {
if (KTable.class.isAssignableFrom(result.getClass())) {
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
outboundResolvableType : resolvableType.getGeneric(1), ((KTable) result).toStream(), outboundDefinitionIterator);
}
else {
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
outboundResolvableType : resolvableType.getGeneric(1), (KStream) result, outboundDefinitionIterator);
}
}
}
}
}
catch (Exception ex) {
throw new BeanInitializationException("Cannot setup function invoker for this Kafka Streams function.", ex);
}
}
@SuppressWarnings({"unchecked"})
private Object handleComposedFunctions(Object[] adaptedInboundArguments, Object result, String... composedFunctionNames) {
Object bean = beanFactory.getBean(composedFunctionNames[0]);
if (BiFunction.class.isAssignableFrom(bean.getClass())) {
result = ((BiFunction<Object, Object, Object>) bean).apply(adaptedInboundArguments[0], adaptedInboundArguments[1]);
}
else if (Function.class.isAssignableFrom(bean.getClass())) {
result = ((Function<Object, Object>) bean).apply(adaptedInboundArguments[0]);
}
// If the return is a curried function, apply it
result = handleCurriedFunctions(adaptedInboundArguments, result);
// Apply composed functions
return applyComposedFunctions(result, composedFunctionNames);
}
@SuppressWarnings({"unchecked"})
private Object applyComposedFunctions(Object result, String[] composedFunctionNames) {
for (int i = 1; i < composedFunctionNames.length; i++) {
final Object bean = beanFactory.getBean(composedFunctionNames[i]);
if (Consumer.class.isAssignableFrom(bean.getClass())) {
((Consumer<Object>) bean).accept(result);
result = null;
}
else if (Function.class.isAssignableFrom(bean.getClass())) {
result = ((Function<Object, Object>) bean).apply(result);
}
else {
throw new IllegalStateException("You can only compose functions of type either java.util.function.Function or java.util.function.Consumer.");
}
}
return result;
}
@SuppressWarnings({"unchecked"})
private Object handleCurriedFunctions(Object[] adaptedInboundArguments, Object result) {
int i = 1;
while (result instanceof Function || result instanceof Consumer) {
if (result instanceof Function function) {
result = function.apply(adaptedInboundArguments[i]);
}
else {
((Consumer<Object>) result).accept(adaptedInboundArguments[i]);
result = null;
}
i++;
}
return result;
}
private void handleSingleKStreamOutbound(Map<String, ResolvableType> resolvableTypes, ResolvableType outboundResolvableType,
KStream<Object, Object> result, Iterator<String> outboundDefinitionIterator) {
if (outboundDefinitionIterator.hasNext()) {
String outbound = outboundDefinitionIterator.next();
Object targetBean = handleSingleKStreamOutbound(result, outbound);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean,
outboundResolvableType);
final String next = resolvableTypes.keySet().iterator().next();
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue
.getStreamsBuilderFactoryBeanPerBinding().get(next);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(outbound, streamsBuilderFactoryBean);
}
}
private Object handleSingleKStreamOutbound(KStream<Object, Object> result, String next) {
Object targetBean = this.applicationContext.getBean(next);
KStreamBoundElementFactory.KStreamWrapper
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
boundElement.wrap(result);
return targetBean;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void handleKStreamArrayOutbound(ResolvableType resolvableType, String functionName,
KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory,
ResolvableType outboundResolvableType, Object[] result,
StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
// Binding target as the output bindings were deferred in the KafkaStreamsBindableProxyFactory
// due to the fact that it didn't know the returned array size. At this point in the execution,
// we know exactly the number of outbound components (from the array length), so do the binding.
final int length = result.length;
List<String> outputBindings = getOutputBindings(functionName, length);
Iterator<String> iterator = outputBindings.iterator();
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
for (Object o : result) {
String next = iterator.next();
kafkaStreamsBindableProxyFactory.addOutputBinding(next, KStream.class);
RootBeanDefinition rootBeanDefinition1 = new RootBeanDefinition();
rootBeanDefinition1.setInstanceSupplier(() -> kafkaStreamsBindableProxyFactory.getOutputHolders().get(next).boundTarget());
registry.registerBeanDefinition(next, rootBeanDefinition1);
Object targetBean = this.applicationContext.getBean(next);
KStreamBoundElementFactory.KStreamWrapper
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
boundElement.wrap((KStream) o);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(
targetBean, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(1));
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(next, streamsBuilderFactoryBean);
}
}
private List<String> getOutputBindings(String functionName, int outputs) {
List<String> outputBindings = this.streamFunctionProperties.getOutputBindings(functionName);
List<String> outputBindingNames = new ArrayList<>();
if (!CollectionUtils.isEmpty(outputBindings)) {
outputBindingNames.addAll(outputBindings);
return outputBindingNames;
}
else {
for (int i = 0; i < outputs; i++) {
outputBindingNames.add(String.format("%s-%s-%d", functionName, FunctionConstants.DEFAULT_OUTPUT_SUFFIX, i));
}
}
return outputBindingNames;
}
@SuppressWarnings({"unchecked"})
private Object[] adaptAndRetrieveInboundArguments(Map<String, ResolvableType> stringResolvableTypeMap,
String functionName) {
Object[] arguments = new Object[stringResolvableTypeMap.size()];
int i = 0;
for (String input : stringResolvableTypeMap.keySet()) {
Class<?> parameterType = stringResolvableTypeMap.get(input).getRawClass();
if (input != null) {
Object targetBean = applicationContext.getBean(input);
BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(input);
//Retrieve the StreamsConfig created for this method if available.
//Otherwise, create the StreamsBuilderFactory and get the underlying config.
if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(functionName)) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = buildStreamsBuilderAndRetrieveConfig(functionName, applicationContext,
input, kafkaStreamsBinderConfigurationProperties, customizer, this.environment, bindingProperties);
this.methodStreamsBuilderFactoryBeanMap.put(functionName, streamsBuilderFactoryBean);
}
try {
StreamsBuilderFactoryBean streamsBuilderFactoryBean =
this.methodStreamsBuilderFactoryBeanMap.get(functionName);
StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
final String applicationId = Objects.requireNonNull(streamsBuilderFactoryBean.getStreamsConfiguration())
.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
KafkaStreamsConsumerProperties extendedConsumerProperties =
this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(input);
extendedConsumerProperties.setApplicationId(applicationId);
//get state store spec
Serde<?> keySerde = this.keyValueSerdeResolver.getInboundKeySerde(extendedConsumerProperties, stringResolvableTypeMap.get(input));
LOG.info("Key Serde used for " + input + ": " + keySerde.getClass().getName());
Serde<?> valueSerde = bindingServiceProperties.getConsumerProperties(input).isUseNativeDecoding() ?
getValueSerde(input, extendedConsumerProperties, stringResolvableTypeMap.get(input)) : Serdes.ByteArray();
LOG.info("Value Serde used for " + input + ": " + valueSerde.getClass().getName());
final Topology.AutoOffsetReset autoOffsetReset = getAutoOffsetReset(input, extendedConsumerProperties);
if (Objects.requireNonNull(parameterType).isAssignableFrom(KStream.class)) {
KStream<?, ?> stream = getKStream(input, bindingProperties, extendedConsumerProperties,
streamsBuilder, keySerde, valueSerde, autoOffsetReset, i == 0);
KStreamBoundElementFactory.KStreamWrapper kStreamWrapper =
(KStreamBoundElementFactory.KStreamWrapper) targetBean;
//wrap the proxy created during the initial target type binding with real object (KStream)
kStreamWrapper.wrap((KStream<Object, Object>) stream);
this.kafkaStreamsBindingInformationCatalogue.addKeySerde((KStream<?, ?>) kStreamWrapper, keySerde);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(input, streamsBuilderFactoryBean);
this.kafkaStreamsBindingInformationCatalogue.addConsumerPropertiesPerSbfb(streamsBuilderFactoryBean,
bindingServiceProperties.getConsumerProperties(input));
if (KStream.class.isAssignableFrom(Objects.requireNonNull(stringResolvableTypeMap.get(input).getRawClass()))) {
final Class<?> valueClass =
(stringResolvableTypeMap.get(input).getGeneric(1).getRawClass() != null)
? (stringResolvableTypeMap.get(input).getGeneric(1).getRawClass()) : Object.class;
if (this.kafkaStreamsBindingInformationCatalogue.isUseNativeDecoding(
(KStream<?, ?>) kStreamWrapper)) {
arguments[i] = stream;
}
else {
arguments[i] = this.kafkaStreamsMessageConversionDelegate.deserializeOnInbound(
valueClass, stream);
}
}
if (arguments[i] == null) {
arguments[i] = stream;
}
Assert.notNull(arguments[i], "Problems encountered while adapting the function argument.");
}
else {
handleKTableGlobalKTableInputs(arguments, i, input, parameterType, targetBean, streamsBuilderFactoryBean,
streamsBuilder, extendedConsumerProperties, keySerde, valueSerde, autoOffsetReset, i == 0);
}
i++;
}
catch (Exception ex) {
throw new IllegalStateException(ex);
}
}
}
return arguments;
}
@Override
public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
}