// ConsumerConfigPropertiesTests.java
/*
* Copyright 2023-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.pulsar;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import org.springframework.cloud.stream.binder.pulsar.properties.ConsumerConfigProperties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.entry;
/**
 * Unit tests for {@link ConsumerConfigProperties}.
 *
 * <p>
 * Every test binds a map of {@code spring.pulsar.consumer.*} entries onto a fresh
 * {@link ConsumerConfigProperties}, extracts one of its consumer property maps and then
 * checks the extracted entries. Tests that use the full input sets also check that the
 * extracted map can be loaded into a {@link ConsumerConfigurationData}.
 *
 * @author Soby Chacko
 * @author Chris Bono
 */
class ConsumerConfigPropertiesTests {

	// ---- tests --------------------------------------------------------------------

	/** Base input set, extracted through {@code toBaseConsumerPropertiesMap()}. */
	@Test
	void allBasePropsCanBeExtractedToMap() {
		ConsumerConfigProperties bound = bindInputPropsToConsumerConfigProps(basePropsInputMap());
		Map<String, Object> extracted = bound.toBaseConsumerPropertiesMap();
		verifyOutputPropsCanBeLoadedInConsumerBuilder(extracted);
		verifyBasePropsInOutputMap(extracted);
	}

	/**
	 * Only the consumer name is supplied; the extracted base map must carry it and must
	 * not contain a {@code deadLetterPolicy} key.
	 */
	@Test
	void nullBasePropsSkippedWhenExtractedToMap() {
		Map<String, String> nameOnly = Map.of("spring.pulsar.consumer.name", "my-consumer");
		ConsumerConfigProperties bound = bindInputPropsToConsumerConfigProps(nameOnly);
		Map<String, Object> extracted = bound.toBaseConsumerPropertiesMap();
		assertThat(extracted).contains(entry("consumerName", "my-consumer"));
		assertThat(extracted).doesNotContainKey("deadLetterPolicy");
	}

	/** Base + extended input, extracted through {@code toExtendedConsumerPropertiesMap()}. */
	@Test
	void extendedPropsCanBeExtractedToMap() {
		ConsumerConfigProperties bound = bindInputPropsToConsumerConfigProps(baseAndExtendedPropsInputMap());
		Map<String, Object> extracted = bound.toExtendedConsumerPropertiesMap();
		verifyOutputPropsCanBeLoadedInConsumerBuilder(extracted);
		verifyExtendedPropsInOutputMap(extracted);
	}

	/** Base + extended input, extracted through {@code toAllConsumerPropertiesMap()}. */
	@Test
	void allPropsCanBeExtractedToMap() {
		ConsumerConfigProperties bound = bindInputPropsToConsumerConfigProps(baseAndExtendedPropsInputMap());
		Map<String, Object> extracted = bound.toAllConsumerPropertiesMap();
		verifyOutputPropsCanBeLoadedInConsumerBuilder(extracted);
		verifyBasePropsInOutputMap(extracted);
		verifyExtendedPropsInOutputMap(extracted);
	}

	// ---- input fixtures -----------------------------------------------------------

	/**
	 * Builds the base input properties.
	 * @return a new mutable map (callers add further entries to it)
	 */
	private Map<String, String> basePropsInputMap() {
		Map<String, String> props = new HashMap<>();
		props.put("spring.pulsar.consumer.name", "my-consumer");
		props.put("spring.pulsar.consumer.priority-level", "8");
		props.put("spring.pulsar.consumer.read-compacted", "true");
		props.put("spring.pulsar.consumer.retry-enable", "true");
		props.put("spring.pulsar.consumer.topics[0]", "my-topic");
		props.put("spring.pulsar.consumer.topics-pattern", "my-pattern");

		// Dead letter policy
		props.put("spring.pulsar.consumer.dead-letter-policy.max-redeliver-count", "4");
		props.put("spring.pulsar.consumer.dead-letter-policy.retry-letter-topic", "my-retry-topic");
		props.put("spring.pulsar.consumer.dead-letter-policy.dead-letter-topic", "my-dlt-topic");
		props.put("spring.pulsar.consumer.dead-letter-policy.initial-subscription-name",
				"my-initial-subscription");

		// Subscription
		props.put("spring.pulsar.consumer.subscription.name", "my-subscription");
		props.put("spring.pulsar.consumer.subscription.type", "exclusive");
		props.put("spring.pulsar.consumer.subscription.mode", "non-durable");
		props.put("spring.pulsar.consumer.subscription.initial-position", "earliest");
		props.put("spring.pulsar.consumer.subscription.topics-mode", "all-topics");
		return props;
	}

