BindingServiceProperties.java
/*
* Copyright 2015-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.config;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.bind.PropertySourcesPlaceholdersResolver;
import org.springframework.boot.context.properties.source.ConfigurationPropertySources;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.util.Assert;
/**
 * Configuration properties of the binding service, bound under the
 * {@code spring.cloud.stream} prefix. Holds application-wide settings (instance
 * index/count, default binder, retry interval) together with the per-binding and
 * per-binder property maps, and lazily creates binding properties from the
 * {@code spring.cloud.stream.default} prefix for bindings that were not configured
 * explicitly.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @author Gary Russell
 * @author Ilayaperumal Gopinathan
 * @author Oleg Zhurakousky
 * @author Michael Michailidis
 * @author Kurt Hong
 */
@ConfigurationProperties("spring.cloud.stream")
@JsonInclude(Include.NON_DEFAULT)
public class BindingServiceProperties
		implements ApplicationContextAware, InitializingBean {

	private static final int DEFAULT_BINDING_RETRY_INTERVAL = 30;

	/**
	 * A semi-colon delimited string to explicitly define input bindings (specifically
	 * for cases when there is no implicit trigger to create such bindings such as
	 * Function, Supplier or Consumer).
	 */
	private String inputBindings;

	/**
	 * A semi-colon delimited string to explicitly define output bindings (specifically
	 * for cases when there is no implicit trigger to create such bindings such as
	 * Function, Supplier or Consumer).
	 */
	private String outputBindings;

	/**
	 * The instance id of the application: a number from 0 to instanceCount-1. Used for
	 * partitioning and with Kafka. NOTE: Could also be managed per individual binding
	 * "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name
	 * of the binding.
	 */
	@Value("${INSTANCE_INDEX:${CF_INSTANCE_INDEX:0}}")
	private int instanceIndex;

	/**
	 * A list of instance ids from 0 to instanceCount-1. Used for partitioning and with
	 * Kafka. NOTE: Could also be managed per individual binding
	 * "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is
	 * the name of the binding. This setting will override the one set in
	 * 'spring.cloud.stream.instance-index'.
	 */
	private List<Integer> instanceIndexList = new ArrayList<>();

	/**
	 * The number of deployed instances of an application. Default: 1. NOTE: Could also be
	 * managed per individual binding
	 * "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name
	 * of the binding.
	 */
	private int instanceCount = 1;

	/**
	 * Additional binding properties (see {@link BindingProperties}) per binding name
	 * (e.g., 'input'). Binding names are matched case-insensitively.
	 *
	 * For example, this sets the content-type for the 'input' binding of a Sink
	 * application: 'spring.cloud.stream.bindings.input.contentType=text/plain'
	 */
	private Map<String, BindingProperties> bindings = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

	/**
	 * Additional per-binder properties (see {@link BinderProperties}) if more than one
	 * binder of the same type is used (i.e., connect to multiple instances of RabbitMq).
	 * Here you can specify multiple binder configurations, each with different
	 * environment settings. For example: spring.cloud.stream.binders.rabbit1.environment.
	 * . . , spring.cloud.stream.binders.rabbit2.environment. . .
	 */
	private Map<String, BinderProperties> binders = new HashMap<>();

	/**
	 * The name of the binder to use by all bindings in the event multiple binders
	 * available (e.g., 'rabbit').
	 */
	private String defaultBinder;

	/**
	 * The maximum size of Least Recently Used (LRU) cache of dynamic destinations. Once
	 * this size is reached, new destinations will trigger the removal of old destinations.
	 * Default: 10
	 */
	private int dynamicDestinationCacheSize = 10;

	/**
	 * Retry interval (in seconds) used to schedule binding attempts. Default: 30 sec.
	 */
	private int bindingRetryInterval = DEFAULT_BINDING_RETRY_INTERVAL;

	// Placeholder context used until setApplicationContext(..) supplies the real one.
	private ConfigurableApplicationContext applicationContext = new GenericApplicationContext();

	// Assigned by the setter or in afterPropertiesSet(); not read anywhere else in this class.
	private ConversionService conversionService;

	/**
	 * Return the live (mutable) map of binding properties keyed by binding name.
	 * @return the binding properties map
	 */
	public Map<String, BindingProperties> getBindings() {
		return this.bindings;
	}

	/**
	 * Merge the given entries into the existing case-insensitive bindings map. Existing
	 * entries that are not present in the argument are retained.
	 * @param bindings binding properties to add, keyed by binding name
	 */
	public void setBindings(Map<String, BindingProperties> bindings) {
		this.bindings.putAll(bindings);
	}

	/**
	 * Return the map of binder properties keyed by binder configuration name.
	 * @return the binder properties map
	 */
	public Map<String, BinderProperties> getBinders() {
		return this.binders;
	}

	/**
	 * Replace the map of binder properties.
	 * @param binders binder properties keyed by binder configuration name
	 */
	public void setBinders(Map<String, BinderProperties> binders) {
		this.binders = binders;
	}

	public String getDefaultBinder() {
		return this.defaultBinder;
	}

	public void setDefaultBinder(String defaultBinder) {
		this.defaultBinder = defaultBinder;
	}

	public int getInstanceIndex() {
		return this.instanceIndex;
	}

	public void setInstanceIndex(int instanceIndex) {
		this.instanceIndex = instanceIndex;
	}

	public List<Integer> getInstanceIndexList() {
		return this.instanceIndexList;
	}

	public void setInstanceIndexList(List<Integer> instanceIndexList) {
		this.instanceIndexList = instanceIndexList;
	}

	public int getInstanceCount() {
		return this.instanceCount;
	}

	public void setInstanceCount(int instanceCount) {
		this.instanceCount = instanceCount;
	}

	/**
	 * Store the application context and, when a bean named {@code spelConverter}
	 * exists, register it with the conversion service looked up through
	 * {@link IntegrationUtils}.
	 * @param applicationContext the owning context; must be a
	 * {@link ConfigurableApplicationContext}
	 * @throws BeansException if the {@code spelConverter} bean cannot be obtained
	 */
	@Override
	public void setApplicationContext(ApplicationContext applicationContext)
			throws BeansException {
		this.applicationContext = (ConfigurableApplicationContext) applicationContext;
		// Looked up unconditionally, exactly as before, so the timing of the lookup
		// does not change.
		ConversionService integrationConversionService = IntegrationUtils
				.getConversionService(this.applicationContext.getBeanFactory());
		// Register only when the lookup produced a service that accepts converters;
		// the previous unconditional cast + dereference failed with a
		// ClassCastException/NullPointerException otherwise.
		if (this.applicationContext.containsBean("spelConverter")
				&& integrationConversionService instanceof GenericConversionService) {
			Converter<?, ?> spelConverter = (Converter<?, ?>) this.applicationContext
					.getBean("spelConverter");
			((GenericConversionService) integrationConversionService)
					.addConverter(spelConverter);
		}
	}

	public void setConversionService(ConversionService conversionService) {
		this.conversionService = conversionService;
	}

	/**
	 * If no conversion service was set explicitly, obtain the bean named
	 * {@link IntegrationUtils#INTEGRATION_CONVERSION_SERVICE_BEAN_NAME} from the
	 * application context.
	 * @throws Exception if the bean lookup fails
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		if (this.conversionService != null) {
			return;
		}
		this.conversionService = this.applicationContext.getBean(
				IntegrationUtils.INTEGRATION_CONVERSION_SERVICE_BEAN_NAME,
				ConversionService.class);
	}

	/**
	 * Return the type of the binder configured under the given name.
	 * <p>
	 * NOTE(review): when no binder is configured under that name, this falls back to
	 * the first key of the <em>bindings</em> map (a binding name, not a binder type)
	 * and fails if there are no bindings at all - confirm this is intended.
	 * @param binderName name of the binder configuration
	 * @return the configured binder type, or the first binding name as a fallback
	 * @throws java.util.NoSuchElementException if the binder is unknown and no
	 * bindings exist
	 */
	public String getBinderType(String binderName) {
		BinderProperties binderProperties = this.binders.get(binderName);
		if (binderProperties != null) {
			return binderProperties.getType();
		}
		return this.bindings.keySet().iterator().next();
	}

	/**
	 * Return the binder configured for the given binding, creating default binding
	 * properties first if the binding is not known yet.
	 * @param bindingName name of the binding
	 * @return the value of {@link BindingProperties#getBinder()} for that binding
	 */
	public String getBinder(String bindingName) {
		return getBindingProperties(bindingName).getBinder();
	}

	/**
	 * Return configuration properties as Map.
	 * <p>
	 * Bindings are stored under their name as their {@code toString()} value and
	 * binders under their name as the {@link BinderProperties} object itself; binders
	 * are added last, so a binder replaces a binding entry with the same key.
	 * @return map of binding configuration properties.
	 */
	public Map<String, Object> asMapProperties() {
		Map<String, Object> properties = new HashMap<>();
		properties.put("instanceIndex", String.valueOf(getInstanceIndex()));
		properties.put("instanceCount", String.valueOf(getInstanceCount()));
		properties.put("defaultBinder", getDefaultBinder());
		for (Map.Entry<String, BindingProperties> binding : this.bindings.entrySet()) {
			properties.put(binding.getKey(), binding.getValue().toString());
		}
		for (Map.Entry<String, BinderProperties> binder : this.binders.entrySet()) {
			properties.put(binder.getKey(), binder.getValue());
		}
		return properties;
	}

	/**
	 * Return the consumer properties of the given input binding, creating and storing
	 * a new instance when the binding has none. Instance count, instance index and
	 * instance index list are filled in from the application-wide values when the
	 * binding did not set them.
	 * @param inputBindingName name of the input binding; must not be {@code null}
	 * @return the (possibly newly created) consumer properties
	 */
	public ConsumerProperties getConsumerProperties(String inputBindingName) {
		Assert.notNull(inputBindingName, "The input binding name cannot be null");
		BindingProperties bindingProperties = getBindingProperties(inputBindingName);
		ConsumerProperties consumerProperties = bindingProperties.getConsumer();
		if (consumerProperties == null) {
			consumerProperties = new ConsumerProperties();
			bindingProperties.setConsumer(consumerProperties);
		}
		// propagate instance count and instance index if not already set
		// (a negative value is treated as "not set")
		if (consumerProperties.getInstanceCount() < 0) {
			consumerProperties.setInstanceCount(this.instanceCount);
		}
		if (consumerProperties.getInstanceIndex() < 0) {
			consumerProperties.setInstanceIndex(this.instanceIndex);
		}
		if (consumerProperties.getInstanceIndexList() == null) {
			consumerProperties.setInstanceIndexList(this.instanceIndexList);
		}
		return consumerProperties;
	}

	/**
	 * Return the producer properties of the given output binding, creating and
	 * storing a new instance when the binding has none.
	 * @param outputBindingName name of the output binding; must not be {@code null}
	 * @return the (possibly newly created) producer properties
	 */
	public ProducerProperties getProducerProperties(String outputBindingName) {
		Assert.notNull(outputBindingName, "The output binding name cannot be null");
		BindingProperties bindingProperties = getBindingProperties(outputBindingName);
		ProducerProperties producerProperties = bindingProperties.getProducer();
		if (producerProperties == null) {
			producerProperties = new ProducerProperties();
			bindingProperties.setProducer(producerProperties);
		}
		return producerProperties;
	}

	/**
	 * Return the properties of the given binding. Unknown bindings are first bound
	 * from the {@code spring.cloud.stream.default} prefix and registered; a binding
	 * without a destination gets the binding name as its destination.
	 * @param bindingName name of the binding
	 * @return the binding properties, never {@code null}
	 */
	public BindingProperties getBindingProperties(String bindingName) {
		this.bindIfNecessary(bindingName);
		BindingProperties bindingProperties = this.bindings.get(bindingName);
		if (bindingProperties.getDestination() == null) {
			bindingProperties.setDestination(bindingName);
		}
		return bindingProperties;
	}

	/**
	 * Return the group of the given binding.
	 * @param bindingName name of the binding
	 * @return the value of {@link BindingProperties#getGroup()} for that binding
	 */
	public String getGroup(String bindingName) {
		return getBindingProperties(bindingName).getGroup();
	}

	/**
	 * Return the destination of the given binding; defaults to the binding name.
	 * @param bindingName name of the binding
	 * @return the destination of that binding
	 */
	public String getBindingDestination(String bindingName) {
		return getBindingProperties(bindingName).getDestination();
	}

	public int getBindingRetryInterval() {
		return this.bindingRetryInterval;
	}

	public void setBindingRetryInterval(int bindingRetryInterval) {
		this.bindingRetryInterval = bindingRetryInterval;
	}

	/**
	 * Replace the producer properties of an already known binding. Does nothing when
	 * the binding is not registered.
	 * @param bindingName name of the binding
	 * @param producerProperties the producer properties to set
	 */
	public void updateProducerProperties(String bindingName,
			ProducerProperties producerProperties) {
		// Single lookup instead of containsKey(..) followed by get(..).
		BindingProperties bindingProperties = this.bindings.get(bindingName);
		if (bindingProperties != null) {
			bindingProperties.setProducer(producerProperties);
		}
	}

	public int getDynamicDestinationCacheSize() {
		return this.dynamicDestinationCacheSize;
	}

	public void setDynamicDestinationCacheSize(int dynamicDestinationCacheSize) {
		this.dynamicDestinationCacheSize = dynamicDestinationCacheSize;
	}

	public String getInputBindings() {
		return this.inputBindings;
	}

	public void setInputBindings(String inputBindings) {
		this.inputBindings = inputBindings;
	}

	public String getOutputBindings() {
		return this.outputBindings;
	}

	public void setOutputBindings(String outputBindings) {
		this.outputBindings = outputBindings;
	}

	/*
	 * The "necessary" implies the scenario where only defaults are defined.
	 */
	private void bindIfNecessary(String bindingName) {
		if (!this.bindings.containsKey(bindingName)) {
			this.bindToDefault(bindingName);
		}
	}

	/*
	 * Creates fresh BindingProperties, populates them from the
	 * "spring.cloud.stream.default" prefix of the current environment and registers
	 * them under the given binding name.
	 */
	private void bindToDefault(String binding) {
		BindingProperties bindingPropertiesTarget = new BindingProperties();
		Binder binder = new Binder(
				ConfigurationPropertySources
						.get(this.applicationContext.getEnvironment()),
				new PropertySourcesPlaceholdersResolver(
						this.applicationContext.getEnvironment()),
				IntegrationUtils.getConversionService(
						this.applicationContext.getBeanFactory()),
				null);
		binder.bind("spring.cloud.stream.default",
				Bindable.ofInstance(bindingPropertiesTarget));
		this.bindings.put(binding, bindingPropertiesTarget);
	}

}