AbstractMessageChannelBinder.java
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.springframework.cloud.function.json.JacksonMapper;
import tools.jackson.core.JacksonException;
import tools.jackson.core.JsonGenerator;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationContext;
import tools.jackson.databind.module.SimpleModule;
import tools.jackson.databind.ser.std.StdSerializer;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.Lifecycle;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.integration.core.RecoveryCallback;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
/**
* {@link AbstractBinder} that serves as base class for {@link MessageChannel} binders.
* Implementors must implement the following methods:
* <ul>
* <li>{@link #createProducerMessageHandler(ProducerDestination, ProducerProperties, MessageChannel)}</li>
* <li>{@link #createConsumerEndpoint(ConsumerDestination, String, ConsumerProperties)}
* </li>
* </ul>
*
* @param <C> the consumer properties type
* @param <P> the producer properties type
* @param <PP> the provisioning producer properties type
*
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
* @author Soby Chacko
* @author Oleg Zhurakousky
* @author Artem Bilan
* @author Gary Russell
* @author Byungjun You
* @since 1.1
*/
public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>>
extends AbstractBinder<MessageChannel, C, P> implements
PollableConsumerBinder<MessageHandler, C>, ApplicationEventPublisherAware {
/**
* {@link ProvisioningProvider} delegated by the downstream binder implementations.
*/
protected final PP provisioningProvider;
// Interceptor placed at index 0 of an inbound channel or pollable source when the
// binding's header mode is HeaderMode.embeddedHeaders.
private final EmbeddedHeadersChannelInterceptor embeddedHeadersChannelInterceptor = new EmbeddedHeadersChannelInterceptor(
this.logger);
// Assigned in onInit(): taken from a JacksonMapper bean when one is present in the
// application context, otherwise a newly created ObjectMapper.
private volatile ObjectMapper objectMapper;
/**
* Indicates which headers are to be embedded in the payload if a binding requires
* embedding headers.
*/
private final String[] headersToEmbed;
// Never null: the constructor and setContainerCustomizer substitute a no-op when null is supplied.
private ListenerContainerCustomizer<?> containerCustomizer;
// Never null: the constructor substitutes a no-op when null is supplied.
private final MessageSourceCustomizer<?> sourceCustomizer;
// Applied to every producer MessageHandler via customizeProducerMessageHandler; no-op by default.
private ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer =
(handler, destination) -> { };
// Applied to every consumer endpoint in doBindConsumer; no-op by default.
private ConsumerEndpointCustomizer<MessageProducer> consumerCustomizer =
(adapter, destination, group) -> { };
// Injected through setApplicationEventPublisher.
private ApplicationEventPublisher applicationEventPublisher;
/**
* Create a binder without a listener container customizer or message source
* customizer; delegates to the four-argument constructor passing {@code null} for both.
* @param headersToEmbed the headers to embed in the payload; may be {@code null}
* @param provisioningProvider the {@link ProvisioningProvider} used to provision destinations
*/
public AbstractMessageChannelBinder(String[] headersToEmbed,
PP provisioningProvider) {
this(headersToEmbed, provisioningProvider, null, null);
}
/**
 * Initializes the {@link ObjectMapper} used by this binder.
 * <p>
 * The mapper is obtained from a {@link JacksonMapper} bean when at least one is
 * present in the application context; otherwise a new {@link ObjectMapper} is
 * created. The mapper is then rebuilt with a module that registers
 * {@code ExpressionSerializer} for {@link Expression} values.
 * @throws Exception if the application context lookup or mapper construction fails
 */
@Override
protected void onInit() throws Exception {
	if (!CollectionUtils.isEmpty(this.getApplicationContext().getBeansOfType(JacksonMapper.class))) {
		JacksonMapper jacksonMapper = this.getApplicationContext().getBean(JacksonMapper.class);
		this.objectMapper = jacksonMapper.getObjectMapper();
	}
	else {
		this.objectMapper = new ObjectMapper();
	}
	SimpleModule module = new SimpleModule();
	module.addSerializer(Expression.class, new ExpressionSerializer(Expression.class));
	// The module has to be added to the builder explicitly: rebuild().build() alone
	// returns a mapper without the Expression serializer.
	this.objectMapper = this.objectMapper.rebuild().addModule(module).build();
}
/**
 * Create a binder with optional customizers.
 * @param headersToEmbed the headers to embed in the payload; {@code null} is treated
 * as an empty array
 * @param provisioningProvider the {@link ProvisioningProvider} used to provision destinations
 * @param containerCustomizer optional listener container customizer; a no-op is used
 * when {@code null}
 * @param sourceCustomizer optional message source customizer; a no-op is used when
 * {@code null}
 */
public AbstractMessageChannelBinder(String[] headersToEmbed, PP provisioningProvider,
		@Nullable ListenerContainerCustomizer<?> containerCustomizer,
		@Nullable MessageSourceCustomizer<?> sourceCustomizer) {
	if (headersToEmbed != null) {
		this.headersToEmbed = headersToEmbed;
	}
	else {
		this.headersToEmbed = new String[0];
	}
	this.provisioningProvider = provisioningProvider;
	if (containerCustomizer != null) {
		this.containerCustomizer = containerCustomizer;
	}
	else {
		this.containerCustomizer = (container, destinationName, group) -> { };
	}
	if (sourceCustomizer != null) {
		this.sourceCustomizer = sourceCustomizer;
	}
	else {
		this.sourceCustomizer = (source, destinationName, group) -> { };
	}
}
/**
* Return the publisher previously supplied through
* {@link #setApplicationEventPublisher(ApplicationEventPublisher)}.
* @return the event publisher, or {@code null} if none has been set
*/
protected ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}
/**
* Store the {@link ApplicationEventPublisher} for later retrieval through
* {@link #getApplicationEventPublisher()}.
* @param applicationEventPublisher the publisher to keep
*/
@Override
public void setApplicationEventPublisher(
ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
/**
 * Set the {@link ProducerMessageHandlerCustomizer} that is applied to every producer
 * {@link MessageHandler} this binder creates. Passing {@code null} restores the
 * default no-op customizer.
 * @param handlerCustomizer the {@link ProducerMessageHandlerCustomizer} to use, or
 * {@code null} for none.
 * @since 3.0
 */
@SuppressWarnings("unchecked")
public void setProducerMessageHandlerCustomizer(
		@Nullable ProducerMessageHandlerCustomizer<? extends MessageHandler> handlerCustomizer) {
	if (handlerCustomizer == null) {
		this.handlerCustomizer = (handler, destination) -> { };
		return;
	}
	this.handlerCustomizer = (ProducerMessageHandlerCustomizer<MessageHandler>) handlerCustomizer;
}
/**
 * Set the {@link ConsumerEndpointCustomizer} that is applied to every consumer
 * {@link MessageProducer} this binder creates. Passing {@code null} restores the
 * default no-op customizer.
 * @param endpointCustomizer the {@link ConsumerEndpointCustomizer} to use, or
 * {@code null} for none.
 * @since 3.0
 */
@SuppressWarnings("unchecked")
public void setConsumerEndpointCustomizer(
		@Nullable ConsumerEndpointCustomizer<? extends MessageProducer> endpointCustomizer) {
	if (endpointCustomizer == null) {
		this.consumerCustomizer = (handler, destination, group) -> { };
		return;
	}
	this.consumerCustomizer = (ConsumerEndpointCustomizer<MessageProducer>) endpointCustomizer;
}
/**
 * Set the {@link ListenerContainerCustomizer} used for further configuration of the
 * listener container instance created by the binder. Passing {@code null} installs
 * a no-op customizer.
 * @param containerCustomizer the {@link ListenerContainerCustomizer} to use, or
 * {@code null} for none.
 */
public void setContainerCustomizer(@Nullable ListenerContainerCustomizer<?> containerCustomizer) {
	if (containerCustomizer != null) {
		this.containerCustomizer = containerCustomizer;
	}
	else {
		this.containerCustomizer = (container, destinationName, group) -> { };
	}
}
/**
* Return the configured {@link ListenerContainerCustomizer}, cast to the container
* type requested by the caller. The cast is unchecked, so the caller is responsible
* for asking for a compatible type.
* @param <L> the listener container type expected by the caller
* @return the container customizer
*/
@SuppressWarnings("unchecked")
protected <L> ListenerContainerCustomizer<L> getContainerCustomizer() {
return (ListenerContainerCustomizer<L>) this.containerCustomizer;
}
/**
* Return the configured {@link MessageSourceCustomizer}, cast to the source type
* requested by the caller. The cast is unchecked, so the caller is responsible for
* asking for a compatible type.
* @param <S> the message source type expected by the caller
* @return the message source customizer
*/
@SuppressWarnings("unchecked")
protected <S> MessageSourceCustomizer<S> getMessageSourceCustomizer() {
return (MessageSourceCustomizer<S>) this.sourceCustomizer;
}
/**
 * Determine the binder name for a binding: the binder configured on the binding
 * when it has text, otherwise whatever {@link #resolveFromDefaultBinder()} yields.
 * @param bindingName the binding to look up
 * @param bindingServiceProperties the binding service properties; may be {@code null}
 * @return the resolved binder name
 */
private String resolveBinderName(String bindingName, BindingServiceProperties bindingServiceProperties) {
	String configuredBinder = resolveBinder(bindingName, bindingServiceProperties);
	return StringUtils.hasText(configuredBinder) ? configuredBinder : resolveFromDefaultBinder();
}
/**
 * Determine the binder type for a binding. When the binding names a binder that has
 * an entry in {@code getBinders()}, that entry's type is returned; when it names a
 * binder without such an entry, the name itself is returned; when it names no
 * binder, {@link #resolveFromDefaultBinder()} decides.
 * @param bindingName the binding to look up
 * @param bindingServiceProperties the binding service properties; may be {@code null}
 * @return the resolved binder type
 */
private String resolveBinderType(String bindingName, BindingServiceProperties bindingServiceProperties) {
	String configuredBinder = resolveBinder(bindingName, bindingServiceProperties);
	if (!StringUtils.hasText(configuredBinder)) {
		return resolveFromDefaultBinder();
	}
	// A non-blank binder can only come from non-null properties, so the dereference is safe.
	var binderConfiguration = bindingServiceProperties.getBinders().get(configuredBinder);
	return binderConfiguration == null ? configuredBinder : binderConfiguration.getType();
}
/**
 * Read the binder configured on the given binding.
 * @param bindingName the binding to look up
 * @param bindingServiceProperties the binding service properties; may be {@code null}
 * @return the binder set on the binding, or {@code null} when the properties are
 * {@code null} or contain no entry for the binding
 */
private static String resolveBinder(String bindingName, BindingServiceProperties bindingServiceProperties) {
	if (bindingServiceProperties == null) {
		return null;
	}
	BindingProperties candidate = bindingServiceProperties.getBindings().get(bindingName);
	return candidate == null ? null : candidate.getBinder();
}
/**
 * Resolve the binder to report when a binding does not name one itself.
 * <p>
 * When more than one binder type is registered, the configured default binder is
 * returned if there is one. Otherwise at most one binder type may be registered,
 * and its key (or {@code null} when none is registered) is returned.
 * @return the default binder name, the single registered binder type, or {@code null}
 * @throws IllegalArgumentException if several binder types are registered and no
 * default binder is configured
 */
private String resolveFromDefaultBinder() {
	DefaultBinderTypeRegistry binderTypeRegistry =
			getApplicationContext().getBean(DefaultBinderTypeRegistry.class);
	Map<String, BinderType> binderTypes = binderTypeRegistry.getAll();
	if (binderTypes.size() > 1) {
		// The binding service properties are treated as nullable elsewhere in this
		// class; guard here too so the assertion below reports the problem instead
		// of a NullPointerException.
		BindingServiceProperties serviceProperties = getBindingServiceProperties();
		String defaultBinder = (serviceProperties != null) ? serviceProperties.getDefaultBinder() : null;
		if (defaultBinder != null) {
			return defaultBinder;
		}
	}
	Assert.isTrue(binderTypes.size() <= 1, "More than one binder types found, but no binder specified on the binding");
	return binderTypes.isEmpty() ? null : binderTypes.keySet().iterator().next();
}
/**
* Binds an outbound channel to a given destination. The implementation delegates to
* {@link ProvisioningProvider#provisionProducerDestination(String, ProducerProperties)}
* and
* {@link #createProducerMessageHandler(ProducerDestination, ProducerProperties, MessageChannel)}
* for handling the middleware specific logic. If the returned producer message
* handler is an {@link InitializingBean} then
* {@link InitializingBean#afterPropertiesSet()} will be called on it. Similarly, if
* the returned producer message handler endpoint is a {@link Lifecycle} and
* auto-startup is enabled on the producer properties, then
* {@link Lifecycle#start()} will be called on it. A {@link BindingCreatedEvent} is
* published once the binding has been created.
* @param destination the name of the destination
* @param outputChannel the channel to be bound
* @param producerProperties the {@link ProducerProperties} of the binding
* @return the Binding for the channel
* @throws BinderException on internal errors during binding
* @throws IllegalStateException if the output channel is neither a
* {@link SubscribableChannel} nor a {@link FluxMessageChannel}
*/
@Override
public final Binding<MessageChannel> doBindProducer(final String destination,
MessageChannel outputChannel, final P producerProperties)
throws BinderException {
final MessageHandler producerMessageHandler;
final ProducerDestination producerDestination;
try {
producerDestination = this.provisioningProvider
.provisionProducerDestination(destination, producerProperties);
// Look up the binding properties (keyed by binding name, falling back to the
// destination name) to find out whether an error handler function is configured.
BindingProperties bp = null;
BindingServiceProperties bsp = this.getBindingServiceProperties();
if (bsp != null) {
String bindingName = StringUtils.hasText(producerProperties.getBindingName()) ? producerProperties.getBindingName() : destination;
bp = bsp.getBindingProperties(bindingName);
}
boolean errorHandlerDefined = bp != null && StringUtils.hasText(bp.getErrorHandlerDefinition());
// The error channel is only registered when an error handler is defined or the
// error channel is explicitly enabled; otherwise the handler receives null.
SubscribableChannel errorChannel = errorHandlerDefined || producerProperties.isErrorChannelEnabled()
? registerErrorInfrastructure(producerDestination, producerProperties.getBindingName(), errorHandlerDefined)
: null;
String errorChannelName = errorsBaseName(producerDestination, producerProperties.getBindingName());
// Always invoked; subscribeFunctionErrorHandler itself decides whether there is
// a handler function to subscribe.
this.subscribeFunctionErrorHandler(errorChannelName, producerProperties.getBindingName());
producerMessageHandler = createProducerMessageHandler(producerDestination,
producerProperties, outputChannel, errorChannel);
customizeProducerMessageHandler(producerMessageHandler, producerDestination.getName());
if (producerMessageHandler instanceof InitializingBean initializingHandler) {
initializingHandler.afterPropertiesSet();
}
}
catch (Exception e) {
// BinderException and ProvisioningException propagate unchanged; anything else
// is wrapped in a BinderException.
if (e instanceof BinderException binderException) {
throw binderException;
}
else if (e instanceof ProvisioningException provisioningException) {
throw provisioningException;
}
else {
throw new BinderException(
"Exception thrown while building outbound endpoint", e);
}
}
if (producerProperties.isAutoStartup()
&& producerMessageHandler instanceof Lifecycle ProducerMessageHandlerWithLifeCycle) {
ProducerMessageHandlerWithLifeCycle.start();
}
this.postProcessOutputChannel(outputChannel, producerProperties);
// Holds the reactive consumer (if one is created) so afterUnbind() can destroy it.
AtomicReference<ReactiveStreamsConsumer> reactiveStreamsConsumerRef = new AtomicReference<>();
// Subscribable channels are wired through a SendingHandler; a FluxMessageChannel
// is consumed through a ReactiveStreamsConsumer that is started immediately.
if (outputChannel instanceof SubscribableChannel subscribableOutputChannel) {
subscribableOutputChannel
.subscribe(new SendingHandler(producerMessageHandler,
HeaderMode.embeddedHeaders
.equals(producerProperties.getHeaderMode()),
this.headersToEmbed, useNativeEncoding(producerProperties)));
}
else if (outputChannel instanceof FluxMessageChannel) {
final ReactiveStreamsConsumer reactiveStreamsConsumer = new ReactiveStreamsConsumer(outputChannel, producerMessageHandler);
reactiveStreamsConsumerRef.set(reactiveStreamsConsumer);
reactiveStreamsConsumer.start();
}
else {
throw new IllegalStateException("No capable binding targets found.");
}
BindingServiceProperties bsp = this.getBindingServiceProperties();
// The handler is passed as the binding's lifecycle only when it implements Lifecycle.
Binding<MessageChannel> binding = new DefaultBinding<MessageChannel>(destination,
outputChannel, producerMessageHandler instanceof Lifecycle producerMessageHandlerWithLifecycle
? producerMessageHandlerWithLifecycle : null) {
@Override
public Map<String, Object> getExtendedInfo() {
return doGetExtendedInfo(destination, producerProperties);
}
// Returns the extension of ExtendedProducerProperties, or null for plain properties.
@SuppressWarnings({ "unchecked", "hiding" })
public <P> P getExtension() {
if (producerProperties instanceof ExtendedProducerProperties extendedProperties) {
return (P) extendedProperties.getExtension();
}
return null;
}
@Override
public boolean isInput() {
return false;
}
@Override
public String getBinderName() {
return resolveBinderName(getBindingName(), bsp);
}
@Override
public String getBinderType() {
return resolveBinderType(getBindingName(), bsp);
}
@Override
public void afterUnbind() {
// Clean-up failures are logged rather than propagated, so afterUnbindProducer
// below always runs.
try {
destroyErrorInfrastructure(producerDestination, producerProperties.getBindingName());
final ReactiveStreamsConsumer rsc = reactiveStreamsConsumerRef.get();
if (rsc != null && rsc.isRunning()) {
rsc.destroy();
}
if (producerMessageHandler instanceof DisposableBean disposableProducerMessageHandler) {
disposableProducerMessageHandler.destroy();
}
}
catch (Exception e) {
AbstractMessageChannelBinder.this.logger
.error("Exception thrown while unbinding " + toString(), e);
}
afterUnbindProducer(producerDestination, producerProperties);
}
};
// Attach an optional companion Lifecycle bean named "<channel bean name>_spca"
// when the application context contains one.
Lifecycle companion = null;
// NOTE(review): unconditional cast - assumes every supported output channel is an
// AbstractMessageChannel; confirm for custom SubscribableChannel implementations.
String outputChannelName = ((AbstractMessageChannel) outputChannel).getBeanName();
String companionLifecycleName = outputChannelName + "_spca";
if (this.getApplicationContext().containsBean(companionLifecycleName)) {
companion = this.getApplicationContext().getBean(companionLifecycleName, Lifecycle.class);
}
((DefaultBinding<?>) binding).setCompanion(companion);
doPublishEvent(new BindingCreatedEvent(binding));
return binding;
}
/**
* Apply the configured {@link ProducerMessageHandlerCustomizer} to a newly created
* producer message handler.
* @param producerMessageHandler the handler to customize
* @param destinationName the destination name passed on to the customizer
*/
protected void customizeProducerMessageHandler(MessageHandler producerMessageHandler, String destinationName) {
this.handlerCustomizer.configure(producerMessageHandler, destinationName);
}
/**
* Whether the producer for the destination being created should be configured to use
* native encoding which may, or may not, be determined from the properties. For
* example, a transactional kafka binder uses a common producer for all destinations.
* The default implementation returns {@link ProducerProperties#isUseNativeEncoding()}.
* The result is handed to the {@code SendingHandler} subscribed to the output channel.
* @param producerProperties the properties.
* @return true to use native encoding.
*/
protected boolean useNativeEncoding(P producerProperties) {
return producerProperties.isUseNativeEncoding();
}
/**
* Allows subclasses to perform post processing on the channel - for example to add
* more interceptors. Called while binding a producer, after the producer message
* handler has been created and before anything is subscribed to the channel. The
* default implementation does nothing.
* @param outputChannel the channel.
* @param producerProperties the producer properties.
*/
protected void postProcessOutputChannel(MessageChannel outputChannel,
P producerProperties) {
// default no-op
}
/**
* Create a {@link MessageHandler} with the ability to send data to the target
* middleware. If the returned instance is also a {@link Lifecycle}, it will be
* stopped automatically by the binder.
* <p>
* In order to be fully compliant, the {@link MessageHandler} of the binder must
* observe the following headers:
* <ul>
* <li>{@link BinderHeaders#PARTITION_HEADER} - indicates the target partition where
* the message must be sent</li>
* </ul>
* <p>
* The default implementation ignores {@code channel} and delegates to
* {@link #createProducerMessageHandler(ProducerDestination, ProducerProperties, MessageChannel)}
* with the error channel.
* @param destination the name of the target destination.
* @param producerProperties the producer properties.
* @param channel the channel to bind.
* @param errorChannel the error channel (if enabled, otherwise null). If not null,
* the binder must wire this channel into the producer endpoint so that errors are
* forwarded to it.
* @return the message handler for sending data to the target middleware
* @throws Exception when the producer message handler failed to be created
*/
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
P producerProperties, MessageChannel channel, MessageChannel errorChannel)
throws Exception {
return createProducerMessageHandler(destination, producerProperties,
errorChannel);
}
/**
* Create a {@link MessageHandler} with the ability to send data to the target
* middleware. If the returned instance is also a {@link Lifecycle}, it will be
* stopped automatically by the binder.
* <p>
* In order to be fully compliant, the {@link MessageHandler} of the binder must
* observe the following headers:
* <ul>
* <li>{@link BinderHeaders#PARTITION_HEADER} - indicates the target partition where
* the message must be sent</li>
* </ul>
* @param destination the name of the target destination
* @param producerProperties the producer properties
* @param errorChannel the error channel (if enabled, otherwise null). If not null,
* the binder must wire this channel into the producer endpoint so that errors are
* forwarded to it.
* @return the message handler for sending data to the target middleware
* @throws Exception upon failure to create the producer message handler
*/
protected abstract MessageHandler createProducerMessageHandler(
ProducerDestination destination, P producerProperties,
MessageChannel errorChannel) throws Exception;
/**
* Invoked after the unbinding of a producer. Subclasses may override this to provide
* their own logic for dealing with unbinding. The default implementation does nothing.
* @param destination the bound destination
* @param producerProperties the producer properties
*/
protected void afterUnbindProducer(ProducerDestination destination,
P producerProperties) {
}
/**
* Binds an inbound channel to a given destination. The implementation delegates to
* {@link ProvisioningProvider#provisionConsumerDestination(String, String, ConsumerProperties)}
* and
* {@link #createConsumerEndpoint(ConsumerDestination, String, ConsumerProperties)}
* for handling middleware-specific logic. If the returned consumer endpoint is an
* {@link InitializingBean} then {@link InitializingBean#afterPropertiesSet()} will be
* called on it. Similarly, if the returned consumer endpoint is a {@link Lifecycle}
* and auto-startup is enabled on the consumer properties, then
* {@link Lifecycle#start()} will be called on it. A {@link BindingCreatedEvent} is
* published once the binding has been created.
* @param name the name of the destination
* @param group the consumer group
* @param inputChannel the channel to be bound
* @param properties the {@link ConsumerProperties} of the binding
* @return the Binding for the channel
* @throws BinderException on internal errors during binding
*/
@Override
public final Binding<MessageChannel> doBindConsumer(String name, String group,
MessageChannel inputChannel, final C properties) throws BinderException {
MessageProducer consumerEndpoint = null;
try {
ConsumerDestination destination = this.provisioningProvider
.provisionConsumerDestination(name, group, properties);
// With embedded headers the channel gets the embedded-headers interceptor at index 0.
if (HeaderMode.embeddedHeaders.equals(properties.getHeaderMode())) {
enhanceMessageChannel(inputChannel);
}
consumerEndpoint = createConsumerEndpoint(destination, group, properties);
consumerEndpoint.setOutputChannel(inputChannel);
this.consumerCustomizer.configure(consumerEndpoint, name, group);
if (consumerEndpoint instanceof InitializingBean initializingConsumerEndpoint) {
initializingConsumerEndpoint.afterPropertiesSet();
}
if (properties.isAutoStartup() && consumerEndpoint instanceof Lifecycle consumerEndpointWithLifecycle) {
consumerEndpointWithLifecycle.start();
}
BindingServiceProperties bsp = this.getBindingServiceProperties();
// The endpoint is passed as the binding's lifecycle only when it implements Lifecycle.
Binding<MessageChannel> binding = new DefaultBinding<MessageChannel>(name,
group, inputChannel, consumerEndpoint instanceof Lifecycle consumerEndpointWithLifecycle
? consumerEndpointWithLifecycle : null) {
@Override
public Map<String, Object> getExtendedInfo() {
return doGetExtendedInfo(destination, properties);
}
// Returns the extension of ExtendedConsumerProperties, or null for plain properties.
@SuppressWarnings({ "unchecked", "hiding" })
public <P> P getExtension() {
if (properties instanceof ExtendedConsumerProperties extendedProperties) {
return (P) extendedProperties.getExtension();
}
return null;
}
@Override
public boolean isInput() {
return true;
}
@Override
public String getBinderName() {
return resolveBinderName(getBindingName(), bsp);
}
@Override
public String getBinderType() {
return resolveBinderType(getBindingName(), bsp);
}
@Override
public Map<String, Object> getAdditionalConfigurationProperties() {
return doGetAdditionalConfigurationProperties(this.getName());
}
@Override
protected void afterUnbind() {
// A failure while destroying the endpoint is logged rather than propagated, so
// the two clean-up calls below always run.
try {
if (getEndpoint() instanceof DisposableBean disposableEndpoint) {
disposableEndpoint.destroy();
}
}
catch (Exception e) {
AbstractMessageChannelBinder.this.logger.error(
"Exception thrown while unbinding " + toString(), e);
}
afterUnbindConsumer(destination, this.group, properties);
destroyErrorInfrastructure(destination, this.group, properties);
}
};
doPublishEvent(new BindingCreatedEvent(binding));
return binding;
}
catch (Exception e) {
// Stop an endpoint that may already have been started before reporting the failure.
if (consumerEndpoint instanceof Lifecycle consumerEndpointWithLifecycle) {
consumerEndpointWithLifecycle.stop();
}
// BinderException and ProvisioningException propagate unchanged; anything else
// is wrapped in a BinderException.
if (e instanceof BinderException binderException) {
throw binderException;
}
else if (e instanceof ProvisioningException provisioningException) {
throw provisioningException;
}
else {
throw new BinderException("Exception thrown while starting consumer: ", e);
}
}
}
/**
* This method must be implemented by individual binders to produce an immutable
* version of additional configuration properties, primarily for testing and
* diagnosing/debugging issues. The default implementation logs a warning and
* returns {@code null}.
* @param name the name handed over by the binding (the value of its {@code getName()})
* @return map of additional configuration properties as key/value pairs, or
* {@code null} when the binder does not provide any
*/
protected Map<String, Object> doGetAdditionalConfigurationProperties(String name) {
logger.warn("This method must be implemented by an individual binders to produce " +
"an immutable version of additional configuration properties primarily for testing and diagnosing/debugging issues");
return null;
}
/**
* Binds a pollable consumer to a destination. The bind target must be a
* {@link DefaultPollableMessageSource}; it is given the message source obtained
* from {@link #createPolledConsumerResources(String, String, ConsumerDestination, ConsumerProperties)},
* plus error channel, error message strategy and retry settings where applicable.
* A {@link BindingCreatedEvent} is published once the binding has been created.
* @param name the name of the destination
* @param group the consumer group
* @param inboundBindTarget the pollable source to bind
* @param properties the consumer properties of the binding
* @return the binding for the pollable source
* @throws IllegalArgumentException if {@code inboundBindTarget} is not a
* {@link DefaultPollableMessageSource}
*/
@Override
public Binding<PollableSource<MessageHandler>> bindPollableConsumer(String name,
String group, final PollableSource<MessageHandler> inboundBindTarget,
C properties) {
Assert.isInstanceOf(DefaultPollableMessageSource.class, inboundBindTarget);
DefaultPollableMessageSource bindingTarget = (DefaultPollableMessageSource) inboundBindTarget;
ConsumerDestination destination = this.provisioningProvider
.provisionConsumerDestination(name, group, properties);
// With embedded headers the source gets the embedded-headers interceptor at index 0.
if (HeaderMode.embeddedHeaders.equals(properties.getHeaderMode())) {
bindingTarget.addInterceptor(0, this.embeddedHeadersChannelInterceptor);
}
final PolledConsumerResources resources = createPolledConsumerResources(name,
group, destination, properties);
MessageSource<?> messageSource = resources.getSource();
if (messageSource instanceof BeanFactoryAware beanFactoryAwareMessageSource) {
beanFactoryAwareMessageSource.setBeanFactory(getApplicationContext().getBeanFactory());
}
bindingTarget.setSource(messageSource);
// Error channel and strategy are only applied when error infrastructure exists.
if (resources.getErrorInfrastructure() != null) {
if (resources.getErrorInfrastructure().getErrorChannel() != null) {
bindingTarget.setErrorChannel(
resources.getErrorInfrastructure().getErrorChannel());
}
ErrorMessageStrategy ems = getErrorMessageStrategy();
if (ems != null) {
bindingTarget.setErrorMessageStrategy(ems);
}
}
// Retry and recovery are configured only when more than one attempt is allowed.
// NOTE(review): the error infrastructure is passed on here without the null check
// used above - confirm it cannot be null when maxAttempts > 1.
if (properties.getMaxAttempts() > 1) {
bindingTarget.setRetryTemplate(buildRetryTemplate(properties));
bindingTarget.setRecoveryCallback(getPolledConsumerRecoveryCallback(
resources.getErrorInfrastructure(), properties));
}
postProcessPollableSource(bindingTarget);
if (properties.isAutoStartup() && resources.getSource() instanceof Lifecycle sourceWithLifecycle) {
sourceWithLifecycle.start();
}
BindingServiceProperties bsp = this.getBindingServiceProperties();
// The source is passed as the binding's lifecycle only when it implements Lifecycle.
Binding<PollableSource<MessageHandler>> binding = new DefaultBinding<PollableSource<MessageHandler>>(
name, group, inboundBindTarget, resources.getSource() instanceof Lifecycle sourceWithLifecycle
? sourceWithLifecycle : null) {
@Override
public Map<String, Object> getExtendedInfo() {
return doGetExtendedInfo(destination, properties);
}
@Override
public boolean isInput() {
return true;
}
@Override
public String getBinderName() {
return resolveBinderName(properties.getBindingName(), bsp);
}
@Override
public String getBinderType() {
return resolveBinderType(properties.getBindingName(), bsp);
}
@Override
public void afterUnbind() {
afterUnbindConsumer(destination, this.group, properties);
destroyErrorInfrastructure(destination, this.group, properties);
}
};
doPublishEvent(new BindingCreatedEvent(binding));
return binding;
}
/**
* Hook invoked while binding a pollable consumer, after the source, error handling
* and retry settings have been applied to the target and before the source is
* started. The default implementation does nothing.
* @param bindingTarget the pollable message source being bound
*/
protected void postProcessPollableSource(DefaultPollableMessageSource bindingTarget) {
}
/**
* Implementations can override the default {@link ErrorMessageSendingRecoverer}.
* The default implementation returns the recoverer held by the given error
* infrastructure.
* @param errorInfrastructure the infrastructure.
* @param properties the consumer properties.
* @return the recoverer.
*/
protected RecoveryCallback<Object> getPolledConsumerRecoveryCallback(
ErrorInfrastructure errorInfrastructure, C properties) {
// NOTE(review): errorInfrastructure is dereferenced without a null check, while
// bindPollableConsumer null-checks it before its other uses - confirm it is never
// null on this path.
return errorInfrastructure.getRecoverer();
}
/**
* Create the resources needed for a pollable consumer. The default implementation
* always throws; binders that support pollable consumers override this method.
* @param name the name of the destination
* @param group the consumer group
* @param destination the provisioned consumer destination
* @param consumerProperties the consumer properties
* @return the polled consumer resources
* @throws UnsupportedOperationException always, in the default implementation
*/
protected PolledConsumerResources createPolledConsumerResources(String name,
String group, ConsumerDestination destination, C consumerProperties) {
throw new UnsupportedOperationException(
"This binder does not support pollable consumers");
}
/**
 * Put the embedded-headers interceptor at the front of the channel's interceptor list.
 * @param inputChannel the inbound channel; must be an {@link AbstractMessageChannel}
 */
