BindingsLifecycleController.java

/*
 * Copyright 2021-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binding;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import tools.jackson.databind.JacksonModule;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.json.JsonMapper;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinderFactory;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.BindableFunctionProxyFactory;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;


/**
 *
 * Lifecycle controller for the bindings.
 * It is registered as a bean and once injected could be used to control the lifecycle f the bindings.
 *
 * @author Oleg Zhurakousky
 * @author Soby Chacko
 * @since 3.x
 */
public class BindingsLifecycleController implements ApplicationContextAware {

	private final List<InputBindingLifecycle> inputBindingLifecycles;

	private final List<OutputBindingLifecycle> outputBindingsLifecycles;

	private final ObjectMapper objectMapper;

	private ApplicationContext applicationContext;

	@SuppressWarnings("unchecked")
	public BindingsLifecycleController(List<InputBindingLifecycle> inputBindingLifecycles,
			List<OutputBindingLifecycle> outputBindingsLifecycles) {

		Assert.notEmpty(inputBindingLifecycles,
				"'inputBindingLifecycles' must not be null or empty");
		this.inputBindingLifecycles = inputBindingLifecycles;
		this.outputBindingsLifecycles = outputBindingsLifecycles;

		JsonMapper.Builder builder = JsonMapper.builder();

		//this.objectMapper = new ObjectMapper(); //see https://github.com/spring-cloud/spring-cloud-stream/issues/2253
		// we need to use ObjectMapper that could not be modified by the user.

		try {
			Class<? extends JacksonModule> javaTimeModuleClass = (Class<? extends JacksonModule>)
					ClassUtils.forName("tools.jackson.datatype.jsr310.JavaTimeModule", ClassUtils.getDefaultClassLoader());
			JacksonModule javaTimeModule = BeanUtils.instantiateClass(javaTimeModuleClass);
			builder.addModule(javaTimeModule);
		}
		catch (ClassNotFoundException ex) {
			// ignore; jackson-datatype-jsr310 not available
		}
		finally {
			this.objectMapper = builder.build();
		}
	}
	
	/**
	 * Allows to dynamically create a new input binding returning its consumer properties for further customization.
	 * @param <P> the type of consumer properties. For example, if binding derives from Kafka, it will return KafkaConsumerProperties.
	 * @param bindingName the name of the binding.
	 * @param binderName the name of the binder.
	 * @param bindingProperties instance of BindingProperties.
	 * @return instance of the consumer properties.
	 */
	public <P> P createInputBinding(String bindingName, String binderName, BindingProperties bindingProperties) {
		Assert.hasText(bindingProperties.getGroup(), "Anonymous bindings are not allowed for explicit creation. You must define 'group'");
		DefaultBinderFactory binderFactory = this.applicationContext.getBean(DefaultBinderFactory.class);
		Object binder = binderFactory.getBinder(binderName, MessageChannel.class);
		BindingServiceProperties bindingServiceProperties = this.applicationContext.getBean(BindingServiceProperties.class);
		bindingServiceProperties.setBindings(Collections.singletonMap(bindingName, bindingProperties));
		if (binder instanceof ExtendedPropertiesBinder) {
			return this.defineInputBinding(bindingName);
		}
		else {
			throw new IllegalStateException("Binder must be an instance of ExtendedPropertiesBinder");
		}
	}
	
	/**
	 * Allows to dynamically create a new input binding returning its consumer properties for further customization.
	 * @param <P> the type of consumer properties. For example, if binding derives from Kafka, it will return KafkaConsumerProperties.
	 * @param bindingName the name of the binding.
	 * @param binderName the name of the binder.
	 * @param bindingProperties instance of BindingProperties.
	 * @return instance of the consumer properties.
	 */
	public <P> P createOutputBinding(String bindingName, String binderName, BindingProperties bindingProperties) {
		DefaultBinderFactory binderFactory = this.applicationContext.getBean(DefaultBinderFactory.class);
		Object binder = binderFactory.getBinder(binderName, MessageChannel.class);
		BindingServiceProperties bindingServiceProperties = this.applicationContext.getBean(BindingServiceProperties.class);
		bindingServiceProperties.setBindings(Collections.singletonMap(bindingName, bindingProperties));
		if (binder instanceof ExtendedPropertiesBinder) {
			return this.defineOutputBinding(bindingName);
		}
		else {
			throw new IllegalStateException("Binder must be an instance of ExtendedPropertiesBinder");
		}
	}

