FunctionConfiguration.java

/*
 * Copyright 2018-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.function;

import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
import reactor.util.function.Tuples;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.PollableBean;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
import org.springframework.cloud.function.context.config.FunctionContextUtils;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties.PollerProperties;
import org.springframework.cloud.stream.binding.BindableProxyFactory;
import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor;
import org.springframework.cloud.stream.binding.NewDestinationBindingCallback;
import org.springframework.cloud.stream.binding.SupportedBindableFeatures;
import org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.type.MethodMetadata;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

/**
 * @author Oleg Zhurakousky
 * @author David Turanski
 * @author Ilayaperumal Gopinathan
 * @author Soby Chacko
 * @author Chris Bono
 * @author Byungjun You
 * @author Ivan Shapoval
 * @author Patrik P��ter S��li
 * @author Artem Bilan
 * @since 2.1
 */
@Lazy(false)
@AutoConfiguration
@EnableConfigurationProperties(StreamFunctionConfigurationProperties.class)
@Import({ BinderFactoryAutoConfiguration.class })
@AutoConfigureBefore(BindingServiceConfiguration.class)
@AutoConfigureAfter(ContextFunctionCatalogAutoConfiguration.class)
@ConditionalOnBean(FunctionRegistry.class)
public class FunctionConfiguration {

	private static final boolean isContextPropagationPresent = ClassUtils.isPresent(
		"io.micrometer.context.ContextSnapshot", FunctionConfiguration.class.getClassLoader());


	@SuppressWarnings("rawtypes")
	@Bean
	public StreamBridge streamBridgeUtils(FunctionCatalog functionCatalog,
			BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext,
			@Nullable NewDestinationBindingCallback callback,
			ObjectProvider<ObservationRegistry> observationRegistries) {
		return new StreamBridge(functionCatalog, bindingServiceProperties, applicationContext, callback,
				observationRegistries);
	}

	@Bean
	public InitializingBean functionBindingRegistrar(Environment environment, FunctionCatalog functionCatalog,
			StreamFunctionProperties streamFunctionProperties) {
		return new FunctionBindingRegistrar(functionCatalog, streamFunctionProperties);
	}

	@Bean
	public InitializingBean functionInitializer(FunctionCatalog functionCatalog,
												StreamFunctionProperties functionProperties,
												BindingServiceProperties serviceProperties, ConfigurableApplicationContext applicationContext,
												StreamBridge streamBridge) {
		return new FunctionToDestinationBinder(functionCatalog, functionProperties,
				serviceProperties, streamBridge);
	}

