KafkaBinderPropertiesTest.java
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.common.AbstractKafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.test.context.TestPropertySource;
import org.springframework.util.ReflectionUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Ilayaperumal Gopinathan
* @author Soby Chacko
*/
@SpringBootTest(classes = { KafkaBinderConfiguration.class, KafkaAutoConfiguration.class,
KafkaBinderPropertiesTest.class })
@TestPropertySource(locations = "classpath:binder-config.properties", properties =
"spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup=health-consumer-group")
class KafkaBinderPropertiesTest {
@Autowired
private KafkaMessageChannelBinder kafkaMessageChannelBinder;
@Autowired
private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties;
@Autowired
private KafkaBinderHealthIndicator kafkaBinderHealthIndicator;
@Test
@SuppressWarnings("unchecked")
void kafkaBinderConfigurationProperties() throws Exception {
assertThat(this.kafkaBinderConfigurationProperties).isNotNull();
// Testing a scenario in health indicator that is originally triggered by a property in KafkaBinderConfigurationProperties,
// which ultimately creates a Kafka Consumer in the health indicator implementation.
assertThat(this.kafkaBinderConfigurationProperties.getHealthIndicatorConsumerGroup())
.isEqualTo("health-consumer-group");
assertThat(this.kafkaBinderHealthIndicator).isNotNull();
Field consumerFactoryField = AbstractKafkaBinderHealthIndicator.class.getDeclaredField("consumerFactory");
consumerFactoryField.setAccessible(true);
ConsumerFactory<?, ?> healthIndicatorConsumerFactory =
(ConsumerFactory<?, ?>) consumerFactoryField.get(this.kafkaBinderHealthIndicator);
assertThat(healthIndicatorConsumerFactory).isNotNull();
Consumer<?, ?> consumer = healthIndicatorConsumerFactory.createConsumer();
ConsumerGroupMetadata consumerGroupMetadata = consumer.groupMetadata();
assertThat(consumerGroupMetadata.groupId()).isEqualTo("health-consumer-group");
KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties();
kafkaProducerProperties.setBufferSize(12345);
kafkaProducerProperties.setBatchTimeout(100);
kafkaProducerProperties.setCloseTimeout(10);
kafkaProducerProperties
.setCompressionType(KafkaProducerProperties.CompressionType.gzip);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
kafkaProducerProperties);
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class
.getDeclaredMethod("getProducerFactory", String.class,
ExtendedProducerProperties.class, String.class, String.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, "bar", producerProperties, "bar.producer", "bar");
Field producerFactoryConfigField = ReflectionUtils
.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils
.getField(producerFactoryConfigField, producerFactory);
assertThat(producerConfigs.get("batch.size")).isEqualTo("12345");
assertThat(producerConfigs.get("linger.ms")).isEqualTo("100");
assertThat(producerConfigs.get("key.serializer"))
.isEqualTo(ByteArraySerializer.class);
assertThat(producerConfigs.get("value.serializer"))
.isEqualTo(ByteArraySerializer.class);
assertThat(producerConfigs.get("compression.type")).isEqualTo("gzip");
Field physicalCloseTimeoutField = ReflectionUtils
.findField(DefaultKafkaProducerFactory.class, "physicalCloseTimeout", Duration.class);
ReflectionUtils.makeAccessible(physicalCloseTimeoutField);
Duration physicalCloseTimeoutConfig = (Duration) ReflectionUtils
.getField(physicalCloseTimeoutField, producerFactory);
assertThat(physicalCloseTimeoutConfig).isEqualTo(Duration.ofSeconds(10));
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9082");
assertThat((((String) producerConfigs.get("bootstrap.servers"))
.contains("10.98.09.199:9082"))).isTrue();
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class
.getDeclaredMethod("createKafkaConsumerFactory", boolean.class,
String.class, ExtendedConsumerProperties.class, String.class, String.class);
createKafkaConsumerFactoryMethod.setAccessible(true);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties, "test.consumer", "test");
Field consumerFactoryConfigField = ReflectionUtils
.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils
.getField(consumerFactoryConfigField, consumerFactory);
assertThat(consumerConfigs.get("key.deserializer"))
.isEqualTo(ByteArrayDeserializer.class);
assertThat(consumerConfigs.get("value.deserializer"))
.isEqualTo(ByteArrayDeserializer.class);
assertThat((((String) consumerConfigs.get("bootstrap.servers"))
.contains("10.98.09.199:9082"))).isTrue();
}
}