// PulsarBinderUtilsTests.java

/*
 * Copyright 2023-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.pulsar;

import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.apache.pulsar.client.api.ProducerAccessMode;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

import org.springframework.cloud.stream.binder.pulsar.properties.ConsumerConfigProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.ProducerConfigProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarConsumerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link PulsarBinderUtils}.
 *
 * @author Soby Chacko
 * @author Chris Bono
 */
class PulsarBinderUtilsTests {

	/**
	 * Verifies the subscription name that {@code PulsarBinderUtils.subscriptionName}
	 * produces for a given pair of consumer properties and consumer destination.
	 */
	@Nested
	class SubscriptionNameTests {

		@Test
		void respectsValueWhenSetAsProperty() {
			// Deep stubs allow the nested getSubscription().getName() chain to be stubbed directly.
			PulsarConsumerProperties consumerProps = mock(PulsarConsumerProperties.class, Mockito.RETURNS_DEEP_STUBS);
			when(consumerProps.getSubscription().getName()).thenReturn("my-sub");
			ConsumerDestination destination = mock(ConsumerDestination.class);

			var resolvedName = PulsarBinderUtils.subscriptionName(consumerProps, destination);

			assertThat(resolvedName).isEqualTo("my-sub");
		}

		@Test
		void generatesValueWhenNotSetAsProperty() {
			ConsumerDestination destination = mock(ConsumerDestination.class);
			when(destination.getName()).thenReturn("my-topic");
			PulsarConsumerProperties consumerProps = mock(PulsarConsumerProperties.class, Mockito.RETURNS_DEEP_STUBS);
			when(consumerProps.getSubscription().getName()).thenReturn(null);

			var resolvedName = PulsarBinderUtils.subscriptionName(consumerProps, destination);

			// Only the prefix is checked; whatever follows it is not pinned by this test.
			assertThat(resolvedName).startsWith("my-topic-anon-subscription-");
		}

	}

	/**
	 * Table-driven checks of {@code PulsarBinderUtils.mergePropertiesWithPrecedence}
	 * across global, binder and binding level property maps.
	 */
	@Nested
	class MergedPropertiesTests {

		@ParameterizedTest(name = "{0}")
		@MethodSource("mergePropertiesTestProvider")
		void mergePropertiesTest(String testName, boolean includeDefaults, Map<String, Object> globalProps, Map<String, Object> binderProps,
				Map<String, Object> bindingProps, Map<String, Object> expectedMergedProps) {
			var actualMergedProps = PulsarBinderUtils.mergePropertiesWithPrecedence(globalProps, binderProps,
					bindingProps, includeDefaults);
			assertThat(actualMergedProps).containsExactlyInAnyOrderEntriesOf(expectedMergedProps);
		}

		/**
		 * Supplies the cases for {@link #mergePropertiesTest}. Each row is laid out as:
		 * test name, includeDefaults flag, global props, binder props, binding props,
		 * expected merged props.
		 * @return the stream of test arguments
		 */
		// @formatter:off
		static Stream<Arguments> mergePropertiesTestProvider() {
			// Immutable fixtures shared by the rows below.
			Map<String, Object> none = Collections.emptyMap();
			Map<String, Object> fooBase = Map.of("foo", "foo-base");
			Map<String, Object> fooBinder = Map.of("foo", "foo-binder");
			Map<String, Object> fooBinding = Map.of("foo", "foo-binding");
			Map<String, Object> barBinder = Map.of("bar", "bar-binder");
			Map<String, Object> barBinding = Map.of("bar", "bar-binding");
			Map<String, Object> fooBaseAndBarBinder = Map.of("foo", "foo-base", "bar", "bar-binder");
			Map<String, Object> fooBaseAndBarBinding = Map.of("foo", "foo-base", "bar", "bar-binding");

			return Stream.of(
					arguments("noProps", true, none, none, none, none),
					arguments("allSamePropsWithoutDefaults", false, fooBase, fooBase, fooBase, none),
					arguments("allSamePropsWithDefaults", true, fooBase, fooBase, fooBase, fooBase),
					arguments("onlyBasePropsWithoutDefaults", false, fooBase, none, none, none),
					arguments("onlyBasePropsWithDefaults", true, fooBase, none, none, fooBase),
					arguments("onlyBinderProps", true, none, fooBinder, none, fooBinder),
					arguments("onlyBindingProps", true, none, none, fooBinding, fooBinding),
					arguments("binderOverridesBaseValue", true, fooBase, fooBinder, fooBase, fooBinder),
					arguments("binderContainsNewPropWithoutDefaults", false, fooBase, fooBaseAndBarBinder, fooBase, barBinder),
					arguments("binderContainsNewPropWithDefaults", true, fooBase, fooBaseAndBarBinder, fooBase, fooBaseAndBarBinder),
					arguments("bindingOverridesBaseValue", true, fooBase, fooBase, fooBinding, fooBinding),
					arguments("bindingContainsNewPropWithoutDefaults", false, fooBase, fooBase, fooBaseAndBarBinding, barBinding),
					arguments("bindingContainsNewPropWithDefaults", true, fooBase, fooBase, fooBaseAndBarBinding, fooBaseAndBarBinding),
					arguments("binderOverridesBaseAndBindingOverridesBinder", true, fooBase, fooBinder, fooBinding, fooBinding));
		}
		// @formatter:on

	}

