RabbitExchangeQueueProvisionerTests.java

/*
 * Copyright 2022-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.rabbit.provisioning;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties.AlternateExchange;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties.AlternateExchange.Binding;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.ApplicationContext;
import org.springframework.expression.common.LiteralExpression;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;

/**
 * Tests for {@link RabbitExchangeQueueProvisioner} that check which
 * {@link Declarables} beans end up in the provisioner's auto-declare context
 * after provisioning a consumer or producer destination, and that
 * {@code cleanAutoDeclareContext} removes them again.
 *
 * <p>The connection factory, connection and channel are all Mockito mocks.
 *
 * @author Gary Russell
 * @since 3.2.3
 *
 */
class RabbitExchangeQueueProvisionerTests {

	/**
	 * Consumer with {@code autoBindDlq}: exchange, queue, binding and the DLQ
	 * artifacts are registered under {@code <destination>.<group>} based names.
	 */
	@Test
	void consumerDeclarationsWithDlq() throws IOException {
		RabbitExchangeQueueProvisioner provisioner = provisionerFor(channelAcceptingQueueDeclare());

		RabbitConsumerProperties props = new RabbitConsumerProperties();
		props.setAutoBindDlq(true);
		ExtendedConsumerProperties<RabbitConsumerProperties> properties = new ExtendedConsumerProperties<>(props);
		ConsumerDestination dest = provisioner.provisionConsumerDestination("foo", "group", properties);
		ApplicationContext ctx = autoDeclareContext(provisioner);
		Set<String> declarables = ctx.getBeansOfType(Declarables.class).keySet();
		assertThat(declarables).contains("foo.group.exchange", "foo.group", "foo.group.binding", "foo.group.dlq",
				"DLX.group.exchange", "foo.group.dlq.binding", "foo.group.dlq.2.binding");
		provisioner.cleanAutoDeclareContext(dest, properties);
		assertThat(ctx.getBeansOfType(Declarables.class)).isEmpty();
	}

	/**
	 * Same as {@link #consumerDeclarationsWithDlq()} but with
	 * {@code queueNameGroupOnly}: queue-related bean names use the group alone,
	 * while the exchange bean keeps the destination prefix.
	 */
	@Test
	void consumerDeclarationsWithDlqQueueNameIsGroup() throws IOException {
		RabbitExchangeQueueProvisioner provisioner = provisionerFor(channelAcceptingQueueDeclare());

		RabbitConsumerProperties props = new RabbitConsumerProperties();
		props.setAutoBindDlq(true);
		props.setQueueNameGroupOnly(true);
		ExtendedConsumerProperties<RabbitConsumerProperties> properties = new ExtendedConsumerProperties<>(props);
		ConsumerDestination dest = provisioner.provisionConsumerDestination("fiz", "group", properties);
		ApplicationContext ctx = autoDeclareContext(provisioner);
		Set<String> declarables = ctx.getBeansOfType(Declarables.class).keySet();
		assertThat(declarables).contains("fiz.group.exchange", "group", "group.binding", "group.dlq",
				"DLX.group.exchange", "group.dlq.binding", "group.dlq.2.binding");
		provisioner.cleanAutoDeclareContext(dest, properties);
		assertThat(ctx.getBeansOfType(Declarables.class)).isEmpty();
	}

	/**
	 * Producer without required groups: the exchange bean is registered under a
	 * name containing the destination's {@code beanNameQualifier}.
	 */
	@Test
	void producerDeclarationsNoGroups() throws IOException {
		RabbitExchangeQueueProvisioner provisioner = provisionerFor(channelAcceptingQueueDeclare());

		RabbitProducerProperties props = new RabbitProducerProperties();
		ExtendedProducerProperties<RabbitProducerProperties> properties = new ExtendedProducerProperties<>(props);
		ProducerDestination dest = provisioner.provisionProducerDestination("bar", properties);
		ApplicationContext ctx = autoDeclareContext(provisioner);
		Set<String> declarables = ctx.getBeansOfType(Declarables.class).keySet();
		String qual = TestUtils.getPropertyValue(dest, "beanNameQualifier", String.class);
		assertThat(declarables).contains("bar." + qual + ".exchange");
		provisioner.cleanAutoDeclareContext(dest, properties);
		assertThat(ctx.getBeansOfType(Declarables.class)).isEmpty();
	}

