KafkaConsumerProperties.java

/*
 * Copyright 2016-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.properties;

import java.util.HashMap;
import java.util.Map;

import org.springframework.kafka.listener.ContainerProperties;

/**
 * Extended consumer properties for Kafka binder.
 *
 * @author Marius Bogoevici
 * @author Ilayaperumal Gopinathan
 * @author Soby Chacko
 * @author Gary Russell
 * @author Aldo Sinanaj
 *
 * <p>
 * Thanks to Laszlo Szabo for providing the initial patch for generic property support.
 * </p>
 */
public class KafkaConsumerProperties {

	/**
	 * Enumeration for starting consumer offset.
	 */
	public enum StartOffset {

		/**
		 * Starting from earliest offset.
		 */
		earliest(-2L),
		/**
		 * Starting from latest offset.
		 */
		latest(-1L);

		private final long referencePoint;

		StartOffset(long referencePoint) {
			this.referencePoint = referencePoint;
		}

		public long getReferencePoint() {
			return this.referencePoint;
		}

	}

	/**
	 * Standard headers for the message.
	 */
	public enum StandardHeaders {

		/**
		 * No headers.
		 */
		none,
		/**
		 * Message header representing ID.
		 */
		id,
		/**
		 * Message header representing timestamp.
		 */
		timestamp,
		/**
		 * Indicating both ID and timestamp headers.
		 */
		both

	}


	/**
	 * When true, topic partitions is automatically rebalanced between the members of a consumer group.
	 * When false, each consumer is assigned a fixed set of partitions based on spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex.
	 */
	private boolean autoRebalanceEnabled = true;

	/**
	 * Controlling the container acknowledgement mode. This is the preferred way to control the ack mode on the
	 * container instead of the deprecated autoCommitOffset property.
	 */
	private ContainerProperties.AckMode ackMode;

	/**
	 * Flag to enable auto commit on error in polled consumers.
	 */
	private Boolean autoCommitOnError;

	/**
	 * The starting offset for new groups. Allowed values: earliest and latest.
	 */
	private StartOffset startOffset;

	/**
	 * Whether to reset offsets on the consumer to the value provided by startOffset.
	 * Must be false if a KafkaRebalanceListener is provided.
	 */
	private boolean resetOffsets;

	/**
	 * When set to true, it enables DLQ behavior for the consumer.
	 * By default, messages that result in errors are forwarded to a topic named error.name-of-destination.name-of-group.
	 * The DLQ topic name can be configurable by setting the dlqName property.
	 */
	private boolean enableDlq;

	/**
	 * The name of the DLQ topic to receive the error messages.
	 */
	private String dlqName;

	/**
	 * Number of partitions to use on the DLQ.
	 */
	private Integer dlqPartitions;

	/**
	 * Using this, DLQ-specific producer properties can be set.
	 * All the properties available through kafka producer properties can be set through this property.
	 */
	private KafkaProducerProperties dlqProducerProperties = new KafkaProducerProperties();

	/**
	 * List of trusted packages to provide the header mapper.
	 */
	private String[] trustedPackages;

	/**
	 * Indicates which standard headers are populated by the inbound channel adapter.
	 * Allowed values: none, id, timestamp, or both.
	 */
	private StandardHeaders standardHeaders = StandardHeaders.none;

	/**
	 * The name of a bean that implements RecordMessageConverter.
	 */
	private String converterBeanName;

	/**
	 * The interval, in milliseconds, between events indicating that no messages have recently been received.
	 */
	private long idleEventInterval = 30_000;

	/**
	 * When true, the destination is treated as a regular expression Pattern used to match topic names by the broker.
	 */
	private boolean destinationIsPattern;

	/**
	 * Map with a key/value pair containing generic Kafka consumer properties.
	 * In addition to having Kafka consumer properties, other configuration properties can be passed here.
	 */
	private Map<String, String> configuration = new HashMap<>();

	/**
	 * Various topic level properties. @see {@link KafkaTopicProperties} for more details.
	 */
	private KafkaTopicProperties topic = new KafkaTopicProperties();

