PulsarTopicProvisionerTests.java

/*
 * Copyright 2023-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.pulsar;

import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarConsumerProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarProducerProperties;
import org.springframework.cloud.stream.binder.pulsar.provisioning.PulsarTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarTopic;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
 * @author Soby Chacko
 */
class PulsarTopicProvisionerTests {

	@Test
	void provisionThroughProducerBindingWithDefaultPartitioning() {
		PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties = new PulsarBinderConfigurationProperties();
		PulsarAdministration pulsarAdministration = mock(PulsarAdministration.class);
		PulsarTopicProvisioner pulsarTopicProvisioner = new PulsarTopicProvisioner(pulsarAdministration,
				pulsarBinderConfigurationProperties);
		ExtendedProducerProperties<PulsarProducerProperties> properties = new ExtendedProducerProperties<>(
				new PulsarProducerProperties());
		ProducerDestination producerDestination = pulsarTopicProvisioner.provisionProducerDestination("foo",
				properties);
		verifyAndAssert(pulsarAdministration, producerDestination.getName(), "persistent://public/default/foo", 0);
	}

	@Test
	void provisionThroughConsumerBindingWithDefaultPartitioning() {
		PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties = new PulsarBinderConfigurationProperties();
		PulsarAdministration pulsarAdministration = mock(PulsarAdministration.class);
		PulsarTopicProvisioner pulsarTopicProvisioner = new PulsarTopicProvisioner(pulsarAdministration,
				pulsarBinderConfigurationProperties);
		ExtendedConsumerProperties<PulsarConsumerProperties> properties = new ExtendedConsumerProperties<>(
				new PulsarConsumerProperties());
		ConsumerDestination consumerDestination = pulsarTopicProvisioner.provisionConsumerDestination("bar", "",
				properties);
		verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "persistent://public/default/bar", 0);
	}

	@Test
	void provisioningOnProducerBindingWithPartitionsSetAtTheBinderProperties() {
		PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties = new PulsarBinderConfigurationProperties();
		pulsarBinderConfigurationProperties.setPartitionCount(4);
		PulsarAdministration pulsarAdministration = mock(PulsarAdministration.class);
		PulsarTopicProvisioner pulsarTopicProvisioner = new PulsarTopicProvisioner(pulsarAdministration,
				pulsarBinderConfigurationProperties);
		ExtendedProducerProperties<PulsarProducerProperties> properties = new ExtendedProducerProperties<>(
				new PulsarProducerProperties());
		ProducerDestination producerDestination = pulsarTopicProvisioner.provisionProducerDestination("foo",
				properties);
		verifyAndAssert(pulsarAdministration, producerDestination.getName(), "persistent://public/default/foo", 4);
	}

	@Test
	void provisioningOnProducerBindingWithPartitionsSetAtTheBindingProperties() {
		PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties = new PulsarBinderConfigurationProperties();
		PulsarAdministration pulsarAdministration = mock(PulsarAdministration.class);
		PulsarTopicProvisioner pulsarTopicProvisioner = new PulsarTopicProvisioner(pulsarAdministration,
				pulsarBinderConfigurationProperties);
		ExtendedProducerProperties<PulsarProducerProperties> properties = new ExtendedProducerProperties<>(
				new PulsarProducerProperties());
		properties.getExtension().setPartitionCount(4);
		ProducerDestination producerDestination = pulsarTopicProvisioner.provisionProducerDestination("foo",
				properties);
		verifyAndAssert(pulsarAdministration, producerDestination.getName(), "persistent://public/default/foo", 4);
	}

	@Test
	void provisionThroughConsumerBindingWithPartitionsSetAtTheBinderProperties() {
		PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties = new PulsarBinderConfigurationProperties();
		pulsarBinderConfigurationProperties.setPartitionCount(4);
		PulsarAdministration pulsarAdministration = mock(PulsarAdministration.class);
		PulsarTopicProvisioner pulsarTopicProvisioner = new PulsarTopicProvisioner(pulsarAdministration,
				pulsarBinderConfigurationProperties);
		ExtendedConsumerProperties<PulsarConsumerProperties> properties = new ExtendedConsumerProperties<>(
				new PulsarConsumerProperties());
		ConsumerDestination consumerDestination = pulsarTopicProvisioner.provisionConsumerDestination("bar", "",
				properties);
		verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "persistent://public/default/bar", 4);
	}

	@Test
	void provisionThroughConsumerBindingWithPartitionsSetAtTheBindingProperties() {
		PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties = new PulsarBinderConfigurationProperties();
		PulsarAdministration pulsarAdministration = mock(PulsarAdministration.class);
		PulsarTopicProvisioner pulsarTopicProvisioner = new PulsarTopicProvisioner(pulsarAdministration,
				pulsarBinderConfigurationProperties);
		PulsarConsumerProperties pulsarConsumerProperties = new PulsarConsumerProperties();
		pulsarConsumerProperties.setPartitionCount(4);
		ExtendedConsumerProperties<PulsarConsumerProperties> properties = new ExtendedConsumerProperties<>(
				pulsarConsumerProperties);
		ConsumerDestination consumerDestination = pulsarTopicProvisioner.provisionConsumerDestination("bar", "",
				properties);
		verifyAndAssert(pulsarAdministration, consumerDestination.getName(), "persistent://public/default/bar", 4);
	}

	private static void verifyAndAssert(PulsarAdministration pulsarAdministration, String actualProducerDestination,
			String expectedProducerDestination, int expectedPartitionCount) {
		ArgumentCaptor<PulsarTopic> pulsarTopicArgumentCaptor = ArgumentCaptor.forClass(PulsarTopic.class);
		verify(pulsarAdministration, times(1)).createOrModifyTopics(pulsarTopicArgumentCaptor.capture());
		assertThat(actualProducerDestination).isEqualTo(expectedProducerDestination);
		PulsarTopic pulsarTopic = pulsarTopicArgumentCaptor.getValue();
		assertThat(pulsarTopic.topicName()).isEqualTo(expectedProducerDestination);
		assertThat(pulsarTopic.numberOfPartitions()).isEqualTo(expectedPartitionCount);
	}
}