ProducerConfigPropertiesTests.java

/*
 * Copyright 2023-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.pulsar;

import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;

import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import org.springframework.cloud.stream.binder.pulsar.properties.ProducerConfigProperties;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;

/**
 * Unit tests for {@link ProducerConfigProperties}.
 * <p>
 * Each test binds a map of {@code spring.pulsar.producer.*} properties onto a
 * {@link ProducerConfigProperties} instance, extracts one of its property maps, and
 * verifies both the extracted entries and that the map can be loaded into a
 * {@link ProducerConfigurationData}.
 *
 * @author Soby Chacko
 * @author Chris Bono
 */
class ProducerConfigPropertiesTests {

	/**
	 * Binds only the base properties and checks the result of
	 * {@code toBaseProducerPropertiesMap()}.
	 */
	@Test
	void basePropsCanBeExtractedToMap() {
		var inputProps = basePropsInputMap();
		var producerConfigProps = bindInputPropsToProducerConfigProps(inputProps);
		var outputProps = producerConfigProps.toBaseProducerPropertiesMap();
		verifyOutputPropsCanBeLoadedInProducerBuilder(outputProps);
		verifyBasePropsInOutputMap(outputProps);
	}

	/**
	 * Builds the input properties that the base property assertions rely on.
	 * @return a new mutable map of {@code spring.pulsar.producer.*} base properties
	 */
	private Map<String, String> basePropsInputMap() {
		Map<String, String> inputProps = new HashMap<>();
		inputProps.put("spring.pulsar.producer.access-mode", "Exclusive");
		inputProps.put("spring.pulsar.producer.batching-enabled", "true");
		inputProps.put("spring.pulsar.producer.chunking-enabled", "true");
		inputProps.put("spring.pulsar.producer.compression-type", "lz4");
		inputProps.put("spring.pulsar.producer.hashing-scheme", "murmur3_32hash");
		inputProps.put("spring.pulsar.producer.message-routing-mode", "custompartition");
		inputProps.put("spring.pulsar.producer.name", "my-producer");
		inputProps.put("spring.pulsar.producer.send-timeout", "2s");
		inputProps.put("spring.pulsar.producer.topic-name", "my-topic");
		return inputProps;
	}

	/**
	 * Asserts that the output map holds the converted form of every entry created by
	 * {@link #basePropsInputMap()}.
	 * @param outputProps the extracted producer properties to check
	 */
	private void verifyBasePropsInOutputMap(Map<String, Object> outputProps) {
		assertThat(outputProps).containsEntry("accessMode", ProducerAccessMode.Exclusive)
				.containsEntry("batchingEnabled", true).containsEntry("chunkingEnabled", true)
				.containsEntry("compressionType", CompressionType.LZ4)
				.containsEntry("hashingScheme", HashingScheme.Murmur3_32Hash)
				.containsEntry("messageRoutingMode", MessageRoutingMode.CustomPartition)
				.containsEntry("producerName", "my-producer").containsEntry("sendTimeoutMs", 2_000)
				.containsEntry("topicName", "my-topic");
	}

	/**
	 * Binds the base and extended properties and checks the result of
	 * {@code toExtendedProducerPropertiesMap()}.
	 */
	@Test
	void extendedPropsCanBeExtractedToMap() {
		var inputProps = basePropsInputMap();
		inputProps.putAll(extendedPropsInputMap());
		var producerConfigProps = bindInputPropsToProducerConfigProps(inputProps);
		var outputProps = producerConfigProps.toExtendedProducerPropertiesMap();
		verifyOutputPropsCanBeLoadedInProducerBuilder(outputProps);
		verifyExtendedPropsInOutputMap(outputProps);
	}

	/**
	 * Binds the base and extended properties and checks that
	 * {@code toAllProducerPropertiesMap()} contains both groups.
	 */
	@Test
	void allPropsCanBeExtractedToMap() {
		var inputProps = basePropsInputMap();
		inputProps.putAll(extendedPropsInputMap());
		var producerConfigProps = bindInputPropsToProducerConfigProps(inputProps);
		var outputProps = producerConfigProps.toAllProducerPropertiesMap();
		verifyOutputPropsCanBeLoadedInProducerBuilder(outputProps);
		verifyBasePropsInOutputMap(outputProps);
		verifyExtendedPropsInOutputMap(outputProps);
	}