	/**
	 * Producer with an alternate exchange and no binding: exactly two beans are
	 * registered, the main exchange (carrying the {@code alternate-exchange}
	 * argument) and the alternate exchange, which is a {@link TopicExchange}
	 * when no type is set.
	 */
	@Test
	void producerDeclarationsWithAlternateNoQueue() throws IOException {
		// This scenario deliberately uses a channel without the queueDeclare stub.
		RabbitExchangeQueueProvisioner provisioner = provisionerFor(mock(Channel.class));

		RabbitProducerProperties props = new RabbitProducerProperties();
		AlternateExchange alternate = new AlternateExchange();
		alternate.setName("altEx");
		props.setAlternateExchange(alternate);
		ExtendedProducerProperties<RabbitProducerProperties> properties = new ExtendedProducerProperties<>(props);
		ProducerDestination dest = provisioner.provisionProducerDestination("withAlt", properties);
		ApplicationContext ctx = autoDeclareContext(provisioner);
		Map<String, Declarables> declarables = ctx.getBeansOfType(Declarables.class);
		assertThat(declarables).hasSize(2);
		String qual = TestUtils.getPropertyValue(dest, "beanNameQualifier", String.class);
		Declarables mainEx = declarables.get("withAlt." + qual + ".exchange");
		assertThat(mainEx).isNotNull();
		Exchange exch = (Exchange) mainEx.getDeclarables().iterator().next();
		assertThat(exch.getArguments().get("alternate-exchange")).isEqualTo("altEx");
		Declarables altEx = declarables.get("altEx." + qual + ".exchange");
		assertThat(altEx).isNotNull();
		exch = (Exchange) altEx.getDeclarables().iterator().next();
		assertThat(exch).isInstanceOf(TopicExchange.class);
		provisioner.cleanAutoDeclareContext(dest, properties);
		assertThat(ctx.getBeansOfType(Declarables.class)).isEmpty();
	}

	/**
	 * Producer with a direct alternate exchange bound to a queue: four beans are
	 * registered (main exchange, alternate exchange, queue, binding) and the
	 * binding links {@code altEx} to {@code altQ} with routing key {@code altRK}.
	 */
	@Test
	void producerDeclarationsWithAlternateWithQueue() throws IOException {
		RabbitExchangeQueueProvisioner provisioner = provisionerFor(channelAcceptingQueueDeclare());

		RabbitProducerProperties props = new RabbitProducerProperties();
		AlternateExchange alternate = new AlternateExchange();
		alternate.setName("altEx");
		alternate.setType(ExchangeTypes.DIRECT);
		Binding binding = new Binding();
		binding.setQueue("altQ");
		binding.setRoutingKey("altRK");
		alternate.setBinding(binding);
		props.setAlternateExchange(alternate);
		ExtendedProducerProperties<RabbitProducerProperties> properties = new ExtendedProducerProperties<>(props);
		ProducerDestination dest = provisioner.provisionProducerDestination("withAlt", properties);
		ApplicationContext ctx = autoDeclareContext(provisioner);
		Map<String, Declarables> declarables = ctx.getBeansOfType(Declarables.class);
		assertThat(declarables).hasSize(4);
		String qual = TestUtils.getPropertyValue(dest, "beanNameQualifier", String.class);
		Declarables mainEx = declarables.get("withAlt." + qual + ".exchange");
		assertThat(mainEx).isNotNull();
		Exchange exch = (Exchange) mainEx.getDeclarables().iterator().next();
		assertThat(exch).isInstanceOf(TopicExchange.class);
		assertThat(exch.getArguments().get("alternate-exchange")).isEqualTo("altEx");
		Declarables altEx = declarables.get("altEx." + qual + ".exchange");
		assertThat(altEx).isNotNull();
		exch = (Exchange) altEx.getDeclarables().iterator().next();
		assertThat(exch).isInstanceOf(DirectExchange.class);
		Declarables queueDec = declarables.get("altEx.altQ." + qual);
		assertThat(queueDec).isNotNull();
		Declarables bindingDec = declarables.get("altEx.altQ." + qual + ".binding");
		assertThat(bindingDec).isNotNull();
		org.springframework.amqp.core.Binding bdg = (org.springframework.amqp.core.Binding) bindingDec.getDeclarables()
				.iterator().next();
		assertThat(bdg.getExchange()).isEqualTo("altEx");
		assertThat(bdg.getDestination()).isEqualTo("altQ");
		assertThat(bdg.getRoutingKey()).isEqualTo("altRK");
		provisioner.cleanAutoDeclareContext(dest, properties);
		assertThat(ctx.getBeansOfType(Declarables.class)).isEmpty();
	}

