ConsumerProducerTransactionTests.java

/*
 * Copyright 2019-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.integration2;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.util.backoff.FixedBackOff;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

/**
 * Integration tests for the Kafka binder when the binder is configured with a
 * transaction id prefix ({@code tx.}).
 *
 * <p>Two function bindings are exercised:
 * <ul>
 * <li>{@code listenIn} consumes from {@code consumer.producer.txIn}, upper-cases the
 * payload and publishes to {@code consumer.producer.txOut}; it throws for the payload
 * {@code "two"}, so only {@code "ONE"} and {@code "THREE"} are expected on the output
 * topic.</li>
 * <li>{@code listenIn2} is bound with an externally supplied transaction manager bean
 * named {@code tm}, and the test checks that both the consumer container and the
 * producer side pick it up.</li>
 * </ul>
 *
 * @author Gary Russell
 * @author Soby Chacko
 * @since 3.0
 */
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
		"spring.kafka.consumer.properties.isolation.level=read_committed",
		"spring.kafka.consumer.enable-auto-commit=false",
		"spring.kafka.consumer.auto-offset-reset=earliest",
		"spring.cloud.function.definition=listenIn;listenIn2",
		"spring.cloud.stream.function.bindings.listenIn-in-0=input",
		"spring.cloud.stream.function.bindings.listenIn-out-0=output",
		"spring.cloud.stream.function.bindings.listenIn2-in-0=input2",
		"spring.cloud.stream.function.bindings.listenIn2-out-0=output2",
		"spring.cloud.stream.bindings.input.destination=consumer.producer.txIn",
		"spring.cloud.stream.bindings.input.group=consumer.producer.tx",
		"spring.cloud.stream.bindings.input.consumer.max-attempts=1",
		"spring.cloud.stream.kafka.bindings.input2.consumer.transaction-manager=tm",
		"spring.cloud.stream.kafka.bindings.output2.producer.transaction-manager=tm",
		"spring.cloud.stream.bindings.output.destination=consumer.producer.txOut",
		"spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.",
		"spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99",
		"spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all"})
@DirtiesContext
@EmbeddedKafka(topics = "consumer.producer.txOut", controlledShutdown = true, brokerProperties = {"transaction.state.log.replication.factor=1",
	"transaction.state.log.min.isr=1"})
// NOTE(review): the whole class is disabled without a stated reason - confirm whether
// it can be re-enabled, or record the reason in the annotation value.
@Disabled
class ConsumerProducerTransactionTests {

	@Autowired
	private Config config;

	@Autowired
	private ApplicationContext context;

	/**
	 * Waits for two records on the output topic and verifies that they are the
	 * upper-cased forms of the two payloads for which {@code listenIn} does not throw.
	 * @throws InterruptedException if interrupted while waiting on the latch.
	 */
	@Test
	void producerRunsInConsumerTransaction() throws InterruptedException {
		assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(this.config.outs).containsExactlyInAnyOrder("ONE", "THREE");
	}

	/**
	 * Verifies that the {@code input2} listener container uses the {@code tm} bean as its
	 * transaction manager and that the handler subscribed to the {@code output2} channel
	 * uses the producer factory that was handed to that transaction manager.
	 */
	@Test
	void externalTM() {
		assertThat(this.config.input2Container.getContainerProperties().getTransactionManager())
				.isSameAs(this.config.tm);
		final MessageChannel output2 = this.context.getBean("output2", MessageChannel.class);

		// The producer factory is reached reflectively through the channel's first handler.
		Object handler = KafkaTestUtils.getPropertyValue(output2, "dispatcher.handlers", Set.class)
				.iterator().next();
		assertThat(KafkaTestUtils.getPropertyValue(handler, "delegate.kafkaTemplate.producerFactory"))
				.isSameAs(this.config.pf);
	}

	/**
	 * Test application configuration; it also records the collaborators that the tests
	 * assert against.
	 */
	@EnableAutoConfiguration
	@Configuration
	public static class Config {

		// Payloads received on the output topic. Each add precedes the matching
		// latch.countDown(), and the test only reads the list after latch.await().
		final List<String> outs = new ArrayList<>();

		// One count per record expected on the output topic.
		final CountDownLatch latch = new CountDownLatch(2);

		// Captured by customizer() when the "input2" destination is customized.
		AbstractMessageListenerContainer<?, ?> input2Container;

		// Producer factory injected into tm(); captured for the identity assertion.
		ProducerFactory<?, ?> pf;

		// The mock transaction manager created by tm().
		KafkaAwareTransactionManager<byte[], byte[]> tm;

		/**
		 * Collects every record published to the output topic and counts the latch down.
		 * @param in the record value.
		 */
		@KafkaListener(id = "test.cons.prod", topics = "consumer.producer.txOut")
		public void listenOut(String in) {
			this.outs.add(in);
			this.latch.countDown();
		}

		/**
		 * Function bound to {@code input}/{@code output}: upper-cases the payload and
		 * fails for {@code "two"}.
		 * @return the function.
		 */
		@Bean
		public Function<String, String> listenIn() {
			return in -> {
				if (in.equals("two")) {
					throw new RuntimeException("fail");
				}
				return in.toUpperCase(Locale.ROOT);
			};
		}

		/**
		 * Pass-through function bound to {@code input2}/{@code output2}.
		 * @return the function.
		 */
		@Bean
		public Function<String, String> listenIn2() {
			return in -> in;
		}

		/**
		 * Sends the three test payloads to the input topic.
		 * @param template the template used to send the records.
		 * @return the runner.
		 */
		@Bean
		public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
			return args -> {
				// Explicit charset so the payload bytes do not depend on the platform default.
				template.send("consumer.producer.txIn", "one".getBytes(StandardCharsets.UTF_8));
				template.send("consumer.producer.txIn", "two".getBytes(StandardCharsets.UTF_8));
				template.send("consumer.producer.txIn", "three".getBytes(StandardCharsets.UTF_8));
			};
		}

		/**
		 * Installs an after-rollback processor on every listener container and remembers
		 * the container created for the {@code input2} destination.
		 * @return the customizer.
		 */
		@Bean
		public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
			return (container, dest, group) -> {
				// FixedBackOff(interval = 0 ms, maxAttempts = 1).
				container.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, 1L)));
				if ("input2".equals(dest)) {
					this.input2Container = container;
				}
			};
		}

		/**
		 * Creates a mock transaction manager whose {@code getProducerFactory()} returns the
		 * injected producer factory; both are stored for the assertions in
		 * {@code externalTM()}.
		 * @param pf the producer factory to expose through the mock.
		 * @return the mock transaction manager.
		 */
		@SuppressWarnings({ "rawtypes", "unchecked" })
		@Bean
		public KafkaAwareTransactionManager<byte[], byte[]> tm(ProducerFactory pf) {
			KafkaAwareTransactionManager txManager = mock(KafkaAwareTransactionManager.class);
			this.pf = pf;
			given(txManager.getProducerFactory()).willReturn(pf);
			this.tm = txManager;
			return txManager;
		}
	}
}