ProducerOnlyTransactionTests.java
/*
* Copyright 2019-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.integration;
import java.util.Locale;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Gary Russell
* @author Soby Chacko
* @since 2.1.4
*
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
"spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.",
"spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99",
"spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all"})
@DirtiesContext
@EmbeddedKafka(topics = "output", controlledShutdown = true, brokerProperties = {"transaction.state.log.replication.factor=1",
"transaction.state.log.min.isr=1"})
class ProducerOnlyTransactionTests {
@Autowired
private Sender sender;
@Autowired
private ApplicationContext context;
@Autowired
EmbeddedKafkaBroker embeddedKafkaBrokera;
@Test
void producerTx() {
final StreamBridge streamBridge = context.getBean(StreamBridge.class);
this.sender.DoInTransaction(streamBridge);
assertThat(this.sender.isInTx()).isTrue();
Map<String, Object> props = KafkaTestUtils.consumerProps("consumeTx", "false",
embeddedKafkaBrokera);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
Consumer<?, ?> consumer = new KafkaConsumer<>(props);
embeddedKafkaBrokera.consumeFromAllEmbeddedTopics(consumer);
ConsumerRecord<?, ?> record = KafkaTestUtils.getSingleRecord(consumer, "output");
assertThat(record.value()).isEqualTo("foo".getBytes());
}
@EnableAutoConfiguration
@EnableTransactionManagement
@Configuration
public static class Config {
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
try {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return tm;
}
catch (BeanCreationException e) { // needed to avoid other tests in this package failing when there is no binder
return null;
}
}
@Bean
public Sender sender() {
return new Sender();
}
}
public static class Sender {
private boolean isInTx;
@Transactional
public void DoInTransaction(StreamBridge streamBridge) {
this.isInTx = TransactionSynchronizationManager.isActualTransactionActive();
streamBridge.send("output", new GenericMessage<>("foo".getBytes()));
}
public boolean isInTx() {
return this.isInTx;
}
}
}