KafkaTopicProvisioner.java
/*
* Copyright 2014-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.provisioning;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.kafka.autoconfigure.KafkaConnectionDetails;
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaTopicProperties;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.core.retry.RetryException;
import org.springframework.core.retry.RetryOperations;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* Kafka implementation for {@link ProvisioningProvider}.
*
* @author Soby Chacko
* @author Gary Russell
* @author Ilayaperumal Gopinathan
* @author Simon Flandergan
* @author Oleg Zhurakousky
* @author Aldo Sinanaj
* @author Yi Liu
* @author Omer Celik
* @author Byungjun You
* @author Roman Akentev
* @author Artem Bilan
*/
public class KafkaTopicProvisioner implements
// @checkstyle:off
ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>>,
// @checkstyle:on
InitializingBean {
private static final Log logger = LogFactory.getLog(KafkaTopicProvisioner.class);
private static final int DEFAULT_OPERATION_TIMEOUT = 30;
private final KafkaBinderConfigurationProperties configurationProperties;
private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
private final Map<String, Object> adminClientProperties;
private RetryOperations metadataRetryOperations;
	/**
	 * Create an instance.
	 * @param kafkaBinderConfigurationProperties the binder configuration properties.
	 * @param kafkaProperties the boot Kafka properties used to build the
	 * {@link AdminClient}.
	 * @param adminClientConfigCustomizer to customize the {@link AdminClient}.
	 */
public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties,
AdminClientConfigCustomizer adminClientConfigCustomizer) {
this(kafkaBinderConfigurationProperties, kafkaProperties, null, adminClientConfigCustomizer != null ?
Arrays.asList(adminClientConfigCustomizer) : new ArrayList<>());
}
/**
* Create an instance.
* @param kafkaBinderConfigurationProperties the binder configuration properties.
* @param kafkaProperties the boot Kafka properties used to build the instance.
	 * @param kafkaConnectionDetails the Kafka connection details used to build the instance.
* @param adminClientConfigCustomizer to customize {@link AdminClient}.
* @since 4.1.4
*/
public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties, KafkaConnectionDetails kafkaConnectionDetails,
AdminClientConfigCustomizer adminClientConfigCustomizer) {
this(kafkaBinderConfigurationProperties, kafkaProperties, kafkaConnectionDetails,
adminClientConfigCustomizer != null ? Arrays.asList(adminClientConfigCustomizer) : new ArrayList<>());
}
/**
* Create an instance.
*
* @param kafkaBinderConfigurationProperties the binder configuration properties.
	 * @param kafkaProperties the boot Kafka properties used to build the
	 * {@link AdminClient}.
	 * @param adminClientConfigCustomizers to customize the {@link AdminClient}.
*/
public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties,
List<AdminClientConfigCustomizer> adminClientConfigCustomizers) {
this(kafkaBinderConfigurationProperties, kafkaProperties, null, adminClientConfigCustomizers);
}
/**
* Create an instance.
*
* @param kafkaBinderConfigurationProperties the binder configuration properties.
* @param kafkaProperties the boot Kafka properties used to build the instance.
	 * @param kafkaConnectionDetails the Kafka connection details used to build the instance.
* @param adminClientConfigCustomizers to customize {@link AdminClient}.
* @since 4.1.4
*/
public KafkaTopicProvisioner(
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties, KafkaConnectionDetails kafkaConnectionDetails,
List<AdminClientConfigCustomizer> adminClientConfigCustomizers) {
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
this.configurationProperties = kafkaBinderConfigurationProperties;
this.adminClientProperties = createAdminClientProperties(kafkaProperties, kafkaConnectionDetails);
normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties,
kafkaBinderConfigurationProperties);
		// If the application provides AdminClient config customizers that override
		// properties, those overrides take precedence.
adminClientConfigCustomizers.forEach(customizer -> customizer.configure(this.adminClientProperties));
}
private Map<String, Object> createAdminClientProperties(KafkaProperties properties, KafkaConnectionDetails connectionDetails) {
Map<String, Object> adminProperties = properties.buildAdminProperties();
if (connectionDetails != null) {
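			// Connection details (e.g. supplied by a service binding) take precedence
			// over the boot properties for the admin client's bootstrap servers.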
adminProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getAdmin().getBootstrapServers());
}
return adminProperties;
}
/**
* Return an unmodifiable map of merged admin properties.
* @return the properties.
* @since 4.0.3
*/
public Map<String, Object> getAdminClientProperties() {
return Collections.unmodifiableMap(this.adminClientProperties);
}
/**
* Mutator for metadata retry operations.
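	 * <p>For example (an illustrative sketch; the policy values are arbitrary):
	 * <pre>{@code
	 * provisioner.setMetadataRetryOperations(new RetryTemplate(RetryPolicy.builder()
	 *         .maxRetries(5)
	 *         .delay(Duration.ofMillis(200))
	 *         .build()));
	 * }</pre>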
* @param metadataRetryOperations the retry configuration
*/
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
this.metadataRetryOperations = metadataRetryOperations;
}
@Override
public void afterPropertiesSet() {
if (this.metadataRetryOperations == null) {
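			// Default backoff: up to 10 retries, starting at 100ms and doubling up to
			// a 1s cap between attempts.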
RetryPolicy retryPolicy = RetryPolicy.builder()
.maxRetries(10)
.delay(Duration.ofMillis(100))
.multiplier(2)
.maxDelay(Duration.ofSeconds(1))
.build();
this.metadataRetryOperations = new RetryTemplate(retryPolicy);
}
}
@Override
public ProducerDestination provisionProducerDestination(final String name,
ExtendedProducerProperties<KafkaProducerProperties> properties) {
if (logger.isInfoEnabled()) {
logger.info("Using kafka topic for outbound: " + name);
}
if (this.configurationProperties.isAutoCreateTopics()) {
KafkaTopicUtils.validateTopicName(name);
try (AdminClient adminClient = createAdminClient()) {
createTopic(adminClient, name, properties.getPartitionCount(), false,
properties.getExtension().getTopic());
int partitions = getPartitionsForTopic(name, adminClient);
return new KafkaProducerDestination(name, partitions);
}
}
return new KafkaProducerDestination(name, 0);
}
@Override
public ConsumerDestination provisionConsumerDestination(final String name,
final String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
if (!properties.isMultiplex()) {
return doProvisionConsumerDestination(name, group, properties);
}
else {
String[] destinations = StringUtils.commaDelimitedListToStringArray(name);
for (String destination : destinations) {
doProvisionConsumerDestination(destination.trim(), group, properties);
}
return new KafkaConsumerDestination(name);
}
}
private ConsumerDestination doProvisionConsumerDestination(final String name,
final String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
final KafkaConsumerDestination kafkaConsumerDestination = new KafkaConsumerDestination(name);
if (properties.getExtension().isDestinationIsPattern()) {
Assert.isTrue(!properties.getExtension().isEnableDlq(),
"enableDLQ is not allowed when listening to topic patterns");
if (logger.isDebugEnabled()) {
logger.debug("Listening to a topic pattern - " + name
+ " - no provisioning performed");
}
return kafkaConsumerDestination;
}
if (this.configurationProperties.isAutoCreateTopics()) {
KafkaTopicUtils.validateTopicName(name);
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
if (properties.getInstanceCount() == 0) {
throw new IllegalArgumentException("Instance count cannot be zero");
}
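			// Each instance runs 'concurrency' consumers, so instanceCount * concurrency
			// partitions are needed to keep every consumer busy.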
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
ConsumerDestination consumerDestination;
try (AdminClient adminClient = createAdminClient()) {
createTopic(adminClient, name, partitionCount,
properties.getExtension().isAutoRebalanceEnabled(),
properties.getExtension().getTopic());
int partitions = getPartitionsForTopic(name, adminClient);
consumerDestination = createDlqIfNeedBe(adminClient, name, group,
properties, anonymous, partitions);
if (consumerDestination == null) {
consumerDestination = new KafkaConsumerDestination(name,
partitions);
}
return consumerDestination;
}
}
return kafkaConsumerDestination;
}
private int getPartitionsForTopic(String topicName, AdminClient adminClient) {
int partitions = 0;
Map<String, TopicDescription> topicDescriptions = retrieveTopicDescriptions(topicName, adminClient);
TopicDescription topicDescription = topicDescriptions.get(topicName);
if (topicDescription != null) {
partitions = topicDescription.partitions().size();
}
return partitions;
}
private Map<String, TopicDescription> retrieveTopicDescriptions(String topicName, AdminClient adminClient) {
try {
return this.metadataRetryOperations.execute(() -> {
if (logger.isDebugEnabled()) {
logger.debug("Attempting to retrieve the description for the topic: " + topicName);
}
DescribeTopicsResult describeTopicsResult = adminClient
.describeTopics(Collections.singletonList(topicName));
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.allTopicNames();
return all.get(this.operationTimeout, TimeUnit.SECONDS);
});
}
catch (RetryException ex) {
throw new ProvisioningException("Problems encountered with partitions finding for: " + topicName, ex);
}
}
AdminClient createAdminClient() {
return AdminClient.create(this.adminClientProperties);
}
/**
	 * In general, binder properties supersede Boot Kafka properties. The one exception is
	 * the bootstrap servers. In that case, we should only override the boot property if
	 * (there is a binder property AND it has a non-default value) OR (there is no boot
	 * property); this is needed because the binder property never returns a null value.
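	 * <p>For example (an illustrative sketch):
	 * <pre>{@code
	 * Map<String, Object> adminProps = bootProps.buildAdminProperties();
	 * KafkaTopicProvisioner.normalalizeBootPropsWithBinder(adminProps, bootProps, binderProps);
	 * // adminProps now reflects the binder 'configuration' overrides, with the
	 * // bootstrap servers resolved by the rule above
	 * }</pre>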
* @param adminProps the admin properties to normalize.
	 * @param bootProps the Boot Kafka properties.
	 * @param binderProps the binder Kafka properties.
*/
public static void normalalizeBootPropsWithBinder(Map<String, Object> adminProps,
KafkaProperties bootProps, KafkaBinderConfigurationProperties binderProps) {
// First deal with the outlier
String kafkaConnectionString = binderProps.getKafkaConnectionString();
if (ObjectUtils
.isEmpty(adminProps.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG))
|| !kafkaConnectionString
.equals(binderProps.getDefaultKafkaConnectionString())) {
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConnectionString);
}
// Now override any boot values with binder values
Map<String, String> binderProperties = binderProps.getConfiguration();
Set<String> adminConfigNames = AdminClientConfig.configNames();
binderProperties.forEach((key, value) -> {
if (key.equals(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
throw new IllegalStateException(
"Set binder bootstrap servers via the 'brokers' property, not 'configuration'");
}
if (adminConfigNames.contains(key)) {
Object replaced = adminProps.put(key, value);
if (replaced != null && KafkaTopicProvisioner.logger.isDebugEnabled()) {
KafkaTopicProvisioner.logger.debug("Overrode boot property: [" + key + "], from: ["
+ replaced + "] to: [" + value + "]");
}
}
});
}
private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String name,
String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties,
boolean anonymous, int partitions) {
if (properties.getExtension().isEnableDlq() && !anonymous) {
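			// The DLQ topic defaults to "error.<destination>.<group>" (for example,
			// "error.orders.billing", an illustrative name) unless an explicit dlqName
			// is configured.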
String dlqTopic = StringUtils.hasText(properties.getExtension().getDlqName())
? properties.getExtension().getDlqName()
: "error." + name + "." + group;
int dlqPartitions = properties.getExtension().getDlqPartitions() == null
? partitions
: properties.getExtension().getDlqPartitions();
try {
final KafkaProducerProperties dlqProducerProperties = properties.getExtension().getDlqProducerProperties();
createTopicAndPartitions(adminClient, dlqTopic, dlqPartitions,
properties.getExtension().isAutoRebalanceEnabled(),
dlqProducerProperties.getTopic());
}
catch (Throwable throwable) {
if (throwable instanceof Error throwableError) {
throw throwableError;
}
else {
throw new ProvisioningException("Provisioning exception encountered for " + name, throwable);
}
}
return new KafkaConsumerDestination(name, partitions, dlqTopic);
}
return null;
}
private void createTopic(AdminClient adminClient, String name, int partitionCount,
boolean tolerateLowerPartitionsOnBroker, KafkaTopicProperties properties) {
try {
createTopicIfNecessary(adminClient, name, partitionCount,
tolerateLowerPartitionsOnBroker, properties);
}
// TODO: Remove catching Throwable. See this thread:
// TODO:
// https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/514#discussion_r241075940
catch (Throwable throwable) {
if (throwable instanceof Error throwableError) {
throw throwableError;
}
else {
// TODO:
// https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/514#discussion_r241075940
throw new ProvisioningException("Provisioning exception encountered for " + name, throwable);
}
}
}
private void createTopicIfNecessary(AdminClient adminClient, final String topicName,
final int partitionCount, boolean tolerateLowerPartitionsOnBroker,
KafkaTopicProperties properties) throws Throwable {
if (this.configurationProperties.isAutoCreateTopics()) {
createTopicAndPartitions(adminClient, topicName, partitionCount,
tolerateLowerPartitionsOnBroker, properties);
}
else {
logger.info("Auto creation of topics is disabled.");
}
}
	/**
	 * Creates a Kafka topic if needed, or tries to increase its partition count to the
	 * desired number.
	 * @param adminClient the Kafka admin client
	 * @param topicName the topic name
	 * @param partitionCount the partition count
	 * @param tolerateLowerPartitionsOnBroker whether a lower partition count on the
	 * broker is tolerated or not
	 * @param topicProperties the Kafka topic properties
	 * @throws Throwable from topic creation
	 */
private void createTopicAndPartitions(AdminClient adminClient, final String topicName,
final int partitionCount, boolean tolerateLowerPartitionsOnBroker,
KafkaTopicProperties topicProperties) throws Throwable {
ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> namesFutures = listTopicsResult.names();
Set<String> names = namesFutures.get(this.operationTimeout, TimeUnit.SECONDS);
if (names.contains(topicName)) {
			// Check whether the configured topic properties differ from the topic
			// configuration in Kafka
if (this.configurationProperties.isAutoAlterTopics()) {
alterTopicConfigsIfNecessary(adminClient, topicName, topicProperties);
}
// only consider minPartitionCount for resizing if autoAddPartitions is true
int effectivePartitionCount = this.configurationProperties
.isAutoAddPartitions()
? Math.max(
this.configurationProperties.getMinPartitionCount(),
partitionCount)
: partitionCount;
DescribeTopicsResult describeTopicsResult = adminClient
.describeTopics(Collections.singletonList(topicName));
KafkaFuture<Map<String, TopicDescription>> topicDescriptionsFuture = describeTopicsResult
.allTopicNames();
Map<String, TopicDescription> topicDescriptions = topicDescriptionsFuture
.get(this.operationTimeout, TimeUnit.SECONDS);
TopicDescription topicDescription = topicDescriptions.get(topicName);
int partitionSize = topicDescription.partitions().size();
if (partitionSize < effectivePartitionCount) {
if (this.configurationProperties.isAutoAddPartitions()) {
CreatePartitionsResult partitions = adminClient
.createPartitions(Collections.singletonMap(topicName,
NewPartitions.increaseTo(effectivePartitionCount)));
partitions.all().get(this.operationTimeout, TimeUnit.SECONDS);
}
else if (tolerateLowerPartitionsOnBroker) {
logger.warn("The number of expected partitions for topic "
+ topicName + " was: "
+ partitionCount + ", but " + partitionSize
+ (partitionSize > 1 ? " have " : " has ")
+ "been found instead. " + "There will be "
+ (effectivePartitionCount - partitionSize)
+ " idle consumers");
}
else {
throw new ProvisioningException(
"The number of expected partitions for topic " + topicName
+ " was: " + partitionCount
+ ", but " + partitionSize
+ (partitionSize > 1 ? " have " : " has ")
+ "been found instead. "
+ "Consider either increasing the partition count of the topic or enabling "
+ "`autoAddPartitions`");
}
}
}
else {
// always consider minPartitionCount for topic creation
final int effectivePartitionCount = Math.max(
this.configurationProperties.getMinPartitionCount(), partitionCount);
this.metadataRetryOperations.execute(() -> {
NewTopic newTopic;
Map<Integer, List<Integer>> replicasAssignments = topicProperties
.getReplicasAssignments();
if (!CollectionUtils.isEmpty(replicasAssignments)) {
newTopic = new NewTopic(topicName,
topicProperties.getReplicasAssignments());
}
else {
newTopic = new NewTopic(topicName, effectivePartitionCount,
topicProperties.getReplicationFactor() != null
? topicProperties.getReplicationFactor()
: this.configurationProperties
.getReplicationFactor());
}
if (!topicProperties.getProperties().isEmpty()) {
newTopic.configs(topicProperties.getProperties());
}
CreateTopicsResult createTopicsResult = adminClient
.createTopics(Collections.singletonList(newTopic));
try {
createTopicsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
}
catch (Exception ex) {
if (ex instanceof ExecutionException) {
if (ex.getCause() instanceof TopicExistsException) {
if (logger.isWarnEnabled()) {
logger.warn("Attempt to create topic: " + topicName
+ ". Topic already exists.");
}
}
else {
logger.error("Failed to create topics", ex.getCause());
throw ex.getCause();
}
}
else {
logger.error("Failed to create topics", ex.getCause());
throw ex.getCause();
}
}
return null;
});
}
}
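	/**
	 * Compare the desired topic properties with the current topic configuration on the
	 * broker and apply an incremental alter-configs request for any entries that are
	 * new or changed; broker-side entries absent from the desired properties are left
	 * untouched.
	 */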
private void alterTopicConfigsIfNecessary(AdminClient adminClient,
String topicName,
KafkaTopicProperties topicProperties)
throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
ConfigResource topicConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
DescribeConfigsResult describeConfigsResult = adminClient
.describeConfigs(Collections.singletonList(topicConfigResource));
KafkaFuture<Map<ConfigResource, Config>> topicConfigurationFuture = describeConfigsResult.all();
Map<ConfigResource, Config> topicConfigMap = topicConfigurationFuture
.get(this.operationTimeout, TimeUnit.SECONDS);
Config config = topicConfigMap.get(topicConfigResource);
final List<AlterConfigOp> updatedConfigEntries = topicProperties.getProperties().entrySet().stream()
.filter(propertiesEntry -> {
// Property is new and should be added
if (config.get(propertiesEntry.getKey()) == null) {
return true;
}
else {
// Property changed and should be updated
return !config.get(propertiesEntry.getKey()).value().equals(propertiesEntry.getValue());
}
})
.map(propertyEntry -> new ConfigEntry(propertyEntry.getKey(), propertyEntry.getValue()))
.map(configEntry -> new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
.collect(Collectors.toList());
if (!updatedConfigEntries.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("Attempting to alter configs " + updatedConfigEntries + " for the topic:" + topicName);
}
Map<ConfigResource, Collection<AlterConfigOp>> alterConfigForTopics = new HashMap<>();
alterConfigForTopics.put(topicConfigResource, updatedConfigEntries);
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alterConfigForTopics);
alterConfigsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
}
}
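	/**
	 * Determine the partitions this binding listens to: all partitions when group
	 * management is used (or there is a single instance), otherwise the subset assigned
	 * to this instance by a modulo split across instances. The topic is also recorded
	 * in {@code topicsInUse}.
	 */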
public Collection<PartitionInfo> getListenedPartitions(final String group,
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
final ConsumerFactory<?, ?> consumerFactory, int partitionCount,
boolean usingPatterns, boolean groupManagement, final String topic,
Map<String, TopicInformation> topicsInUse) {
Collection<PartitionInfo> listenedPartitions;
Collection<PartitionInfo> allPartitions = usingPatterns ? Collections.emptyList()
: getPartitionInfoForConsumer(topic, extendedConsumerProperties, consumerFactory,
partitionCount);
if (groupManagement || extendedConsumerProperties.getInstanceCount() == 1) {
listenedPartitions = allPartitions;
}
else {
listenedPartitions = new ArrayList<>();
for (PartitionInfo partition : allPartitions) {
// divide partitions across modules
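				// e.g. with instanceCount=2 and instanceIndex=1, partitions 1, 3, 5, ...
				// are assigned to this instance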
if ((partition.partition() % extendedConsumerProperties
.getInstanceCount()) == extendedConsumerProperties
.getInstanceIndex()) {
listenedPartitions.add(partition);
}
}
}
topicsInUse.put(topic,
new TopicInformation(group, listenedPartitions, usingPatterns));
return listenedPartitions;
}
/**
	 * Check that the topic has the expected number of partitions and return the partition information for the consumer.
*/
public Collection<PartitionInfo> getPartitionInfoForConsumer(final String topic,
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
final ConsumerFactory<?, ?> consumerFactory, int partitionCount) {
return getPartitionsForTopic(partitionCount,
extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
() -> {
try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
return consumer.partitionsFor(topic);
}
}, topic);
}
/**
	 * Check that the topic has the expected number of partitions and return the partition information for the producer.
*/
public Collection<PartitionInfo> getPartitionInfoForProducer(final String topicName,
final ProducerFactory<byte[], byte[]> producerFB,
final ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
return getPartitionsForTopic(producerProperties.getPartitionCount(), false,
() -> {
try (Producer<byte[], byte[]> producer = producerFB
.createProducer()) {
return producer.partitionsFor(topicName);
}
}, topicName);
}
/**
* Check that the topic has the expected number of partitions and return the partition information.
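	 * <p>For example (an illustrative sketch mirroring the consumer path; the
	 * {@code provisioner} and {@code consumerFactory} variables are assumed):
	 * <pre>{@code
	 * Collection<PartitionInfo> partitions = provisioner.getPartitionsForTopic(4, false,
	 *         () -> {
	 *             try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
	 *                 return consumer.partitionsFor("my-topic");
	 *             }
	 *         }, "my-topic");
	 * }</pre>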
* @param partitionCount the expected count.
* @param tolerateLowerPartitionsOnBroker if false, throw an exception if there are not enough partitions.
* @param callable a Callable that will provide the partition information.
	 * @param topicName the topic.
* @return the partition information.
*/
public Collection<PartitionInfo> getPartitionsForTopic(final int partitionCount,
final boolean tolerateLowerPartitionsOnBroker,
final Callable<Collection<PartitionInfo>> callable, final String topicName) {
try {
return this.metadataRetryOperations.execute(() -> {
Collection<PartitionInfo> partitions = Collections.emptyList();
try {
// This call may return null or throw an exception.
partitions = callable.call();
}
catch (Exception ex) {
logger.warn("Error occurred while calculating partitions for topic: " + topicName, ex);
					// The above call can throw exceptions such as a timeout. If we can
					// determine that the exception was due to an unknown topic on the
					// broker, just rethrow it.
if (ex instanceof UnknownTopicOrPartitionException) {
throw ex;
}
}
				// In some cases, the partition query above may not throw an
				// UnknownTopicOrPartitionException. To cover that, we force another
				// query to ensure that the topic is present on the server.
if (CollectionUtils.isEmpty(partitions)) {
try (AdminClient adminClient = createAdminClient()) {
final DescribeTopicsResult describeTopicsResult = adminClient
.describeTopics(Collections.singletonList(topicName));
describeTopicsResult.allTopicNames().get();
}
catch (ExecutionException ex) {
if (ex.getCause() instanceof UnknownTopicOrPartitionException unknownTopicOrPartitionException) {
throw unknownTopicOrPartitionException;
}
else {
logger.warn("No partitions have been retrieved for the topic "
+ "(" + topicName
+ "). This will affect the health check.");
}
}
					// Missing partition information is fatal, so fail binder
					// initialization here.
throw new RuntimeException("Failed to obtain partition information for the topic " + topicName);
}
// do a sanity check on the partition set
int partitionSize = partitions.size();
if (partitionSize < partitionCount) {
if (tolerateLowerPartitionsOnBroker) {
logger.warn("The number of expected partitions for topic "
+ topicName + " was: "
+ partitionCount + ", but " + partitionSize
+ (partitionSize > 1 ? " have " : " has ")
+ "been found instead. " + "There will be "
+ (partitionCount - partitionSize) + " idle consumers");
}
else {
throw new IllegalStateException(
"The number of expected partitions for topic " + topicName
+ " was: " + partitionCount
+ ", but " + partitionSize
+ (partitionSize > 1 ? " have " : " has ")
+ "been found instead");
}
}
return partitions;
});
}
catch (Exception ex) {
logger.error("Cannot initialize Binder checking the topic (" + topicName + ").", ex);
throw new BinderException("Cannot initialize binder checking the topic (" + topicName + "):", ex);
}
}
private static final class KafkaProducerDestination implements ProducerDestination {
private final String producerDestinationName;
private final int partitions;
KafkaProducerDestination(String destinationName, Integer partitions) {
this.producerDestinationName = destinationName;
this.partitions = partitions;
}
@Override
public String getName() {
return this.producerDestinationName;
}
@Override
public String getNameForPartition(int partition) {
return this.producerDestinationName;
}
@Override
public String toString() {
return "KafkaProducerDestination{" + "producerDestinationName='"
+ producerDestinationName + '\'' + ", partitions=" + partitions + '}';
}
}
private static final class KafkaConsumerDestination implements ConsumerDestination {
private final String consumerDestinationName;
private final int partitions;
private final String dlqName;
KafkaConsumerDestination(String consumerDestinationName) {
this(consumerDestinationName, 0, null);
}
KafkaConsumerDestination(String consumerDestinationName, int partitions) {
this(consumerDestinationName, partitions, null);
}
KafkaConsumerDestination(String consumerDestinationName, Integer partitions,
String dlqName) {
this.consumerDestinationName = consumerDestinationName;
this.partitions = partitions;
this.dlqName = dlqName;
}
@Override
public String getName() {
return this.consumerDestinationName;
}
@Override
public String toString() {
return "KafkaConsumerDestination{" + "consumerDestinationName='"
+ consumerDestinationName + '\'' + ", partitions=" + partitions
+ ", dlqName='" + dlqName + '\'' + '}';
}
}
}