AbstractKafkaStreamsBinderProcessor.java
/*
* Copyright 2019-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.context.properties.bind.BindContext;
import org.springframework.boot.context.properties.bind.BindHandler;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.PropertySourcesPlaceholdersResolver;
import org.springframework.boot.context.properties.source.ConfigurationPropertyName;
import org.springframework.boot.context.properties.source.ConfigurationPropertySources;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
import org.springframework.lang.NonNull;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
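* Encapsulates the processing logic common to the Kafka Streams binder processors
* (both the functional and the StreamListener programming models): resolving Serdes,
* building the per-processor {@link StreamsBuilderFactoryBean} and wrapping the
* inbound KStream, KTable and GlobalKTable binding targets.
*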
* @author Soby Chacko
* @author Ralf Wiedmann
* @author Gihong Park
* @since 3.0.0
*/
public abstract class AbstractKafkaStreamsBinderProcessor implements ApplicationContextAware {
private static final Log LOG = LogFactory.getLog(AbstractKafkaStreamsBinderProcessor.class);
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
private final BindingServiceProperties bindingServiceProperties;
private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
private final CleanupConfig cleanupConfig;
private final KeyValueSerdeResolver keyValueSerdeResolver;
protected ConfigurableApplicationContext applicationContext;
private static final ThreadLocal<Boolean> matchedRecordThreadLocal = ThreadLocal.withInitial(() -> false);
public AbstractKafkaStreamsBinderProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KeyValueSerdeResolver keyValueSerdeResolver, CleanupConfig cleanupConfig) {
this.bindingServiceProperties = bindingServiceProperties;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
this.keyValueSerdeResolver = keyValueSerdeResolver;
this.cleanupConfig = cleanupConfig;
}
@Override
public final void setApplicationContext(@NonNull ApplicationContext applicationContext)
throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
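
/**
* Map the binding-level {@code startOffset} property to the corresponding
* {@link Topology.AutoOffsetReset} (for example,
* {@code spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.startOffset=earliest},
* where {@code process-in-0} is an illustrative binding name). Returns {@code null}
* when no start offset is configured and warns if the unsupported
* {@code resetOffsets} property is set on the binding.
*/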
protected Topology.AutoOffsetReset getAutoOffsetReset(String inboundName, KafkaStreamsConsumerProperties extendedConsumerProperties) {
final KafkaConsumerProperties.StartOffset startOffset = extendedConsumerProperties
.getStartOffset();
Topology.AutoOffsetReset autoOffsetReset = null;
if (startOffset != null) {
autoOffsetReset = switch (startOffset) {
case earliest -> Topology.AutoOffsetReset.EARLIEST;
case latest -> Topology.AutoOffsetReset.LATEST;
};
}
if (extendedConsumerProperties.isResetOffsets()) {
AbstractKafkaStreamsBinderProcessor.LOG.warn("Detected resetOffsets configured on binding "
+ inboundName + ". "
+ "Setting resetOffsets on the Kafka Streams binder has no effect.");
}
return autoOffsetReset;
}
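
/**
* Bind {@link KTable} and {@link GlobalKTable} method parameters: the proxy created
* for the target type at binding time is wrapped with the real object built from the
* {@link StreamsBuilder}, and the binding is recorded in the catalogue.
*/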
@SuppressWarnings("unchecked")
protected void handleKTableGlobalKTableInputs(Object[] arguments, int index, String input, Class<?> parameterType, Object targetBean,
StreamsBuilderFactoryBean streamsBuilderFactoryBean, StreamsBuilder streamsBuilder,
KafkaStreamsConsumerProperties extendedConsumerProperties,
Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset, boolean firstBuild) {
if (firstBuild) {
addStateStoreBeans(streamsBuilder);
}
if (parameterType.isAssignableFrom(KTable.class)) {
String materializedAs = extendedConsumerProperties.getMaterializedAs();
String bindingDestination = this.bindingServiceProperties.getBindingDestination(input);
KTable<?, ?> table = getKTable(extendedConsumerProperties, streamsBuilder, keySerde, valueSerde, materializedAs,
bindingDestination, autoOffsetReset);
KTableBoundElementFactory.KTableWrapper kTableWrapper =
(KTableBoundElementFactory.KTableWrapper) targetBean;
// Wrap the proxy created during the initial target type binding with the real object (KTable).
kTableWrapper.wrap((KTable<Object, Object>) table);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(input, streamsBuilderFactoryBean);
this.kafkaStreamsBindingInformationCatalogue.addConsumerPropertiesPerSbfb(streamsBuilderFactoryBean,
bindingServiceProperties.getConsumerProperties(input));
arguments[index] = table;
}
else if (parameterType.isAssignableFrom(GlobalKTable.class)) {
String materializedAs = extendedConsumerProperties.getMaterializedAs();
String bindingDestination = this.bindingServiceProperties.getBindingDestination(input);
GlobalKTable<?, ?> table = getGlobalKTable(extendedConsumerProperties, streamsBuilder, keySerde, valueSerde, materializedAs,
bindingDestination, autoOffsetReset);
GlobalKTableBoundElementFactory.GlobalKTableWrapper globalKTableWrapper =
(GlobalKTableBoundElementFactory.GlobalKTableWrapper) targetBean;
// Wrap the proxy created during the initial target type binding with the real object (GlobalKTable).
globalKTableWrapper.wrap((GlobalKTable<Object, Object>) table);
this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(input, streamsBuilderFactoryBean);
this.kafkaStreamsBindingInformationCatalogue.addConsumerPropertiesPerSbfb(streamsBuilderFactoryBean,
bindingServiceProperties.getConsumerProperties(input));
arguments[index] = table;
}
}
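
/**
* Create the {@link StreamsBuilderFactoryBean} for this processor and register it as a
* bean named {@code stream-builder-<beanNamePostPrefix>}. Configuration is layered, with
* later layers overriding earlier ones: the shared global properties, per-function
* properties (for example
* {@code spring.cloud.stream.kafka.streams.binder.functions.process.applicationId},
* where {@code process} is an illustrative function name), multi-binder properties when
* the binding targets a named binder, and finally the per-binding consumer configuration.
*/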
@SuppressWarnings({ "unchecked" })
protected StreamsBuilderFactoryBean buildStreamsBuilderAndRetrieveConfig(String beanNamePostPrefix,
ApplicationContext applicationContext, String inboundName,
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
StreamsBuilderFactoryBeanConfigurer customizer,
ConfigurableEnvironment environment, BindingProperties bindingProperties) {
ConfigurableListableBeanFactory beanFactory = this.applicationContext
.getBeanFactory();
Map<String, Object> streamConfigGlobalProperties = applicationContext
.getBean("streamConfigGlobalProperties", Map.class);
// Use a copy because the global configuration will be shared by multiple processors.
Map<String, Object> streamConfiguration = new HashMap<>(streamConfigGlobalProperties);
if (kafkaStreamsBinderConfigurationProperties != null) {
final Map<String, KafkaStreamsBinderConfigurationProperties.Functions> functionConfigMap = kafkaStreamsBinderConfigurationProperties.getFunctions();
if (!CollectionUtils.isEmpty(functionConfigMap)) {
final KafkaStreamsBinderConfigurationProperties.Functions functionConfig = functionConfigMap.get(beanNamePostPrefix);
if (functionConfig != null) {
final Map<String, String> functionSpecificConfig = functionConfig.getConfiguration();
if (!CollectionUtils.isEmpty(functionSpecificConfig)) {
streamConfiguration.putAll(functionSpecificConfig);
}
String applicationId = functionConfig.getApplicationId();
if (StringUtils.hasText(applicationId)) {
streamConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
}
}
}
}
final MutablePropertySources propertySources = environment.getPropertySources();
if (StringUtils.hasText(bindingProperties.getBinder())) {
final KafkaStreamsBinderConfigurationProperties multiBinderKafkaStreamsBinderConfigurationProperties =
applicationContext.getBean(bindingProperties.getBinder() + "-KafkaStreamsBinderConfigurationProperties", KafkaStreamsBinderConfigurationProperties.class);
String connectionString = multiBinderKafkaStreamsBinderConfigurationProperties.getKafkaConnectionString();
if (!StringUtils.hasText(connectionString)) {
connectionString = (String) Objects.requireNonNull(propertySources.get(bindingProperties.getBinder() + "-kafkaStreamsBinderEnv")).getProperty("spring.cloud.stream.kafka.binder.brokers");
}
streamConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, connectionString);
String binderProvidedApplicationId = multiBinderKafkaStreamsBinderConfigurationProperties.getApplicationId();
if (StringUtils.hasText(binderProvidedApplicationId)) {
streamConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
binderProvidedApplicationId);
}
if (multiBinderKafkaStreamsBinderConfigurationProperties
.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndContinue) {
streamConfiguration.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
}
else if (multiBinderKafkaStreamsBinderConfigurationProperties
.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndFail) {
streamConfiguration.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
}
else if (multiBinderKafkaStreamsBinderConfigurationProperties
.getDeserializationExceptionHandler() == DeserializationExceptionHandler.sendToDlq) {
streamConfiguration.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
SendToDlqAndContinue sendToDlqAndContinue = applicationContext.getBean(SendToDlqAndContinue.class);
streamConfiguration.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, sendToDlqAndContinue);
}
if (!ObjectUtils.isEmpty(multiBinderKafkaStreamsBinderConfigurationProperties.getConfiguration())) {
streamConfiguration.putAll(multiBinderKafkaStreamsBinderConfigurationProperties.getConfiguration());
}
if (!streamConfiguration.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG)) {
streamConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
(int) multiBinderKafkaStreamsBinderConfigurationProperties.getReplicationFactor());
}
}
// This is primarily used by StreamListener based processors. Although functions can use it in
// theory, it is ideal for functions to use the approach in the if statement above, i.e. a property
// like spring.cloud.stream.kafka.streams.binder.functions.process.configuration.num.threads
// (assuming that process is the function name).
KafkaStreamsConsumerProperties extendedConsumerProperties = this.kafkaStreamsExtendedBindingProperties
.getExtendedConsumerProperties(inboundName);
Map<String, String> bindingConfig = extendedConsumerProperties.getConfiguration();
Assert.state(!bindingConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " cannot be overridden at the binding level; "
+ "use multiple binders instead");
// We only add the per-binding configuration to the current streamConfiguration, not to the global one.
streamConfiguration.putAll(bindingConfig);
String bindingLevelApplicationId = extendedConsumerProperties.getApplicationId();
// override application.id if set at the individual binding level.
// We provide this for backward compatibility with StreamListener based processors.
// For function based processors see the approach used above
// (i.e. use a property like spring.cloud.stream.kafka.streams.binder.functions.process.applicationId).
if (StringUtils.hasText(bindingLevelApplicationId)) {
streamConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
bindingLevelApplicationId);
}
//If the application id is not set by any mechanism, then generate it.
streamConfiguration.computeIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG,
k -> {
String generatedApplicationID = beanNamePostPrefix + "-applicationId";
LOG.info("Binder Generated Kafka Streams Application ID: " + generatedApplicationID);
LOG.info("Use the binder generated application ID only for development and testing.");
LOG.info("For production deployments, please consider explicitly setting an application ID using a configuration property.");
LOG.info("The generated application ID is static and will be preserved over application restarts.");
return generatedApplicationID;
});
handleConcurrency(applicationContext, inboundName, streamConfiguration);
// Override deserialization exception handlers per binding
final DeserializationExceptionHandler deserializationExceptionHandler =
extendedConsumerProperties.getDeserializationExceptionHandler();
if (deserializationExceptionHandler == DeserializationExceptionHandler.logAndFail) {
streamConfiguration.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
}
else if (deserializationExceptionHandler == DeserializationExceptionHandler.logAndContinue) {
streamConfiguration.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
}
else if (deserializationExceptionHandler == DeserializationExceptionHandler.sendToDlq) {
streamConfiguration.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
streamConfiguration.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER,
applicationContext.getBean(SendToDlqAndContinue.class));
}
else if (deserializationExceptionHandler == DeserializationExceptionHandler.skipAndContinue) {
streamConfiguration.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
SkipAndContinueExceptionHandler.class);
}
KafkaStreamsConfiguration kafkaStreamsConfiguration = new KafkaStreamsConfiguration(streamConfiguration);
StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.cleanupConfig == null
? new StreamsBuilderFactoryBean(kafkaStreamsConfiguration)
: new StreamsBuilderFactoryBean(kafkaStreamsConfiguration,
this.cleanupConfig);
streamsBuilderFactoryBean.setAutoStartup(false);
BeanDefinition streamsBuilderBeanDefinition = BeanDefinitionBuilder
.genericBeanDefinition(
(Class<StreamsBuilderFactoryBean>) streamsBuilderFactoryBean.getClass(),
() -> streamsBuilderFactoryBean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(
"stream-builder-" + beanNamePostPrefix, streamsBuilderBeanDefinition);
extendedConsumerProperties.setApplicationId((String) streamConfiguration.get(StreamsConfig.APPLICATION_ID_CONFIG));
if (customizer != null) {
customizer.configure(streamsBuilderFactoryBean);
}
return applicationContext.getBean(
"&stream-builder-" + beanNamePostPrefix, StreamsBuilderFactoryBean.class);
}
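
/**
* Map the binding-level {@code concurrency} property to {@code num.stream.threads}, but
* only when the application set it explicitly (for example,
* {@code spring.cloud.stream.bindings.process-in-0.consumer.concurrency=3}, with an
* illustrative binding name), so that the implicit default of 1 does not override a
* thread count configured through other Kafka Streams properties.
*/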
private void handleConcurrency(ApplicationContext applicationContext, String inboundName,
Map<String, Object> streamConfiguration) {
// This rebinding is necessary to capture the concurrency explicitly set by the application.
// This is added to fix this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/899
org.springframework.boot.context.properties.bind.Binder explicitConcurrencyResolver =
new org.springframework.boot.context.properties.bind.Binder(ConfigurationPropertySources.get(applicationContext.getEnvironment()),
new PropertySourcesPlaceholdersResolver(applicationContext.getEnvironment()),
IntegrationUtils.getConversionService(this.applicationContext.getBeanFactory()), null);
boolean[] concurrencyExplicitlyProvided = new boolean[] {false};
BindHandler handler = new BindHandler() {
@Override
public Object onSuccess(ConfigurationPropertyName name, Bindable<?> target,
BindContext context, Object result) {
if (!concurrencyExplicitlyProvided[0]) {
concurrencyExplicitlyProvided[0] = name.getLastElement(ConfigurationPropertyName.Form.UNIFORM).equals("concurrency") &&
// Configuration property names are normalized to their uniform (lower-case) form, so lower-casing the binding name is safe here.
ConfigurationPropertyName.of("spring.cloud.stream.bindings." + inboundName.toLowerCase(Locale.ROOT) + ".consumer")
.isAncestorOf(name);
}
return result;
}
};
//Re-bind spring.cloud.stream properties to check if the application explicitly provided concurrency.
try {
explicitConcurrencyResolver.bind("spring.cloud.stream",
Bindable.ofInstance(new BindingServiceProperties()), handler);
}
catch (Exception e) {
// Ignore this exception
}
int concurrency = this.bindingServiceProperties.getConsumerProperties(inboundName)
.getConcurrency();
// override concurrency if set at the individual binding level.
// Concurrency will be mapped to num.stream.threads.
// This conditional also takes into account explicit concurrency settings left at the default value of 1
// by the application to address concurrency behavior in applications with multiple processors.
// See this GH issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/844
if (concurrency >= 1 && concurrencyExplicitlyProvided[0]) {
streamConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
concurrency);
}
}
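
/**
* Resolve the inbound value {@link Serde}: the configured Serde when native decoding is
* enabled, otherwise {@code Serdes.ByteArray()} so that Spring Cloud Stream performs the
* message conversion.
*/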
protected Serde<?> getValueSerde(String inboundName, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, ResolvableType resolvableType) {
if (bindingServiceProperties.getConsumerProperties(inboundName).isUseNativeDecoding()) {
BindingProperties bindingProperties = this.bindingServiceProperties
.getBindingProperties(inboundName);
return this.keyValueSerdeResolver.getInboundValueSerde(
bindingProperties.getConsumer(), kafkaStreamsConsumerProperties, resolvableType);
}
else {
return Serdes.ByteArray();
}
}
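
/**
* Create the inbound {@link KStream} for the binding, honoring destination patterns and
* comma-separated destination lists. When the {@code eventTypes} property is set (for
* example,
* {@code spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar},
* with illustrative names), only records whose event type header matches one of the
* configured types are routed downstream.
*/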
@SuppressWarnings({"unchecked"})
protected KStream<?, ?> getKStream(String inboundName, BindingProperties bindingProperties, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
StreamsBuilder streamsBuilder, Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset, boolean firstBuild) {
if (firstBuild) {
addStateStoreBeans(streamsBuilder);
}
final boolean nativeDecoding = this.bindingServiceProperties
.getConsumerProperties(inboundName).isUseNativeDecoding();
if (nativeDecoding) {
LOG.info("Native decoding is enabled for " + inboundName
+ ". Inbound deserialization is performed natively by Kafka's Serdes.");
}
else {
LOG.info("Native decoding is disabled for " + inboundName
+ ". Inbound message conversion is done by Spring Cloud Stream.");
}
KStream<?, ?> stream;
final Serde<?> valueSerdeToUse = getValueSerdeToUse(kafkaStreamsConsumerProperties, valueSerde);
final Consumed<?, ?> consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerdeToUse, autoOffsetReset);
if (this.kafkaStreamsExtendedBindingProperties
.getExtendedConsumerProperties(inboundName).isDestinationIsPattern()) {
final Pattern pattern = Pattern.compile(this.bindingServiceProperties.getBindingDestination(inboundName));
stream = streamsBuilder.stream(pattern, consumed);
}
else {
String[] bindingTargets = StringUtils.commaDelimitedListToStringArray(
this.bindingServiceProperties.getBindingDestination(inboundName));
stream = streamsBuilder.stream(Arrays.asList(bindingTargets),
consumed);
}
//Check to see if event type based routing is enabled.
//See this issue for more context: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1003
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
AtomicReference<String> topicObject = new AtomicReference<>();
AtomicReference<Headers> headersObject = new AtomicReference<>();
// Processor that captures the record's topic and headers and flags event type matches.
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, topicObject, headersObject));
// Branching based on event type match.
final Map<String, ? extends KStream<?, ?>> stringKStreamMap = stream.split()
.branch((key, value) -> {
if (matchedRecordThreadLocal.get()) {
matchedRecordThreadLocal.set(false);
return true;
}
return false;
})
.noDefaultBranch();
final KStream<?, ?>[] branch = stringKStreamMap.values().toArray(new KStream[0]);
// Deserialize the matched branch unless the binding is configured to keep the configured Serde when routing events.
final KStream<?, Object> deserializedKStream = !kafkaStreamsConsumerProperties.isUseConfiguredSerdeWhenRoutingEvents() ?
branch[0].mapValues(value -> valueSerde.deserializer().deserialize(
topicObject.get(), headersObject.get(), ((Bytes) value).get())) : (KStream<?, Object>) branch[0];
return getkStream(bindingProperties, deserializedKStream, nativeDecoding);
}
return getkStream(bindingProperties, stream, nativeDecoding);
}
private Serde<?> getValueSerdeToUse(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, Serde<?> valueSerde) {
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
return kafkaStreamsConsumerProperties.isUseConfiguredSerdeWhenRoutingEvents() ? valueSerde : new Serdes.BytesSerde();
}
return valueSerde;
}
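
/**
* When native decoding is disabled, map each record value to a Spring
* {@link org.springframework.messaging.Message} carrying the record headers and the
* binding's content type so that downstream message conversion can take place.
*/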
private KStream<?, ?> getkStream(BindingProperties bindingProperties, KStream<?, ?> stream, boolean nativeDecoding) {
if (!nativeDecoding) {
AtomicReference<Headers> headersAtomicReference = new AtomicReference<>();
// Terminal processor that captures each record's headers for the message conversion below.
stream.process((ProcessorSupplier<Object, Object, Void, Void>) () -> new Processor<Object, Object, Void, Void>() {
@Override
public void process(Record<Object, Object> record) {
headersAtomicReference.set(record.headers());
}
});
stream = stream.mapValues((value) -> {
Object returnValue;
String contentType = bindingProperties.getContentType();
if (value != null && StringUtils.hasText(contentType)) {
final Headers headers = headersAtomicReference.get();
final Map<String, Object> headersMap = new HashMap<>();
headers.forEach(header -> headersMap.put(header.key(), header.value()));
returnValue = MessageBuilder.withPayload(value).copyHeaders(headersMap)
.setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
}
else {
returnValue = value;
}
return returnValue;
});
}
return stream;
}
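
/**
* Add all {@link StoreBuilder} beans found in the application context to the topology so
* that custom state stores become available to the processors.
*/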
@SuppressWarnings("rawtypes")
private void addStateStoreBeans(StreamsBuilder streamsBuilder) {
try {
final Map<String, StoreBuilder> storeBuilders = applicationContext.getBeansOfType(StoreBuilder.class);
if (!CollectionUtils.isEmpty(storeBuilders)) {
storeBuilders.values().forEach(storeBuilder -> {
streamsBuilder.addStateStore(storeBuilder);
if (LOG.isInfoEnabled()) {
LOG.info("state store " + storeBuilder.name() + " added to topology");
}
});
}
}
catch (Exception e) {
// Best effort: ignore failures while adding state store beans to the topology.
}
}
private <K, V> KTable<K, V> materializedAs(StreamsBuilder streamsBuilder, String destination, String storeName,
Serde<K> k, Serde<V> v, Topology.AutoOffsetReset autoOffsetReset, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties) {
final Consumed<K, V> consumed = getConsumed(kafkaStreamsConsumerProperties, k, v, autoOffsetReset);
return streamsBuilder.table(this.bindingServiceProperties.getBindingDestination(destination),
consumed, getMaterialized(storeName, k, v, kafkaStreamsConsumerProperties.isCachingDisabled(), kafkaStreamsConsumerProperties.isLoggingDisabled()));
}
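
/**
* Build a {@link Materialized} key-value store with the given Serdes, applying the
* binding-level caching and logging switches when they are set.
*/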
private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(
String storeName, Serde<K> k, Serde<V> v, Boolean isCachingDisabled, Boolean isLoggingDisabled) {
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
.withKeySerde(k).withValueSerde(v);
if (isCachingDisabled != null) {
if (isCachingDisabled) {
materialized = materialized.withCachingDisabled();
}
else {
materialized = materialized.withCachingEnabled();
}
}
if (isLoggingDisabled != null) {
if (isLoggingDisabled) {
materialized = materialized.withLoggingDisabled();
}
}
return materialized;
}
private <K, V> GlobalKTable<K, V> materializedAsGlobalKTable(
StreamsBuilder streamsBuilder, String destination, String storeName,
Serde<K> k, Serde<V> v, Topology.AutoOffsetReset autoOffsetReset, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties) {
final Consumed<K, V> consumed = getConsumed(kafkaStreamsConsumerProperties, k, v, autoOffsetReset);
return streamsBuilder.globalTable(
this.bindingServiceProperties.getBindingDestination(destination),
consumed,
getMaterialized(storeName, k, v, kafkaStreamsConsumerProperties.isCachingDisabled(), kafkaStreamsConsumerProperties.isLoggingDisabled()));
}
private GlobalKTable<?, ?> getGlobalKTable(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
StreamsBuilder streamsBuilder,
Serde<?> keySerde, Serde<?> valueSerde, String materializedAs,
String bindingDestination, Topology.AutoOffsetReset autoOffsetReset) {
final Consumed<?, ?> consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerde, autoOffsetReset);
return materializedAs != null
? materializedAsGlobalKTable(streamsBuilder, bindingDestination,
materializedAs, keySerde, valueSerde, autoOffsetReset, kafkaStreamsConsumerProperties)
: streamsBuilder.globalTable(bindingDestination,
consumed);
}
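
/**
* Create the inbound {@link KTable}, materialized under the configured store name when
* {@code materializedAs} is set, with the same event-type routing applied as for KStream
* bindings.
*/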
@SuppressWarnings({"unchecked"})
private KTable<?, ?> getKTable(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
StreamsBuilder streamsBuilder, Serde<?> keySerde,
Serde<?> valueSerde, String materializedAs, String bindingDestination,
Topology.AutoOffsetReset autoOffsetReset) {
final Serde<?> valueSerdeToUse = getValueSerdeToUse(kafkaStreamsConsumerProperties, valueSerde);
final Consumed<?, ?> consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerdeToUse, autoOffsetReset);
final KTable<?, ?> kTable = materializedAs != null
? materializedAs(streamsBuilder, bindingDestination, materializedAs,
keySerde, valueSerdeToUse, autoOffsetReset, kafkaStreamsConsumerProperties)
: streamsBuilder.table(bindingDestination,
consumed);
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
AtomicReference<String> topicObject = new AtomicReference<>();
AtomicReference<Headers> headersObject = new AtomicReference<>();
final KStream<?, ?> stream = kTable.toStream();
// Processor that captures the record's topic and headers and flags event type matches.
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, topicObject, headersObject));
// Branching based on event type match.
final Map<String, ? extends KStream<?, ?>> stringKStreamMap = stream.split()
.branch((key, value) -> {
if (matchedRecordThreadLocal.get()) {
matchedRecordThreadLocal.set(false);
return true;
}
return false;
})
.noDefaultBranch();
final KStream<?, ?>[] branch = stringKStreamMap.values().toArray(new KStream[0]);
// Deserialize the matched branch unless the binding is configured to keep the configured Serde when routing events.
final KStream<?, Object> deserializedKStream = !kafkaStreamsConsumerProperties.isUseConfiguredSerdeWhenRoutingEvents() ?
branch[0].mapValues(value -> valueSerde.deserializer().deserialize(
topicObject.get(), headersObject.get(), ((Bytes) value).get())) : (KStream<?, Object>) branch[0];
return deserializedKStream.toTable();
}
return kTable;
}
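
/**
* Build the {@link Consumed} for the binding: key/value Serdes, the offset reset policy,
* an optional {@link TimestampExtractor} looked up by the
* {@code timestampExtractorBeanName} property, and an optional processor name from the
* {@code consumedAs} property.
*/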
private <K, V> Consumed<K, V> getConsumed(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
Serde<K> keySerde, Serde<V> valueSerde, Topology.AutoOffsetReset autoOffsetReset) {
TimestampExtractor timestampExtractor = null;
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getTimestampExtractorBeanName())) {
timestampExtractor = applicationContext.getBean(kafkaStreamsConsumerProperties.getTimestampExtractorBeanName(),
TimestampExtractor.class);
}
Consumed<K, V> consumed = Consumed.with(keySerde, valueSerde)
.withOffsetResetPolicy(autoOffsetReset);
if (timestampExtractor != null) {
consumed = consumed.withTimestampExtractor(timestampExtractor);
}
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getConsumedAs())) {
consumed = consumed.withName(kafkaStreamsConsumerProperties.getConsumedAs());
}
return consumed;
}
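
/**
* Processor that inspects each record's event type header (the header key comes from the
* {@code eventTypeHeaderKey} property) and, on a match against the configured
* {@code eventTypes}, raises a thread-local flag consumed by the branching predicate in
* {@link #getKStream} and {@link #getKTable}. It also captures the record's topic and
* headers so that a matched value can be deserialized later.
*/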
private <K, V> Processor<K, V, Void, Void> eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
AtomicReference<String> topicObject, AtomicReference<Headers> headersObject) {
return new Processor<>() {
ProcessorContext<?, ?> context;
@Override
public void init(ProcessorContext<Void, Void> context) {
Processor.super.init(context);
this.context = context;
}
@Override
public void process(Record<K, V> record) {
final Headers headers = record.headers();
headersObject.set(headers);
final Optional<RecordMetadata> optional = this.context.recordMetadata();
if (optional.isPresent()) {
final RecordMetadata recordMetadata = optional.get();
topicObject.set(recordMetadata.topic());
}
final Iterable<Header> eventTypeHeader = headers.headers(kafkaStreamsConsumerProperties.getEventTypeHeaderKey());
if (eventTypeHeader != null && eventTypeHeader.iterator().hasNext()) {
String eventTypeFromHeader = new String(eventTypeHeader.iterator().next().value());
final String[] eventTypesFromBinding = StringUtils.commaDelimitedListToStringArray(kafkaStreamsConsumerProperties.getEventTypes());
for (String eventTypeFromBinding : eventTypesFromBinding) {
if (eventTypeFromHeader.equals(eventTypeFromBinding)) {
AbstractKafkaStreamsBinderProcessor.matchedRecordThreadLocal.set(true);
break;
}
}
}
}
@Override
public void close() {
}
};
}
}