RabbitProducerProperties.java
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rabbit.properties;
import java.util.Optional;
import jakarta.validation.constraints.Min;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* @author Marius Bogoevici
* @author Gary Russell
*/
public class RabbitProducerProperties extends RabbitCommonProperties {
/**
* Determines the producer type.
* @since 3.2
*/
public enum ProducerType {
/**
* RabbitMQ Stream producer - blocks until confirm received.
*/
STREAM_SYNC,
/**
* RabbitMQ Stream producer - does not block.
*/
STREAM_ASYNC,
/**
* Classic AMQP producer.
*/
AMQP
}
/**
* true to compress messages.
*/
private boolean compress;
/**
* true to batch multiple messages into one.
*/
private boolean batchingEnabled;
/**
* the number of messages to batch, when enabled.
*/
private int batchSize = 100;
/**
* the size limit for batched messages.
*/
private int batchBufferLimit = 10000;
/**
* the time after which an incomplete batch will be sent.
*/
private int batchTimeout = 5000;
/**
* the bean name of a custom batching strategy to use instead of the
* {@link org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy}.
*/
private String batchingStrategyBeanName;
/**
* true to use transacted channels.
*/
private boolean transacted;
/**
* the delivery mode for published messages.
*/
private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
/**
* patterns to match which headers are mapped (inbound).
*/
private String[] headerPatterns = new String[] { "*" };
/**
* when using a delayed message exchange, a SpEL expression to determine the delay to
* apply to messages.
*/
private Expression delayExpression;
/**
* a static routing key when publishing messages; default is the destination name;
* suffixed by "-partition" when partitioned. This is only used if `routingKeyExpression` is null
*/
private String routingKey;
/**
* a custom routing key when publishing messages; default is the destination name;
* suffixed by "-partition" when partitioned.
*/
private Expression routingKeyExpression;
/**
* the channel name to which to send publisher confirms (acks) if the connection
* factory is so configured; default 'nullChannel'; requires
* 'errorChannelEnabled=true'.
*/
private String confirmAckChannel;
/**
* When true, the binding will complete the {@link java.util.concurrent.Future} field
* in a {@link org.springframework.amqp.rabbit.connection.CorrelationData} contained
* in the
* {@link org.springframework.amqp.support.AmqpHeaders#PUBLISH_CONFIRM_CORRELATION}
* header when the confirmation is received.
*/
private boolean useConfirmHeader;
/**
* When STREAM_SYNC or STREAM_ASYNC, create a RabbitMQ Stream producer instead of an
* AMQP producer.
* @since 3.2
*/
private ProducerType producerType = ProducerType.AMQP;
/**
* The bean name of a message converter to convert from spring-messaging Message to
* a Spring AMQP Message.
* @since 3.2
*/
private String streamMessageConverterBeanName;
/**
* Configure an alternate exchange for when no queues are bound.
* @since 4.0
*/
private AlternateExchange alternateExchange;
/**
* When the producer type is STREAM_*, set this to true to publish to a super stream.
* Also requires a partition key.
*/
private boolean superStream;
public void setCompress(boolean compress) {
this.compress = compress;
}
public boolean isCompress() {
return compress;
}
public void setDeliveryMode(MessageDeliveryMode deliveryMode) {
this.deliveryMode = deliveryMode;
}
public MessageDeliveryMode getDeliveryMode() {
return deliveryMode;
}
public String[] getHeaderPatterns() {
return headerPatterns;
}
public void setHeaderPatterns(String[] replyHeaderPatterns) {
this.headerPatterns = replyHeaderPatterns;
}
public boolean isBatchingEnabled() {
return batchingEnabled;
}
public void setBatchingEnabled(boolean batchingEnabled) {
this.batchingEnabled = batchingEnabled;
}
@Min(value = 1, message = "Batch Size should be greater than zero.")
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
@Min(value = 1, message = "Batch Buffer Limit should be greater than zero.")
public int getBatchBufferLimit() {
return batchBufferLimit;
}
public void setBatchBufferLimit(int batchBufferLimit) {
this.batchBufferLimit = batchBufferLimit;
}
@Min(value = 1, message = "Batch Timeout should be greater than zero.")
public int getBatchTimeout() {
return batchTimeout;
}
public void setBatchTimeout(int batchTimeout) {
this.batchTimeout = batchTimeout;
}
public boolean isTransacted() {
return this.transacted;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public Expression getDelayExpression() {
return this.delayExpression;
}
public void setDelayExpression(Expression delayExpression) {
this.delayExpression = delayExpression;
}
public Expression getRoutingKeyExpression() {
return Optional.ofNullable(this.routingKeyExpression)
.orElseGet(() -> Optional.ofNullable(this.routingKey)
.map(LiteralExpression::new)
.orElse(null));
}
public void setRoutingKeyExpression(Expression routingKeyExpression) {
this.routingKeyExpression = routingKeyExpression;
}
public String getRoutingKey() {
return this.routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public String getConfirmAckChannel() {
return this.confirmAckChannel;
}
public void setConfirmAckChannel(String confirmAckChannel) {
this.confirmAckChannel = confirmAckChannel;
}
public String getBatchingStrategyBeanName() {
return batchingStrategyBeanName;
}
public void setBatchingStrategyBeanName(String batchingStrategyBeanName) {
this.batchingStrategyBeanName = batchingStrategyBeanName;
}
public boolean isUseConfirmHeader() {
return this.useConfirmHeader;
}
public void setUseConfirmHeader(boolean useConfirmHeader) {
this.useConfirmHeader = useConfirmHeader;
}
public ProducerType getProducerType() {
return this.producerType;
}
public void setProducerType(ProducerType producerType) {
Assert.notNull(producerType, "'producerType' cannot be null");
this.producerType = producerType;
}
public String getStreamMessageConverterBeanName() {
return this.streamMessageConverterBeanName;
}
public void setStreamMessageConverterBeanName(String streamMessageConverterBeanName) {
this.streamMessageConverterBeanName = streamMessageConverterBeanName;
}
@Nullable
public AlternateExchange getAlternateExchange() {
return this.alternateExchange;
}
public void setAlternateExchange(AlternateExchange alternate) {
this.alternateExchange = alternate;
}
public boolean isSuperStream() {
return this.superStream;
}
public void setSuperStream(boolean superStream) {
this.superStream = superStream;
}
public static class AlternateExchange {
/**
* The alternate exchange name.
*/
private String name;
/**
* Whether the exchange exists or should be provisioned.
*/
private boolean exists = false;
/**
* The alternate exchange type.
*/
private String type = ExchangeTypes.TOPIC;
/**
* Bind a durable queue to the alternate exchange.
*/
private Binding binding;
public String getName() {
return this.name;
}
public void setName(String name) {
this.name = name;
}
public boolean isExists() {
return this.exists;
}
public void setExists(boolean exists) {
this.exists = exists;
}
public String getType() {
return this.type;
}
public void setType(String type) {
this.type = type;
}
public Binding getBinding() {
return this.binding;
}
public void setBinding(Binding binding) {
this.binding = binding;
}
public static class Binding {
/**
* The routing key.
*/
private String routingKey = "#";
/**
* The queue name.
*/
private String queue;
public String getRoutingKey() {
return this.routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public String getQueue() {
return this.queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
}
}
}