KafkaStreamsFunctionCompositionTests.java

/*
 * Copyright 2021-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.util.Assert;

import static org.assertj.core.api.Assertions.assertThat;

@EmbeddedKafka(topics = {"fooFuncanotherFooFunc-out-0", "bar"})
class KafkaStreamsFunctionCompositionTests {

	private static Consumer<String, String> consumer;

	private static final CountDownLatch countDownLatch1 = new CountDownLatch(1);
	private static final CountDownLatch countDownLatch2 = new CountDownLatch(1);
	private static final CountDownLatch countDownLatch3 = new CountDownLatch(2);

	private static EmbeddedKafkaBroker embeddedKafka;

	@BeforeAll
	public static void setUp() {
		embeddedKafka = EmbeddedKafkaCondition.getBroker();
		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("fn-composition-group", "false",
				embeddedKafka);
		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
		consumer = cf.createConsumer();
		embeddedKafka.consumeFromEmbeddedTopics(consumer, "fooFuncanotherFooFunc-out-0", "bar");
	}

	@AfterAll
	public static void tearDown() {
		consumer.close();
	}

	@Test
	void basicFunctionCompositionWithDefaultDestination() throws InterruptedException {
		SpringApplication app = new SpringApplication(FunctionCompositionConfig1.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		try (ConfigurableApplicationContext context = app.run(
				"--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.function.definition=fooFunc|anotherFooFunc;anotherProcess",
				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {

			Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
			DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
			try {
				KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
				template.setDefaultTopic("fooFuncanotherFooFunc-in-0");
				template.sendDefault("foobar!!");
				//Verify non-composed funcions can be run standalone with composed function chains, i.e foo|bar;buzz
				template.setDefaultTopic("anotherProcess-in-0");
				template.sendDefault("this is crazy!!!");
				Thread.sleep(1000);

				ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "fooFuncanotherFooFunc-out-0");
				assertThat(cr.value().contains("foobar!!")).isTrue();

				Assert.isTrue(countDownLatch1.await(5, TimeUnit.SECONDS), "anotherProcess consumer didn't trigger.");
			}
			finally {
				pf.destroy();
			}
		}
	}

	@Test
	void basicFunctionCompositionWithDestinaion() throws InterruptedException {
		SpringApplication app = new SpringApplication(FunctionCompositionConfig1.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		try (ConfigurableApplicationContext context = app.run(
				"--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.function.definition=fooFunc|anotherFooFunc;anotherProcess",
				"--spring.cloud.stream.bindings.fooFuncanotherFooFunc-in-0.destination=foo",
				"--spring.cloud.stream.bindings.fooFuncanotherFooFunc-out-0.destination=bar",
				"--spring.cloud.stream.bindings.anotherProcess-in-0.destination=buzz",
				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {

			Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
			DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
			try {
				KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
				template.setDefaultTopic("foo");
				template.sendDefault("foobar!!");
				template.setDefaultTopic("buzz");
				template.sendDefault("this is crazy!!!");
				Thread.sleep(1000);

				ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "bar");
				assertThat(cr.value().contains("foobar!!")).isTrue();

				Assert.isTrue(countDownLatch1.await(5, TimeUnit.SECONDS), "anotherProcess consumer didn't trigger.");
			}
			finally {
				pf.destroy();
			}
		}
	}

	@Test
	void testFunctionToConsumerComposition() throws InterruptedException {
		SpringApplication app = new SpringApplication(FunctionCompositionConfig2.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		try (ConfigurableApplicationContext context = app.run(
				"--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.function.definition=fooFunc|anotherProcess",
				"--spring.cloud.stream.bindings.fooFuncanotherProcess-in-0.destination=foo",
				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {

			Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
			DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
			try {
				KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
				template.setDefaultTopic("foo");
				template.sendDefault("foobar!!");

				Thread.sleep(1000);

				Assert.isTrue(countDownLatch2.await(5, TimeUnit.SECONDS), "anotherProcess consumer didn't trigger.");
			}
			finally {
				pf.destroy();
			}
		}
	}

	@Test
	void testBiFunctionToConsumerComposition() throws InterruptedException {
		SpringApplication app = new SpringApplication(FunctionCompositionConfig3.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		try (ConfigurableApplicationContext context = app.run(
				"--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.function.definition=fooBiFunc|anotherProcess",
				"--spring.cloud.stream.bindings.fooBiFuncanotherProcess-in-0.destination=foo",
				"--spring.cloud.stream.bindings.fooBiFuncanotherProcess-in-1.destination=foo-1",
				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {

			Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
			DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
			try {
				KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
				template.setDefaultTopic("foo");
				template.sendDefault("foobar!!");

				template.setDefaultTopic("foo-1");
				template.sendDefault("another foobar!!");

				Thread.sleep(1000);

				Assert.isTrue(countDownLatch3.await(5, TimeUnit.SECONDS), "anotherProcess consumer didn't trigger.");
			}
			finally {
				pf.destroy();
			}
		}
	}

	@Test
	void chainedFunctionsAsComposed() throws InterruptedException {
		SpringApplication app = new SpringApplication(FunctionCompositionConfig4.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		try (ConfigurableApplicationContext context = app.run(
				"--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.stream.kafka.streams.binder.applicationId=my-app-id",
				"--spring.cloud.function.definition=fooBiFunc|anotherFooFunc|yetAnotherFooFunc|lastFunctionInChain",
				"--spring.cloud.stream.function.bindings.fooBiFuncanotherFooFuncyetAnotherFooFunclastFunctionInChain-in-0=input1",
				"--spring.cloud.stream.function.bindings.fooBiFuncanotherFooFuncyetAnotherFooFunclastFunctionInChain-in-1=input2",
				"--spring.cloud.stream.function.bindings.fooBiFuncanotherFooFuncyetAnotherFooFunclastFunctionInChain-out-0=output",
				"--spring.cloud.stream.bindings.input1.destination=my-foo-1",
				"--spring.cloud.stream.bindings.input2.destination=my-foo-2",
				"--spring.cloud.stream.bindings.output.destination=bar",
				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {

			Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
			senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
			DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
			try {
				KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);

				template.setDefaultTopic("my-foo-2");
				template.sendDefault("foo-1", "foo2");

				template.setDefaultTopic("my-foo-1");
				template.sendDefault("foo-1", "foo1");

				Thread.sleep(1000);

				final ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
				assertThat(records.iterator().hasNext()).isTrue();
				assertThat(records.iterator().next().value().equals("foo1foo2From-anotherFooFuncFrom-yetAnotherFooFuncFrom-lastFunctionInChain")).isTrue();
			}
			finally {
				pf.destroy();
			}
		}
	}

	@Test
	void firstFunctionCurriedThenComposeWithOtherFunctions() throws InterruptedException {
		SpringApplication app = new SpringApplication(FunctionCompositionConfig5.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		try (ConfigurableApplicationContext context = app.run(
				"--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.stream.kafka.streams.binder.applicationId=my-app-id-xyz",
				"--spring.cloud.function.definition=curriedFunc|anotherFooFunc|yetAnotherFooFunc|lastFunctionInChain",
				"--spring.cloud.stream.function.bindings.curriedFuncanotherFooFuncyetAnotherFooFunclastFunctionInChain-in-0=input1",
				"--spring.cloud.stream.function.bindings.curriedFuncanotherFooFuncyetAnotherFooFunclastFunctionInChain-in-1=input2",
				"--spring.cloud.stream.function.bindings.curriedFuncanotherFooFuncyetAnotherFooFunclastFunctionInChain-out-0=output",
				"--spring.cloud.stream.bindings.input1.destination=my-foo-1",
				"--spring.cloud.stream.bindings.input2.destination=my-foo-2",
				"--spring.cloud.stream.bindings.output.destination=bar",
				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {

			Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
			senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
			DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
			try {
				KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);

				template.setDefaultTopic("my-foo-2");
				template.sendDefault("foo-1", "foo2");

				Thread.sleep(1000);

				template.setDefaultTopic("my-foo-1");
				template.sendDefault("foo-1", "foo1");

				Thread.sleep(1000);

				final ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
				assertThat(records.iterator().hasNext()).isTrue();
				assertThat(records.iterator().next().value().equals("foo1foo2From-anotherFooFuncFrom-yetAnotherFooFuncFrom-lastFunctionInChain")).isTrue();
			}
			finally {
				pf.destroy();
			}
		}
	}

	@Test
	void testFunctionToConsumerCompositionWithFunctionProducesKTable() throws InterruptedException {
		SpringApplication app = new SpringApplication(FunctionCompositionConfig6.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		try (ConfigurableApplicationContext context = app.run(
				"--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.function.definition=fooFunc|anotherProcess",
				"--spring.cloud.stream.bindings.fooFuncanotherProcess-in-0.destination=foo",
				"--spring.cloud.stream.bindings.fooFuncanotherProcess-out-0.destination=bar",
				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {

			Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
			senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
			DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
			try {
				KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
				template.setDefaultTopic("foo");
				template.sendDefault("foo", "foobar!!");

				Thread.sleep(1000);

				ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "bar");
				assertThat(cr.value().contains("foobar!!")).isTrue();
			}
			finally {
				pf.destroy();
			}
		}
	}

	@EnableAutoConfiguration
	public static class FunctionCompositionConfig1 {

		@Bean
		public Function<KStream<String, String>, KStream<String, String>> fooFunc() {
			return input -> input.peek((s, s2) -> {
				System.out.println("hello: " + s2);
			});
		}

		@Bean
		public Function<KStream<String, String>, KStream<String, String>> anotherFooFunc() {
			return input -> input.peek((s, s2) -> System.out.println("hello Foo: " + s2));
		}

		@Bean
		public java.util.function.Consumer<KStream<String, String>> anotherProcess() {
			return c -> c.foreach((s, s2) -> {
				System.out.println("s2s2s2::" + s2);
				countDownLatch1.countDown();
			});
		}
	}

	@EnableAutoConfiguration
	public static class FunctionCompositionConfig2 {

		@Bean
		public Function<KStream<String, String>, KStream<String, String>> fooFunc() {
			return input -> input.peek((s, s2) -> {
				System.out.println("hello: " + s2);
			});
		}

		@Bean
		public java.util.function.Consumer<KStream<String, String>> anotherProcess() {
			return c -> c.foreach((s, s2) -> {
				System.out.println("s2s2s2::" + s2);
				countDownLatch2.countDown();
			});
		}
	}

	@EnableAutoConfiguration
	public static class FunctionCompositionConfig3 {

		@Bean
		public BiFunction<KStream<String, String>, KStream<String, String>, KStream<String, String>> fooBiFunc() {
			return KStream::merge;
		}

		@Bean
		public java.util.function.Consumer<KStream<String, String>> anotherProcess() {
			return c -> c.foreach((s, s2) -> {
				System.out.println("s2s2s2::" + s2);
				countDownLatch3.countDown();
			});
		}
	}

	@EnableAutoConfiguration
	public static class FunctionCompositionConfig4 {

		@Bean
		public BiFunction<KStream<String, String>, KTable<String, String>, KStream<String, String>> fooBiFunc() {
			return (a, b) -> a.join(b, (value1, value2) -> value1 + value2);
		}

		@Bean
		public Function<KStream<String, String>, KStream<String, String>> anotherFooFunc() {
			return input -> input.mapValues(value -> value + "From-anotherFooFunc");
		}

		@Bean
		public Function<KStream<String, String>, KStream<String, String>> yetAnotherFooFunc() {
			return input -> input.mapValues(value -> value + "From-yetAnotherFooFunc");
		}

		@Bean
		public Function<KStream<String, String>, KStream<String, String>> lastFunctionInChain() {
			return input -> input.mapValues(value -> value + "From-lastFunctionInChain");
		}
	}

	@EnableAutoConfiguration
	public static class FunctionCompositionConfig5 {

		@Bean
		public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFunc() {
			return a -> b ->
					a.join(b, (value1, value2) -> value1 + value2);
		}

		@Bean
		public Function<KStream<String, String>, KStream<String, String>> anotherFooFunc() {
			return input -> input.mapValues(value -> value + "From-anotherFooFunc");
		}

		@Bean
		public Function<KStream<String, String>, KStream<String, String>> yetAnotherFooFunc() {
			return input -> input.mapValues(value -> value + "From-yetAnotherFooFunc");
		}

		@Bean
		public Function<KStream<String, String>, KStream<String, String>> lastFunctionInChain() {
			return input -> input.mapValues(value -> value + "From-lastFunctionInChain");
		}
	}

	@EnableAutoConfiguration
	public static class FunctionCompositionConfig6 {

		@Bean
		public Function<KStream<String, String>, KTable<String, String>> fooFunc() {
			return ks -> {
				ks.foreach(new ForeachAction<String, String>() {
					@Override
					public void apply(String key, String value) {
						System.out.println();
					}
				});
				return ks.toTable();
			};
		}

		@Bean
		public Function<KTable<String, String>, KStream<String, String>> anotherProcess() {
			return KTable::toStream;
		}
	}
}