RabbitConsumerProperties.java
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rabbit.properties;
import jakarta.validation.constraints.Min;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.util.Assert;
/**
* @author Marius Bogoevici
* @author Gary Russell
*/
public class RabbitConsumerProperties extends RabbitCommonProperties {
/**
* true to use transacted channels.
*/
private boolean transacted;
/**
* container acknowledge mode.
*/
private AcknowledgeMode acknowledgeMode = AcknowledgeMode.AUTO;
/**
* maxumum concurrency of this consumer (threads).
*/
private int maxConcurrency = 1;
/**
* number of prefetched messages pre consumer thread.
*/
private int prefetch = 1;
/**
* messages per acknowledgment (and commit when transacted).
*/
private int batchSize = 1;
/**
* true for a durable subscription.
*/
private boolean durableSubscription = true;
/**
* republish failures to the DLQ with diagnostic headers.
*/
private boolean republishToDlq = true;
/**
* when republishing to the DLQ, the delivery mode to use.
*/
private MessageDeliveryMode republishDeliveyMode = MessageDeliveryMode.PERSISTENT;
/**
* true to requeue rejected messages, false to discard (or route to DLQ).
*/
private boolean requeueRejected = false;
/**
* patterns to match which headers are mapped (inbound).
*/
private String[] headerPatterns = new String[] { "*" };
/**
* interval between reconnection attempts.
*/
private long recoveryInterval = 5000;
/**
* true if the consumer is exclusive.
*/
private boolean exclusive;
/**
* when true, stop the container instead of retrying queue declarations.
*/
private boolean missingQueuesFatal = false;
/**
* how many times to attempt passive queue declaration.
*/
private Integer queueDeclarationRetries;
/**
* interval between attempts to passively declare missing queues.
*/
private Long failedDeclarationRetryInterval;
/**
* Used to create the consumer tags; will be appended by '#n' where 'n' increments for
* each consumer created.
*/
private String consumerTagPrefix;
/**
* Room to leave for other headers after adding the stack trace to a DLQ message.
*/
private int frameMaxHeadroom = 20_000;
/**
* The container type, SIMPLE, DIRECT, or STREAM.
*/
private ContainerType containerType = ContainerType.SIMPLE;
/**
* Prefix for anonymous queue names (when no group is provided).
*/
private String anonymousGroupPrefix = "anonymous.";
/**
* When true, the listener container will assemble a list from multiple messages,
* according to the batchSize and receiveTimeout properties. Only applies with
* {@link ContainerType#SIMPLE}.
*/
private boolean enableBatching;
/**
* How long to block waiting to receive messages; increasing from the default 1 second
* will make the binding less responsive to stop requests. When enableConsumerBatching
* is true, a short batch may be emitted if this time elapses before the batchSize is
* satisfied. Only applies with {@link ContainerType#SIMPLE}.
*/
private Long receiveTimeout;
/**
* When the container type is STREAM, set this to true to create a super stream with
* competing consumers.
*/
private boolean superStream;
/**
* Consumer priority for this consumer. Higher values indicate higher priority.
* Requires the queue to be declared with x-max-priority argument.
* Valid range: 0-255. Default: -1 (no priority set).
*/
private int consumerPriority = -1;
/**
* Maximum priority for the queue. When set, the queue will be declared with
* x-max-priority argument. Valid range: 1-255. Default: -1 (not set).
*/
private int queueMaxPriority = -1;
public boolean isTransacted() {
return transacted;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public AcknowledgeMode getAcknowledgeMode() {
return acknowledgeMode;
}
public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) {
Assert.notNull(acknowledgeMode, "Acknowledge mode cannot be null");
this.acknowledgeMode = acknowledgeMode;
}
@Min(value = 1, message = "Max Concurrency should be greater than zero.")
public int getMaxConcurrency() {
return maxConcurrency;
}
public void setMaxConcurrency(int maxConcurrency) {
this.maxConcurrency = maxConcurrency;
}
@Min(value = 1, message = "Prefetch should be greater than zero.")
public int getPrefetch() {
return prefetch;
}
public void setPrefetch(int prefetch) {
this.prefetch = prefetch;
}
@Min(value = 1, message = "Batch Size should be greater than zero.")
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public boolean isDurableSubscription() {
return durableSubscription;
}
public void setDurableSubscription(boolean durableSubscription) {
this.durableSubscription = durableSubscription;
}
public boolean isRepublishToDlq() {
return republishToDlq;
}
public void setRepublishToDlq(boolean republishToDlq) {
this.republishToDlq = republishToDlq;
}
public boolean isRequeueRejected() {
return requeueRejected;
}
public MessageDeliveryMode getRepublishDeliveyMode() {
return this.republishDeliveyMode;
}
public void setRepublishDeliveyMode(MessageDeliveryMode republishDeliveyMode) {
this.republishDeliveyMode = republishDeliveyMode;
}
public void setRequeueRejected(boolean requeueRejected) {
this.requeueRejected = requeueRejected;
}
public String[] getHeaderPatterns() {
return headerPatterns;
}
public void setHeaderPatterns(String[] replyHeaderPatterns) {
this.headerPatterns = replyHeaderPatterns;
}
public long getRecoveryInterval() {
return recoveryInterval;
}
public void setRecoveryInterval(long recoveryInterval) {
this.recoveryInterval = recoveryInterval;
}
public boolean isExclusive() {
return this.exclusive;
}
public void setExclusive(boolean exclusive) {
this.exclusive = exclusive;
}
public boolean getMissingQueuesFatal() {
return this.missingQueuesFatal;
}
public void setMissingQueuesFatal(boolean missingQueuesFatal) {
this.missingQueuesFatal = missingQueuesFatal;
}
public Integer getQueueDeclarationRetries() {
return this.queueDeclarationRetries;
}
public void setQueueDeclarationRetries(Integer queueDeclarationRetries) {
this.queueDeclarationRetries = queueDeclarationRetries;
}
public Long getFailedDeclarationRetryInterval() {
return this.failedDeclarationRetryInterval;
}
public void setFailedDeclarationRetryInterval(Long failedDeclarationRetryInterval) {
this.failedDeclarationRetryInterval = failedDeclarationRetryInterval;
}
public String getConsumerTagPrefix() {
return this.consumerTagPrefix;
}
public void setConsumerTagPrefix(String consumerTagPrefix) {
this.consumerTagPrefix = consumerTagPrefix;
}
public int getFrameMaxHeadroom() {
return this.frameMaxHeadroom;
}
public void setFrameMaxHeadroom(int frameMaxHeadroom) {
this.frameMaxHeadroom = frameMaxHeadroom;
}
public ContainerType getContainerType() {
return this.containerType;
}
public void setContainerType(ContainerType containerType) {
this.containerType = containerType;
}
public String getAnonymousGroupPrefix() {
return this.anonymousGroupPrefix;
}
public void setAnonymousGroupPrefix(String anonymousGroupPrefix) {
this.anonymousGroupPrefix = anonymousGroupPrefix;
}
public boolean isEnableBatching() {
return this.enableBatching;
}
public void setEnableBatching(boolean enableBatching) {
this.enableBatching = enableBatching;
}
public Long getReceiveTimeout() {
return this.receiveTimeout;
}
public void setReceiveTimeout(Long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
public boolean isSuperStream() {
return this.superStream;
}
public void setSuperStream(boolean superStream) {
this.superStream = superStream;
}
public int getConsumerPriority() {
return this.consumerPriority;
}
public void setConsumerPriority(int consumerPriority) {
this.consumerPriority = consumerPriority;
}
public int getQueueMaxPriority() {
return this.queueMaxPriority;
}
public void setQueueMaxPriority(int queueMaxPriority) {
this.queueMaxPriority = queueMaxPriority;
}
/**
* Container type.
* @author Gary Russell
* @since 3.2
*
*/
public enum ContainerType {
/**
* Container where the RabbitMQ consumer dispatches messages to an invoker thread.
*/
SIMPLE,
/**
* Container where the listener is invoked directly on the RabbitMQ consumer
* thread.
*/
DIRECT,
/**
* Container that uses the RabbitMQ Stream Client.
*/
STREAM
}
}