RabbitBinderModuleTests.java

/*
 * Copyright 2015-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.rabbit.integration;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;
import org.testcontainers.containers.RabbitMQContainer;

import org.springframework.amqp.core.DeclarableCustomizer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.amqp.health.RabbitHealthIndicator;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.health.contributor.CompositeHealthContributor;
import org.springframework.boot.health.contributor.Status;
import org.springframework.cloud.Cloud;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.rabbit.RabbitTestContainer;
import org.springframework.cloud.stream.binder.rabbit.RestUtils;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties.AlternateExchange;
import org.springframework.cloud.stream.binder.test.junit.rabbit.RabbitTestSupport;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.inbound.AmqpMessageSource;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.backoff.ExponentialBackOff;
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.WebClient;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

/**
 * @author Marius Bogoevici
 * @author Gary Russell
 * @author Artem Bilan
 * @author Soby Chacko
 * @author Chris Bono
 * @author Byungjun You
 */
class RabbitBinderModuleTests {

	private static final RabbitMQContainer RABBITMQ = RabbitTestContainer.sharedInstance();

	private static final ConnectionFactory MOCK_CONNECTION_FACTORY = mock(ConnectionFactory.class, Mockito.RETURNS_MOCKS);

	private static final WebClient client;

	private static final URI uri;

	static {
		client = WebClient.builder()
				.filter(ExchangeFilterFunctions
						.basicAuthentication(RABBITMQ.getAdminUsername(), RABBITMQ.getAdminPassword()))
				.build();
		try {
			uri = new URI(RABBITMQ.getHttpUrl() + "/api/");
		}
		catch (URISyntaxException ex) {
			throw new IllegalStateException(ex);
		}
	}

	@RegisterExtension
	private final RabbitTestSupport rabbitTestSupport = new RabbitTestSupport(true, RABBITMQ.getAmqpPort(), RABBITMQ.getHttpPort());

	private ConfigurableApplicationContext context;

	@AfterEach
	public void tearDown() {
		if (context != null) {
			context.close();
			context = null;
		}
		RabbitAdmin admin = new RabbitAdmin(rabbitTestSupport.getResource());
		admin.deleteExchange("process-in-0");
		admin.deleteExchange("process-out-0");
	}

	@Test
	void parentConnectionFactoryInheritedByDefault() throws Exception {
		context = new SpringApplicationBuilder(SimpleProcessor.class)
			.web(WebApplicationType.NONE).run("--server.port=0",
				"--spring.rabbitmq.port=" + RABBITMQ.getAmqpPort(),
				"--spring.cloud.function.definition=process",
				"--spring.cloud.stream.rabbit.binder.connection-name-prefix=foo",
				"--spring.cloud.stream.rabbit.bindings.process-in-0.consumer.single-active-consumer=true");
		BinderFactory binderFactory = context.getBean(BinderFactory.class);
		Binder<?, ?, ?> binder = binderFactory.getBinder(null, MessageChannel.class);
		assertThat(binder).isInstanceOf(RabbitMessageChannelBinder.class);
		DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
		CachingConnectionFactory binderConnectionFactory = (CachingConnectionFactory) binderFieldAccessor
			.getPropertyValue("connectionFactory");
		assertThat(binderConnectionFactory).isInstanceOf(CachingConnectionFactory.class);
		ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
		assertThat(binderConnectionFactory).isSameAs(connectionFactory);

		CompositeHealthContributor bindersHealthIndicator = context
			.getBean("bindersHealthContributor", CompositeHealthContributor.class);

		assertThat(bindersHealthIndicator).isNotNull();

		RabbitHealthIndicator indicator = (RabbitHealthIndicator) bindersHealthIndicator.getContributor("rabbit");
		assertThat(indicator).isNotNull();
		assertThat(indicator.health().getStatus())
			.isEqualTo(Status.UP);

		ConnectionFactory publisherConnectionFactory = binderConnectionFactory
			.getPublisherConnectionFactory();
		assertThat(TestUtils.getPropertyValue(publisherConnectionFactory,
			"connection.target")).isNull();
		DirectChannel checkPf = new DirectChannel();
		Binding<MessageChannel> binding = ((RabbitMessageChannelBinder) binder)
			.bindProducer("checkPF", checkPf,
				new ExtendedProducerProperties<>(new RabbitProducerProperties()));
		checkPf.send(new GenericMessage<>("foo".getBytes()));
		binding.unbind();
		assertThat(TestUtils.getPropertyValue(publisherConnectionFactory,
			"connection.target")).isNotNull();

		CachingConnectionFactory cf = this.context
			.getBean(CachingConnectionFactory.class);
		ConnectionNameStrategy cns = TestUtils.getPropertyValue(cf,
			"connectionNameStrategy", ConnectionNameStrategy.class);
		assertThat(cns.obtainNewConnectionName(cf)).isEqualTo("foo#2");
		new RabbitAdmin(rabbitTestSupport.getResource()).deleteExchange("checkPF");
		checkCustomizedArgs();
		binderConnectionFactory.resetConnection();
		binderConnectionFactory.createConnection();
		checkCustomizedArgs();
	}