	/**
	 * Producer with two required groups and {@code autoBindDlq}: queue, binding
	 * and DLQ artifacts are registered for each group.
	 */
	@Test
	void producerDeclarationsWithGroupsAndDlq() throws IOException {
		RabbitExchangeQueueProvisioner provisioner = provisionerFor(channelAcceptingQueueDeclare());

		RabbitProducerProperties props = new RabbitProducerProperties();
		props.setAutoBindDlq(true);
		ExtendedProducerProperties<RabbitProducerProperties> properties = new ExtendedProducerProperties<>(props);
		properties.setRequiredGroups("group1", "group2");
		ProducerDestination dest = provisioner.provisionProducerDestination("baz", properties);
		ApplicationContext ctx = autoDeclareContext(provisioner);
		Set<String> declarables = ctx.getBeansOfType(Declarables.class).keySet();
		String qual = TestUtils.getPropertyValue(dest, "beanNameQualifier", String.class);
		assertThat(declarables).contains("baz." + qual + ".exchange", "baz.group1", "baz.group1.binding",
				"baz.group1.dlq", "DLX.group1.exchange", "baz.group1.dlq.binding", "baz.group2", "baz.group2.binding",
				"baz.group2.dlq", "DLX.group2.exchange", "baz.group2.dlq.binding");
		provisioner.cleanAutoDeclareContext(dest, properties);
		assertThat(ctx.getBeansOfType(Declarables.class)).isEmpty();
	}

	/**
	 * Producer with two required groups, {@code autoBindDlq} and two partitions:
	 * a queue and binding are registered per group and partition index
	 * ({@code -0}, {@code -1}), with one set of DLQ artifacts per group.
	 */
	@Test
	void producerDeclarationsWithGroupsAndDlqAndPartitions() throws IOException {
		RabbitExchangeQueueProvisioner provisioner = provisionerFor(channelAcceptingQueueDeclare());

		RabbitProducerProperties props = new RabbitProducerProperties();
		props.setAutoBindDlq(true);
		ExtendedProducerProperties<RabbitProducerProperties> properties = new ExtendedProducerProperties<>(props);
		properties.setRequiredGroups("group1", "group2");
		properties.setPartitionKeyExpression(new LiteralExpression("foo"));
		properties.setPartitionCount(2);
		ProducerDestination dest = provisioner.provisionProducerDestination("qux", properties);
		ApplicationContext ctx = autoDeclareContext(provisioner);
		Set<String> declarables = ctx.getBeansOfType(Declarables.class).keySet();
		String qual = TestUtils.getPropertyValue(dest, "beanNameQualifier", String.class);
		assertThat(declarables).contains("qux." + qual + ".exchange", "qux.group1-0", "qux.group1-0.binding",
				"qux.group1-1", "qux.group1-1.binding", "qux.group1.dlq", "DLX.group1.exchange",
				"qux.group1.dlq.binding", "qux.group2-0",
				"qux.group2-0.binding", "qux.group2-1", "qux.group2-1.binding", "qux.group2.dlq", "DLX.group2.exchange",
				"qux.group2.dlq.binding");
		provisioner.cleanAutoDeclareContext(dest, properties);
		assertThat(ctx.getBeansOfType(Declarables.class)).isEmpty();
	}

