KafkaMultiBinderCustomConfigurationTests.java

/*
 * Copyright 2023-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.integration;

import java.lang.reflect.Field;
import java.util.Map;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.DefaultBinderFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.util.ReflectionUtils;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration tests to verify that custom configurations defined in spring.main.sources
 * are properly loaded before the default binder configuration in multi-binder scenarios.
 *
 * @author Fernando Blanch
 * @since 4.1.0
 */
@SpringBootTest(classes = KafkaMultiBinderCustomConfigurationTests.class,
		webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
		"spring.cloud.stream.defaultBinder=kafka1",
		"spring.cloud.stream.binders.kafka1.type=kafka",
		"spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}",
		"spring.cloud.stream.binders.kafka2.type=kafka",
		"spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}",
		"spring.cloud.stream.binders.kafka2.environment.spring.main.sources=" +
				"org.springframework.cloud.stream.binder.kafka.integration.KafkaMultiBinderCustomConfigurationTests$CustomConfiguration"
})
@DirtiesContext
@EnableAutoConfiguration
@EmbeddedKafka(controlledShutdown = true)
class KafkaMultiBinderCustomConfigurationTests {

	@Autowired
	private DefaultBinderFactory binderFactory;

	/**
	 * Verifies that the custom user configuration is loaded from spring.main.sources.
	 */
	@Test
	void binderKafka2UsesCustomConfigurationIsLoadedFromSpringMainSources() throws IllegalAccessException {
		// Force initialization of binders
		Binder<?, ?, ?> kafka2Binder = binderFactory.getBinder("kafka2", Object.class);
		assertThat(kafka2Binder).isNotNull();

		// Get the kafka2 binder context
		ConfigurableApplicationContext kafka2Context = getBinderContext("kafka2");
		assertThat(kafka2Context).isNotNull();

		// Verify that our custom bean is used instead of the default one
		KafkaBinderMetrics kafkaBinderMetrics = kafka2Context.getBean(KafkaBinderMetrics.class);
		assertThat(kafkaBinderMetrics).isInstanceOf(CustomKafkaBinderMetrics.class);
	}

	/**
	 * Verifies that the default configuration is used when no custom user configuration is provided.
	 */
	@Test
	void binderKafka1UsesDefaultBeanFromKafkaBinderMetricsConfigurationWithMultiBinder() throws IllegalAccessException {
		// Force initialization of binders
		Binder<?, ?, ?> kafka1Binder = binderFactory.getBinder("kafka1", Object.class);
		assertThat(kafka1Binder).isNotNull();

		ConfigurableApplicationContext kafka1Context = getBinderContext("kafka1");
		assertThat(kafka1Context).isNotNull();

		// Verify that the metrics bean is from KafkaBinderMetricsConfigurationWithMultiBinder configuration
		// (not a CustomKafkaBinderMetrics instance)
		KafkaBinderMetrics kafka1BinderMetrics = kafka1Context.getBean(KafkaBinderMetrics.class);
		assertThat(kafka1BinderMetrics).isNotInstanceOf(CustomKafkaBinderMetrics.class);
	}

	/**
	 * Helper method to get the binder context from the binderInstanceCache field in DefaultBinderFactory.
	 */
	private ConfigurableApplicationContext getBinderContext(String binderName) throws IllegalAccessException {
		Field binderInstanceCacheField = ReflectionUtils.findField(DefaultBinderFactory.class, "binderInstanceCache");
		assertThat(binderInstanceCacheField).isNotNull();
		ReflectionUtils.makeAccessible(binderInstanceCacheField);
		@SuppressWarnings("unchecked")
		Map<String, Map.Entry<Binder<?, ?, ?>, ConfigurableApplicationContext>> binderInstanceCache =
				(Map<String, Map.Entry<Binder<?, ?, ?>, ConfigurableApplicationContext>>) binderInstanceCacheField.get(this.binderFactory);
		return binderInstanceCache.get(binderName).getValue();
	}

	/**
	 * Custom configuration that provides a custom KafkaBinderMetrics.
	 */
	static class CustomConfiguration {

		@Bean
		MeterRegistry meterRegistry() {
			return new SimpleMeterRegistry();
		}

		@Bean
		KafkaBinderMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
				KafkaBinderConfigurationProperties configurationProperties,
				MeterRegistry meterRegistry) {
			return new CustomKafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, meterRegistry);
		}

	}

	static class CustomKafkaBinderMetrics extends KafkaBinderMetrics {

		CustomKafkaBinderMetrics(KafkaMessageChannelBinder binder,
				KafkaBinderConfigurationProperties binderConfigurationProperties,
				MeterRegistry meterRegistry) {
			super(binder, binderConfigurationProperties, null, meterRegistry);
		}

	}

}