KafkaMultiBinderCustomConfigurationTests3148.java

/*
 * Copyright 2023-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.integration;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;

import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.DefaultBinderFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.core.io.support.EncodedResource;
import org.springframework.core.io.support.PropertySourceFactory;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.util.ReflectionUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/**
 *
 */
@SpringBootTest(classes = KafkaMultiBinderCustomConfigurationTests3148.SampleApplication.class,
		webEnvironment = SpringBootTest.WebEnvironment.NONE
)
@EmbeddedKafka(controlledShutdown = true)
public class KafkaMultiBinderCustomConfigurationTests3148 {

	@Autowired
	ApplicationContext applicationContext;

	@Autowired
	private DefaultBinderFactory binderFactory;

	@Test
	public void test() {
		ConfigurableApplicationContext kafkaContext = getBinderContext("test-binder");

		KafkaBindingRebalanceListener kafkaBindingRebalanceListener = kafkaContext.getBean(KafkaBindingRebalanceListener.class);
		assertThat(kafkaBindingRebalanceListener).isNotNull();
	}

	private ConfigurableApplicationContext getBinderContext(String binderName) {
		Field binderInstanceCacheField = ReflectionUtils.findField(DefaultBinderFactory.class, "binderInstanceCache");
		assertThat(binderInstanceCacheField).isNotNull();
		ReflectionUtils.makeAccessible(binderInstanceCacheField);
		try {
			Map<String, Map.Entry<Binder<?, ?, ?>, ConfigurableApplicationContext>> binderInstanceCache =
				(Map<String, Map.Entry<Binder<?, ?, ?>, ConfigurableApplicationContext>>) binderInstanceCacheField.get(this.binderFactory);
			return binderInstanceCache.get(binderName).getValue();
		} catch (Exception e) {
			fail();
		}
		return null;
	}
	@Configuration
	@EnableAutoConfiguration
	@PropertySource(value= "classpath:test3148.yml", factory = KafkaMultiBinderCustomConfigurationTests3148.YamlPropertySourceFactory.class)
	public static class SampleApplication {

		@Bean
		public Consumer<String> testConsumer() {
			return consumer -> {};
		}

		@Bean
		public KafkaBindingRebalanceListener rebalanceListener() {
			return new KafkaBindingRebalanceListener() {
				public void onPartitionsAssigned(String bindingName, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
												 Collection<TopicPartition> partitions, boolean initial) {
					// do nothing
				}
			};
		}
	}



	public static class YamlPropertySourceFactory implements PropertySourceFactory {

		@Override
		public org.springframework.core.env.PropertySource<?> createPropertySource(String name, EncodedResource encodedResource)
			throws IOException {
			YamlPropertiesFactoryBean factory = new YamlPropertiesFactoryBean();
			factory.setResources(encodedResource.getResource());

			Properties properties = factory.getObject();

			return new PropertiesPropertySource(encodedResource.getResource().getFilename(), properties);
		}
	}
}