	/**
	 * Verifies {@code PulsarBinderUtils.mergeModifiedProducerProperties} when
	 * {@link ProducerConfigProperties} are customized at the binder level, the binding
	 * level, or both.
	 */
	@Nested
	class MergeProducerPropertiesTests {

		/** Customizer that leaves the properties untouched. */
		private static final Consumer<ProducerConfigProperties> SET_NO_PROPS = (ignored) -> {
		};

		@Test
		void noPropsModified() {
			doMergeProducerPropertiesTest(SET_NO_PROPS, SET_NO_PROPS, defaultExtProps());
		}

		// @formatter:off
		@Test
		void basePropModifiedAtBinderLevel() {
			doMergeProducerPropertiesTest(
					withAccessMode(ProducerAccessMode.Exclusive),
					SET_NO_PROPS,
					defaultExtPropsWith("accessMode", ProducerAccessMode.Exclusive));
		}

		@Test
		void basePropModifiedAtBindingLevel() {
			doMergeProducerPropertiesTest(
					SET_NO_PROPS,
					withAccessMode(ProducerAccessMode.Exclusive),
					defaultExtPropsWith("accessMode", ProducerAccessMode.Exclusive));
		}

		@Test
		void basePropModifiedAtBinderAndBindingLevel() {
			// The binding-level value is the one expected in the merged result.
			doMergeProducerPropertiesTest(
					withAccessMode(ProducerAccessMode.ExclusiveWithFencing),
					withAccessMode(ProducerAccessMode.Exclusive),
					defaultExtPropsWith("accessMode", ProducerAccessMode.Exclusive));
		}

		@Test
		void basePropModifiedAtBinderAndBindingLevelWithDefaultValue() {
			// Expecting the untouched defaults: the test treats Shared as the default access mode.
			doMergeProducerPropertiesTest(
					withAccessMode(ProducerAccessMode.Shared),
					withAccessMode(ProducerAccessMode.Shared),
					defaultExtProps());
		}

		@Test
		void extPropModifiedAtBinderLevel() {
			doMergeProducerPropertiesTest(
					withMaxPendingMessages(1200),
					SET_NO_PROPS,
					defaultExtPropsWith(withMaxPendingMessages(1200)));
		}

		@Test
		void extPropModifiedAtBindingLevel() {
			doMergeProducerPropertiesTest(
					SET_NO_PROPS,
					withMaxPendingMessages(1200),
					defaultExtPropsWith(withMaxPendingMessages(1200)));
		}

		@Test
		void extPropModifiedAtBinderAndBindingLevel() {
			// The binding-level value is the one expected in the merged result.
			doMergeProducerPropertiesTest(
					withMaxPendingMessages(1100),
					withMaxPendingMessages(1200),
					defaultExtPropsWith(withMaxPendingMessages(1200)));
		}

