KafkaStreamsBinderBootstrapTest.java

/*
 * Copyright 2018-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.bootstrap;

import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;

import tools.jackson.databind.JavaType;
import tools.jackson.databind.type.TypeFactory;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

/**
 * Integration tests to verify the bootstrap of a SpringBoot application using the KafkaStreams binder.
 *
 * @author Soby Chacko
 * @author Eduard Dom��nguez
 * @author Chris Bono
 */
@EmbeddedKafka
class KafkaStreamsBinderBootstrapTest {

	private static EmbeddedKafkaBroker embeddedKafka;

	@BeforeEach
	public void before() {
		embeddedKafka = EmbeddedKafkaCondition.getBroker();
		System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
	}

	@ParameterizedTest
	@ValueSource(booleans = { false, true })
	void kafkaStreamsBinderWithStandardConfigCanStart(boolean excludeKafkaAutoConfig) {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(SimpleKafkaStreamsApplication.class)
			.web(WebApplicationType.NONE).run(
				"--spring.cloud.function.definition=input1;input2;input3",
				"--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id"
					+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart",
				"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id"
					+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo",
				"--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id"
					+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar",
				"--spring.cloud.stream.kafka.streams.binder.brokers="
					+ embeddedKafka.getBrokersAsString(),
				excludeKafkaAutoConfigParam(excludeKafkaAutoConfig))) { // @checkstyle:off
		} // @checkstyle:on
	}

	@ParameterizedTest
	@ValueSource(booleans = { false, true })
	void kafkaStreamsBinderWithCustomConfigCanStart(boolean excludeKafkaAutoConfig) {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(SimpleKafkaStreamsApplication.class)
			.web(WebApplicationType.NONE).run(
				"--spring.cloud.function.definition=input1;input2;input3",
				"--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id"
					+ "=testKStreamBinderWithCustomEnvironmentCanStart",
				"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id"
					+ "=testKStreamBinderWithCustomEnvironmentCanStart-foo",
				"--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id"
					+ "=testKStreamBinderWithCustomEnvironmentCanStart-foobar",
				"--spring.cloud.stream.bindings.input1-in-0.destination=foo",
				"--spring.cloud.stream.bindings.input1-in-0.binder=kstreamBinder",
				"--spring.cloud.stream.binders.kstreamBinder.type=kstream",
				"--spring.cloud.stream.binders.kstreamBinder.environment"
					+ ".spring.cloud.stream.kafka.streams.binder.brokers"
					+ "=" + embeddedKafka.getBrokersAsString(),
				"--spring.cloud.stream.bindings.input2-in-0.destination=bar",
				"--spring.cloud.stream.bindings.input2-in-0.binder=ktableBinder",
				"--spring.cloud.stream.binders.ktableBinder.type=ktable",
				"--spring.cloud.stream.binders.ktableBinder.environment"
					+ ".spring.cloud.stream.kafka.streams.binder.brokers"
					+ "=" + embeddedKafka.getBrokersAsString(),
				"--spring.cloud.stream.bindings.input3-in-0.destination=foobar",
				"--spring.cloud.stream.bindings.input3-in-0.binder=globalktableBinder",
				"--spring.cloud.stream.binders.globalktableBinder.type=globalktable",
				"--spring.cloud.stream.binders.globalktableBinder.environment"
					+ ".spring.cloud.stream.kafka.streams.binder.brokers"
					+ "=" + embeddedKafka.getBrokersAsString(),
				excludeKafkaAutoConfigParam(excludeKafkaAutoConfig))) { // @checkstyle:off
		} // @checkstyle:on
	}

	private String excludeKafkaAutoConfigParam(boolean excludeKafkaAutoConfig) {
		return excludeKafkaAutoConfig ?
			"--spring.autoconfigure.exclude=org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration" : "foo=bar";
	}

	@Test
	@SuppressWarnings("unchecked")
	void streamConfigGlobalProperties_GH1149() {
		ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
				SimpleKafkaStreamsApplication.class).web(WebApplicationType.NONE).run(
				"--spring.cloud.function.definition=input1;input2;input3",
				"--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id"
						+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart",
				"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id"
						+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo",
				"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=" + this.getClass().getName() + ".determineType",
				"--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id"
						+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar",
				"--spring.cloud.stream.kafka.streams.binder.brokers="
						+ embeddedKafka.getBrokersAsString());

		Map<String, Object> streamConfigGlobalProperties = applicationContext
				.getBean("streamConfigGlobalProperties", Map.class);
		// Make sure that global stream configs do not contain individual binding config set on second function.
		assertThat(streamConfigGlobalProperties.containsKey("spring.json.value.type.method")).isFalse();

		// Make sure that only input2 function gets the specific binding property set on it.
		final StreamsBuilderFactoryBean input1SBFB = applicationContext.getBean("&stream-builder-input1", StreamsBuilderFactoryBean.class);
		final Properties streamsConfiguration1 = input1SBFB.getStreamsConfiguration();
		assertThat(streamsConfiguration1.containsKey("spring.json.value.type.method")).isFalse();

		final StreamsBuilderFactoryBean input2SBFB = applicationContext.getBean("&stream-builder-input2", StreamsBuilderFactoryBean.class);
		final Properties streamsConfiguration2 = input2SBFB.getStreamsConfiguration();
		assertThat(streamsConfiguration2.containsKey("spring.json.value.type.method")).isTrue();

		final StreamsBuilderFactoryBean input3SBFB = applicationContext.getBean("&stream-builder-input3", StreamsBuilderFactoryBean.class);
		final Properties streamsConfiguration3 = input3SBFB.getStreamsConfiguration();
		assertThat(streamsConfiguration3.containsKey("spring.json.value.type.method")).isFalse();
		applicationContext.getBean(KeyValueSerdeResolver.class);

		//TODO: In Kafka Streams 3.1, taskTopology field is removed. Re-evaluate this testing strategy.
//		String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
//				.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2");
//
//		assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver);
//
//		String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
//				.getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName"));
//		assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName);
//
//		String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
//				.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName"));
//		assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName);

		applicationContext.close();
	}

	public static JavaType determineType(byte[] data, Headers headers) {
		return TypeFactory.createDefaultInstance().constructParametricType(Map.class, String.class, String.class);
	}

	@EnableAutoConfiguration
	@Configuration
	static class SimpleKafkaStreamsApplication extends BaseConfig {

		@Bean
		public Consumer<KStream<Object, String>> input1() {
			return s -> {
				// No-op consumer
			};
		}

		@Bean
		public Consumer<KTable<Map<String, String>, Map<String, String>>> input2() {
			return s -> {
				// No-op consumer
			};
		}

	}

	// Testing the scenario reported by https://github.com/spring-cloud/spring-cloud-stream/issues/2737
	static class BaseConfig {

		@Bean
		public Consumer<GlobalKTable<Object, String>> input3() {
			return s -> {
				// No-op consumer
			};
		}
	}


}