KStreamBinder.java
/*
* Copyright 2017-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.springframework.aop.framework.Advised;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.StringUtils;
/**
* {@link org.springframework.cloud.stream.binder.Binder} implementation for
* {@link KStream}. This implementation extends from the {@link AbstractBinder} directly.
* <p>
* Provides both producer and consumer bindings for the bound KStream.
*
* @author Marius Bogoevici
* @author Soby Chacko
*/
class KStreamBinder extends
		// @checkstyle:off
		AbstractBinder<KStream<Object, Object>, ExtendedConsumerProperties<KafkaStreamsConsumerProperties>, ExtendedProducerProperties<KafkaStreamsProducerProperties>>
		implements
		ExtendedPropertiesBinder<KStream<Object, Object>, KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {

	// @checkstyle:on

	private static final Log LOG = LogFactory.getLog(KStreamBinder.class);

	private final KafkaTopicProvisioner kafkaTopicProvisioner;

	// @checkstyle:off
	private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();

	// @checkstyle:on

	private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;

	private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;

	private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;

	private final KeyValueSerdeResolver keyValueSerdeResolver;

	private final KafkaStreamsRegistry kafkaStreamsRegistry;

	/**
	 * Create a binder backed by the given collaborators. Every argument is stored as-is
	 * in the field of the same name.
	 *
	 * @param binderConfigurationProperties binder-wide configuration properties
	 * @param kafkaTopicProvisioner provisioner used for producer destinations and
	 * handed to the consumer binding preparation
	 * @param kafkaStreamsMessageConversionDelegate delegate used for outbound message
	 * conversion when native encoding is disabled
	 * @param kafkaStreamsBindingInformationCatalogue catalogue used to look up binding
	 * names, factory beans and previously stopped KafkaStreams instances
	 * @param keyValueSerdeResolver resolver for the outbound key and value Serdes
	 * @param kafkaStreamsRegistry registry that is notified when a binding starts or stops
	 */
	KStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
			KafkaTopicProvisioner kafkaTopicProvisioner,
			KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
			KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
			KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsRegistry kafkaStreamsRegistry) {
		this.binderConfigurationProperties = binderConfigurationProperties;
		this.kafkaTopicProvisioner = kafkaTopicProvisioner;
		this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
		this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
		this.keyValueSerdeResolver = keyValueSerdeResolver;
		this.kafkaStreamsRegistry = kafkaStreamsRegistry;
	}

	/**
	 * Bind an inbound {@link KStream}.
	 * <p>
	 * Registers the consumer properties against the stream wrapped by the proxied
	 * {@code inputTarget}, delegates the remaining preparation to
	 * {@link KafkaStreamsBinderUtils#prepareConsumerBinding} and returns a binding
	 * whose lifecycle is driven by the {@link StreamsBuilderFactoryBean} registered
	 * for this target in the catalogue.
	 *
	 * @param name the destination name
	 * @param group the consumer group; when it has no text, the application id from
	 * the extended consumer properties is used instead
	 * @param inputTarget the proxied inbound stream (must be an {@link Advised} proxy
	 * whose first advisor carries a {@code KStreamWrapperHandler})
	 * @param properties the extended consumer properties
	 * @return the input binding
	 */
	@Override
	protected Binding<KStream<Object, Object>> doBindConsumer(String name, String group,
			KStream<Object, Object> inputTarget,
			ExtendedConsumerProperties<KafkaStreamsConsumerProperties> properties) {
		// The bind target is a proxy; unwrap the stream held by its first advisor.
		KStream<Object, Object> delegate = ((KStreamBoundElementFactory.KStreamWrapperHandler)
				((Advised) inputTarget).getAdvisors()[0].getAdvice()).getDelegate();
		this.kafkaStreamsBindingInformationCatalogue.registerConsumerProperties(delegate, properties.getExtension());

		if (!StringUtils.hasText(group)) {
			group = properties.getExtension().getApplicationId();
		}

		final RetryTemplate retryTemplate = buildRetryTemplate(properties);
		final String bindingName = this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(inputTarget);
		final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue
				.getStreamsBuilderFactoryBeanPerBinding().get(bindingName);

		// bindingName was already resolved above; reuse it instead of a second lookup.
		KafkaStreamsBinderUtils.prepareConsumerBinding(name, group,
				getApplicationContext(), this.kafkaTopicProvisioner,
				this.binderConfigurationProperties, properties, retryTemplate, getBeanFactory(),
				bindingName,
				this.kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);

		return new KStreamBinding(bindingName, group, inputTarget, streamsBuilderFactoryBean, true);
	}

	/**
	 * Bind an outbound {@link KStream}.
	 * <p>
	 * Provisions the producer destination, resolves the key and value Serdes, routes
	 * the stream to the destination via {@code to(...)} and returns a binding whose
	 * lifecycle is driven by the {@link StreamsBuilderFactoryBean} registered for this
	 * target in the catalogue. When native encoding is disabled the value Serde is
	 * {@link Serdes#ByteArray()}.
	 *
	 * @param name the destination name
	 * @param outboundBindTarget the outbound stream
	 * @param properties the extended producer properties
	 * @return the output binding
	 */
	@Override
	@SuppressWarnings("unchecked")
	protected Binding<KStream<Object, Object>> doBindProducer(String name,
			KStream<Object, Object> outboundBindTarget,
			ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
		// Raw cast: the provisioner is typed against the plain Kafka producer properties.
		ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties =
				(ExtendedProducerProperties) properties;
		this.kafkaTopicProvisioner.provisionProducerDestination(name, extendedProducerProperties);

		Serde<?> keySerde = this.keyValueSerdeResolver
				.getOuboundKeySerde(properties.getExtension(), kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable(outboundBindTarget));
		LOG.info("Key Serde used for (outbound) " + name + ": " + keySerde.getClass().getName());

		Serde<?> valueSerde;
		if (properties.isUseNativeEncoding()) {
			valueSerde = this.keyValueSerdeResolver.getOutboundValueSerde(properties,
					properties.getExtension(), kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable(outboundBindTarget));
		}
		else {
			valueSerde = Serdes.ByteArray();
		}
		LOG.info("Value Serde used for (outbound) " + name + ": " + valueSerde.getClass().getName());

		to(properties.isUseNativeEncoding(), name, outboundBindTarget,
				(Serde<Object>) keySerde, (Serde<Object>) valueSerde, properties.getExtension());

		final String bindingName = this.kafkaStreamsBindingInformationCatalogue.bindingNamePerTarget(outboundBindTarget);
		final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue
				.getStreamsBuilderFactoryBeanPerBinding().get(bindingName);

		// We need the application id to pass to DefaultBinding so that it won't be interpreted as an anonymous group.
		// In case, if we can't find application.id (which is unlikely), we just default to bindingName.
		// This will only be used for lifecycle management through actuator endpoints.
		final String applicationId = resolveApplicationId(
				streamsBuilderFactoryBean.getStreamsConfiguration(), bindingName);

		return new KStreamBinding(bindingName, applicationId, outboundBindTarget, streamsBuilderFactoryBean, false);
	}

	/**
	 * Resolve the group name for an outbound binding.
	 *
	 * @param streamsConfiguration the streams configuration, may be {@code null}
	 * @param fallback value to use when no usable application id is configured
	 * @return the configured {@code application.id} when it is a non-blank string,
	 * otherwise {@code fallback}
	 */
	private static String resolveApplicationId(Properties streamsConfiguration, String fallback) {
		if (streamsConfiguration == null) {
			return fallback;
		}
		final Object configured = streamsConfiguration.get(StreamsConfig.APPLICATION_ID_CONFIG);
		// A present configuration without an application id must not yield a null group.
		if (configured instanceof String && StringUtils.hasText((String) configured)) {
			return (String) configured;
		}
		return fallback;
	}

	/**
	 * Route the outbound stream to the destination topic.
	 * <p>
	 * With native encoding the stream is sent as-is using the supplied Serdes.
	 * Without it, records with a {@code null} value are sent directly and the stream
	 * returned by the message conversion delegate is sent alongside them.
	 *
	 * @param isNativeEncoding whether native encoding is enabled for this binding
	 * @param name the destination topic name
	 * @param outboundBindTarget the outbound stream
	 * @param keySerde the Serde used for keys
	 * @param valueSerde the Serde used for values
	 * @param properties producer properties supplying the optional sink name and the
	 * optional {@link StreamPartitioner} bean name
	 */
	@SuppressWarnings("unchecked")
	private void to(boolean isNativeEncoding, String name,
			KStream<Object, Object> outboundBindTarget, Serde<Object> keySerde,
			Serde<Object> valueSerde, KafkaStreamsProducerProperties properties) {
		// Keep the instance returned by each fluent call rather than relying on the
		// receiver being mutated in place.
		Produced<Object, Object> produced = Produced.with(keySerde, valueSerde);
		if (StringUtils.hasText(properties.getProducedAs())) {
			produced = produced.withName(properties.getProducedAs());
		}
		if (StringUtils.hasText(properties.getStreamPartitionerBeanName())) {
			// The bean is looked up by raw class, hence the unchecked conversion.
			final StreamPartitioner<Object, Object> streamPartitioner = getApplicationContext()
					.getBean(properties.getStreamPartitionerBeanName(), StreamPartitioner.class);
			produced = produced.withStreamPartitioner(streamPartitioner);
		}

		if (!isNativeEncoding) {
			LOG.info("Native encoding is disabled for " + name
					+ ". Outbound message conversion done by Spring Cloud Stream.");
			// NOTE(review): the same Produced (including any configured name) is applied to
			// both sinks below - confirm this does not clash when producedAs is set.
			outboundBindTarget.filter((k, v) -> v == null)
					.to(name, produced);
			this.kafkaStreamsMessageConversionDelegate
					.serializeOnOutbound(outboundBindTarget)
					.to(name, produced);
		}
		else {
			LOG.info("Native encoding is enabled for " + name
					+ ". Outbound serialization done at the broker.");
			outboundBindTarget.to(name, produced);
		}
	}

	/**
	 * Return the extended consumer properties for the given channel.
	 * @param channelName the channel name
	 * @return the extended consumer properties
	 */
	@Override
	public KafkaStreamsConsumerProperties getExtendedConsumerProperties(
			String channelName) {
		return this.kafkaStreamsExtendedBindingProperties
				.getExtendedConsumerProperties(channelName);
	}

	/**
	 * Return the extended producer properties for the given channel.
	 * @param channelName the channel name
	 * @return the extended producer properties
	 */
	@Override
	public KafkaStreamsProducerProperties getExtendedProducerProperties(
			String channelName) {
		return this.kafkaStreamsExtendedBindingProperties
				.getExtendedProducerProperties(channelName);
	}

	/**
	 * Replace the extended binding properties that back the property accessors of
	 * this binder.
	 * @param kafkaStreamsExtendedBindingProperties the extended binding properties
	 */
	public void setKafkaStreamsExtendedBindingProperties(
			KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
		this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
	}

	/**
	 * Return the defaults prefix reported by the extended binding properties.
	 * @return the defaults prefix
	 */
	@Override
	public String getDefaultsPrefix() {
		return this.kafkaStreamsExtendedBindingProperties.getDefaultsPrefix();
	}

	/**
	 * Return the extended properties entry class reported by the extended binding
	 * properties.
	 * @return the extended properties entry class
	 */
	@Override
	public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
		return this.kafkaStreamsExtendedBindingProperties
				.getExtendedPropertiesEntryClass();
	}

	/**
	 * Binding shared by the inbound and outbound side. Besides delegating to
	 * {@link DefaultBinding}, it keeps the {@link KafkaStreamsRegistry} and the cache
	 * of stopped {@link KafkaStreams} instances in the catalogue up to date whenever
	 * the binding is started or stopped.
	 * <p>
	 * Deliberately an inner (non-static) class: it needs the registry and catalogue of
	 * the enclosing binder.
	 */
	private final class KStreamBinding extends DefaultBinding<KStream<Object, Object>> {

		private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

		private final boolean input;

		/**
		 * @param bindingName the binding name
		 * @param group the group passed on to {@link DefaultBinding}
		 * @param target the bound stream
		 * @param streamsBuilderFactoryBean the factory bean acting as the lifecycle of
		 * this binding
		 * @param input {@code true} for an inbound binding, {@code false} for an
		 * outbound one
		 */
		KStreamBinding(String bindingName, String group, KStream<Object, Object> target,
				StreamsBuilderFactoryBean streamsBuilderFactoryBean, boolean input) {
			super(bindingName, group, target, streamsBuilderFactoryBean);
			this.streamsBuilderFactoryBean = streamsBuilderFactoryBean;
			this.input = input;
		}

		@Override
		public boolean isInput() {
			return this.input;
		}

		/**
		 * Start the binding unless the factory bean is already running, register the
		 * factory bean with the registry and drop any KafkaStreams instance cached for
		 * the same application id by an earlier {@link #stop()}.
		 */
		@Override
		public synchronized void start() {
			if (!this.streamsBuilderFactoryBean.isRunning()) {
				super.start();
				KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(this.streamsBuilderFactoryBean);
				//If we cached the previous KafkaStreams object (from a binding stop on the actuator), remove it.
				//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
				final String applicationId = configuredApplicationId();
				if (KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams()
						.containsKey(applicationId)) {
					KStreamBinder.this.kafkaStreamsBindingInformationCatalogue
							.removePreviousKafkaStreamsForApplicationId(applicationId);
				}
			}
		}

		/**
		 * Stop the binding if the factory bean is running, unregister its KafkaStreams
		 * instance, close the DLQ producer factories and cache the stopped instance
		 * under its application id.
		 */
		@Override
		public synchronized void stop() {
			if (this.streamsBuilderFactoryBean.isRunning()) {
				// Grab the instance before stopping so it can still be unregistered and cached.
				final KafkaStreams kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams();
				super.stop();
				KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
				KafkaStreamsBinderUtils.closeDlqProducerFactories(
						KStreamBinder.this.kafkaStreamsBindingInformationCatalogue, this.streamsBuilderFactoryBean);
				//Caching the stopped KafkaStreams for health indicator purposes on the underlying processor.
				//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
				KStreamBinder.this.kafkaStreamsBindingInformationCatalogue
						.addPreviousKafkaStreamsForApplicationId(configuredApplicationId(), kafkaStreams);
			}
		}

		/**
		 * Read the application id from the factory bean's streams configuration.
		 * @return the value stored under {@link StreamsConfig#APPLICATION_ID_CONFIG}
		 * @throws NullPointerException if the factory bean has no streams configuration
		 */
		private String configuredApplicationId() {
			final Properties streamsConfiguration = Objects.requireNonNull(
					this.streamsBuilderFactoryBean.getStreamsConfiguration(),
					"streams configuration must not be null");
			return (String) streamsConfiguration.get(StreamsConfig.APPLICATION_ID_CONFIG);
		}

	}

}