BindingServiceConfiguration.java

/*
 * Copyright 2015-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.config;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import tools.jackson.databind.ObjectMapper;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.SearchStrategy;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.source.ConfigurationPropertyName;
import org.springframework.cloud.stream.binder.BinderChildContextInitializer;
import org.springframework.cloud.stream.binder.BinderConfiguration;
import org.springframework.cloud.stream.binder.BinderCustomizer;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.BinderType;
import org.springframework.cloud.stream.binder.BinderTypeRegistry;
import org.springframework.cloud.stream.binder.DefaultBinderFactory;
import org.springframework.cloud.stream.binding.Bindable;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.BindingsLifecycleController;
import org.springframework.cloud.stream.binding.ContextStartAfterRefreshListener;
import org.springframework.cloud.stream.binding.DynamicDestinationsBindable;
import org.springframework.cloud.stream.binding.InputBindingLifecycle;
import org.springframework.cloud.stream.binding.OutputBindingLifecycle;
import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProvider;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Role;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/**
 * Configuration class that provides necessary beans for {@link MessageChannel} binding.
 *
 * @author Dave Syer
 * @author David Turanski
 * @author Marius Bogoevici
 * @author Ilayaperumal Gopinathan
 * @author Gary Russell
 * @author Vinicius Carvalho
 * @author Artem Bilan
 * @author Oleg Zhurakousky
 * @author Soby Chacko
 * @author Chris Bono
 * @author Omer Celik
 */
@AutoConfiguration
@EnableConfigurationProperties({ BindingServiceProperties.class,
		SpringIntegrationProperties.class})
@Import({ SpelExpressionConverterConfiguration.class })
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@ConditionalOnBean(value = BinderTypeRegistry.class, search = SearchStrategy.CURRENT)
public class BindingServiceConfiguration {

	@Autowired(required = false)
	private Collection<DefaultBinderFactory.Listener> binderFactoryListeners;

	public static Map<String, BinderConfiguration> getBinderConfigurations(
			BinderTypeRegistry binderTypeRegistry,
			BindingServiceProperties bindingServiceProperties) {

		Map<String, BinderConfiguration> binderConfigurations = new HashMap<>();
		Map<String, BinderProperties> declaredBinders = bindingServiceProperties
				.getBinders();
		boolean defaultCandidatesExist = false;
		Iterator<Map.Entry<String, BinderProperties>> binderPropertiesIterator = declaredBinders
				.entrySet().iterator();
		while (!defaultCandidatesExist && binderPropertiesIterator.hasNext()) {
			defaultCandidatesExist = binderPropertiesIterator.next().getValue()
					.isDefaultCandidate();
		}
		List<String> existingBinderConfigurations = new ArrayList<>();
		for (Map.Entry<String, BinderProperties> binderEntry : declaredBinders
				.entrySet()) {
			BinderProperties binderProperties = binderEntry.getValue();
			if (binderTypeRegistry.get(binderEntry.getKey()) != null) {
				binderConfigurations.put(binderEntry.getKey(),
						new BinderConfiguration(binderEntry.getKey(),
								binderProperties.getEnvironment(),
								binderProperties.isInheritEnvironment(),
								binderProperties.isDefaultCandidate()));
				existingBinderConfigurations.add(binderEntry.getKey());
			}
			else {
				Assert.hasText(binderProperties.getType(),
						"No 'type' property present for custom binder "
								+ binderEntry.getKey());
				binderConfigurations.put(binderEntry.getKey(),
						new BinderConfiguration(binderProperties.getType(),
								binderProperties.getEnvironment(),
								binderProperties.isInheritEnvironment(),
								binderProperties.isDefaultCandidate()));
				existingBinderConfigurations.add(binderEntry.getKey());
			}
		}
		for (Map.Entry<String, BinderConfiguration> configurationEntry : binderConfigurations
				.entrySet()) {
			if (configurationEntry.getValue().isDefaultCandidate()) {
				defaultCandidatesExist = true;
				break;
			}
		}
		if (!defaultCandidatesExist) {
			for (Map.Entry<String, BinderType> binderEntry : binderTypeRegistry.getAll()
					.entrySet()) {
				if (!existingBinderConfigurations.contains(binderEntry.getKey())) {
					binderConfigurations.put(binderEntry.getKey(),
							new BinderConfiguration(binderEntry.getKey(), new HashMap<>(),
									true, true)); // true, !"integration".equals(binderEntry.getKey())));
				}
			}
		}
		return binderConfigurations;
	}

