// InteractiveQueryServiceMultiStateStoreTests.java
/*
* Copyright 2022-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.time.Duration;
import java.util.function.Consumer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests for the {@link InteractiveQueryService} when dealing with multiple KafkaStreams apps and state stores.
*
* @author Chris Bono
* @author Soby Chacko
*/
@EmbeddedKafka(topics = {"input1", "input2"})
class InteractiveQueryServiceMultiStateStoreTests {
/** Name of the key/value state store declared by the {@code store1} beans and queried by the tests. */
private static final String STORE_1_NAME = "store1";
/** Name of the key/value state store declared by the {@code store2} beans. */
private static final String STORE_2_NAME = "store2";
/** Embedded Kafka broker shared by all tests; assigned once in {@link #setUp()}. */
private static EmbeddedKafkaBroker embeddedKafka;
/**
 * Captures the broker exposed by {@link EmbeddedKafkaCondition} so that the tests can
 * read its broker addresses when assembling application arguments.
 */
@BeforeAll
public static void setUp() {
	EmbeddedKafkaBroker broker = EmbeddedKafkaCondition.getBroker();
	embeddedKafka = broker;
}
/**
 * Runs the state store availability scenario with the binder's
 * {@code application.server} property supplied on the command line.
 */
@Test
void stateStoreAvailableOnProperAppWhenAppServerPropertySet() {
	final boolean appServerPropertySet = true;
	stateStoreAvailableOnProperApp(appServerPropertySet);
}
/**
 * Runs the state store availability scenario without the binder's
 * {@code application.server} property (a placeholder argument is passed instead).
 */
@Test
void stateStoreAvailableOnProperAppWhenAppServerPropertyNotSet() {
	final boolean appServerPropertySet = false;
	stateStoreAvailableOnProperApp(appServerPropertySet);
}
/**
 * Boots {@link MultipleAppsWithUsedStateStoresTestApplication}, waits until its Kafka Streams
 * instances are running and then repeatedly queries {@code store1} through the
 * {@link InteractiveQueryService}.
 *
 * @param shouldSetAppServerProperty when {@code true} the binder's {@code application.server}
 * property is set to the embedded broker address; otherwise an unrelated placeholder
 * argument ({@code --foo=bar}) is passed in its place
 */
private void stateStoreAvailableOnProperApp(boolean shouldSetAppServerProperty) {
	String brokers = embeddedKafka.getBrokersAsString();

	final String appServerArg;
	if (shouldSetAppServerProperty) {
		appServerArg = "--spring.cloud.stream.kafka.streams.binder.configuration.application.server=" + brokers;
	}
	else {
		// Unrelated placeholder so that both variants pass the same number of arguments.
		appServerArg = "--foo=bar";
	}

	// NOTE(review): only app1 is activated here although the configuration class also declares
	// app2/store2 - confirm whether the second function was meant to be enabled as well.
	String[] args = {
		"--server.port=0",
		"--spring.jmx.enabled=false",
		"--spring.cloud.function.definition=app1",
		"--spring.cloud.stream.function.bindings.app1-in-0=input1",
		"--spring.cloud.stream.kafka.streams.binder.functions.app1.application-id=stateStoreTestApp1",
		appServerArg,
		"--spring.cloud.stream.kafka.streams.binder.brokers=" + brokers
	};

	try (ConfigurableApplicationContext context = new SpringApplicationBuilder()
			.sources(MultipleAppsWithUsedStateStoresTestApplication.class)
			.web(WebApplicationType.NONE)
			.run(args)) {
		waitForRunningStreams(context.getBean(KafkaStreamsRegistry.class));

		InteractiveQueryService interactiveQueryService = context.getBean(InteractiveQueryService.class);

		// The KafkaStreams.store() lookup used by the query service is non-deterministic, so the
		// query is repeated many times to surface any possible issue. Nothing needs to be written
		// to the store: the get() call itself fails when the returned state store is invalid.
		for (int attempt = 0; attempt < 100; attempt++) {
			Object value = interactiveQueryService
				.getQueryableStore(STORE_1_NAME, QueryableStoreTypes.keyValueStore())
				.get("someKey");
			assertThat(value).isNull();
		}
	}
}
/**
 * Boots {@link MultipleAppsWithUnusedStateStoresTestApplication}, whose functions do not
 * reference the declared state stores, and verifies that looking up either store fails with an
 * {@link IllegalStateException} whose root cause is an {@link UnknownStateStoreException}.
 */
@Test
void stateStoreNotAvailableThrowsException() {
	String brokers = embeddedKafka.getBrokersAsString();
	String[] args = {
		"--server.port=0",
		"--spring.jmx.enabled=false",
		"--spring.cloud.function.definition=app1;app2",
		"--spring.cloud.stream.function.bindings.app1-in-0=input1",
		"--spring.cloud.stream.function.bindings.app2-in-0=input2",
		"--spring.cloud.stream.kafka.streams.binder.functions.app1.application-id=stateStoreTestApp3",
		"--spring.cloud.stream.kafka.streams.binder.functions.app2.application-id=stateStoreTestApp4",
		"--spring.cloud.stream.kafka.streams.binder.configuration.application.server=" + brokers,
		"--spring.cloud.stream.kafka.streams.binder.brokers=" + brokers
	};

	try (ConfigurableApplicationContext context = new SpringApplicationBuilder()
			.sources(MultipleAppsWithUnusedStateStoresTestApplication.class)
			.web(WebApplicationType.NONE)
			.run(args)) {
		waitForRunningStreams(context.getBean(KafkaStreamsRegistry.class));

		InteractiveQueryService queryService = context.getBean(InteractiveQueryService.class);

		// Both stores are expected to fail in exactly the same way, so run one assertion chain per name.
		for (String storeName : new String[] {STORE_1_NAME, STORE_2_NAME}) {
			assertThatThrownBy(() -> queryService.getQueryableStore(storeName, QueryableStoreTypes.keyValueStore()))
				.isInstanceOf(IllegalStateException.class)
				.hasMessage("Error retrieving state store: " + storeName)
				.hasRootCauseInstanceOf(UnknownStateStoreException.class)
				.hasRootCauseMessage("Store (" + storeName + ") not available to Streams instance");
		}
	}
}
/**
 * Boots {@link StoreQueryParameterCustomizerTestApplication}, stubs the mocked
 * {@link StoreQueryParametersCustomizer} bean and verifies that a store lookup through the
 * {@link InteractiveQueryService} invokes the customizer.
 */
@Test
@SuppressWarnings("unchecked")
void storeQueryParameterCustomizerIsApplied() {
	String[] args = {
		"--server.port=0",
		"--spring.jmx.enabled=false",
		"--spring.cloud.function.definition=app1",
		"--spring.cloud.stream.function.bindings.app1-in-0=input1",
		"--spring.cloud.stream.kafka.streams.binder.functions.app1.application-id=storeQueryParameterCustomizerIsAppliedAppId",
		"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString()
	};

	try (ConfigurableApplicationContext context = new SpringApplicationBuilder()
			.sources(StoreQueryParameterCustomizerTestApplication.class)
			.web(WebApplicationType.NONE)
			.run(args)) {
		waitForRunningStreams(context.getBean(KafkaStreamsRegistry.class));

		InteractiveQueryService interactiveQueryService = context.getBean(InteractiveQueryService.class);
		StoreQueryParametersCustomizer<?> customizerMock = context.getBean(StoreQueryParametersCustomizer.class);

		// The mock has no default behavior for customize(), so stub it to hand back usable
		// parameters for store1 before the lookup is performed.
		StoreQueryParameters<?> stubbedParams =
			StoreQueryParameters.fromNameAndType(STORE_1_NAME, QueryableStoreTypes.keyValueStore());
		when(customizerMock.customize(Mockito.any(StoreQueryParameters.class))).thenReturn(stubbedParams);

		interactiveQueryService.getQueryableStore(STORE_1_NAME, QueryableStoreTypes.keyValueStore()).get("someKey");

		verify(customizerMock).customize(Mockito.any(StoreQueryParameters.class));
	}
}
/**
 * Blocks until every {@code StreamsBuilderFactoryBean} known to the registry reports a
 * {@link KafkaStreams} instance in state {@link KafkaStreams.State#RUNNING}, waiting at most
 * 60 seconds.
 *
 * @param registry the registry whose stream factory beans are polled
 */
private void waitForRunningStreams(KafkaStreamsRegistry registry) {
	await().atMost(Duration.ofSeconds(60))
		.until(() -> registry.streamsBuilderFactoryBeans().stream()
			.allMatch(factoryBean -> {
				// A factory bean that has not been started yet exposes no KafkaStreams instance.
				// Treat that as "not running yet" so polling continues instead of failing with
				// a NullPointerException inside the condition.
				KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
				return kafkaStreams != null && kafkaStreams.state() == KafkaStreams.State.RUNNING;
			}));
}
/**
 * Test application declaring two functions ({@code app1}, {@code app2}) and two persistent
 * key/value stores ({@code store1}, {@code store2}); each function connects a no-op
 * {@link EchoProcessor} to its own store.
 */
@EnableAutoConfiguration
@Configuration(proxyBeanMethods = false)
static class MultipleAppsWithUsedStateStoresTestApplication {