	/**
	 * Overrides {@code batching-enabled} to {@code false} and checks that none of the
	 * batching keys appear in the result of {@code toAllProducerPropertiesMap()}.
	 */
	@Test
	void batchPropsSkippedWhenBatchDisabled() {
		var inputProps = basePropsInputMap();
		inputProps.putAll(extendedPropsInputMap());
		// Replaces the "true" value supplied by basePropsInputMap().
		inputProps.put("spring.pulsar.producer.batching-enabled", "false");
		var producerConfigProps = bindInputPropsToProducerConfigProps(inputProps);
		var outputProps = producerConfigProps.toAllProducerPropertiesMap();
		assertThat(outputProps).doesNotContainKey("batchingMaxPublishDelayMicros")
				.doesNotContainKey("batchingPartitionSwitchFrequencyByPublishDelay")
				.doesNotContainKey("batchingMaxMessages").doesNotContainKey("batchingMaxBytes");
	}

	/**
	 * Builds the input properties that the extended property assertions rely on,
	 * including the {@code batch.*} settings.
	 * @return a new mutable map of {@code spring.pulsar.producer.*} extended properties
	 */
	private Map<String, String> extendedPropsInputMap() {
		Map<String, String> inputProps = new HashMap<>();
		inputProps.put("spring.pulsar.producer.auto-update-partitions", "true");
		inputProps.put("spring.pulsar.producer.auto-update-partitions-interval", "15s");
		inputProps.put("spring.pulsar.producer.block-if-queue-full", "true");
		inputProps.put("spring.pulsar.producer.crypto-failure-action", "send");
		inputProps.put("spring.pulsar.producer.encryption-keys[0]", "my-key");
		inputProps.put("spring.pulsar.producer.initial-sequence-id", "9");
		inputProps.put("spring.pulsar.producer.lazy-start-partitioned-producers", "true");
		inputProps.put("spring.pulsar.producer.max-pending-messages", "3");
		inputProps.put("spring.pulsar.producer.max-pending-messages-across-partitions", "4");
		inputProps.put("spring.pulsar.producer.multi-schema", "true");
		inputProps.put("spring.pulsar.producer.properties[my-prop]", "my-prop-value");
		inputProps.put("spring.pulsar.producer.batch.max-publish-delay", "5s");
		inputProps.put("spring.pulsar.producer.batch.partition-switch-frequency-by-publish-delay", "6");
		inputProps.put("spring.pulsar.producer.batch.max-messages", "7");
		inputProps.put("spring.pulsar.producer.batch.max-bytes", "8");
		return inputProps;
	}

	/**
	 * Asserts that the output map holds the converted form of every entry created by
	 * {@link #extendedPropsInputMap()}.
	 * @param outputProps the extracted producer properties to check
	 */
	private void verifyExtendedPropsInOutputMap(Map<String, Object> outputProps) {
		assertThat(outputProps).containsEntry("autoUpdatePartitions", true)
				.containsEntry("autoUpdatePartitionsIntervalSeconds", 15L).containsEntry("blockIfQueueFull", true)
				.containsEntry("cryptoFailureAction", ProducerCryptoFailureAction.SEND)
				.hasEntrySatisfying("encryptionKeys",
						keys -> assertThat(keys).asInstanceOf(InstanceOfAssertFactories.collection(String.class))
								.containsExactly("my-key"))
				.containsEntry("initialSequenceId", 9L).containsEntry("lazyStartPartitionedProducers", true)
				.containsEntry("maxPendingMessages", 3).containsEntry("maxPendingMessagesAcrossPartitions", 4)
				.containsEntry("multiSchema", true)
				.hasEntrySatisfying("properties",
						properties -> assertThat(properties)
								.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
								.containsEntry("my-prop", "my-prop-value"))
				.containsEntry("batchingMaxPublishDelayMicros", 5_000_000L)
				.containsEntry("batchingPartitionSwitchFrequencyByPublishDelay", 6)
				.containsEntry("batchingMaxMessages", 7).containsEntry("batchingMaxBytes", 8);
	}

	/**
	 * Asserts that {@link ConfigurationDataUtils#loadData} accepts the given map when
	 * applied to a new {@link ProducerConfigurationData} without throwing.
	 * @param outputProps the extracted producer properties to load
	 */
	private void verifyOutputPropsCanBeLoadedInProducerBuilder(Map<String, Object> outputProps) {
		assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(outputProps,
				new ProducerConfigurationData(), ProducerConfigurationData.class));
	}

	/**
	 * Binds the given properties under the {@code spring.pulsar.producer} prefix onto a
	 * new {@link ProducerConfigProperties} instance.
	 * @param inputProps the property names and values to bind
	 * @return the bound properties instance
	 */
	private ProducerConfigProperties bindInputPropsToProducerConfigProps(Map<String, String> inputProps) {
		return new Binder(new MapConfigurationPropertySource(inputProps))
				.bind("spring.pulsar.producer", Bindable.ofInstance(new ProducerConfigProperties())).get();
	}

}