ConsumerConfigProperties.java

/*
 * Copyright 2023-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.pulsar.properties;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.MessageId;

import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.pulsar.autoconfigure.PulsarProperties;
import org.springframework.util.Assert;

/**
 * Configuration properties used to specify Pulsar consumers.
 *
 * @author Chris Bono
 */
public class ConsumerConfigProperties extends PulsarProperties.Consumer {

	private final Acknowledgement ack = new Acknowledgement();

	private final Chunking chunk = new Chunking();

	private final Subscription subscription = new Subscription();

	/**
	 * Number of messages that can be accumulated before the consumer calls "receive".
	 */
	private Integer receiverQueueSize = 1000;

	/**
	 * Maximum number of messages that a consumer can be pushed at once from a broker
	 * across all partitions.
	 */
	private Integer maxTotalReceiverQueueSizeAcrossPartitions = 50000;

	/**
	 * Action the consumer will take in case of decryption failure.
	 */
	private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

	/**
	 * Map of properties to add to the consumer.
	 */
	private SortedMap<String, String> properties = new TreeMap<>();

	/**
	 * Auto-discovery period for topics when topic pattern is used in minutes.
	 */
	private Integer patternAutoDiscoveryPeriod = 1;

	/**
	 * Whether the consumer auto-subscribes for partition increase. This is only for
	 * partitioned consumers.
	 */
	private Boolean autoUpdatePartitions = true;

