ProducerConfigProperties.java

/*
 * Copyright 2023-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.pulsar.properties;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.pulsar.client.api.ProducerCryptoFailureAction;

import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.pulsar.autoconfigure.PulsarProperties;
import org.springframework.lang.Nullable;
import org.springframework.util.unit.DataSize;

/**
 * Configuration properties used to specify Pulsar producers.
 *
 * @author Chris Bono
 */
public class ProducerConfigProperties extends PulsarProperties.Producer {

	/**
	 * Whether the "send" and "sendAsync" methods should block if the outgoing message
	 * queue is full.
	 */
	private Boolean blockIfQueueFull = false;

	/**
	 * Maximum number of pending messages for the producer.
	 */
	private Integer maxPendingMessages = 1000;

	/**
	 * Maximum number of pending messages across all the partitions.
	 */
	private Integer maxPendingMessagesAcrossPartitions = 50000;

	/**
	 * Action the producer will take in case of encryption failure.
	 */
	private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;

	/**
	 * Names of the public encryption keys to use when encrypting data.
	 */
	private Set<String> encryptionKeys = new HashSet<>();

	/**
	 * Baseline for the sequence ids for messages published by the producer.
	 */
	@Nullable
	private Long initialSequenceId;

	/**
	 * Whether partitioned producer automatically discover new partitions at runtime.
	 */
	private Boolean autoUpdatePartitions = true;

