StreamToTableJoinFunctionTests.java

/*
 * Copyright 2019-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.function;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.util.Assert;

import static org.assertj.core.api.Assertions.assertThat;

@EmbeddedKafka(topics = "output-topic-1")
class StreamToTableJoinFunctionTests {

	private static EmbeddedKafkaBroker embeddedKafka;

	@BeforeAll
	public static void setUp() {
		embeddedKafka = EmbeddedKafkaCondition.getBroker();
	}

	@Test
	void streamToTable() {
		SpringApplication app = new SpringApplication(CountClicksPerRegionApplication.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		Consumer<String, Long> consumer;
		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-1",
				"false", embeddedKafka);
		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
		DefaultKafkaConsumerFactory<String, Long> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
		consumer = cf.createConsumer();
		embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "output-topic-1");

		runTest(app, consumer);
	}

	@Test
	void streamToTableBiFunction() {
		SpringApplication app = new SpringApplication(BiFunctionCountClicksPerRegionApplication.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		Consumer<String, Long> consumer;
		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-2",
				"false", embeddedKafka);
		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
		DefaultKafkaConsumerFactory<String, Long> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
		consumer = cf.createConsumer();
		embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "output-topic-1");

		runTest(app, consumer);
	}

	@Test
	void streamToTableBiConsumer() throws Exception {
		SpringApplication app = new SpringApplication(BiConsumerApplication.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		Consumer<String, Long> consumer;
		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-2",
				"false", embeddedKafka);
		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
		DefaultKafkaConsumerFactory<String, Long> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
		consumer = cf.createConsumer();
		embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "output-topic-1");

		try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.stream.bindings.process-in-0.destination=user-clicks-1",
				"--spring.cloud.stream.bindings.process-in-1.destination=user-regions-1",
				"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
						"=org.apache.kafka.common.serialization.Serdes$StringSerde",
				"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
						"=org.apache.kafka.common.serialization.Serdes$StringSerde",
				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
				"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.applicationId" +
						"=testStreamToTableBiConsumer",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {

			// Input 1: Region per user (multiple records allowed per user).
			List<KeyValue<String, String>> userRegions = Arrays.asList(
					new KeyValue<>("alice", "asia")
			);

			Map<String, Object> senderProps1 = KafkaTestUtils.producerProps(embeddedKafka);
			senderProps1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
			senderProps1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

			DefaultKafkaProducerFactory<String, String> pf1 = new DefaultKafkaProducerFactory<>(senderProps1);
			KafkaTemplate<String, String> template1 = new KafkaTemplate<>(pf1, true);
			template1.setDefaultTopic("user-regions-1");

			for (KeyValue<String, String> keyValue : userRegions) {
				template1.sendDefault(keyValue.key, keyValue.value);
			}

			// Input 2: Clicks per user (multiple records allowed per user).
			List<KeyValue<String, Long>> userClicks = Arrays.asList(
					new KeyValue<>("alice", 13L)
			);

			Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
			senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
			senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

			DefaultKafkaProducerFactory<String, Long> pf = new DefaultKafkaProducerFactory<>(senderProps);
			KafkaTemplate<String, Long> template = new KafkaTemplate<>(pf, true);
			template.setDefaultTopic("user-clicks-1");

			for (KeyValue<String, Long> keyValue : userClicks) {
				template.sendDefault(keyValue.key, keyValue.value);
			}

			Assert.isTrue(BiConsumerApplication.latch.await(10, TimeUnit.SECONDS), "Failed to receive message");

		}
		finally {
			consumer.close();
		}
	}

	private void runTest(SpringApplication app, Consumer<String, Long> consumer) {
		try (ConfigurableApplicationContext context = app.run("--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.stream.bindings.process-in-0.destination=user-clicks-1",
				"--spring.cloud.stream.bindings.process-in-1.destination=user-regions-1",
				"--spring.cloud.stream.bindings.process-out-0.destination=output-topic-1",
				"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
						"=org.apache.kafka.common.serialization.Serdes$StringSerde",
				"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
						"=org.apache.kafka.common.serialization.Serdes$StringSerde",
				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
				"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.applicationId" +
						"=StreamToTableJoinFunctionTests-abc",
				"--spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.topic.properties.cleanup.policy=compact",
				"--spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.topic.properties.cleanup.policy=compact",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {

			// Input 1: Region per user (multiple records allowed per user).
			List<KeyValue<String, String>> userRegions = Arrays.asList(
					new KeyValue<>("alice", "asia"),   /* Alice lived in Asia originally... */
					new KeyValue<>("bob", "americas"),
					new KeyValue<>("chao", "asia"),
					new KeyValue<>("dave", "europe"),
					new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */
					new KeyValue<>("eve", "americas"),
					new KeyValue<>("fang", "asia")
			);

			Map<String, Object> senderProps1 = KafkaTestUtils.producerProps(embeddedKafka);
			senderProps1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
			senderProps1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

			DefaultKafkaProducerFactory<String, String> pf1 = new DefaultKafkaProducerFactory<>(senderProps1);
			KafkaTemplate<String, String> template1 = new KafkaTemplate<>(pf1, true);
			template1.setDefaultTopic("user-regions-1");

			for (KeyValue<String, String> keyValue : userRegions) {
				template1.sendDefault(keyValue.key, keyValue.value);
			}

			// Input 2: Clicks per user (multiple records allowed per user).
			List<KeyValue<String, Long>> userClicks = Arrays.asList(
					new KeyValue<>("alice", 13L),
					new KeyValue<>("bob", 4L),
					new KeyValue<>("chao", 25L),
					new KeyValue<>("bob", 19L),
					new KeyValue<>("dave", 56L),
					new KeyValue<>("eve", 78L),
					new KeyValue<>("alice", 40L),
					new KeyValue<>("fang", 99L)
			);

			Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
			senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
			senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

			DefaultKafkaProducerFactory<String, Long> pf = new DefaultKafkaProducerFactory<>(senderProps);
			KafkaTemplate<String, Long> template = new KafkaTemplate<>(pf, true);
			template.setDefaultTopic("user-clicks-1");

			for (KeyValue<String, Long> keyValue : userClicks) {
				template.sendDefault(keyValue.key, keyValue.value);
			}

			List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
					new KeyValue<>("americas", 101L),
					new KeyValue<>("europe", 109L),
					new KeyValue<>("asia", 124L)
			);

			//Verify that we receive the expected data
			int count = 0;
			long start = System.currentTimeMillis();
			List<KeyValue<String, Long>> actualClicksPerRegion = new ArrayList<>();
			do {
				ConsumerRecords<String, Long> records = KafkaTestUtils.getRecords(consumer);
				count = count + records.count();
				for (ConsumerRecord<String, Long> record : records) {
					actualClicksPerRegion.add(new KeyValue<>(record.key(), record.value()));
				}
			} while (count < expectedClicksPerRegion.size() && (System.currentTimeMillis() - start) < 30000);

			assertThat(count == expectedClicksPerRegion.size()).isTrue();
			assertThat(actualClicksPerRegion).hasSameElementsAs(expectedClicksPerRegion);

			// Testing certain ancillary configuration of GlobalKTable around topics creation.
			// See this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/687
			BinderFactory binderFactory = context.getBeanFactory()
					.getBean(BinderFactory.class);

			Binder<KTable, ? extends ConsumerProperties, ? extends ProducerProperties> ktableBinder = binderFactory
					.getBinder("ktable", KTable.class);

			KafkaStreamsConsumerProperties inputX = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) ktableBinder)
					.getExtendedConsumerProperties("process-in-1");
			String cleanupPolicyX = inputX.getTopic().getProperties().get("cleanup.policy");

			assertThat(cleanupPolicyX).isEqualTo("compact");

			Binder<KStream, ? extends ConsumerProperties, ? extends ProducerProperties> kStreamBinder = binderFactory
					.getBinder("kstream", KStream.class);

			KafkaStreamsProducerProperties producerProperties = (KafkaStreamsProducerProperties) ((ExtendedPropertiesBinder) kStreamBinder)
					.getExtendedProducerProperties("process-out-0");

			String cleanupPolicyOutput = producerProperties.getTopic().getProperties().get("cleanup.policy");

			assertThat(cleanupPolicyOutput).isEqualTo("compact");
		}
		finally {
			consumer.close();
		}
	}