	/**
	 * Builds the extended input properties.
	 * @return a new map holding only the extended entries
	 */
	private Map<String, String> extendedPropsInputMap() {
		Map<String, String> props = new HashMap<>();
		props.put("spring.pulsar.consumer.auto-update-partitions", "true");
		props.put("spring.pulsar.consumer.auto-update-partitions-interval", "10s");
		props.put("spring.pulsar.consumer.crypto-failure-action", "discard");
		props.put("spring.pulsar.consumer.max-total-receiver-queue-size-across-partitions", "5");
		props.put("spring.pulsar.consumer.pattern-auto-discovery-period", "9");
		props.put("spring.pulsar.consumer.pool-messages", "true");
		props.put("spring.pulsar.consumer.properties[my-prop]", "my-prop-value");
		props.put("spring.pulsar.consumer.receiver-queue-size", "1");
		props.put("spring.pulsar.consumer.reset-include-head", "true");
		props.put("spring.pulsar.consumer.start-paused", "true");

		// Acknowledgment
		props.put("spring.pulsar.consumer.ack.group-time", "2s");
		props.put("spring.pulsar.consumer.ack.redelivery-delay", "3s");
		props.put("spring.pulsar.consumer.ack.timeout", "6s");
		props.put("spring.pulsar.consumer.ack.timeout-tick-duration", "7s");
		props.put("spring.pulsar.consumer.ack.batch-index-enabled", "true");
		props.put("spring.pulsar.consumer.ack.receipt-enabled", "true");

		// Chunking
		props.put("spring.pulsar.consumer.chunk.expire-time-incomplete", "12s");
		props.put("spring.pulsar.consumer.chunk.auto-ack-oldest-on-queue-full", "false");
		props.put("spring.pulsar.consumer.chunk.max-pending-messages", "11");

		// Subscription
		props.put("spring.pulsar.consumer.subscription.properties[my-sub-prop]", "my-sub-prop-value");
		props.put("spring.pulsar.consumer.subscription.replicate-state", "true");
		return props;
	}

	/**
	 * Builds the union of the base and the extended input properties.
	 * @return a new mutable map with the base entries plus the extended entries
	 */
	private Map<String, String> baseAndExtendedPropsInputMap() {
		Map<String, String> combined = basePropsInputMap();
		combined.putAll(extendedPropsInputMap());
		return combined;
	}

	// ---- verification helpers -----------------------------------------------------

	/**
	 * Asserts that the output map holds the values produced from
	 * {@link #basePropsInputMap()}.
	 * @param outputProps the extracted consumer properties
	 */
	private void verifyBasePropsInOutputMap(Map<String, Object> outputProps) {
		// Top-level consumer settings
		assertThat(outputProps).containsEntry("consumerName", "my-consumer")
			.containsEntry("priorityLevel", 8)
			.containsEntry("readCompacted", true)
			.containsEntry("retryEnable", true)
			.hasEntrySatisfying("topicNames",
					topicNames -> assertThat(topicNames)
						.asInstanceOf(InstanceOfAssertFactories.collection(String.class))
						.containsExactly("my-topic"))
			.hasEntrySatisfying("topicsPattern",
					pattern -> assertThat(pattern.toString()).isEqualTo("my-pattern"));

		// Subscription settings
		assertThat(outputProps).containsEntry("subscriptionName", "my-subscription")
			.containsEntry("subscriptionType", SubscriptionType.Exclusive)
			.containsEntry("subscriptionMode", SubscriptionMode.NonDurable)
			.containsEntry("subscriptionInitialPosition", SubscriptionInitialPosition.Earliest)
			.containsEntry("regexSubscriptionMode", RegexSubscriptionMode.AllTopics);

		// Dead letter policy
		assertThat(outputProps).hasEntrySatisfying("deadLetterPolicy", this::verifyDeadLetterPolicy);
	}