		@Test
		void extPropModifiedAtBinderAndBindingLevelWithDefaultValue() {
			// Expecting the untouched defaults: the test treats 1000 as the default value.
			doMergeProducerPropertiesTest(
					withMaxPendingMessages(1000),
					withMaxPendingMessages(1000),
					defaultExtProps());
		}

		@Test
		void baseAndExtPropsAreCombined() {
			Map<String, Object> expectedProps = defaultExtPropsWith(withMaxPendingMessages(1200));
			expectedProps.put("accessMode", ProducerAccessMode.Exclusive);
			doMergeProducerPropertiesTest(
					withAccessMode(ProducerAccessMode.Exclusive),
					withMaxPendingMessages(1200),
					expectedProps);
		}
		// @formatter:on

		/** Builds a customizer that sets the given access mode. */
		private Consumer<ProducerConfigProperties> withAccessMode(ProducerAccessMode accessMode) {
			return (props) -> props.setAccessMode(accessMode);
		}

		/** Builds a customizer that sets the given max pending messages value. */
		private Consumer<ProducerConfigProperties> withMaxPendingMessages(int maxPendingMessages) {
			return (props) -> props.setMaxPendingMessages(maxPendingMessages);
		}

		/** Returns the extended producer properties map of a freshly created, untouched instance. */
		private Map<String, Object> defaultExtProps() {
			var untouchedProps = new ProducerConfigProperties();
			return untouchedProps.toExtendedProducerPropertiesMap();
		}

		/** Returns the default extended properties map with one extra (or replaced) entry. */
		private Map<String, Object> defaultExtPropsWith(String key, Object value) {
			Map<String, Object> props = defaultExtProps();
			props.put(key, value);
			return props;
		}

		/** Returns the extended properties map of a fresh instance after applying the customizer. */
		private Map<String, Object> defaultExtPropsWith(Consumer<ProducerConfigProperties> extPropsCustomizer) {
			var customizedProps = new ProducerConfigProperties();
			extPropsCustomizer.accept(customizedProps);
			return customizedProps.toExtendedProducerPropertiesMap();
		}

		/**
		 * Applies the customizers to fresh binder-level and binding-level properties,
		 * merges them and asserts the merged entries match the expected map exactly.
		 */
		private void doMergeProducerPropertiesTest(Consumer<ProducerConfigProperties> binderPropsCustomizer,
				Consumer<ProducerConfigProperties> bindingPropsCustomizer,
				Map<String, Object> expectedMergedProperties) {
			var binderLevelProps = new ProducerConfigProperties();
			var bindingLevelProps = new ProducerConfigProperties();
			binderPropsCustomizer.accept(binderLevelProps);
			bindingPropsCustomizer.accept(bindingLevelProps);
			assertThat(PulsarBinderUtils.mergeModifiedProducerProperties(binderLevelProps, bindingLevelProps))
					.containsExactlyInAnyOrderEntriesOf(expectedMergedProperties);
		}

	}

	/**
	 * Verifies {@code PulsarBinderUtils.mergeModifiedConsumerProperties} when
	 * {@link ConsumerConfigProperties} are customized at the binder level, the binding
	 * level, or both.
	 */
	@Nested
	class MergeConsumerPropertiesTests {

		/** Customizer that leaves the properties untouched. */
		private static final Consumer<ConsumerConfigProperties> SET_NO_PROPS = (ignored) -> {
		};

		@Test
		void noPropsModified() {
			doMergeConsumerPropertiesTest(SET_NO_PROPS, SET_NO_PROPS, defaultExtProps());
		}

		// @formatter:off
		@Test
		void basePropModifiedAtBinderLevel() {
			doMergeConsumerPropertiesTest(
					withPriorityLevel(1000),
					SET_NO_PROPS,
					defaultExtPropsWith("priorityLevel", 1000));
		}

		@Test
		void basePropModifiedAtBindingLevel() {
			doMergeConsumerPropertiesTest(
					SET_NO_PROPS,
					withPriorityLevel(1000),
					defaultExtPropsWith("priorityLevel", 1000));
		}

		@Test
		void basePropModifiedAtBinderAndBindingLevel() {
			// The binding-level value is the one expected in the merged result.
			doMergeConsumerPropertiesTest(
					withPriorityLevel(2000),
					withPriorityLevel(1000),
					defaultExtPropsWith("priorityLevel", 1000));
		}

