InteractiveQueryService.java
/*
* Copyright 2018-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.core.retry.RetryException;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
 * Services pertinent to the interactive query capabilities of Kafka Streams. This class
 * provides services such as querying for a particular store, finding which instance is
 * hosting a particular store, etc. This is part of the public API of the Kafka Streams
 * binder, and users can inject this service into their applications to make use of it.
 *
 * @author Soby Chacko
 * @author Renwei Han
 * @author Serhii Siryi
 * @author Nico Pommerening
 * @author Chris Bono
 * @author Artem Bilan
 * @since 2.1.0
 */
public class InteractiveQueryService {

	private static final Log LOG = LogFactory.getLog(InteractiveQueryService.class);

	private final KafkaStreamsRegistry kafkaStreamsRegistry;

	private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;

	private final KafkaStreamsVersionAgnosticTopologyInfoFacade topologyInfoFacade;

	private StoreQueryParametersCustomizer<?> storeQueryParametersCustomizer;

	/**
	 * Constructor for InteractiveQueryService.
	 * @param kafkaStreamsRegistry holding {@link KafkaStreamsRegistry}
	 * @param binderConfigurationProperties kafka Streams binder configuration properties
	 */
	public InteractiveQueryService(KafkaStreamsRegistry kafkaStreamsRegistry,
			KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
		this.kafkaStreamsRegistry = kafkaStreamsRegistry;
		this.binderConfigurationProperties = binderConfigurationProperties;
		this.topologyInfoFacade = new KafkaStreamsVersionAgnosticTopologyInfoFacade();
	}

	/**
	 * Retrieve and return a queryable store by name created in the application.
	 * <p>
	 * The store is first looked up in the {@link KafkaStreams} instance associated with
	 * the calling thread (if any). When that does not yield a store, every registered
	 * {@link KafkaStreams} instance is consulted. The whole lookup is executed through
	 * the retry template built from the binder's state store retry settings; once the
	 * retries are exhausted, the cause recorded by the retry template is rethrown.
	 * @param storeName name of the queryable store
	 * @param storeType type of the queryable store
	 * @param <T> generic queryable store
	 * @return queryable store.
	 */
	public <T> T getQueryableStore(String storeName, QueryableStoreType<T> storeType) {
		final KafkaStreams contextSpecificKafkaStreams = getThreadContextSpecificKafkaStreams();
		final StoreQueryParameters<T> storeQueryParams =
				customize(StoreQueryParameters.fromNameAndType(storeName, storeType));
		try {
			return getRetryTemplate()
					.execute(() -> locateStore(contextSpecificKafkaStreams, storeName, storeQueryParams));
		}
		catch (RetryException ex) {
			ReflectionUtils.rethrowRuntimeException(ex.getCause());
			// Not reached: the call above always throws. Required by the compiler.
			return null;
		}
	}

