KafkaStreamsVersionAgnosticTopologyInfoFacadeTests.java
/*
* Copyright 2022-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link KafkaStreamsVersionAgnosticTopologyInfoFacade}.
*
* @author Chris Bono
*/
class KafkaStreamsVersionAgnosticTopologyInfoFacadeTests {
private static final Properties STREAM_PROPS = new Properties();
private static final StreamsBuilder STREAM_BUILDER = new StreamsBuilder();
static {
STREAM_PROPS.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-facade-tests-app");
STREAM_PROPS.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
STREAM_BUILDER.<String, String>stream("foo").to("bar");
}
/**
 * Exercises the facade against {@link TestKafkaStreams30}, the test stand-in for the
 * kafka-streams 3.0 field/method layout.
 */
@Nested
class KafkaStreams30 {

    /**
     * Expects the store to be reported as present when the streams instance
     * is created with a source topic.
     */
    @Test
    void sourceTopicsForStoreWithTopics() {
        KafkaStreamsVersionAgnosticTopologyInfoFacade facade =
                new KafkaStreamsVersionAgnosticTopologyInfoFacade(TestKafkaStreams30.class);
        // KafkaStreams is AutoCloseable; close it so the instance does not outlive the test.
        try (TestKafkaStreams30 streams = new TestKafkaStreams30("topic1")) {
            assertThat(facade.streamsAppActuallyHasStore(streams, "store1")).isTrue();
        }
    }

    /**
     * Expects the store to be reported as absent when the streams instance
     * is created without any source topics.
     */
    @Test
    void sourceTopicsForStoreWithNoTopics() {
        KafkaStreamsVersionAgnosticTopologyInfoFacade facade =
                new KafkaStreamsVersionAgnosticTopologyInfoFacade(TestKafkaStreams30.class);
        // KafkaStreams is AutoCloseable; close it so the instance does not outlive the test.
        try (TestKafkaStreams30 streams = new TestKafkaStreams30()) {
            assertThat(facade.streamsAppActuallyHasStore(streams, "store1")).isFalse();
        }
    }
}
/**
 * Exercises the facade against {@link TestKafkaStreams31_32}, the test stand-in for the
 * kafka-streams [3.1,3.2] field/method layout.
 */
@Nested
class KafkaStreams31_32 {

    /**
     * Expects the store to be reported as present when the streams instance
     * is created with a source topic.
     */
    @Test
    void sourceTopicsForStoreWithTopics() {
        KafkaStreamsVersionAgnosticTopologyInfoFacade facade =
                new KafkaStreamsVersionAgnosticTopologyInfoFacade(TestKafkaStreams31_32.class);
        // KafkaStreams is AutoCloseable; close it so the instance does not outlive the test.
        try (TestKafkaStreams31_32 streams = new TestKafkaStreams31_32("topic1")) {
            assertThat(facade.streamsAppActuallyHasStore(streams, "store1")).isTrue();
        }
    }

    /**
     * Expects the store to be reported as absent when the streams instance
     * is created without any source topics.
     */
    @Test
    void sourceTopicsForStoreWithNoTopics() {
        KafkaStreamsVersionAgnosticTopologyInfoFacade facade =
                new KafkaStreamsVersionAgnosticTopologyInfoFacade(TestKafkaStreams31_32.class);
        // KafkaStreams is AutoCloseable; close it so the instance does not outlive the test.
        try (TestKafkaStreams31_32 streams = new TestKafkaStreams31_32()) {
            assertThat(facade.streamsAppActuallyHasStore(streams, "store1")).isFalse();
        }
    }
}
/**
 * Exercises the facade against {@link TestKafkaStreams33}, the test stand-in for the
 * kafka-streams 3.3+ field/method layout.
 */
@Nested
class KafkaStreams33 {

    /**
     * Expects the store to be reported as present when the streams instance
     * is created with a source topic.
     */
    @Test
    void sourceTopicsForStoreWithTopics() {
        KafkaStreamsVersionAgnosticTopologyInfoFacade facade =
                new KafkaStreamsVersionAgnosticTopologyInfoFacade(TestKafkaStreams33.class);
        // KafkaStreams is AutoCloseable; close it so the instance does not outlive the test.
        try (TestKafkaStreams33 streams = new TestKafkaStreams33("topic1")) {
            assertThat(facade.streamsAppActuallyHasStore(streams, "store1")).isTrue();
        }
    }