		@Test
		void basePropModifiedAtBinderAndBindingLevelWithDefaultValue() {
			// Expecting the untouched defaults: the test treats 0 as the default priority level.
			doMergeConsumerPropertiesTest(
					withPriorityLevel(0),
					withPriorityLevel(0),
					defaultExtProps());
		}

		@Test
		void extPropModifiedAtBinderLevel() {
			doMergeConsumerPropertiesTest(
					withReceiverQueueSize(1200),
					SET_NO_PROPS,
					defaultExtPropsWith(withReceiverQueueSize(1200)));
		}

		@Test
		void extPropModifiedAtBindingLevel() {
			doMergeConsumerPropertiesTest(
					SET_NO_PROPS,
					withReceiverQueueSize(1200),
					defaultExtPropsWith(withReceiverQueueSize(1200)));
		}

		@Test
		void extPropModifiedAtBinderAndBindingLevel() {
			// The binding-level value is the one expected in the merged result.
			doMergeConsumerPropertiesTest(
					withReceiverQueueSize(1100),
					withReceiverQueueSize(1200),
					defaultExtPropsWith(withReceiverQueueSize(1200)));
		}

		@Test
		void extPropModifiedAtBinderAndBindingLevelWithDefaultValue() {
			// Expecting the untouched defaults: the test treats 1000 as the default queue size.
			doMergeConsumerPropertiesTest(
					withReceiverQueueSize(1000),
					withReceiverQueueSize(1000),
					defaultExtProps());
		}

		@Test
		void baseAndExtPropsAreCombined() {
			Map<String, Object> expectedProps = defaultExtPropsWith(withReceiverQueueSize(1200));
			expectedProps.put("priorityLevel", 1000);
			doMergeConsumerPropertiesTest(
					withPriorityLevel(1000),
					withReceiverQueueSize(1200),
					expectedProps);
		}
		// @formatter:on

		/** Builds a customizer that sets the given priority level. */
		private Consumer<ConsumerConfigProperties> withPriorityLevel(int priorityLevel) {
			return (props) -> props.setPriorityLevel(priorityLevel);
		}

		/** Builds a customizer that sets the given receiver queue size. */
		private Consumer<ConsumerConfigProperties> withReceiverQueueSize(int receiverQueueSize) {
			return (props) -> props.setReceiverQueueSize(receiverQueueSize);
		}

		/** Returns the extended consumer properties map of a freshly created, untouched instance. */
		private Map<String, Object> defaultExtProps() {
			var untouchedProps = new ConsumerConfigProperties();
			return untouchedProps.toExtendedConsumerPropertiesMap();
		}

		/** Returns the default extended properties map with one extra (or replaced) entry. */
		private Map<String, Object> defaultExtPropsWith(String key, Object value) {
			Map<String, Object> props = defaultExtProps();
			props.put(key, value);
			return props;
		}

		/** Returns the extended properties map of a fresh instance after applying the customizer. */
		private Map<String, Object> defaultExtPropsWith(Consumer<ConsumerConfigProperties> extPropsCustomizer) {
			var customizedProps = new ConsumerConfigProperties();
			extPropsCustomizer.accept(customizedProps);
			return customizedProps.toExtendedConsumerPropertiesMap();
		}

		/**
		 * Applies the customizers to fresh binder-level and binding-level properties,
		 * merges them and asserts the merged entries match the expected map exactly.
		 */
		private void doMergeConsumerPropertiesTest(
				Consumer<ConsumerConfigProperties> binderPropsCustomizer,
				Consumer<ConsumerConfigProperties> bindingPropsCustomizer,
				Map<String, Object> expectedMergedProperties) {
			var binderLevelProps = new ConsumerConfigProperties();
			var bindingLevelProps = new ConsumerConfigProperties();
			binderPropsCustomizer.accept(binderLevelProps);
			bindingPropsCustomizer.accept(bindingLevelProps);
			assertThat(PulsarBinderUtils.mergeModifiedConsumerProperties(binderLevelProps, bindingLevelProps))
					.containsExactlyInAnyOrderEntriesOf(expectedMergedProperties);
		}

	}

}