	/**
	 * Applies the configured {@link StoreQueryParametersCustomizer}, if any.
	 * @param storeQueryParams the parameters to customize
	 * @param <T> generic queryable store
	 * @return the customized parameters, or the given ones when no customizer is set
	 */
	@SuppressWarnings("unchecked")
	private <T> StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParams) {
		StoreQueryParametersCustomizer<?> customizer = this.storeQueryParametersCustomizer;
		if (customizer == null) {
			return storeQueryParams;
		}
		// The customizer is registered with a wildcard type; the caller decides the store type.
		return ((StoreQueryParametersCustomizer<T>) customizer).customize(storeQueryParams);
	}

	/**
	 * Performs a single (non-retried) attempt to resolve the store.
	 * @param contextStreams the {@link KafkaStreams} instance tied to the calling thread, may be null
	 * @param storeName name of the queryable store
	 * @param storeQueryParams the (possibly customized) query parameters
	 * @param <T> generic queryable store
	 * @return the resolved store
	 * @throws IllegalStateException if the store cannot be resolved to exactly one instance
	 */
	private <T> T locateStore(KafkaStreams contextStreams, String storeName,
			StoreQueryParameters<T> storeQueryParams) {

		Throwable lastFailure = null;
		if (contextStreams != null) {
			try {
				T store = contextStreams.store(storeQueryParams);
				if (store != null) {
					return store;
				}
			}
			catch (InvalidStateStoreException ex) {
				lastFailure = ex;
			}
			LOG.warn("Store (" + storeName + ") could not be found in Streams context, falling back to all known Streams instances");
		}

		// Find all apps that know about the store
		Map<KafkaStreams, T> candidateStores = new HashMap<>();
		for (KafkaStreams kafkaStreamApp : this.kafkaStreamsRegistry.getKafkaStreams()) {
			try {
				candidateStores.put(kafkaStreamApp, kafkaStreamApp.store(storeQueryParams));
			}
			catch (Exception ex) {
				// Best effort: remember the failure and keep probing the remaining instances.
				lastFailure = ex;
			}
		}

		// Store exists in a single app - no further resolution required
		if (candidateStores.size() == 1) {
			return candidateStores.values().stream().findFirst().orElseThrow();
		}

		// If the store is in multiple streams apps - discard any apps that do not actually have the store
		if (candidateStores.size() > 1) {
			Map<KafkaStreams, T> confirmedStores = candidateStores.entrySet().stream()
					.filter((entry) -> this.topologyInfoFacade.streamsAppActuallyHasStore(entry.getKey(), storeName))
					.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
			if (confirmedStores.size() == 1) {
				return confirmedStores.values().stream().findFirst().orElseThrow();
			}
			lastFailure = (confirmedStores.isEmpty()) ?
					new UnknownStateStoreException("Store (" + storeName + ") not available to Streams instance") :
					new InvalidStateStoreException("Store (" + storeName + ") available to more than one Streams instance");
		}
		throw new IllegalStateException("Error retrieving state store: " + storeName, lastFailure);
	}

	/**
	 * Retrieves the current {@link KafkaStreams} context if executing Thread is created by a Streams App
	 * (contains a matching application id in Thread's name).
	 *
	 * @return KafkaStreams instance associated with Thread, or null if none matches
	 */
	private KafkaStreams getThreadContextSpecificKafkaStreams() {
		return this.kafkaStreamsRegistry.getKafkaStreams().stream()
				.filter(this::filterByThreadName)
				.findAny()
				.orElse(null);
	}

	/**
	 * Checks if the supplied {@link KafkaStreams} instance belongs to the calling Thread by matching
	 * the Thread's name with the Streams Application Id.
	 *
	 * @param streams {@link KafkaStreams} instance to filter
	 * @return true if Streams Instance is associated with Thread
	 */
	private boolean filterByThreadName(KafkaStreams streams) {
		String applicationId = Objects.requireNonNull(
						this.kafkaStreamsRegistry.streamBuilderFactoryBean(streams).getStreamsConfiguration())
				.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
		// TODO: is there some better way to find out if a Stream App created the Thread?
		return Thread.currentThread().getName().contains(applicationId);
	}

	/**
	 * Gets the current {@link HostInfo} that the calling kafka streams application is
	 * running on.
	 *
	 * Note that the end user applications must provide `application.server` as a
	 * configuration property when calling this method. If this is not available (missing
	 * or blank), then null is returned.
	 * <p>
	 * The value is split on its last colon, so a host part that itself contains
	 * colons is kept intact; the host part is passed on exactly as written.
	 * @return the current {@link HostInfo}
	 * @throws IllegalArgumentException if the configured value contains no colon or the
	 * port part is not a valid integer
	 */
	public HostInfo getCurrentHostInfo() {
		String applicationServer = this.binderConfigurationProperties.getConfiguration()
				.get("application.server");
		if (!StringUtils.hasText(applicationServer)) {
			return null;
		}
		// Split on the LAST colon: the port is always the trailing segment.
		int separator = applicationServer.lastIndexOf(':');
		if (separator < 0) {
			throw new IllegalArgumentException(
					"The 'application.server' property must be in host:port format, but was: " + applicationServer);
		}
		String host = applicationServer.substring(0, separator);
		int port = Integer.parseInt(applicationServer.substring(separator + 1));
		return new HostInfo(host, port);
	}

	/**
	 * Gets the {@link HostInfo} where the provided store and key are hosted on. This may
	 * not be the current host that is running the application. Kafka Streams will look
	 * through all the consumer instances under the same application id and retrieves the
	 * proper host.
	 *
	 * Note that the end user applications must provide `application.server` as a
	 * configuration property for all the application instances when calling this method.
	 * If this is not available, then null maybe returned.
	 * <p>
	 * The lookup is executed through the retry template built from the binder's state
	 * store retry settings; once the retries are exhausted, the cause recorded by the
	 * retry template is rethrown.
	 * @param <K> generic type for key
	 * @param store store name
	 * @param key key to look for
	 * @param serializer {@link Serializer} for the key
	 * @return the {@link HostInfo} where the key for the provided store is hosted currently
	 */
	public <K> HostInfo getHostInfo(String store, K key, Serializer<K> serializer) {
		try {
			return getRetryTemplate().execute(() -> {
				Throwable failure;
				try {
					KeyQueryMetadata keyQueryMetadata = findKeyQueryMetadata(store, key, serializer);
					if (keyQueryMetadata != null) {
						return keyQueryMetadata.activeHost();
					}
					failure = new Throwable("Kafka Streams is not ready.");
				}
				catch (Exception ex) {
					failure = ex;
				}
				// Throwing makes the retry template schedule another attempt.
				throw new IllegalStateException("Error when retrieving state store.", failure);
			});
		}
		catch (RetryException ex) {
			ReflectionUtils.rethrowRuntimeException(ex.getCause());
			// Not reached: the call above always throws. Required by the compiler.
			return null;
		}
	}

	/**
	 * Builds a fresh {@link RetryTemplate} from the binder's state store retry settings.
	 * @return the retry template used for state store lookups
	 */
	private RetryTemplate getRetryTemplate() {
		var stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
		// NOTE(review): RetryPolicy's maxRetries counts retries made after the initial attempt,
		// while the binder property read here is named maxAttempts. If that property is meant
		// as the total number of attempts, this performs one attempt more than configured -
		// confirm the intended semantics.
		RetryPolicy retryPolicy = RetryPolicy.builder()
				.maxRetries(stateStoreRetry.getMaxAttempts())
				.delay(Duration.ofMillis(stateStoreRetry.getBackoffPeriod()))
				.build();
		return new RetryTemplate(retryPolicy);
	}

	/**
	 * Retrieves and returns the {@link KeyQueryMetadata} associated with the given combination of
	 * key and state store. If none found, it will return null.
	 *
	 * @param <K> generic type for key
	 * @param store store name
	 * @param key key to look for
	 * @param serializer {@link Serializer} for the key
	 * @return the {@link KeyQueryMetadata} if available, null otherwise.
	 */
	public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer) {
		return findKeyQueryMetadata(store, key, serializer);
	}

	/**
	 * Returns the first non-null {@link KeyQueryMetadata} reported by the registered
	 * {@link KafkaStreams} instances for the given store and key.
	 * @param <K> generic type for key
	 * @param store store name
	 * @param key key to look for
	 * @param serializer {@link Serializer} for the key
	 * @return the first metadata found, or null when no instance reports any
	 */
	private <K> KeyQueryMetadata findKeyQueryMetadata(String store, K key, Serializer<K> serializer) {
		return this.kafkaStreamsRegistry.getKafkaStreams()
				.stream()
				.map((streams) -> Optional.ofNullable(streams.queryMetadataForKey(store, key, serializer)))
				.filter(Optional::isPresent)
				.map(Optional::get)
				.findFirst()
				.orElse(null);
	}

	/**
	 * Retrieves and returns the {@link KafkaStreams} object that is associated with the given combination of
	 * key and state store. If none found, it will return null.
	 * <p>
	 * Every registered instance is queried; when more than one instance reports metadata
	 * for the key, the one visited last wins.
	 *
	 * @param <K> generic type for key
	 * @param store store name
	 * @param key key to look for
	 * @param serializer {@link Serializer} for the key
	 * @return {@link KafkaStreams} object associated with this combination of store and key
	 */
	public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer) {
		final AtomicReference<KafkaStreams> matchingKafkaStreams = new AtomicReference<>();
		this.kafkaStreamsRegistry.getKafkaStreams()
				.forEach((streams) -> {
					if (streams.queryMetadataForKey(store, key, serializer) != null) {
						matchingKafkaStreams.set(streams);
					}
				});
		return matchingKafkaStreams.get();
	}

	/**
	 * Gets the list of {@link HostInfo} where the provided store is hosted on.
	 * It also can include current host info.
	 * Kafka Streams will look through all the consumer instances under the same application id
	 * and retrieves all hosts info.
	 *
	 * Note that the end-user applications must provide `application.server` as a configuration property
	 * for all the application instances when calling this method. If this is not available, then an empty
	 * list will be returned.
	 *
	 * @param store store name
	 * @return the list of {@link HostInfo} where provided store is hosted on
	 */
	public List<HostInfo> getAllHostsInfo(String store) {
		return this.kafkaStreamsRegistry.getKafkaStreams()
				.stream()
				.flatMap((streams) -> streams.streamsMetadataForStore(store).stream())
				.filter(Objects::nonNull)
				.map(StreamsMetadata::hostInfo)
				.collect(Collectors.toList());
	}

	/**
	 * Sets the customizer applied to the {@link StoreQueryParameters} before a store is
	 * looked up in {@link #getQueryableStore(String, QueryableStoreType)}.
	 * @param storeQueryParametersCustomizer to customize
	 * @since 4.0.1
	 */
	public void setStoreQueryParametersCustomizer(StoreQueryParametersCustomizer<?> storeQueryParametersCustomizer) {
		this.storeQueryParametersCustomizer = storeQueryParametersCustomizer;
	}

}