PulsarTopicProvisioner.java
/*
* Copyright 2022-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.pulsar.provisioning;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarConsumerProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarTopicBuilder;
/**
* Pulsar topic provisioner.
*
* @author Soby Chacko
* @author Chris Bono
*/
public class PulsarTopicProvisioner implements
ProvisioningProvider<ExtendedConsumerProperties<PulsarConsumerProperties>, ExtendedProducerProperties<PulsarProducerProperties>> {
private final PulsarAdministration pulsarAdministration;
private final PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties;
private final PulsarTopicBuilder topicBuilder;
public PulsarTopicProvisioner(PulsarAdministration pulsarAdministration,
PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties) {
this(pulsarAdministration, pulsarBinderConfigurationProperties, new PulsarTopicBuilder());
}
public PulsarTopicProvisioner(PulsarAdministration pulsarAdministration,
PulsarBinderConfigurationProperties pulsarBinderConfigurationProperties,
PulsarTopicBuilder topicBuilder) {
this.pulsarAdministration = pulsarAdministration;
this.pulsarBinderConfigurationProperties = pulsarBinderConfigurationProperties;
this.topicBuilder = topicBuilder;
}
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<PulsarProducerProperties> pulsarProducerProperties)
throws ProvisioningException {
Integer partitionCountFromBinding = pulsarProducerProperties.getExtension().getPartitionCount();
var partitionCount = getPartitionCount(partitionCountFromBinding);
var pulsarTopic = this.topicBuilder.name(name).numberOfPartitions(partitionCount).build();
this.pulsarAdministration.createOrModifyTopics(pulsarTopic);
return new PulsarDestination(pulsarTopic.topicName(), pulsarTopic.numberOfPartitions());
}
private int getPartitionCount(@Nullable Integer partitionCountConfig) {
var partitionCount = this.pulsarBinderConfigurationProperties.getPartitionCount();
if (partitionCountConfig != null && partitionCountConfig > 0) {
partitionCount = partitionCountConfig;
}
return partitionCount == null ? 0 : partitionCount;
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<PulsarConsumerProperties> pulsarConsumerProperties)
throws ProvisioningException {
var partitionCountFromBinding = pulsarConsumerProperties.getExtension().getPartitionCount();
var partitionCount = getPartitionCount(partitionCountFromBinding);
var pulsarTopic = this.topicBuilder.name(name).numberOfPartitions(partitionCount).build();
this.pulsarAdministration.createOrModifyTopics(pulsarTopic);
return new PulsarDestination(pulsarTopic.topicName(), pulsarTopic.numberOfPartitions());
}
private record PulsarDestination(String destinationName,
Integer partitions) implements ProducerDestination, ConsumerDestination {
@Override
public String getName() {
return this.destinationName;
}
@Override
public String getNameForPartition(int partition) {
return this.destinationName;
}
}
}