    /**
     * Expects the store to be reported as absent when the streams instance
     * is created without any source topics.
     */
    @Test
    void sourceTopicsForStoreWithNoTopics() {
        KafkaStreamsVersionAgnosticTopologyInfoFacade facade =
                new KafkaStreamsVersionAgnosticTopologyInfoFacade(TestKafkaStreams33.class);
        // KafkaStreams is AutoCloseable; close it so the instance does not outlive the test.
        try (TestKafkaStreams33 streams = new TestKafkaStreams33()) {
            assertThat(facade.streamsAppActuallyHasStore(streams, "store1")).isFalse();
        }
    }
}
/**
 * Failure-path scenarios: in each one the facade is expected to answer {@code false}
 * rather than propagate the problem to the caller.
 */
@Nested
class NegativeCases {

    /**
     * The streams instance carries a {@code null} topology info field.
     */
    @Test
    void nullTopologyInfo() {
        KafkaStreamsVersionAgnosticTopologyInfoFacade facade =
                new KafkaStreamsVersionAgnosticTopologyInfoFacade(TestKafkaStreams30.class);
        // A typed null selects the single-argument constructor instead of the varargs one.
        TestToplogyInfoOneArg internalTopologyBuilder = null;
        // KafkaStreams is AutoCloseable; close it so the instance does not outlive the test.
        try (TestKafkaStreams30 streams = new TestKafkaStreams30(internalTopologyBuilder)) {
            assertThat(facade.streamsAppActuallyHasStore(streams, "store1")).isFalse();
        }
    }

    /**
     * The topology info type declares no {@code sourceTopicsForStore} method at all.
     */
    @Test
    void sourceTopicsForStoreMethodNotFound() {
        KafkaStreamsVersionAgnosticTopologyInfoFacade facade =
                new KafkaStreamsVersionAgnosticTopologyInfoFacade(TestKafkaStreamsNoSourceTopicsMethod.class);
        // KafkaStreams is AutoCloseable; close it so the instance does not outlive the test.
        try (TestKafkaStreamsNoSourceTopicsMethod streams = new TestKafkaStreamsNoSourceTopicsMethod()) {
            assertThat(facade.streamsAppActuallyHasStore(streams, "store1")).isFalse();
        }
    }

    /**
     * The topology info's {@code sourceTopicsForStore(String)} throws when invoked.
     */
    @Test
    void sourceTopicsForStoreThrowsException() {
        KafkaStreamsVersionAgnosticTopologyInfoFacade facade =
                new KafkaStreamsVersionAgnosticTopologyInfoFacade(TestKafkaStreamsThrowsError.class);
        // KafkaStreams is AutoCloseable; close it so the instance does not outlive the test.
        try (TestKafkaStreamsThrowsError streams = new TestKafkaStreamsThrowsError()) {
            assertThat(facade.streamsAppActuallyHasStore(streams, "store1")).isFalse();
        }
    }
}
/**
 * Stand-in for {@code KafkaStreams} as it exists in kafka-streams 3.0: the topology
 * info is held in a field named {@code internalTopologyBuilder}, whose type offers the
 * single-argument {@code sourceTopicsForStore(String)} method that holds the info we need.
 */
static class TestKafkaStreams30 extends KafkaStreams {

    // Not read anywhere in this file; presumably located by name via reflection
    // in the facade under test - confirm against the facade implementation.
    private TestToplogyInfoOneArg internalTopologyBuilder;

    /**
     * Creates the instance around the supplied topology info.
     * @param topologyInfo value stored in the {@code internalTopologyBuilder} field as-is
     */
    TestKafkaStreams30(TestToplogyInfoOneArg topologyInfo) {
        super(STREAM_BUILDER.build(), STREAM_PROPS);
        this.internalTopologyBuilder = topologyInfo;
    }

    /**
     * Creates the instance around a new {@link TestToplogyInfoOneArg} built from the topics.
     * @param topics source topics handed to the topology info
     */
    TestKafkaStreams30(String... topics) {
        this(new TestToplogyInfoOneArg(topics));
    }
}
/**
 * Stand-in for {@code KafkaStreams} as it exists in kafka-streams [3.1,3.2]: the topology
 * info is held in a field named {@code topologyMetadata}, whose type offers the
 * single-argument {@code sourceTopicsForStore(String)} method that holds the info we need.
 */
static class TestKafkaStreams31_32 extends KafkaStreams {

    // Not read anywhere in this file; presumably located by name via reflection
    // in the facade under test - confirm against the facade implementation.
    private TestToplogyInfoOneArg topologyMetadata;

