RabbitExchangeQueueProvisioner.java
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rabbit.provisioning;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Base64UrlNamingStrategy;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.core.DeclarableCustomizer;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.DeclarationExceptionEvent;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitCommonProperties.QuorumConfig;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties.ContainerType;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties.AlternateExchange;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.ApplicationListener;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.config.SuperStream;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* AMQP implementation for {@link ProvisioningProvider}.
*
* @author Soby Chacko
* @author Gary Russell
* @author Oleg Zhurakousky
* @author Michael Michailidis
* @author Byungjun You
* @author Omer Celik
*/
// @checkstyle:off
public class RabbitExchangeQueueProvisioner
implements ApplicationListener<DeclarationExceptionEvent>,
ProvisioningProvider<ExtendedConsumerProperties<RabbitConsumerProperties>, ExtendedProducerProperties<RabbitProducerProperties>> {
// @checkstyle:on
/**
* The delimiter between a group and index when constructing a binder
* consumer/producer.
*/
private static final String GROUP_INDEX_DELIMITER = ".";
protected final Log logger = LogFactory.getLog(getClass());
private final RabbitAdmin rabbitAdmin;
private boolean notOurAdminException;
private final GenericApplicationContext autoDeclareContext = new GenericApplicationContext();
private final List<DeclarableCustomizer> customizers;
private final AtomicInteger producerExchangeBeanNameQualifier = new AtomicInteger();
private final ReentrantLock autoDeclareContextLock = new ReentrantLock();
public RabbitExchangeQueueProvisioner(ConnectionFactory connectionFactory) {
this(connectionFactory, Collections.emptyList());
}
public RabbitExchangeQueueProvisioner(ConnectionFactory connectionFactory,
List<DeclarableCustomizer> customizers) {
this.rabbitAdmin = new RabbitAdmin(connectionFactory);
this.autoDeclareContext.refresh();
this.rabbitAdmin.setApplicationContext(this.autoDeclareContext);
this.rabbitAdmin.afterPropertiesSet();
this.customizers = customizers;
}
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<RabbitProducerProperties> producerProperties) {
final String exchangeName = applyPrefix(
producerProperties.getExtension().getPrefix(), name);
String beanNameQualifier = "prod" + this.producerExchangeBeanNameQualifier.incrementAndGet();
Exchange exchange = buildExchange(producerProperties.getExtension(),
exchangeName, producerProperties.getExtension().getAlternateExchange(), beanNameQualifier);
if (producerProperties.getExtension().isDeclareExchange()) {
declareExchange(exchangeName, beanNameQualifier, exchange);
}
Binding binding = null;
for (String requiredGroupName : producerProperties.getRequiredGroups()) {
String baseQueueName = producerProperties.getExtension()
.isQueueNameGroupOnly() ? requiredGroupName
: (exchangeName + "." + requiredGroupName);
if (!producerProperties.isPartitioned()) {
autoBindDLQ(baseQueueName, baseQueueName, requiredGroupName,
producerProperties.getExtension());
if (producerProperties.getExtension().isBindQueue()) {
Queue queue = new Queue(baseQueueName, true, false, false, queueArgs(
baseQueueName, producerProperties.getExtension(), false));
declareQueue(baseQueueName, queue);
String[] routingKeys = bindingRoutingKeys(producerProperties.getExtension());
if (ObjectUtils.isEmpty(routingKeys)) {
binding = notPartitionedBinding(exchange, queue, null, producerProperties.getExtension());
}
else {
for (String routingKey : routingKeys) {
binding = notPartitionedBinding(exchange, queue, routingKey,
producerProperties.getExtension());
}
}
}
}
else {
// if the stream is partitioned, create one queue for each target
// partition for the default group
for (int i = 0; i < producerProperties.getPartitionCount(); i++) {
String partitionSuffix = "-" + i;
String partitionQueueName = baseQueueName + partitionSuffix;
autoBindDLQ(baseQueueName, baseQueueName + partitionSuffix, requiredGroupName,
producerProperties.getExtension());
if (producerProperties.getExtension().isBindQueue()) {
Queue queue = new Queue(partitionQueueName, true, false, false,
queueArgs(partitionQueueName,
producerProperties.getExtension(), false));
declareQueue(queue.getName(), queue);
String prefix = producerProperties.getExtension().getPrefix();
String destination = !StringUtils.hasText(prefix) ? exchangeName
: exchangeName.substring(prefix.length());
String[] routingKeys = bindingRoutingKeys(producerProperties.getExtension());
if (ObjectUtils.isEmpty(routingKeys)) {
binding = partitionedBinding(destination, exchange, queue, null,
producerProperties.getExtension(), i);
}
else {
for (String routingKey : routingKeys) {
binding = partitionedBinding(destination, exchange, queue, routingKey,
producerProperties.getExtension(), i);
}
}
}
}
}
}
return new RabbitProducerDestination(exchange, binding, beanNameQualifier);
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
ConsumerDestination consumerDestination;
if (!properties.isMultiplex()) {
consumerDestination = doProvisionConsumerDestination(name, group, properties);
}
else {
String[] provisionedDestinations = Stream
.of(StringUtils.tokenizeToStringArray(name, ",", true, true))
.flatMap(destination -> {
if (properties.isPartitioned() && !ObjectUtils.isEmpty(properties.getInstanceIndexList())) {
List<String> consumerDestinationNames = new ArrayList<>();
for (Integer index : properties.getInstanceIndexList()) {
ExtendedConsumerProperties<RabbitConsumerProperties> temporaryProperties =
new ExtendedConsumerProperties<>(properties.getExtension());
BeanUtils.copyProperties(properties, temporaryProperties);
temporaryProperties.setInstanceIndex(index);
consumerDestinationNames.add(doProvisionConsumerDestination(destination, group,
temporaryProperties).getName());
}
return consumerDestinationNames.stream();
}
else {
return Stream.of(doProvisionConsumerDestination(destination, group,
properties).getName());
}
})
.toArray(String[]::new);
consumerDestination = new RabbitConsumerDestination(
StringUtils.arrayToCommaDelimitedString(provisionedDestinations),
null, group, name);
}
return consumerDestination;
}
private ConsumerDestination doProvisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
boolean anonymous = !StringUtils.hasText(group);
String anonymousGroup = null;
if (anonymous) {
anonymousGroup = new Base64UrlNamingStrategy(
properties.getExtension().getAnonymousGroupPrefix() == null
? ""
: properties.getExtension().getAnonymousGroupPrefix()).generateName();
}
String baseQueueName;
if (properties.getExtension().isQueueNameGroupOnly()) {
baseQueueName = anonymous ? anonymousGroup : group;
}
else {
baseQueueName = groupedName(name, anonymous ? anonymousGroup : group);
}
if (this.logger.isInfoEnabled()) {
this.logger.info("declaring queue for inbound: " + baseQueueName
+ ", bound to: " + name);
}
String prefix = properties.getExtension().getPrefix();
String exchangeName = applyPrefix(prefix, name);
ContainerType containerType = properties.getExtension().getContainerType();
boolean superStream = containerType.equals(ContainerType.STREAM) && properties.getExtension().isSuperStream();
Exchange exchange = buildExchange(properties.getExtension(), exchangeName, null, null);
if (!superStream && properties.getExtension().isDeclareExchange()) {
declareExchange(exchangeName, anonymous ? anonymousGroup : group, exchange);
}
String queueName = applyPrefix(prefix, baseQueueName);
boolean partitioned = !anonymous && properties.isPartitioned();
boolean durable = !anonymous && properties.getExtension().isDurableSubscription();
Queue queue;
if (anonymous) {
String anonQueueName = queueName;
queue = new AnonymousQueue((org.springframework.amqp.core.NamingStrategy) () -> anonQueueName,
queueArgs(queueName, properties.getExtension(), false));
queueName = queue.getName();
}
else {
if (partitioned) {
String partitionSuffix = "-" + properties.getInstanceIndex();
queueName += partitionSuffix;
}
if (durable) {
queue = new Queue(queueName, true, false, false,
queueArgs(queueName, properties.getExtension(), false));
}
else {
queue = new Queue(queueName, false, false, true,
queueArgs(queueName, properties.getExtension(), false));
}
}
Binding binding = null;
if (properties.getExtension().isBindQueue()) {
if (superStream) {
provisionSuperStream(properties, name);
}
else {
if (containerType.equals(ContainerType.STREAM)) {
queue.getArguments().put("x-queue-type", "stream");
}
declareQueue(queueName, queue);
String[] routingKeys = bindingRoutingKeys(properties.getExtension());
if (ObjectUtils.isEmpty(routingKeys)) {
binding = declareConsumerBindings(name, null, properties, exchange, partitioned, queue);
}
else {
for (String routingKey : routingKeys) {
binding = declareConsumerBindings(name, routingKey, properties, exchange, partitioned, queue);
}
}
}
}
if (durable && !superStream) {
autoBindDLQ(applyPrefix(properties.getExtension().getPrefix(), baseQueueName),
queueName, group, properties.getExtension());
}
if (superStream) {
queueName = name; // group is used in the consumer for super streams so not part of the name.
}
return new RabbitConsumerDestination(queueName, binding, anonymous ? baseQueueName : group, name);
}
private void provisionSuperStream(ExtendedConsumerProperties<RabbitConsumerProperties> properties,
String name) {
String routingKey = properties.getExtension().getBindingRoutingKey();
String rk = routingKey == null ? name : routingKey;
SuperStream ss = new SuperStream(name, properties.getInstanceCount() * properties.getConcurrency(),
(q, i) -> IntStream.range(0, i)
.mapToObj(j -> rk + "-" + j)
.collect(Collectors.toList()));
try {
this.autoDeclareContextLock.lock();
if (!this.autoDeclareContext.containsBean(name + ".superStream")) {
this.autoDeclareContext.getBeanFactory().registerSingleton(name + ".superStream", ss);
}
}
finally {
this.autoDeclareContextLock.unlock();
}
try {
ss.getDeclarables().forEach(dec -> {
if (dec instanceof Exchange exch) {
this.rabbitAdmin.declareExchange(exch);
}
else if (dec instanceof Queue queue) {
this.rabbitAdmin.declareQueue(queue);
}
else if (dec instanceof Binding binding) {
this.rabbitAdmin.declareBinding(binding);
}
});
}
catch (AmqpConnectException e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Declaration of super stream: " + name
+ " deferred - connection not available");
}
}
}
/**
* Construct a name comprised of the name and group.
* @param name the name.
* @param group the group.
* @return the constructed name.
*/
protected final String groupedName(String name, String group) {
return name + GROUP_INDEX_DELIMITER
+ (StringUtils.hasText(group) ? group : "default");
}
private Binding declareConsumerBindings(String name, String routingKey,
ExtendedConsumerProperties<RabbitConsumerProperties> properties,
Exchange exchange, boolean partitioned, Queue queue) {
if (partitioned) {
return partitionedBinding(name, exchange, queue, routingKey, properties.getExtension(),
properties.getInstanceIndex());
}
else {
return notPartitionedBinding(exchange, queue, routingKey, properties.getExtension());
}
}
private Binding partitionedBinding(String destination, Exchange exchange, Queue queue, String rk,
RabbitCommonProperties extendedProperties, int index) {
String bindingKey = rk;
if (bindingKey == null) {
bindingKey = destination;
}
bindingKey += "-" + index;
Map<String, Object> arguments = new HashMap<>();
arguments.putAll(extendedProperties.getQueueBindingArguments());
if (exchange instanceof TopicExchange topicExchange) {
Binding binding = BindingBuilder.bind(queue).to(topicExchange)
.with(bindingKey);
declareBinding(queue.getName(), binding);
return binding;
}
else if (exchange instanceof DirectExchange directExchange) {
Binding binding = BindingBuilder.bind(queue).to(directExchange)
.with(bindingKey);
declareBinding(queue.getName(), binding);
return binding;
}
else if (exchange instanceof FanoutExchange) {
throw new ProvisioningException(
"A fanout exchange is not appropriate for partitioned apps");
}
else if (exchange instanceof HeadersExchange) {
Binding binding = new Binding(queue.getName(), DestinationType.QUEUE, exchange.getName(), "", arguments);
declareBinding(queue.getName(), binding);
return binding;
}
else {
throw new ProvisioningException(
"Cannot bind to a " + exchange.getType() + " exchange");
}
}
private Binding notPartitionedBinding(Exchange exchange, Queue queue, String rk,
RabbitCommonProperties extendedProperties) {
String routingKey = rk;
if (routingKey == null) {
routingKey = "#";
}
Map<String, Object> arguments = new HashMap<>(extendedProperties.getQueueBindingArguments());
return createBinding(exchange, queue, routingKey, arguments, queue.getName());
}
private Binding createBinding(Exchange exchange, Queue queue, String routingKey,
@Nullable Map<String, Object> arguments, String beanName) {
if (exchange instanceof TopicExchange) {
Binding binding = BindingBuilder.bind(queue).to((TopicExchange) exchange)
.with(routingKey);
declareBinding(beanName, binding);
return binding;
}
else if (exchange instanceof DirectExchange) {
Binding binding = BindingBuilder.bind(queue).to((DirectExchange) exchange)
.with(routingKey);
declareBinding(beanName, binding);
return binding;
}
else if (exchange instanceof FanoutExchange) {
Binding binding = BindingBuilder.bind(queue).to((FanoutExchange) exchange);
declareBinding(beanName, binding);
return binding;
}
else if (exchange instanceof HeadersExchange) {
Binding binding = new Binding(beanName, DestinationType.QUEUE, exchange.getName(), "", arguments);
declareBinding(queue.getName(), binding);
return binding;
}
else {
throw new ProvisioningException(
"Cannot bind to a " + exchange.getType() + " exchange");
}
}
private String[] bindingRoutingKeys(RabbitCommonProperties extendedProperties) {
/*
* When the delimiter is null, we get a String[1] containing the original.
*/
return StringUtils.delimitedListToStringArray(extendedProperties.getBindingRoutingKey(),
extendedProperties.getBindingRoutingKeyDelimiter());
}
/**
* If so requested, declare the DLX/DLQ and bind it. The DLQ is bound to the DLX with
* a routing key of the original queue name because we use default exchange routing by
* queue name for the original message.
* @param baseQueueName The base name for the queue (including the binder prefix, if
* any).
* @param routingKey The routing key for the queue.
* @param group The consumer group.
* @param properties the properties.
*/
private void autoBindDLQ(final String baseQueueName, String routingKey, String group,
RabbitCommonProperties properties) {
boolean autoBindDlq = properties.isAutoBindDlq();
if (this.logger.isDebugEnabled()) {
this.logger.debug("autoBindDLQ=" + autoBindDlq + " for: " + baseQueueName);
}
if (autoBindDlq) {
String dlqName;
if (properties.getDeadLetterQueueName() == null) {
dlqName = constructDLQName(baseQueueName);
}
else {
dlqName = properties.getDeadLetterQueueName();
}
Queue dlq = new Queue(dlqName, true, false, false,
queueArgs(dlqName, properties, true));
declareQueue(dlqName, dlq);
String dlxName = deadLetterExchangeName(properties);
if (properties.isDeclareDlx()) {
declareExchange(dlxName, group,
new ExchangeBuilder(dlxName,
properties.getDeadLetterExchangeType()).durable(true)
.build());
}
Map<String, Object> arguments = new HashMap<>(properties.getDlqBindingArguments());
Binding dlqBinding = new Binding(dlq.getName(), DestinationType.QUEUE,
dlxName, properties.getDeadLetterRoutingKey() == null ? routingKey
: properties.getDeadLetterRoutingKey(),
arguments);
declareBinding(dlqName, dlqBinding);
if (properties instanceof RabbitConsumerProperties rabbitConsumerProperties
&& rabbitConsumerProperties.isRepublishToDlq()) {
/*
* Also bind with the base queue name when republishToDlq is used, which
* does not know about partitioning
*/
declareBinding(dlqName + ".2", new Binding(dlq.getName(), DestinationType.QUEUE,
dlxName, baseQueueName, arguments));
}
}
}
/**
* For binder implementations that support dead lettering, construct the name of the
* dead letter entity for the underlying pipe name.
* @param name the name.
* @return constructDLQName
*/
public static String constructDLQName(String name) {
return name + ".dlq";
}
private String deadLetterExchangeName(RabbitCommonProperties properties) {
if (properties.getDeadLetterExchange() == null) {
return properties.getPrefix() + RabbitCommonProperties.DEAD_LETTER_EXCHANGE;
}
else {
return properties.getDeadLetterExchange();
}
}
private void declareQueue(String beanName, Queue queueArg) {
Queue queue = queueArg;
for (DeclarableCustomizer customizer : this.customizers) {
queue = (Queue) customizer.apply(queue);
}
try {
this.rabbitAdmin.declareQueue(queue);
}
catch (AmqpConnectException e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Declaration of queue: " + queue.getName()
+ " deferred - connection not available");
}
}
catch (RuntimeException e) {
if (this.notOurAdminException) {
this.notOurAdminException = false;
throw e;
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(
"Declaration of queue: " + queue.getName() + " deferred", e);
}
}
addToAutoDeclareContext(beanName, queue);
}
private Map<String, Object> queueArgs(String queueName,
RabbitCommonProperties properties, boolean isDlq) {
Map<String, Object> args = new HashMap<>();
if (!isDlq) {
if (properties.isAutoBindDlq()) {
String dlx;
if (properties.getDeadLetterExchange() != null) {
dlx = properties.getDeadLetterExchange();
}
else {
dlx = applyPrefix(properties.getPrefix(), "DLX");
}
args.put("x-dead-letter-exchange", dlx);
String dlRk;
if (properties.getDeadLetterRoutingKey() != null) {
dlRk = properties.getDeadLetterRoutingKey();
}
else {
dlRk = queueName;
}
args.put("x-dead-letter-routing-key", dlRk);
}
}
else {
if (properties.getDlqDeadLetterExchange() != null) {
args.put("x-dead-letter-exchange", properties.getDlqDeadLetterExchange());
}
if (properties.getDlqDeadLetterRoutingKey() != null) {
args.put("x-dead-letter-routing-key",
properties.getDlqDeadLetterRoutingKey());
}
}
additionalArgs(args, properties, isDlq);
return args;
}
private void additionalArgs(Map<String, Object> args, RabbitCommonProperties properties, boolean isDlq) {
Integer expires = isDlq ? properties.getDlqExpires() : properties.getExpires();
Integer maxLength = isDlq ? properties.getDlqMaxLength()
: properties.getMaxLength();
Integer maxLengthBytes = isDlq ? properties.getDlqMaxLengthBytes()
: properties.getMaxLengthBytes();
Integer maxPriority = isDlq ? properties.getDlqMaxPriority()
: properties.getMaxPriority();
// Add queue max priority for consumer priority support
if (!isDlq && properties instanceof RabbitConsumerProperties) {
RabbitConsumerProperties consumerProps = (RabbitConsumerProperties) properties;
if (consumerProps.getQueueMaxPriority() > 0) {
maxPriority = consumerProps.getQueueMaxPriority();
}
}
Integer ttl = isDlq ? properties.getDlqTtl() : properties.getTtl();
boolean lazy = isDlq ? properties.isDlqLazy() : properties.isLazy();
String overflow = isDlq ? properties.getDlqOverflowBehavior()
: properties.getOverflowBehavior();
QuorumConfig quorum = isDlq ? properties.getDlqQuorum() : properties.getQuorum();
boolean singleActive = isDlq ? properties.isDlqSingleActiveConsumer() : properties.isSingleActiveConsumer();
if (expires != null) {
args.put("x-expires", expires);
}
if (maxLength != null) {
args.put("x-max-length", maxLength);
}
if (maxLengthBytes != null) {
args.put("x-max-length-bytes", maxLengthBytes);
}
if (maxPriority != null) {
args.put("x-max-priority", maxPriority);
}
if (ttl != null) {
args.put("x-message-ttl", ttl);
}
if (lazy) {
args.put("x-queue-mode", "lazy");
}
if (StringUtils.hasText(overflow)) {
args.put("x-overflow", overflow);
}
if (quorum != null && quorum.isEnabled()) {
args.put("x-queue-type", "quorum");
if (quorum.getDeliveryLimit() != null) {
args.put("x-delivery-limit", quorum.getDeliveryLimit());
}
if (quorum.getInitialGroupSize() != null) {
args.put("x-quorum-initial-group-size", quorum.getInitialGroupSize());
}
}
if (singleActive) {
args.put("x-single-active-consumer", true);
}
}
public static String applyPrefix(String prefix, String name) {
return prefix + name;
}
private Exchange buildExchange(RabbitCommonProperties properties, String exchangeName,
@Nullable AlternateExchange alternate, @Nullable String beanNameQualifier) {
try {
ExchangeBuilder builder = new ExchangeBuilder(exchangeName,
properties.getExchangeType());
builder.durable(properties.isExchangeDurable());
if (properties.isExchangeAutoDelete()) {
builder.autoDelete();
}
if (properties.isDelayedExchange()) {
builder.delayed();
}
if (alternate != null && !alternate.isExists()) {
builder.alternate(alternate.getName());
configureAlternate(alternate, beanNameQualifier);
}
return builder.build();
}
catch (Exception e) {
throw new ProvisioningException("Failed to create exchange object", e);
}
}
private void configureAlternate(AlternateExchange alternate, String beanNameQualifier) {
Exchange exchange = customizeAndDeclare(new ExchangeBuilder(alternate.getName(), alternate.getType())
.durable(true)
.build());
addToAutoDeclareContext(alternate.getName() + "." + beanNameQualifier + ".exchange", exchange);
AlternateExchange.Binding binding = alternate.getBinding();
if (binding != null) {
Queue queue = new Queue(binding.getQueue());
String beanName = alternate.getName() + "." + binding.getQueue() + "." + beanNameQualifier;
declareQueue(beanName, queue);
Binding toBind = createBinding(exchange, queue, binding.getRoutingKey(), null, beanName);
}
}
private void declareExchange(final String rootName, String group, final Exchange exchangeArg) {
Exchange exchange = customizeAndDeclare(exchangeArg);
addToAutoDeclareContext(rootName + "." + group + ".exchange", exchange);
}
private Exchange customizeAndDeclare(Exchange exchange) {
for (DeclarableCustomizer customizer : this.customizers) {
exchange = (Exchange) customizer.apply(exchange);
}
try {
this.rabbitAdmin.declareExchange(exchange);
}
catch (AmqpConnectException e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Declaration of exchange: " + exchange.getName()
+ " deferred - connection not available");
}
}
catch (RuntimeException e) {
if (this.notOurAdminException) {
this.notOurAdminException = false;
throw e;
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(
"Declaration of exchange: " + exchange.getName() + " deferred",
e);
}
}
return exchange;
}
private void addToAutoDeclareContext(String name, Declarable bean) {
try {
this.autoDeclareContextLock.lock();
if (!this.autoDeclareContext.containsBean(name)) {
this.autoDeclareContext.getBeanFactory().registerSingleton(name, new Declarables(bean));
}
else {
this.autoDeclareContext.getBean(name, Declarables.class).getDeclarables().add(bean);
}
}
finally {
this.autoDeclareContextLock.unlock();
}
}
private void declareBinding(String rootName, org.springframework.amqp.core.Binding bindingArg) {
Binding binding = bindingArg;
for (DeclarableCustomizer customizer : this.customizers) {
binding = (Binding) customizer.apply(binding);
}
try {
this.rabbitAdmin.declareBinding(binding);
}
catch (AmqpConnectException e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Declaration of binding: " + rootName
+ ".binding deferred - connection not available");
}
}
catch (RuntimeException e) {
if (this.notOurAdminException) {
this.notOurAdminException = false;
throw e;
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(
"Declaration of binding: " + rootName + ".binding deferred", e);
}
}
addToAutoDeclareContext(rootName + ".binding", binding);
}
public void cleanAutoDeclareContext(ConsumerDestination destination,
ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties) {
try {
this.autoDeclareContextLock.lock();
Stream.of(StringUtils.tokenizeToStringArray(destination.getName(), ",", true,
true)).forEach(name -> {
String group = null;
String bindingName = null;
if (destination instanceof RabbitConsumerDestination rabbitConsumerDestination) {
group = rabbitConsumerDestination.getGroup();
bindingName = rabbitConsumerDestination.getBindingName();
}
RabbitConsumerProperties properties = consumerProperties.getExtension();
String toRemove = properties.isQueueNameGroupOnly() ? bindingName + "." + group : name.trim();
boolean partitioned = consumerProperties.isPartitioned();
if (partitioned) {
toRemove = removePartitionPart(toRemove);
}
removeSingleton(toRemove + ".exchange");
removeQueueAndBindingBeans(properties, name.trim(), "", group, partitioned);
});
}
finally {
this.autoDeclareContextLock.unlock();
}
}
public void cleanAutoDeclareContext(ProducerDestination dest,
ExtendedProducerProperties<RabbitProducerProperties> properties) {
try {
this.autoDeclareContextLock.lock();
if (dest instanceof RabbitProducerDestination rabbitProducerDestination) {
String qual = rabbitProducerDestination.getBeanNameQualifier();
removeSingleton(dest.getName() + "." + qual + ".exchange");
String[] requiredGroups = properties.getRequiredGroups();
if (!ObjectUtils.isEmpty(requiredGroups)) {
for (String group : requiredGroups) {
if (properties.isPartitioned()) {
for (int i = 0; i < properties.getPartitionCount(); i++) {
removeQueueAndBindingBeans(properties.getExtension(),
properties.getExtension().isQueueNameGroupOnly() ? "" : dest.getName(),
group + "-" + i, group, true);
}
}
else {
removeQueueAndBindingBeans(properties.getExtension(), dest.getName() + "." + group, "",
group, false);
}
}
}
AlternateExchange alternate = properties.getExtension().getAlternateExchange();
if (alternate != null) {
removeSingleton(alternate.getName() + "." + qual + ".exchange");
RabbitProducerProperties.AlternateExchange.Binding binding = alternate.getBinding();
if (binding != null) {
removeSingleton(alternate.getName() + "." + binding.getQueue() + "." + qual);
removeSingleton(alternate.getName() + "." + binding.getQueue() + "." + qual + ".binding");
}
}
}
}
finally {
this.autoDeclareContextLock.unlock();
}
}
private void removeQueueAndBindingBeans(RabbitCommonProperties properties, String name, String suffix,
String group, boolean partitioned) {
boolean suffixPresent = StringUtils.hasText(suffix);
String withSuffix = name + (suffixPresent ? ("." + suffix) : "");
String nameDotOptional = name;
if (!StringUtils.hasText(name)) {
withSuffix = suffix;
}
else {
nameDotOptional = name + ".";
}
removeSingleton(withSuffix + ".binding");
removeSingleton(withSuffix);
String dlq = (suffixPresent ? nameDotOptional + group : withSuffix) + ".dlq"; // only one DLQ when partitioned
if (StringUtils.hasText(properties.getDeadLetterQueueName())) {
dlq = properties.getDeadLetterQueueName();
}
else if (partitioned) {
String removedPart = removePartitionPart(dlq);
if (!removedPart.endsWith(".dlq")) {
dlq = removedPart + ".dlq";
}
}
removeSingleton(dlq + ".binding");
removeSingleton(dlq + ".2.binding");
removeSingleton(dlq);
removeSingleton(deadLetterExchangeName(properties) + "." + group + ".exchange");
}
private String removePartitionPart(String toRemove) {
int finalHyphen = toRemove.lastIndexOf("-");
if (finalHyphen > 0) {
return toRemove.substring(0, finalHyphen);
}
return toRemove;
}
private void removeSingleton(String name) {
if (this.autoDeclareContext.containsBean(name)) {
ConfigurableListableBeanFactory beanFactory = this.autoDeclareContext
.getBeanFactory();
if (beanFactory instanceof DefaultListableBeanFactory defaultListableBeanFactory) {
defaultListableBeanFactory.destroySingleton(name);
}
}
}
@Override
public void onApplicationEvent(DeclarationExceptionEvent event) {
this.notOurAdminException = true; // our admin doesn't have an event publisher
}
private static final class RabbitProducerDestination implements ProducerDestination {
private final Exchange exchange;
private final Binding binding;
private final String beanNameQualifier;
RabbitProducerDestination(Exchange exchange, Binding binding, String beanNameQualifier) {
Assert.notNull(exchange, "exchange must not be null");
this.exchange = exchange;
this.binding = binding;
this.beanNameQualifier = beanNameQualifier;
}
@Override
public String getName() {
return this.exchange.getName();
}
@Override
public String getNameForPartition(int partition) {
return this.exchange.getName();
}
@Nullable
String getBeanNameQualifier() {
return this.beanNameQualifier;
}
@Override
public String toString() {
return "RabbitProducerDestination{" + "exchange=" + this.exchange + ", binding="
+ this.binding + '}';
}
}
private static final class RabbitConsumerDestination implements ConsumerDestination {
private final String queue;
private final Binding binding;
private final String group;
private final String bindingName;
RabbitConsumerDestination(String queue, Binding binding, String group, String bindingName) {
Assert.notNull(queue, "queue must not be null");
this.queue = queue;
this.binding = binding;
this.group = group;
this.bindingName = bindingName;
}
@Override
public String getName() {
return this.queue;
}
String getGroup() {
return this.group;
}
String getBindingName() {
return this.bindingName;
}
@Override
public String toString() {
return "RabbitConsumerDestination{" + "queue=" + this.queue + ", binding="
+ this.binding + ", group=" + this.group + ", bindingName=" + this.bindingName + '}';
}
}
}