	private static final Logger log = LoggerFactory.getLogger(MultipleAppsWithUsedStateStoresTestApplication.class);

	public static void main(String[] args) {
		SpringApplication.run(MultipleAppsWithUsedStateStoresTestApplication.class, args);
	}

	/** Persistent String/String key/value store named {@code store1}. */
	@Bean
	public StoreBuilder<KeyValueStore<String, String>> store1() {
		return Stores.keyValueStoreBuilder(
				Stores.persistentKeyValueStore(STORE_1_NAME), Serdes.String(), Serdes.String());
	}

	/** Function whose processor is connected to {@code store1}. */
	@Bean
	public Consumer<KStream<String, String>> app1() {
		return s -> s
			.processValues(EchoProcessor::new, STORE_1_NAME)
			.foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_1_NAME));
	}

	/** Persistent String/String key/value store named {@code store2}. */
	@Bean
	public StoreBuilder<KeyValueStore<String, String>> store2() {
		return Stores.keyValueStoreBuilder(
				Stores.persistentKeyValueStore(STORE_2_NAME), Serdes.String(), Serdes.String());
	}

	/** Function whose processor is connected to {@code store2}. */
	@Bean
	public Consumer<KStream<String, String>> app2() {
		// app2 must be wired to its own store (store2), mirroring app1/store1; connecting it to
		// store1 would leave store2 unused and contradict the log message below.
		return s -> s
			.processValues(EchoProcessor::new, STORE_2_NAME)
			.foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_2_NAME));
	}

	/** Cleanup settings: no local state cleanup on start, cleanup on stop. */
	@Bean
	public CleanupConfig cleanupConfig() {
		return new CleanupConfig(false, true);
	}
}
/**
 * Test application declaring two functions ({@code app1}, {@code app2}) and two persistent
 * key/value stores ({@code store1}, {@code store2}) where neither function connects a
 * processor to a store; the functions only log the records they receive.
 */