	/**
	 * Interval of partitions discovery updates.
	 */
	private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1);

	/**
	 * Whether the multiple schema mode is enabled.
	 */
	private Boolean multiSchema = true;

	/**
	 * Whether producers in Shared mode register and connect immediately to the owner
	 * broker of each partition or start lazily on demand.
	 */
	private Boolean lazyStartPartitionedProducers = false;

	private final Batching batch = new Batching();

	public Batching getBatch() {
		return this.batch;
	}

	/**
	 * Map of properties to add to the producer.
	 */
	private Map<String, String> properties = new HashMap<>();

	public Boolean getBlockIfQueueFull() {
		return this.blockIfQueueFull;
	}

	public void setBlockIfQueueFull(Boolean blockIfQueueFull) {
		this.blockIfQueueFull = blockIfQueueFull;
	}

	public Integer getMaxPendingMessages() {
		return this.maxPendingMessages;
	}

	public void setMaxPendingMessages(Integer maxPendingMessages) {
		this.maxPendingMessages = maxPendingMessages;
	}

	public Integer getMaxPendingMessagesAcrossPartitions() {
		return this.maxPendingMessagesAcrossPartitions;
	}

	public void setMaxPendingMessagesAcrossPartitions(Integer maxPendingMessagesAcrossPartitions) {
		this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
	}

	public ProducerCryptoFailureAction getCryptoFailureAction() {
		return this.cryptoFailureAction;
	}

	public void setCryptoFailureAction(ProducerCryptoFailureAction cryptoFailureAction) {
		this.cryptoFailureAction = cryptoFailureAction;
	}

	public Set<String> getEncryptionKeys() {
		return this.encryptionKeys;
	}

	public void setEncryptionKeys(Set<String> encryptionKeys) {
		this.encryptionKeys = encryptionKeys;
	}

	@Nullable
	public Long getInitialSequenceId() {
		return this.initialSequenceId;
	}

	public void setInitialSequenceId(@Nullable Long initialSequenceId) {
		this.initialSequenceId = initialSequenceId;
	}

	public Boolean getAutoUpdatePartitions() {
		return this.autoUpdatePartitions;
	}

	public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
		this.autoUpdatePartitions = autoUpdatePartitions;
	}

	public Duration getAutoUpdatePartitionsInterval() {
		return this.autoUpdatePartitionsInterval;
	}

	public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
		this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval;
	}

	public Boolean getMultiSchema() {
		return this.multiSchema;
	}

	public void setMultiSchema(Boolean multiSchema) {
		this.multiSchema = multiSchema;
	}

	public Boolean getLazyStartPartitionedProducers() {
		return this.lazyStartPartitionedProducers;
	}

	public void setLazyStartPartitionedProducers(Boolean lazyStartPartitionedProducers) {
		this.lazyStartPartitionedProducers = lazyStartPartitionedProducers;
	}

	public Map<String, String> getProperties() {
		return this.properties;
	}

	public void setProperties(Map<String, String> properties) {
		this.properties = properties;
	}

	/**
	 * Gets a map representation of the base producer properties (those defined in parent
	 * class).
	 * @return map of base producer properties and associated values.
	 */
	public Map<String, Object> toBaseProducerPropertiesMap() {
		var producerProps = new ProducerConfigProperties.Properties();
		var map = PropertyMapper.get();
		map.from(this::getAccessMode).to(producerProps.in("accessMode"));
		map.from(this::isBatchingEnabled).to(producerProps.in("batchingEnabled"));
		map.from(this::isChunkingEnabled).to(producerProps.in("chunkingEnabled"));
		map.from(this::getCompressionType).to(producerProps.in("compressionType"));
		map.from(this::getHashingScheme).to(producerProps.in("hashingScheme"));
		map.from(this::getMessageRoutingMode).to(producerProps.in("messageRoutingMode"));
		map.from(this::getName).to(producerProps.in("producerName"));
		map.from(this::getSendTimeout).asInt(Duration::toMillis).to(producerProps.in("sendTimeoutMs"));
		map.from(this::getTopicName).to(producerProps.in("topicName"));
		return producerProps;
	}

	/**
	 * Gets a map representation of the extended producer properties (those defined in
	 * this class).
	 * @return map of extended producer properties and associated values.
	 */
	public Map<String, Object> toExtendedProducerPropertiesMap() {
		var producerProps = new ProducerConfigProperties.Properties();
		var map = PropertyMapper.get();
		map.from(this::getAutoUpdatePartitions).to(producerProps.in("autoUpdatePartitions"));
		map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toSeconds)
				.to(producerProps.in("autoUpdatePartitionsIntervalSeconds"));
		map.from(this::getBlockIfQueueFull).to(producerProps.in("blockIfQueueFull"));
		map.from(this::getCryptoFailureAction).to(producerProps.in("cryptoFailureAction"));
		map.from(this::getEncryptionKeys).to(producerProps.in("encryptionKeys"));
		map.from(this::getInitialSequenceId).to(producerProps.in("initialSequenceId"));
		map.from(this::getLazyStartPartitionedProducers).to(producerProps.in("lazyStartPartitionedProducers"));
		map.from(this::getMaxPendingMessages).to(producerProps.in("maxPendingMessages"));
		map.from(this::getMaxPendingMessagesAcrossPartitions)
				.to(producerProps.in("maxPendingMessagesAcrossPartitions"));
		map.from(this::getMultiSchema).to(producerProps.in("multiSchema"));
		map.from(this::getProperties).to(producerProps.in("properties"));
		this.mapBatchProperties(this.getBatch(), producerProps, map);
		return producerProps;
	}

	private void mapBatchProperties(Batching batch, Properties producerProps, PropertyMapper map) {
		if (this.isBatchingEnabled()) {
			map.from(batch::getMaxPublishDelay).as(it -> it.toNanos() / 1000)
					.to(producerProps.in("batchingMaxPublishDelayMicros"));
			map.from(batch::getPartitionSwitchFrequencyByPublishDelay)
					.to(producerProps.in("batchingPartitionSwitchFrequencyByPublishDelay"));
			map.from(batch::getMaxMessages).to(producerProps.in("batchingMaxMessages"));
			map.from(batch::getMaxBytes).asInt(DataSize::toBytes)
					.to(producerProps.in("batchingMaxBytes"));
		}
	}

	/**
	 * Gets a map representation of base and extended producer properties.
	 * @return map of base and extended producer properties and associated values.
	 */
	public Map<String, Object> toAllProducerPropertiesMap() {
		var producerProps = this.toBaseProducerPropertiesMap();
		producerProps.putAll(this.toExtendedProducerPropertiesMap());
		return producerProps;
	}

	public static class Batching {

		/**
		 * Time period within which the messages sent will be batched.
		 */
		private Duration maxPublishDelay = Duration.ofMillis(1);

		/**
		 * Partition switch frequency while batching of messages is enabled and using
		 * round-robin routing mode for non-keyed message.
		 */
		private Integer partitionSwitchFrequencyByPublishDelay = 10;

		/**
		 * Maximum number of messages to be batched.
		 */
		private Integer maxMessages = 1000;

		/**
		 * Maximum number of bytes permitted in a batch.
		 */
		private DataSize maxBytes = DataSize.ofKilobytes(128);

		/**
		 * Whether to automatically batch messages.
		 */
		private Boolean enabled = true;

		public Duration getMaxPublishDelay() {
			return this.maxPublishDelay;
		}

		public void setMaxPublishDelay(Duration maxPublishDelay) {
			this.maxPublishDelay = maxPublishDelay;
		}

		public Integer getPartitionSwitchFrequencyByPublishDelay() {
			return this.partitionSwitchFrequencyByPublishDelay;
		}

		public void setPartitionSwitchFrequencyByPublishDelay(Integer partitionSwitchFrequencyByPublishDelay) {
			this.partitionSwitchFrequencyByPublishDelay = partitionSwitchFrequencyByPublishDelay;
		}

		public Integer getMaxMessages() {
			return this.maxMessages;
		}

		public void setMaxMessages(Integer maxMessages) {
			this.maxMessages = maxMessages;
		}

		public DataSize getMaxBytes() {
			return this.maxBytes;
		}

		public void setMaxBytes(DataSize maxBytes) {
			this.maxBytes = maxBytes;
		}

		public Boolean getEnabled() {
			return this.enabled;
		}

		public void setEnabled(Boolean enabled) {
			this.enabled = enabled;
		}

	}

	static class Properties extends HashMap<String, Object> {

		<V> java.util.function.Consumer<V> in(String key) {
			return (value) -> put(key, value);
		}

	}

}