	/**
	 * Timeout used for polling in pollable consumers.
	 */
	private long pollTimeout = org.springframework.kafka.listener.ConsumerProperties.DEFAULT_POLL_TIMEOUT;

	/**
	 * Transaction manager bean name - overrides the binder's transaction configuration.
	 */
	private String transactionManager;

	/**
	 * Set to false to NOT commit the offset of a successfully recovered recovered in the after rollback processor.
	 */
	private boolean txCommitRecovered = true;

	/**
	 * CommonErrorHandler bean name per consumer binding.
	 * @since 3.2
	 */
	private String commonErrorHandlerBeanName;

	/**
	 * When using the reactive binder, automatically commit offsets when records received
	 * by the poll have been processed.
	 * @since 4.0.2
	 */
	private boolean reactiveAutoCommit;

	/**
	 * When using the reactive binder, automatically commit offsets of records before they
	 * are passed to the user code.
	 * @since 4.0.3
	 */
	private boolean reactiveAtMostOnce;



	/**
	 * @return Container's ack mode.
	 */
	public ContainerProperties.AckMode getAckMode() {
		return this.ackMode;
	}

	public void setAckMode(ContainerProperties.AckMode ackMode) {
		this.ackMode = ackMode;
	}

	/**
	 * @return start offset
	 *
	 * The starting offset for new groups. Allowed values: earliest and latest.
	 */
	public StartOffset getStartOffset() {
		return this.startOffset;
	}

	public void setStartOffset(StartOffset startOffset) {
		this.startOffset = startOffset;
	}

	/**
	 * @return if resetting offset is enabled
	 *
	 * Whether to reset offsets on the consumer to the value provided by startOffset.
	 * Must be false if a KafkaRebalanceListener is provided.
	 */
	public boolean isResetOffsets() {
		return this.resetOffsets;
	}

	public void setResetOffsets(boolean resetOffsets) {
		this.resetOffsets = resetOffsets;
	}

	/**
	 * @return is DLQ enabled.
	 *
	 * When set to true, it enables DLQ behavior for the consumer.
	 * By default, messages that result in errors are forwarded to a topic named error.name-of-destination.name-of-group.
	 * The DLQ topic name can be configurable by setting the dlqName property.
	 */
	public boolean isEnableDlq() {
		return this.enableDlq;
	}

	public void setEnableDlq(boolean enableDlq) {
		this.enableDlq = enableDlq;
	}

	/**
	 * @return is autocommit on error in polled consumers.
	 *
	 * This property accessor is only used in polled consumers.
	 */
	public Boolean getAutoCommitOnError() {
		return this.autoCommitOnError;
	}

	/**
	 *
	 * @param autoCommitOnError commit on error in polled consumers.
	 *
	 */
	public void setAutoCommitOnError(Boolean autoCommitOnError) {
		this.autoCommitOnError = autoCommitOnError;
	}

	/**
	 * @return is auto rebalance enabled
	 *
	 * When true, topic partitions is automatically rebalanced between the members of a consumer group.
	 * When false, each consumer is assigned a fixed set of partitions based on spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex.
	 */
	public boolean isAutoRebalanceEnabled() {
		return this.autoRebalanceEnabled;
	}

	public void setAutoRebalanceEnabled(boolean autoRebalanceEnabled) {
		this.autoRebalanceEnabled = autoRebalanceEnabled;
	}

	/**
	 * @return a map of configuration
	 *
	 * Map with a key/value pair containing generic Kafka consumer properties.
	 * In addition to having Kafka consumer properties, other configuration properties can be passed here.
	 */
	public Map<String, String> getConfiguration() {
		return this.configuration;
	}

	public void setConfiguration(Map<String, String> configuration) {
		this.configuration = configuration;
	}

	/**
	 * @return dlq name
	 *
	 * The name of the DLQ topic to receive the error messages.
	 */
	public String getDlqName() {
		return this.dlqName;
	}

	public void setDlqName(String dlqName) {
		this.dlqName = dlqName;
	}

	/**
	 * @return number of partitions on the DLQ topic
	 *
	 * Number of partitions to use on the DLQ.
	 */
	public Integer getDlqPartitions() {
		return this.dlqPartitions;
	}