	/**
	 * Same as {@link #producerDeclarationsWithGroupsAndDlqAndPartitions()} but
	 * with {@code queueNameGroupOnly}: queue-related bean names drop the
	 * destination prefix.
	 */
	@Test
	void producerDeclarationsWithGroupsAndDlqAndPartitionsQueueNameIsGroup() throws IOException {
		RabbitExchangeQueueProvisioner provisioner = provisionerFor(channelAcceptingQueueDeclare());

		RabbitProducerProperties props = new RabbitProducerProperties();
		props.setAutoBindDlq(true);
		props.setQueueNameGroupOnly(true);
		ExtendedProducerProperties<RabbitProducerProperties> properties = new ExtendedProducerProperties<>(props);
		properties.setRequiredGroups("group1", "group2");
		properties.setPartitionKeyExpression(new LiteralExpression("foo"));
		properties.setPartitionCount(2);
		ProducerDestination dest = provisioner.provisionProducerDestination("qux", properties);
		ApplicationContext ctx = autoDeclareContext(provisioner);
		Set<String> declarables = ctx.getBeansOfType(Declarables.class).keySet();
		String qual = TestUtils.getPropertyValue(dest, "beanNameQualifier", String.class);
		assertThat(declarables).contains("qux." + qual + ".exchange", "group1-0", "group1-0.binding", "group1-1",
				"group1-1.binding", "group1.dlq", "DLX.group1.exchange", "group1.dlq.binding", "group2-0",
				"group2-0.binding", "group2-1", "group2-1.binding", "group2.dlq", "DLX.group2.exchange",
				"group2.dlq.binding");
		provisioner.cleanAutoDeclareContext(dest, properties);
		assertThat(ctx.getBeansOfType(Declarables.class)).isEmpty();
	}

	/**
	 * Create a mock {@link Channel} whose {@code queueDeclare} call with
	 * {@code true, false, false} as its three boolean arguments (any name and
	 * arguments) returns a fixed {@link DeclareOk}.
	 * @return the stubbed channel.
	 * @throws IOException declared by {@code Channel.queueDeclare}; never thrown
	 * while stubbing.
	 */
	private static Channel channelAcceptingQueueDeclare() throws IOException {
		Channel channel = mock(Channel.class);
		willReturn(new DeclareOk("x", 0, 0))
				.given(channel).queueDeclare(any(), eq(Boolean.TRUE), eq(Boolean.FALSE), eq(Boolean.FALSE), any());
		return channel;
	}

	/**
	 * Create a provisioner backed by a mock {@link ConnectionFactory} whose
	 * connection hands out the supplied channel.
	 * @param channel the channel returned by {@code createChannel(anyBoolean())}.
	 * @return the provisioner under test.
	 */
	private static RabbitExchangeQueueProvisioner provisionerFor(Channel channel) {
		ConnectionFactory cf = mock(ConnectionFactory.class);
		Connection conn = mock(Connection.class);
		given(cf.createConnection()).willReturn(conn);
		given(conn.createChannel(anyBoolean())).willReturn(channel);
		return new RabbitExchangeQueueProvisioner(cf);
	}

	/**
	 * Read the provisioner's {@code autoDeclareContext} field reflectively.
	 * @param provisioner the provisioner to inspect.
	 * @return the application context holding the registered {@link Declarables}.
	 */
	private static ApplicationContext autoDeclareContext(RabbitExchangeQueueProvisioner provisioner) {
		return TestUtils.getPropertyValue(provisioner, "autoDeclareContext", ApplicationContext.class);
	}

}