	/**
	 * Allows to dynamically define a new input binding returning its consumer properties for further customization.
	 * @param <P> the type of consumer properties. For example, if binding derives from Kafka, it will return KafkaConsumerProperties.
	 * @param bindingName the name of the binding.
	 * @return instance of the consumer properties.
	 * @deprecated since 5.0.2
	 */
	@Deprecated
	public <P> P defineInputBinding(String bindingName) {
		BindableFunctionProxyFactory bindingProxyFactory =
				new BindableFunctionProxyFactory(bindingName, 1, 0, this.applicationContext.getBean(StreamFunctionProperties.class), false);
		this.defineBinding(bindingProxyFactory);
		return this.getExtensionProperties(bindingName);
	}

	/**
	 * Allows to dynamically define a new input binding returning its producer properties for further customization.
	 * @param <P> the type of producer properties. For example, if binding derives from Kafka, it will return KafkaProducerProperties.
	 * @param bindingName the name of the binding.
	 * @return instance of the producer properties.
	 *  @deprecated since 5.0.2
	 */
	@Deprecated
	public <P> P defineOutputBinding(String bindingName) {
		BindableFunctionProxyFactory bindingProxyFactory =
				new BindableFunctionProxyFactory(bindingName, 0, 1, this.applicationContext.getBean(StreamFunctionProperties.class), false);
		this.defineBinding(bindingProxyFactory);
		return this.getExtensionProperties(bindingName);
	}

	/**
	 * Will return producer or consumer properties for a specified binding. For example, calling `getExtensionProperties("foo-in-0")`
	 * on Kafka binding  will return an instance of KafkaConsumerProperties.
	 * @param <T> type of producer or consumer properties for a specified binding
	 * @param bindingName name of the binding
	 * @return producer or consumer properties
	 */
	public <T> T getExtensionProperties(String bindingName) {
		List<Binding<?>> locateBinding = BindingsLifecycleController.this.locateBinding(bindingName);
		if (!CollectionUtils.isEmpty(locateBinding)) {
			return locateBinding.get(0).getExtension();
		}
		return null;
	}

	/**
	 * Provide an accessor for the custom ObjectMapper created by this controller.
	 * @return {@link ObjectMapper}
	 * @since 4.1.2
	 */
	public ObjectMapper getObjectMapper() {
		return objectMapper;
	}

	/**
	 * Convenience method to stop the binding with provided `bindingName`.
	 * @param bindingName the name of the binding.
	 */
	public void stop(String bindingName) {
		this.changeState(bindingName, State.STOPPED);
	}

	/**
	 * Convenience method to start the binding with provided `bindingName`.
	 * @param bindingName the name of the binding.
	 */
	public void start(String bindingName) {
		this.changeState(bindingName, State.STARTED);
	}

	/**
	 * Convenience method to pause the binding with provided `bindingName`.
	 * @param bindingName the name of the binding.
	 */
	public void pause(String bindingName) {
		this.changeState(bindingName, State.PAUSED);
	}

	/**
	 * Convenience method to resume the binding with provided `bindingName`.
	 * @param bindingName the name of the binding.
	 */
	public void resume(String bindingName) {
		this.changeState(bindingName, State.RESUMED);
	}

