PulsarBinderUtils.java

/*
 * Copyright 2023-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.pulsar;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.ProducerBuilder;

import org.springframework.cloud.stream.binder.pulsar.properties.ConsumerConfigProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.ProducerConfigProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarConsumerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.core.log.LogAccessor;
import org.springframework.util.StringUtils;

/**
 * Binder utility methods.
 *
 * @author Soby Chacko
 * @author Chris Bono
 */
final class PulsarBinderUtils {

	private static final LogAccessor LOGGER = new LogAccessor(PulsarBinderUtils.class);

	private static final String SUBSCRIPTION_NAME_FORMAT_STR = "%s-anon-subscription-%s";

	private PulsarBinderUtils() {
	}

	/**
	 * Gets the subscription name to use for the binder.
	 * @param consumerProps the pulsar consumer props
	 * @param consumerDestination the destination being subscribed to
	 * @return the subscription name from the consumer properties or a generated name in
	 * the format {@link #SUBSCRIPTION_NAME_FORMAT_STR} when the name is not set on the
	 * consumer properties
	 */
	static String subscriptionName(PulsarConsumerProperties consumerProps, ConsumerDestination consumerDestination) {
		if (StringUtils.hasText(consumerProps.getSubscription().getName())) {
			return consumerProps.getSubscription().getName();
		}
		return SUBSCRIPTION_NAME_FORMAT_STR.formatted(consumerDestination.getName(), UUID.randomUUID());
	}

	/**
	 * Merges base and extended producer properties defined at the binder and binding
	 * level.
	 * <p>Only base properties whose value has changed from the default are included in
	 * result map. All extended properties are included, regardless of their value,
	 * which ensures that the extended property defaults are respected.
	 * <p>If a property is defined at both the binder and binding level, the binding level
	 * property value takes precedence.
	 * @param binderProducerProps the binder level producer config properties (eg.
	 * 'spring.cloud.stream.pulsar.binder.producer.*')
	 * @param bindingProducerProps the binding level config properties (eg.
	 * 'spring.cloud.stream.pulsar.bindings.myBinding-out-0.producer.*')
	 * @return map of merged binder and binding producer properties
	 */
	static Map<String, Object> mergeModifiedProducerProperties(ProducerConfigProperties binderProducerProps,
			ProducerConfigProperties bindingProducerProps) {
		// Layer the base props for global -> binder -> bindings
		var globalProducerProps = new ProducerConfigProperties().toBaseProducerPropertiesMap();
		var binderBaseProducerProps = binderProducerProps.toBaseProducerPropertiesMap();
		var bindingBaseProducerProps = bindingProducerProps.toBaseProducerPropertiesMap();
		var layeredBaseProducerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(globalProducerProps,
				binderBaseProducerProps, bindingBaseProducerProps, false);
		// Layer the extended props for global -> binder -> bindings
		var globalExtProducerProps = new ProducerConfigProperties().toExtendedProducerPropertiesMap();
		var binderExtProducerProps = binderProducerProps.toExtendedProducerPropertiesMap();
		var bindingExtProducerProps = bindingProducerProps.toExtendedProducerPropertiesMap();
		var layeredExtProducerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(globalExtProducerProps,
				binderExtProducerProps, bindingExtProducerProps, true);
		// Combine both base and extended layers
		var layeredProducerProps = new HashMap<>(layeredBaseProducerProps);
		layeredProducerProps.putAll(layeredExtProducerProps);
		return layeredProducerProps;
	}

	/**
	 * Merges base and extended consumer properties defined at the binder and binding
	 * level.
	 * <p>Only base properties whose value has changed from the default are included in
	 * result map. All extended properties are included, regardless of their value,
	 * which ensures that the extended property defaults are respected.
	 * <p>If a property is defined at both the binder and binding level, the binding level
	 * property value takes precedence.
	 * @param binderConsumerProps the binder level consumer config properties (eg.
	 * 'spring.cloud.stream.pulsar.binder.consumer.*')
	 * @param bindingConsumerProps the binding level config properties (eg.
	 * 'spring.cloud.stream.pulsar.bindings.myBinding-in-0.consumer.*')
	 * @return map of merged binder and binding consumer properties
	 */
	static Map<String, Object> mergeModifiedConsumerProperties(ConsumerConfigProperties binderConsumerProps,
			ConsumerConfigProperties bindingConsumerProps) {
		// Layer the base props for global -> binder -> bindings
		var globalBaseConsumerProps = new ConsumerConfigProperties().toBaseConsumerPropertiesMap();
		var binderBaseConsumerProps = binderConsumerProps.toBaseConsumerPropertiesMap();
		var bindingBaseConsumerProps = bindingConsumerProps.toBaseConsumerPropertiesMap();
		var layeredBaseConsumerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(globalBaseConsumerProps,
				binderBaseConsumerProps, bindingBaseConsumerProps, false);
		// Layer the extended props for global -> binder -> bindings
		var globalExtConsumerProps = new ConsumerConfigProperties().toExtendedConsumerPropertiesMap();
		var binderExtConsumerProps = binderConsumerProps.toExtendedConsumerPropertiesMap();
		var bindingExtConsumerProps = bindingConsumerProps.toExtendedConsumerPropertiesMap();
		var layeredExtConsumerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(globalExtConsumerProps,
				binderExtConsumerProps, bindingExtConsumerProps, true);
		// Combine both base and extended layers
		var layeredConsumerProps = new HashMap<>(layeredBaseConsumerProps);
		layeredConsumerProps.putAll(layeredExtConsumerProps);
		return layeredConsumerProps;
	}