	@SuppressWarnings("unchecked")
	private void checkCustomizedArgs() throws MalformedURLException, URISyntaxException, InterruptedException {
		List<Map<String, Object>> bindings = RestUtils.getBindingsBySource(client, uri, "/", "process-in-0");
		int n = 0;
		while (n++ < 100 && bindings == null || bindings.size() < 1) {
			Thread.sleep(100);
			bindings = RestUtils.getBindingsBySource(client, uri, "/", "process-in-0");
		}
		assertThat(bindings).isNotNull();
		assertThat((Map<String, Object>) bindings.get(0).get("arguments")).contains(entry("added.by", "customizer"));
		Map<String, Object> exchange = RestUtils.getExchange(client, uri, "/", "process-in-0");
		assertThat((Map<String, Object>) exchange.get("arguments")).contains(entry("added.by", "customizer"));
		Map<String, Object> queue = RestUtils.getQueue(client, uri, "/", (String) bindings.get(0).get("destination"));
		Map<String, Object> args = (Map<String, Object>) queue.get("arguments");
		assertThat(args).contains(entry("added.by", "customizer"));
		assertThat(args).contains(entry("x-single-active-consumer", Boolean.TRUE));
	}

	@Test
	@SuppressWarnings("unchecked")
	void parentConnectionFactoryInheritedByDefaultAndRabbitSettingsPropagated() {
		context = new SpringApplicationBuilder(SimpleProcessor.class)
			.web(WebApplicationType.NONE).run("--server.port=0",
				"--spring.rabbitmq.port=" + RABBITMQ.getAmqpPort(),
				"--spring.cloud.function.definition=process",
				"--spring.cloud.stream.bindings.source.group=someGroup",
				"--spring.cloud.stream.bindings.process-in-0.group=someGroup",
				"--spring.cloud.stream.rabbit.bindings.process-in-0.consumer.transacted=true",
				"--spring.cloud.stream.rabbit.bindings.process-out-0.producer.transacted=true");
		BinderFactory binderFactory = context.getBean(BinderFactory.class);
		Binder<?, ?, ?> binder = binderFactory.getBinder(null, MessageChannel.class);
		assertThat(binder).isInstanceOf(RabbitMessageChannelBinder.class);
		BindingService bindingService = context.getBean(BindingService.class);
		DirectFieldAccessor channelBindingServiceAccessor = new DirectFieldAccessor(
			bindingService);
		// @checkstyle:off
		Map<String, List<Binding<MessageChannel>>> consumerBindings = (Map<String, List<Binding<MessageChannel>>>) channelBindingServiceAccessor
			.getPropertyValue("consumerBindings");
		// @checkstyle:on
		Binding<MessageChannel> inputBinding = consumerBindings.get("process-in-0").get(0);
		assertThat(TestUtils.getPropertyValue(inputBinding, "lifecycle.beanName"))
			.isEqualTo("setByCustomizer:someGroup");
		SimpleMessageListenerContainer container = TestUtils.getPropertyValue(
			inputBinding, "lifecycle.messageListenerContainer",
			SimpleMessageListenerContainer.class);
		assertThat(TestUtils.getPropertyValue(container, "beanName"))
			.isEqualTo("setByCustomizerForQueue:process-in-0.someGroup,andGroup:someGroup");
		assertThat(TestUtils.getPropertyValue(container, "transactional", Boolean.class))
			.isTrue();
		Map<String, Binding<MessageChannel>> producerBindings = (Map<String, Binding<MessageChannel>>) TestUtils
			.getPropertyValue(bindingService, "producerBindings");
		Binding<MessageChannel> outputBinding = producerBindings.get("process-out-0");
		assertThat(TestUtils.getPropertyValue(outputBinding,
			"lifecycle.amqpTemplate.transactional", Boolean.class)).isTrue();
		assertThat(TestUtils.getPropertyValue(outputBinding, "lifecycle.beanName"))
			.isEqualTo("setByCustomizer:process-out-0");
		DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
		ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
			.getPropertyValue("connectionFactory");
		assertThat(binderConnectionFactory).isInstanceOf(CachingConnectionFactory.class);
		ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
		assertThat(binderConnectionFactory).isSameAs(connectionFactory);
		CompositeHealthContributor bindersHealthIndicator = context
			.getBean("bindersHealthContributor", CompositeHealthContributor.class);

		assertThat(bindersHealthIndicator).isNotNull();

		RabbitHealthIndicator indicator = (RabbitHealthIndicator) bindersHealthIndicator.getContributor("rabbit");
		assertThat(indicator).isNotNull();
		assertThat(indicator.health().getStatus())
			.isEqualTo(Status.UP);

		CachingConnectionFactory cf = this.context
			.getBean(CachingConnectionFactory.class);
		ConnectionNameStrategy cns = TestUtils.getPropertyValue(cf,
			"connectionNameStrategy", ConnectionNameStrategy.class);
		assertThat(cns.obtainNewConnectionName(cf)).startsWith("rabbitConnectionFactory");
//		assertThat(TestUtils.getPropertyValue(consumerBindings.get("source").get(0),
//				"target.source.h.advised.targetSource.target.beanName"))
//			.isEqualTo("setByCustomizer:someGroup");
	}