	/**
	 * General purpose method to change the state of the provided binding.
	 * @param bindingName the name of the binding.
	 * @param state the {@link State} you wish to set this binding to
	 */
	public void changeState(String bindingName, State state) {
		var bindingList = BindingsLifecycleController.this.locateBinding(bindingName);
		if (!bindingList.isEmpty()) {
			switch (state) {
				case STARTED -> bindingList.stream().forEach(Binding::start);
				case STOPPED -> bindingList.stream().forEach(Binding::stop);
				case PAUSED -> bindingList.stream().forEach(Binding::pause);
				case RESUMED -> bindingList.stream().forEach(Binding::resume);
				default -> {
				}
			}
		}
	}

	/**
	 * Queries the {@link List} of states for all available bindings. The returned list
	 * consists of {@link Binding} objects which could be further interrogated
	 * using {@link Binding#isPaused()} and {@link Binding#isRunning()}.
	 * @return the list of {@link Binding}s
	 */
	@SuppressWarnings("unchecked")
	public List<Map<String, Object>> queryStates() {
		List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings());
		bindings.addAll(gatherOutputBindings());
		return this.objectMapper.convertValue(bindings, List.class);
	}

	/**
	 * Queries the individual state of a binding. The returned list
	 * {@link Binding} object could be further interrogated
	 * using {@link Binding#isPaused()} and {@link Binding#isRunning()}.
	 * @return collection of {@link Binding} objects.
	 */
	public List<Binding<?>> queryState(String name) {
		Assert.notNull(name, "'name' must not be null");
		return this.locateBinding(name);
	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}

	private void defineBinding(BindableFunctionProxyFactory bindingProxyFactory) {
		bindingProxyFactory.setApplicationContext(this.applicationContext);
		bindingProxyFactory.afterPropertiesSet();

		AbstractBindingLifecycle bindingLifecycle = (bindingProxyFactory.getInputs().size() > 0) 
				? this.applicationContext.getBean(InputBindingLifecycle.class)
						: this.applicationContext.getBean(OutputBindingLifecycle.class);
		
		bindingLifecycle.startBindable(bindingProxyFactory);
	}


	/**
	 * Queries for all input {@link Binding}s.
	 * @return the list of input {@link Binding}s
	 */
	@SuppressWarnings("unchecked")
	private List<Binding<?>> gatherInputBindings() {
		List<Binding<?>> inputBindings = new ArrayList<>();
		for (InputBindingLifecycle inputBindingLifecycle : this.inputBindingLifecycles) {
			Collection<Binding<?>> lifecycleInputBindings = (Collection<Binding<?>>) new DirectFieldAccessor(
					inputBindingLifecycle).getPropertyValue("inputBindings");
			inputBindings.addAll(lifecycleInputBindings);
		}
		return inputBindings;
	}

	/**
	 * Queries for all output {@link Binding}s.
	 * @return the list of output {@link Binding}s
	 */
	@SuppressWarnings("unchecked")
	private List<Binding<?>> gatherOutputBindings() {
		List<Binding<?>> outputBindings = new ArrayList<>();
		for (OutputBindingLifecycle inputBindingLifecycle : this.outputBindingsLifecycles) {
			Collection<Binding<?>> lifecycleInputBindings = (Collection<Binding<?>>) new DirectFieldAccessor(
					inputBindingLifecycle).getPropertyValue("outputBindings");
			outputBindings.addAll(lifecycleInputBindings);
		}
		return outputBindings;
	}

	private List<Binding<?>> locateBinding(String name) {
		Stream<Binding<?>> bindings = Stream.concat(this.gatherInputBindings().stream(),
			this.gatherOutputBindings().stream());
		return bindings.filter(binding -> name.equals(binding.getBindingName())).collect(Collectors.toList());
	}

	/**
	 * Binding states.
	 */
	public enum State {

		/**
		 * Started state of a binding.
		 */
		STARTED,

		/**
		 * Stopped state of a binding.
		 */
		STOPPED,

		/**
		 * Paused state of a binding.
		 */
		PAUSED,

		/**
		 * Resumed state of a binding.
		 */
		RESUMED;

	}

}