	/**
	 * Merges properties defined at the binder and binding level with binding properties
	 * overriding binder properties.
	 * <p>
	 * @param globalProps the map of global level properties (eg. 'spring.pulsar.consumer.*')
	 * @param binderProps the map of binder level properties (eg.
	 * 'spring.cloud.stream.pulsar.binder.consumer.*')
	 * @param bindingProps the map of binding level properties (eg.
	 * 'spring.cloud.stream.pulsar.bindings.myBinding-in-0.consumer.*')
	 * @param includeDefaults whether to also include unmodified properties with their default values
	 * @return map of merged binder and binding properties with binding properties overriding binder properties
	 */
	static Map<String, Object> mergePropertiesWithPrecedence(Map<String, Object> globalProps,
			Map<String, Object> binderProps, Map<String, Object> bindingProps, boolean includeDefaults) {
		Objects.requireNonNull(globalProps, "globalProps must be specified");
		Objects.requireNonNull(binderProps, "binderProps must be specified");
		Objects.requireNonNull(bindingProps, "bindingProps must be specified");

		Map<String, Object> newOrModifiedBinderProps = extractNewOrModifiedProperties(binderProps, globalProps);
		LOGGER.trace(() -> "New or modified binder props: %s".formatted(newOrModifiedBinderProps));

		Map<String, Object> newOrModifiedBindingProps = extractNewOrModifiedProperties(bindingProps, globalProps);
		LOGGER.trace(() -> "New or modified binding props: %s".formatted(newOrModifiedBindingProps));

		Map<String, Object> mergedProps = new HashMap<>(newOrModifiedBinderProps);
		mergedProps.putAll(newOrModifiedBindingProps);

		// Add in default properties for any props not customized by the user
		if (includeDefaults) {
			globalProps.forEach(mergedProps::putIfAbsent);
		}
		LOGGER.trace(() -> "Final merged props: %s".formatted(mergedProps));
		return mergedProps;
	}

	private static Map<String, Object> extractNewOrModifiedProperties(Map<String, Object> candidateProps,
			Map<String, Object> baseProps) {
		Map<String, Object> newOrModifiedProps = new HashMap<>();
		candidateProps.forEach((propName, propValue) -> {
			if (!baseProps.containsKey(propName) || (!Objects.equals(propValue, baseProps.get(propName)))) {
				newOrModifiedProps.put(propName, propValue);
			}
		});
		return newOrModifiedProps;
	}

	/**
	 * Configures the specified properties onto the specified builder in a manner that
	 * loads non-serializable properties. See
	 * <a href="https://github.com/apache/pulsar/pull/18344">Pulsar PR</a>.
	 * @param builder the builder
	 * @param properties the properties to set on the builder
	 * @param <T> the payload type
	 */
	static <T> void loadConf(ProducerBuilder<T> builder, Map<String, Object> properties) {
		builder.loadConf(properties);
		// Set fields that are not loaded by loadConf
		if (properties.containsKey("encryptionKeys")) {
			@SuppressWarnings("unchecked")
			Collection<String> keys = (Collection<String>) properties.get("encryptionKeys");
			keys.forEach(builder::addEncryptionKey);
		}
		if (properties.containsKey("customMessageRouter")) {
			builder.messageRouter((MessageRouter) properties.get("customMessageRouter"));
		}
		if (properties.containsKey("batcherBuilder")) {
			builder.batcherBuilder((BatcherBuilder) properties.get("batcherBuilder"));
		}
		if (properties.containsKey("cryptoKeyReader")) {
			builder.cryptoKeyReader((CryptoKeyReader) properties.get("cryptoKeyReader"));
		}
	}
}