KafkaBinderTests.java

/*
 * Copyright 2016-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import tools.jackson.databind.ObjectMapper;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistry;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.mockito.ArgumentMatchers;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultPollableMessageSource;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.PartitionTestSupport;
import org.springframework.cloud.stream.binder.PollableSource;
import org.springframework.cloud.stream.binder.RequeueCurrentMessageException;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.TestUtils;
import org.springframework.cloud.stream.binder.kafka.common.BinderHeaderMapper;
import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.KafkaSendFailureException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.backoff.FixedBackOff;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.entry;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
 * @author Soby Chacko
 * @author Ilayaperumal Gopinathan
 * @author Henryk Konsek
 * @author Gary Russell
 * @author Chris Bono
 * @author Oliver F��hrer
 * @author Didier Loiseau
 */
@EmbeddedKafka(count = 1, controlledShutdown = true, topics = "error.pollableDlq.group-pcWithDlq", brokerProperties = {"transaction.state.log.replication.factor=1",
		"transaction.state.log.min.isr=1"})
class KafkaBinderTests extends

	PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {

	private static final int DEFAULT_OPERATION_TIMEOUT = 30;

	private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class
			.getSimpleName();

	private KafkaTestBinder binder;

	private AdminClient adminClient;

	private static EmbeddedKafkaBroker embeddedKafka;

	@BeforeAll
	public static void setup() {
		embeddedKafka = EmbeddedKafkaCondition.getBroker();
	}

	@Override
	protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
		final ExtendedConsumerProperties<KafkaConsumerProperties> kafkaConsumerProperties = new ExtendedConsumerProperties<>(
				new KafkaConsumerProperties());
		// set the default values that would normally be propagated by Spring Cloud Stream
		kafkaConsumerProperties.setInstanceCount(1);
		kafkaConsumerProperties.setInstanceIndex(0);
		return kafkaConsumerProperties;
	}

	private ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
		return this.createProducerProperties(null);
	}

	@Override
	protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties(TestInfo testInto) {
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
				new KafkaProducerProperties());
		producerProperties.getExtension().setSync(true);
		return producerProperties;
	}

	@Override
	protected void binderBindUnbindLatency() throws InterruptedException {
		Thread.sleep(500);
	}

	@Override
	protected KafkaTestBinder getBinder() {
		if (binder == null) {
			KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
			KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
					binderConfiguration, new TestKafkaProperties(), prop -> {
			});
			try {
				kafkaTopicProvisioner.afterPropertiesSet();
			}
			catch (Exception e) {
				throw new RuntimeException(e);
			}
			binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
		}
		return binder;
	}

	private KafkaTestBinder getBinder(
			KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {

		return getBinder(kafkaBinderConfigurationProperties, null, null);
	}

	private KafkaTestBinder getBinder(
			KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
			DlqPartitionFunction dlqPartitionFunction, DlqDestinationResolver dlqDestinationResolver) {

		var provisioningProvider = new KafkaTopicProvisioner(
				kafkaBinderConfigurationProperties, new TestKafkaProperties(), prop -> {
		});
		try {
			provisioningProvider.afterPropertiesSet();
		}
		catch (Exception e) {
			throw new RuntimeException(e);
		}
		return new KafkaTestBinder(kafkaBinderConfigurationProperties,
				provisioningProvider, dlqPartitionFunction, dlqDestinationResolver);
	}

	@SuppressWarnings("unchecked")
	private KafkaBinderConfigurationProperties createConfigurationProperties() {
		var binderConfiguration = new KafkaBinderConfigurationProperties(
				new TestKafkaProperties(), mock(ObjectProvider.class));
		binderConfiguration.setBrokers(embeddedKafka.getBrokersAsString());
		return binderConfiguration;
	}

	private int partitionSize(String topic) {
		return consumerFactory().createConsumer().partitionsFor(topic).size();
	}

	private void invokeCreateTopic(String topic, int partitions, int replicationFactor)
			throws Exception {

		var newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
		CreateTopicsResult topics = adminClient
				.createTopics(Collections.singletonList(newTopic));
		topics.all().get(DEFAULT_OPERATION_TIMEOUT, TimeUnit.SECONDS);
	}

	private String getKafkaOffsetHeaderKey() {
		return KafkaHeaders.OFFSET;
	}

	@BeforeEach
	public void init() {
		String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
		if (multiplier != null) {
			timeoutMultiplier = Double.parseDouble(multiplier);
		}

		Map<String, Object> adminConfigs = new HashMap<>();
		adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
				embeddedKafka.getBrokersAsString());
		adminClient = AdminClient.create(adminConfigs);
	}

	private int invokePartitionSize(String topic) throws Throwable {

		DescribeTopicsResult describeTopicsResult = adminClient
				.describeTopics(Collections.singletonList(topic));
		KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.allTopicNames();
		Map<String, TopicDescription> stringTopicDescriptionMap = all
				.get(DEFAULT_OPERATION_TIMEOUT, TimeUnit.SECONDS);
		var topicDescription = stringTopicDescriptionMap.get(topic);
		return topicDescription.partitions().size();
	}

	@Override
	protected boolean usesExplicitRouting() {
		return false;
	}

	@Override
	protected String getClassUnderTestName() {
		return CLASS_UNDER_TEST_NAME;
	}

	@Override
	public Spy spyOn(final String name) {
		throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
	}

	private ConsumerFactory<byte[], byte[]> consumerFactory() {
		Map<String, Object> props = new HashMap<>();
		KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
				configurationProperties.getKafkaConnectionString());
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");
		Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
		Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();

		return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	void bindersAdmin() throws Exception {
		KafkaBinderConfigurationProperties props = createConfigurationProperties();
		props.getConfiguration().put(AdminClientConfig.CLIENT_ID_CONFIG, "binder");
		props.setEnableObservation(true);
		Binder binder = getBinder(props);
		BindingProperties producerBindingProperties = createProducerBindingProperties(
				createProducerProperties());
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				producerBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<MessageChannel> producerBinding = binder.bindProducer("admin.0",
				moduleOutputChannel, producerBindingProperties.getProducer());

		Binding<MessageChannel> consumerBinding = binder.bindConsumer("admin.0",
				"testSendAndReceiveNoOriginalContentType", moduleInputChannel,
				consumerProperties);

		assertThat(
				KafkaTestUtils.getPropertyValue(producerBinding, "lifecycle.kafkaTemplate.kafkaAdmin", KafkaAdmin.class)
						.getConfigurationProperties()).contains(entry(AdminClientConfig.CLIENT_ID_CONFIG, "binder"));
		assertThat(KafkaTestUtils
				.getPropertyValue(consumerBinding, "lifecycle.messageListenerContainer.kafkaAdmin", KafkaAdmin.class)
				.getConfigurationProperties()).contains(entry(AdminClientConfig.CLIENT_ID_CONFIG, "binder"));

		consumerBinding.unbind();
		producerBinding.unbind();
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	void defaultHeaderMapper() throws Exception {
		Binder binder = getBinder();

		BindingProperties producerBindingProperties = createProducerBindingProperties(
				createProducerProperties());
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				producerBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.getExtension()
				.setTrustedPackages(new String[] { "org.springframework.cloud.stream.binder.kafka" });

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0",
				moduleOutputChannel, producerBindingProperties.getProducer());

		Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
				"testSendAndReceiveNoOriginalContentType", moduleInputChannel,
				consumerProperties);
		binderBindUnbindLatency();

		var pojoHeader = new Pojo("testing");
		var message = org.springframework.integration.support.MessageBuilder
				.withPayload("foo")
				.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
				.setHeader("foo", pojoHeader).build();

		moduleOutputChannel.send(message);
		var latch = new CountDownLatch(1);
		AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
		moduleInputChannel.subscribe(message1 -> {
			try {
				inboundMessageRef.set((Message<byte[]>) message1);
			}
			finally {
				latch.countDown();
			}
		});
		Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

		Assertions.assertThat(inboundMessageRef.get()).isNotNull();
		Assertions.assertThat(inboundMessageRef.get().getPayload())
				.isEqualTo("foo".getBytes());
		Assertions
				.assertThat(inboundMessageRef.get().getHeaders()
						.get(MessageHeaders.CONTENT_TYPE))
				.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
		Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo"))
				.isInstanceOf(Pojo.class);
		var actual = (Pojo) inboundMessageRef.get().getHeaders().get("foo");
		Assertions.assertThat(actual.field).isEqualTo(pojoHeader.field);
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	void customHeaderMapper() throws Exception {

		KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
		binderConfiguration.setHeaderMapperBeanName("headerMapper");

		var kafkaTopicProvisioner = new KafkaTopicProvisioner(
				binderConfiguration, new TestKafkaProperties(), prop -> {
		});
		try {
			kafkaTopicProvisioner.afterPropertiesSet();
		}
		catch (Exception e) {
			throw new RuntimeException(e);
		}
		var binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
		((GenericApplicationContext) binder.getApplicationContext()).registerBean("headerMapper",
				KafkaHeaderMapper.class, () -> new KafkaHeaderMapper() {
					@Override
					public void fromHeaders(MessageHeaders headers, Headers target) {
						target.add(new RecordHeader("custom-header", "foobar".getBytes()));
					}

					@Override
					public void toHeaders(Headers source, Map<String, Object> target) {
						if (source.headers("custom-header").iterator().hasNext()) {
							target.put("custom-header", source.headers("custom-header").iterator().next().value());
						}

					}
				});

		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0",
				moduleOutputChannel, producerProperties);

		Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
				"testSendAndReceiveNoOriginalContentType", moduleInputChannel,
				consumerProperties);
		binderBindUnbindLatency();

		var message = org.springframework.integration.support.MessageBuilder
				.withPayload("foo")
				.setHeader("foo", MimeTypeUtils.TEXT_PLAIN).build();

		moduleOutputChannel.send(message);
		var latch = new CountDownLatch(1);
		AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
		moduleInputChannel.subscribe(message1 -> {
			try {
				inboundMessageRef.set((Message<byte[]>) message1);
			}
			finally {
				latch.countDown();
			}
		});
		Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

		Assertions.assertThat(inboundMessageRef.get()).isNotNull();
		Assertions.assertThat(inboundMessageRef.get().getPayload())
				.isEqualTo("foo".getBytes());
		Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo"))
				.isNull();
		Assertions.assertThat(inboundMessageRef.get().getHeaders().get("custom-header"))
				.isEqualTo("foobar".getBytes());
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	void wellKnownHeaderMapperWithBeanNameKafkaHeaderMapper() throws Exception {

		KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();

		var kafkaTopicProvisioner = new KafkaTopicProvisioner(
				binderConfiguration, new TestKafkaProperties(), prop -> {
		});
		try {
			kafkaTopicProvisioner.afterPropertiesSet();
		}
		catch (Exception e) {
			throw new RuntimeException(e);
		}
		var binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
		((GenericApplicationContext) binder.getApplicationContext()).registerBean("kafkaBinderHeaderMapper",
				KafkaHeaderMapper.class, () -> new BinderHeaderMapper() {
					@Override
					public void fromHeaders(MessageHeaders headers, Headers target) {
						target.add(new RecordHeader("custom-header", "foobar".getBytes()));
						super.fromHeaders(headers, target);
					}

					@Override
					public void toHeaders(Headers source, Map<String, Object> target) {
						if (source.headers("custom-header").iterator().hasNext()) {
							target.put("custom-header", source.headers("custom-header").iterator().next().value());
						}

					}
				});

		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0",
				moduleOutputChannel, producerProperties);

		Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
				"testSendAndReceiveNoOriginalContentType", moduleInputChannel,
				consumerProperties);
		binderBindUnbindLatency();

		var message = org.springframework.integration.support.MessageBuilder
				.withPayload("foo")
				.setHeader("foo", MimeTypeUtils.TEXT_PLAIN).build();

		moduleOutputChannel.send(message);
		var latch = new CountDownLatch(1);
		AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
		moduleInputChannel.subscribe(message1 -> {
			try {
				inboundMessageRef.set((Message<byte[]>) message1);
			}
			finally {
				latch.countDown();
			}
		});
		Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

		Assertions.assertThat(inboundMessageRef.get()).isNotNull();
		Assertions.assertThat(inboundMessageRef.get().getPayload())
				.isEqualTo("foo".getBytes());
		Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo"))
				.isNull();
		Assertions.assertThat(inboundMessageRef.get().getHeaders().get("custom-header"))
				.isEqualTo("foobar".getBytes());
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@Override
	@SuppressWarnings("unchecked")
	public void testSendAndReceiveNoOriginalContentType(TestInfo testInfo) throws Exception {
		Binder binder = getBinder();

		BindingProperties producerBindingProperties = createProducerBindingProperties(
				createProducerProperties());
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				producerBindingProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));
		Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0",
				moduleOutputChannel, producerBindingProperties.getProducer());

		consumerProperties.getExtension()
				.setTrustedPackages(new String[] { "org.springframework.util" });
		Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
				"testSendAndReceiveNoOriginalContentType", moduleInputChannel,
				consumerProperties);
		binderBindUnbindLatency();

		// TODO: Will have to fix the MimeType to convert to byte array once this issue
		// has been resolved:
		// https://github.com/spring-projects/spring-kafka/issues/424
		var message = org.springframework.integration.support.MessageBuilder
				.withPayload("foo")
				.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build();
		moduleOutputChannel.send(message);
		var latch = new CountDownLatch(1);
		AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
		moduleInputChannel.subscribe(message1 -> {
			try {
				inboundMessageRef.set((Message<byte[]>) message1);
			}
			finally {
				latch.countDown();
			}
		});
		Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

		assertThat(inboundMessageRef.get()).isNotNull();
		assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo".getBytes());
		assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
				.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@Override
	@SuppressWarnings("unchecked")
	public void testSendAndReceive(TestInfo testInfo) throws Exception {
		Binder binder = getBinder();
		BindingProperties outputBindingProperties = createProducerBindingProperties(
				createProducerProperties());
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				outputBindingProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		var mmc = new MessagingMessageConverter();
		((GenericApplicationContext) ((KafkaTestBinder) binder).getApplicationContext())
				.registerBean("tSARmmc", MessagingMessageConverter.class, () -> mmc);
		consumerProperties.getExtension().setConverterBeanName("tSARmmc");

		Binding<MessageChannel> producerBinding = binder.bindProducer("foo.bar",
				moduleOutputChannel, outputBindingProperties.getProducer());
		Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.bar",
				"testSendAndReceive", moduleInputChannel, consumerProperties);
		assertThat(KafkaTestUtils.getPropertyValue(consumerBinding,
				"lifecycle.messageListenerContainer.applicationContext")).isNotNull();
		var message = org.springframework.integration.support.MessageBuilder
				.withPayload("foo".getBytes(StandardCharsets.UTF_8))
				.setHeader(MessageHeaders.CONTENT_TYPE,
						MimeTypeUtils.APPLICATION_OCTET_STREAM)
				.build();

		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel.send(message);
		var latch = new CountDownLatch(1);
		AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
		moduleInputChannel.subscribe(message1 -> {
			try {
				inboundMessageRef.set((Message<byte[]>) message1);
			}
			finally {
				latch.countDown();
			}
		});
		Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

		assertThat(inboundMessageRef.get()).isNotNull();
		assertThat(
				new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8))
						.isEqualTo("foo");
		assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
				.isEqualTo(MimeTypeUtils.APPLICATION_OCTET_STREAM);

		Map<String, TopicInformation> topicsInUse = ((KafkaTestBinder) binder)
				.getCoreBinder().getTopicsInUse();
		assertThat(topicsInUse.keySet()).contains("foo.bar");
		TopicInformation topic = topicsInUse.get("foo.bar");
		assertThat(topic.isConsumerTopic()).isTrue();
		assertThat(topic.consumerGroup()).isEqualTo("testSendAndReceive");

		assertThat(KafkaTestUtils.getPropertyValue(consumerBinding, "lifecycle.recordListener.messageConverter"))
				.isSameAs(mmc);
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@SuppressWarnings({ "unchecked", "rawtypes" })
	void testSendAndReceiveBatch() throws Exception {
		Binder binder = getBinder();
		BindingProperties outputBindingProperties = createProducerBindingProperties(
				createProducerProperties());
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				outputBindingProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setBatchMode(true);
		consumerProperties.getExtension().getConfiguration().put("fetch.min.bytes", "1000");
		consumerProperties.getExtension().getConfiguration().put("fetch.max.wait.ms", "5000");
		consumerProperties.getExtension().getConfiguration().put("max.poll.records", "2");
		BatchMessagingMessageConverter bmmc = new BatchMessagingMessageConverter();
		((GenericApplicationContext) ((KafkaTestBinder) binder).getApplicationContext())
				.registerBean("tSARBbmmc", BatchMessagingMessageConverter.class, () -> bmmc);
		consumerProperties.getExtension().setConverterBeanName("tSARBbmmc");
		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<MessageChannel> producerBinding = binder.bindProducer("c.batching",
				moduleOutputChannel, outputBindingProperties.getProducer());
		Binding<MessageChannel> consumerBinding = binder.bindConsumer("c.batching",
				"testSendAndReceiveBatch", moduleInputChannel, consumerProperties);
		Message<?> message = org.springframework.integration.support.MessageBuilder
				.withPayload("foo".getBytes(StandardCharsets.UTF_8))
				.setHeader(KafkaHeaders.PARTITION, 0)
				.build();

		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel.send(message);
		message = MessageBuilder
				.withPayload("bar".getBytes(StandardCharsets.UTF_8))
				.setHeader(KafkaHeaders.PARTITION, 0)
				.build();
		moduleOutputChannel.send(message);
		CountDownLatch latch = new CountDownLatch(1);
		AtomicReference<Message<List<byte[]>>> inboundMessageRef = new AtomicReference<>();
		moduleInputChannel.subscribe(message1 -> {
			try {
				inboundMessageRef.compareAndSet(null, (Message<List<byte[]>>) message1);
			}
			finally {
				latch.countDown();
			}
		});
		Assert.isTrue(latch.await(10, TimeUnit.SECONDS), "Failed to receive message");

		assertThat(inboundMessageRef.get()).isNotNull();
		List<byte[]> payload = inboundMessageRef.get().getPayload();
		assertThat(payload.get(0)).isEqualTo("foo".getBytes());
		if (payload.size() > 1) { // it's a race as to whether we'll get them both or just one.
			assertThat(payload.get(1)).isEqualTo("bar".getBytes());
		}

		assertThat(KafkaTestUtils.getPropertyValue(consumerBinding, "lifecycle.batchListener.batchMessageConverter"))
				.isSameAs(bmmc);
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@SuppressWarnings({ "unchecked", "rawtypes" })
	void testSendAndReceiveBatchWithDlqEnabled() throws Exception {
		Binder binder = getBinder();
		BindingProperties outputBindingProperties = createProducerBindingProperties(
			createProducerProperties());
		DirectChannel moduleOutputChannel = createBindableChannel("output",
			outputBindingProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setBatchMode(true);
		consumerProperties.getExtension().setEnableDlq(true);
		consumerProperties.getExtension().setDlqName("tsarbwde-dlq-topic");
		consumerProperties.getExtension().getConfiguration().put("fetch.min.bytes", "1000");
		consumerProperties.getExtension().getConfiguration().put("fetch.max.wait.ms", "5000");
		consumerProperties.getExtension().getConfiguration().put("max.poll.records", "2");
		BatchMessagingMessageConverter bmmc = new BatchMessagingMessageConverter();
		((GenericApplicationContext) ((KafkaTestBinder) binder).getApplicationContext())
			.registerBean("tSARBbmmc", BatchMessagingMessageConverter.class, () -> bmmc);
		consumerProperties.getExtension().setConverterBeanName("tSARBbmmc");
		DirectChannel moduleInputChannel = createBindableChannel("input",
			createConsumerBindingProperties(consumerProperties));

		Binding<MessageChannel> producerBinding = binder.bindProducer("tsarbwde.batching",
			moduleOutputChannel, outputBindingProperties.getProducer());
		Binding<MessageChannel> consumerBinding = binder.bindConsumer("tsarbwde.batching",
			"testSendAndReceiveBatch", moduleInputChannel, consumerProperties);

		var dlqChannel = new QueueChannel();
		ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
			"tsarbwde-dlq-topic", null, dlqChannel,
			dlqConsumerProperties);

		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();

		var handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);

		var testMessagePayload = "test." + UUID.randomUUID();
		var message = org.springframework.integration.support.MessageBuilder
			.withPayload(testMessagePayload.getBytes(StandardCharsets.UTF_8))
			.setHeader(KafkaHeaders.PARTITION, 0)
			.build();

		moduleOutputChannel.send(message);

		Message<?> receivedMessage = receive(dlqChannel, 30);
		assertThat(receivedMessage).isNotNull();
		assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());

		producerBinding.unbind();
		consumerBinding.unbind();
		dlqConsumerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void testDlqWithNativeSerializationEnabledOnDlqProducer() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();

		// Native serialization for producer
		producerProperties.setUseNativeEncoding(true);
		Map<String, String> producerConfig = new HashMap<>();
		producerConfig.put("value.serializer",
				"org.apache.kafka.common.serialization.StringSerializer");
		producerProperties.getExtension().setConfiguration(producerConfig);

		BindingProperties outputBindingProperties = createProducerBindingProperties(
				producerProperties);

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				outputBindingProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

		// Native Deserialization for consumer
		consumerProperties.setUseNativeDecoding(true);
		Map<String, String> consumerConfig = new HashMap<>();
		consumerConfig.put("value.deserializer",
				"org.apache.kafka.common.serialization.StringDeserializer");
		consumerProperties.getExtension().setConfiguration(consumerConfig);

		// Setting dlq producer properties on the consumer
		consumerProperties.getExtension()
				.setDlqProducerProperties(producerProperties.getExtension());
		consumerProperties.getExtension().setEnableDlq(true);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<MessageChannel> producerBinding = binder.bindProducer("foo.bar",
				moduleOutputChannel, outputBindingProperties.getProducer());
		Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.bar",
				"testDlqWithNativeEncoding-1", moduleInputChannel, consumerProperties);

		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();

		var handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);

		// Consumer for the DLQ destination
		var dlqChannel = new QueueChannel();
		ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
		dlqConsumerProperties.setMaxAttempts(1);

		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
				"error.foo.bar." + "testDlqWithNativeEncoding-1", null, dlqChannel,
				dlqConsumerProperties);
		binderBindUnbindLatency();

		Message<?> message = org.springframework.integration.support.MessageBuilder
				.withPayload("foo").build();

		moduleOutputChannel.send(message);
		Message<?> receivedMessage = receive(dlqChannel, 30);
		assertThat(receivedMessage).isNotNull();
		assertThat(receivedMessage.getPayload()).isEqualTo("foo".getBytes());
		Awaitility.await().until(() -> handler.getInvocationCount() == consumerProperties.getMaxAttempts() + 1);
		assertThat(receivedMessage.getHeaders()
				.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
						.isEqualTo("foo.bar".getBytes(StandardCharsets.UTF_8));
		assertThat(new String((byte[]) receivedMessage.getHeaders()
				.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))).startsWith(
						"Dispatcher failed to deliver Message");
		assertThat(receivedMessage.getHeaders()
				.get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE)).isNotNull();
		binderBindUnbindLatency();

		dlqConsumerBinding.unbind();

		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void testDlqWithNativeDecodingOnConsumerButMissingSerializerOnDlqProducer()
			throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		// Native serialization for producer
		producerProperties.setUseNativeEncoding(true);
		Map<String, String> producerConfig = new HashMap<>();
		producerConfig.put("value.serializer",
				"org.apache.kafka.common.serialization.StringSerializer");
		producerProperties.getExtension().setConfiguration(producerConfig);
		BindingProperties outputBindingProperties = createProducerBindingProperties(
				producerProperties);
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				outputBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		// Native Deserialization for consumer
		consumerProperties.setUseNativeDecoding(true);

		Map<String, String> consumerConfig = new HashMap<>();
		consumerConfig.put("value.deserializer",
				"org.apache.kafka.common.serialization.StringDeserializer");

		// No Dlq producer properties set on the consumer with a native serializer.
		// This should cause an error for DLQ sending.

		consumerProperties.getExtension().setConfiguration(consumerConfig);
		consumerProperties.getExtension().setEnableDlq(true);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<MessageChannel> producerBinding = binder.bindProducer("foo.bar",
				moduleOutputChannel, outputBindingProperties.getProducer());
		Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.bar",
				"testDlqWithNativeEncoding-2", moduleInputChannel, consumerProperties);

		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();

		var handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);

		// Consumer for the DLQ destination
		var dlqChannel = new QueueChannel();
		ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
		dlqConsumerProperties.setMaxAttempts(1);

		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
				"error.foo.bar." + "testDlqWithNativeEncoding-2", null, dlqChannel,
				dlqConsumerProperties);
		binderBindUnbindLatency();

		Message<?> message = org.springframework.integration.support.MessageBuilder
				.withPayload("foo").build();

		moduleOutputChannel.send(message);

		Message<?> receivedMessage = dlqChannel.receive(5000);
		// Ensure that we didn't receive anything on DLQ because of serializer config
		// missing on dlq producer while native Decoding is enabled.
		assertThat(receivedMessage).isNull();

		binderBindUnbindLatency();

		dlqConsumerBinding.unbind();

		producerBinding.unbind();
		consumerBinding.unbind();
	}

	// For more details on the context of this test: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/657
	@Test
	@SuppressWarnings("unchecked")
	void testDlqWithProducerPropertiesSetAtBinderLevel()
			throws Exception {

		KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();

		Map<String, String> consumerProps = new HashMap<>();
		consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		binderConfiguration.setConsumerProperties(consumerProps);

		Map<String, String> producerProps = new HashMap<>();
		producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		binderConfiguration.setProducerProperties(producerProps);
		Binder binder = getBinder(binderConfiguration);

		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.setUseNativeEncoding(true);
		BindingProperties outputBindingProperties = createProducerBindingProperties(
				producerProperties);
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				outputBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setUseNativeDecoding(true);
		consumerProperties.getExtension().setEnableDlq(true);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<MessageChannel> producerBinding = binder.bindProducer("foo.bar",
				moduleOutputChannel, outputBindingProperties.getProducer());
		Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.bar",
				"tdwcapsabl", moduleInputChannel, consumerProperties);

		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();

		var handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);

		// Consumer for the DLQ destination
		var dlqChannel = new QueueChannel();
		ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
		dlqConsumerProperties.setMaxAttempts(1);

		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
				"error.foo.bar." + "tdwcapsabl", null, dlqChannel,
				dlqConsumerProperties);
		binderBindUnbindLatency();

		Message<?> message = org.springframework.integration.support.MessageBuilder
				.withPayload("foo").build();

		moduleOutputChannel.send(message);

		Message<?> receivedMessage = dlqChannel.receive(10000);
		assertThat(receivedMessage).isNotNull();
		assertThat(receivedMessage.getPayload()).isEqualTo("foo");

		binderBindUnbindLatency();

		dlqConsumerBinding.unbind();

		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	void dlqAndRetry() throws Exception {
		testDlqGuts(true, null, null, false, false);
	}

	@Test
	@Disabled
	void dlqAndRetryWithNonRetryableException() throws Exception {
		testDlqGuts(true, null, null, false, false, true, true);
	}

	@Test
	@Disabled
	void dlqAndRetryDefaultFalse() throws Exception {
		testDlqGuts(true, null, null, false, false, false, false);
	}