	@Bean
	public static BeanPostProcessor globalErrorChannelCustomizer() {
		return new BeanPostProcessor() {
			@Override
			public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
				if ("errorChannel".equals(beanName) && bean instanceof PublishSubscribeChannel publishSubscribeChannel) {
						publishSubscribeChannel.setIgnoreFailures(true);
				}
				return bean;
			}
		};
	}

	@Bean
	public BindingHandlerAdvise BindingHandlerAdvise(
			@Nullable MappingsProvider[] providers) {
		Map<ConfigurationPropertyName, ConfigurationPropertyName> additionalMappings = new HashMap<>();
		if (!ObjectUtils.isEmpty(providers)) {
			for (int i = 0; i < providers.length; i++) {
				MappingsProvider mappingsProvider = providers[i];
				additionalMappings.putAll(mappingsProvider.getDefaultMappings());
			}
		}
		return new BindingHandlerAdvise(additionalMappings);
	}

	@Bean
	@ConditionalOnMissingBean(BinderFactory.class)
	public DefaultBinderFactory binderFactory(BinderTypeRegistry binderTypeRegistry,
			BindingServiceProperties bindingServiceProperties,
			ObjectProvider<BinderCustomizer> binderCustomizerProvider,
			BinderChildContextInitializer binderChildContextInitializer) {
		DefaultBinderFactory binderFactory = new DefaultBinderFactory(
				getBinderConfigurations(binderTypeRegistry, bindingServiceProperties),
				binderTypeRegistry, binderCustomizerProvider.getIfUnique());
		binderFactory.setDefaultBinder(bindingServiceProperties.getDefaultBinder());
		binderFactory.setListeners(this.binderFactoryListeners);
		binderChildContextInitializer.setBinderFactory(binderFactory);
		return binderFactory;
	}

	@Bean
	public BinderChildContextInitializer binderChildContextInitializer() {
		return new BinderChildContextInitializer();
	}

	@Bean
	// This conditional is intentionally not in an autoconfig (usually a bad idea) because
	// it is used to detect a BindingService in the parent context (which we know
	// already exists).
	@ConditionalOnMissingBean(search = SearchStrategy.CURRENT)
	public BindingService bindingService(
			BindingServiceProperties bindingServiceProperties,
			BinderFactory binderFactory, TaskScheduler taskScheduler, @Nullable ObjectMapper objectMapper) {
		objectMapper = objectMapper == null ? new ObjectMapper() : objectMapper;
		return new BindingService(bindingServiceProperties, binderFactory, taskScheduler, objectMapper);
	}

	@Bean
	@DependsOn("bindingService")
	public OutputBindingLifecycle outputBindingLifecycle(BindingService bindingService,
			Map<String, Bindable> bindables) {

		return new OutputBindingLifecycle(bindingService, bindables);
	}

	@Bean
	@DependsOn("bindingService")
	public InputBindingLifecycle inputBindingLifecycle(BindingService bindingService,
			Map<String, Bindable> bindables) {
		return new InputBindingLifecycle(bindingService, bindables);
	}

	@Bean
	public BindingsLifecycleController bindingsLifecycleController(List<InputBindingLifecycle> inputBindingLifecycles,
			List<OutputBindingLifecycle> outputBindingsLifecycles) {
		return new BindingsLifecycleController(inputBindingLifecycles, outputBindingsLifecycles);
	}

	@Bean
	@DependsOn("bindingService")
	public ContextStartAfterRefreshListener contextStartAfterRefreshListener() {
		return new ContextStartAfterRefreshListener();
	}

	@Bean
	public DynamicDestinationsBindable dynamicDestinationsBindable() {
		return new DynamicDestinationsBindable();
	}

	@Bean
	public ApplicationListener<ContextRefreshedEvent> appListener(
			SpringIntegrationProperties springIntegrationProperties) {
		return event -> event.getApplicationContext()
				.getBeansOfType(AbstractReplyProducingMessageHandler.class)
				.values()
				.forEach(mh -> mh
						.addNotPropagatedHeaders(springIntegrationProperties
								.getMessageHandlerNotPropagatedHeaders()));
	}

}