private void enhanceMessageChannel(MessageChannel inputChannel) {
	AbstractMessageChannel interceptableChannel = (AbstractMessageChannel) inputChannel;
	interceptableChannel.addInterceptor(0, this.embeddedHeadersChannelInterceptor);
}
/**
* Creates a {@link MessageProducer} that receives data from the consumer destination.
* If the returned endpoint is a {@link Lifecycle}, it is started by the binder when
* auto-startup is enabled and stopped by the binder if binding fails.
* @param destination reference to the consumer destination
* @param group the consumer group
* @param properties the consumer properties
* @return the consumer endpoint.
* @throws Exception when consumer endpoint creation failed.
*/
protected abstract MessageProducer createConsumerEndpoint(
ConsumerDestination destination, String group, C properties) throws Exception;
/**
* Invoked after the unbinding of a consumer. The binder implementation can override
* this method to provide their own logic (e.g. for cleaning up destinations). The
* default implementation does nothing.
* @param destination the consumer destination
* @param group the consumer group
* @param consumerProperties the consumer properties
*/
protected void afterUnbindConsumer(ConsumerDestination destination, String group,
C consumerProperties) {
}
/**
* Register an error channel for the destination when an async send error is received.
* Bridge the channel to the global error channel (if present and no error handler
* definition is available for the binding).
* @param destination the destination.
* @param bindingName the binding name, used to derive the error channel and bridge bean names.
* @param errorHandlerDefinitionAvailable whether an error handler function is configured
* for the binding; when {@code true} the channel is not bridged to the global error channel.
* @return the channel.
* @throws IllegalStateException if a bean with the error channel name already exists
* and is not a {@link SubscribableChannel}
*/
private SubscribableChannel registerErrorInfrastructure(
ProducerDestination destination, String bindingName, boolean errorHandlerDefinitionAvailable) {
String errorChannelName = errorsBaseName(destination, bindingName);
// A new pub/sub channel is always created; it is the channel this method returns.
SubscribableChannel errorChannel = new PublishSubscribeChannel();
if (getApplicationContext().containsBean(errorChannelName)) {
Object errorChannelObject = getApplicationContext().getBean(errorChannelName);
if (!(errorChannelObject instanceof SubscribableChannel)) {
throw new IllegalStateException("Error channel '" + errorChannelName
+ "' must be a SubscribableChannel");
}
// An existing DirectChannel bean receives the errors through a bridge from the new channel.
if (errorChannelObject instanceof DirectChannel) {
// NOTE(review): errorChannelName is reassigned here but not read again in this method.
errorChannelName = "bridged." + errorChannelName;
BridgeHandler bridge = new BridgeHandler();
bridge.setOutputChannel((MessageChannel) errorChannelObject);
errorChannel.subscribe(bridge);
}
}
else {
// No bean of that name yet: register the new channel under the error channel name.
((GenericApplicationContext) getApplicationContext()).registerBean(
errorChannelName, SubscribableChannel.class, () -> errorChannel);
}
MessageChannel defaultErrorChannel = null;
if (!errorHandlerDefinitionAvailable && getApplicationContext()
.containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
defaultErrorChannel = getApplicationContext().getBean(
IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
MessageChannel.class);
}
// Bridge to the global error channel and register the bridge as a bean.
if (defaultErrorChannel != null) {
BridgeHandler errorBridge = new BridgeHandler();
errorBridge.setOutputChannel(defaultErrorChannel);
errorChannel.subscribe(errorBridge);
String errorBridgeHandlerName = getErrorBridgeName(destination, bindingName);
((GenericApplicationContext) getApplicationContext()).registerBean(
errorBridgeHandlerName, BridgeHandler.class, () -> errorBridge);
}
return errorChannel;
}
/**
* Build an errorChannelRecoverer that writes to a pub/sub channel for the destination
* when an exception is thrown to a consumer. Delegates to the four-argument overload
* with {@code polled} set to {@code false}.
* @param destination the destination.
* @param group the group.
* @param consumerProperties the properties.
* @return the ErrorInfrastructure which is a holder for the error channel, the
* recoverer and the message handler that is subscribed to the channel.
*/
protected final ErrorInfrastructure registerErrorInfrastructure(
ConsumerDestination destination, String group, C consumerProperties) {
return registerErrorInfrastructure(destination, group, consumerProperties, false);
}
/**
 * Subscribe the error handling function declared for a binding to that binding's
 * error channel.
 * <p>
 * Nothing is subscribed when the binding name is blank, when no binding service
 * properties are available, when the binding declares no error handler definition, or
 * when the function catalog cannot resolve the definition (a warning is logged in the
 * last case).
 * @param errorChannelName bean name of the error channel to subscribe to.
 * @param bindingName name of the binding whose properties are consulted.
 * @return {@code true} if the function was subscribed, {@code false} otherwise.
 */
