KafkaBinderTransactionCustomizerTest.java
/*
* Copyright 2025-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.TestUtils;
import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.core.retry.RetryTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
/**
* @author Soby Chacko
* @author Artem Bilan
*/
@EmbeddedKafka(count = 1, controlledShutdown = true, brokerProperties = {"transaction.state.log.replication.factor=1",
"transaction.state.log.min.isr=1"})
class KafkaBinderTransactionCustomizerTest {
private static EmbeddedKafkaBroker embeddedKafka;
@BeforeAll
public static void setup() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
}
@SuppressWarnings("unchecked")
@Test
void clientFactoryCustomizerAppliedBeforeTransactionManager() throws Exception {
KafkaProperties kafkaProperties = new TestKafkaProperties();
kafkaProperties.setBootstrapServers(Collections
.singletonList(embeddedKafka.getBrokersAsString()));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
kafkaProperties, mock(ObjectProvider.class));
configurationProperties.getTransaction().setTransactionIdPrefix("custom-tx-");
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties, prop -> {
});
RetryPolicy retryPolicy = RetryPolicy.builder().maxRetries(2).delay(Duration.ZERO).build();
provisioningProvider.setMetadataRetryOperations(new RetryTemplate(retryPolicy));
// Create a tracking list for customized factories
List<ProducerFactory<?, ?>> customizedFactories = new ArrayList<>();
// Create a customizer that we'll register after the binder is created
ClientFactoryCustomizer customizer = new ClientFactoryCustomizer() {
@Override
public void configure(ProducerFactory<?, ?> pf) {
customizedFactories.add(pf);
}
};
KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider));
GenericApplicationContext applicationContext = new GenericApplicationContext();
applicationContext.refresh();
binder.setApplicationContext(applicationContext);
// Add the customizer AFTER binder creation but BEFORE afterPropertiesSet
binder.addClientFactoryCustomizer(customizer);
// Now initialize the binder (this triggers onInit)
binder.afterPropertiesSet();
// Verify KafkaMessageChannelBinder.getProducerFactory was called from onInit
verify(binder).getProducerFactory(
eq("custom-tx-"),
any(ExtendedProducerProperties.class),
eq("custom-tx-.producer"),
isNull());
// Verify customizer was applied
assertThat(customizedFactories).isNotEmpty();
// Verify that the producer factory from the transaction manager is in our list of customized factories
KafkaTransactionManager<?, ?> txManager = (KafkaTransactionManager<?, ?>)
TestUtils.getPropertyValue(binder, "transactionManager");
assertThat(txManager).isNotNull();
ProducerFactory<?, ?> producerFactory = txManager.getProducerFactory();
// This verifies that the same producer factory that was customized is used for the transaction manager
assertThat(customizedFactories).contains(producerFactory);
}
@SuppressWarnings("unchecked")
@Test
void multipleCustomizersAppliedInOrder() throws Exception {
KafkaProperties kafkaProperties = new TestKafkaProperties();
kafkaProperties.setBootstrapServers(Collections
.singletonList(embeddedKafka.getBrokersAsString()));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
kafkaProperties, mock(ObjectProvider.class));
configurationProperties.getTransaction().setTransactionIdPrefix("multi-tx-");
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties, prop -> {
});
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
// Track order of customizers and customized factories
List<String> customizationOrder = new ArrayList<>();
List<ProducerFactory<?, ?>> customizedFactories = new ArrayList<>();
KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider));
GenericApplicationContext applicationContext = new GenericApplicationContext();
applicationContext.refresh();
binder.setApplicationContext(applicationContext);
// Add multiple customizers
binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() {
@Override
public void configure(ProducerFactory<?, ?> pf) {
customizationOrder.add("customizer1");
customizedFactories.add(pf);
}
});
binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() {
@Override
public void configure(ProducerFactory<?, ?> pf) {
customizationOrder.add("customizer2");
}
});
binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() {
@Override
public void configure(ProducerFactory<?, ?> pf) {
customizationOrder.add("customizer3");
}
});
binder.afterPropertiesSet();
assertThat(customizationOrder).containsExactly("customizer1", "customizer2", "customizer3");
KafkaTransactionManager<?, ?> txManager = (KafkaTransactionManager<?, ?>)
TestUtils.getPropertyValue(binder, "transactionManager");
assertThat(txManager).isNotNull();
ProducerFactory<?, ?> producerFactory = txManager.getProducerFactory();
// Verify that the producer factory used in transaction manager is one that was customized
assertThat(customizedFactories).contains(producerFactory);
}
}