//	@Test
	void dlqAndRetryDefaultFalseWithRetryableException() throws Exception {
		testDlqGuts(true, null, null, false, false, false, true);
	}

	@Test
	@Disabled
	void dlqAndRetryTransactional() throws Exception {
		testDlqGuts(true, null, null, true, false);
	}

	@Test
	void dlqAndRetryWithNonRetryableExceptionTransactional() throws Exception {
		testDlqGuts(true, null, null, true, false, true, true);
	}

	@Test
	@Disabled
	void dlqAndRetryDefaultFalseTransactional() throws Exception {
		testDlqGuts(true, null, null, true, false, false, false);
	}

	@Test
	@Disabled
	void dlqAndRetryDefaultFalseWithRetryableExceptionTransactional() throws Exception {
		testDlqGuts(true, null, null, true, false, false, true);
	}

	@Test
	void dlq() throws Exception {
		testDlqGuts(false, null, 3, false, false);
	}

	//@Test
	void dlqWithDlqDestinationResolver() throws Exception {
		testDlqGuts(false, null, 3, false, true);
	}

	@Test
	void dlqTransactional() throws Exception {
		testDlqGuts(false, null, 3, true, false);
	}

	@Test
	void dlqNone() throws Exception {
		testDlqGuts(false, HeaderMode.none, 1, false, false);
	}

	@Test
	void dlqEmbedded() throws Exception {
		testDlqGuts(false, HeaderMode.embeddedHeaders, 3, false, false);
	}

	private void testDlqGuts(boolean withRetry, HeaderMode headerMode, Integer dlqPartitions,
			boolean transactional, boolean useDlqDestResolver) throws Exception {
		testDlqGuts(withRetry, headerMode, dlqPartitions, transactional,
				useDlqDestResolver, true, false);
	}

	private void testDlqGuts(boolean withRetry, HeaderMode headerMode,
			Integer dlqPartitions, boolean transactional, boolean useDlqDestResolver,
			boolean defaultRetryable, boolean useConfiguredRetryableException)
			throws Exception {

		int expectedDlqPartition = dlqPartitions == null ? 0 : dlqPartitions - 1;
		KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
		if (transactional) {
			binderConfig.getTransaction().setTransactionIdPrefix("tx-");
			binderConfig.getTransaction().getProducer().getConfiguration().put(ProducerConfig.RETRIES_CONFIG, "1");
			binderConfig.setRequiredAcks("all");
		}
		DlqPartitionFunction dlqPartitionFunction;
		if (Integer.valueOf(1).equals(dlqPartitions)) {
			dlqPartitionFunction = null; // test that ZERO_PARTITION is used
		}
		else if (dlqPartitions == null) {
			dlqPartitionFunction = (group, rec, ex) -> 0;
		}
		else {
			dlqPartitionFunction = (group, rec, ex) -> dlqPartitions - 1;
		}
		DlqDestinationResolver dlqDestinationResolver = null;
		if (useDlqDestResolver) {
			dlqDestinationResolver = (cr, e) -> "foo.dlq";
		}
		AbstractKafkaTestBinder binder = getBinder(binderConfig, dlqPartitionFunction, dlqDestinationResolver);

		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.getExtension()
				.setHeaderPatterns(new String[] { MessageHeaders.CONTENT_TYPE });
		producerProperties.setHeaderMode(headerMode);
		producerProperties.setPartitionCount(2);

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

		consumerProperties.setMaxAttempts(withRetry ? 2 : 1);
		consumerProperties.setBackOffInitialInterval(100);
		consumerProperties.setBackOffMaxInterval(150);
		consumerProperties.getExtension().setEnableDlq(true);
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
		consumerProperties.setHeaderMode(headerMode);
		consumerProperties.setMultiplex(true);
		consumerProperties.getExtension().setDlqPartitions(dlqPartitions);
		consumerProperties.setConcurrency(2);
		consumerProperties.populateBindingName("foobar");
		consumerProperties.setDefaultRetryable(defaultRetryable);
		consumerProperties.getRetryableExceptions().put(NumberFormatException.class,
				!defaultRetryable);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		var dlqChannel = new QueueChannel();
		var handler = new FailingInvocationCountingMessageHandler(
				() -> useConfiguredRetryableException
						? new NumberFormatException("fail")
						: new RuntimeException("fail"));
		moduleInputChannel.subscribe(handler);

		long uniqueBindingId = System.currentTimeMillis();

		String producerName = "dlqTest." + uniqueBindingId + ".0";
		Binding<MessageChannel> producerBinding = binder.bindProducer(producerName,
				moduleOutputChannel, producerProperties);
		String consumerDest = producerName + ", " + producerName.replaceAll("0", "1");
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(consumerDest,
				"testGroup", moduleInputChannel, consumerProperties);

		AbstractMessageListenerContainer container = TestUtils.getPropertyValue(consumerBinding,
				"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
		assertThat(container.getContainerProperties().getTopicPartitions().length)
				.isEqualTo(4); // 2 topics 2 partitions each
		if (transactional) {
			assertThat(TestUtils.getPropertyValue(container.getAfterRollbackProcessor(), "kafkaTemplate")).isNotNull();
			assertThat(
					TestUtils.getPropertyValue(container.getAfterRollbackProcessor(), "commitRecovered", Boolean.class))
							.isTrue();
		}

		String dlqTopic = useDlqDestResolver ? "foo.dlq" : "error.dlqTest." + uniqueBindingId + ".0.testGroup";
		try (AdminClient admin = AdminClient.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
				embeddedKafka.getBrokersAsString()))) {
			if (useDlqDestResolver) {
				List<NewTopic> nonProvisionedDlqTopics = new ArrayList<>();
				var nTopic = new NewTopic(dlqTopic, 3, (short) 1);
				nonProvisionedDlqTopics.add(nTopic);
				admin.createTopics(nonProvisionedDlqTopics);
			}
			Map<String, TopicDescription> topicDescriptions = admin.describeTopics(Collections.singletonList(dlqTopic))
				.allTopicNames()
				.get(10, TimeUnit.SECONDS);
			assertThat(topicDescriptions).hasSize(1);
			assertThat(topicDescriptions.values().iterator().next().partitions())
				.hasSize(dlqPartitions == null ? 2 : dlqPartitions);
		}

		ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
		dlqConsumerProperties.setMaxAttempts(1);
		dlqConsumerProperties.setHeaderMode(headerMode);
		dlqConsumerProperties.getExtension().setTxCommitRecovered(false);

		ApplicationContext context = TestUtils.getPropertyValue(binder.getBinder(),
				"applicationContext", ApplicationContext.class);
		SubscribableChannel boundErrorChannel = context.getBean(binder.getBinder().getBinderIdentity() + ".foobar.errors-0", SubscribableChannel.class);
