KafkaBinderAutoConfigurationPropertiesTest.java

/*
 * Copyright 2016-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.test.context.TestPropertySource;
import org.springframework.util.ReflectionUtils;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * @author Ilayaperumal Gopinathan
 * @author Soby Chacko
 */
@SpringBootTest(classes = { KafkaBinderConfiguration.class })
@TestPropertySource(locations = "classpath:binder-config-autoconfig.properties")
class KafkaBinderAutoConfigurationPropertiesTest {

	@Autowired
	private KafkaMessageChannelBinder kafkaMessageChannelBinder;

	@Autowired
	private KafkaBinderHealthIndicator kafkaBinderHealthIndicator;

	@Test
	@SuppressWarnings("unchecked")
	void kafkaBinderConfigurationWithKafkaProperties() throws Exception {
		assertThat(this.kafkaMessageChannelBinder).isNotNull();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
				new KafkaProducerProperties());
		Method getProducerFactoryMethod = KafkaMessageChannelBinder.class
				.getDeclaredMethod("getProducerFactory", String.class,
						ExtendedProducerProperties.class, String.class, String.class);
		getProducerFactoryMethod.setAccessible(true);
		DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
				.invoke(this.kafkaMessageChannelBinder, "foo", producerProperties, "foo.producer", "foo");
		Field producerFactoryConfigField = ReflectionUtils
				.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
		ReflectionUtils.makeAccessible(producerFactoryConfigField);
		Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils
				.getField(producerFactoryConfigField, producerFactory);
		assertThat(producerConfigs.get("batch.size").equals(10)).isTrue();
		assertThat(producerConfigs.get("key.serializer")).isEqualTo(LongSerializer.class);
		assertThat(producerConfigs.get("key.deserializer")).isNull();
		assertThat(producerConfigs.get("value.serializer"))
				.isEqualTo(LongSerializer.class);
		assertThat(producerConfigs.get("value.deserializer")).isNull();
		assertThat(producerConfigs.get("compression.type")).isEqualTo("snappy");
		List<String> bootstrapServers = new ArrayList<>();
		bootstrapServers.add("10.98.09.199:9092");
		bootstrapServers.add("10.98.09.196:9092");
		assertThat((((List<String>) producerConfigs.get("bootstrap.servers"))
				.containsAll(bootstrapServers))).isTrue();
		Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class
				.getDeclaredMethod("createKafkaConsumerFactory", boolean.class,
						String.class, ExtendedConsumerProperties.class, String.class, String.class);
		createKafkaConsumerFactoryMethod.setAccessible(true);
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
				new KafkaConsumerProperties());
		DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
				.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties, "test.consumer", "test");
		Field consumerFactoryConfigField = ReflectionUtils
				.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
		ReflectionUtils.makeAccessible(consumerFactoryConfigField);
		Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils
				.getField(consumerFactoryConfigField, consumerFactory);
		assertThat(consumerConfigs.get("key.deserializer"))
				.isEqualTo(LongDeserializer.class);
		assertThat(consumerConfigs.get("key.serializer")).isNull();
		assertThat(consumerConfigs.get("value.deserializer"))
				.isEqualTo(LongDeserializer.class);
		assertThat(consumerConfigs.get("value.serialized")).isNull();
		assertThat(consumerConfigs.get("group.id")).isEqualTo("test");
		assertThat(consumerConfigs.get("auto.offset.reset")).isEqualTo("earliest");
		assertThat((((List<String>) consumerConfigs.get("bootstrap.servers"))
				.containsAll(bootstrapServers))).isTrue();
	}

	@Test
	@SuppressWarnings("unchecked")
	void kafkaHealthIndicatorProperties() {
		assertThat(this.kafkaBinderHealthIndicator).isNotNull();
		Field consumerFactoryField = ReflectionUtils.findField(
				KafkaBinderHealthIndicator.class, "consumerFactory",
				ConsumerFactory.class);
		ReflectionUtils.makeAccessible(consumerFactoryField);
		DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) ReflectionUtils
				.getField(consumerFactoryField, this.kafkaBinderHealthIndicator);
		Field configField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class,
				"configs", Map.class);
		ReflectionUtils.makeAccessible(configField);
		Map<String, Object> configs = (Map<String, Object>) ReflectionUtils
				.getField(configField, consumerFactory);
		assertThat(configs.containsKey("bootstrap.servers")).isTrue();
		List<String> bootstrapServers = new ArrayList<>();
		bootstrapServers.add("10.98.09.199:9092");
		bootstrapServers.add("10.98.09.196:9092");
		assertThat(((List<String>) configs.get("bootstrap.servers"))
				.containsAll(bootstrapServers)).isTrue();
	}

}