AbstractBinder.java

/*
 * Copyright 2013-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.annotation.StreamRetryTemplate;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.expression.EvaluationContext;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 * Base class for {@link Binder} implementations.
 *
 * @param <T> outbound bind target class
 * @param <C> consumer properties class
 * @param <P> producer properties class
 * @author David Turanski
 * @author Gary Russell
 * @author Ilayaperumal Gopinathan
 * @author Mark Fisher
 * @author Marius Bogoevici
 * @author Soby Chacko
 * @author Vinicius Carvalho
 * @author Oleg Zhurakousky
 * @author Nicolas Homble
 */
public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends ProducerProperties>
		implements ApplicationContextAware, InitializingBean, Binder<T, C, P> {

	/**
	 * The delimiter between a group and index when constructing a binder
	 * consumer/producer.
	 */
	private static final String GROUP_INDEX_DELIMITER = ".";

	protected final Log logger = LogFactory.getLog(getClass());

	private volatile GenericApplicationContext applicationContext;

	private volatile EvaluationContext evaluationContext;

	@Autowired(required = false)
	@StreamRetryTemplate
	private Map<String, RetryTemplate> consumerBindingRetryTemplates;

	/**
	 * For binder implementations that support a prefix, apply the prefix to the name.
	 * @param prefix the prefix.
	 * @param name the name.
	 * @return name with the prefix
	 */
	public static String applyPrefix(String prefix, String name) {
		return prefix + name;
	}

	/**
	 * For binder implementations that support dead lettering, construct the name of the
	 * dead letter entity for the underlying pipe name.
	 * @param name the name.
	 * @return name with DLQ suffix
	 */
	public static String constructDLQName(String name) {
		return name + ".dlq";
	}

	protected AbstractApplicationContext getApplicationContext() {
		return this.applicationContext;
	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext)
			throws BeansException {
		Assert.isInstanceOf(GenericApplicationContext.class, applicationContext);
		this.applicationContext = (GenericApplicationContext) applicationContext;
		Map<String, EvaluationContext> beansOfType = this.applicationContext.getBeansOfType(EvaluationContext.class);
		if (!beansOfType.isEmpty()) {
			this.evaluationContext = beansOfType.values().iterator().next();
		}

	}

	protected ConfigurableListableBeanFactory getBeanFactory() {
		return this.applicationContext.getBeanFactory();
	}

	protected EvaluationContext getEvaluationContext() {
		return this.evaluationContext;
	}

	@Override
	public final void afterPropertiesSet() throws Exception {
		Assert.notNull(this.applicationContext,
				"The 'applicationContext' property must not be null");
		onInit();
	}

	/**
	 * Subclasses may implement this method to perform any necessary initialization. It
	 * will be invoked from {@link #afterPropertiesSet()} which is itself {@code final}.
	 * @throws Exception when init fails
	 */
	protected void onInit() throws Exception {
		// no-op default
	}

	@Override
	public final Binding<T> bindConsumer(String name, String group, T target,
			C properties) {
		if (!StringUtils.hasText(group)) {
			Assert.isTrue(!properties.isPartitioned(),
					"A consumer group is required for a partitioned subscription");
		}
		return doBindConsumer(name, group, target, properties);
	}

	protected abstract Binding<T> doBindConsumer(String name, String group, T inputTarget,
			C properties);

	@Override
	public final Binding<T> bindProducer(String name, T outboundBindTarget,
			P properties) {
		return doBindProducer(name, outboundBindTarget, properties);
	}

	protected abstract Binding<T> doBindProducer(String name, T outboundBindTarget,
			P properties);

	/**
	 * Construct a name comprising the name and group.
	 * @param name the name.
	 * @param group the group.
	 * @return the constructed name.
	 */
	protected final String groupedName(String name, String group) {
		return name + GROUP_INDEX_DELIMITER
				+ (StringUtils.hasText(group) ? group : "default");
	}

	/**
	 * Attempts to get {@link BindingServiceProperties} from application context.
	 *
	 * @return instance of {@link BindingServiceProperties} or null.
	 */
	protected BindingServiceProperties getBindingServiceProperties() {
		try {
			return getApplicationContext().getBean(BindingServiceProperties.class);
		}
		catch (Exception e) {
			// ignore
			return null;
		}
	}

	/**
	 * Create and configure a default retry template unless one has already been provided
	 * via @Bean by an application.
	 * @param properties The properties.
	 * @return The retry template
	 */
	protected RetryTemplate buildRetryTemplate(ConsumerProperties properties) {
		RetryTemplate rt;
		if (CollectionUtils.isEmpty(this.consumerBindingRetryTemplates)) {
			rt = new RetryTemplate();

			Map<Class<? extends Throwable>, Boolean> retryableExceptionMapping = properties.getRetryableExceptions();
			List<Class<? extends Throwable>> retryableExceptions = new ArrayList<>();
			List<Class<? extends Throwable>> nonRetryableExceptions = new ArrayList<>();
			for (var classBooleanEntry : retryableExceptionMapping.entrySet()) {
				Class<? extends Throwable> exceptionClass = classBooleanEntry.getKey();
				if (classBooleanEntry.getValue()) {
					retryableExceptions.add(exceptionClass);
				}
				else {
					nonRetryableExceptions.add(exceptionClass);
				}
			}
			RetryPolicy retryPolicy =
				RetryPolicy.builder()
					.maxRetries(properties.getMaxAttempts())
					.delay(Duration.ofMillis(properties.getBackOffInitialInterval()))
					.multiplier(properties.getBackOffMultiplier())
					.maxDelay(Duration.ofMillis(properties.getBackOffMaxInterval()))
					.includes(retryableExceptions)
					.excludes(nonRetryableExceptions)
				.build();
			rt.setRetryPolicy(retryPolicy);
		}
		else {
			rt = StringUtils.hasText(properties.getRetryTemplateName())
					? this.consumerBindingRetryTemplates
							.get(properties.getRetryTemplateName())
					: this.consumerBindingRetryTemplates.values().iterator().next();
		}
		return rt;
	}

}