//		SubscribableChannel boundErrorChannel = context
//				.getBean(consumerDest + ".errors-0", SubscribableChannel.class);
		SubscribableChannel globalErrorChannel = context.getBean("errorChannel",
				SubscribableChannel.class);
		final AtomicReference<Message<?>> boundErrorChannelMessage = new AtomicReference<>();
		final AtomicReference<Message<?>> globalErrorChannelMessage = new AtomicReference<>();
		final var hasRecovererInCallStack = new AtomicBoolean(!withRetry);
		final var hasAfterRollbackProcessorInStack = new AtomicBoolean(!withRetry);
		boundErrorChannel.subscribe(message -> {
			boundErrorChannelMessage.set(message);
			String stackTrace = Arrays.toString(new RuntimeException().getStackTrace());
			hasRecovererInCallStack
					.set(stackTrace.contains("ErrorMessageSendingRecoverer"));
			hasAfterRollbackProcessorInStack.set(stackTrace.contains("DefaultAfterRollbackProcessor"));
		});
		globalErrorChannel.subscribe(globalErrorChannelMessage::set);

		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
				dlqTopic, null, dlqChannel,
				dlqConsumerProperties);
		binderBindUnbindLatency();
		if (transactional) {
			assertThat(TestUtils.getPropertyValue(dlqConsumerBinding,
					"lifecycle.messageListenerContainer.afterRollbackProcessor.kafkaTemplate")).isNotNull();
			assertThat(
					TestUtils.getPropertyValue(dlqConsumerBinding,
							"lifecycle.messageListenerContainer.afterRollbackProcessor.commitRecovered", Boolean.class))
							.isFalse();
		}

		String testMessagePayload = "test." + UUID.randomUUID();
		Message<byte[]> testMessage = MessageBuilder
				.withPayload(testMessagePayload.getBytes())
				.setHeader(KafkaHeaders.PARTITION, 1)
				.build();
		moduleOutputChannel.send(testMessage);

		Message<?> receivedMessage = receive(dlqChannel, 30);
		assertThat(receivedMessage).isNotNull();
		assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
		if (HeaderMode.embeddedHeaders.equals(headerMode)) {
			assertThat(handler.getInvocationCount())
					.isEqualTo(consumerProperties.getMaxAttempts());

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
							.isEqualTo(producerName);

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_PARTITION)).isEqualTo(1);

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_OFFSET)).isEqualTo(0);

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP)).isNotNull();

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP_TYPE))
							.isEqualTo(TimestampType.CREATE_TIME.toString());

			assertThat(((String) receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))).startsWith(
							"Dispatcher failed to deliver Message");
			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE)).isNotNull();
			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_EXCEPTION_FQCN)).isNotNull();
			assertThat(receivedMessage.getHeaders()
					.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(expectedDlqPartition);
		}
		else if (!HeaderMode.none.equals(headerMode)) {
			boolean shouldHaveRetried = withRetry && !useConfiguredRetryableException;
			assertThat(handler.getInvocationCount())
					.isEqualTo(
							shouldHaveRetried ? consumerProperties.getMaxAttempts() + 1 : 1);

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
							.isEqualTo(producerName.getBytes(StandardCharsets.UTF_8));

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_PARTITION)).isEqualTo(
							ByteBuffer.allocate(Integer.BYTES).putInt(1).array());

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_OFFSET)).isEqualTo(
							ByteBuffer.allocate(Long.BYTES).putLong(0).array());

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP)).isNotNull();

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP_TYPE))
							.isEqualTo(TimestampType.CREATE_TIME.toString().getBytes());

			if (transactional) {
				assertThat(new String((byte[]) receivedMessage.getHeaders()
						.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))).startsWith(
								"Transaction rollback limit exceeded");
			}
			else {
				assertThat(new String((byte[]) receivedMessage.getHeaders()
						.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))).startsWith(
								"Dispatcher failed to deliver Message; fail");
			}

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE)).isNotNull();

			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_EXCEPTION_FQCN)).isNotNull();

			assertThat(receivedMessage.getHeaders()
					.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(expectedDlqPartition);
		}
		else {
			assertThat(receivedMessage.getHeaders()
					.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)).isNull();
		}
		binderBindUnbindLatency();

		// verify we got a message on the dedicated error channel and the global (via
		// bridge)
		assertThat(boundErrorChannelMessage.get()).isNotNull();