	public void setDlqPartitions(Integer dlqPartitions) {
		this.dlqPartitions = dlqPartitions;
	}

	/**
	 * @return trusted packages
	 *
	 * List of trusted packages to provide the header mapper.
	 */
	public String[] getTrustedPackages() {
		return this.trustedPackages;
	}

	public void setTrustedPackages(String[] trustedPackages) {
		this.trustedPackages = trustedPackages;
	}

	/**
	 * @return dlq producer properties
	 *
	 * Using this, DLQ-specific producer properties can be set.
	 * All the properties available through kafka producer properties can be set through this property.
	 */
	public KafkaProducerProperties getDlqProducerProperties() {
		return this.dlqProducerProperties;
	}

	public void setDlqProducerProperties(KafkaProducerProperties dlqProducerProperties) {
		this.dlqProducerProperties = dlqProducerProperties;
	}

	/**
	 * @return standard headers
	 *
	 * Indicates which standard headers are populated by the inbound channel adapter.
	 * Allowed values: none, id, timestamp, or both.
	 */
	public StandardHeaders getStandardHeaders() {
		return this.standardHeaders;
	}

	public void setStandardHeaders(StandardHeaders standardHeaders) {
		this.standardHeaders = standardHeaders;
	}

	/**
	 * @return converter bean name
	 *
	 * The name of a bean that implements RecordMessageConverter.
	 */
	public String getConverterBeanName() {
		return this.converterBeanName;
	}

	public void setConverterBeanName(String converterBeanName) {
		this.converterBeanName = converterBeanName;
	}

	/**
	 * @return idle event interval
	 *
	 * The interval, in milliseconds, between events indicating that no messages have recently been received.
	 */
	public long getIdleEventInterval() {
		return this.idleEventInterval;
	}

	public void setIdleEventInterval(long idleEventInterval) {
		this.idleEventInterval = idleEventInterval;
	}

	/**
	 * @return is destination given through a pattern
	 *
	 * When true, the destination is treated as a regular expression Pattern used to match topic names by the broker.
	 */
	public boolean isDestinationIsPattern() {
		return this.destinationIsPattern;
	}

	public void setDestinationIsPattern(boolean destinationIsPattern) {
		this.destinationIsPattern = destinationIsPattern;
	}

	/**
	 * @return topic properties
	 *
	 * Various topic level properties. @see {@link KafkaTopicProperties} for more details.
	 */
	public KafkaTopicProperties getTopic() {
		return this.topic;
	}

	public void setTopic(KafkaTopicProperties topic) {
		this.topic = topic;
	}

	/**
	 * @return timeout in pollable consumers
	 *
	 * Timeout used for polling in pollable consumers.
	 */
	public long getPollTimeout() {
		return this.pollTimeout;
	}

	public void setPollTimeout(long pollTimeout) {
		this.pollTimeout = pollTimeout;
	}

	/**
	 * @return the transaction manager bean name.
	 *
	 * Transaction manager bean name (must be {@code KafkaAwareTransactionManager}.
	 */
	public String getTransactionManager() {
		return this.transactionManager;
	}

	public void setTransactionManager(String transactionManager) {
		this.transactionManager = transactionManager;
	}

	public boolean isTxCommitRecovered() {
		return this.txCommitRecovered;
	}

	public void setTxCommitRecovered(boolean txCommitRecovered) {
		this.txCommitRecovered = txCommitRecovered;
	}

	public String getCommonErrorHandlerBeanName() {
		return commonErrorHandlerBeanName;
	}

	public void setCommonErrorHandlerBeanName(String commonErrorHandlerBeanName) {
		this.commonErrorHandlerBeanName = commonErrorHandlerBeanName;
	}

	public boolean isReactiveAutoCommit() {
		return this.reactiveAutoCommit;
	}

	public void setReactiveAutoCommit(boolean reactiveAutoCommit) {
		this.reactiveAutoCommit = reactiveAutoCommit;
	}

	public boolean isReactiveAtMostOnce() {
		return this.reactiveAtMostOnce;
	}

	public void setReactiveAtMostOnce(boolean reactiveAtMostOnce) {
		this.reactiveAtMostOnce = reactiveAtMostOnce;
	}

}