	@Test
	void parentConnectionFactoryInheritedIfOverridden() {
		context = new SpringApplicationBuilder(SimpleProcessor.class,
			ConnectionFactoryConfiguration.class).web(WebApplicationType.NONE)
			.run("--server.port=0", "--spring.rabbitmq.port=" + RABBITMQ.getAmqpPort());
		BinderFactory binderFactory = context.getBean(BinderFactory.class);
		Binder<?, ?, ?> binder = binderFactory.getBinder(null, MessageChannel.class);
		assertThat(binder).isInstanceOf(RabbitMessageChannelBinder.class);
		DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
		ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
			.getPropertyValue("connectionFactory");
		assertThat(binderConnectionFactory).isSameAs(MOCK_CONNECTION_FACTORY);
		ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
		assertThat(binderConnectionFactory).isSameAs(connectionFactory);
		CompositeHealthContributor bindersHealthIndicator = context
			.getBean("bindersHealthContributor", CompositeHealthContributor.class);
		assertThat(bindersHealthIndicator).isNotNull();
		RabbitHealthIndicator indicator = (RabbitHealthIndicator) bindersHealthIndicator.getContributor("rabbit");
		assertThat(indicator).isNotNull();
		// mock connection factory behaves as if down
//		assertThat(indicator.health().getStatus())
//			.isEqualTo(Status.DOWN);
	}