	/**
	 * Interval of partitions discovery updates.
	 */
	private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1);

	/**
	 * Whether to include the given position of any reset operation like
	 * {@link org.apache.pulsar.client.api.Consumer#seek(long) or
	 * {@link ConsumerConfigProperties#seek(MessageId)}}.
	 */
	private Boolean resetIncludeHead = false;

	/**
	 * Whether pooling of messages and the underlying data buffers is enabled.
	 */
	private Boolean poolMessages = false;

	/**
	 * Whether to start the consumer in a paused state.
	 */
	private Boolean startPaused = false;

	public Acknowledgement getAck() {
		return this.ack;
	}

	public Chunking getChunk() {
		return this.chunk;
	}

	public Subscription getSubscription() {
		return this.subscription;
	}

	public Integer getReceiverQueueSize() {
		return this.receiverQueueSize;
	}

	public void setReceiverQueueSize(Integer receiverQueueSize) {
		this.receiverQueueSize = receiverQueueSize;
	}

	public Integer getMaxTotalReceiverQueueSizeAcrossPartitions() {
		return this.maxTotalReceiverQueueSizeAcrossPartitions;
	}

	public void setMaxTotalReceiverQueueSizeAcrossPartitions(Integer maxTotalReceiverQueueSizeAcrossPartitions) {
		this.maxTotalReceiverQueueSizeAcrossPartitions = maxTotalReceiverQueueSizeAcrossPartitions;
	}

	public ConsumerCryptoFailureAction getCryptoFailureAction() {
		return this.cryptoFailureAction;
	}

	public void setCryptoFailureAction(ConsumerCryptoFailureAction cryptoFailureAction) {
		this.cryptoFailureAction = cryptoFailureAction;
	}

	public SortedMap<String, String> getProperties() {
		return this.properties;
	}

	public void setProperties(SortedMap<String, String> properties) {
		this.properties = properties;
	}

	public Integer getPatternAutoDiscoveryPeriod() {
		return this.patternAutoDiscoveryPeriod;
	}

	public void setPatternAutoDiscoveryPeriod(Integer patternAutoDiscoveryPeriod) {
		this.patternAutoDiscoveryPeriod = patternAutoDiscoveryPeriod;
	}

	public Boolean getAutoUpdatePartitions() {
		return this.autoUpdatePartitions;
	}

	public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
		this.autoUpdatePartitions = autoUpdatePartitions;
	}

	public Duration getAutoUpdatePartitionsInterval() {
		return this.autoUpdatePartitionsInterval;
	}

	public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
		this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval;
	}

	public Boolean getResetIncludeHead() {
		return this.resetIncludeHead;
	}

	public void setResetIncludeHead(Boolean resetIncludeHead) {
		this.resetIncludeHead = resetIncludeHead;
	}

	public Boolean getPoolMessages() {
		return this.poolMessages;
	}

	public void setPoolMessages(Boolean poolMessages) {
		this.poolMessages = poolMessages;
	}

	public Boolean getStartPaused() {
		return this.startPaused;
	}

	public void setStartPaused(Boolean startPaused) {
		this.startPaused = startPaused;
	}

	/**
	 * Gets a map representation of the base consumer properties (those defined in parent
	 * class).
	 * @return map of base consumer properties and associated values.
	 */
	public Map<String, Object> toBaseConsumerPropertiesMap() {
		var consumerProps = new ConsumerConfigProperties.Properties();
		var map = PropertyMapper.get();
		map.from(this::getDeadLetterPolicy).as(this::toPulsarDeadLetterPolicy).to(consumerProps.in("deadLetterPolicy"));
		map.from(this::getName).to(consumerProps.in("consumerName"));
		map.from(this::getPriorityLevel).to(consumerProps.in("priorityLevel"));
		map.from(this::isReadCompacted).to(consumerProps.in("readCompacted"));
		map.from(this::isRetryEnable).to(consumerProps.in("retryEnable"));
		map.from(this::getTopics).to(consumerProps.in("topicNames"));
		map.from(this::getTopicsPattern).to(consumerProps.in("topicsPattern"));
		mapBaseSubscriptionProperties(this.getSubscription(), consumerProps, map);
		return consumerProps;
	}

	private org.apache.pulsar.client.api.DeadLetterPolicy toPulsarDeadLetterPolicy(DeadLetterPolicy policy) {
		Assert.state(policy.getMaxRedeliverCount() > 0,
				"Pulsar DeadLetterPolicy must have a positive 'max-redelivery-count' property value");
		PropertyMapper map = PropertyMapper.get();
		org.apache.pulsar.client.api.DeadLetterPolicy.DeadLetterPolicyBuilder builder = org.apache.pulsar.client.api.DeadLetterPolicy
				.builder();
		map.from(policy::getMaxRedeliverCount).to(builder::maxRedeliverCount);
		map.from(policy::getRetryLetterTopic).to(builder::retryLetterTopic);
		map.from(policy::getDeadLetterTopic).to(builder::deadLetterTopic);
		map.from(policy::getInitialSubscriptionName).to(builder::initialSubscriptionName);
		return builder.build();
	}

	private void mapBaseSubscriptionProperties(PulsarProperties.Consumer.Subscription subscription, Properties consumerProps, PropertyMapper map) {
		map.from(subscription::getName).to(consumerProps.in("subscriptionName"));
		map.from(subscription::getType).to(consumerProps.in("subscriptionType"));
		map.from(subscription::getMode).to(consumerProps.in("subscriptionMode"));
		map.from(subscription::getInitialPosition).to(consumerProps.in("subscriptionInitialPosition"));
		map.from(subscription::getTopicsMode).to(consumerProps.in("regexSubscriptionMode"));
	}

	/**
	 * Gets a map representation of the extended consumer properties (those defined in
	 * this class).
	 * @return map of extended consumer properties and associated values.
	 */
	public Map<String, Object> toExtendedConsumerPropertiesMap() {
		var consumerProps = new ConsumerConfigProperties.Properties();
		var map = PropertyMapper.get();
		map.from(this::getAutoUpdatePartitions).to(consumerProps.in("autoUpdatePartitions"));
		map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toSeconds)
				.to(consumerProps.in("autoUpdatePartitionsIntervalSeconds"));
		map.from(this::getCryptoFailureAction).to(consumerProps.in("cryptoFailureAction"));
		map.from(this::getMaxTotalReceiverQueueSizeAcrossPartitions)
				.to(consumerProps.in("maxTotalReceiverQueueSizeAcrossPartitions"));
		map.from(this::getPatternAutoDiscoveryPeriod).to(consumerProps.in("patternAutoDiscoveryPeriod"));
		map.from(this::getPoolMessages).to(consumerProps.in("poolMessages"));
		map.from(this::getProperties).to(consumerProps.in("properties"));
		map.from(this::getReceiverQueueSize).to(consumerProps.in("receiverQueueSize"));
		map.from(this::getResetIncludeHead).to(consumerProps.in("resetIncludeHead"));
		map.from(this::getStartPaused).to(consumerProps.in("startPaused"));
		this.mapAcknowledgementProperties(this.getAck(), consumerProps, map);
		this.mapChunkProperties(this.getChunk(), consumerProps, map);
		this.mapExtendedSubscriptionProperties(this.getSubscription(), consumerProps, map);
		return consumerProps;
	}

	private void mapAcknowledgementProperties(Acknowledgement ack, Properties consumerProps, PropertyMapper map) {
		map.from(ack::getGroupTime).as(it -> it.toNanos() / 1000)
				.to(consumerProps.in("acknowledgementsGroupTimeMicros"));
		map.from(ack::getRedeliveryDelay).as(it -> it.toNanos() / 1000)
			.to(consumerProps.in("negativeAckRedeliveryDelayMicros"));
		map.from(ack::getTimeout).as(Duration::toMillis)
				.to(consumerProps.in("ackTimeoutMillis"));
		map.from(ack::getTimeoutTickDuration).as(Duration::toMillis)
				.to(consumerProps.in("tickDurationMillis"));
		map.from(ack::getBatchIndexEnabled).to(consumerProps.in("batchIndexAckEnabled"));
		map.from(ack::getReceiptEnabled).to(consumerProps.in("ackReceiptEnabled"));
	}

	private void mapChunkProperties(Chunking chunk, Properties consumerProps, PropertyMapper map) {
		map.from(chunk::getExpireTimeIncomplete).as(Duration::toMillis)
				.to(consumerProps.in("expireTimeOfIncompleteChunkedMessageMillis"));
		map.from(chunk::getAutoAckOldestOnQueueFull)
				.to(consumerProps.in("autoAckOldestChunkedMessageOnQueueFull"));
		map.from(chunk::getMaxPendingMessages).to(consumerProps.in("maxPendingChunkedMessage"));
	}

	private void mapExtendedSubscriptionProperties(Subscription subscription, Properties consumerProps, PropertyMapper map) {
		map.from(subscription::getProperties).to(consumerProps.in("subscriptionProperties"));
		map.from(subscription::getReplicateState).to(consumerProps.in("replicateSubscriptionState"));
	}

	/**
	 * Gets a map representation of base and extended consumer properties.
	 * @return map of base and extended consumer properties and associated values.
	 */
	public Map<String, Object> toAllConsumerPropertiesMap() {
		var consumerProps = this.toBaseConsumerPropertiesMap();
		consumerProps.putAll(this.toExtendedConsumerPropertiesMap());
		return consumerProps;
	}

	public static class Acknowledgement {

		/**
		 * Whether the batching index acknowledgment is enabled.
		 */
		private Boolean batchIndexEnabled = false;

		/**
		 * Time to group acknowledgements before sending them to the broker.
		 */
		private Duration groupTime = Duration.ofMillis(100);

		/**
		 * Whether an acknowledgement receipt is enabled.
		 */
		private Boolean receiptEnabled = false;

		/**
		 * Delay before re-delivering messages that have failed to be processed.
		 */
		private Duration redeliveryDelay = Duration.ofMinutes(1);

		/**
		 * Timeout for unacked messages to be redelivered.
		 */
		private Duration timeout = Duration.ZERO;

		/**
		 * Precision for the ack timeout messages tracker.
		 */
		private Duration timeoutTickDuration = Duration.ofSeconds(1);

		public Boolean getBatchIndexEnabled() {
			return this.batchIndexEnabled;
		}

		public void setBatchIndexEnabled(Boolean batchIndexEnabled) {
			this.batchIndexEnabled = batchIndexEnabled;
		}

		public Duration getGroupTime() {
			return this.groupTime;
		}

		public void setGroupTime(Duration groupTime) {
			this.groupTime = groupTime;
		}

		public Boolean getReceiptEnabled() {
			return this.receiptEnabled;
		}

		public void setReceiptEnabled(Boolean receiptEnabled) {
			this.receiptEnabled = receiptEnabled;
		}

		public Duration getRedeliveryDelay() {
			return this.redeliveryDelay;
		}

		public void setRedeliveryDelay(Duration redeliveryDelay) {
			this.redeliveryDelay = redeliveryDelay;
		}

		public Duration getTimeout() {
			return this.timeout;
		}

		public void setTimeout(Duration timeout) {
			this.timeout = timeout;
		}

		public Duration getTimeoutTickDuration() {
			return this.timeoutTickDuration;
		}

		public void setTimeoutTickDuration(Duration timeoutTickDuration) {
			this.timeoutTickDuration = timeoutTickDuration;
		}

	}

	public static class Chunking {

		/**
		 * Whether to automatically drop outstanding uncompleted chunked messages once the
		 * consumer queue reaches the threshold set by the 'maxPendingMessages' property.
		 */
		private Boolean autoAckOldestOnQueueFull = true;

		/**
		 * The maximum time period for a consumer to receive all chunks of a message - if
		 * this threshold is exceeded the consumer will expire the incomplete chunks.
		 */
		private Duration expireTimeIncomplete = Duration.ofMinutes(1);

		/**
		 * Maximum number of chunked messages to be kept in memory.
		 */
		private Integer maxPendingMessages = 10;

		public Boolean getAutoAckOldestOnQueueFull() {
			return this.autoAckOldestOnQueueFull;
		}

		public void setAutoAckOldestOnQueueFull(Boolean autoAckOldestOnQueueFull) {
			this.autoAckOldestOnQueueFull = autoAckOldestOnQueueFull;
		}

		public Duration getExpireTimeIncomplete() {
			return this.expireTimeIncomplete;
		}

		public void setExpireTimeIncomplete(Duration expireTimeIncomplete) {
			this.expireTimeIncomplete = expireTimeIncomplete;
		}

		public Integer getMaxPendingMessages() {
			return this.maxPendingMessages;
		}

		public void setMaxPendingMessages(Integer maxPendingMessages) {
			this.maxPendingMessages = maxPendingMessages;
		}

	}

	public static class Subscription extends PulsarProperties.Consumer.Subscription {

		/**
		 * Map of properties to add to the subscription.
		 */
		private Map<String, String> properties = new HashMap<>();

		/**
		 * Whether to replicate subscription state.
		 */
		private Boolean replicateState = false;

		public Map<String, String> getProperties() {
			return this.properties;
		}

		public void setProperties(Map<String, String> properties) {
			this.properties = properties;
		}

		public Boolean getReplicateState() {
			return this.replicateState;
		}

		public void setReplicateState(Boolean replicateState) {
			this.replicateState = replicateState;
		}

	}

	static class Properties extends HashMap<String, Object> {

		<V> java.util.function.Consumer<V> in(String key) {
			return (value) -> put(key, value);
		}

	}

}