RabbitMessageChannelBinderConfiguration.java
/*
* Copyright 2015-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rabbit.config;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.amqp.core.DeclarableCustomizer;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.support.postprocessor.DelegatingDecompressingPostProcessor;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.amqp.autoconfigure.RabbitProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnThreading;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.thread.Threading;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner;
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.task.VirtualThreadTaskExecutor;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.inbound.AmqpMessageSource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.CollectionUtils;
/**
* Configuration class for RabbitMQ message channel binder.
*
* @author David Turanski
* @author Vinicius Carvalho
* @author Artem Bilan
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Ben Blinebury
* @author Byungjun You
*/
@Configuration(proxyBeanMethods = false)
@Import({ PropertyPlaceholderAutoConfiguration.class })
@EnableConfigurationProperties({ RabbitBinderConfigurationProperties.class,
RabbitExtendedBindingProperties.class })
public class RabbitMessageChannelBinderConfiguration {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Autowired
private RabbitProperties rabbitProperties;
@Autowired
private RabbitBinderConfigurationProperties rabbitBinderConfigurationProperties;
@Autowired
private RabbitExtendedBindingProperties rabbitExtendedBindingProperties;
@Bean
RabbitMessageChannelBinder rabbitMessageChannelBinder(
@Nullable List<ListenerContainerCustomizer<MessageListenerContainer>> listenerContainerCustomizers,
@Nullable MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer,
@Nullable List<ProducerMessageHandlerCustomizer<MessageHandler>> producerMessageHandlerCustomizers,
@Nullable ConsumerEndpointCustomizer<AmqpInboundChannelAdapter> consumerCustomizer,
List<DeclarableCustomizer> declarableCustomizers,
@Nullable ConnectionNameStrategy connectionNameStrategy) {
String connectionNamePrefix = this.rabbitBinderConfigurationProperties.getConnectionNamePrefix();
if (this.rabbitConnectionFactory instanceof AbstractConnectionFactory connectionFactory
&& connectionNamePrefix != null && connectionNameStrategy == null) {
final AtomicInteger nameIncrementer = new AtomicInteger();
connectionFactory.setConnectionNameStrategy(f -> connectionNamePrefix
+ "#" + nameIncrementer.getAndIncrement());
}
ListenerContainerCustomizer<MessageListenerContainer> composedCistomizer = new ListenerContainerCustomizer<>() {
@Override
public void configure(MessageListenerContainer container, String destinationName, String group) {
if (!CollectionUtils.isEmpty(listenerContainerCustomizers)) {
for (ListenerContainerCustomizer<MessageListenerContainer> customizer : listenerContainerCustomizers) {
customizer.configure(container, destinationName, group);
}
}
}
};
ProducerMessageHandlerCustomizer<MessageHandler> producerMessageHandlerCustomizer = new ProducerMessageHandlerCustomizer<>() {
@Override
public void configure(MessageHandler handler, String destinationName) {
if (!CollectionUtils.isEmpty(producerMessageHandlerCustomizers)) {
for (ProducerMessageHandlerCustomizer<MessageHandler> customizer : producerMessageHandlerCustomizers) {
customizer.configure(handler, destinationName);
}
}
}
};
RabbitMessageChannelBinder binder = new RabbitMessageChannelBinder(
this.rabbitConnectionFactory, this.rabbitProperties,
provisioningProvider(declarableCustomizers), composedCistomizer, sourceCustomizer);
binder.setAdminAddresses(
this.rabbitBinderConfigurationProperties.getAdminAddresses());
binder.setCompressingPostProcessor(gZipPostProcessor());
binder.setDecompressingPostProcessor(deCompressingPostProcessor());
binder.setNodes(this.rabbitBinderConfigurationProperties.getNodes());
binder.setExtendedBindingProperties(this.rabbitExtendedBindingProperties);
binder.setProducerMessageHandlerCustomizer(producerMessageHandlerCustomizer);
binder.setConsumerEndpointCustomizer(consumerCustomizer);
return binder;
}
@Bean
MessagePostProcessor deCompressingPostProcessor() {
return new DelegatingDecompressingPostProcessor();
}
@Bean
MessagePostProcessor gZipPostProcessor() {
GZipPostProcessor gZipPostProcessor = new GZipPostProcessor();
if (this.rabbitBinderConfigurationProperties.getCompressionLevel() != null) {
gZipPostProcessor.setLevel(this.rabbitBinderConfigurationProperties.getCompressionLevel());
}
return gZipPostProcessor;
}
@Bean
RabbitExchangeQueueProvisioner provisioningProvider(List<DeclarableCustomizer> customizers) {
return new RabbitExchangeQueueProvisioner(this.rabbitConnectionFactory, customizers);
}
@Bean
@ConditionalOnThreading(Threading.VIRTUAL)
ListenerContainerCustomizer<MessageListenerContainer> listenerContainerVirtualThreadExecutorCustomizer() {
return (container, destinationName, group) -> {
if (container instanceof AbstractMessageListenerContainer listenerContainer) {
listenerContainer.setTaskExecutor(new VirtualThreadTaskExecutor(destinationName + "-"));
}
};
}
}