private boolean subscribeFunctionErrorHandler(String errorChannelName, String bindingName) {
	if (!StringUtils.hasText(bindingName)) {
		return false;
	}
	BindingServiceProperties serviceProperties = this.getBindingServiceProperties();
	if (serviceProperties == null) {
		return false;
	}
	BindingProperties bindingProperties = serviceProperties.getBindingProperties(bindingName);
	if (bindingProperties == null || !StringUtils.hasText(bindingProperties.getErrorHandlerDefinition())) {
		return false;
	}
	FunctionCatalog functionCatalog = getApplicationContext().getBean(FunctionCatalog.class);
	Consumer<ErrorMessage> userHandler = functionCatalog.lookup(Consumer.class, bindingProperties.getErrorHandlerDefinition());
	if (userHandler != null) {
		SubscribableChannel targetChannel = getApplicationContext().getBean(errorChannelName, SubscribableChannel.class);
		targetChannel.subscribe(failure -> userHandler.accept((ErrorMessage) failure));
		return true;
	}
	logger.warn("Failed to retrieve error handling function with definition: " + bindingProperties.getErrorHandlerDefinition() + ", for binding: " + bindingName);
	return false;
}
/**
* Build an errorChannelRecoverer that writes to a pub/sub channel for the destination
* when an exception is thrown to a consumer.
* <p>
* The method looks up or registers, in this order: the binding's error channel, the
* user-declared error handling function (if any), an
* {@code ErrorMessageSendingRecoverer}, the binder-provided (or default) error message
* handler, and a bridge to the global error channel when that bean exists. Each bean is
* registered in the application context only if a bean with the same name is not
* already present.
* @param destination the destination.
* @param group the group.
* @param consumerProperties the properties.
* @param polled true if this is for a polled consumer.
* @return the ErrorInfrastructure which is a holder for the error channel, the
* recoverer and the message handler that is subscribed to the channel.
*/
protected final ErrorInfrastructure registerErrorInfrastructure(
ConsumerDestination destination, String group, C consumerProperties,
boolean polled) {
ErrorMessageStrategy errorMessageStrategy = getErrorMessageStrategy();
String errorChannelName = errorsBaseName(destination, group, consumerProperties);
BindingServiceProperties bsp = this.getBindingServiceProperties();
// NOTE(review): userErrorHandler and errorHandlerDefinition are assigned in the block
// below but never read again in this method; the actual subscription of the user
// function is done by subscribeFunctionErrorHandler further down. The FunctionCatalog
// bean is fetched here even when no error handler definition is configured.
FunctionInvocationWrapper userErrorHandler = null;
String errorHandlerDefinition = null;
if (bsp != null && StringUtils.hasText(consumerProperties.getBindingName())) {
BindingProperties bp = bsp.getBindingProperties(consumerProperties.getBindingName());
errorHandlerDefinition = bp.getErrorHandlerDefinition();
FunctionCatalog catalog = getApplicationContext().getBean(FunctionCatalog.class);
if (StringUtils.hasText(errorHandlerDefinition)) {
userErrorHandler = catalog.lookup(errorHandlerDefinition);
// Discard the lookup result unless its function definition equals the configured one.
if (!(userErrorHandler != null && userErrorHandler.getFunctionDefinition().equals(errorHandlerDefinition))) {
userErrorHandler = null;
}
}
}
// Reuse an existing error channel bean of that name, otherwise create and register a
// BinderErrorChannel under it.
AbstractSubscribableChannel binderErrorChannel;
if (this.getApplicationContext().containsBean(errorChannelName)) {
binderErrorChannel = this.getApplicationContext().getBean(errorChannelName, AbstractSubscribableChannel.class);
}
else {
binderErrorChannel = new BinderErrorChannel();
binderErrorChannel.setComponentName(errorChannelName);
((GenericApplicationContext) getApplicationContext()).registerBean(
errorChannelName, SubscribableChannel.class, () -> binderErrorChannel);
}
// Must run after the channel bean exists: the helper resolves the channel by bean name.
boolean userHandlerSubscribed = this.subscribeFunctionErrorHandler(errorChannelName, consumerProperties.getBindingName());
// A new recoverer is always created and returned; it is only registered as a bean when
// no bean of that name exists yet.
ErrorMessageSendingRecoverer recoverer = new ErrorMessageSendingRecoverer(binderErrorChannel, errorMessageStrategy);
String recovererBeanName = getErrorRecovererName(destination, group, consumerProperties);
if (!getApplicationContext().containsBean(recovererBeanName)) {
((GenericApplicationContext) getApplicationContext()).registerBean(
recovererBeanName, ErrorMessageSendingRecoverer.class, () -> recoverer);
}
// Ask the binder implementation for its handler, using the polled or non-polled hook.
MessageHandler binderProvidedErrorHandler = polled
? getPolledConsumerErrorMessageHandler(destination, group, consumerProperties)
: getErrorMessageHandler(destination, group, consumerProperties);
String errorMessageHandlerName = getErrorMessageHandlerName(destination, group,
consumerProperties);
if (binderProvidedErrorHandler == null) {
// NOTE(review): 'polled' is passed as the second argument, whose parameter is named
// defaultErrorChannelPresent on getDefaultErrorMessageHandler - confirm this is intended.
binderProvidedErrorHandler = this.getDefaultErrorMessageHandler(binderErrorChannel, polled);
}
// The binder/default handler is only subscribed when no user function was subscribed.
if (binderProvidedErrorHandler != null && !userHandlerSubscribed) {
if (this.isSubscribable(binderErrorChannel) && !getApplicationContext().containsBean(errorMessageHandlerName)) {
// Effectively-final copy required for capture by the supplier lambda.
MessageHandler h = binderProvidedErrorHandler;
((GenericApplicationContext) getApplicationContext()).registerBean(errorMessageHandlerName, MessageHandler.class, () -> h);
binderErrorChannel.subscribe(binderProvidedErrorHandler);
}
else {
// NOTE(review): this branch is also taken when a handler bean with this name is already
// registered, not only when the channel cannot take another subscriber.
this.logger.warn("The provided errorChannel '" + errorChannelName
+ "' is an instance of DirectChannel, "
+ "so no more subscribers could be added which may affect DLQ processing. "
+ "Resolution: Configure your own errorChannel as "
+ "an instance of PublishSubscribeChannel");
}
}
// Setup a bridge to global errorChannel to ensure logging of errors could be controlled via standard SI way
if (this.getApplicationContext().containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) && this.isSubscribable(binderErrorChannel)) {
String errorBridgeHandlerName = getErrorBridgeName(destination, group, consumerProperties);
if (!getApplicationContext().containsBean(errorBridgeHandlerName)) {
SubscribableChannel globalErrorChannel = this.getApplicationContext().getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, SubscribableChannel.class);
BridgeHandler bridge = new BridgeHandler();
bridge.setOutputChannel(globalErrorChannel);
binderErrorChannel.subscribe(bridge);
((GenericApplicationContext) getApplicationContext()).registerBean(errorBridgeHandlerName, BridgeHandler.class, () -> bridge);
}
}
// The returned handler is the binder/default handler even when it was not subscribed above.
return new ErrorInfrastructure(binderErrorChannel, recoverer, binderProvidedErrorHandler);
}
/**
 * Decide whether another subscriber may be added to the given error channel.
 * <p>
 * A {@code PublishSubscribeChannel} always qualifies. Any other
 * {@code AbstractSubscribableChannel} qualifies only while it has no subscribers.
 * Channels of any other type are treated as subscribable.
 * @param errorChannel the channel to inspect.
 * @return {@code true} if a subscriber may be added.
 */