	/*
	 * Binding initializer responsible only for Suppliers
	 */
	@SuppressWarnings({"rawtypes", "unchecked"})
	@Bean
	InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties,
			GenericApplicationContext context, BindingServiceProperties serviceProperties,
			@Nullable List<BindableFunctionProxyFactory> proxyFactories, StreamBridge streamBridge,
			TaskScheduler taskScheduler) {

		if (CollectionUtils.isEmpty(proxyFactories)) {
			return null;
		}

		return () -> {
			for (BindableFunctionProxyFactory proxyFactory : proxyFactories) {
				FunctionInvocationWrapper functionWrapper = functionCatalog.lookup(proxyFactory.getFunctionDefinition());
				if (functionWrapper != null && functionWrapper.isSupplier()) {
					// gather output content types
					List<String> contentTypes = new ArrayList<String>();
					if (proxyFactory.getOutputs().size() == 0) {
						return;
					}
					Assert.isTrue(proxyFactory.getOutputs().size() == 1, "Supplier with multiple outputs is not supported at the moment.");
					String outputName  = proxyFactory.getOutputs().iterator().next();

					BindingProperties bindingProperties = serviceProperties.getBindingProperties(outputName);
					ProducerProperties producerProperties = bindingProperties.getProducer();
					if (!(bindingProperties.getProducer() != null && producerProperties.isUseNativeEncoding())) {
						contentTypes.add(bindingProperties.getContentType());
					}

					// see https://github.com/spring-cloud/spring-cloud-stream/issues/2027
					String functionDefinition = proxyFactory.getFunctionDefinition();
					if (!StringUtils.hasText(functionDefinition)) {
						continue;
					}
					String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");

					Function supplier = null;
					Function function = null;
					if (!ObjectUtils.isEmpty(functionNames) && functionNames.length > 1) {
						String supplierName = functionNames[0];
						String remainingFunctionDefinition = StringUtils
								.arrayToCommaDelimitedString(Arrays.copyOfRange(functionNames, 1, functionNames.length));

						supplier = functionCatalog.lookup(supplierName);
						function = functionCatalog.lookup(remainingFunctionDefinition, contentTypes.toArray(new String[0]));

						if (!((FunctionInvocationWrapper) supplier).isOutputTypePublisher() &&
								((FunctionInvocationWrapper) function).isInputTypePublisher()) {
							functionWrapper = null;
						}
						else {
							functionWrapper = functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0]));
						}
					}
					else {
						functionWrapper = functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0]));
					}

					if (!functionProperties.isComposeFrom() && !functionProperties.isComposeTo()) {
						String integrationFlowName = proxyFactory.getFunctionDefinition() + "_integrationflow";

						PollableBean pollable = null;
						try {
							pollable = extractPollableAnnotation(functionProperties, context, proxyFactory);
						}
						catch (Exception e) {
							// Will fix itself once https://github.com/spring-projects/spring-framework/issues/28748 is fixed
						}

						if (functionWrapper != null) {
							FunctionInvocationWrapper postProcessor = functionWrapper;
							IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(functionWrapper, context, producerProperties),
									pollable, context, taskScheduler, producerProperties, outputName)
									.intercept(new ChannelInterceptor() {
										public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
											postProcessor.postProcess();
										}
									})
									.route(Message.class, message -> {
										if (message.getHeaders().get("spring.cloud.stream.sendto.destination") != null) {
											String destinationName = (String) message.getHeaders().get("spring.cloud.stream.sendto.destination");
											return streamBridge.resolveDestination(destinationName, producerProperties, null);
										}
										return outputName;
									})
									.get();
							IntegrationFlow postProcessedFlow = (IntegrationFlow) context.getAutowireCapableBeanFactory()
									.initializeBean(integrationFlow, integrationFlowName);
							context.registerBean(integrationFlowName, IntegrationFlow.class, () -> {
								return postProcessedFlow;
							});
						}
						else {
							IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(supplier, context, producerProperties),
									pollable, context, taskScheduler, producerProperties, outputName)
									.channel(c -> c.direct())
									.fluxTransform((Function<? super Flux<Message<Object>>, ? extends Publisher<Object>>) function)
									.route(Message.class, message -> {
										if (message.getHeaders().get("spring.cloud.stream.sendto.destination") != null) {
											String destinationName = (String) message.getHeaders().get("spring.cloud.stream.sendto.destination");
											return streamBridge.resolveDestination(destinationName, producerProperties, null);
										}
										return outputName;
									})
									.get();
							IntegrationFlow postProcessedFlow = (IntegrationFlow) context.getAutowireCapableBeanFactory()
									.initializeBean(integrationFlow, integrationFlowName);
							context.registerBean(integrationFlowName, IntegrationFlow.class, () -> postProcessedFlow);
						}
					}
				}
			}
		};
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> supplier,
			PollableBean pollable, GenericApplicationContext context,
			TaskScheduler taskScheduler, ProducerProperties producerProperties, String bindingName) {

		IntegrationFlowBuilder integrationFlowBuilder;

		boolean splittable = pollable != null
				&& (boolean) AnnotationUtils.getAnnotationAttributes(pollable).get("splittable");

		FunctionInvocationWrapper function =
			(supplier instanceof PartitionAwareFunctionWrapper partitionAwareFunctionWrapper)
				? (FunctionInvocationWrapper) partitionAwareFunctionWrapper.function : (FunctionInvocationWrapper) supplier;
		boolean reactive = FunctionTypeUtils.isPublisher(function.getOutputType());

		if (pollable == null && reactive) {
			Publisher publisher = (Publisher) supplier.get();
			publisher = publisher instanceof Mono
					? ((Mono) publisher).map(this::wrapToMessageIfNecessary)
					: ((Flux) publisher).map(this::wrapToMessageIfNecessary);

			// See https://github.com/spring-cloud/spring-cloud-stream/issues/3083 for more details
			DirectWithAttributesChannel messageChannel = context.getBean(bindingName, DirectWithAttributesChannel.class);
			FluxMessageChannel reactiveChannel = new FluxMessageChannel();
			reactiveChannel.subscribeTo(publisher);
			messageChannel.setAttribute(DirectWithAttributesChannel.COMPANION_ATTR, reactiveChannel);
			integrationFlowBuilder = IntegrationFlow.from((MessageChannel) reactiveChannel);

			// see https://github.com/spring-cloud/spring-cloud-stream/issues/1863 for details about the following code
			taskScheduler.schedule(() -> { }, Instant.now()); // will keep AC alive
		}
		else { // implies pollable
			AtomicReference<PollerMetadata> pollerMetadata = new AtomicReference<>();
			if (producerProperties != null && producerProperties.getPoller() != null) {
				PollerProperties poller = producerProperties.getPoller();

				PollerMetadata pm = new PollerMetadata();
				PropertyMapper map = PropertyMapper.get();
				map.from(poller::getMaxMessagesPerPoll).to(pm::setMaxMessagesPerPoll);
				map.from(poller).as(this::asTrigger).to(pm::setTrigger);
				pollerMetadata.set(pm);
			}

			boolean autoStartup = producerProperties == null || producerProperties.isAutoStartup();
			integrationFlowBuilder = pollerMetadata == null
					? IntegrationFlow.fromSupplier(supplier,
							spca -> spca.id(bindingName + "_spca").autoStartup(autoStartup))
					: IntegrationFlow.fromSupplier(supplier, spca -> spca.id(bindingName + "_spca")
							.poller(pollerMetadata.get()).autoStartup(autoStartup));

			// only apply the PollableBean attributes if this is a reactive function.
			if (splittable && reactive) {
				integrationFlowBuilder = integrationFlowBuilder.split();
			}
		}

		return integrationFlowBuilder;
	}

	private Trigger asTrigger(PollerProperties poller) {
		if (StringUtils.hasText(poller.getCron())) {
			return new CronTrigger(poller.getCron());
		}
		return createPeriodicTrigger(poller.getFixedDelay(), poller.getInitialDelay());
	}

	private Trigger createPeriodicTrigger(Duration period, Duration initialDelay) {
		PeriodicTrigger trigger = new PeriodicTrigger(period);
		if (initialDelay != null) {
			trigger.setInitialDelay(initialDelay);
		}
		return trigger;
	}

	private PollableBean extractPollableAnnotation(StreamFunctionProperties functionProperties, GenericApplicationContext context,
			BindableFunctionProxyFactory proxyFactory) {
		// here we need to ensure that for cases where composition is defined we only look for supplier method to find Pollable annotation.
		String supplierFunctionName = StringUtils
				.delimitedListToStringArray(proxyFactory.getFunctionDefinition().replaceAll(",", "|").trim(), "|")[0];
		BeanDefinition bd = context.getBeanDefinition(supplierFunctionName);
		if (!(bd instanceof RootBeanDefinition rootBeanDefinition)) {
			return null;
		}

		Method factoryMethod = rootBeanDefinition.getResolvedFactoryMethod();
		if (factoryMethod == null) {
			Object source = bd.getSource();
			if (source instanceof MethodMetadata methodMetadata) {
				Class<?> factory = ClassUtils.resolveClassName(methodMetadata.getDeclaringClassName(), null);
				Class<?>[] params = FunctionContextUtils.getParamTypesFromBeanDefinitionFactory(factory, (RootBeanDefinition) bd, methodMetadata.getMethodName());
				factoryMethod = ReflectionUtils.findMethod(factory, methodMetadata.getMethodName(), params);
			}
		}
		Assert.notNull(factoryMethod, "Failed to introspect factory method since it was not discovered for function '"
				+ functionProperties.getDefinition() + "'");
		return factoryMethod.getReturnType().isAssignableFrom(Supplier.class)
				? AnnotationUtils.findAnnotation(factoryMethod, PollableBean.class)
						: null;
	}


	@SuppressWarnings("unchecked")
	private <T> Message<T> wrapToMessageIfNecessary(T value) {
		return value instanceof Message message
				? message
						: MessageBuilder.withPayload(value).build();
	}

	private static <P> Message<P> sanitize(Message<P> inputMessage) {
		return MessageBuilder
			.fromMessage(inputMessage)
			.removeHeader("spring.cloud.stream.sendto.destination")
//			.setHeader(MessageUtils.SOURCE_TYPE, inputMessage.getHeaders().get(MessageUtils.TARGET_PROTOCOL))
//			.removeHeader(MessageUtils.TARGET_PROTOCOL)
			.build();
	}

	private static class FunctionToDestinationBinder implements InitializingBean, ApplicationContextAware {

		protected final Log logger = LogFactory.getLog(getClass());

		private GenericApplicationContext applicationContext;

		private BindableProxyFactory[] bindableProxyFactories;

		private final FunctionCatalog functionCatalog;

		private final StreamFunctionProperties functionProperties;

		private final BindingServiceProperties serviceProperties;

		private final StreamBridge streamBridge;

		FunctionToDestinationBinder(FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties,
				BindingServiceProperties serviceProperties, StreamBridge streamBridge) {
			this.functionCatalog = functionCatalog;
			this.functionProperties = functionProperties;
			this.serviceProperties = serviceProperties;
			this.streamBridge = streamBridge;
		}

		@Override
		public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
			this.applicationContext = (GenericApplicationContext) applicationContext;
		}

		@Override
		public void afterPropertiesSet() throws Exception {
			Map<String, BindableProxyFactory> beansOfType = applicationContext.getBeansOfType(BindableProxyFactory.class);
			this.bindableProxyFactories = beansOfType.values().toArray(new BindableProxyFactory[0]);
			for (BindableProxyFactory bindableProxyFactory : this.bindableProxyFactories) {
				String functionDefinition = bindableProxyFactory instanceof BindableFunctionProxyFactory functionFactory
					&& functionFactory.isFunctionExist()
						? functionFactory.getFunctionDefinition()
								: null; /*this.functionProperties.getDefinition();*/

				boolean shouldNotProcess = false;
				if (!(bindableProxyFactory instanceof BindableFunctionProxyFactory)) {
					Set<String> outputBindingNames = bindableProxyFactory.getOutputs();
					shouldNotProcess = !CollectionUtils.isEmpty(outputBindingNames)
							&& outputBindingNames.iterator().next().equals("applicationMetrics");
				}
				if (StringUtils.hasText(functionDefinition) && !shouldNotProcess) {
					FunctionInvocationWrapper function = functionCatalog.lookup(functionDefinition);
					if (function != null && !function.isSupplier() && functionDefinition.equals(function.getFunctionDefinition())) {
						this.bindFunctionToDestinations(bindableProxyFactory, functionDefinition, applicationContext.getEnvironment());
					}
				}
			}
		}

		@SuppressWarnings({ "rawtypes", "unchecked" })
		private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactory, String functionDefinition, ConfigurableEnvironment environment) {
			this.assertBindingIsPossible(bindableProxyFactory);

			Set<String> inputBindingNames = bindableProxyFactory.getInputs();
			Set<String> outputBindingNames = bindableProxyFactory.getOutputs();

			String[] outputContentTypes = outputBindingNames.stream()
					.map(bindingName -> this.serviceProperties.getBindings().get(bindingName).getContentType())
					.toArray(String[]::new);

			FunctionInvocationWrapper function = this.functionCatalog.lookup(functionDefinition, outputContentTypes);
			this.assertSupportedSignatures(bindableProxyFactory, function);


			if (this.functionProperties.isComposeFrom()) {
				AbstractSubscribableChannel outputChannel = this.applicationContext.getBean(outputBindingNames.iterator().next(), AbstractSubscribableChannel.class);
				logger.info("Composing at the head of output destination: " + outputChannel.getBeanName());
				String outputChannelName = ((AbstractMessageChannel) outputChannel).getBeanName();
				DirectWithAttributesChannel newOutputChannel = new DirectWithAttributesChannel();
				newOutputChannel.setAttribute("type", "output");
				newOutputChannel.setComponentName("output.extended");
				this.applicationContext.registerBean("output.extended", MessageChannel.class, () -> newOutputChannel);
				bindableProxyFactory.replaceOutputChannel(outputChannelName, "output.extended", newOutputChannel);
				inputBindingNames = Collections.singleton("output");
			}

			if (isReactiveOrMultipleInputOutput(bindableProxyFactory, function.getInputType(), function.getOutputType())) {
				AtomicReference<Function<Message<?>, Message<?>>> targetProtocolEnhancer = new AtomicReference<>();
				if (!CollectionUtils.isEmpty(outputBindingNames)) {
					String outputBindingName = outputBindingNames.iterator().next(); // TODO only gets the first one
					String binderConfigurationName = this.serviceProperties.getBinder(outputBindingName);
					BinderFactory binderFactory = applicationContext.getBean(BinderFactory.class);
					final Boolean reactive = functionProperties.getReactive().get(functionDefinition);
					final boolean reactiveFn = reactive != null && reactive;
					Class<?> bindableType = MessageChannel.class;
					if (reactiveFn) {
						bindableType = FluxMessageChannel.class;
					}
					Object binder = binderFactory.getBinder(binderConfigurationName, bindableType);
					//String targetProtocol = binder.getClass().getSimpleName().startsWith("Rabbit") ? "amqp" : "kafka";
					Field headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
					headersField.setAccessible(true);
					targetProtocolEnhancer.set(message -> {
						Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
								.getField(headersField, ((Message) message).getHeaders());
						//headersMap.putIfAbsent(MessageUtils.TARGET_PROTOCOL, targetProtocol);
						if (CloudEventMessageUtils.isCloudEvent((message))) {
							headersMap.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
						}
						return message;
					});
				}

				Publisher[] inputPublishers = inputBindingNames.stream().map(inputBindingName -> {
					BindingProperties bindingProperties = this.serviceProperties.getBindings().get(inputBindingName);
					ConsumerProperties consumerProperties = bindingProperties == null ? null : bindingProperties.getConsumer();
					if (consumerProperties != null) {
						function.setSkipInputConversion(consumerProperties.isUseNativeDecoding());
						if (consumerProperties.getConcurrency() > 1) {
							this.logger.warn("When using concurrency > 1 in reactive contexts, please make sure that you are using a " +
								"reactive binder that supports concurrency settings. Otherwise, concurrency settings > 1 will be ignored when " +
								"using reactive types.");
						}
					}
					MessageChannel inputChannel = this.applicationContext.getBean(inputBindingName, MessageChannel.class);
					return IntegrationReactiveUtils.messageChannelToFlux(inputChannel).map(m -> {
						if (m != null) {
							m = sanitize(m);
						}
						return m;
					});
				})
				.map(publisher -> {
					if (targetProtocolEnhancer.get() != null) {
						return publisher.map(targetProtocolEnhancer.get());
					}
					else {
						return publisher;
					}
				})
				.toArray(Publisher[]::new);

				Function functionToInvoke = function;
				if (!CollectionUtils.isEmpty(outputBindingNames)) {
					BindingProperties bindingProperties = this.serviceProperties.getBindings().get(outputBindingNames.iterator().next());
					ProducerProperties producerProperties = bindingProperties == null ? null : bindingProperties.getProducer();
					if (producerProperties != null) {
						function.setSkipOutputConversion(producerProperties.isUseNativeEncoding());
					}
					functionToInvoke = new PartitionAwareFunctionWrapper(function, this.applicationContext, producerProperties);
					// If we have a multi-output scenario, we will do any message enrichment (aka, determining the outbound
					// partition) via the corresponding reactive Flux types. Currently, we support multiple output
					// bindings for reactive types only (Tuples).
					if (outputBindingNames.size() > 1) {
						((PartitionAwareFunctionWrapper) functionToInvoke).setMessageEnricherEnabled(false);
					}
				}
				Object resultPublishers = functionToInvoke.apply(inputPublishers.length == 1 ? inputPublishers[0] : Tuples.fromArray(inputPublishers));
				if (!(resultPublishers instanceof Iterable)) {
					resultPublishers = Collections.singletonList(resultPublishers);
				}
				Iterator<String> outputBindingIter = outputBindingNames.iterator();
				long outputCount = StreamSupport.stream(((Iterable) resultPublishers).spliterator(), false).count();

				((Iterable) resultPublishers).forEach(publisher -> {
					Flux flux = Flux.from((Publisher) publisher);
					if (!CollectionUtils.isEmpty(outputBindingNames)) {
						String outputBinding = outputBindingIter.next();
						MessageChannel outputChannel = this.applicationContext.getBean(outputBinding, MessageChannel.class);
						flux = flux.doOnNext(message -> {
							// If there are more than 1 output bindings, then ensure that we properly calculate the partitions
							// based on information from the correct output binding properties.
							if (outputCount > 1) {
								Integer partitionId = determinePartitionForOutputBinding(outputBinding, message);
								message = MessageBuilder
									.fromMessage((Message<?>) message)
									.setHeader(BinderHeaders.PARTITION_HEADER, partitionId).build();
							}
							if (message instanceof Message m && m.getHeaders().get("spring.cloud.stream.sendto.destination") != null) {
								String destinationName = (String) m.getHeaders().get("spring.cloud.stream.sendto.destination");
								ProducerProperties producerProperties = this.serviceProperties.getBindings().get(outputBindingNames.iterator().next()).getProducer();
								MessageChannel dynamicChannel = streamBridge.resolveDestination(destinationName, producerProperties, null);
								if (logger.isInfoEnabled()) {
									logger.info("Output message is sent to '" + destinationName + "' destination");
								}
								dynamicChannel.send(m);
							}
							else {
								if (!(message instanceof Message)) {
									message = MessageBuilder.withPayload(message).build();
								}
								if (isContextPropagationPresent && outputChannel instanceof FluxMessageChannel) {
									ContextView reactorContext = StaticMessageHeaderAccessor.getReactorContext((Message) message);
									try (AutoCloseable autoCloseable = ContextSnapshotHelper.setContext(reactorContext)) {
										outputChannel.send((Message) message);
									}
									catch (Exception e) {

									}
								}
								else {
									outputChannel.send((Message) message);
								}

							}
						})
						.doOnError(e -> {
							logger.error("Failure was detected during execution of the reactive function '" +  functionDefinition + "'");
							((Throwable) e).printStackTrace();
						});
					}
					if (!function.isConsumer()) {
						flux.subscribe();
					}
				});
			}
			else {
				String outputDestinationName = this.determineOutputDestinationName(0, bindableProxyFactory, function.isConsumer());
				if (!ObjectUtils.isEmpty(inputBindingNames)) {
					String inputDestinationName = inputBindingNames.iterator().next();
					Object inputDestination = this.applicationContext.getBean(inputDestinationName);
					if (inputDestination != null && inputDestination instanceof SubscribableChannel) {
						AbstractMessageHandler handler = createFunctionHandler(function, inputDestinationName, outputDestinationName);
						((SubscribableChannel) inputDestination).subscribe(handler);
					}
				}
			}
		}

		private Integer determinePartitionForOutputBinding(String outputBinding, Object message) {
			BindingProperties bindingProperties = FunctionToDestinationBinder.this.serviceProperties.getBindings().get(outputBinding);
			ProducerProperties producerProperties = bindingProperties == null ? null : bindingProperties.getProducer();
			if (producerProperties != null && producerProperties.isPartitioned()) {
				StandardEvaluationContext evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.applicationContext.getBeanFactory());
				PartitionHandler partitionHandler = new PartitionHandler(evaluationContext, producerProperties, this.applicationContext.getBeanFactory());
				if (message instanceof Message) {
					return partitionHandler.determinePartition((Message<?>) message);
				}
			}
			return null;
		}

		private AbstractMessageHandler createFunctionHandler(FunctionInvocationWrapper function,
				String inputChannelName, String outputChannelName) {
			ConsumerProperties consumerProperties = StringUtils.hasText(inputChannelName)
					? this.serviceProperties.getBindingProperties(inputChannelName).getConsumer()
							: null;
			ProducerProperties producerProperties = StringUtils.hasText(outputChannelName)
					? this.serviceProperties.getBindingProperties(outputChannelName).getProducer()
							: null;

			FunctionWrapper functionInvocationWrapper = (new FunctionWrapper(function, consumerProperties,
					producerProperties, applicationContext, this.determineTargetProtocol(outputChannelName)));

			MessagingTemplate template = new MessagingTemplate();
			template.setBeanFactory(applicationContext.getBeanFactory());

			AbstractMessageHandler handler = new AbstractMessageHandler() {
				@SuppressWarnings("unchecked")
				@Override
				public void handleMessageInternal(Message<?> message) throws MessagingException {
					Object result = functionInvocationWrapper.apply((Message<byte[]>) message);
					if (result == null) {
						logger.debug("Function execution resulted in null. No message will be sent");
						return;
					}
					if (result instanceof Iterable<?> iterableResult) {
						for (Object resultElement : iterableResult) {
							this.doSendMessage(resultElement, message);
						}
					}
					else if (ObjectUtils.isArray(result) && !(result instanceof byte[])) {
						for (int i = 0; i < ((Object[]) result).length; i++) {
							this.doSendMessage(((Object[]) result)[i], message);
						}
					}
					else {
						this.doSendMessage(result, message);
					}
				}

				private void doSendMessage(Object result, Message<?> requestMessage) {
					if (result instanceof Message<?> messageResult && messageResult.getHeaders().get("spring.cloud.stream.sendto.destination") != null) {
						String destinationName = (String) messageResult.getHeaders().get("spring.cloud.stream.sendto.destination");
						MessageChannel outputChannel = streamBridge.resolveDestination(destinationName, producerProperties, null);
						BindingProperties bindingProperties = serviceProperties.getBindingProperties(destinationName);
						ProducerProperties sendToBindingProducerProperties = bindingProperties.getProducer();
						if (sendToBindingProducerProperties != null && sendToBindingProducerProperties.isPartitioned()) {
							((AbstractMessageChannel) outputChannel)
								.addInterceptor(new DefaultPartitioningInterceptor(bindingProperties, applicationContext.getBeanFactory()));
						}
						if (logger.isInfoEnabled()) {
							logger.info("Output message is sent to '" + destinationName + "' destination");
						}
						outputChannel.send(messageResult);
					}
					else if (StringUtils.hasText(outputChannelName)) {
						if (!(result instanceof Message)) {
							result = MessageBuilder.withPayload(result).copyHeadersIfAbsent(requestMessage.getHeaders()).build();
						}
						template.send(outputChannelName, (Message<?>) result);
					}
					else if (function.isRoutingFunction()) {
						if (!(result instanceof Message)) {
							result = MessageBuilder.withPayload(result).copyHeadersIfAbsent(requestMessage.getHeaders()).build();
						}
						streamBridge.send(function.getFunctionDefinition() + "-out-0", result);
					}
					function.postProcess();
				}

			};

			handler.setBeanFactory(this.applicationContext);
			handler.afterPropertiesSet();
			return handler;
		}

		private String determineTargetProtocol(String outputBindingName) {
			if (StringUtils.hasText(outputBindingName)) {
				String binderConfigurationName = this.serviceProperties.getBinder(outputBindingName);
				BinderFactory binderFactory = applicationContext.getBean(BinderFactory.class);
				Object binder = binderFactory.getBinder(binderConfigurationName, MessageChannel.class);
				return binder.getClass().getSimpleName().startsWith("Rabbit") ? "amqp" : "kafka";
			}
			return null;
		}

		private boolean isReactiveOrMultipleInputOutput(BindableProxyFactory bindableProxyFactory, Type inputType, Type outputType) {
			boolean reactiveInputsOutputs = FunctionTypeUtils.isPublisher(inputType) ||
					FunctionTypeUtils.isPublisher(outputType);
			return isMultipleInputOutput(bindableProxyFactory) || reactiveInputsOutputs;
		}

		private String determineOutputDestinationName(int index, BindableProxyFactory bindableProxyFactory, boolean isConsumer) {
			List<String> outputNames = new ArrayList<>(bindableProxyFactory.getOutputs());
			if (CollectionUtils.isEmpty(outputNames)) {
				outputNames = Collections.singletonList("output");
			}
			return bindableProxyFactory instanceof BindableFunctionProxyFactory
					? ((BindableFunctionProxyFactory) bindableProxyFactory).getOutputName(index)
							: (isConsumer ? null : outputNames.get(index));
		}

		private void assertBindingIsPossible(BindableProxyFactory bindableProxyFactory) {
			if (this.isMultipleInputOutput(bindableProxyFactory)) {
				Assert.isTrue(!functionProperties.isComposeTo() && !functionProperties.isComposeFrom(),
						"Composing to/from existing Sinks and Sources are not supported for functions with multiple arguments.");
			}
		}

		private boolean isMultipleInputOutput(BindableProxyFactory bindableProxyFactory) {
			return bindableProxyFactory instanceof BindableFunctionProxyFactory
					&& ((BindableFunctionProxyFactory) bindableProxyFactory).isMultiple();
		}

		private boolean isArray(Type type) {
			return type instanceof GenericArrayType || type instanceof Class && ((Class<?>) type).isArray();
		}

		private void assertSupportedSignatures(BindableProxyFactory bindableProxyFactory, FunctionInvocationWrapper function) {
			if (this.isMultipleInputOutput(bindableProxyFactory)) {
				Assert.isTrue(!function.isConsumer(),
						"Function '" + functionProperties.getDefinition() + "' is a Consumer which is not supported "
								+ "for multi-in/out reactive streams. Only Functions are supported");
				Assert.isTrue(!function.isSupplier(),
						"Function '" + functionProperties.getDefinition() + "' is a Supplier which is not supported "
								+ "for multi-in/out reactive streams. Only Functions are supported");
				Assert.isTrue(!this.isArray(function.getInputType()) && !this.isArray(function.getOutputType()),
						"Function '" + functionProperties.getDefinition() + "' has the following signature: ["
						+ function + "]. Your input and/or outout lacks arity and therefore we "
								+ "can not determine how many input/output destinations are required in the context of "
								+ "function input/output binding.");
			}
		}

	}

	/**
	 *
	 * It's signatures ensures that within the context of s-c-stream Spring Integration does
	 * not attempt any conversion and sends a raw Message.
	 */
	@SuppressWarnings("rawtypes")
	private static class FunctionWrapper implements Function<Message<byte[]>, Object> {
		private final Function function;

		private final ConsumerProperties consumerProperties;

		private final ProducerProperties producerProperties;

		private final Field headersField;

		private final ConfigurableApplicationContext applicationContext;

		private final boolean isRoutingFunction;

		private final String targetProtocol;

		FunctionWrapper(Function function, ConsumerProperties consumerProperties,
				ProducerProperties producerProperties, ConfigurableApplicationContext applicationContext, String targetProtocol) {

			isRoutingFunction = ((FunctionInvocationWrapper) function).getTarget() instanceof RoutingFunction;
			this.applicationContext = applicationContext;
			this.function = new PartitionAwareFunctionWrapper(function, this.applicationContext, producerProperties);
			this.consumerProperties = consumerProperties;
			if (this.consumerProperties != null) {
				((FunctionInvocationWrapper) function).setSkipInputConversion(this.consumerProperties.isUseNativeDecoding());
			}
			this.producerProperties = producerProperties;
			if (this.producerProperties != null) {
				((FunctionInvocationWrapper) function).setSkipOutputConversion(this.producerProperties.isUseNativeEncoding());
			}
			this.headersField = ReflectionUtils.findField(MessageHeaders.class, "headers");
			this.headersField.setAccessible(true);
			this.targetProtocol = targetProtocol;
		}

		@SuppressWarnings("unchecked")
		@Override
		public Object apply(Message<byte[]> message) {
			message = sanitize(message);
			setHeadersIfNeeded(message);
			Object result = function.apply(message);
			if (result instanceof Publisher && this.isRoutingFunction) {
				throw new IllegalStateException("Routing to functions that return Publisher "
						+ "is not supported in the context of Spring Cloud Stream.");
			}
			if (result instanceof Message<?> resultMessage) {
				setHeadersIfNeeded(resultMessage);
			}
			return result;
		}

		private void setHeadersIfNeeded(Message message) {
			Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
				.getField(this.headersField, message.getHeaders());
//			if (StringUtils.hasText(targetProtocol)) {
//				headersMap.putIfAbsent(MessageUtils.TARGET_PROTOCOL, targetProtocol);
//			}
			if (CloudEventMessageUtils.isCloudEvent(message)) {
				headersMap.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
			}
		}

	}

	/**
	 * Creates and registers instances of BindableFunctionProxyFactory for each user defined function
	 * thus triggering destination bindings between function arguments and destinations.
	 *
	 * In other words this class is responsible to do the same work as EnableBinding except that it derives the input/output names
	 * from the names of the function (e.g., function-in-0).
	 */
	private static class FunctionBindingRegistrar implements InitializingBean, ApplicationContextAware, EnvironmentAware {

		protected final Log logger = LogFactory.getLog(getClass());

		private final FunctionCatalog functionCatalog;

		private final StreamFunctionProperties streamFunctionProperties;

		private ConfigurableApplicationContext applicationContext;

		private Environment environment;

		private int inputCount;

		private int outputCount;

		FunctionBindingRegistrar(FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
			this.functionCatalog = functionCatalog;
			this.streamFunctionProperties = streamFunctionProperties;
		}

		@Override
		public void afterPropertiesSet() throws Exception {
			this.determineFunctionName(functionCatalog, environment);

			if (StringUtils.hasText(streamFunctionProperties.getDefinition())) {
				String[] functionDefinitions = this.filterEligibleFunctionDefinitions();
				for (String functionDefinition : functionDefinitions) {

					FunctionInvocationWrapper function = functionCatalog.lookup(functionDefinition);
					if (function != null) {
						if (function.isSupplier()) {
							this.inputCount = 0;
							this.outputCount = this.getOutputCount(function, true);
						}
						else if (function.isConsumer() || function.isRoutingFunction()) {
							this.inputCount = FunctionTypeUtils.getInputCount(function);
							this.outputCount = 0;
						}
						else {
							this.inputCount = FunctionTypeUtils.getInputCount(function);
							if (function.isWrappedBiConsumer()) {
								this.outputCount = 0;
							}
							else {
								this.outputCount = this.getOutputCount(function, false);
							}
						}

						AtomicReference<BindableFunctionProxyFactory> proxyFactory = new AtomicReference<>();
						if (function.isInputTypePublisher()) {
							final SupportedBindableFeatures supportedBindableFeatures = new SupportedBindableFeatures();
							supportedBindableFeatures.setPollable(false);
							supportedBindableFeatures.setReactive(true);

							proxyFactory.set(new BindableFunctionProxyFactory(functionDefinition,
								this.inputCount, this.outputCount, this.streamFunctionProperties, supportedBindableFeatures));
						}
						else {
							proxyFactory.set(new BindableFunctionProxyFactory(functionDefinition,
								this.inputCount, this.outputCount, this.streamFunctionProperties));
						}
						((GenericApplicationContext) this.applicationContext).registerBean(functionDefinition + "_binding",
							BindableFunctionProxyFactory.class, proxyFactory::get);
					}
					else {
						logger.warn("The function definition '" + streamFunctionProperties.getDefinition() +
								"' is not valid. The referenced function bean or one of its components does not exist");
					}
				}
			}

			this.createStandAloneBindingsIfNecessary(applicationContext.getBean(BindingServiceProperties.class));

		}

		private void createStandAloneBindingsIfNecessary(BindingServiceProperties bindingProperties) {
			String[] inputBindings = StringUtils.hasText(bindingProperties.getInputBindings())
					? bindingProperties.getInputBindings().split(";") : new String[0];

			String[] outputBindings = StringUtils.hasText(bindingProperties.getOutputBindings())
					? bindingProperties.getOutputBindings().split(";") : new String[0];

			for (String inputBindingName : inputBindings) {
				FunctionInvocationWrapper sourceFunc = functionCatalog.lookup(inputBindingName);
				if (sourceFunc != null && !sourceFunc.getFunctionDefinition().equals(inputBindingName)) {
					sourceFunc = null;
				}

				if (sourceFunc == null || //see https://github.com/spring-cloud/spring-cloud-stream/issues/2229
						sourceFunc.isSupplier() ||
						(!sourceFunc.getFunctionDefinition().equals(inputBindingName) && applicationContext.containsBean(inputBindingName))) {

					BindableFunctionProxyFactory proxyFactory = new BindableFunctionProxyFactory(inputBindingName, 1, 0, this.streamFunctionProperties, sourceFunc != null);
					((GenericApplicationContext) this.applicationContext).registerBean(inputBindingName + "_binding_in",
						BindableFunctionProxyFactory.class, () -> proxyFactory);
				}
			}

			for (String outputBindingName : outputBindings) {
				FunctionInvocationWrapper sourceFunc = functionCatalog.lookup(outputBindingName);
				if (sourceFunc != null && !sourceFunc.getFunctionDefinition().equals(outputBindingName)) {
					sourceFunc = null;
				}

				if (sourceFunc == null || //see https://github.com/spring-cloud/spring-cloud-stream/issues/2229
						sourceFunc.isConsumer() ||
						(!sourceFunc.getFunctionDefinition().equals(outputBindingName) && applicationContext.containsBean(outputBindingName))) {

					BindableFunctionProxyFactory proxyFactory = new BindableFunctionProxyFactory(outputBindingName, 0, 1, this.streamFunctionProperties, sourceFunc != null);
					((GenericApplicationContext) this.applicationContext).registerBean(outputBindingName + "_binding_out",
						BindableFunctionProxyFactory.class, () -> proxyFactory);
				}
			}
		}

		@Override
		public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
			this.applicationContext = (ConfigurableApplicationContext) applicationContext;
		}

		@Override
		public void setEnvironment(Environment environment) {
			this.environment = environment;
		}

		private int getOutputCount(FunctionInvocationWrapper function, boolean isSupplier) {
			int outputCount = FunctionTypeUtils.getOutputCount(function);
			if (!function.isSupplier() && function.getOutputType() instanceof ParameterizedType) {
				Type outputType = function.getOutputType();
				if (FunctionTypeUtils.isMono(outputType) && outputType instanceof ParameterizedType
						&& FunctionTypeUtils.getRawType(((ParameterizedType) outputType).getActualTypeArguments()[0]).equals(Void.class)) {
					outputCount = 0;
				}
				else if (FunctionTypeUtils.getRawType(outputType).equals(Void.class)) {
					outputCount = 0;
				}
			}
			return outputCount;
		}

		private boolean determineFunctionName(FunctionCatalog catalog, Environment environment) {
			boolean autodetect = environment.getProperty("spring.cloud.stream.function.autodetect", boolean.class, true);
			String definition = streamFunctionProperties.getDefinition();
			if (!StringUtils.hasText(definition)) {
				definition = environment.getProperty("spring.cloud.function.definition");
			}

			if (StringUtils.hasText(definition)) {
				streamFunctionProperties.setDefinition(definition);
			}
			else if (Boolean.parseBoolean(environment.getProperty("spring.cloud.stream.function.routing.enabled", "false"))
					|| environment.containsProperty("spring.cloud.function.routing-expression")) {
				streamFunctionProperties.setDefinition(RoutingFunction.FUNCTION_NAME);
			}
			else if (autodetect) {
				FunctionInvocationWrapper function = functionCatalog.lookup("");
				if (function != null) {
					streamFunctionProperties.setDefinition(function.getFunctionDefinition());
				}
			}
			return StringUtils.hasText(streamFunctionProperties.getDefinition());
		}

		/*
		 * This is to accommodate Kafka streams binder, since it does not rely on binding mechanism provided by s-c-stream core.
		 * So we basically filter out any function name who's type contains KTable or KStream.
		 */
		private String[] filterEligibleFunctionDefinitions() {
			List<String> eligibleFunctionDefinitions = new ArrayList<>();
			String[] functionDefinitions = streamFunctionProperties.getDefinition().split(";");
			for (String functionDefinition : functionDefinitions) {
				functionDefinition = functionDefinition.trim();
				String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
				boolean eligibleDefinition = true;
				for (int i = 0; i < functionNames.length && eligibleDefinition; i++) {
					String functionName = functionNames[i];
					if (this.applicationContext.containsBean(functionName)) {
						Object functionBean = this.applicationContext.getBean(functionName);
						Type functionType = FunctionTypeUtils.discoverFunctionType(functionBean, functionName, (GenericApplicationContext) this.applicationContext);
						if (functionType == null) {
							eligibleDefinition = false;
						}
						else {
							String functionTypeStringValue = functionType.toString();
							if (functionTypeStringValue.contains("KTable") || functionTypeStringValue.contains("KStream")) {
								eligibleDefinition = false;
							}
						}
					}
					else {
						logger.warn("You have defined function definition that does not exist: " + functionName);
					}
				}
				if (eligibleDefinition) {
					eligibleFunctionDefinitions.add(functionDefinition);
				}
			}
			return eligibleFunctionDefinitions.toArray(new String[0]);
		}

	}

	private static final class ContextSnapshotHelper {

		private static final ContextSnapshotFactory CONTEXT_SNAPSHOT_FACTORY = ContextSnapshotFactory.builder().build();

		static AutoCloseable setContext(ContextView context) {
			return CONTEXT_SNAPSHOT_FACTORY.setThreadLocalsFrom(context);
		}

	}

}