RabbitStreamBinderModuleIntegrationTests.java
/*
* Copyright 2021-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rabbit.stream2;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.OffsetSpecification;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.amqp.autoconfigure.EnvironmentBuilderCustomizer;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.rabbit.RabbitTestContainer;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties.ContainerType;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Gary Russell
* @author Soby Chacko
*/
class RabbitStreamBinderModuleIntegrationTests {
private static final RabbitMQContainer RABBITMQ = RabbitTestContainer.sharedInstance();
@Test
void superStreamContainer() throws InterruptedException {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(SimpleProcessor.class)
.web(WebApplicationType.NONE)
.run("--server.port=0")) {
BinderFactory binderFactory = context.getBean(BinderFactory.class);
RabbitMessageChannelBinder rabbitBinder = (RabbitMessageChannelBinder) binderFactory.getBinder(null,
MessageChannel.class);
RabbitConsumerProperties rProps = new RabbitConsumerProperties();
rProps.setContainerType(ContainerType.STREAM);
rProps.setSuperStream(true);
ExtendedConsumerProperties<RabbitConsumerProperties> props =
new ExtendedConsumerProperties<RabbitConsumerProperties>(rProps);
props.setAutoStartup(false);
props.setInstanceCount(3);
props.setConcurrency(3);
Binding<MessageChannel> binding = rabbitBinder.bindConsumer("testSuperStream", "grp", new QueueChannel(), props);
Object container = TestUtils.getPropertyValue(binding, "lifecycle.messageListenerContainer");
assertThat(container).isInstanceOf(StreamListenerContainer.class);
assertThat(container).extracting("concurrency").isEqualTo(3);
((StreamListenerContainer) container).start();
assertThat(context.getBean(SimpleProcessor.class).consumerCountLatch.await(10, TimeUnit.SECONDS)).isTrue();
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
System.out.println(RABBITMQ.getMappedPort(15672));
for (int i = 0; i < 9; i++) {
Properties qProps = admin.getQueueProperties("testSuperStream-" + i);
assertThat(qProps).describedAs("Expected queue with index %d to exist", i).isNotNull();
}
((StreamListenerContainer) container).stop();
}
}
@EnableAutoConfiguration
@Configuration
public static class SimpleProcessor {
final CountDownLatch consumerCountLatch = new CountDownLatch(3);
@Bean
ConnectionFactory cf() {
return new CachingConnectionFactory(RABBITMQ.getMappedPort(5672));
}
@Bean
RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
EnvironmentBuilderCustomizer environmenCustomizer() {
return env -> env
.addressResolver(add -> new Address("localhost", RABBITMQ.getMappedPort(5552)));
}
@Bean
ListenerContainerCustomizer<MessageListenerContainer> containerCustomizer() {
return (cont, dest, group) -> {
if (cont instanceof StreamListenerContainer container) {
container.setConsumerCustomizer((id, builder) -> {
builder.consumerUpdateListener(context -> {
this.consumerCountLatch.countDown();
return OffsetSpecification.first();
});
});
}
};
}
}
}