	@Test
	@Disabled
	void parentConnectionFactoryNotInheritedByCustomizedBindersAndProducerRetryBootProperties() {
		List<String> params = new ArrayList<>();
		params.add("--spring.cloud.function.definition=process");
		params.add("--spring.cloud.stream.process-in-0.binder=custom");
		params.add("--spring.cloud.stream.process-out-0.binder=custom");
		params.add("--spring.cloud.stream.binders.custom.type=rabbit");
		params.add("--spring.cloud.stream.binders.custom.environment.foo=bar");
		params.add("--server.port=0");
		params.add("--spring.rabbitmq.port=" + RABBITMQ.getAmqpPort());
		params.add("--spring.rabbitmq.template.retry.enabled=true");
		params.add("--spring.rabbitmq.template.retry.maxAttempts=2");
		params.add("--spring.rabbitmq.template.retry.initial-interval=1000");
		params.add("--spring.rabbitmq.template.retry.multiplier=1.1");
		params.add("--spring.rabbitmq.template.retry.max-interval=3000");
		context = new SpringApplicationBuilder(SimpleProcessor.class)
			.web(WebApplicationType.NONE)
			.run(params.toArray(new String[0]));
		BinderFactory binderFactory = context.getBean(BinderFactory.class);
		// @checkstyle:off
		@SuppressWarnings("unchecked")
		Binder<MessageChannel, ExtendedConsumerProperties<RabbitConsumerProperties>, ExtendedProducerProperties<RabbitProducerProperties>> binder = (Binder<MessageChannel, ExtendedConsumerProperties<RabbitConsumerProperties>, ExtendedProducerProperties<RabbitProducerProperties>>) binderFactory
			.getBinder(null, MessageChannel.class);
		// @checkstyle:on
		assertThat(binder).isInstanceOf(RabbitMessageChannelBinder.class);
		DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
		ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
			.getPropertyValue("connectionFactory");
		ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
		assertThat(binderConnectionFactory).isNotSameAs(connectionFactory);
		CompositeHealthContributor bindersHealthIndicator = context
			.getBean("bindersHealthContributor", CompositeHealthContributor.class);
		assertThat(bindersHealthIndicator).isNotNull();

		RabbitHealthIndicator indicator = (RabbitHealthIndicator) bindersHealthIndicator.getContributor("custom");
		assertThat(indicator).isNotNull();
		assertThat(indicator.health().getStatus()).isEqualTo(Status.UP);
		String name = UUID.randomUUID().toString();
		Binding<MessageChannel> binding = binder.bindProducer(name, new DirectChannel(),
			new ExtendedProducerProperties<>(new RabbitProducerProperties()));
		RetryTemplate template = TestUtils.getPropertyValue(binding,
			"lifecycle.amqpTemplate.retryTemplate", RetryTemplate.class);
		assertThat(template).isNotNull();
		RetryPolicy retryPolicy = template.getRetryPolicy();
		assertThat(retryPolicy).isInstanceOf(ExponentialBackOff.class);
		ExponentialBackOff backOff = (ExponentialBackOff) retryPolicy.getBackOff();
		assertThat(backOff.getMaxAttempts()).isEqualTo(2);
		assertThat(backOff.getInitialInterval()).isEqualTo(1000L);
		assertThat(backOff.getMultiplier()).isEqualTo(1.1);
		assertThat(backOff.getMaxInterval()).isEqualTo(3000L);
		binding.unbind();
		new RabbitAdmin(rabbitTestSupport.getResource()).deleteExchange(name);
		context.close();
	}

	@Test
	void cloudProfile() {
		this.context = new SpringApplicationBuilder(SimpleProcessor.class,
			MockCloudConfiguration.class).web(WebApplicationType.NONE)
			.profiles("cloud").run();
		BinderFactory binderFactory = this.context.getBean(BinderFactory.class);
		Binder<?, ?, ?> binder = binderFactory.getBinder(null, MessageChannel.class);
		assertThat(binder).isInstanceOf(RabbitMessageChannelBinder.class);
		DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
		ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
			.getPropertyValue("connectionFactory");
		ConnectionFactory connectionFactory = this.context
			.getBean(ConnectionFactory.class);

		assertThat(binderConnectionFactory).isNotSameAs(connectionFactory);

		assertThat(TestUtils.getPropertyValue(connectionFactory, "addresses"))
			.isNotNull();
		assertThat(TestUtils.getPropertyValue(binderConnectionFactory, "addresses"))
			.isNull();

		Cloud cloud = this.context.getBean(Cloud.class);

		verify(cloud).getSingletonServiceConnector(ConnectionFactory.class, null);
	}