    /**
     * Creates the instance around a new {@link TestToplogyInfoOneArg} built from the topics.
     * @param topics source topics handed to the topology info
     */
    TestKafkaStreams31_32(String... topics) {
        super(STREAM_BUILDER.build(), STREAM_PROPS);
        TestToplogyInfoOneArg topologyInfo = new TestToplogyInfoOneArg(topics);
        this.topologyMetadata = topologyInfo;
    }
}
/**
 * Stand-in for {@code KafkaStreams} as it exists in kafka-streams 3.3+: the topology
 * info is held in a field named {@code topologyMetadata}, whose type offers the
 * two-argument {@code sourceTopicsForStore(String, String)} method that holds the info we need.
 */
static class TestKafkaStreams33 extends KafkaStreams {

    // Not read anywhere in this file; presumably located by name via reflection
    // in the facade under test - confirm against the facade implementation.
    private TestToplogyInfoTwoArgs topologyMetadata;

    /**
     * Creates the instance around a new {@link TestToplogyInfoTwoArgs} built from the topics.
     * @param topics source topics handed to the topology info
     */
    TestKafkaStreams33(String... topics) {
        super(STREAM_BUILDER.build(), STREAM_PROPS);
        TestToplogyInfoTwoArgs topologyInfo = new TestToplogyInfoTwoArgs(topics);
        this.topologyMetadata = topologyInfo;
    }
}
/**
 * Topology info test double exposing the single-argument
 * {@code sourceTopicsForStore(String)} variant.
 */
static class TestToplogyInfoOneArg {

    private final String[] sourceTopics;

    /**
     * @param sourceTopics topics to report for every store; may be empty or {@code null}
     */
    TestToplogyInfoOneArg(String... sourceTopics) {
        this.sourceTopics = sourceTopics;
    }

    /**
     * Returns the configured topics regardless of the store asked for.
     * @param storeName ignored by this test double
     * @return a list view of the configured topics, or an empty list when none were set
     */
    public Collection<String> sourceTopicsForStore(String storeName) {
        if (this.sourceTopics == null) {
            return Collections.emptyList();
        }
        return Arrays.asList(this.sourceTopics);
    }
}
/**
 * Topology info test double exposing the two-argument
 * {@code sourceTopicsForStore(String, String)} variant.
 */
static class TestToplogyInfoTwoArgs {

    private final String[] sourceTopics;

    /**
     * @param sourceTopics topics to report for every store; may be empty or {@code null}
     */
    TestToplogyInfoTwoArgs(String... sourceTopics) {
        this.sourceTopics = sourceTopics;
    }

    /**
     * Returns the configured topics regardless of the store or topology asked for.
     * @param storeName ignored by this test double
     * @param topologyName ignored by this test double
     * @return a list view of the configured topics, or an empty list when none were set
     */
    public Collection<String> sourceTopicsForStore(String storeName, String topologyName) {
        if (this.sourceTopics == null) {
            return Collections.emptyList();
        }
        return Arrays.asList(this.sourceTopics);
    }
}
/**
 * Stand-in for {@code KafkaStreams} with a topology info field named
 * {@code internalTopologyBuilder} whose type has NO {@code sourceTopicsForStore}
 * method to use.
 */
static class TestKafkaStreamsNoSourceTopicsMethod extends KafkaStreams {

    /** Topology info type that deliberately declares no members. */
    static class TestTopologyInfoNoSourceTopicsMethod {
    }

    // Not read anywhere in this file; presumably located by name via reflection
    // in the facade under test - confirm against the facade implementation.
    private TestTopologyInfoNoSourceTopicsMethod internalTopologyBuilder;

    TestKafkaStreamsNoSourceTopicsMethod() {
        super(STREAM_BUILDER.build(), STREAM_PROPS);
        this.internalTopologyBuilder = new TestTopologyInfoNoSourceTopicsMethod();
    }
}
/**
 * Stand-in for {@code KafkaStreams} with a topology info field named
 * {@code internalTopologyBuilder} whose single-argument
 * {@code sourceTopicsForStore(String)} throws an exception when invoked.
 */
static class TestKafkaStreamsThrowsError extends KafkaStreams {

    // Not read anywhere in this file; presumably located by name via reflection
    // in the facade under test - confirm against the facade implementation.
    private TestToplogyInfoOneArg internalTopologyBuilder;

    TestKafkaStreamsThrowsError() {
        super(STREAM_BUILDER.build(), STREAM_PROPS);
        // Anonymous subclass that replaces the lookup with an unconditional failure.
        this.internalTopologyBuilder = new TestToplogyInfoOneArg() {
            @Override
            public Collection<String> sourceTopicsForStore(String storeName) {
                throw new RuntimeException("BOOM: " + storeName);
            }
        };
    }
}
}