private boolean isSubscribable(SubscribableChannel errorChannel) {
	if (errorChannel instanceof PublishSubscribeChannel) {
		return true;
	}
	if (errorChannel instanceof AbstractSubscribableChannel countingChannel) {
		return countingChannel.getSubscriberCount() == 0;
	}
	return true;
}
/**
 * Tear down the producer-side error channel and its bridge handler for a binding.
 * <p>
 * Nothing is destroyed unless the error channel bean exists. When it does, the bridge
 * handler (if one is registered) is unsubscribed and its singleton destroyed, then the
 * error channel singleton is destroyed.
 * @param destination the producer destination.
 * @param bindingName the binding name.
 */
private void destroyErrorInfrastructure(ProducerDestination destination, String bindingName) {
	String channelBeanName = errorsBaseName(destination, bindingName);
	String bridgeBeanName = getErrorBridgeName(destination, bindingName);
	MessageHandler bridge = getApplicationContext().containsBean(bridgeBeanName)
			? getApplicationContext().getBean(bridgeBeanName, MessageHandler.class)
			: null;
	if (!getApplicationContext().containsBean(channelBeanName)) {
		return;
	}
	SubscribableChannel producerErrorChannel = getApplicationContext()
			.getBean(channelBeanName, SubscribableChannel.class);
	if (bridge != null) {
		producerErrorChannel.unsubscribe(bridge);
		((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
				.destroySingleton(bridgeBeanName);
	}
	((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
			.destroySingleton(channelBeanName);
}
/**
 * Tear down the consumer-side error infrastructure for a binding: the recoverer, the
 * bridge handler, the error message handler and the error channel.
 * <p>
 * The recoverer bean is always removed first. The remaining beans are only touched
 * when the error channel bean exists; handlers are unsubscribed from the channel
 * before their beans are removed. An {@code IllegalStateException} raised anywhere in
 * the process is ignored.
 * @param destination the consumer destination.
 * @param group the consumer group.
 * @param properties the consumer properties.
 */
private void destroyErrorInfrastructure(ConsumerDestination destination, String group,
		C properties) {
	try {
		destroyBean(getErrorRecovererName(destination, group, properties));
		String channelBeanName = errorsBaseName(destination, group, properties);
		String handlerBeanName = getErrorMessageHandlerName(destination, group, properties);
		String bridgeBeanName = getErrorBridgeName(destination, group, properties);
		MessageHandler bridge = getApplicationContext().containsBean(bridgeBeanName)
				? getApplicationContext().getBean(bridgeBeanName, MessageHandler.class)
				: null;
		MessageHandler errorHandler = getApplicationContext().containsBean(handlerBeanName)
				? getApplicationContext().getBean(handlerBeanName, MessageHandler.class)
				: null;
		if (!getApplicationContext().containsBean(channelBeanName)) {
			return;
		}
		SubscribableChannel consumerErrorChannel = getApplicationContext()
				.getBean(channelBeanName, SubscribableChannel.class);
		if (bridge != null) {
			consumerErrorChannel.unsubscribe(bridge);
			destroyBean(bridgeBeanName);
		}
		if (errorHandler != null) {
			consumerErrorChannel.unsubscribe(errorHandler);
			destroyBean(handlerBeanName);
		}
		destroyBean(channelBeanName);
	}
	catch (IllegalStateException ignored) {
		// context is shutting down.
	}
}
/**
 * Remove a bean from the application context: destroy its singleton instance and then
 * remove its bean definition. Does nothing when the context holds no bean definition
 * with the given name.
 * @param beanName name of the bean to remove.
 */
private void destroyBean(String beanName) {
	if (!getApplicationContext().containsBeanDefinition(beanName)) {
		return;
	}
	DefaultSingletonBeanRegistry singletonRegistry =
			(DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory();
	singletonRegistry.destroySingleton(beanName);
	GenericApplicationContext definitionRegistry = (GenericApplicationContext) getApplicationContext();
	definitionRegistry.removeBeanDefinition(beanName);
}
/**
* Binders can return a message handler to be subscribed to the error channel.
* Examples might be if the user wishes to (re)publish messages to a DLQ.
* <p>
* This hook is consulted by {@code registerErrorInfrastructure} for consumers that are
* not polled. This base implementation returns {@code null}, in which case the result
* of {@code getDefaultErrorMessageHandler} is used instead.
* @param destination the destination.
* @param group the group.
* @param consumerProperties the properties.
* @return the handler (may be null, which is the default, causing the exception to be
* rethrown).
*/
protected MessageHandler getErrorMessageHandler(ConsumerDestination destination,
String group, C consumerProperties) {
return null;
}
/**
* Binders can return a message handler to be subscribed to the error channel of a
* polled consumer. Examples might be if the user wishes to (re)publish messages to a
* DLQ.
* <p>
* This hook is consulted by {@code registerErrorInfrastructure} when its
* {@code polled} argument is {@code true}. This base implementation returns
* {@code null}, in which case the result of {@code getDefaultErrorMessageHandler} is
* used instead.
* @param destination the destination.
* @param group the group.
* @param consumerProperties the properties.
* @return the handler (may be null, which is the default, causing the exception to be
* rethrown).
*/
protected MessageHandler getPolledConsumerErrorMessageHandler(
ConsumerDestination destination, String group, C consumerProperties) {
return null;
}
/**
* Return the default error message handler, which throws the error message payload to
* the caller if there are no user handlers subscribed. The handler is ordered so it
* runs after any user-defined handlers that are subscribed.
* <p>
* This base implementation ignores both arguments and returns a new
* {@code FinalRethrowingErrorMessageHandler} on every call.
* <p>
* NOTE(review): the caller in {@code registerErrorInfrastructure} passes its
* {@code polled} flag as the second argument, which does not match the parameter name
* and description below - confirm which meaning overriding binders should rely on.
* @param errorChannel the error channel.
* @param defaultErrorChannelPresent true if the context has a default 'errorChannel'.
* @return the handler.
*/
protected MessageHandler getDefaultErrorMessageHandler(
SubscribableChannel errorChannel, boolean defaultErrorChannelPresent) {
return new FinalRethrowingErrorMessageHandler();
}
/**
* Binders can return an {@link ErrorMessageStrategy} for building error messages;
* binder implementations typically might add extra headers to the error message.
* <p>
* This base implementation returns a new {@code DefaultErrorMessageStrategy} on every
* call, so it is never {@code null} unless overridden.
* @return the implementation - may be null (only when an overriding binder returns
* null; NOTE(review): confirm that consumers of the strategy tolerate null).
*/
protected ErrorMessageStrategy getErrorMessageStrategy() {
return new DefaultErrorMessageStrategy();
}
/**
* Return the bean name of the consumer's error recoverer: the consumer error base
* name followed by {@code ".recoverer"}.
* @param destination the destination.
* @param group the group.
* @param consumerProperties the properties.
* @return the recoverer bean name.
*/
protected String getErrorRecovererName(ConsumerDestination destination, String group,
C consumerProperties) {
return errorsBaseName(destination, group, consumerProperties) + ".recoverer";
}
/**
* Return the bean name of the consumer's error message handler: the consumer error
* base name followed by {@code ".handler"}.
* @param destination the destination.
* @param group the group.
* @param consumerProperties the properties.
* @return the error message handler bean name.
*/
protected String getErrorMessageHandlerName(ConsumerDestination destination,
String group, C consumerProperties) {
return errorsBaseName(destination, group, consumerProperties) + ".handler";
}
/**
* Return the bean name of the consumer's bridge to the global error channel: the
* consumer error base name followed by {@code ".bridge"}.
* @param destination the destination.
* @param group the group.
* @param consumerProperties the properties.
* @return the bridge handler bean name.
*/
protected String getErrorBridgeName(ConsumerDestination destination, String group,
C consumerProperties) {
return errorsBaseName(destination, group, consumerProperties) + ".bridge";
}
/**
* Return the base name from which the consumer-side error bean names are derived.
* This implementation uses only the binding name taken from
* {@code consumerProperties}; {@code destination} and {@code group} are not consulted.
* @param destination the destination (unused by this implementation).
* @param group the group (unused by this implementation).
* @param consumerProperties the properties supplying the binding name.
* @return the base name, in the form {@code <binderIdentity>.<bindingName>.errors}.
*/
protected String errorsBaseName(ConsumerDestination destination, String group,
C consumerProperties) {
return this.doErrorBaseName(consumerProperties.getBindingName());
}
/**
* Return the bean name of the producer's error bridge handler: the producer error
* base name followed by {@code ".bridge"} and the destination's {@code hashCode()}.
* @param destination the producer destination; its hash code is part of the name.
* @param bindingName the binding name.
* @return the bridge handler bean name.
*/
protected String getErrorBridgeName(ProducerDestination destination, String bindingName) {
return errorsBaseName(destination, bindingName) + ".bridge" + destination.hashCode();
}
/**
* Return the base name from which the producer-side error bean names are derived.
* This implementation uses only {@code bindingName}; {@code destination} is not
* consulted.
* @param destination the producer destination (unused by this implementation).
* @param bindingName the binding name.
* @return the base name, in the form {@code <binderIdentity>.<bindingName>.errors}.
*/
protected String errorsBaseName(ProducerDestination destination, String bindingName) {
return this.doErrorBaseName(bindingName);
}
/**
* Compose the error base name shared by the producer and consumer variants:
* the binder identity, a dot, the binding name and the suffix {@code ".errors"}.
* @param bindingName the binding name.
* @return the composed base name.
*/
private String doErrorBaseName(String bindingName) {
return this.getBinderIdentity() + "." + bindingName + ".errors";
}
/**
 * Assemble the extended-info map for a binding.
 * <p>
 * The map keeps insertion order and holds two entries: {@code "bindingDestination"}
 * mapped to the destination's string form, and the simple class name of
 * {@code properties} mapped to the properties converted to a {@code Map} by the
 * binder's object mapper.
 * @param destination the destination object.
 * @param properties the producer or consumer properties object.
 * @return the populated map.
 */
private Map<String, Object> doGetExtendedInfo(Object destination, Object properties) {
	Map<String, Object> info = new LinkedHashMap<>();
	String destinationText = destination.toString();
	info.put("bindingDestination", destinationText);
	String propertiesKey = properties.getClass().getSimpleName();
	Object propertiesAsMap = this.objectMapper.convertValue(properties, Map.class);
	info.put(propertiesKey, propertiesAsMap);
	return info;
}
/**
 * Publish the given event through the configured application event publisher, if one
 * is set.
 * <p>
 * Publication is best effort: any exception thrown while publishing is caught and
 * logged, never propagated to the caller. A warning is always logged; the exception
 * itself, including its stack trace, is logged at debug level.
 * @param event the event to publish.
 */
private void doPublishEvent(ApplicationEvent event) {
	if (this.applicationEventPublisher == null) {
		return;
	}
	try {
		this.applicationEventPublisher.publishEvent(event);
	}
	catch (Exception e) {
		logger.warn("Failed while publishing event " + event + ". "
				+ "From the framework perspective this is harmless and typically "
				+ "happens when use implement custom ApplicationListener");
		// Pass the exception as the Throwable argument rather than as the message object:
		// logging backends that stringify the message would otherwise drop the stack trace.
		if (logger.isDebugEnabled()) {
			logger.debug("Failure while publishing event " + event, e);
		}
	}
}
/**
* Immutable holder for the three parts of a consumer's error infrastructure: the
* error channel, the recoverer that sends to it, and the error message handler.
*/
protected static class ErrorInfrastructure {
// Error channel of the binding.
private final SubscribableChannel errorChannel;
// Recoverer associated with the error channel.
private final ErrorMessageSendingRecoverer recoverer;
// Error message handler associated with the error channel.
private final MessageHandler handler;
/**
* Create a holder for the given components; the references are stored as passed.
* @param errorChannel the error channel.
* @param recoverer the recoverer.
* @param handler the error message handler.
*/
ErrorInfrastructure(SubscribableChannel errorChannel,
ErrorMessageSendingRecoverer recoverer, MessageHandler handler) {
this.errorChannel = errorChannel;
this.recoverer = recoverer;
this.handler = handler;
}
/**
* @return the error channel given at construction.
*/
public SubscribableChannel getErrorChannel() {
return this.errorChannel;
}
/**
* @return the recoverer given at construction.
*/
public ErrorMessageSendingRecoverer getRecoverer() {
return this.recoverer;
}
/**
* @return the error message handler given at construction.
*/
public MessageHandler getHandler() {
return this.handler;
}
}
/**
 * Channel interceptor that unpacks headers embedded in a {@code byte[]} payload before
 * the message is sent on.
 * <p>
 * A message is only processed when its payload is a byte array, it does not carry the
 * {@code BinderHeaders.NATIVE_HEADERS_PRESENT} header, and
 * {@code EmbeddedHeaderUtils.mayHaveEmbeddedHeaders} accepts the payload. Every other
 * message is passed through untouched.
 */
private static final class EmbeddedHeadersChannelInterceptor
		implements ChannelInterceptor {

	protected final Log logger;

	EmbeddedHeadersChannelInterceptor(Log logger) {
		this.logger = logger;
	}

	@Override
	@SuppressWarnings("unchecked")
	public Message<?> preSend(Message<?> message, MessageChannel channel) {
		if (!(message.getPayload() instanceof byte[] rawPayload)) {
			return message;
		}
		if (message.getHeaders().containsKey(BinderHeaders.NATIVE_HEADERS_PRESENT)) {
			return message;
		}
		if (!EmbeddedHeaderUtils.mayHaveEmbeddedHeaders(rawPayload)) {
			return message;
		}
		MessageValues extracted;
		try {
			extracted = EmbeddedHeaderUtils.extractHeaders((Message<byte[]>) message, true);
		}
		catch (Exception ex) {
			/*
			 * Logged at debug rather than error level: the payload merely matched the
			 * heuristic in EmbeddedHeaderUtils.mayHaveEmbeddedHeaders(), so it may not
			 * contain embedded headers at all. Fall back to the message as received.
			 */
			if (this.logger.isDebugEnabled()) {
				this.logger.debug(EmbeddedHeaderUtils.decodeExceptionMessage(message), ex);
			}
			extracted = new MessageValues(message);
		}
		return extracted.toMessage();
	}

}
/**
* Immutable holder pairing the {@code MessageSource} of a polled consumer with its
* {@code ErrorInfrastructure}. The accessors are package-private.
*/
protected static class PolledConsumerResources {
// Message source of the polled consumer.
private final MessageSource<?> source;
// Error infrastructure associated with the source.
private final ErrorInfrastructure errorInfrastructure;
/**
* Create a holder for the given components; the references are stored as passed.
* @param source the message source.
* @param errorInfrastructure the error infrastructure.
*/
public PolledConsumerResources(MessageSource<?> source,
ErrorInfrastructure errorInfrastructure) {
this.source = source;
this.errorInfrastructure = errorInfrastructure;
}
// Returns the message source given at construction.
MessageSource<?> getSource() {
return this.source;
}
// Returns the error infrastructure given at construction.
ErrorInfrastructure getErrorInfrastructure() {
return this.errorInfrastructure;
}
}
/**
 * Message handler that prepares an outbound message and hands it to a delegate handler.
 * <p>
 * With native encoding enabled the message is forwarded unchanged. Otherwise the
 * message is rebuilt, and when header embedding is enabled the configured headers are
 * embedded into the payload first. Lifecycle calls are forwarded to the delegate when
 * it implements {@code Lifecycle}.
 */
private final class SendingHandler extends AbstractMessageHandler
		implements Lifecycle {

	private final boolean embedHeaders;

	private final String[] embeddedHeaders;

	private final MessageHandler delegate;

	private final boolean useNativeEncoding;

	private SendingHandler(MessageHandler delegate, boolean embedHeaders,
			String[] headersToEmbed, boolean useNativeEncoding) {
		this.delegate = delegate;
		setBeanFactory(AbstractMessageChannelBinder.this.getBeanFactory());
		this.embedHeaders = embedHeaders;
		this.embeddedHeaders = headersToEmbed;
		this.useNativeEncoding = useNativeEncoding;
	}

	@Override
	protected void handleMessageInternal(Message<?> message) {
		if (this.useNativeEncoding) {
			// Native encoding: the delegate receives the message exactly as given.
			this.delegate.handleMessage(message);
			return;
		}
		this.delegate.handleMessage(serializeAndEmbedHeadersIfApplicable(message));
	}

	/**
	 * Rebuild the message, embedding the configured headers into the payload when
	 * header embedding is enabled.
	 * @param message the outbound message.
	 * @return the message to pass to the delegate.
	 */
	private Message<?> serializeAndEmbedHeadersIfApplicable(Message<?> message) {
		MessageValues values = new MessageValues(message);
		Object outboundPayload;
		if (!this.embedHeaders) {
			outboundPayload = values.getPayload();
		}
		else {
			// The content type header is converted to a String so that it can be
			// embedded properly in JSON.
			Object contentTypeHeader = values.get(MessageHeaders.CONTENT_TYPE);
			if (contentTypeHeader != null) {
				values.put(MessageHeaders.CONTENT_TYPE, contentTypeHeader.toString());
			}
			outboundPayload = EmbeddedHeaderUtils.embedHeaders(values, this.embeddedHeaders);
		}
		return getMessageBuilderFactory().withPayload(outboundPayload)
				.copyHeaders(values.getHeaders()).build();
	}

	@Override
	public void start() {
		if (this.delegate instanceof Lifecycle lifecycleDelegate) {
			lifecycleDelegate.start();
		}
	}

	@Override
	public void stop() {
		if (this.delegate instanceof Lifecycle lifecycleDelegate) {
			lifecycleDelegate.stop();
		}
	}

	@Override
	public boolean isRunning() {
		if (this.delegate instanceof Lifecycle lifecycleDelegate) {
			return lifecycleDelegate.isRunning();
		}
		return false;
	}

}
/**
 * Jackson serializer that writes an {@code Expression} as a JSON string holding its
 * expression string.
 */
@SuppressWarnings("serial")
private static class ExpressionSerializer extends StdSerializer<Expression> {

	protected ExpressionSerializer(Class<Expression> handledType) {
		super(handledType);
	}

	public void serialize(Expression value, JsonGenerator gen, SerializationContext context) throws JacksonException {
		String expressionText = value.getExpressionString();
		gen.writeString(expressionText);
	}

}
}