RabbitMessageChannelBinder.java

/*
 * Copyright 2013-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.rabbit;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import io.micrometer.observation.ObservationRegistry;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.ObservableListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.amqp.autoconfigure.RabbitProperties;
import org.springframework.boot.amqp.autoconfigure.RabbitProperties.Retry;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.DefaultPollableMessageSource;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties.ContainerType;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties.ProducerType;
import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.BatchMode;
import org.springframework.integration.amqp.inbound.AmqpMessageSource;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
 * A {@link org.springframework.cloud.stream.binder.Binder} implementation backed by RabbitMQ.
 *
 * @author Mark Fisher
 * @author Gary Russell
 * @author Jennifer Hickey
 * @author Gunnar Hillert
 * @author Ilayaperumal Gopinathan
 * @author David Turanski
 * @author Marius Bogoevici
 * @author Artem Bilan
 * @author Soby Chacko
 * @author Oleg Zhurakousky
 * @author Christian Tzolov
 * @author Byungjun You
 */
// @checkstyle:off
public class RabbitMessageChannelBinder extends
		AbstractMessageChannelBinder<ExtendedConsumerProperties<RabbitConsumerProperties>, ExtendedProducerProperties<RabbitProducerProperties>, RabbitExchangeQueueProvisioner>
		implements
		ExtendedPropertiesBinder<MessageChannel, RabbitConsumerProperties, RabbitProducerProperties>,
		DisposableBean {

	private static final SimplePassthroughMessageConverter passThoughConverter = new SimplePassthroughMessageConverter();

	private static final AmqpMessageHeaderErrorMessageStrategy errorMessageStrategy = new AmqpMessageHeaderErrorMessageStrategy();

	private static final MessagePropertiesConverter inboundMessagePropertiesConverter = new DefaultMessagePropertiesConverter() {
		@Override
		public MessageProperties toMessageProperties(AMQP.BasicProperties source,
				Envelope envelope, String charset) {
			MessageProperties properties = super.toMessageProperties(source, envelope,
					charset);
			properties.setDeliveryMode(null);
			return properties;
		}
	};

	private static final Pattern interceptorNeededPattern = Pattern.compile("(payload|#root|#this)");

	// @checkstyle:on

	private final RabbitProperties rabbitProperties;

	private boolean destroyConnectionFactory;

	private ConnectionFactory connectionFactory;

	private MessagePostProcessor decompressingPostProcessor = new DelegatingDecompressingPostProcessor();

	private MessagePostProcessor compressingPostProcessor = new GZipPostProcessor();

	private volatile String[] adminAddresses;

	private volatile String[] nodes;

	private volatile boolean clustered;

	private RabbitExtendedBindingProperties extendedBindingProperties = new RabbitExtendedBindingProperties();

	public RabbitMessageChannelBinder(ConnectionFactory connectionFactory,
			RabbitProperties rabbitProperties,
			RabbitExchangeQueueProvisioner provisioningProvider) {

		this(connectionFactory, rabbitProperties, provisioningProvider, null, null);
	}

	public RabbitMessageChannelBinder(ConnectionFactory connectionFactory,
			RabbitProperties rabbitProperties,
			RabbitExchangeQueueProvisioner provisioningProvider,
			ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer) {

		this(connectionFactory, rabbitProperties, provisioningProvider, containerCustomizer, null);
	}

	public RabbitMessageChannelBinder(ConnectionFactory connectionFactory,
			RabbitProperties rabbitProperties,
			RabbitExchangeQueueProvisioner provisioningProvider,
			ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer,
			MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer) {

		super(new String[0], provisioningProvider, containerCustomizer, sourceCustomizer);
		Assert.notNull(connectionFactory, "connectionFactory must not be null");
		Assert.notNull(rabbitProperties, "rabbitProperties must not be null");
		this.connectionFactory = connectionFactory;
		this.rabbitProperties = rabbitProperties;
	}

	/**
	 * Set a {@link MessagePostProcessor} to decompress messages. Defaults to a {@link
	 * DelegatingDecompressingPostProcessor} with its default delegates.
	 *
	 * @param decompressingPostProcessor the post processor.
	 */
	public void setDecompressingPostProcessor(
			MessagePostProcessor decompressingPostProcessor) {
		this.decompressingPostProcessor = decompressingPostProcessor;
	}

	/**
	 * Set a {@link org.springframework.amqp.core.MessagePostProcessor} to compress messages. Defaults to a {@link
	 * org.springframework.amqp.support.postprocessor.GZipPostProcessor}.
	 *
	 * @param compressingPostProcessor the post processor.
	 */
	public void setCompressingPostProcessor(
			MessagePostProcessor compressingPostProcessor) {
		this.compressingPostProcessor = compressingPostProcessor;
	}

	public void setAdminAddresses(String[] adminAddresses) {
		this.adminAddresses = Arrays.copyOf(adminAddresses, adminAddresses.length);
	}

	public void setNodes(String[] nodes) {
		this.nodes = Arrays.copyOf(nodes, nodes.length);
		this.clustered = nodes.length > 1;
	}

	public void setExtendedBindingProperties(
			RabbitExtendedBindingProperties extendedBindingProperties) {
		this.extendedBindingProperties = extendedBindingProperties;
	}

	@Override
	public void onInit() throws Exception {
		super.onInit();
		if (this.clustered) {
			String[] addresses = StringUtils.commaDelimitedListToStringArray(
				StringUtils.collectionToDelimitedString(this.rabbitProperties.getAddresses(), ","));

			Assert.state(
					addresses.length == this.adminAddresses.length
							&& addresses.length == this.nodes.length,
					"'addresses', 'adminAddresses', and 'nodes' properties must have equal length");

			this.connectionFactory = new LocalizedQueueConnectionFactory(
					this.connectionFactory, addresses, this.adminAddresses, this.nodes,
					this.rabbitProperties.getVirtualHost(),
					this.rabbitProperties.getUsername(),
					this.rabbitProperties.getPassword(),
					this.rabbitProperties.getSsl().getEnabled(),
					this.rabbitProperties.getSsl().getKeyStore(),
					this.rabbitProperties.getSsl().getTrustStore(),
					this.rabbitProperties.getSsl().getKeyStorePassword(),
					this.rabbitProperties.getSsl().getTrustStorePassword());
			this.destroyConnectionFactory = true;
		}
	}

	@Override
	public void destroy() throws Exception {
		if (this.connectionFactory instanceof DisposableBean disposableConnectionFactory) {
			if (this.destroyConnectionFactory) {
				disposableConnectionFactory.destroy();
			}
		}
	}

	/**
	 * Get the underlying {@link ConnectionFactory} instance to allow manually altering the connection lifecycle.
	 */
	public ConnectionFactory getConnectionFactory() {
		return this.connectionFactory;
	}

	@Override
	public RabbitConsumerProperties getExtendedConsumerProperties(String channelName) {
		return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
	}

	@Override
	public RabbitProducerProperties getExtendedProducerProperties(String channelName) {
		return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
	}

	@Override
	public String getDefaultsPrefix() {
		return this.extendedBindingProperties.getDefaultsPrefix();
	}

	@Override
	public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
		return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
	}

	@Override
	public String getBinderIdentity() {
		return "rabbit-" + super.getBinderIdentity();
	}

	@Override
	protected MessageHandler createProducerMessageHandler(
			final ProducerDestination producerDestination,
			ExtendedProducerProperties<RabbitProducerProperties> producerProperties,
			MessageChannel errorChannel) {
		Assert.state(
				!HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()),
				"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
		String prefix = producerProperties.getExtension().getPrefix();
		String exchangeName = producerDestination.getName();
		String destination = !StringUtils.hasText(prefix) ? exchangeName
				: exchangeName.substring(prefix.length());
		RabbitProducerProperties extendedProperties = producerProperties.getExtension();
		final MessageHandler endpoint;
		if (!ProducerType.AMQP.equals(producerProperties.getExtension().getProducerType())) {
			endpoint = StreamUtils.createStreamMessageHandler(producerDestination, producerProperties, errorChannel,
					destination, extendedProperties, getApplicationContext(), this::configureHeaderMapper);
		}
		else {
			endpoint = amqpHandler(producerDestination, producerProperties, errorChannel,
					destination, extendedProperties);
		}
		return endpoint;
	}

	private AmqpOutboundEndpoint amqpHandler(final ProducerDestination producerDestination,
			ExtendedProducerProperties<RabbitProducerProperties> producerProperties, MessageChannel errorChannel,
			String destination, RabbitProducerProperties extendedProperties) {
		final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
				buildRabbitTemplate(extendedProperties,
						errorChannel != null || extendedProperties.isUseConfirmHeader()));
		endpoint.setExchangeName(producerDestination.getName());
		boolean expressionInterceptorNeeded = expressionInterceptorNeeded(
				extendedProperties);
		Expression routingKeyExpression = extendedProperties.getRoutingKeyExpression();
		if (!producerProperties.isPartitioned()) {
			if (routingKeyExpression == null) {
				endpoint.setRoutingKey(destination);
			}
			else {
				if (expressionInterceptorNeeded) {
					endpoint.setRoutingKeyExpressionString("headers['"
							+ RabbitExpressionEvaluatingInterceptor.ROUTING_KEY_HEADER
							+ "']");
				}
				else {
					endpoint.setRoutingKeyExpression(routingKeyExpression);
				}
			}
		}
		else {
			if (routingKeyExpression == null) {
				endpoint.setRoutingKeyExpression(
						buildPartitionRoutingExpression(destination, false));
			}
			else {
				if (expressionInterceptorNeeded) {
					endpoint.setRoutingKeyExpression(
							buildPartitionRoutingExpression("headers['"
									+ RabbitExpressionEvaluatingInterceptor.ROUTING_KEY_HEADER
									+ "']", true));
				}
				else {
					endpoint.setRoutingKeyExpression(buildPartitionRoutingExpression(
							routingKeyExpression.getExpressionString(), true));
				}
			}
		}
		if (extendedProperties.getDelayExpression() != null) {
			if (expressionInterceptorNeeded) {
				endpoint.setDelayExpressionString("headers['"
						+ RabbitExpressionEvaluatingInterceptor.DELAY_HEADER + "']");
			}
			else {
				endpoint.setDelayExpression(extendedProperties.getDelayExpression());
			}
		}
		endpoint.setHeaderMapper(configureHeaderMapper(extendedProperties));
		endpoint.setDefaultDeliveryMode(extendedProperties.getDeliveryMode());
		endpoint.setBeanFactory(this.getBeanFactory());
		if (errorChannel != null) {
			checkConnectionFactoryIsErrorCapable();
			endpoint.setReturnChannel(errorChannel);
			if (!extendedProperties.isUseConfirmHeader()) {
				endpoint.setConfirmNackChannel(errorChannel);
				String ackChannelBeanName = StringUtils
						.hasText(extendedProperties.getConfirmAckChannel())
						? extendedProperties.getConfirmAckChannel()
						: IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME;
				if (!ackChannelBeanName.equals(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME)
						&& !getApplicationContext().containsBean(ackChannelBeanName)) {
					GenericApplicationContext context = (GenericApplicationContext) getApplicationContext();
					context.registerBean(ackChannelBeanName, DirectChannel.class,
							() -> new DirectChannel());
				}
				endpoint.setConfirmAckChannelName(ackChannelBeanName);
				endpoint.setConfirmCorrelationExpressionString("#root");
			}
			else {
				Assert.state(!StringUtils.hasText(extendedProperties.getConfirmAckChannel()),
						"You cannot specify a 'confirmAckChannel' when 'useConfirmHeader' is true");
			}
			endpoint.setErrorMessageStrategy(new DefaultErrorMessageStrategy());
		}
		endpoint.setHeadersMappedLast(true);
		return endpoint;
	}

	private AmqpHeaderMapper configureHeaderMapper(RabbitProducerProperties extendedProperties) {
		DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.outboundMapper();
		List<String> headerPatterns = new ArrayList<>(extendedProperties.getHeaderPatterns().length + 3);
		if (!extendedProperties.isSuperStream()) {
			// need to keep this header until later
			headerPatterns.add("!" + BinderHeaders.PARTITION_HEADER);
		}
		headerPatterns.add("!" + IntegrationMessageHeaderAccessor.SOURCE_DATA);
		headerPatterns.add("!" + IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
		headerPatterns.add("!rabbitmq_streamContext");
		headerPatterns.addAll(Arrays.asList(extendedProperties.getHeaderPatterns()));
		mapper.setRequestHeaderNames(
				headerPatterns.toArray(new String[headerPatterns.size()]));
		return mapper;
	}

	@Override
	protected void postProcessOutputChannel(MessageChannel outputChannel,
			ExtendedProducerProperties<RabbitProducerProperties> producerProperties) {
		RabbitProducerProperties extendedProperties = producerProperties.getExtension();
		if (expressionInterceptorNeeded(extendedProperties)) {
			((AbstractMessageChannel) outputChannel).addInterceptor(0,
					new RabbitExpressionEvaluatingInterceptor(
							extendedProperties.getRoutingKeyExpression(),
							extendedProperties.getDelayExpression(),
							getEvaluationContext()));
		}
	}

	private boolean expressionInterceptorNeeded(RabbitProducerProperties extendedProperties) {
		Expression rkExpression = extendedProperties.getRoutingKeyExpression();
		Expression delayExpression = extendedProperties.getDelayExpression();
		return (rkExpression != null && interceptorNeededPattern.matcher(rkExpression.getExpressionString()).find())
				|| (delayExpression != null
				&& interceptorNeededPattern.matcher(delayExpression.getExpressionString()).find());
	}

	private void checkConnectionFactoryIsErrorCapable() {
		if (!(this.connectionFactory instanceof CachingConnectionFactory)) {
			logger.warn(
					"Unknown connection factory type, cannot determine error capabilities: "
							+ this.connectionFactory.getClass());
		}
		else {
			CachingConnectionFactory ccf = (CachingConnectionFactory) this.connectionFactory;
			if (!ccf.isPublisherConfirms() && !ccf.isPublisherReturns()) {
				logger.warn(
						"Producer error channel is enabled, but the connection factory is not configured for "
								+ "returns or confirms; the error channel will receive no messages");
			}
			else if (!ccf.isPublisherConfirms()) {
				logger.info(
						"Producer error channel is enabled, but the connection factory is only configured to "
								+ "handle returned messages; negative acks will not be reported");
			}
			else if (!ccf.isPublisherReturns()) {
				logger.info(
						"Producer error channel is enabled, but the connection factory is only configured to "
								+ "handle negatively acked messages; returned messages will not be reported");
			}
		}
	}

	private Expression buildPartitionRoutingExpression(String expressionRoot,
			boolean rootIsExpression) {
		String partitionRoutingExpression = rootIsExpression
				? expressionRoot + " + '-' + headers['" + BinderHeaders.PARTITION_HEADER
				+ "']"
				: "'" + expressionRoot + "-' + headers['" + BinderHeaders.PARTITION_HEADER
				+ "']";
		return new SpelExpressionParser().parseExpression(partitionRoutingExpression);
	}

	@Override
	protected MessageProducer createConsumerEndpoint(
			ConsumerDestination consumerDestination, String group,
			ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
		Assert.state(!HeaderMode.embeddedHeaders.equals(properties.getHeaderMode()),
				"the RabbitMQ binder does not support embedded headers since RabbitMQ supports headers natively");
		String destination = consumerDestination.getName();
		RabbitConsumerProperties extension = properties.getExtension();
		ObservableListenerContainer listenerContainer = createAndConfigureContainer(consumerDestination, group,
				properties, destination, extension);
		String[] queues = StringUtils.tokenizeToStringArray(destination, ",", true, true);
		if (properties.getExtension().getContainerType() != ContainerType.STREAM
				|| !properties.getExtension().isSuperStream()) {
			listenerContainer.setQueueNames(queues);
		}
		getContainerCustomizer().configure(listenerContainer,
				consumerDestination.getName(), group);
		// TODO until https://github.com/spring-cloud/spring-cloud-stream/issues/2902
		getApplicationContext().getBeanProvider(ObservationRegistry.class)
				.ifAvailable((observationRegistry) -> listenerContainer.setObservationEnabled(true));

		listenerContainer.afterPropertiesSet();

		AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
		adapter.setBindSourceMessage(true);
		adapter.setBeanFactory(this.getBeanFactory());
		adapter.setBeanName("inbound." + destination);
		DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
		mapper.setRequestHeaderNames(extension.getHeaderPatterns());
		adapter.setHeaderMapper(mapper);
		ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(
				consumerDestination, group, properties);
		if (properties.getMaxAttempts() > 1) {
			adapter.setRetryTemplate(buildRetryTemplate(properties));
			adapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
		}
		else {
			adapter.setErrorMessageStrategy(errorMessageStrategy);
			adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
		}
		adapter.setMessageConverter(passThoughConverter);
		ContainerType containerType = extension.getContainerType();
		if (properties.isBatchMode() && extension.isEnableBatching()
				&& ContainerType.SIMPLE.equals(containerType)) {
			adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS);
		}
		if (containerType.equals(ContainerType.STREAM)) {
			StreamUtils.configureAdapter(adapter);
		}
		return adapter;
	}

	private ObservableListenerContainer createAndConfigureContainer(ConsumerDestination consumerDestination,
			String group, ExtendedConsumerProperties<RabbitConsumerProperties> properties, String destination,
			RabbitConsumerProperties extension) {

		if (extension.getContainerType().equals(ContainerType.STREAM)) {
			return StreamUtils.createContainer(consumerDestination, group, properties, destination,
					getApplicationContext());
		}
		boolean directContainer = extension.getContainerType()
				.equals(ContainerType.DIRECT);
		AbstractMessageListenerContainer listenerContainer = directContainer
				? new DirectMessageListenerContainer(this.connectionFactory)
				: new SimpleMessageListenerContainer(this.connectionFactory);
		listenerContainer.setBeanName(consumerDestination.getName() + "." + group + ".container");
		listenerContainer
				.setAcknowledgeMode(extension.getAcknowledgeMode());
		listenerContainer.setChannelTransacted(extension.isTransacted());
		listenerContainer
				.setDefaultRequeueRejected(extension.isRequeueRejected());
		int concurrency = properties.getConcurrency();
		concurrency = concurrency > 0 ? concurrency : 1;
		if (directContainer) {
			setDMLCProperties(properties,
					(DirectMessageListenerContainer) listenerContainer, concurrency);
		}
		else {
			setSMLCProperties(properties,
					(SimpleMessageListenerContainer) listenerContainer, concurrency);
		}
		listenerContainer.setPrefetchCount(extension.getPrefetch());
		listenerContainer
				.setRecoveryInterval(extension.getRecoveryInterval());
		listenerContainer.setTaskExecutor(
				new SimpleAsyncTaskExecutor(consumerDestination.getName() + "-"));
		listenerContainer.setAfterReceivePostProcessors(this.decompressingPostProcessor);
		listenerContainer.setMessagePropertiesConverter(
				RabbitMessageChannelBinder.inboundMessagePropertiesConverter);
		listenerContainer.setExclusive(extension.isExclusive());
		listenerContainer
				.setMissingQueuesFatal(extension.getMissingQueuesFatal());
		if (extension.getFailedDeclarationRetryInterval() != null) {
			listenerContainer.setFailedDeclarationRetryInterval(
					extension.getFailedDeclarationRetryInterval());
		}
		if (getApplicationEventPublisher() != null) {
			listenerContainer
					.setApplicationEventPublisher(getApplicationEventPublisher());
		}
		else if (getApplicationContext() != null) {
			listenerContainer.setApplicationEventPublisher(getApplicationContext());
		}
		if (StringUtils.hasText(extension.getConsumerTagPrefix())) {
			final AtomicInteger index = new AtomicInteger();
			listenerContainer.setConsumerTagStrategy(
					q -> extension.getConsumerTagPrefix() + "#"
							+ index.getAndIncrement());
		}
		// Set consumer priority if configured
		if (extension.getConsumerPriority() >= 0) {
			Map<String, Object> consumerArgs = new HashMap<>();
			consumerArgs.put("x-priority", extension.getConsumerPriority());
			listenerContainer.setConsumerArguments(consumerArgs);
		}
		listenerContainer.setApplicationContext(getApplicationContext());
		return listenerContainer;
	}

	private void setSMLCProperties(
			ExtendedConsumerProperties<RabbitConsumerProperties> properties,
			SimpleMessageListenerContainer listenerContainer, int concurrency) {

		RabbitConsumerProperties extension = properties.getExtension();
		listenerContainer.setConcurrentConsumers(concurrency);
		int maxConcurrency = extension.getMaxConcurrency();
		if (maxConcurrency > concurrency) {
			listenerContainer.setMaxConcurrentConsumers(maxConcurrency);
		}
		listenerContainer.setDeBatchingEnabled(!properties.isBatchMode());
		listenerContainer.setBatchSize(extension.getBatchSize());
		if (extension.getQueueDeclarationRetries() != null) {
			listenerContainer.setDeclarationRetries(
					extension.getQueueDeclarationRetries());
		}
		if (properties.isBatchMode() && extension.isEnableBatching()) {
			listenerContainer.setConsumerBatchEnabled(true);
			listenerContainer.setDeBatchingEnabled(true);
		}
		if (extension.getReceiveTimeout() != null) {
			listenerContainer.setReceiveTimeout(extension.getReceiveTimeout());
		}
	}

	private void setDMLCProperties(
			ExtendedConsumerProperties<RabbitConsumerProperties> properties,
			DirectMessageListenerContainer listenerContainer, int concurrency) {

		listenerContainer.setConsumersPerQueue(concurrency);
		if (properties.getExtension().getMaxConcurrency() > concurrency) {
			this.logger
					.warn("maxConcurrency is not supported by the direct container type");
		}
		if (properties.getExtension().getBatchSize() > 1) {
			this.logger.warn("batchSize is not supported by the direct container type");
		}
		if (properties.getExtension().getQueueDeclarationRetries() != null) {
			this.logger.warn(
					"queueDeclarationRetries is not supported by the direct container type");
		}
	}

	@Override
	protected PolledConsumerResources createPolledConsumerResources(String name,
			String group, ConsumerDestination destination,
			ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties) {

		Assert.isTrue(!consumerProperties.isMultiplex(),
				"The Spring Integration polled MessageSource does not currently support muiltiple queues");
		AmqpMessageSource source = new AmqpMessageSource(this.connectionFactory,
				destination.getName());
		source.setRawMessageHeader(true);
		getMessageSourceCustomizer().configure(source, destination.getName(), group);
		return new PolledConsumerResources(source, registerErrorInfrastructure(
				destination, group, consumerProperties, true));
	}

	@Override
	protected void postProcessPollableSource(DefaultPollableMessageSource bindingTarget) {
		bindingTarget.setAttributesProvider((accessor, message) -> {
			Object rawMessage = message.getHeaders()
					.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
			if (rawMessage != null) {
				accessor.setAttribute(
						AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE,
						rawMessage);
			}
		});
	}

	@Override
	protected ErrorMessageStrategy getErrorMessageStrategy() {
		return errorMessageStrategy;
	}

	@Override
	protected MessageHandler getErrorMessageHandler(ConsumerDestination destination,
			String group,
			final ExtendedConsumerProperties<RabbitConsumerProperties> properties) {

		if (properties.getExtension().isRepublishToDlq()) {
			return new MessageHandler() {

				private static final long ACK_TIMEOUT = 10_000;

				private final RabbitTemplate template = new RabbitTemplate(
						RabbitMessageChannelBinder.this.connectionFactory);

				private final ConfirmType confirmType;



				{
					this.template.setUsePublisherConnection(true);
					this.template.setChannelTransacted(properties.getExtension().isTransacted());
					this.template.setMandatory(RabbitMessageChannelBinder.this.connectionFactory.isPublisherReturns());
					if (RabbitMessageChannelBinder.this.connectionFactory.isSimplePublisherConfirms()) {
						this.confirmType = ConfirmType.SIMPLE;
					}
					else if (RabbitMessageChannelBinder.this.connectionFactory.isPublisherConfirms()) {
						this.confirmType = ConfirmType.CORRELATED;
					}
					else {
						this.confirmType = ConfirmType.NONE;
					}
				}

				private final String exchange = deadLetterExchangeName(properties.getExtension());

				private final String routingKey = properties.getExtension()
						.getDeadLetterRoutingKey();

				private final int frameMaxHeadroom = properties.getExtension()
						.getFrameMaxHeadroom();

				private int maxStackTraceLength = -1;

				private Boolean dlxPresent;

				@Override
				public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
					List<Message> amqpMessages = extractAmqpMessages(message, properties);

					if (!(message instanceof ErrorMessage)) {
						logger.error("Expected an ErrorMessage, not a "
								+ message.getClass().toString() + " for: " + message);
					}
					else if (amqpMessages == null) {
						logger.error("No raw message header in " + message);
					}
					else {
						if (!checkDlx()) {
							return;
						}
						Throwable cause = (Throwable) message.getPayload();
						if (!shouldRepublish(cause)) {
							if (logger.isDebugEnabled()) {
								logger.debug("Skipping republish of: " + message);
							}
							return;
						}

						String stackTraceAsString = getStackTraceAsString(cause);
						if (this.maxStackTraceLength < 0) {
							int rabbitMaxStackTraceLength = RabbitUtils
									.getMaxFrame(this.template.getConnectionFactory());
							if (rabbitMaxStackTraceLength > 0) {
								// maxStackTraceLength -= this.frameMaxHeadroom;
								this.maxStackTraceLength = rabbitMaxStackTraceLength
										- this.frameMaxHeadroom;
							}
						}
						if (this.maxStackTraceLength > 0 && stackTraceAsString
								.length() > this.maxStackTraceLength) {
							stackTraceAsString = stackTraceAsString.substring(0,
									this.maxStackTraceLength);
							logger.warn(
									"Stack trace in republished message header truncated due to frame_max limitations; "
											+ "consider increasing frame_max on the broker or reduce the stack trace depth",
									cause);
						}
						for (Message amqpMessage : amqpMessages) {
							MessageProperties messageProperties = adjustMessagePropertiesHeader(cause,
									stackTraceAsString,
									amqpMessage);
							doSend(this.exchange,
									this.routingKey != null ? this.routingKey
											: messageProperties.getConsumerQueue(),
									amqpMessage);
						}
						if (properties.getExtension().getAcknowledgeMode().equals(AcknowledgeMode.MANUAL)) {
							org.springframework.messaging.Message<?> original =
									((ErrorMessage) message).getOriginalMessage();
							if (original != null) {
								// If we are using manual acks, ack the original message.
								try {
									original.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
											.basicAck(original.getHeaders()
													.get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
								}
								catch (IOException e) {
									logger.debug("Failed to ack original message", e);
								}
							}
						}
					}
				}

				//@NotNull
				private MessageProperties adjustMessagePropertiesHeader(Throwable cause, String stackTraceAsString, Message amqpMessage) {
					MessageProperties messageProperties = amqpMessage
							.getMessageProperties();
					Map<String, Object> headers = messageProperties.getHeaders();
					headers.put(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE,
							stackTraceAsString);
					headers.put(RepublishMessageRecoverer.X_EXCEPTION_MESSAGE,
							cause.getCause() != null ? cause.getCause().getMessage()
									: cause.getMessage());
					headers.put(RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE,
							messageProperties.getReceivedExchange());
					headers.put(RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY,
							messageProperties.getReceivedRoutingKey());
					if (properties.getExtension().getRepublishDeliveyMode() != null) {
						messageProperties.setDeliveryMode(
								properties.getExtension().getRepublishDeliveyMode());
					}
					messageProperties.incrementRetryCount();
					return messageProperties;
				}

				private void doSend(String exchange, String routingKey, Message amqpMessage) {
					if (ConfirmType.SIMPLE.equals(this.confirmType)) {
						this.template.invoke(temp -> {
							temp.send(exchange, routingKey, amqpMessage);
							if (!temp.waitForConfirms(ACK_TIMEOUT)) {
								throw new AmqpRejectAndDontRequeueException("Negative ack for DLQ message received");
							}
							return null;
						});
					}
					else if (ConfirmType.CORRELATED.equals(this.confirmType)) {
						CorrelationData corr = new CorrelationData();
						this.template.send(exchange, routingKey, amqpMessage, corr);
						try {
							Confirm confirm = corr.getFuture().get(ACK_TIMEOUT, TimeUnit.MILLISECONDS);
							if (!confirm.isAck()) {
								throw new AmqpRejectAndDontRequeueException("Negative ack for DLQ message received");
							}
						}
						catch (InterruptedException e) {
							Thread.currentThread().interrupt();
							throw new AmqpRejectAndDontRequeueException(e);
						}
						catch (ExecutionException e) {
							throw new AmqpRejectAndDontRequeueException(e.getCause());
						}
						catch (TimeoutException e) {
							throw new AmqpRejectAndDontRequeueException(e);
						}
						if (corr.getReturned() != null) {
							RabbitMessageChannelBinder.this.logger.error("DLQ message was returned: " + amqpMessage);
							throw new AmqpRejectAndDontRequeueException("DLQ message was returned");
						}
					}
					else {
						this.template.send(exchange, routingKey, amqpMessage);
					}
				}

				private boolean checkDlx() {
					if (this.dlxPresent == null) {
						if (properties.getExtension().isAutoBindDlq()) {
							this.dlxPresent = Boolean.TRUE;
						}
						else {
							this.dlxPresent = this.template.execute(channel -> {
								String dlx = deadLetterExchangeName(properties.getExtension());
								try {
									channel.exchangeDeclarePassive(dlx);
									return Boolean.TRUE;
								}
								catch (IOException e) {
									logger.warn("'republishToDlq' is true, but the '"
											+ dlx
											+ "' dead letter exchange is not present; disabling 'republishToDlq'");
									return Boolean.FALSE;
								}
							});
						}
					}
					return this.dlxPresent;
				}

				/**
				 * Traverse the cause tree, stopping at AmqpRejectAndDontRequeueException
				 * or ImmediateAcknowledgeAmqpException.
				 * @param throwable the throwable.
				 * @return true if neither found or AmqpRejectAndDontRequeueException is
				 * found first.
				 */
				private boolean shouldRepublish(Throwable throwable) {
					Throwable cause = throwable;
					while (cause != null
							&& !(cause instanceof AmqpRejectAndDontRequeueException)
							&& !(cause instanceof ImmediateAcknowledgeAmqpException)) {
						cause = cause.getCause();
					}
					return !(cause instanceof ImmediateAcknowledgeAmqpException);
				}
			};
		}
		else if (properties.getMaxAttempts() > 1) {
			return new MessageHandler() {
				private final BatchCapableRejectAndDontRequeueRecoverer recoverer = new BatchCapableRejectAndDontRequeueRecoverer();

				@Override
				public void handleMessage(
						org.springframework.messaging.Message<?> message)
						throws MessagingException {
					List<Message> amqpMessages = extractAmqpMessages(message, properties);
					/*
					 * NOTE: The following IF and subsequent ELSE IF should never happen
					 * under normal interaction and it should always go to the last ELSE
					 * However, given that this is a handler subscribing to the public
					 * channel and that we can't control what type of Message may be sent
					 * to that channel (user decides to send a Message manually) the
					 * 'IF/ELSE IF' provides a safety net to handle any message properly.
					 */
					if (!(message instanceof ErrorMessage)) {
						logger.error("Expected an ErrorMessage, not a "
								+ message.getClass().toString() + " for: " + message);
						throw new ListenerExecutionFailedException(
								"Unexpected error message " + message,
								new AmqpRejectAndDontRequeueException(""), (Message[]) null);
					}
					else if (amqpMessages == null || amqpMessages.isEmpty()) {
						logger.error("No raw message header in " + message);
						throw new ListenerExecutionFailedException(
								"Unexpected error message " + message,
								new AmqpRejectAndDontRequeueException(""), amqpMessages.toArray(Message[]::new));
					}
					else {
						this.recoverer.recover(amqpMessages,
								(Throwable) message.getPayload());
					}
				}

			};
		}
		else {
			return super.getErrorMessageHandler(destination, group, properties);
		}
	}

	private List<Message> extractAmqpMessages(org.springframework.messaging.Message<?> message, ExtendedConsumerProperties<RabbitConsumerProperties> properties) {

		if (properties.isBatchMode() || properties.getExtension().isEnableBatching()) {
			logger.debug("Batch mode enabled: Extract list instead of single message");
			return StaticMessageHeaderAccessor.getSourceData(message);
		}
		else {
			Message amqpMessage = StaticMessageHeaderAccessor.getSourceData(message);
			return List.of(amqpMessage);
		}
	}

	@Override
	protected MessageHandler getPolledConsumerErrorMessageHandler(
			ConsumerDestination destination, String group,
			ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
		MessageHandler handler = getErrorMessageHandler(destination, group, properties);
		if (handler != null) {
			return handler;
		}
		final MessageHandler superHandler = super.getErrorMessageHandler(destination,
				group, properties);
		return message -> {
			Message amqpMessage = (Message) message.getHeaders()
					.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
			if (!(message instanceof ErrorMessage)) {
				logger.error("Expected an ErrorMessage, not a "
						+ message.getClass().toString() + " for: " + message);
			}
			else if (amqpMessage == null) {
				if (superHandler != null) {
					superHandler.handleMessage(message);
				}
			}
			else {
				if (message.getPayload() instanceof MessagingException messagingException) {
					AcknowledgmentCallback ack = StaticMessageHeaderAccessor
							.getAcknowledgmentCallback(messagingException.getFailedMessage());
					if (ack != null) {
						if (properties.getExtension().isRequeueRejected()) {
							ack.acknowledge(Status.REQUEUE);
						}
						else {
							ack.acknowledge(Status.REJECT);
						}
					}
				}
			}
		};
	}

	//	@Override
	//	protected String errorsBaseName(ConsumerDestination destination, String group,
	//			ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties) {
	//		return destination.getName() + ".errors";
	//	}

	private String deadLetterExchangeName(RabbitCommonProperties properties) {
		if (properties.getDeadLetterExchange() == null) {
			return applyPrefix(properties.getPrefix(),
					RabbitCommonProperties.DEAD_LETTER_EXCHANGE);
		}
		else {
			return properties.getDeadLetterExchange();
		}
	}

	@Override
	protected void afterUnbindConsumer(ConsumerDestination consumerDestination, String group,
			ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties) {

		this.provisioningProvider.cleanAutoDeclareContext(consumerDestination,
				consumerProperties);
	}

	@Override
	protected void afterUnbindProducer(ProducerDestination destination,
			ExtendedProducerProperties<RabbitProducerProperties> producerProperties) {

		this.provisioningProvider.cleanAutoDeclareContext(destination, producerProperties);
	}

	private RabbitTemplate buildRabbitTemplate(RabbitProducerProperties properties, boolean mandatory) {
		RabbitTemplate rabbitTemplate;
		if (properties.isBatchingEnabled()) {
			BatchingStrategy batchingStrategy = getBatchingStrategy(properties);
			TaskScheduler taskScheduler = getApplicationContext()
					.getBean(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class);
			rabbitTemplate = new BatchingRabbitTemplate(batchingStrategy, taskScheduler);
		}
		else {
			rabbitTemplate = new RabbitTemplate();
		}
		rabbitTemplate.setMessageConverter(passThoughConverter);
		rabbitTemplate.setChannelTransacted(properties.isTransacted());
		rabbitTemplate.setConnectionFactory(this.connectionFactory);
		rabbitTemplate.setUsePublisherConnection(true);
		if (properties.isCompress()) {
			rabbitTemplate.setBeforePublishPostProcessors(this.compressingPostProcessor);
		}
		rabbitTemplate.setMandatory(mandatory); // returned messages
		if (rabbitProperties != null
				&& rabbitProperties.getTemplate().getRetry().isEnabled()) {
			Retry retry = rabbitProperties.getTemplate().getRetry();
			RetryPolicy retryPolicy = RetryPolicy.builder()
				.maxRetries(retry.getMaxRetries())
				.delay(retry.getInitialInterval())
				.multiplier(retry.getMultiplier())
				.maxDelay(retry.getMaxInterval())
				.build();
			RetryTemplate retryTemplate = new RetryTemplate();
			retryTemplate.setRetryPolicy(retryPolicy);
			rabbitTemplate.setRetryTemplate(retryTemplate);
		}
		// TODO until https://github.com/spring-cloud/spring-cloud-stream/issues/2902
		AbstractApplicationContext applicationContext = getApplicationContext();
		applicationContext.getBeanProvider(ObservationRegistry.class)
				.ifAvailable((observationRegistry) -> rabbitTemplate.setObservationEnabled(true));
		rabbitTemplate.setApplicationContext(applicationContext);
		rabbitTemplate.afterPropertiesSet();
		return rabbitTemplate;
	}

	private BatchingStrategy getBatchingStrategy(RabbitProducerProperties properties) {
		BatchingStrategy batchingStrategy;
		if (properties.getBatchingStrategyBeanName() != null) {
			batchingStrategy = getApplicationContext()
					.getBean(properties.getBatchingStrategyBeanName(), BatchingStrategy.class);
		}
		else {
			batchingStrategy = new SimpleBatchingStrategy(
					properties.getBatchSize(),
					properties.getBatchBufferLimit(),
					properties.getBatchTimeout()
			);
		}
		return batchingStrategy;
	}

	private String getStackTraceAsString(Throwable cause) {
		StringWriter stringWriter = new StringWriter();
		PrintWriter printWriter = new PrintWriter(stringWriter, true);
		cause.printStackTrace(printWriter);
		return stringWriter.getBuffer().toString();
	}

	private static final class SimplePassthroughMessageConverter
			extends AbstractMessageConverter {

		private static final SimpleMessageConverter converter = new SimpleMessageConverter();

		SimplePassthroughMessageConverter() {
			super();
		}

		@Override
		protected Message createMessage(Object object,
				MessageProperties messageProperties) {
			if (object instanceof byte[] bytes) {
				return new Message(bytes, messageProperties);
			}
			else {
				// just for safety (backwards compatibility)
				return converter.toMessage(object, messageProperties);
			}
		}

		@Override
		public Object fromMessage(Message message) throws MessageConversionException {
			return message.getBody();
		}

	}

}