	@Test
	void extendedProperties() {
		context = new SpringApplicationBuilder(SimpleProcessor.class)
			.web(WebApplicationType.NONE).run("--server.port=0",
				"--spring.rabbitmq.port=" + RABBITMQ.getAmqpPort(),
				"--spring.cloud.function.definition=process",
				"--spring.cloud.stream.rabbit.default.producer.routing-key-expression=fooRoutingKey",
				"--spring.cloud.stream.rabbit.default.consumer.exchange-type=direct",
				"--spring.cloud.stream.rabbit.bindings.process-out-0.producer.batch-size=512",
				"--spring.cloud.stream.rabbit.default.consumer.max-concurrency=4",
				"--spring.cloud.stream.rabbit.bindings.process-in-0.consumer.exchange-type=fanout",
				"--spring.cloud.stream.rabbit.bindings.process-out-0.producer.alternateExchange.name=altEx",
				"--spring.cloud.stream.rabbit.bindings.process-out-0.producer.alternate-exchange.exists=true",
				"--spring.cloud.stream.rabbit.bindings.process-out-0.producer.alternate-exchange.type=direct",
				"--spring.cloud.stream.rabbit.bindings.process-out-0.producer.alternate-exchange.binding.queue=altQ",
				"--spring.cloud.stream.rabbit.bindings.process-out-0.producer.alternate-exchange.binding.routing-key=altRK");
		BinderFactory binderFactory = context.getBean(BinderFactory.class);
		Binder<?, ?, ?> rabbitBinder = binderFactory.getBinder(null,
			MessageChannel.class);

		RabbitProducerProperties rabbitProducerProperties = (RabbitProducerProperties) ((ExtendedPropertiesBinder) rabbitBinder)
			.getExtendedProducerProperties("process-out-0");

		assertThat(
			rabbitProducerProperties.getRoutingKeyExpression().getExpressionString())
			.isEqualTo("fooRoutingKey");
		assertThat(rabbitProducerProperties.getBatchSize()).isEqualTo(512);
		AlternateExchange alternate = rabbitProducerProperties.getAlternateExchange();
		assertThat(alternate.getName()).isEqualTo("altEx");
		assertThat(alternate.isExists()).isTrue();
		assertThat(alternate.getType()).isEqualTo("direct");
		assertThat(alternate.getBinding().getQueue()).isEqualTo("altQ");
		assertThat(alternate.getBinding().getRoutingKey()).isEqualTo("altRK");

		RabbitConsumerProperties rabbitConsumerProperties = (RabbitConsumerProperties) ((ExtendedPropertiesBinder) rabbitBinder)
			.getExtendedConsumerProperties("process-in-0");

		assertThat(rabbitConsumerProperties.getExchangeType())
			.isEqualTo(ExchangeTypes.FANOUT);
		assertThat(rabbitConsumerProperties.getMaxConcurrency()).isEqualTo(4);
	}

	@EnableAutoConfiguration
	@Configuration
	public static class SimpleProcessor {

		@Bean
		public ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer() {
			return (c, q, g) -> {
				((AbstractMessageListenerContainer) c).setBeanName(
						"setByCustomizerForQueue:" + q + (g == null ? "" : ",andGroup:" + g));
			};
		}

		@Bean
		public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
			return (s, q, g) -> s.setBeanName("setByCustomizer:" + g);
		}

		@Bean
		public ProducerMessageHandlerCustomizer<MessageHandler> messageHandlerCustomizer() {
			return (handler, destinationName) -> {
				if (handler instanceof AbstractMessageHandler messageHandler) {
					messageHandler.setBeanName("setByCustomizer:" + destinationName);
				}
			};
		}

		@Bean
		public ConsumerEndpointCustomizer<AmqpInboundChannelAdapter> adapterCustomizer() {
			return (producer, dest, grp) -> producer.setBeanName("setByCustomizer:" + grp);
		}

		@Bean
		public DeclarableCustomizer customizer() {
			return dec -> {
				dec.addArgument("added.by", "customizer");
				return dec;
			};
		}

		@Bean
		public Function<String, String> process() {
			return s -> s;
		}
	}

	public static class ConnectionFactoryConfiguration {

		@Bean
		public ConnectionFactory connectionFactory() {
			return MOCK_CONNECTION_FACTORY;
		}

	}

	public static class MockCloudConfiguration {

		@Bean
		public Cloud cloud() {
			Cloud cloud = mock(Cloud.class);

			willReturn(new CachingConnectionFactory("localhost")).given(cloud)
				.getSingletonServiceConnector(ConnectionFactory.class, null);

			return cloud;
		}

	}
}