KafkaStreamsComponentBeansTests.java
/*
* Copyright 2021-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.function;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
* @author Georg Friedrich
*/
@EmbeddedKafka(topics = {"testFunctionComponent-out-0", "testFunctionComponent-out-1", "testBiFunctionComponent-out", "testCurriedFunctionWithFunctionTerminal-out"})
class KafkaStreamsComponentBeansTests {
private static EmbeddedKafkaBroker embeddedKafka;
private static Consumer<String, String> consumer1;
private static Consumer<String, String> consumer2;
private static Consumer<String, String> consumer3;
private static Consumer<String, String> consumer4;
private final static CountDownLatch LATCH_1 = new CountDownLatch(1);
private final static CountDownLatch LATCH_2 = new CountDownLatch(2);
private final static CountDownLatch LATCH_3 = new CountDownLatch(3);
@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer1 = cf.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer1, "testFunctionComponent-out-0");
Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("group-x", "false",
embeddedKafka);
consumerProps1.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps1.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
DefaultKafkaConsumerFactory<String, String> cf1 = new DefaultKafkaConsumerFactory<>(consumerProps1);
consumer2 = cf1.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer2, "testFunctionComponent-out-1");
Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("group-y", "false",
embeddedKafka);
consumerProps2.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps2.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
DefaultKafkaConsumerFactory<String, String> cf2 = new DefaultKafkaConsumerFactory<>(consumerProps2);
consumer3 = cf2.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer3, "testBiFunctionComponent-out");
Map<String, Object> consumerProps3 = KafkaTestUtils.consumerProps("group-z", "false",
embeddedKafka);
consumerProps3.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps3.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
DefaultKafkaConsumerFactory<String, String> cf3 = new DefaultKafkaConsumerFactory<>(consumerProps3);
consumer4 = cf3.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer4, "testCurriedFunctionWithFunctionTerminal-out");
}
@AfterAll
public static void tearDown() {
closeConsumers(consumer1, consumer2, consumer3, consumer4);
}
static void closeConsumers(Consumer<?, ?>... consumers) {
for (Consumer<?, ?> consumer : consumers) {
if (consumer != null) {
consumer.close();
}
}
}
@Test
void functionComponent() {
SpringApplication app = new SpringApplication(FunctionAsComponent.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.foo-in-0.destination=testFunctionComponent-in",
"--spring.cloud.stream.bindings.foo-out-0.destination=testFunctionComponent-out-0",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("testFunctionComponent-in");
template.sendDefault("foobar");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer1, "testFunctionComponent-out-0");
assertThat(cr.value().contains("foobarfoobar")).isTrue();
}
finally {
pf.destroy();
}
}
}
@Test
void functionComponentBeanComposedWithConsumer() throws InterruptedException {
SpringApplication app = new SpringApplication(FunctionAsComponent.class, StringConsumer.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run(
"--spring.cloud.function.definition=foo|stringConsumer",
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.foostringConsumer-in-0.destination=fooStringConsumer-in",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("fooStringConsumer-in");
template.sendDefault("foobar");
Assert.isTrue(LATCH_1.await(10, TimeUnit.SECONDS), "foobar");
}
finally {
pf.destroy();
}
}
}
@Test
void functionComponentWithBranching() {
SpringApplication app = new SpringApplication(FunctionAsComponentWithBranching.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.branchedFoo-in-0.destination=testFunctionBranchingComponent-in",
"--spring.cloud.stream.bindings.branchedFoo-out-0.destination=testFunctionComponent-out-0",
"--spring.cloud.stream.bindings.branchedFoo-out-1.destination=testFunctionComponent-out-1",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("testFunctionBranchingComponent-in");
template.sendDefault(0, "foo");
template.sendDefault(1, "bar");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer1, "testFunctionComponent-out-0");
assertThat(cr.value().contains("foo")).isTrue();
ConsumerRecord<String, String> cr2 = KafkaTestUtils.getSingleRecord(consumer2, "testFunctionComponent-out-1");
assertThat(cr2.value().contains("bar")).isTrue();
}
finally {
pf.destroy();
}
}
}
@Test
void consumerComponent() throws Exception {
SpringApplication app = new SpringApplication(ConsumerAsComponent.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.bar-in-0.destination=testConsumerComponent-in",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("testConsumerComponent-in");
template.sendDefault("foobar");
Assert.isTrue(LATCH_1.await(10, TimeUnit.SECONDS), "bar");
}
finally {
pf.destroy();
}
}
}
@Test
void biFunctionComponent() {
SpringApplication app = new SpringApplication(BiFunctionAsComponent.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.bazz-in-0.destination=testBiFunctionComponent-in-0",
"--spring.cloud.stream.bindings.bazz-in-1.destination=testBiFunctionComponent-in-1",
"--spring.cloud.stream.bindings.bazz-out-0.destination=testBiFunctionComponent-out",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("testBiFunctionComponent-in-0");
template.sendDefault("foobar");
template.setDefaultTopic("testBiFunctionComponent-in-1");
template.sendDefault("foobar");
final ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer3, Duration.ofSeconds(10), 2);
assertThat(records.count()).isEqualTo(2);
records.forEach(stringStringConsumerRecord -> assertThat(stringStringConsumerRecord.value().contains("foobar")).isTrue());
}
finally {
pf.destroy();
}
}
}
@Test
void biConsumerComponent() throws Exception {
SpringApplication app = new SpringApplication(BiConsumerAsComponent.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.buzz-in-0.destination=testBiConsumerComponent-in-0",
"--spring.cloud.stream.bindings.buzz-in-1.destination=testBiConsumerComponent-in-1",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("testBiConsumerComponent-in-0");
template.sendDefault("foobar");
template.setDefaultTopic("testBiConsumerComponent-in-1");
template.sendDefault("foobar");
Assert.isTrue(LATCH_2.await(10, TimeUnit.SECONDS), "bar");
}
finally {
pf.destroy();
}
}
}
@Test
void curriedFunctionWithConsumerTerminal() throws Exception {
SpringApplication app = new SpringApplication(CurriedFunctionWithConsumerTerminal.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.curriedConsumer-in-0.destination=testCurriedFunctionWithConsumerTerminal-in-0",
"--spring.cloud.stream.bindings.curriedConsumer-in-1.destination=testCurriedFunctionWithConsumerTerminal-in-1",
"--spring.cloud.stream.bindings.curriedConsumer-in-2.destination=testCurriedFunctionWithConsumerTerminal-in-2",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("testCurriedFunctionWithConsumerTerminal-in-0");
template.sendDefault("foobar");
template.setDefaultTopic("testCurriedFunctionWithConsumerTerminal-in-1");
template.sendDefault("foobar");
template.setDefaultTopic("testCurriedFunctionWithConsumerTerminal-in-2");
template.sendDefault("foobar");
Assert.isTrue(LATCH_3.await(10, TimeUnit.SECONDS), "bar");
}
finally {
pf.destroy();
}
}
}
@Test
void curriedFunctionWithFunctionTerminal() {
SpringApplication app = new SpringApplication(CurriedFunctionWithFunctionTerminal.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.curriedFunction-in-0.destination=testCurriedFunctionWithFunctionTerminal-in-0",
"--spring.cloud.stream.bindings.curriedFunction-in-1.destination=testCurriedFunctionWithFunctionTerminal-in-1",
"--spring.cloud.stream.bindings.curriedFunction-in-2.destination=testCurriedFunctionWithFunctionTerminal-in-2",
"--spring.cloud.stream.bindings.curriedFunction-out-0.destination=testCurriedFunctionWithFunctionTerminal-out",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("testCurriedFunctionWithFunctionTerminal-in-0");
template.sendDefault("foobar");
template.setDefaultTopic("testCurriedFunctionWithFunctionTerminal-in-1");
template.sendDefault("foobar");
template.setDefaultTopic("testCurriedFunctionWithFunctionTerminal-in-2");
template.sendDefault("foobar");
final ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer4, Duration.ofSeconds(10), 3);
assertThat(records.count()).isEqualTo(3);
records.forEach(stringStringConsumerRecord -> assertThat(stringStringConsumerRecord.value().contains("foobar")).isTrue());
}
finally {
pf.destroy();
}
}
}
@Component("foo")
@EnableAutoConfiguration
public static class FunctionAsComponent implements Function<KStream<Integer, String>,
KStream<String, String>> {
@Override
public KStream<String, String> apply(KStream<Integer, String> stringIntegerKStream) {
return stringIntegerKStream.map((integer, s) -> new KeyValue<>(s, s + s));
}
}
@Component("branchedFoo")
@EnableAutoConfiguration
public static class FunctionAsComponentWithBranching implements Function<KStream<Integer, String>,
KStream<String, String>[]> {
@Override
@SuppressWarnings("unchecked")
public KStream<String, String>[] apply(KStream<Integer, String> stringIntegerKStream) {
return stringIntegerKStream.map((key, value) -> new KeyValue<>(key.toString(), value))
.split()
.branch((k, v) -> "1".equals(k))
.defaultBranch()
.values()
.toArray(new KStream[0]);
}
}
@Component("bar")
@EnableAutoConfiguration
public static class ConsumerAsComponent implements java.util.function.Consumer<KStream<Integer, String>> {
@Override
public void accept(KStream<Integer, String> integerStringKStream) {
integerStringKStream.foreach((integer, s) -> LATCH_1.countDown());
}
}
@Component("stringConsumer")
@EnableAutoConfiguration
public static class StringConsumer implements java.util.function.Consumer<KStream<String, String>> {
@Override
public void accept(KStream<String, String> integerStringKStream) {
integerStringKStream.foreach((s1, s2) -> LATCH_1.countDown());
}
}
@Component("bazz")
@EnableAutoConfiguration
public static class BiFunctionAsComponent implements BiFunction<KStream<String, String>, KStream<String, String>, KStream<String, String>> {
@Override
public KStream<String, String> apply(KStream<String, String> stringStringKStream, KStream<String, String> stringStringKStream2) {
return stringStringKStream.merge(stringStringKStream2);
}
}
@Component("buzz")
@EnableAutoConfiguration
public static class BiConsumerAsComponent implements BiConsumer<KStream<String, String>, KStream<String, String>> {
@Override
public void accept(KStream<String, String> stringStringKStream, KStream<String, String> stringStringKStream2) {
final KStream<String, String> merged = stringStringKStream.merge(stringStringKStream2);
merged.foreach((s, s2) -> LATCH_2.countDown());
}
}
@Component("curriedConsumer")
@EnableAutoConfiguration
public static class CurriedFunctionWithConsumerTerminal implements Function<KStream<String, String>,
Function<KStream<String, String>,
java.util.function.Consumer<KStream<String, String>>>> {
@Override
public Function<KStream<String, String>, java.util.function.Consumer<KStream<String, String>>> apply(KStream<String, String> stringStringKStream) {
return stringStringKStream1 -> stringStringKStream2 -> {
final KStream<String, String> merge1 = stringStringKStream.merge(stringStringKStream1);
final KStream<String, String> merged2 = merge1.merge(stringStringKStream2);
merged2.foreach((s1, s2) -> LATCH_3.countDown());
};
}
}
@Component("curriedFunction")
@EnableAutoConfiguration
public static class CurriedFunctionWithFunctionTerminal implements Function<KStream<String, String>,
Function<KStream<String, String>,
java.util.function.Function<KStream<String, String>, KStream<String, String>>>> {
@Override
public Function<KStream<String, String>, Function<KStream<String, String>, KStream<String, String>>> apply(KStream<String, String> stringStringKStream) {
return stringStringKStream1 -> stringStringKStream2 -> {
final KStream<String, String> merge1 = stringStringKStream.merge(stringStringKStream1);
return merge1.merge(stringStringKStream2);
};
}
}
}