//		assertThat(globalErrorChannelMessage.get()).isNotNull();
//		assertThat(hasRecovererInCallStack.get()).isEqualTo(withRetry && !transactional);
		assertThat(hasAfterRollbackProcessorInStack.get()).isEqualTo(transactional);

		dlqConsumerBinding.unbind();
		consumerBinding.unbind();
		producerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void defaultAutoCommitOnErrorWithDlq() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		BindingProperties producerBindingProperties = createProducerBindingProperties(
				producerProperties);

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				producerBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setMaxAttempts(3);
		consumerProperties.setBackOffInitialInterval(100);
		consumerProperties.setBackOffMaxInterval(150);
		consumerProperties.getExtension().setEnableDlq(true);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		var handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);
		long uniqueBindingId = System.currentTimeMillis();
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
				producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
				consumerProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
		dlqConsumerProperties.setMaxAttempts(1);
		var dlqChannel = new QueueChannel();
		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
				"error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
				dlqConsumerProperties);

		String testMessagePayload = "test." + UUID.randomUUID();
		Message<byte[]> testMessage = MessageBuilder
				.withPayload(testMessagePayload.getBytes()).build();
		moduleOutputChannel.send(testMessage);

		Message<?> dlqMessage = receive(dlqChannel, 30);
		assertThat(dlqMessage).isNotNull();
		assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());

		// first attempt fails
		assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
		Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator()
				.next().getValue();
		assertThat(handledMessage).isNotNull();
		assertThat(
				new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8))
						.isEqualTo(testMessagePayload);
		assertThat(handler.getInvocationCount())
				.isEqualTo(consumerProperties.getMaxAttempts() + 1);
		binderBindUnbindLatency();
		dlqConsumerBinding.unbind();
		consumerBinding.unbind();

		// on the second attempt the message is not redelivered because the DLQ is set
		var successfulInputChannel = new QueueChannel();
		consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
				"testGroup", successfulInputChannel, consumerProperties);
		String testMessage2Payload = "test." + UUID.randomUUID().toString();
		Message<byte[]> testMessage2 = MessageBuilder
				.withPayload(testMessage2Payload.getBytes()).build();
		moduleOutputChannel.send(testMessage2);

		Message<?> receivedMessage = receive(successfulInputChannel);
		assertThat(receivedMessage.getPayload())
				.isEqualTo(testMessage2Payload.getBytes());

		binderBindUnbindLatency();
		consumerBinding.unbind();
		producerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void retriesWithoutDlq() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		BindingProperties producerBindingProperties = createProducerBindingProperties(
				producerProperties);

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				producerBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setMaxAttempts(2);
		consumerProperties.setBackOffInitialInterval(100);
		consumerProperties.setBackOffMaxInterval(150);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		var handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);
		long uniqueBindingId = System.currentTimeMillis();
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
				producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
				consumerProperties);

		String testMessagePayload = "test." + UUID.randomUUID();
		Message<byte[]> testMessage = MessageBuilder
				.withPayload(testMessagePayload.getBytes()).build();
		moduleOutputChannel.send(testMessage);

		Thread.sleep(3000);

		// Since we don't have a DLQ, assert that we are invoking the handler exactly the same number of times
		// as set in consumerProperties.maxAttempt and not the default set by Spring Kafka (10 times).
		assertThat(handler.getInvocationCount())
				.isEqualTo(consumerProperties.getMaxAttempts() + 1);
		binderBindUnbindLatency();
		consumerBinding.unbind();
		producerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void commonErrorHandlerBeanNameOnConsumerBinding() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		BindingProperties producerBindingProperties = createProducerBindingProperties(
				producerProperties);

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				producerBindingProperties);

		var latch = new CountDownLatch(1);
		CommonErrorHandler commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0L)) {
			@Override
			public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
										Consumer<?, ?> consumer, MessageListenerContainer container) {
				super.handleRemaining(thrownException, records, consumer, container);
				latch.countDown();
			}
		};

		ConfigurableApplicationContext context = TestUtils.getPropertyValue(binder,
				"binder.applicationContext", ConfigurableApplicationContext.class);
		context.getBeanFactory().registerSingleton("fooCommonErrorHandler", commonErrorHandler);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setMaxAttempts(2);
		consumerProperties.setBackOffInitialInterval(100);
		consumerProperties.setBackOffMaxInterval(150);
		consumerProperties.getExtension().setCommonErrorHandlerBeanName("fooCommonErrorHandler");

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		var handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);
		long uniqueBindingId = System.currentTimeMillis();
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
				producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
				consumerProperties);

		String testMessagePayload = "test." + UUID.randomUUID();
		Message<byte[]> testMessage = MessageBuilder
				.withPayload(testMessagePayload.getBytes()).build();
		moduleOutputChannel.send(testMessage);

		Thread.sleep(3000);

		//Assertions for the CommonErrorHandler configured on the consumer binding (commonErrorHandlerBeanName).
		assertThat(KafkaTestUtils.getPropertyValue(consumerBinding,
				"lifecycle.messageListenerContainer.commonErrorHandler")).isSameAs(commonErrorHandler);
		latch.await(10, TimeUnit.SECONDS);

		binderBindUnbindLatency();
		consumerBinding.unbind();
		producerBinding.unbind();
	}


	//See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870 for motivation for this test.
	@Test
	@SuppressWarnings("unchecked")
	void autoCommitOnErrorWhenManualAcknowledgement() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		BindingProperties producerBindingProperties = createProducerBindingProperties(
				producerProperties);

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				producerBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setMaxAttempts(3);
		consumerProperties.setBackOffInitialInterval(100);
		consumerProperties.setBackOffMaxInterval(150);
		//When auto commit is disabled, then the record is committed after publishing to DLQ using the manual acknowledgement.
		// (if DLQ is enabled, which is, in this case).
		consumerProperties.getExtension().setEnableDlq(true);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		var handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);
		long uniqueBindingId = System.currentTimeMillis();
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
				producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
				consumerProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
		dlqConsumerProperties.setMaxAttempts(1);
		var dlqChannel = new QueueChannel();
		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
				"error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
				dlqConsumerProperties);

		String testMessagePayload = "test." + UUID.randomUUID();
		Message<byte[]> testMessage = MessageBuilder
				.withPayload(testMessagePayload.getBytes()).build();
		moduleOutputChannel.send(testMessage);

		Message<?> dlqMessage = receive(dlqChannel, 30);
		assertThat(dlqMessage).isNotNull();
		assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());

		// first attempt fails
		assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
		Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator()
				.next().getValue();
		assertThat(handledMessage).isNotNull();
		assertThat(
				new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8))
				.isEqualTo(testMessagePayload);
		assertThat(handler.getInvocationCount())
				.isEqualTo(consumerProperties.getMaxAttempts() + 1);
		binderBindUnbindLatency();
		dlqConsumerBinding.unbind();
		consumerBinding.unbind();

		// on the second attempt the message is not redelivered because the DLQ is set and the record in error is already committed.
		var successfulInputChannel = new QueueChannel();
		consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
				"testGroup", successfulInputChannel, consumerProperties);
		String testMessage2Payload = "test1." + UUID.randomUUID();
		Message<byte[]> testMessage2 = MessageBuilder
				.withPayload(testMessage2Payload.getBytes()).build();
		moduleOutputChannel.send(testMessage2);

		Message<?> receivedMessage = receive(successfulInputChannel);
		assertThat(receivedMessage.getPayload())
				.isEqualTo(testMessage2Payload.getBytes());

		binderBindUnbindLatency();
		consumerBinding.unbind();
		producerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void configurableDlqName() throws Exception {
		Binder binder = getBinder();

		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setMaxAttempts(3);
		consumerProperties.setBackOffInitialInterval(100);
		consumerProperties.setBackOffMaxInterval(150);
		consumerProperties.getExtension().setEnableDlq(true);
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
		String dlqName = "dlqTest";
		consumerProperties.getExtension().setDlqName(dlqName);
		BindingProperties producerBindingProperties = createProducerBindingProperties(
				producerProperties);

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				producerBindingProperties);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		var handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);

		long uniqueBindingId = System.currentTimeMillis();
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
				producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
				consumerProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
		dlqConsumerProperties.setMaxAttempts(1);
		var dlqChannel = new QueueChannel();
		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(dlqName, null,
				dlqChannel, dlqConsumerProperties);

		String testMessagePayload = "test." + UUID.randomUUID();
		Message<byte[]> testMessage = MessageBuilder
				.withPayload(testMessagePayload.getBytes()).build();
		moduleOutputChannel.send(testMessage);

		Message<?> dlqMessage = receive(dlqChannel, 30);
		assertThat(dlqMessage).isNotNull();
		assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());

		// first attempt fails
		assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
		Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator()
				.next().getValue();
		assertThat(handledMessage).isNotNull();
		assertThat(
				new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8))
						.isEqualTo(testMessagePayload);
		assertThat(handler.getInvocationCount())
				.isEqualTo(consumerProperties.getMaxAttempts() + 1);
		binderBindUnbindLatency();
		dlqConsumerBinding.unbind();
		consumerBinding.unbind();

		// on the second attempt the message is not redelivered because the DLQ is set
		var successfulInputChannel = new QueueChannel();
		consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
				"testGroup", successfulInputChannel, consumerProperties);
		String testMessage2Payload = "test." + UUID.randomUUID().toString();
		Message<byte[]> testMessage2 = MessageBuilder
				.withPayload(testMessage2Payload.getBytes()).build();
		moduleOutputChannel.send(testMessage2);

		Message<?> receivedMessage = receive(successfulInputChannel);
		assertThat(receivedMessage.getPayload())
				.isEqualTo(testMessage2Payload.getBytes());

		binderBindUnbindLatency();
		consumerBinding.unbind();
		producerBinding.unbind();
	}

	@Test
	public void validateKafkaTopicName() {
		try {
			KafkaTopicUtils.validateTopicName("foo:bar");
			fail("Expecting IllegalArgumentException");
		}
		catch (Exception e) {
			// TODO: handle exception
		}
	}

	@Test
	@SuppressWarnings("unchecked")
	// TODO: This test needs to be rethought - sending byte[] without explicit content
	// type
	// - yet being converted by the json converter
	void compression() throws Exception {
		final KafkaProducerProperties.CompressionType[] codecs = new KafkaProducerProperties.CompressionType[] {
				KafkaProducerProperties.CompressionType.none,
				KafkaProducerProperties.CompressionType.gzip,
				KafkaProducerProperties.CompressionType.snappy,
				KafkaProducerProperties.CompressionType.zstd};
		var testPayload = new byte[2048];
		Arrays.fill(testPayload, (byte) 65);
		Binder binder = getBinder();
		for (KafkaProducerProperties.CompressionType codec : codecs) {
			ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
			producerProperties.getExtension().setCompressionType(
					KafkaProducerProperties.CompressionType.valueOf(codec.toString()));

			DirectChannel moduleOutputChannel = createBindableChannel("output",
					createProducerBindingProperties(producerProperties));

			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

			DirectChannel moduleInputChannel = createBindableChannel("input",
					createConsumerBindingProperties(consumerProperties));

			Binding<MessageChannel> producerBinding = binder.bindProducer(
					"testCompression", moduleOutputChannel, producerProperties);
			Binding<MessageChannel> consumerBinding = binder.bindConsumer(
					"testCompression", "test", moduleInputChannel, consumerProperties);
			Message<?> message = org.springframework.integration.support.MessageBuilder
					.withPayload(testPayload).build();
			// Let the consumer actually bind to the producer before sending a msg
			binderBindUnbindLatency();
			moduleOutputChannel.send(message);
			var latch = new CountDownLatch(1);

			AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
			moduleInputChannel.subscribe(message1 -> {
				try {
					inboundMessageRef.set((Message<byte[]>) message1);
				}
				finally {
					latch.countDown();
				}
			});
			Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

			assertThat(inboundMessageRef.get()).isNotNull();
			assertThat(inboundMessageRef.get().getPayload()).containsExactly(testPayload);
			producerBinding.unbind();
			consumerBinding.unbind();
		}
	}

	@Test
	@SuppressWarnings("unchecked")
	void earliest() throws Exception {
		Binding<MessageChannel> producerBinding = null;
		Binding<MessageChannel> consumerBinding = null;

		try {
			Binder binder = getBinder();
			BindingProperties producerBindingProperties = createProducerBindingProperties(
					createProducerProperties());
			DirectChannel output = createBindableChannel("output",
					producerBindingProperties);

			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
			consumerProperties.getExtension().setAutoRebalanceEnabled(false);
			consumerProperties.getExtension()
					.setStartOffset(KafkaConsumerProperties.StartOffset.earliest);
			consumerProperties.getExtension().setAutoRebalanceEnabled(false);

			DirectChannel input1 = createBindableChannel("input",
					createConsumerBindingProperties(consumerProperties));

			String testTopicName = UUID.randomUUID().toString();
			producerBinding = binder.bindProducer(testTopicName, output,
					producerBindingProperties.getProducer());
			String testPayload1 = "foo-" + UUID.randomUUID().toString();
			output.send(new GenericMessage<>(testPayload1.getBytes()));

			consumerBinding = binder.bindConsumer(testTopicName, "startOffsets", input1,
					consumerProperties);
			var latch = new CountDownLatch(1);
			AtomicReference<Message<byte[]>> inboundMessageRef1 = new AtomicReference<>();
			MessageHandler messageHandler = message1 -> {
				try {
					inboundMessageRef1.set((Message<byte[]>) message1);
				}
				finally {
					latch.countDown();
				}
			};
			input1.subscribe(messageHandler);
			Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");
			assertThat(inboundMessageRef1.get()).isNotNull();
			String testPayload2 = "foo-" + UUID.randomUUID().toString();
			input1.unsubscribe(messageHandler);
			output.send(new GenericMessage<>(testPayload2.getBytes()));

			var latch1 = new CountDownLatch(1);
			AtomicReference<Message<byte[]>> inboundMessageRef2 = new AtomicReference<>();
			input1.subscribe(message1 -> {
				try {
					inboundMessageRef2.set((Message<byte[]>) message1);
				}
				finally {
					latch1.countDown();
				}
			});
			Assert.isTrue(latch1.await(5, TimeUnit.SECONDS), "Failed to receive message");

			assertThat(inboundMessageRef2.get()).isNotNull();
			assertThat(new String(inboundMessageRef2.get().getPayload(),
					StandardCharsets.UTF_8)).isEqualTo(testPayload2);
			Thread.sleep(2000);
			producerBinding.unbind();
			consumerBinding.unbind();
		}
		finally {
			if (consumerBinding != null) {
				consumerBinding.unbind();
			}
			if (producerBinding != null) {
				producerBinding.unbind();
			}
		}
	}

	@Test
	@Override
	@SuppressWarnings("unchecked")
	public void testSendAndReceiveMultipleTopics(TestInfo testInfo) throws Exception {
		Binder binder = getBinder();

		DirectChannel moduleOutputChannel1 = createBindableChannel("output1",
				createProducerBindingProperties(createProducerProperties()));
		DirectChannel moduleOutputChannel2 = createBindableChannel("output2",
				createProducerBindingProperties(createProducerProperties()));

		var moduleInputChannel = new QueueChannel();

		ExtendedProducerProperties<KafkaProducerProperties> producer1Props = createProducerProperties();
		producer1Props.getExtension().setUseTopicHeader(true);

		Binding<MessageChannel> producerBinding1 = binder.bindProducer("foo.x",
				moduleOutputChannel1, producer1Props);
		Binding<MessageChannel> producerBinding2 = binder.bindProducer("foo.y",
				moduleOutputChannel2, createProducerProperties());

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
		Binding<MessageChannel> consumerBinding1 = binder.bindConsumer("foo.x", "test1",
				moduleInputChannel, consumerProperties);
		Binding<MessageChannel> consumerBinding2 = binder.bindConsumer("foo.y", "test2",
				moduleInputChannel, consumerProperties);

		String testPayload1 = "foo1";
		Message<?> message1 = org.springframework.integration.support.MessageBuilder
				.withPayload(testPayload1.getBytes()).build();
		String testPayload2 = "foo2";
		Message<?> message2 = org.springframework.integration.support.MessageBuilder
				.withPayload(testPayload2.getBytes()).build();
		String testPayload3 = "foo3";
		Message<?> message3 = org.springframework.integration.support.MessageBuilder
				.withPayload(testPayload3.getBytes())
				.setHeader(KafkaHeaders.TOPIC, "foo.y")
				.build();

		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel1.send(message1);
		moduleOutputChannel2.send(message2);
		moduleOutputChannel1.send(message3);

		Message<?>[] messages = new Message[3];
		messages[0] = receive(moduleInputChannel);
		messages[1] = receive(moduleInputChannel);
		messages[2] = receive(moduleInputChannel);

		assertThat(messages[0]).isNotNull();
		assertThat(messages[1]).isNotNull();
		assertThat(messages[2]).isNotNull();
		assertThat(messages).extracting("payload").containsExactlyInAnyOrder(
				testPayload1.getBytes(), testPayload2.getBytes(), testPayload3.getBytes());
		Arrays.asList(messages).forEach(message -> {
			if (new String((byte[]) message.getPayload()).equals("foo1")) {
				assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo.x");
			}
			else {
				assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo.y");
			}
		});

		producerBinding1.unbind();
		producerBinding2.unbind();

		consumerBinding1.unbind();
		consumerBinding2.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void manualAckSucceedsWhenAutoCommitOffsetIsTurnedOff() throws Exception {
		Binder binder = getBinder();

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				createProducerBindingProperties(createProducerProperties()));
		var moduleInputChannel = new QueueChannel();

		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"testManualAckSucceedsWhenAutoCommitOffsetIsTurnedOff",
				moduleOutputChannel, createProducerProperties());

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.getExtension().setAckMode(ContainerProperties.AckMode.MANUAL);

		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"testManualAckSucceedsWhenAutoCommitOffsetIsTurnedOff", "test",
				moduleInputChannel, consumerProperties);

		String testPayload1 = "foo" + UUID.randomUUID().toString();
		Message<?> message1 = org.springframework.integration.support.MessageBuilder
				.withPayload(testPayload1.getBytes()).build();

		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel.send(message1);

		Message<?> receivedMessage = receive(moduleInputChannel);
		assertThat(receivedMessage).isNotNull();
		assertThat(receivedMessage.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT))
				.isNotNull();
		Acknowledgment acknowledgment = receivedMessage.getHeaders()
				.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
		try {
			acknowledgment.acknowledge();
		}
		catch (Exception e) {
			fail("Acknowledge must not throw an exception");
		}
		finally {
			producerBinding.unbind();
			consumerBinding.unbind();
		}
	}

	@Test
	@SuppressWarnings("unchecked")
	void manualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder()
			throws Exception {
		Binder binder = getBinder();

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				createProducerBindingProperties(createProducerProperties()));
		var moduleInputChannel = new QueueChannel();

		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder",
				moduleOutputChannel, createProducerProperties());

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder",
				"test", moduleInputChannel, consumerProperties);

		AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(
				consumerBinding, "lifecycle.messageListenerContainer",
				AbstractMessageListenerContainer.class);
		assertThat(container.getContainerProperties().getAckMode())
				.isEqualTo(ContainerProperties.AckMode.BATCH);

		String testPayload1 = "foo" + UUID.randomUUID().toString();
		Message<?> message1 = org.springframework.integration.support.MessageBuilder
				.withPayload(testPayload1.getBytes()).build();

		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel.send(message1);

		Message<?> receivedMessage = receive(moduleInputChannel);
		assertThat(receivedMessage).isNotNull();
		assertThat(receivedMessage.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT))
				.isNull();

		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@Override
	@SuppressWarnings("unchecked")
	public void testTwoRequiredGroups(TestInfo testInfo) throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();

		DirectChannel output = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));

		String testDestination = "testDestination"
				+ UUID.randomUUID().toString().replace("-", "");

		producerProperties.setRequiredGroups("test1", "test2");
		Binding<MessageChannel> producerBinding = binder.bindProducer(testDestination,
				output, producerProperties);

		String testPayload = "foo-" + UUID.randomUUID().toString();
		output.send(new GenericMessage<>(testPayload.getBytes()));

		var inbound1 = new QueueChannel();
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
		consumerProperties.getExtension().setAckMode(ContainerProperties.AckMode.RECORD);
		Binding<MessageChannel> consumerBinding1 = binder.bindConsumer(testDestination,
				"test1", inbound1, consumerProperties);
		var inbound2 = new QueueChannel();
		Binding<MessageChannel> consumerBinding2 = binder.bindConsumer(testDestination,
				"test2", inbound2, consumerProperties);

		AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(
				consumerBinding2, "lifecycle.messageListenerContainer",
				AbstractMessageListenerContainer.class);
		assertThat(container.getContainerProperties().getAckMode())
				.isEqualTo(ContainerProperties.AckMode.RECORD);

		Message<?> receivedMessage1 = receive(inbound1);
		assertThat(receivedMessage1).isNotNull();
		assertThat(new String((byte[]) receivedMessage1.getPayload(),
				StandardCharsets.UTF_8)).isEqualTo(testPayload);
		Message<?> receivedMessage2 = receive(inbound2);
		assertThat(receivedMessage2).isNotNull();
		assertThat(new String((byte[]) receivedMessage2.getPayload(),
				StandardCharsets.UTF_8)).isEqualTo(testPayload);

		consumerBinding1.unbind();
		consumerBinding2.unbind();
		producerBinding.unbind();
	}

	@Test
	@Override
	@SuppressWarnings("unchecked")
	public void testPartitionedModuleSpEL(TestInfo testInfo) throws Exception {
		Binder binder = getBinder();

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setConcurrency(2);
		consumerProperties.setInstanceIndex(0);
		consumerProperties.setInstanceCount(3);
		consumerProperties.setPartitioned(true);
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
		var input0 = new QueueChannel();
		input0.setBeanName("test.input0S");
		Binding<MessageChannel> input0Binding = binder.bindConsumer("part.0", "test",
				input0, consumerProperties);
		consumerProperties.setInstanceIndex(1);
		var input1 = new QueueChannel();
		input1.setBeanName("test.input1S");
		Binding<MessageChannel> input1Binding = binder.bindConsumer("part.0", "test",
				input1, consumerProperties);
		consumerProperties.setInstanceIndex(2);
		var input2 = new QueueChannel();
		input2.setBeanName("test.input2S");
		Binding<MessageChannel> input2Binding = binder.bindConsumer("part.0", "test",
				input2, consumerProperties);

		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.setPartitionKeyExpression(
				spelExpressionParser.parseExpression("payload"));
		producerProperties.setPartitionSelectorExpression(
				spelExpressionParser.parseExpression("hashCode()"));
		producerProperties.setPartitionCount(3);
		invokeCreateTopic("output", 6, 1);

		DirectChannel output = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));
		output.setBeanName("test.output");
		Binding<MessageChannel> outputBinding = binder.bindProducer("part.0", output,
				producerProperties);
		try {
			Object endpoint = extractEndpoint(outputBinding);
			assertThat(getEndpointRouting(endpoint))
					.contains(getExpectedRoutingBaseDestination("part.0", "test")
							+ "-' + headers['partition']");
		}
		catch (UnsupportedOperationException ignored) {
		}
		List<ChannelInterceptor> interceptors = output.getInterceptors();
		var count = new AtomicInteger();
		interceptors.forEach(interceptor -> {
			if (interceptor instanceof PartitioningInterceptor) {
				count.set(TestUtils.getPropertyValue(interceptor,
						"partitionHandler.partitionCount", Integer.class));
			}
		});
		assertThat(count.get()).isEqualTo(6);
		Message<Integer> message2 = org.springframework.integration.support.MessageBuilder
				.withPayload(2)
				.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "foo")
				.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
				.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43).build();
		output.send(message2);
		output.send(new GenericMessage<>(1));
		output.send(new GenericMessage<>(0));

		Message<?> receive0 = receive(input0);
		assertThat(receive0).isNotNull();
		Message<?> receive1 = receive(input1);
		assertThat(receive1).isNotNull();
		Message<?> receive2 = receive(input2);
		assertThat(receive2).isNotNull();

		Condition<Message<?>> correlationHeadersForPayload2 = new Condition<Message<?>>() {

			@Override
			public boolean matches(Message<?> value) {
				IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(
						value);
				return "foo".equals(accessor.getCorrelationId())
						&& 42 == accessor.getSequenceNumber()
						&& 43 == accessor.getSequenceSize();
			}
		};

		var om = new ObjectMapper();

		if (usesExplicitRouting()) {
			assertThat(om.readValue((byte[]) receive0.getPayload(), Integer.class))
					.isEqualTo(0);
			assertThat(om.readValue((byte[]) receive1.getPayload(), Integer.class))
					.isEqualTo(1);
			assertThat(om.readValue((byte[]) receive2.getPayload(), Integer.class))
					.isEqualTo(2);
			assertThat(receive2).has(correlationHeadersForPayload2);
		}
		else {
			List<Message<?>> receivedMessages = Arrays.asList(receive0, receive1,
					receive2);
			assertThat(receivedMessages).extracting("payload").containsExactlyInAnyOrder(
					new byte[] { 48 }, new byte[] { 49 }, new byte[] { 50 });
			Condition<Message<?>> payloadIs2 = new Condition<Message<?>>() {

				@Override
				public boolean matches(Message<?> value) {
					return om.readValue((byte[]) value.getPayload(), Integer.class).equals(2);
				}
			};
			assertThat(receivedMessages).filteredOn(payloadIs2).areExactly(1,
					correlationHeadersForPayload2);

		}
		input0Binding.unbind();
		input1Binding.unbind();
		input2Binding.unbind();
		outputBinding.unbind();
	}

	@Test