//	@Test
	public void testGlobalStartOffsetWithLatestAndIndividualBindingWthEarliest() throws Exception {
		SpringApplication app = new SpringApplication(BiFunctionCountClicksPerRegionApplication.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		Consumer<String, Long> consumer;
		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-3",
				"false", embeddedKafka);
		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
		DefaultKafkaConsumerFactory<String, Long> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
		consumer = cf.createConsumer();
		embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "output-topic-2");

		// Produce data first to the input topic to test the startOffset setting on the
		// binding (which is set to earliest below).
		// Input 1: Clicks per user (multiple records allowed per user).
		List<KeyValue<String, Long>> userClicks = Arrays.asList(
				new KeyValue<>("alice", 100L),
				new KeyValue<>("alice", 100L),
				new KeyValue<>("alice", 100L),
				new KeyValue<>("alice", 100L),
				new KeyValue<>("alice", 100L),
				new KeyValue<>("alice", 100L),
				new KeyValue<>("alice", 100L),
				new KeyValue<>("alice", 100L),
				new KeyValue<>("alice", 100L),
				new KeyValue<>("alice", 100L)
		);

		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
		senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

		DefaultKafkaProducerFactory<String, Long> pf = new DefaultKafkaProducerFactory<>(senderProps);
		KafkaTemplate<String, Long> template = new KafkaTemplate<>(pf, true);
		template.setDefaultTopic("user-clicks-2");

		for (KeyValue<String, Long> keyValue : userClicks) {
			template.sendDefault(keyValue.key, keyValue.value);
		}

		try (ConfigurableApplicationContext context = app.run("--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.stream.bindings.process-in-0.destination=user-clicks-2",
				"--spring.cloud.stream.bindings.process-in-1.destination=user-regions-2",
				"--spring.cloud.stream.bindings.process-out-0.destination=output-topic-2",
				"--spring.cloud.stream.bindings.process-in-0.consumer.useNativeDecoding=true",
				"--spring.cloud.stream.bindings.process-in-1.consumer.useNativeDecoding=true",
				"--spring.cloud.stream.bindings.process-out-0.producer.useNativeEncoding=true",
				"--spring.cloud.stream.kafka.streams.binder.configuration.auto.offset.reset=latest",
				"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.startOffset=earliest",
				"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
						"=org.apache.kafka.common.serialization.Serdes$StringSerde",
				"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
						"=org.apache.kafka.common.serialization.Serdes$StringSerde",
				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
				"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.application-id" +
						"=StreamToTableJoinFunctionTests-foobar",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
			Thread.sleep(1000L);

			// Input 2: Region per user (multiple records allowed per user).
			List<KeyValue<String, String>> userRegions = Arrays.asList(
					new KeyValue<>("alice", "asia"),   /* Alice lived in Asia originally... */
					new KeyValue<>("bob", "americas"),
					new KeyValue<>("chao", "asia"),
					new KeyValue<>("dave", "europe"),
					new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */
					new KeyValue<>("eve", "americas"),
					new KeyValue<>("fang", "asia")
			);

			Map<String, Object> senderProps1 = KafkaTestUtils.producerProps(embeddedKafka);
			senderProps1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
			senderProps1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

			DefaultKafkaProducerFactory<String, String> pf1 = new DefaultKafkaProducerFactory<>(senderProps1);
			KafkaTemplate<String, String> template1 = new KafkaTemplate<>(pf1, true);
			template1.setDefaultTopic("user-regions-2");

			for (KeyValue<String, String> keyValue : userRegions) {
				template1.sendDefault(keyValue.key, keyValue.value);
			}


			// Input 1: Clicks per user (multiple records allowed per user).
			List<KeyValue<String, Long>> userClicks1 = Arrays.asList(
					new KeyValue<>("bob", 4L),
					new KeyValue<>("chao", 25L),
					new KeyValue<>("bob", 19L),
					new KeyValue<>("dave", 56L),
					new KeyValue<>("eve", 78L),
					new KeyValue<>("fang", 99L)
			);

			for (KeyValue<String, Long> keyValue : userClicks1) {
				template.sendDefault(keyValue.key, keyValue.value);
			}

			List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
					new KeyValue<>("americas", 101L),
					new KeyValue<>("europe", 56L),
					new KeyValue<>("asia", 124L),
					//1000 alice entries which were there in the topic before the consumer started.
					//Since we set the startOffset to earliest for the topic, it will read them,
					//but the join fails to associate with a valid region, thus UNKNOWN.
					new KeyValue<>("UNKNOWN", 1000L)
			);

			//Verify that we receive the expected data
			int count = 0;
			long start = System.currentTimeMillis();
			List<KeyValue<String, Long>> actualClicksPerRegion = new ArrayList<>();
			do {
				ConsumerRecords<String, Long> records = KafkaTestUtils.getRecords(consumer);
				count = count + records.count();
				for (ConsumerRecord<String, Long> record : records) {
					actualClicksPerRegion.add(new KeyValue<>(record.key(), record.value()));
				}
			} while (count < expectedClicksPerRegion.size() && (System.currentTimeMillis() - start) < 30000);

			// TODO: Matched count is 3 and not 4 (expectedClicksPerRegion.size()) when running with full suite. Investigate why.
			// TODO: This behavior is only observed after the Spring Kafka upgrade to 2.5.0 and kafka client to 2.5.
			// TODO: Note that the test passes fine as a single test.
			assertThat(count).matches(
					matchedCount -> matchedCount == expectedClicksPerRegion.size() - 1 || matchedCount == expectedClicksPerRegion.size());
			assertThat(actualClicksPerRegion).containsAnyElementsOf(expectedClicksPerRegion);
		}
		finally {
			consumer.close();
		}
	}

	@Test
	public void trivialSingleKTableInputAsNonDeclarative() {
		SpringApplication app = new SpringApplication(TrivialKTableApp.class);
		app.setWebApplicationType(WebApplicationType.NONE);
		app.run("--server.port=0",
				"--spring.cloud.stream.kafka.streams.binder.brokers="
						+ embeddedKafka.getBrokersAsString(),
				"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.application-id=" +
						"testTrivialSingleKTableInputAsNonDeclarative");
		//All we are verifying is that this application didn't throw any errors.
		//See this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/536
	}

	@Test
	public void twoKStreamsCanBeJoined() {
		SpringApplication app = new SpringApplication(
				JoinProcessor.class);
		app.setWebApplicationType(WebApplicationType.NONE);
		app.run("--server.port=0",
				"--spring.cloud.stream.kafka.streams.binder.brokers="
						+ embeddedKafka.getBrokersAsString(),
				"--spring.application.name=" +
						"two-kstream-input-join-integ-test");
		//All we are verifying is that this application didn't throw any errors.
		//See this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/701
	}


	/**
	 * Tuple for a region and its associated number of clicks.
	 */
	private static final class RegionWithClicks {

		private final String region;
		private final long clicks;

		RegionWithClicks(String region, long clicks) {
			if (region == null || region.isEmpty()) {
				throw new IllegalArgumentException("region must be set");
			}
			if (clicks < 0) {
				throw new IllegalArgumentException("clicks must not be negative");
			}
			this.region = region;
			this.clicks = clicks;
		}

		public String getRegion() {
			return region;
		}

		public long getClicks() {
			return clicks;
		}

	}

	@EnableAutoConfiguration
	public static class CountClicksPerRegionApplication {

		@Bean
		public Function<KStream<String, Long>, Function<KTable<String, String>, KStream<String, Long>>> process() {
			return userClicksStream -> (userRegionsTable -> (userClicksStream
					.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
									"UNKNOWN" : region, clicks),
							Joined.with(Serdes.String(), Serdes.Long(), null))
					.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
							regionWithClicks.getClicks()))
					.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
					.reduce(Long::sum, Materialized.as("CountClicks-" + UUID.randomUUID()))
					.toStream()));
		}

		@Bean
		public CleanupConfig cleanupConfig() {
			return new CleanupConfig(false, true);
		}
	}

	@EnableAutoConfiguration
	public static class BiFunctionCountClicksPerRegionApplication {

		@Bean
		public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
			return (userClicksStream, userRegionsTable) -> (userClicksStream
					.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
									"UNKNOWN" : region, clicks),
							Joined.with(Serdes.String(), Serdes.Long(), null))
					.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
							regionWithClicks.getClicks()))
					.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
					.reduce(Long::sum, Materialized.as("CountClicks-" + UUID.randomUUID()))
					.toStream());
		}

		@Bean
		public CleanupConfig cleanupConfig() {
			return new CleanupConfig(false, true);
		}
	}

	@EnableAutoConfiguration
	public static class BiConsumerApplication {

		static CountDownLatch latch = new CountDownLatch(2);

		@Bean
		public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
			return (userClicksStream, userRegionsTable) -> {
				userClicksStream.foreach((key, value) -> latch.countDown());
				userRegionsTable.toStream().foreach((key, value) -> latch.countDown());
			};
		}
	}

	@EnableAutoConfiguration
	public static class TrivialKTableApp {

		public java.util.function.Consumer<KTable<String, String>> process() {
			return inputTable -> inputTable.toStream().foreach((key, value) -> System.out.println("key : value " + key + " : " + value));
		}
	}

	@EnableAutoConfiguration
	public static class JoinProcessor {

		public BiConsumer<KStream<String, String>, KStream<String, String>> testProcessor() {
			return (input1Stream, input2Stream) -> input1Stream
					.join(input2Stream,
							(event1, event2) -> null,
							JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(5)),
							StreamJoined.with(
									Serdes.String(),
									Serdes.String(),
									Serdes.String()
							)
					);
		}
	}

}