@EnableAutoConfiguration
@Configuration(proxyBeanMethods = false)
static class MultipleAppsWithUnusedStateStoresTestApplication {

	private static final Logger log = LoggerFactory.getLogger(MultipleAppsWithUnusedStateStoresTestApplication.class);

	public static void main(String[] args) {
		SpringApplication.run(MultipleAppsWithUnusedStateStoresTestApplication.class, args);
	}

	/** Builds a persistent String/String key/value store with the given name. */
	private static StoreBuilder<KeyValueStore<String, String>> persistentStringStore(String storeName) {
		return Stores.keyValueStoreBuilder(
				Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String());
	}

	@Bean
	public StoreBuilder<KeyValueStore<String, String>> store1() {
		return persistentStringStore(STORE_1_NAME);
	}

	@Bean
	public Consumer<KStream<String, String>> app1() {
		// NOTE: the stream deliberately does not connect any processor to the state store.
		return stream -> {
			stream.foreach((key, value) -> log.info("Echo {} -> {} into {}", key, value, STORE_1_NAME));
		};
	}

	@Bean
	public StoreBuilder<KeyValueStore<String, String>> store2() {
		return persistentStringStore(STORE_2_NAME);
	}

	@Bean
	public Consumer<KStream<String, String>> app2() {
		// NOTE: the stream deliberately does not connect any processor to the state store.
		return stream -> {
			stream.foreach((key, value) -> log.info("Echo {} -> {} into {}", key, value, STORE_2_NAME));
		};
	}

	/** Cleanup settings: no local state cleanup on start, cleanup on stop. */
	@Bean
	public CleanupConfig cleanupConfig() {
		final boolean cleanupOnStart = false;
		final boolean cleanupOnStop = true;
		return new CleanupConfig(cleanupOnStart, cleanupOnStop);
	}
}
/**
 * Test application with a single function ({@code app1}) connected to {@code store1} and a
 * Mockito mock registered as the {@link StoreQueryParametersCustomizer} bean.
 */
@EnableAutoConfiguration
@Configuration(proxyBeanMethods = false)
static class StoreQueryParameterCustomizerTestApplication {

	private static final Logger log = LoggerFactory.getLogger(StoreQueryParameterCustomizerTestApplication.class);

	/** Persistent String/String key/value store named {@code store1}. */
	@Bean
	public StoreBuilder<KeyValueStore<String, String>> store1() {
		var supplier = Stores.persistentKeyValueStore(STORE_1_NAME);
		return Stores.keyValueStoreBuilder(supplier, Serdes.String(), Serdes.String());
	}

	/** Function whose processor is connected to {@code store1}. */
	@Bean
	public Consumer<KStream<String, String>> app1() {
		return stream -> {
			KStream<String, String> processed = stream.processValues(EchoProcessor::new, STORE_1_NAME);
			processed.foreach((key, value) -> log.info("Echo {} -> {} into {}", key, value, STORE_1_NAME));
		};
	}

	/** Cleanup settings: no local state cleanup on start, cleanup on stop. */
	@Bean
	public CleanupConfig cleanupConfig() {
		final boolean cleanupOnStart = false;
		final boolean cleanupOnStop = true;
		return new CleanupConfig(cleanupOnStart, cleanupOnStop);
	}

	/** Mock customizer; tests stub and verify it after the context has started. */
	@Bean
	StoreQueryParametersCustomizer<?> storeQueryParametersCustomizer() {
		return Mockito.mock(StoreQueryParametersCustomizer.class);
	}
}
/**
 * No-op {@link FixedKeyProcessor} used only so that a stream can be connected to a named state
 * store; it neither forwards nor stores the records it receives.
 */
static class EchoProcessor implements FixedKeyProcessor<String, String, String> {

	@Override
	public void process(FixedKeyRecord<String, String> ignoredRecord) {
		// Intentionally empty: the tests never inspect processed records.
	}

	@Override
	public void close() {
		// Intentionally empty: the processor holds no resources.
	}
}
}