	/**
	 * Asserts that the given value is a {@link DeadLetterPolicy} carrying the values from
	 * {@link #basePropsInputMap()}.
	 * @param candidate the value stored under the {@code deadLetterPolicy} key
	 */
	private void verifyDeadLetterPolicy(Object candidate) {
		DeadLetterPolicy policy = (DeadLetterPolicy) candidate;
		assertThat(policy.getMaxRedeliverCount()).isEqualTo(4);
		assertThat(policy.getRetryLetterTopic()).isEqualTo("my-retry-topic");
		assertThat(policy.getDeadLetterTopic()).isEqualTo("my-dlt-topic");
		assertThat(policy.getInitialSubscriptionName()).isEqualTo("my-initial-subscription");
	}

	/**
	 * Asserts that the output map holds the values produced from
	 * {@link #extendedPropsInputMap()}. Duration inputs such as {@code "2s"} are expected
	 * as numeric values in the unit named by the output key (seconds, millis or micros).
	 * @param outputProps the extracted consumer properties
	 */
	private void verifyExtendedPropsInOutputMap(Map<String, Object> outputProps) {
		// Top-level consumer settings
		assertThat(outputProps).containsEntry("autoUpdatePartitions", true)
			.containsEntry("autoUpdatePartitionsIntervalSeconds", 10L)
			.containsEntry("cryptoFailureAction", ConsumerCryptoFailureAction.DISCARD)
			.containsEntry("maxTotalReceiverQueueSizeAcrossPartitions", 5)
			.containsEntry("patternAutoDiscoveryPeriod", 9)
			.containsEntry("poolMessages", true)
			.hasEntrySatisfying("properties",
					properties -> verifyStringMapContains(properties, "my-prop", "my-prop-value"))
			.containsEntry("receiverQueueSize", 1)
			.containsEntry("resetIncludeHead", true)
			.containsEntry("startPaused", true);

		// Acknowledgment settings
		assertThat(outputProps).containsEntry("acknowledgementsGroupTimeMicros", 2_000_000L)
			.containsEntry("negativeAckRedeliveryDelayMicros", 3_000_000L)
			.containsEntry("ackTimeoutMillis", 6_000L)
			.containsEntry("tickDurationMillis", 7_000L)
			.containsEntry("batchIndexAckEnabled", true)
			.containsEntry("ackReceiptEnabled", true);

		// Chunking settings
		assertThat(outputProps).containsEntry("expireTimeOfIncompleteChunkedMessageMillis", 12_000L)
			.containsEntry("autoAckOldestChunkedMessageOnQueueFull", false)
			.containsEntry("maxPendingChunkedMessage", 11);

		// Subscription settings
		assertThat(outputProps)
			.hasEntrySatisfying("subscriptionProperties",
					properties -> verifyStringMapContains(properties, "my-sub-prop", "my-sub-prop-value"))
			.containsEntry("replicateSubscriptionState", true);
	}

	/**
	 * Asserts that the given value is a string-to-string map containing the given entry.
	 * @param actual the value to inspect
	 * @param key the expected key
	 * @param value the expected value for {@code key}
	 */
	private void verifyStringMapContains(Object actual, String key, String value) {
		assertThat(actual).asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
			.containsEntry(key, value);
	}

	/**
	 * Asserts that loading the output map into a new {@link ConsumerConfigurationData}
	 * via {@link ConfigurationDataUtils#loadData} raises no exception.
	 * @param outputProps the extracted consumer properties
	 */
	private void verifyOutputPropsCanBeLoadedInConsumerBuilder(Map<String, Object> outputProps) {
		// The target config object is created inside the callable so that a failure while
		// constructing it is reported by the same assertion.
		assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(outputProps,
				new ConsumerConfigurationData<>(), ConsumerConfigurationData.class));
	}

	/**
	 * Binds the given entries under the {@code spring.pulsar.consumer} prefix onto a new
	 * {@link ConsumerConfigProperties} instance.
	 * @param inputProps the fully-qualified property names and their string values
	 * @return the bound properties object
	 */
	private ConsumerConfigProperties bindInputPropsToConsumerConfigProps(Map<String, String> inputProps) {
		MapConfigurationPropertySource propertySource = new MapConfigurationPropertySource(inputProps);
		Bindable<ConsumerConfigProperties> bindTarget = Bindable.ofInstance(new ConsumerConfigProperties());
		return new Binder(propertySource).bind("spring.pulsar.consumer", bindTarget).get();
	}

}