PulsarExtendedBindingDefaultPropertiesTests.java
/*
* Copyright 2023-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.pulsar;
import java.util.function.Function;
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.pulsar.config.ExtendedBindingHandlerMappingsProviderConfiguration;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarConsumerProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarExtendedBindingProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarProducerProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link ExtendedBindingHandlerMappingsProviderConfiguration}.
*
* @author Soby Chacko
*/
class PulsarExtendedBindingDefaultPropertiesTests implements PulsarTestContainerSupport {

	/** Input binding name whose extended consumer properties are inspected. */
	private static final String INPUT_BINDING = "process-in-0";

	/** Output binding name whose extended producer properties are inspected. */
	private static final String OUTPUT_BINDING = "process-out-0";

	/**
	 * Runner shared by all tests. It points the client and admin at the test container
	 * and configures values under {@code spring.cloud.stream.pulsar.default.*} for both
	 * consumer and producer.
	 */
	private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
		.withUserConfiguration(DefaultPropertiesTestApp.class)
		.withPropertyValues(
				"spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
				"spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
				"spring.cloud.stream.pulsar.binder.partitionCount: 1",
				"spring.cloud.stream.pulsar.default.consumer.schema-type: JSON",
				"spring.cloud.stream.pulsar.default.consumer.receiverQueueSize: 5000",
				"spring.cloud.stream.pulsar.default.consumer.startPaused: true",
				"spring.cloud.stream.pulsar.default.consumer.subscription.name: my-subscription",
				"spring.cloud.stream.pulsar.default.producer.schema-type: JSON",
				"spring.cloud.stream.pulsar.default.producer.blockIfQueueFull: true",
				"spring.cloud.stream.pulsar.default.producer.maxPendingMessages: 200",
				"spring.cloud.stream.pulsar.default.producer.name: my-producer");

	/**
	 * With no binding-specific properties set, the extended consumer and producer
	 * properties of the bindings must carry the configured default values.
	 */
	@Test
	void defaultsUsedWhenNoCustomBindingProperties() {
		this.contextRunner.run((context) -> {
			assertThat(context).hasNotFailed();
			PulsarExtendedBindingProperties bindingProperties = extendedBindingProperties(
					context.getBeanFactory().getBean(BinderFactory.class));

			PulsarConsumerProperties consumerProperties = bindingProperties
				.getExtendedConsumerProperties(INPUT_BINDING);
			assertThat(consumerProperties)
				.hasFieldOrPropertyWithValue("schemaType", SchemaType.JSON)
				.hasFieldOrPropertyWithValue("receiverQueueSize", 5000)
				.hasFieldOrPropertyWithValue("subscription.name", "my-subscription")
				.hasFieldOrPropertyWithValue("startPaused", true);

			PulsarProducerProperties producerProperties = bindingProperties
				.getExtendedProducerProperties(OUTPUT_BINDING);
			assertThat(producerProperties)
				.hasFieldOrPropertyWithValue("schemaType", SchemaType.JSON)
				.hasFieldOrPropertyWithValue("blockIfQueueFull", true)
				.hasFieldOrPropertyWithValue("maxPendingMessages", 200)
				.hasFieldOrPropertyWithValue("name", "my-producer");
		});
	}

	/**
	 * With some binding-specific properties set, those values must win while every
	 * property that was not overridden still carries the configured default.
	 */
	@Test
	void defaultsRespectedWhenCustomBindingProperties() {
		this.contextRunner
			.withPropertyValues(
					"spring.cloud.stream.pulsar.bindings.process-in-0.consumer.receiverQueueSize: 8000",
					"spring.cloud.stream.pulsar.bindings.process-out-0.producer.blockIfQueueFull: false",
					"spring.cloud.stream.pulsar.bindings.process-out-0.producer.maxPendingMessages: 400")
			.run((context) -> {
				assertThat(context).hasNotFailed();
				PulsarExtendedBindingProperties bindingProperties = extendedBindingProperties(
						context.getBeanFactory().getBean(BinderFactory.class));

				PulsarConsumerProperties consumerProperties = bindingProperties
					.getExtendedConsumerProperties(INPUT_BINDING);
				assertThat(consumerProperties)
					.hasFieldOrPropertyWithValue("schemaType", SchemaType.JSON)
					// overridden at binding level
					.hasFieldOrPropertyWithValue("receiverQueueSize", 8000)
					.hasFieldOrPropertyWithValue("subscription.name", "my-subscription")
					.hasFieldOrPropertyWithValue("startPaused", true);

				PulsarProducerProperties producerProperties = bindingProperties
					.getExtendedProducerProperties(OUTPUT_BINDING);
				assertThat(producerProperties)
					.hasFieldOrPropertyWithValue("schemaType", SchemaType.JSON)
					// overridden at binding level
					.hasFieldOrPropertyWithValue("blockIfQueueFull", false)
					.hasFieldOrPropertyWithValue("maxPendingMessages", 400)
					.hasFieldOrPropertyWithValue("name", "my-producer");
			});
	}

	/**
	 * Looks up the {@code "pulsar"} binder for {@link MessageChannel} and returns its
	 * extended binding properties.
	 * @param binderFactory the factory to obtain the binder from
	 * @return the extended binding properties exposed by the Pulsar binder
	 */
	private static PulsarExtendedBindingProperties extendedBindingProperties(BinderFactory binderFactory) {
		Binder<MessageChannel, ? extends ConsumerProperties, ? extends ProducerProperties> pulsarBinder = binderFactory
			.getBinder("pulsar", MessageChannel.class);
		// Fail with a descriptive assertion message rather than a bare ClassCastException.
		assertThat(pulsarBinder).isInstanceOf(PulsarMessageChannelBinder.class);
		return ((PulsarMessageChannelBinder) pulsarBinder).getExtendedBindingProperties();
	}

	/**
	 * Minimal auto-configured application exposing a single pass-through function bean
	 * named {@code process}.
	 */
	@EnableAutoConfiguration
	static class DefaultPropertiesTestApp {

		/**
		 * Identity function over strings.
		 * @return a function that returns its argument unchanged
		 */
		@Bean
		public Function<String, String> process() {
			return s -> s;
		}

	}

}