MultipleOutputBindingsPartitionsTests.java
/*
* Copyright 2023-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import static org.assertj.core.api.Assertions.assertThat;
/***
* @author Soby Chacko
*/
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka
class MultipleOutputBindingsPartitionsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@BeforeEach
public void before() {
NewTopic newTopic1 = new NewTopic("odd-topic", 10, (short) 1);
NewTopic newTopic2 = new NewTopic("even-topic", 5, (short) 1);
embeddedKafka.addTopics(newTopic1, newTopic2);
}
@Test
void singleInputTupleOutputWithDifferentPartitions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(MultiOutputApplication.class)
.web(WebApplicationType.NONE).run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.kafka.consumer.metadata.max.age.ms=1000",
"--spring.cloud.function.definition=singleInputMultipleOutputs",
"--spring.cloud.stream.function.reactive.singleInputMultipleOutputs=true",
"--spring.cloud.stream.kafka.binder.autoAddPartitions=true",
"--spring.cloud.stream.bindings.singleInputMultipleOutputs-in-0.group=grp5",
"--spring.cloud.stream.bindings.singleInputMultipleOutputs-in-0.destination=multi-input",
"--spring.cloud.stream.bindings.singleInputMultipleOutputs-out-0.destination=odd-topic",
"--spring.cloud.stream.bindings.singleInputMultipleOutputs-out-1.destination=even-topic",
"--spring.cloud.stream.bindings.singleInputMultipleOutputs-out-0.producer.partitionKeyExpression=payload",
"--spring.cloud.stream.bindings.singleInputMultipleOutputs-out-0.producer.partitionCount=10",
"--spring.cloud.stream.bindings.singleInputMultipleOutputs-out-0.producer.partitionSelectorName=mySelectorStrategy",
"--spring.cloud.stream.bindings.singleInputMultipleOutputs-out-1.producer.partitionKeyExpression=payload",
"--spring.cloud.stream.bindings.singleInputMultipleOutputs-out-1.producer.partitionCount=5",
"--spring.cloud.stream.bindings.singleInputMultipleOutputs-out-1.producer.partitionSelectorName=mySelectorStrategy",
"--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
StreamBridge streamBridge = context.getBean(StreamBridge.class);
streamBridge.send("multi-input", MessageBuilder.withPayload(101)
.build());
streamBridge.send("multi-input", MessageBuilder.withPayload(102)
.build());
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group3", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<String, String> consumer1 = cf.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer1, "odd-topic");
Consumer<String, String> consumer2 = cf.createConsumer("group4", null);
embeddedKafka.consumeFromEmbeddedTopics(consumer2, "even-topic");
ConsumerRecord<String, String> record1 = KafkaTestUtils.getSingleRecord(consumer1, "odd-topic");
assertThat(record1)
.isNotNull()
.extracting(ConsumerRecord::value)
.isEqualTo("ODD: 101");
assertThat(record1)
.extracting(ConsumerRecord::partition)
.isEqualTo(9);
ConsumerRecord<String, String> record2 = KafkaTestUtils.getSingleRecord(consumer2, "even-topic");
assertThat(record2)
.isNotNull()
.extracting(ConsumerRecord::value)
.isEqualTo("EVEN: 102");
assertThat(record2)
.extracting(ConsumerRecord::partition)
.isEqualTo(4);
}
}
@EnableAutoConfiguration
@Configuration
public static class MultiOutputApplication {
@Bean
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> singleInputMultipleOutputs() {
return flux -> {
Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
UnicastProcessor odd = UnicastProcessor.create();
UnicastProcessor even = UnicastProcessor.create();
Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));
Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
return Tuples.of(Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()), Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()));
};
}
@Bean
public PartitionSelectorStrategy mySelectorStrategy() {
return new MyPartitionSelector();
}
}
static class MyPartitionSelector implements PartitionSelectorStrategy {
@Override
public int selectPartition(Object key, int partitionCount) {
// selecting the last partition for easy test verification.
return partitionCount - 1;
}
}
}