//	@Override
	@SuppressWarnings({ "unchecked", "rawtypes" })
	void partitionedModuleJava() throws Exception {
		Binder binder = getBinder();

		KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setConcurrency(2);
		consumerProperties.setInstanceCount(4);
		consumerProperties.setInstanceIndex(0);
		consumerProperties.setPartitioned(true);
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
		var input0 = new QueueChannel();
		input0.setBeanName("test.input0J");
		Binding<MessageChannel> input0Binding = binder.bindConsumer("partJ.0", "test",
				input0, consumerProperties);
		consumerProperties.setInstanceIndex(1);
		var input1 = new QueueChannel();
		input1.setBeanName("test.input1J");
		Binding<MessageChannel> input1Binding = binder.bindConsumer("partJ.0", "test",
				input1, consumerProperties);
		consumerProperties.setInstanceIndex(2);
		var input2 = new QueueChannel();
		input2.setBeanName("test.input2J");
		Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "test",
				input2, consumerProperties);
		consumerProperties.setInstanceIndex(3);
		var input3 = new QueueChannel();
		input3.setBeanName("test.input3J");
		Binding<MessageChannel> input3Binding = binder.bindConsumer("partJ.0", "test",
				input3, consumerProperties);

		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();

		this.applicationContext.registerBean("pkExtractor",
				PartitionTestSupport.class, () -> new PartitionTestSupport());
		this.applicationContext.registerBean("pkSelector",
				PartitionTestSupport.class, () -> new PartitionTestSupport());
		producerProperties.setPartitionKeyExtractorName("pkExtractor");
		producerProperties.setPartitionSelectorName("pkSelector");
		producerProperties.setPartitionCount(3); // overridden to 8 on the actual topic
		DirectChannel output = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));
		output.setBeanName("test.output");
		Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output,
				producerProperties);
		if (usesExplicitRouting()) {
			Object endpoint = extractEndpoint(outputBinding);
			assertThat(getEndpointRouting(endpoint))
					.contains(getExpectedRoutingBaseDestination("partJ.0", "test")
							+ "-' + headers['partition']");
		}

		output.send(new GenericMessage<>(2));
		output.send(new GenericMessage<>(1));
		output.send(new GenericMessage<>(0));
		output.send(new GenericMessage<>(3));

		Message<?> receive0 = receive(input0);
		assertThat(receive0).isNotNull();
		Message<?> receive1 = receive(input1);
		assertThat(receive1).isNotNull();
		Message<?> receive2 = receive(input2);
		assertThat(receive2).isNotNull();
		Message<?> receive3 = receive(input3);
		assertThat(receive3).isNotNull();
		var om = new ObjectMapper();

		assertThat(om.readValue((byte[]) receive0.getPayload(), Integer.class))
				.isEqualTo(0);
		assertThat(om.readValue((byte[]) receive1.getPayload(), Integer.class))
				.isEqualTo(1);
		assertThat(om.readValue((byte[]) receive2.getPayload(), Integer.class))
				.isEqualTo(2);
		assertThat(om.readValue((byte[]) receive3.getPayload(), Integer.class))
				.isEqualTo(3);

		input0Binding.unbind();
		input1Binding.unbind();
		input2Binding.unbind();
		input3Binding.unbind();
		outputBinding.unbind();
	}

	@Test
	@Override
	@SuppressWarnings("unchecked")
	public void testAnonymousGroup(TestInfo testInfo) throws Exception {
		Binder binder = getBinder();
		BindingProperties producerBindingProperties = createProducerBindingProperties(
				createProducerProperties());
		DirectChannel output = createBindableChannel("output", producerBindingProperties);
		Binding<MessageChannel> producerBinding = binder.bindProducer("defaultGroup.0",
				output, producerBindingProperties.getProducer());

		var input1 = new QueueChannel();
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		Binding<MessageChannel> binding1 = binder.bindConsumer("defaultGroup.0", null,
				input1, consumerProperties);

		var input2 = new QueueChannel();
		Binding<MessageChannel> binding2 = binder.bindConsumer("defaultGroup.0", null,
				input2, consumerProperties);
		// Since we don't provide any topic info, let Kafka bind the consumer successfully
		Thread.sleep(1000);
		var testPayload1 = "foo-" + UUID.randomUUID();
		output.send(new GenericMessage<>(testPayload1.getBytes()));

		Message<byte[]> receivedMessage1 = (Message<byte[]>) receive(input1);
		assertThat(receivedMessage1).isNotNull();
		assertThat(new String(receivedMessage1.getPayload(), StandardCharsets.UTF_8))
				.isEqualTo(testPayload1);

		Message<byte[]> receivedMessage2 = (Message<byte[]>) receive(input2);
		assertThat(receivedMessage2).isNotNull();
		assertThat(new String(receivedMessage2.getPayload(), StandardCharsets.UTF_8))
				.isEqualTo(testPayload1);

		binding2.unbind();

		String testPayload2 = "foo-" + UUID.randomUUID().toString();
		output.send(new GenericMessage<>(testPayload2.getBytes()));

		binding2 = binder.bindConsumer("defaultGroup.0", null, input2,
				consumerProperties);
		// Since we don't provide any topic info, let Kafka bind the consumer successfully
		Thread.sleep(1000);
		String testPayload3 = "foo-" + UUID.randomUUID().toString();
		output.send(new GenericMessage<>(testPayload3.getBytes()));

		receivedMessage1 = (Message<byte[]>) receive(input1);
		assertThat(receivedMessage1).isNotNull();
		assertThat(new String(receivedMessage1.getPayload(), StandardCharsets.UTF_8))
				.isEqualTo(testPayload2);
		receivedMessage1 = (Message<byte[]>) receive(input1);
		assertThat(receivedMessage1).isNotNull();
		assertThat(new String(receivedMessage1.getPayload(), StandardCharsets.UTF_8))
				.isNotNull();

		receivedMessage2 = (Message<byte[]>) receive(input2);
		assertThat(receivedMessage2).isNotNull();
		assertThat(new String(receivedMessage2.getPayload(), StandardCharsets.UTF_8))
				.isEqualTo(testPayload3);

		Map<String, TopicInformation> topicsInUse = ((KafkaTestBinder) binder)
				.getCoreBinder().getTopicsInUse();
		assertThat(topicsInUse.keySet()).contains("defaultGroup.0");
		TopicInformation topic = topicsInUse.get("defaultGroup.0");
		assertThat(topic.isConsumerTopic()).isTrue();
		assertThat(topic.consumerGroup()).startsWith("anonymous");

		producerBinding.unbind();
		binding1.unbind();
		binding2.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void partitionedModuleJavaWithRawMode() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
		properties.setHeaderMode(HeaderMode.none);
		this.applicationContext.registerBean("pkExtractor",
				RawKafkaPartitionTestSupport.class, () -> new RawKafkaPartitionTestSupport());
		this.applicationContext.registerBean("pkSelector",
				RawKafkaPartitionTestSupport.class, () -> new RawKafkaPartitionTestSupport());
		properties.setPartitionKeyExtractorName("pkExtractor");
		properties.setPartitionSelectorName("pkSelector");
		properties.setPartitionCount(6);

		DirectChannel output = createBindableChannel("output",
				createProducerBindingProperties(properties));
		output.setBeanName("test.output");
		Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.raw.0", output,
				properties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setConcurrency(2);
		consumerProperties.setInstanceCount(3);
		consumerProperties.setInstanceIndex(0);
		consumerProperties.setPartitioned(true);
		consumerProperties.setHeaderMode(HeaderMode.none);
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
		var input0 = new QueueChannel();
		input0.setBeanName("test.input0J");
		Binding<MessageChannel> input0Binding = binder.bindConsumer("partJ.raw.0", "test",
				input0, consumerProperties);
		consumerProperties.setInstanceIndex(1);
		var input1 = new QueueChannel();
		input1.setBeanName("test.input1J");
		Binding<MessageChannel> input1Binding = binder.bindConsumer("partJ.raw.0", "test",
				input1, consumerProperties);
		consumerProperties.setInstanceIndex(2);
		var input2 = new QueueChannel();
		input2.setBeanName("test.input2J");
		Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.raw.0", "test",
				input2, consumerProperties);

		output.send(new GenericMessage<>(new byte[] { (byte) 0 }));
		output.send(new GenericMessage<>(new byte[] { (byte) 1 }));
		output.send(new GenericMessage<>(new byte[] { (byte) 2 }));

		Message<?> receive0 = receive(input0);
		assertThat(receive0).isNotNull();
		Message<?> receive1 = receive(input1);
		assertThat(receive1).isNotNull();
		Message<?> receive2 = receive(input2);
		assertThat(receive2).isNotNull();

		assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0],
				((byte[]) receive1.getPayload())[0], ((byte[]) receive2.getPayload())[0]))
						.containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);

		input0Binding.unbind();
		input1Binding.unbind();
		input2Binding.unbind();
		outputBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void partitionedModuleSpELWithRawMode() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
		properties.setPartitionKeyExpression(
				spelExpressionParser.parseExpression("payload[0]"));
		properties.setPartitionSelectorExpression(
				spelExpressionParser.parseExpression("hashCode()"));
		properties.setPartitionCount(6);
		properties.setHeaderMode(HeaderMode.none);

		DirectChannel output = createBindableChannel("output",
				createProducerBindingProperties(properties));
		output.setBeanName("test.output");
		Binding<MessageChannel> outputBinding = binder.bindProducer("part.raw.0", output,
				properties);
		try {
			Object endpoint = extractEndpoint(outputBinding);
			assertThat(getEndpointRouting(endpoint))
					.contains(getExpectedRoutingBaseDestination("part.raw.0", "test")
							+ "-' + headers['partition']");
		}
		catch (UnsupportedOperationException ignored) {
		}

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setConcurrency(2);
		consumerProperties.setInstanceIndex(0);
		consumerProperties.setInstanceCount(3);
		consumerProperties.setPartitioned(true);
		consumerProperties.setHeaderMode(HeaderMode.none);
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
		var input0 = new QueueChannel();
		input0.setBeanName("test.input0S");
		Binding<MessageChannel> input0Binding = binder.bindConsumer("part.raw.0", "test",
				input0, consumerProperties);
		consumerProperties.setInstanceIndex(1);
		var input1 = new QueueChannel();
		input1.setBeanName("test.input1S");
		Binding<MessageChannel> input1Binding = binder.bindConsumer("part.raw.0", "test",
				input1, consumerProperties);
		consumerProperties.setInstanceIndex(2);
		var input2 = new QueueChannel();
		input2.setBeanName("test.input2S");
		Binding<MessageChannel> input2Binding = binder.bindConsumer("part.raw.0", "test",
				input2, consumerProperties);

		Message<byte[]> message2 = org.springframework.integration.support.MessageBuilder
				.withPayload(new byte[] { 2 })
				.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID,
						"kafkaBinderTestCommonsDelegate")
				.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
				.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43).build();
		output.send(message2);
		output.send(new GenericMessage<>(new byte[] { 1 }));
		output.send(new GenericMessage<>(new byte[] { 0 }));
		Message<?> receive0 = receive(input0);
		assertThat(receive0).isNotNull();
		Message<?> receive1 = receive(input1);
		assertThat(receive1).isNotNull();
		Message<?> receive2 = receive(input2);
		assertThat(receive2).isNotNull();
		assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0],
				((byte[]) receive1.getPayload())[0], ((byte[]) receive2.getPayload())[0]))
						.containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);
		input0Binding.unbind();
		input1Binding.unbind();
		input2Binding.unbind();
		outputBinding.unbind();
	}

	@Test
	@SuppressWarnings({ "unchecked", "rawtypes" })
	void partitionedNative() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
		properties.setPartitionCount(6);

		DirectChannel output = createBindableChannel("output",
				createProducerBindingProperties(properties));
		output.setBeanName("test.output");
		Binding<MessageChannel> outputBinding = binder.bindProducer("partNative.raw.0",
				output, properties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		var input0 = new QueueChannel();
		input0.setBeanName("test.inputNative");
		Binding<MessageChannel> inputBinding = binder.bindConsumer("partNative.raw.0",
				"test", input0, consumerProperties);

		output.send(new GenericMessage<>("foo".getBytes(),
				Collections.singletonMap(KafkaHeaders.PARTITION, 5)));

		Message<?> received = receive(input0);
		assertThat(received).isNotNull();

		assertThat(received.getPayload()).isEqualTo("foo".getBytes());
		assertThat(received.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION))
				.isEqualTo(5);

		inputBinding.unbind();
		outputBinding.unbind();
	}

	@Test
	@SuppressWarnings({ "unchecked", "rawtypes" })
	void testSendAndReceiveWithRawMode() throws Exception {
		Binder binder = getBinder();

		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.setHeaderMode(HeaderMode.none);
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setHeaderMode(HeaderMode.none);
		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));
		Binding<MessageChannel> producerBinding = binder.bindProducer("raw.0",
				moduleOutputChannel, producerProperties);

		Binding<MessageChannel> consumerBinding = binder.bindConsumer("raw.0", "test",
				moduleInputChannel, consumerProperties);
		Message<?> message = org.springframework.integration.support.MessageBuilder
				.withPayload("testSendAndReceiveWithRawMode".getBytes()).build();
		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel.send(message);

		var latch = new CountDownLatch(1);
		AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
		moduleInputChannel.subscribe(message1 -> {
			try {
				inboundMessageRef.set((Message<byte[]>) message1);
			}
			finally {
				latch.countDown();
			}
		});
		Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

		assertThat(inboundMessageRef.get()).isNotNull();
		assertThat(
				new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8))
						.isEqualTo("testSendAndReceiveWithRawMode");
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	void allowNonTransactionalProducerSetting() throws Exception {
		AbstractKafkaTestBinder binder = getBinder();
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				new BindingProperties());
		ExtendedProducerProperties<KafkaProducerProperties> producerProps = new ExtendedProducerProperties<>(
				new KafkaProducerProperties());
		producerProps.getExtension().setAllowNonTransactional(true);
		Binding<MessageChannel> producerBinding = binder.bindProducer("allwNonTrans.0",
				moduleOutputChannel, producerProps);

		KafkaProducerMessageHandler endpoint = TestUtils.getPropertyValue(producerBinding,
				"lifecycle", KafkaProducerMessageHandler.class);

		final var kafkaTemplate = (KafkaTemplate) new DirectFieldAccessor(endpoint).getPropertyValue("kafkaTemplate");

		assertThat(kafkaTemplate.isAllowNonTransactional()).isTrue();

		producerBinding.unbind();
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	void producerErrorChannel() throws Exception {
		AbstractKafkaTestBinder binder = getBinder();
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				new BindingProperties());
		ExtendedProducerProperties<KafkaProducerProperties> producerProps = new ExtendedProducerProperties<>(
				new KafkaProducerProperties());
		producerProps.setHeaderMode(HeaderMode.none);
		producerProps.setErrorChannelEnabled(true);
		producerProps.populateBindingName("foobar");
		Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0",
				moduleOutputChannel, producerProps);
		final Message<?> message = MessageBuilder.withPayload("bad")
				.setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
		SubscribableChannel ec = binder.getApplicationContext().getBean(binder.getBinder().getBinderIdentity() + ".foobar.errors", SubscribableChannel.class);
		final AtomicReference<Message<?>> errorMessage = new AtomicReference<>();
		final var latch = new CountDownLatch(2);
		ec.subscribe(message1 -> {
			errorMessage.set(message1);
			latch.countDown();
		});
		SubscribableChannel globalEc = binder.getApplicationContext().getBean(
				IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
				SubscribableChannel.class);
		globalEc.subscribe(message12 -> latch.countDown());
		KafkaProducerMessageHandler endpoint = TestUtils.getPropertyValue(producerBinding,
				"lifecycle", KafkaProducerMessageHandler.class);
		final var fooException = new RuntimeException("foo");
		final AtomicReference<Object> sent = new AtomicReference<>();
		new DirectFieldAccessor(endpoint).setPropertyValue("kafkaTemplate",
				new KafkaTemplate(mock(ProducerFactory.class)) {

					@Override // SIK < 2.3
					public CompletableFuture<SendResult> send(String topic, Object payload) {
						sent.set(payload);
						CompletableFuture<SendResult> future = new CompletableFuture<>();
						future.completeExceptionally(fooException);
						return future;
					}

					@Override // SIK 2.3+
					public CompletableFuture<SendResult> send(ProducerRecord record) {
						sent.set(record.value());
						CompletableFuture<SendResult> future = new CompletableFuture<>();
						future.completeExceptionally(fooException);
						return future;
					}

				});

		moduleOutputChannel.send(message);
		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(errorMessage.get()).isInstanceOf(ErrorMessage.class);
		assertThat(errorMessage.get().getPayload())
				.isInstanceOf(KafkaSendFailureException.class);
		KafkaSendFailureException exception = (KafkaSendFailureException) errorMessage
				.get().getPayload();
		assertThat(exception.getCause()).isSameAs(fooException);
		assertThat(new String((byte[]) exception.getFailedMessage().getPayload(),
				StandardCharsets.UTF_8)).isEqualTo(message.getPayload());
		assertThat(exception.getRecord().value()).isSameAs(sent.get());
		producerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void autoCreateTopicsEnabledSucceeds() throws Exception {
		KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
		configurationProperties.setAutoCreateTopics(true);
		Binder binder = getBinder(configurationProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		String testTopicName = "nonexisting" + System.currentTimeMillis();
		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<?> binding = binder.bindConsumer(testTopicName, "test",
				moduleInputChannel, consumerProperties);
		binding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void customPartitionCountOverridesDefaultIfLarger() throws Exception {
		var testPayload = new byte[2048];
		Arrays.fill(testPayload, (byte) 65);
		KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
		binderConfiguration.setMinPartitionCount(10);
		Binder binder = getBinder(binderConfiguration);
		QueueChannel moduleInputChannel = new QueueChannel();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.setPartitionCount(10);
		producerProperties.setPartitionKeyExpression(new LiteralExpression("foo"));

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		long uniqueBindingId = System.currentTimeMillis();
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"foo" + uniqueBindingId + ".0", moduleOutputChannel, producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"foo" + uniqueBindingId + ".0", null, moduleInputChannel,
				consumerProperties);
		Message<?> message = org.springframework.integration.support.MessageBuilder
				.withPayload(testPayload).build();
		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel.send(message);
		Message<?> inbound = receive(moduleInputChannel);
		assertThat(inbound).isNotNull();
		assertThat((byte[]) inbound.getPayload()).containsExactly(testPayload);

		assertThat(partitionSize("foo" + uniqueBindingId + ".0")).isEqualTo(10);
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void customPartitionCountDoesNotOverridePartitioningIfSmaller()
			throws Exception {
		var testPayload = new byte[2048];
		Arrays.fill(testPayload, (byte) 65);
		KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
		binderConfiguration.setMinPartitionCount(6);
		Binder binder = getBinder(binderConfiguration);
		var moduleInputChannel = new QueueChannel();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.setPartitionCount(5);
		producerProperties.setPartitionKeyExpression(
				spelExpressionParser.parseExpression("payload"));
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		long uniqueBindingId = System.currentTimeMillis();
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"foo" + uniqueBindingId + ".0", moduleOutputChannel, producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"foo" + uniqueBindingId + ".0", null, moduleInputChannel,
				consumerProperties);
		Thread.sleep(1000);
		Message<?> message = org.springframework.integration.support.MessageBuilder
				.withPayload(testPayload).build();
		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel.send(message);
		Message<?> inbound = receive(moduleInputChannel);
		assertThat(inbound).isNotNull();
		assertThat((byte[]) inbound.getPayload()).containsExactly(testPayload);

		assertThat(partitionSize("foo" + uniqueBindingId + ".0")).isEqualTo(6);
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void dynamicKeyExpression() throws Exception {
		Binder binder = getBinder(createConfigurationProperties());
		var moduleInputChannel = new QueueChannel();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.getExtension().getConfiguration().put("key.serializer",
				StringSerializer.class.getName());
		producerProperties.getExtension().setMessageKeyExpression(
				spelExpressionParser.parseExpression("headers.key"));
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		String uniqueBindingId = UUID.randomUUID().toString();
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"foo" + uniqueBindingId + ".0", moduleOutputChannel, producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"foo" + uniqueBindingId + ".0", null, moduleInputChannel,
				consumerProperties);
		Thread.sleep(1000);
		Message<?> message = MessageBuilder.withPayload("somePayload")
				.setHeader("key", "myDynamicKey").build();
		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel.send(message);
		Message<?> inbound = receive(moduleInputChannel);
		assertThat(inbound).isNotNull();
		var receivedKey = new String(inbound.getHeaders()
				.get(KafkaHeaders.RECEIVED_KEY, byte[].class));
		assertThat(receivedKey).isEqualTo("myDynamicKey");
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void customPartitionCountOverridesPartitioningIfLarger() throws Exception {
		var testPayload = new byte[2048];
		Arrays.fill(testPayload, (byte) 65);
		KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
		binderConfiguration.setMinPartitionCount(4);
		Binder binder = getBinder(binderConfiguration);

		var moduleInputChannel = new QueueChannel();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.setPartitionCount(5);
		producerProperties.setPartitionKeyExpression(
				spelExpressionParser.parseExpression("payload"));
		DirectChannel moduleOutputChannel = createBindableChannel("output",
				createProducerBindingProperties(producerProperties));
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		long uniqueBindingId = System.currentTimeMillis();
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"foo" + uniqueBindingId + ".0", moduleOutputChannel, producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"foo" + uniqueBindingId + ".0", null, moduleInputChannel,
				consumerProperties);
		Message<?> message = org.springframework.integration.support.MessageBuilder
				.withPayload(testPayload).build();
		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel.send(message);
		Message<?> inbound = receive(moduleInputChannel);
		assertThat(inbound).isNotNull();
		assertThat((byte[]) inbound.getPayload()).containsExactly(testPayload);
		assertThat(partitionSize("foo" + uniqueBindingId + ".0")).isEqualTo(5);
		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void defaultConsumerStartsAtEarliest() throws Exception {
		Binder binder = getBinder(createConfigurationProperties());

		BindingProperties producerBindingProperties = createProducerBindingProperties(
				createProducerProperties());
		DirectChannel output = createBindableChannel("output", producerBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);

		DirectChannel input1 = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		String testTopicName = UUID.randomUUID().toString();
		Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName,
				output, createProducerProperties());
		String testPayload1 = "foo-" + UUID.randomUUID().toString();
		output.send(new GenericMessage<>(testPayload1.getBytes()));

		Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName,
				"startOffsets", input1, consumerProperties);

		var latch = new CountDownLatch(1);
		AtomicReference<Message<byte[]>> inboundMessageRef1 = new AtomicReference<>();
		MessageHandler messageHandler = message1 -> {
			try {
				inboundMessageRef1.set((Message<byte[]>) message1);
			}
			finally {
				latch.countDown();
			}
		};
		input1.subscribe(messageHandler);
		Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");
		assertThat(inboundMessageRef1.get()).isNotNull();
		assertThat(
				new String(inboundMessageRef1.get().getPayload(), StandardCharsets.UTF_8))
						.isEqualTo(testPayload1);

		String testPayload2 = "foo-" + UUID.randomUUID().toString();
		input1.unsubscribe(messageHandler);
		output.send(new GenericMessage<>(testPayload2.getBytes()));

		var latch1 = new CountDownLatch(1);
		AtomicReference<Message<byte[]>> inboundMessageRef2 = new AtomicReference<>();
		input1.subscribe(message1 -> {
			try {
				inboundMessageRef2.set((Message<byte[]>) message1);
			}
			finally {
				latch1.countDown();
			}
		});
		Assert.isTrue(latch1.await(5, TimeUnit.SECONDS), "Failed to receive message");

		assertThat(inboundMessageRef2.get()).isNotNull();
		assertThat(
				new String(inboundMessageRef2.get().getPayload(), StandardCharsets.UTF_8))
						.isEqualTo(testPayload2);

		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void resume() throws Exception {
		Binding<MessageChannel> producerBinding = null;
		Binding<MessageChannel> consumerBinding = null;

		try {
			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
			Binder binder = getBinder(configurationProperties);

			BindingProperties producerBindingProperties = createProducerBindingProperties(
					createProducerProperties());
			DirectChannel output = createBindableChannel("output",
					producerBindingProperties);

			DirectChannel input1 = createBindableChannel("input",
					createConsumerBindingProperties(createConsumerProperties()));

			String testTopicName = UUID.randomUUID().toString();
			producerBinding = binder.bindProducer(testTopicName, output,
					producerBindingProperties.getProducer());
			ExtendedConsumerProperties<KafkaConsumerProperties> firstConsumerProperties = createConsumerProperties();
			consumerBinding = binder.bindConsumer(testTopicName, "startOffsets", input1,
					firstConsumerProperties);
			var latch = new CountDownLatch(1);
			AtomicReference<Message<byte[]>> inboundMessageRef1 = new AtomicReference<>();
			MessageHandler messageHandler = message1 -> {
				try {
					inboundMessageRef1.set((Message<byte[]>) message1);
				}
				finally {
					latch.countDown();
				}
			};
			input1.subscribe(messageHandler);
			String testPayload1 = "foo1-" + UUID.randomUUID().toString();
			output.send(new GenericMessage<>(testPayload1));
			Assert.isTrue(latch.await(15, TimeUnit.SECONDS), "Failed to receive message");

			assertThat(inboundMessageRef1.get()).isNotNull();
			assertThat(inboundMessageRef1.get().getPayload()).isNotNull();
			input1.unsubscribe(messageHandler);
			var latch1 = new CountDownLatch(1);
			AtomicReference<Message<byte[]>> inboundMessageRef2 = new AtomicReference<>();
			MessageHandler messageHandler1 = message1 -> {
				try {
					inboundMessageRef2.set((Message<byte[]>) message1);
				}
				finally {
					latch1.countDown();
				}
			};
			input1.subscribe(messageHandler1);
			String testPayload2 = "foo2-" + UUID.randomUUID().toString();
			output.send(new GenericMessage<>(testPayload2.getBytes()));
			Assert.isTrue(latch1.await(15, TimeUnit.SECONDS),
					"Failed to receive message");
			assertThat(inboundMessageRef2.get()).isNotNull();
			assertThat(inboundMessageRef2.get().getPayload()).isNotNull();
			consumerBinding.unbind();

			Thread.sleep(2000);
			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
			consumerBinding = binder.bindConsumer(testTopicName, "startOffsets", input1,
					consumerProperties);
			input1.unsubscribe(messageHandler1);
			var latch2 = new CountDownLatch(1);
			AtomicReference<Message<byte[]>> inboundMessageRef3 = new AtomicReference<>();
			MessageHandler messageHandler2 = message1 -> {
				try {
					inboundMessageRef3.set((Message<byte[]>) message1);
				}
				finally {
					latch2.countDown();
				}
			};
			input1.subscribe(messageHandler2);
			String testPayload3 = "foo3-" + UUID.randomUUID().toString();
			output.send(new GenericMessage<>(testPayload3.getBytes()));
			Assert.isTrue(latch2.await(15, TimeUnit.SECONDS),
					"Failed to receive message");
			assertThat(inboundMessageRef3.get()).isNotNull();
			assertThat(new String(inboundMessageRef3.get().getPayload(),
					StandardCharsets.UTF_8)).isEqualTo(testPayload3);
		}
		finally {
			if (consumerBinding != null) {
				consumerBinding.unbind();
			}
			if (producerBinding != null) {
				producerBinding.unbind();
			}
		}
	}

	@Test
	@SuppressWarnings("unchecked")
	void syncProducerMetadata() throws Exception {
		Binder binder = getBinder(createConfigurationProperties());
		var output = new DirectChannel();
		String testTopicName = UUID.randomUUID().toString();
		ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
		properties.getExtension().setSync(true);
		Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName,
				output, properties);
		var accessor = new DirectFieldAccessor(
				extractEndpoint(producerBinding));
		KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor
				.getWrappedInstance();
		assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sync")
				.equals(Boolean.TRUE))
						.withFailMessage("Kafka Sync Producer should have been enabled.");
		producerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void sendTimeoutExpressionProducerMetadata() throws Exception {
		Binder binder = getBinder(createConfigurationProperties());
		var output = new DirectChannel();
		String testTopicName = UUID.randomUUID().toString();
		ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
		properties.getExtension().setSync(true);
		var parser = new SpelExpressionParser();
		Expression sendTimeoutExpression = parser.parseExpression("5000");
		properties.getExtension().setSendTimeoutExpression(sendTimeoutExpression);
		Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName,
				output, properties);
		var accessor = new DirectFieldAccessor(
				extractEndpoint(producerBinding));
		KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor
				.getWrappedInstance();
		assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sendTimeoutExpression")
				.equals(sendTimeoutExpression));
		producerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void autoCreateTopicsDisabledOnBinderStillWorksAsLongAsBrokerCreatesTopic()
			throws Exception {
		KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
		configurationProperties.setAutoCreateTopics(false);
		Binder binder = getBinder(configurationProperties);
		BindingProperties producerBindingProperties = createProducerBindingProperties(
				createProducerProperties());
		DirectChannel output = createBindableChannel("output", producerBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

		DirectChannel input = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		String testTopicName = "createdByBroker-" + System.currentTimeMillis();

		Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName,
				output, producerBindingProperties.getProducer());

		String testPayload = "foo1-" + UUID.randomUUID().toString();
		output.send(new GenericMessage<>(testPayload));

		Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName,
				"test", input, consumerProperties);
		var latch = new CountDownLatch(1);
		AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
		input.subscribe(message1 -> {
			try {
				inboundMessageRef.set((Message<byte[]>) message1);
			}
			finally {
				latch.countDown();
			}
		});
		Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

		assertThat(inboundMessageRef.get()).isNotNull();
		assertThat(
				new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8))
						.isEqualTo(testPayload);

		producerBinding.unbind();
		consumerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void autoConfigureTopicsDisabledSucceedsIfTopicExisting()
			throws Throwable {
		KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

		String testTopicName = "existing" + System.currentTimeMillis();
		invokeCreateTopic(testTopicName, 5, 1);
		configurationProperties.setAutoCreateTopics(false);
		Binder binder = getBinder(configurationProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

		DirectChannel input = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));
		Binding<MessageChannel> binding = binder.bindConsumer(testTopicName, "test",
				input, consumerProperties);
		binding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	void partitionCountIncreasedIfAutoAddPartitionsSet() throws Throwable {
		KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

		String testTopicName = "existing" + System.currentTimeMillis();
		configurationProperties.setMinPartitionCount(6);
		configurationProperties.setAutoAddPartitions(true);
		Binder binder = getBinder(configurationProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		DirectChannel input = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<?> binding = binder.bindConsumer(testTopicName, "test", input,
				consumerProperties);
		binding.unbind();
		assertThat(invokePartitionSize(testTopicName)).isEqualTo(6);
	}

	@Test
	@SuppressWarnings("unchecked")
	void autoAddPartitionsDisabledSucceedsIfTopicUnderPartitionedAndAutoRebalanceEnabled()
			throws Throwable {
		KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

		String testTopicName = "existing" + System.currentTimeMillis();
		invokeCreateTopic(testTopicName, 1, 1);
		configurationProperties.setAutoAddPartitions(false);
		Binder binder = getBinder(configurationProperties);
		var context = new GenericApplicationContext();
		context.refresh();

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

		DirectChannel input = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		// this consumer must consume from partition 2
		consumerProperties.setInstanceCount(3);
		consumerProperties.setInstanceIndex(2);
		Binding binding = binder.bindConsumer(testTopicName, "test", input,
				consumerProperties);
		binding.unbind();
		assertThat(invokePartitionSize(testTopicName)).isEqualTo(1);
	}

	@Test
	@SuppressWarnings("unchecked")
	void autoAddPartitionsDisabledFailsIfTopicUnderPartitionedAndAutoRebalanceDisabled()
			throws Throwable {
		KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

		String testTopicName = "existing" + System.currentTimeMillis();
		invokeCreateTopic(testTopicName, 1, 1);
		configurationProperties.setAutoAddPartitions(false);
		Binder binder = getBinder(configurationProperties);
		var context = new GenericApplicationContext();
		context.refresh();

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		DirectChannel output = createBindableChannel("output",
				createConsumerBindingProperties(consumerProperties));
		// this consumer must consume from partition 2
		consumerProperties.setInstanceCount(3);
		consumerProperties.setInstanceIndex(2);
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
		Assertions.assertThatThrownBy(() -> {
			Binding binding = binder.bindConsumer(testTopicName, "test", output,
					consumerProperties);
			if (binding != null) {
				binding.unbind();
			}
		}).isInstanceOf(ProvisioningException.class);
	}

	@Test
	@SuppressWarnings("unchecked")
	void autoAddPartitionsDisabledSucceedsIfTopicPartitionedCorrectly()
			throws Throwable {
		Binding<?> binding = null;
		try {
			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

			String testTopicName = "existing" + System.currentTimeMillis();
			invokeCreateTopic(testTopicName, 6, 1);
			configurationProperties.setAutoAddPartitions(false);
			Binder binder = getBinder(configurationProperties);
			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();

			DirectChannel input = createBindableChannel("input",
					createConsumerBindingProperties(consumerProperties));

			// this consumer must consume from partition 2
			consumerProperties.setInstanceCount(3);
			consumerProperties.setInstanceIndex(2);
			consumerProperties.getExtension().setAutoRebalanceEnabled(false);

			binding = binder.bindConsumer(testTopicName, "test-x", input,
					consumerProperties);

			ContainerProperties containerProps = TestUtils.getPropertyValue(
					binding,
					"lifecycle.messageListenerContainer.containerProperties",
					ContainerProperties.class);
			TopicPartitionOffset[] listenedPartitions = containerProps.getTopicPartitions();
			assertThat(listenedPartitions).hasSize(2);
			assertThat(listenedPartitions).contains(
					new TopicPartitionOffset(testTopicName, 2),
					new TopicPartitionOffset(testTopicName, 5));
			int partitions = invokePartitionSize(testTopicName);
			assertThat(partitions).isEqualTo(6);
		}
		finally {
			if (binding != null) {
				binding.unbind();
			}
		}
	}

	@Test
	@SuppressWarnings("unchecked")
	void partitionCountNotReduced() throws Throwable {
		String testTopicName = "existing" + System.currentTimeMillis();

		KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();

		invokeCreateTopic(testTopicName, 6, 1);
		configurationProperties.setAutoAddPartitions(true);
		Binder binder = getBinder(configurationProperties);
		var context = new GenericApplicationContext();
		context.refresh();

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		DirectChannel input = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		Binding<?> binding = binder.bindConsumer(testTopicName, "test", input,
				consumerProperties);
		binding.unbind();

		assertThat(partitionSize(testTopicName)).isEqualTo(6);
	}

	@Test
	@SuppressWarnings({"unchecked", "rawtypes"})
	void consumerDefaultDeserializer() throws Throwable {
		Binding<?> binding = null;
		try {
			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
			String testTopicName = "existing" + System.currentTimeMillis();
			invokeCreateTopic(testTopicName, 5, 1);
			configurationProperties.setAutoCreateTopics(false);
			Binder binder = getBinder(configurationProperties);

			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
			DirectChannel input = createBindableChannel("input",
					createConsumerBindingProperties(consumerProperties));

			binding = binder.bindConsumer(testTopicName, "test", input,
					consumerProperties);
			var bindingAccessor = new DirectFieldAccessor(binding);
			KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) bindingAccessor
				.getPropertyValue("lifecycle");
			var adapterAccessor = new DirectFieldAccessor(adapter);
			ConcurrentMessageListenerContainer messageListenerContainer = (ConcurrentMessageListenerContainer) adapterAccessor
				.getPropertyValue("messageListenerContainer");
			var containerAccessor = new DirectFieldAccessor(
				messageListenerContainer);
			DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) containerAccessor
				.getPropertyValue("consumerFactory");

			Map<String, Object> configProps = consumerFactory.getConfigurationProperties();

			assertThat(configProps.get("key.deserializer")).isEqualTo(org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
			assertThat(configProps.get("value.deserializer")).isEqualTo(org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
		}
		finally {
			if (binding != null) {
				binding.unbind();
			}
		}
	}

	@Test
	@SuppressWarnings({"unchecked", "rawtypes"})
	void consumerCustomDeserializer() throws Exception {
		Binding<?> binding = null;
		try {
			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
			Map<String, String> propertiesToOverride = configurationProperties
					.getConfiguration();
			propertiesToOverride.put("key.deserializer",
					"org.apache.kafka.common.serialization.StringDeserializer");
			propertiesToOverride.put("value.deserializer",
					"org.apache.kafka.common.serialization.LongDeserializer");
			configurationProperties.setConfiguration(propertiesToOverride);
			String testTopicName = "existing" + System.currentTimeMillis();
			configurationProperties.setAutoCreateTopics(false);
			Binder binder = getBinder(configurationProperties);

			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
			DirectChannel input = createBindableChannel("input",
					createConsumerBindingProperties(consumerProperties));

			binding = binder.bindConsumer(testTopicName, "test", input,
					consumerProperties);

			var bindingAccessor = new DirectFieldAccessor(binding);
			KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) bindingAccessor
				.getPropertyValue("lifecycle");
			var adapterAccessor = new DirectFieldAccessor(adapter);
			ConcurrentMessageListenerContainer messageListenerContainer = (ConcurrentMessageListenerContainer) adapterAccessor
				.getPropertyValue("messageListenerContainer");
			var containerAccessor = new DirectFieldAccessor(
				messageListenerContainer);
			DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) containerAccessor
				.getPropertyValue("consumerFactory");

			Map<String, Object> configProps = consumerFactory.getConfigurationProperties();

			assertThat(configProps.get("key.deserializer")).isEqualTo("org.apache.kafka.common.serialization.StringDeserializer");
			assertThat(configProps.get("value.deserializer")).isEqualTo("org.apache.kafka.common.serialization.LongDeserializer");

		}
		finally {
			if (binding != null) {
				binding.unbind();
			}
		}
	}

	@Test
	@SuppressWarnings({ "unchecked", "rawtypes" })
	void nativeSerializationWithCustomSerializerDeserializer()
			throws Exception {
		Binding<?> producerBinding = null;
		Binding<?> consumerBinding = null;
		try {
			Integer testPayload = 10;
			Message<?> message = MessageBuilder.withPayload(testPayload).build();
			SubscribableChannel moduleOutputChannel = new DirectChannel();
			String testTopicName = "existing" + System.currentTimeMillis();
			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
			configurationProperties.setAutoAddPartitions(true);
			Binder binder = getBinder(configurationProperties);
			var moduleInputChannel = new QueueChannel();
			ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
			producerProperties.setUseNativeEncoding(true);
			producerProperties.getExtension().getConfiguration().put("value.serializer",
					"org.apache.kafka.common.serialization.IntegerSerializer");
			producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel,
					producerProperties);
			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
			consumerProperties.getExtension().setAutoRebalanceEnabled(false);
			consumerProperties.getExtension().getConfiguration().put("value.deserializer",
					"org.apache.kafka.common.serialization.IntegerDeserializer");
			consumerProperties.getExtension()
					.setStandardHeaders(KafkaConsumerProperties.StandardHeaders.both);
			consumerBinding = binder.bindConsumer(testTopicName, "test",
					moduleInputChannel, consumerProperties);
			// Let the consumer actually bind to the producer before sending a msg
			binderBindUnbindLatency();
			moduleOutputChannel.send(message);
			Message<?> inbound = receive(moduleInputChannel, 500);
			assertThat(inbound).isNotNull();
			assertThat(inbound.getPayload()).isEqualTo(10);
			assertThat(inbound.getHeaders()).doesNotContainKey("contentType");
			assertThat(inbound.getHeaders().getId()).isNotNull();
			assertThat(inbound.getHeaders().getTimestamp()).isNotNull();
		}
		finally {
			if (producerBinding != null) {
				producerBinding.unbind();
			}
			if (consumerBinding != null) {
				consumerBinding.unbind();
			}
		}
	}

	@Test
	@SuppressWarnings({ "unchecked", "rawtypes" })
	void nativeSerializationWithCustomSerializerDeserializerBytesPayload()
			throws Exception {
		Binding<?> producerBinding = null;
		Binding<?> consumerBinding = null;
		try {
			var testPayload = new byte[1];
			Message<?> message = MessageBuilder.withPayload(testPayload)
					.setHeader(MessageHeaders.CONTENT_TYPE, "something/funky").build();
			SubscribableChannel moduleOutputChannel = new DirectChannel();
			String testTopicName = "existing" + System.currentTimeMillis();
			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
			configurationProperties.setAutoAddPartitions(true);
			Binder binder = getBinder(configurationProperties);
			ConfigurableApplicationContext context = TestUtils.getPropertyValue(binder,
					"binder.applicationContext", ConfigurableApplicationContext.class);
			var converter = new MessagingMessageConverter();
			converter.setGenerateMessageId(true);
			converter.setGenerateTimestamp(true);
			context.getBeanFactory().registerSingleton("testConverter", converter);
			var moduleInputChannel = new QueueChannel();
			ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
			producerProperties.setUseNativeEncoding(true);
			producerProperties.getExtension().getConfiguration().put(
					ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
					ByteArraySerializer.class.getName());
			producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel,
					producerProperties);
			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
			consumerProperties.getExtension().setAutoRebalanceEnabled(false);
			consumerProperties.getExtension().getConfiguration().put(
					ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
					ByteArrayDeserializer.class.getName());
			consumerProperties.getExtension().setConverterBeanName("testConverter");
			consumerBinding = binder.bindConsumer(testTopicName, "test",
					moduleInputChannel, consumerProperties);
			// Let the consumer actually bind to the producer before sending a msg
			binderBindUnbindLatency();
			moduleOutputChannel.send(message);
			Message<?> inbound = receive(moduleInputChannel, 500);
			assertThat(inbound).isNotNull();
			assertThat(inbound.getPayload()).isEqualTo(new byte[1]);
			assertThat(inbound.getHeaders()).containsKey("contentType");
			assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString())
					.isEqualTo("something/funky");
			assertThat(inbound.getHeaders().getId()).isNotNull();
			assertThat(inbound.getHeaders().getTimestamp()).isNotNull();
		}
		finally {
			if (producerBinding != null) {
				producerBinding.unbind();
			}
			if (consumerBinding != null) {
				consumerBinding.unbind();
			}
		}
	}

	@Test
	@SuppressWarnings("unchecked")
	void builtinSerialization() throws Exception {
		Binding<?> producerBinding = null;
		Binding<?> consumerBinding = null;
		try {
			String testPayload = "test";
			Message<?> message = MessageBuilder.withPayload(testPayload)
					.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
					.build();

			ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();

			DirectChannel moduleOutputChannel = createBindableChannel("output",
					createProducerBindingProperties(producerProperties));

			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
			consumerProperties.getExtension().setAutoRebalanceEnabled(false);

			DirectChannel moduleInputChannel = createBindableChannel("input",
					createConsumerBindingProperties(consumerProperties));

			String testTopicName = "existing" + System.currentTimeMillis();
			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
			configurationProperties.setAutoAddPartitions(true);
			Binder binder = getBinder(configurationProperties);
			producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel,
					producerProperties);

			consumerBinding = binder.bindConsumer(testTopicName, "test",
					moduleInputChannel, consumerProperties);
			// Let the consumer actually bind to the producer before sending a msg
			binderBindUnbindLatency();
			moduleOutputChannel.send(message);
			var latch = new CountDownLatch(1);
			AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
			moduleInputChannel.subscribe(message1 -> {
				try {
					inboundMessageRef.set((Message<byte[]>) message1);
				}
				finally {
					latch.countDown();
				}
			});
			Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

			assertThat(inboundMessageRef.get()).isNotNull();
			assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test".getBytes());
			assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType",
					MimeTypeUtils.TEXT_PLAIN);
		}
		finally {
			if (producerBinding != null) {
				producerBinding.unbind();
			}
			if (consumerBinding != null) {
				consumerBinding.unbind();
			}
		}
	}

	/*
	 * Verify that a consumer configured to handle embedded headers can handle all three
	 * variants.
	 */
	@Test
	@SuppressWarnings({ "unchecked", "rawtypes" })
	void testSendAndReceiveWithMixedMode() throws Exception {
		KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
		binderConfiguration.setHeaders("foo");
		Binder binder = getBinder(binderConfiguration);
		var moduleOutputChannel1 = new DirectChannel();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties1 = createProducerProperties();
		producerProperties1.setHeaderMode(HeaderMode.embeddedHeaders);
		Binding<MessageChannel> producerBinding1 = binder.bindProducer("mixed.0",
				moduleOutputChannel1, producerProperties1);

		var moduleOutputChannel2 = new DirectChannel();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties2 = createProducerProperties();
		producerProperties2.setHeaderMode(HeaderMode.headers);
		Binding<MessageChannel> producerBinding2 = binder.bindProducer("mixed.0",
				moduleOutputChannel2, producerProperties2);

		var moduleOutputChannel3 = new DirectChannel();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties3 = createProducerProperties();
		producerProperties3.setHeaderMode(HeaderMode.none);
		Binding<MessageChannel> producerBinding3 = binder.bindProducer("mixed.0",
				moduleOutputChannel3, producerProperties3);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setHeaderMode(HeaderMode.embeddedHeaders);
		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));
		var bridged = new QueueChannel();
		var bridge = new BridgeHandler();
		bridge.setOutputChannel(bridged);
		moduleInputChannel.subscribe(bridge);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer("mixed.0", "test",
				moduleInputChannel, consumerProperties);
		Message<?> message = org.springframework.integration.support.MessageBuilder
				.withPayload("testSendAndReceiveWithMixedMode".getBytes())
				.setHeader("foo", "bar").build();
		// Let the consumer actually bind to the producer before sending a msg
		binderBindUnbindLatency();
		moduleOutputChannel1.send(message);
		moduleOutputChannel2.send(message);
		moduleOutputChannel3.send(message);
		Message<?> inbound = receive(bridged, 10_000);
		assertThat(inbound).isNotNull();
		assertThat(new String((byte[]) inbound.getPayload(), StandardCharsets.UTF_8))
				.isEqualTo("testSendAndReceiveWithMixedMode");
		assertThat(inbound.getHeaders().get("foo")).isEqualTo("bar");
		assertThat(inbound.getHeaders().get(BinderHeaders.NATIVE_HEADERS_PRESENT))
				.isNull();
		inbound = receive(bridged);
		assertThat(inbound).isNotNull();
		assertThat(new String((byte[]) inbound.getPayload(), StandardCharsets.UTF_8))
				.isEqualTo("testSendAndReceiveWithMixedMode");
		assertThat(inbound.getHeaders().get("foo")).isEqualTo("bar");
		assertThat(inbound.getHeaders().get(BinderHeaders.NATIVE_HEADERS_PRESENT))
				.isEqualTo(Boolean.TRUE);
		inbound = receive(bridged);
		assertThat(inbound).isNotNull();
		assertThat(new String((byte[]) inbound.getPayload(), StandardCharsets.UTF_8))
				.isEqualTo("testSendAndReceiveWithMixedMode");
		assertThat(inbound.getHeaders().get("foo")).isNull();
		assertThat(inbound.getHeaders().get(BinderHeaders.NATIVE_HEADERS_PRESENT))
				.isNull();

		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
				"testSendAndReceiveWithMixedMode", "false",
				embeddedKafka);
		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
				ByteArrayDeserializer.class);
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
				ByteArrayDeserializer.class);
		var cf = new DefaultKafkaConsumerFactory<>(consumerProps);
		Consumer consumer = cf.createConsumer();
		consumer.subscribe(Collections.singletonList("mixed.0"));

		ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
		Iterator<ConsumerRecord> iterator = records.iterator();
		ConsumerRecord record = iterator.next();
		byte[] value = (byte[]) record.value();
		assertThat(value[0] & 0xff).isEqualTo(0xff);
		assertThat(record.headers().toArray().length).isEqualTo(0);
		record = iterator.next();
		value = (byte[]) record.value();
		assertThat(value[0] & 0xff).isNotEqualTo(0xff);
		assertThat(record.headers().toArray().length).isEqualTo(2);
		record = iterator.next();
		value = (byte[]) record.value();
		assertThat(value[0] & 0xff).isNotEqualTo(0xff);
		assertThat(record.headers().toArray().length).isEqualTo(0);
		consumer.close();

		producerBinding1.unbind();
		producerBinding2.unbind();
		producerBinding3.unbind();
		consumerBinding.unbind();
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	void polledConsumer() throws Exception {
		KafkaTestBinder binder = getBinder();
		PollableSource<MessageHandler> inboundBindTarget = new DefaultPollableMessageSource(
				this.messageConverter);
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = createConsumerProperties();
		consumerProps.setMultiplex(true);
		consumerProps.getExtension().setPollTimeout(1);
		Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer(
				"pollable,anotherOne", "group-polledConsumer", inboundBindTarget,
				consumerProps);
		Map<String, Object> producerProps = KafkaTestUtils
				.producerProps(embeddedKafka);
		var template = new KafkaTemplate(
				new DefaultKafkaProducerFactory<>(producerProps));
		template.send("pollable", "testPollable");
		boolean polled = inboundBindTarget.poll(m -> {
			assertThat(m.getPayload()).isEqualTo("testPollable");
		});
		int n = 0;
		while (n++ < 100 && !polled) {
			polled = inboundBindTarget.poll(m -> {
				assertThat(m.getPayload()).isEqualTo("testPollable".getBytes());
			});
			Thread.sleep(100);
		}
		assertThat(polled).isTrue();

		template.send("anotherOne", "testPollable2");
		polled = inboundBindTarget.poll(m -> {
			assertThat(m.getPayload()).isEqualTo("testPollable2");
		});
		n = 0;
		while (n++ < 100 && !polled) {
			polled = inboundBindTarget.poll(m -> {
				assertThat(m.getPayload()).isEqualTo("testPollable2".getBytes());
			});
			Thread.sleep(100);
		}
		assertThat(polled).isTrue();
		// Bind a second pollable consumer GH-521
		consumerProps.getExtension().getConfiguration()
				.put(ConsumerConfig.CLIENT_ID_CONFIG, "pollable2");
		PollableSource<MessageHandler> second = new DefaultPollableMessageSource(
				this.messageConverter);
		Binding<PollableSource<MessageHandler>> binding2 = binder.bindPollableConsumer(
				"pollable2", "group-polledConsumer2", second, consumerProps);
		second.poll(m -> {
		});
		binding.unbind();
		binding2.unbind();
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	void polledConsumerRequeue() throws Exception {
		KafkaTestBinder binder = getBinder();
		PollableSource<MessageHandler> inboundBindTarget = new DefaultPollableMessageSource(
				this.messageConverter);
		ExtendedConsumerProperties<KafkaConsumerProperties> properties = createConsumerProperties();
		Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer(
				"pollableRequeue", "group", inboundBindTarget, properties);
		Map<String, Object> producerProps = KafkaTestUtils
				.producerProps(embeddedKafka);
		var template = new KafkaTemplate(
				new DefaultKafkaProducerFactory<>(producerProps));
		template.send("pollableRequeue", "testPollable");
		try {
			boolean polled = false;
			int n = 0;
			while (n++ < 100 && !polled) {
				polled = inboundBindTarget.poll(m -> {
					assertThat(m.getPayload()).isEqualTo("testPollable".getBytes());
					throw new RequeueCurrentMessageException();
				});
			}
		}
		catch (MessageHandlingException e) {
			assertThat(e.getCause()).isInstanceOf(RequeueCurrentMessageException.class);
		}
		boolean polled = inboundBindTarget.poll(m -> {
			assertThat(m.getPayload()).isEqualTo("testPollable".getBytes());
		});
		assertThat(polled).isTrue();
		binding.unbind();
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	void polledConsumerWithDlq() throws Exception {
		KafkaTestBinder binder = getBinder();
		PollableSource<MessageHandler> inboundBindTarget = new DefaultPollableMessageSource(
				this.messageConverter);
		ExtendedConsumerProperties<KafkaConsumerProperties> properties = createConsumerProperties();
		properties.getExtension().setPollTimeout(1);
		properties.setMaxAttempts(2);
		properties.setBackOffInitialInterval(0);
		properties.getExtension().setEnableDlq(true);
		Map<String, Object> producerProps = KafkaTestUtils
				.producerProps(embeddedKafka);
		Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer(
				"pollableDlq", "group-pcWithDlq", inboundBindTarget, properties);
		var template = new KafkaTemplate(
				new DefaultKafkaProducerFactory<>(producerProps));
		template.send("pollableDlq", "testPollableDLQ");
		try {
			int n = 0;
			while (n++ < 100) {
				inboundBindTarget.poll(m -> {
					throw new RuntimeException("test DLQ");
				});
				Thread.sleep(100);
			}
		}
		catch (MessageHandlingException e) {
			assertThat(e.getCause().getMessage()).isEqualTo("test DLQ");
		}
		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("dlq", "false",
				embeddedKafka);
		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
		Consumer consumer = cf.createConsumer();
		embeddedKafka.consumeFromAnEmbeddedTopic(consumer,
				"error.pollableDlq.group-pcWithDlq");
		ConsumerRecord deadLetter = KafkaTestUtils.getSingleRecord(consumer,
				"error.pollableDlq.group-pcWithDlq");
		assertThat(deadLetter).isNotNull();
		assertThat(deadLetter.value()).isEqualTo("testPollableDLQ");
		binding.unbind();
		consumer.close();
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Test
	void topicPatterns() throws Exception {
		try (AdminClient admin = AdminClient.create(
				Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
						embeddedKafka.getBrokersAsString()))) {
			admin.createTopics(Collections
					.singletonList(new NewTopic("topicPatterns.1", 1, (short) 1))).all()
					.get();
			Binder binder = getBinder();
			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
			consumerProperties.getExtension().setDestinationIsPattern(true);
			DirectChannel moduleInputChannel = createBindableChannel("input",
					createConsumerBindingProperties(consumerProperties));
			final var latch = new CountDownLatch(1);
			final AtomicReference<String> topic = new AtomicReference<>();
			moduleInputChannel.subscribe(m -> {
				topic.set(m.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class));
				latch.countDown();
			});
			Binding<MessageChannel> consumerBinding = binder.bindConsumer(
					"topicPatterns\\..*", "testTopicPatterns", moduleInputChannel,
					consumerProperties);
			var pf = new DefaultKafkaProducerFactory(
					KafkaTestUtils.producerProps(embeddedKafka));
			var template = new KafkaTemplate(pf);
			template.send("topicPatterns.1", "foo");
			assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
			assertThat(topic.get()).isEqualTo("topicPatterns.1");
			consumerBinding.unbind();
			pf.destroy();
		}
	}

	@Test
	void sameTopicCannotBeProvisionedAgain() throws Throwable {
		var latch = new CountDownLatch(1);
		try (AdminClient admin = AdminClient.create(
				Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
						embeddedKafka.getBrokersAsString()))) {
			admin.createTopics(Collections
					.singletonList(new NewTopic("fooUniqueTopic", 1, (short) 1))).all()
					.get();
			try {
				admin.createTopics(Collections
						.singletonList(new NewTopic("fooUniqueTopic", 1, (short) 1)))
						.all().get();
				fail("Expecting TopicExistsException");
			}
			catch (Exception ex) {
				assertThat(ex.getCause() instanceof TopicExistsException).isTrue();
				latch.countDown();
			}
			latch.await(1, TimeUnit.SECONDS);
		}
	}

	@Test
	@SuppressWarnings("unchecked")
	void resetOffsets() throws Exception {
		Binding<?> producerBinding = null;
		Binding<?> consumerBinding = null;
		try {
			String testPayload = "test";

			ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();

			DirectChannel moduleOutputChannel = createBindableChannel("output",
					createProducerBindingProperties(producerProperties));

			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
			consumerProperties.setConcurrency(2);
			consumerProperties.setInstanceCount(5); // 10 partitions across 2 threads
			consumerProperties.getExtension().setResetOffsets(true);

			DirectChannel moduleInputChannel = createBindableChannel("input",
					createConsumerBindingProperties(consumerProperties));

			String testTopicName = "existing" + System.currentTimeMillis();
			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
			configurationProperties.setAutoAddPartitions(true);
			Binder binder = getBinder(configurationProperties);
			producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel,
					producerProperties);

			consumerBinding = binder.bindConsumer(testTopicName, "testReset",
					moduleInputChannel, consumerProperties);
			// Let the consumer actually bind to the producer before sending a msg
			binderBindUnbindLatency();
			IntStream.range(0, 10).forEach(i -> moduleOutputChannel.send(MessageBuilder.withPayload(testPayload)
					.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
					.setHeader(KafkaHeaders.PARTITION, i)
					.build()));
			var latch1 = new CountDownLatch(10);
			var latch2 = new CountDownLatch(20);
			AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
			var received = new AtomicInteger();
			moduleInputChannel.subscribe(message1 -> {
				try {
					inboundMessageRef.set((Message<byte[]>) message1);
				}
				finally {
					received.incrementAndGet();
					latch1.countDown();
					latch2.countDown();
				}
			});
			assertThat(latch1.await(10, TimeUnit.SECONDS)).as("Failed to receive messages").isTrue();
			consumerBinding.unbind();
			consumerBinding = binder.bindConsumer(testTopicName, "testReset",
					moduleInputChannel, consumerProperties);
			assertThat(latch2.await(10, TimeUnit.SECONDS)).as("Failed to receive message").isTrue();
			binder.bindConsumer(testTopicName + "-x", "testReset",
					moduleInputChannel, consumerProperties).unbind(); // cause another rebalance
			assertThat(received.get()).as("Unexpected reset").isEqualTo(20);

			assertThat(inboundMessageRef.get()).isNotNull();
			assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test".getBytes());
			assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType",
					MimeTypeUtils.TEXT_PLAIN);
		}
		finally {
			if (producerBinding != null) {
				producerBinding.unbind();
			}
			if (consumerBinding != null) {
				consumerBinding.unbind();
			}
		}
	}

	@Test
	@SuppressWarnings("unchecked")
	void recordMetadata() throws Exception {
		Binding<?> producerBinding = null;
		try {
			String testPayload = "test";

			ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
			producerProperties.getExtension().setRecordMetadataChannel("metaChannel");
			var metaChannel = new QueueChannel();

			DirectChannel moduleOutputChannel = createBindableChannel("output",
					createProducerBindingProperties(producerProperties));

			String testTopicName = "existing" + System.currentTimeMillis();
			KafkaTestBinder binder = getBinder();
			((GenericApplicationContext) binder.getApplicationContext()).registerBean("metaChannel",
					MessageChannel.class, () -> metaChannel);
			producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel,
					producerProperties);
			moduleOutputChannel
					.send(new GenericMessage<>("foo", Collections.singletonMap(KafkaHeaders.PARTITION, 0)));
			Message<?> sendResult = metaChannel.receive(10_000);
			assertThat(sendResult).isNotNull();
			RecordMetadata meta = sendResult.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class);
			assertThat(meta).isNotNull()
				.hasFieldOrPropertyWithValue("offset", 0L);
			assertThat(meta.topic()).isEqualTo(testTopicName);
			assertThat(meta.partition()).isEqualTo(0);
		}
		finally {
			if (producerBinding != null) {
				producerBinding.unbind();
			}
		}
	}

	@Test
	@SuppressWarnings("unchecked")
	void messageKeyInPayload() throws Exception {
		Binding<?> producerBinding = null;
		try {
			String testPayload = "test";

			ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
			producerProperties.getExtension()
					.setMessageKeyExpression(spelExpressionParser.parseExpression("payload.field.bytes"));
			DirectChannel moduleOutputChannel = createBindableChannel("output",
					createProducerBindingProperties(producerProperties));

			String testTopicName = "existing" + System.currentTimeMillis();
			KafkaTestBinder binder = getBinder();
			producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel,
					producerProperties);
			moduleOutputChannel.addInterceptor(new ChannelInterceptor() {

				@Override
				public Message<?> preSend(Message<?> message, MessageChannel channel) {
					assertThat(message.getHeaders()
							.get(KafkaExpressionEvaluatingInterceptor.MESSAGE_KEY_HEADER))
									.isEqualTo("foo".getBytes());
					return message;
				}

			});
			moduleOutputChannel.send(
					new GenericMessage<>(new Pojo("foo"), Collections.singletonMap(KafkaHeaders.PARTITION, 0)));
		}
		finally {
			if (producerBinding != null) {
				producerBinding.unbind();
			}
		}
	}

	@Test
	void internalHeadersNotPropagated() throws Exception {
		testInternalHeadersNotPropagatedGuts("propagate.1", null, null);
	}

	@Test
	void internalHeadersNotPropagatedCustomHeader() throws Exception {
		testInternalHeadersNotPropagatedGuts("propagate.2", new String[] { "foo", "*" }, null);
	}

	@Test
	void internalHeadersNotPropagatedCustomMapper() throws Exception {
		testInternalHeadersNotPropagatedGuts("propagate.3", null, new BinderHeaderMapper("*"));
	}

	public void testInternalHeadersNotPropagatedGuts(String name, String[] headerPatterns,
			KafkaHeaderMapper mapper) throws Exception {

		KafkaTestBinder binder;
		if (mapper == null) {
			binder = getBinder();
		}
		else {
			KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
			binderConfiguration.setHeaderMapperBeanName("headerMapper");

			KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
					binderConfiguration, new TestKafkaProperties(), prop -> {
			});
			try {
				kafkaTopicProvisioner.afterPropertiesSet();
			}
			catch (Exception e) {
				throw new RuntimeException(e);
			}
			binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
			((GenericApplicationContext) binder.getApplicationContext()).registerBean("headerMapper",
					KafkaHeaderMapper.class, () -> mapper);
		}
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.getExtension().setHeaderPatterns(headerPatterns);

		DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
		output.setBeanName(name + ".out");
		Binding<MessageChannel> producerBinding = binder.bindProducer(name + ".1", output, producerProperties);

		QueueChannel input = new QueueChannel();
		input.setBeanName(name + ".in");
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(name + ".0", name, input, consumerProperties);
		Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
		var template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
		template.send(MessageBuilder.withPayload("internalHeaderPropagation")
				.setHeader(KafkaHeaders.TOPIC, name + ".0")
				.setHeader("someHeader", "someValue")
				.build());

		Message<?> consumed = input.receive(10_000);
		if (headerPatterns != null) {
			consumed = MessageBuilder.fromMessage(consumed).setHeader(headerPatterns[0], "bar").build();
		}
		output.send(consumed);

		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(name, "false",
				embeddedKafka);
		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
		var cf = new DefaultKafkaConsumerFactory<>(consumerProps);
		Consumer consumer = cf.createConsumer();
		consumer.assign(Collections.singletonList(new TopicPartition(name + ".1", 0)));
		ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(10));
		assertThat(records.count()).isEqualTo(1);
		ConsumerRecord<?, ?> received = records.iterator().next();
		assertThat(received.value()).isEqualTo("internalHeaderPropagation".getBytes());
		Header header = received.headers().lastHeader(BinderHeaders.NATIVE_HEADERS_PRESENT);
		assertThat(header).isNull();
		header = received.headers().lastHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
		assertThat(header).isNull();
		header = received.headers().lastHeader(MessageHeaders.ID);
		assertThat(header).isNull();
		header = received.headers().lastHeader(MessageHeaders.TIMESTAMP);
		assertThat(header).isNull();
		assertThat(received.headers().lastHeader("someHeader")).isNotNull();
		if (headerPatterns != null) {
			assertThat(received.headers().lastHeader(headerPatterns[0])).isNotNull();
		}

		producerBinding.unbind();
		consumerBinding.unbind();
		consumer.close();
	}

	@Test
	void noBrokerOverride() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.getExtension().getConfiguration().put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "foo");
		BindingProperties outputBindingProperties = createProducerBindingProperties(producerProperties);
		DirectChannel moduleOutputChannel = createBindableChannel("output", outputBindingProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.getExtension().getConfiguration().put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "foo");
		BindingProperties consumerBindingProperties = createConsumerBindingProperties(consumerProperties);
		DirectChannel moduleInputChannel = createBindableChannel("input", consumerBindingProperties);

		assertThatExceptionOfType(BinderException.class).isThrownBy(() -> binder.bindProducer("foo.bar",
					moduleOutputChannel, outputBindingProperties.getProducer()))
				.withCauseExactlyInstanceOf(IllegalStateException.class);
		assertThatExceptionOfType(BinderException.class).isThrownBy(() -> binder.bindConsumer("foo.bar",
					"testSendAndReceive", moduleInputChannel, consumerProperties))
				.withCauseExactlyInstanceOf(IllegalStateException.class);
	}

	@Test
	void observationEnabledOnTheBinder() throws Exception {
		KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = createConfigurationProperties();
		kafkaBinderConfigurationProperties.setEnableObservation(true);
		AbstractKafkaTestBinder binder = getBinder(kafkaBinderConfigurationProperties);

		setupBindingAndAssert("enable-observation.1", binder);
	}

	@SuppressWarnings("rawtypes")
	@Test
	void observationEnabledThroughProducerMessageHandlerCustomizer() throws Exception {
		AbstractKafkaTestBinder binder = getBinder();
		KafkaMessageChannelBinder kafkaMessageChannelBinder = binder.getCoreBinder();
		kafkaMessageChannelBinder.setProducerMessageHandlerCustomizer(
			(ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler>) (handler, destinationName) ->
				handler.getKafkaTemplate().setObservationEnabled(true));

		setupBindingAndAssert("enable-observation.2", binder);
	}

	@Test
	void dynamicPartitionUpdates() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
		properties.setPartitionKeyExpression(
			spelExpressionParser.parseExpression("headers['partitionKey']"));
		properties.setDynamicPartitionUpdatesEnabled(true);
		properties.getExtension().getConfiguration().put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "1000");

		DirectChannel outputChannel = createBindableChannel("output",
			createProducerBindingProperties(createProducerProperties()));

		invokeCreateTopic("partitionTopic", 7, 1);

		Binding<MessageChannel> producerBinding = binder.bindProducer("partitionTopic",
			outputChannel, properties);

		KafkaMessageChannelBinder.ProducerConfigurationMessageHandler kafkaProducerMessageHandler =
			(KafkaMessageChannelBinder.ProducerConfigurationMessageHandler) TestUtils.getPropertyValue(
				producerBinding, "lifecycle", KafkaProducerMessageHandler.class);

		Field kafkaPartitionHandlerField = ReflectionUtils.findField(
			KafkaMessageChannelBinder.ProducerConfigurationMessageHandler.class, "kafkaPartitionHandler");

		PartitionHandler partitionHandler =
			(PartitionHandler) kafkaPartitionHandlerField.get(kafkaProducerMessageHandler);

		assertThat(partitionHandler).isNotNull();
		PartitionHandler kafkaPartitionHandlerSpy = spy(partitionHandler);

		kafkaPartitionHandlerField.set(kafkaProducerMessageHandler, kafkaPartitionHandlerSpy);

		// send message with initial partition size
		Message<?> message = MessageBuilder
			.withPayload("partitionTopic").setHeader("partitionKey", "123").build();
		outputChannel.send(message);

		// change partition size
		Map<String, NewPartitions> counts = new HashMap<>();
		counts.put("partitionTopic", NewPartitions.increaseTo(11));
		adminClient.createPartitions(counts);

		// wait until metadata is processed in the background
		Thread.sleep(2000);

		// send message again with new partition size
		Message<?> message2 = MessageBuilder
			.withPayload("partitionTopic").setHeader("partitionKey", "456").build();
		outputChannel.send(message2);

		verify(kafkaPartitionHandlerSpy).setPartitionCount(7);
		verify(kafkaPartitionHandlerSpy).setPartitionCount(11);
		verify(kafkaPartitionHandlerSpy, times(2)).determinePartition(ArgumentMatchers.any());
	}

	private void setupBindingAndAssert(String bindingName, AbstractKafkaTestBinder binder) throws Exception {
		ConfigurableApplicationContext applicationContext = (ConfigurableApplicationContext) binder.getApplicationContext();
		TestObservationRegistry observationRegistry = TestObservationRegistry.create();

		applicationContext.getBeanFactory().registerSingleton("test-registry", observationRegistry);

		DirectChannel moduleOutputChannel = createBindableChannel("output",
			new BindingProperties());
		ExtendedProducerProperties<KafkaProducerProperties> producerProps = new ExtendedProducerProperties<>(
			new KafkaProducerProperties());
		Binding<MessageChannel> producerBinding = binder.bindProducer(bindingName,
			moduleOutputChannel, producerProps);

		assertionsOnKafkaTemplate(observationRegistry, producerBinding);
	}

	@SuppressWarnings("rawtypes")
	private static void assertionsOnKafkaTemplate(TestObservationRegistry observationRegistry, Binding<MessageChannel> producerBinding) {
		KafkaProducerMessageHandler endpoint = TestUtils.getPropertyValue(producerBinding,
			"lifecycle", KafkaProducerMessageHandler.class);

		final var kafkaTemplate = (KafkaTemplate) new DirectFieldAccessor(endpoint).getPropertyValue("kafkaTemplate");
		assertThat(kafkaTemplate).isNotNull();
		Boolean observationEnabled = (Boolean) new DirectFieldAccessor(kafkaTemplate).getPropertyValue("observationEnabled");
		assertThat(observationEnabled).isTrue();

		ObservationRegistry observationRegistry1 = (ObservationRegistry) new DirectFieldAccessor(kafkaTemplate).getPropertyValue("observationRegistry");
		assertThat(observationRegistry).isSameAs(observationRegistry1);

		producerBinding.unbind();
	}

	private final class FailingInvocationCountingMessageHandler
			implements MessageHandler {

		private final Supplier<? extends RuntimeException> exceptionProvider;

		private volatile int invocationCount;

		private final LinkedHashMap<Long, Message<?>> receivedMessages = new LinkedHashMap<>();

		private final CountDownLatch latch;

		private FailingInvocationCountingMessageHandler(int latchSize,
				Supplier<? extends RuntimeException> exceptionProvider) {
			latch = new CountDownLatch(latchSize);
			this.exceptionProvider = exceptionProvider;
		}

		private FailingInvocationCountingMessageHandler(
				Supplier<? extends RuntimeException> exceptionProvider) {
			this(1, exceptionProvider);
		}

		private FailingInvocationCountingMessageHandler(int latchSize) {
			this(latchSize, () -> new RuntimeException("fail"));
		}

		private FailingInvocationCountingMessageHandler() {
			this(1);
		}

		@Override
		public void handleMessage(Message<?> message) throws MessagingException {
			invocationCount++;
			Long offset = message.getHeaders()
					.get(KafkaBinderTests.this.getKafkaOffsetHeaderKey(), Long.class);
			// using the offset as key allows to ensure that we don't store duplicate
			// messages on retry
			if (!receivedMessages.containsKey(offset)) {
				receivedMessages.put(offset, message);
				latch.countDown();
			}
			throw exceptionProvider.get();
		}

		public LinkedHashMap<Long, Message<?>> getReceivedMessages() {
			return receivedMessages;
		}

		public int getInvocationCount() {
			return invocationCount;
		}

		public CountDownLatch getLatch() {
			return latch;
		}

	}

	static class Pojo {

		private String field;

		Pojo() {
			super();
		}

		Pojo(String field) {
			this.field = field;
		}

		public String getField() {
			return this.field;
		}

		public void setField(String field) {
			this.field = field;
		}

	}

}