RabbitStreamBinderModuleTests.java
/*
* Copyright 2021-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rabbit.stream;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ProducerBuilder;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.integration.autoconfigure.IntegrationAutoConfiguration;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.rabbit.RabbitTestContainer;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties.ContainerType;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties.ProducerType;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.amqp.outbound.RabbitStreamMessageHandler;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
/**
* @author Gary Russell
* @author Soby Chacko
*/
class RabbitStreamBinderModuleTests {
private static final RabbitMQContainer RABBITMQ = RabbitTestContainer.sharedInstance();
@Test
void streamContainer() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(SimpleProcessor.class)
.web(WebApplicationType.NONE)
.run("--server.port=0")) {
BinderFactory binderFactory = context.getBean(BinderFactory.class);
RabbitMessageChannelBinder rabbitBinder = (RabbitMessageChannelBinder) binderFactory.getBinder(null,
MessageChannel.class);
RabbitConsumerProperties rProps = new RabbitConsumerProperties();
rProps.setContainerType(ContainerType.STREAM);
ExtendedConsumerProperties<RabbitConsumerProperties> props =
new ExtendedConsumerProperties<RabbitConsumerProperties>(rProps);
props.setAutoStartup(false);
Binding<MessageChannel> binding = rabbitBinder.bindConsumer("testStream", "grp", new QueueChannel(), props);
Object container = TestUtils.getPropertyValue(binding, "lifecycle.messageListenerContainer");
assertThat(container).isInstanceOf(StreamListenerContainer.class);
((StreamListenerContainer) container).start();
verify(context.getBean(ConsumerBuilder.class)).offset(OffsetSpecification.first());
((StreamListenerContainer) container).stop();
}
}
@Test
void superStreamContainer() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(SimpleProcessor.class)
.web(WebApplicationType.NONE)
.run("--server.port=0")) {
BinderFactory binderFactory = context.getBean(BinderFactory.class);
RabbitMessageChannelBinder rabbitBinder = (RabbitMessageChannelBinder) binderFactory.getBinder(null,
MessageChannel.class);
RabbitConsumerProperties rProps = new RabbitConsumerProperties();
rProps.setContainerType(ContainerType.STREAM);
rProps.setSuperStream(true);
ExtendedConsumerProperties<RabbitConsumerProperties> props =
new ExtendedConsumerProperties<RabbitConsumerProperties>(rProps);
props.setAutoStartup(false);
props.setInstanceCount(1);
Binding<MessageChannel> binding = rabbitBinder.bindConsumer("testSuperStream", "grp", new QueueChannel(), props);
Object container = TestUtils.getPropertyValue(binding, "lifecycle.messageListenerContainer");
assertThat(container).isInstanceOf(StreamListenerContainer.class);
((StreamListenerContainer) container).start();
ConsumerBuilder builder = context.getBean(ConsumerBuilder.class);
verify(builder).singleActiveConsumer();
verify(builder).superStream("testSuperStream");
verify(builder).name("testSuperStream.grp");
((StreamListenerContainer) container).stop();
}
}
@Test
void streamHandler() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(IntegrationAutoConfiguration.class, SimpleProcessor.class)
.web(WebApplicationType.NONE)
.run("--server.port=0")) {
BinderFactory binderFactory = context.getBean(BinderFactory.class);
RabbitMessageChannelBinder rabbitBinder = (RabbitMessageChannelBinder) binderFactory.getBinder(null,
MessageChannel.class);
RabbitProducerProperties rProps = new RabbitProducerProperties();
rProps.setProducerType(ProducerType.STREAM_SYNC);
ExtendedProducerProperties<RabbitProducerProperties> props =
new ExtendedProducerProperties<RabbitProducerProperties>(rProps);
Binding<MessageChannel> binding = rabbitBinder.bindProducer("testStream", new DirectChannel(), props);
Object handler = TestUtils.getPropertyValue(binding, "val$producerMessageHandler");
assertThat(handler).isInstanceOf(RabbitStreamMessageHandler.class);
}
}
@EnableAutoConfiguration
@Configuration
@Import(IntegrationAutoConfiguration.class)
public static class SimpleProcessor {
@Bean
ConnectionFactory cf() {
return new CachingConnectionFactory(RABBITMQ.getMappedPort(5672));
}
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
};
}
@Bean
ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
};
}
@Bean
Environment rabbitStreamEnvironment(ConsumerBuilder consumerBuilder, ProducerBuilder producerBuilder) {
Environment env = mock(Environment.class);
given(env.consumerBuilder()).willReturn(consumerBuilder);
given(env.producerBuilder()).willReturn(producerBuilder);
return env;
}
@Bean
ConsumerBuilder consumerBuilder() {
ConsumerBuilder mock = mock(ConsumerBuilder.class);
given(mock.superStream(anyString())).willReturn(mock);
given(mock.singleActiveConsumer()).willReturn(mock);
return mock;
}
@Bean
ProducerBuilder producerBuilder() {
return mock(ProducerBuilder.class);
}
}
}