BindingUtils.java

/*
 * Copyright 2022-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.utils;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties.StandardHeaders;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/**
 * Binding Utilities.
 *
 * @author Gary Russell
 * @since 4.0
 *
 */
public final class BindingUtils {

	private BindingUtils() {
	}

	/**
	 * Get the message converter for consumer bindings from the application context. If
	 * the binding properties do not contain a bean name, a default
	 * {@link MessagingMessageConverter} is returned; if the binder properties contain a
	 * header mapper bean name, it is used in the default converter, otherwise a
	 * {@link JsonKafkaHeaderMapper} is used.
	 * @param applicationContext the application context.
	 * @param extendedConsumerProperties the consumer binding properties.
	 * @param configurationProperties the binder properties.
	 * @return the converter
	 * @throws IllegalStateException if a bean name is specified but not found.
	 */
	public static MessageConverter getConsumerMessageConverter(ApplicationContext applicationContext,
			ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
			KafkaBinderConfigurationProperties configurationProperties) {

		MessageConverter messageConverter;
		if (extendedConsumerProperties.getExtension().getConverterBeanName() == null) {
			MessagingMessageConverter mmc = new MessagingMessageConverter();
			StandardHeaders standardHeaders = extendedConsumerProperties.getExtension()
					.getStandardHeaders();
			mmc.setGenerateMessageId(StandardHeaders.id.equals(standardHeaders)
							|| StandardHeaders.both.equals(standardHeaders));
			mmc.setGenerateTimestamp(
					StandardHeaders.timestamp.equals(standardHeaders)
							|| StandardHeaders.both.equals(standardHeaders));
			KafkaHeaderMapper headerMapper = getHeaderMapper(applicationContext, configurationProperties);
			if (headerMapper == null) {
				headerMapper = new JsonKafkaHeaderMapper();
			}
			mmc.setHeaderMapper(headerMapper);
			messageConverter = mmc;
		}
		else {
			try {
				messageConverter = applicationContext.getBean(
						extendedConsumerProperties.getExtension().getConverterBeanName(), MessageConverter.class);
			}
			catch (NoSuchBeanDefinitionException ex) {
				throw new IllegalStateException(
						"Converter bean not present in application context", ex);
			}
		}
		return messageConverter;
	}

	/**
	 * Get the header mapper bean, if the binder properties contains a bean name; if not
	 * look for a bean with name  {@code kafkaBinderHeaderMapper} is looked up; if that
	 * doesn't exist, null is returned.
	 * @param applicationContext the application context.
	 * @param configurationProperties the binder properties.
	 * @return the mapper.
	 */
	@Nullable
	public static KafkaHeaderMapper getHeaderMapper(ApplicationContext applicationContext,
			KafkaBinderConfigurationProperties configurationProperties) {

		KafkaHeaderMapper mapper = null;
		if (configurationProperties.getHeaderMapperBeanName() != null) {
			mapper = applicationContext.getBean(
					configurationProperties.getHeaderMapperBeanName(),
					KafkaHeaderMapper.class);
		}
		if (mapper == null) {
			//First, try to see if there is a bean named headerMapper registered by other frameworks using the binder (for e.g. spring cloud sleuth)
			try {
				mapper = applicationContext.getBean("kafkaBinderHeaderMapper", KafkaHeaderMapper.class);
			}
			catch (BeansException be) {
			}
		}
		return mapper;
	}

	/**
	 * Create the Kafka configuration map for a consumer binding. With anonymous bindings
	 * (those without a {@code group} property, which are given a {@code UUID.toString()}
	 * in the group id) consumption begins from the current end of the topic, otherwise
	 * consumption starts from the beginning, the first time the binding consumes.
	 * @param anonymous true if this is for an anonymous binding.
	 * @param consumerGroup the group.
	 * @param consumerProperties the binding properties.
	 * @param configurationProperties the binder properties.
	 * @return the config map.
	 */
	public static Map<String, Object> createConsumerConfigs(boolean anonymous, String consumerGroup,
			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties,
			KafkaBinderConfigurationProperties configurationProperties) {

		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
				ByteArrayDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
				ByteArrayDeserializer.class);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
				anonymous ? "latest" : "earliest");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);

		Map<String, Object> mergedConfig = configurationProperties.mergedConsumerConfiguration();
		if (!ObjectUtils.isEmpty(mergedConfig)) {
			props.putAll(mergedConfig);
		}
		if (ObjectUtils.isEmpty(props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
			props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
					configurationProperties.getKafkaConnectionString());
		}
		Map<String, String> config = consumerProperties.getExtension().getConfiguration();
		if (!ObjectUtils.isEmpty(config)) {
			Assert.state(!config.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
					ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " cannot be overridden at the binding level; "
							+ "use multiple binders instead");
			props.putAll(config);
		}
		if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getStartOffset())) {
			props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
					consumerProperties.getExtension().getStartOffset().name());
		}
		return props;
	}

	/**
	 * Create the Kafka configuration map for a producer binding.
	 * @param producerProperties the binding properties.
	 * @param configurationProperties the binder properties.
	 * @return the config map.
	 */
	public static Map<String, Object> createProducerConfigs(
			ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
			KafkaBinderConfigurationProperties configurationProperties) {

		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				ByteArraySerializer.class);
		props.put(ProducerConfig.ACKS_CONFIG,
				String.valueOf(configurationProperties.getRequiredAcks()));
		Map<String, Object> mergedConfig = configurationProperties
				.mergedProducerConfiguration();
		if (!ObjectUtils.isEmpty(mergedConfig)) {
			props.putAll(mergedConfig);
		}
		if (ObjectUtils.isEmpty(props.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
			props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
					configurationProperties.getKafkaConnectionString());
		}
		final KafkaProducerProperties kafkaProducerProperties = producerProperties.getExtension();
		if (ObjectUtils.isEmpty(props.get(ProducerConfig.BATCH_SIZE_CONFIG))) {
			props.put(ProducerConfig.BATCH_SIZE_CONFIG,
					String.valueOf(kafkaProducerProperties.getBufferSize()));
		}
		if (ObjectUtils.isEmpty(props.get(ProducerConfig.LINGER_MS_CONFIG))) {
			props.put(ProducerConfig.LINGER_MS_CONFIG,
					String.valueOf(kafkaProducerProperties.getBatchTimeout()));
		}
		if (ObjectUtils.isEmpty(props.get(ProducerConfig.COMPRESSION_TYPE_CONFIG))) {
			props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
					kafkaProducerProperties.getCompressionType().toString());
		}
		Map<String, String> configs = producerProperties.getExtension().getConfiguration();
		Assert.state(!configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
				ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " cannot be overridden at the binding level; "
						+ "use multiple binders instead");
		if (!ObjectUtils.isEmpty(configs)) {
			props.putAll(configs);
		}
		if (!ObjectUtils.isEmpty(kafkaProducerProperties.getConfiguration())) {
			props.putAll(kafkaProducerProperties.getConfiguration());
		}
		return props;
	}

}