RabbitBinderCleanerTests.java
/*
* Copyright 2015-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rabbit;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.RabbitMQContainer;
import org.springframework.amqp.core.Base64UrlNamingStrategy;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.rabbit.admin.RabbitAdminException;
import org.springframework.cloud.stream.binder.rabbit.admin.RabbitBindingCleaner;
import org.springframework.cloud.stream.binder.test.junit.rabbit.RabbitTestSupport;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Gary Russell
* @author Chris Bono
* @since 1.2
*/
class RabbitBinderCleanerTests {
private static final RabbitMQContainer RABBITMQ = RabbitTestContainer.sharedInstance();
private static final String BINDER_PREFIX = "binder.";
private static final WebClient client;
static {
client = WebClient.builder()
.filter(ExchangeFilterFunctions
.basicAuthentication(RABBITMQ.getAdminUsername(), RABBITMQ.getAdminPassword()))
.build();
}
@RegisterExtension
private final RabbitTestSupport rabbitTestSupport = new RabbitTestSupport(true, RABBITMQ.getAmqpPort(),
RABBITMQ.getHttpPort());
@Test
void cleanStream() {
final RabbitBindingCleaner cleaner = new RabbitBindingCleaner();
final String stream1 = new Base64UrlNamingStrategy("foo").generateName();
String stream2 = stream1 + "-1";
String firstQueue = null;
CachingConnectionFactory connectionFactory = rabbitTestSupport.getResource();
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
for (int i = 0; i < 5; i++) {
String queue1Name = AbstractBinder.applyPrefix(BINDER_PREFIX,
stream1 + ".default." + i);
String queue2Name = AbstractBinder.applyPrefix(BINDER_PREFIX,
stream2 + ".default." + i);
if (firstQueue == null) {
firstQueue = queue1Name;
}
rabbitAdmin.declareQueue(new Queue(queue1Name, true, false, false));
rabbitAdmin.declareQueue(new Queue(queue2Name, true, false, false));
rabbitAdmin.declareQueue(new Queue(AbstractBinder.constructDLQName(queue1Name), true, false, false));
TopicExchange exchange = new TopicExchange(queue1Name);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(queue1Name))
.to(exchange).with(queue1Name));
exchange = new TopicExchange(queue2Name);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(queue2Name))
.to(exchange).with(queue2Name));
}
final TopicExchange topic1 = new TopicExchange(
AbstractBinder.applyPrefix(BINDER_PREFIX, stream1 + ".foo.bar"));
rabbitAdmin.declareExchange(topic1);
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue(firstQueue)).to(topic1).with("#"));
String foreignQueue = UUID.randomUUID().toString();
rabbitAdmin.declareQueue(new Queue(foreignQueue));
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue(foreignQueue)).to(topic1).with("#"));
final TopicExchange topic2 = new TopicExchange(
AbstractBinder.applyPrefix(BINDER_PREFIX, stream2 + ".foo.bar"));
rabbitAdmin.declareExchange(topic2);
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue(firstQueue)).to(topic2).with("#"));
new RabbitTemplate(connectionFactory).execute(new ChannelCallback<Void>() {
@Override
public Void doInRabbit(Channel channel) throws Exception {
String queueName = AbstractBinder.applyPrefix(BINDER_PREFIX,
stream1 + ".default." + 4);
String consumerTag = channel.basicConsume(queueName,
new DefaultConsumer(channel));
try {
waitForConsumerState(queueName, 1);
doClean(cleaner, stream1, false);
Assertions.fail("Expected exception");
}
catch (RabbitAdminException e) {
assertThat(e)
.hasMessageContaining("Queue " + queueName + " is in use");
}
channel.basicCancel(consumerTag);
waitForConsumerState(queueName, 0);
try {
doClean(cleaner, stream1, false);
Assertions.fail("Expected exception");
}
catch (RabbitAdminException e) {
assertThat(e).hasMessageContaining("Cannot delete exchange ");
assertThat(e).hasMessageContaining("; it has bindings:");
}
return null;
}
private void waitForConsumerState(String queueName, long state)
throws InterruptedException, URISyntaxException {
int n = 0;
Map<String, Object> queue = getQueue("/", queueName);
while (n++ < 100 && !requiredState(state, queue)) {
Thread.sleep(100);
queue = getQueue("/", queueName);
}
assertThat(n).withFailMessage(
"Consumer state remained at " + state + " after 10 seconds")
.isLessThan(100);
}
private boolean requiredState(long state, Map<String, Object> queue) {
Object consumers = queue.get("consumers");
return state == 0
? consumers == null || (Integer) consumers == 0
: consumers != null && (Integer) consumers == state;
}
});
rabbitAdmin.deleteExchange(topic1.getName()); // easier than deleting the binding
rabbitAdmin.declareExchange(topic1);
rabbitAdmin.deleteQueue(foreignQueue);
connectionFactory.destroy();
Map<String, List<String>> cleanedMap = doClean(cleaner, stream1, false);
assertThat(cleanedMap).hasSize(2);
List<String> cleanedQueues = cleanedMap.get("queues");
// should *not* clean stream2
assertThat(cleanedQueues).hasSize(10);
for (int i = 0; i < 5; i++) {
assertThat(cleanedQueues.get(i * 2))
.isEqualTo(BINDER_PREFIX + stream1 + ".default." + i);
assertThat(cleanedQueues.get(i * 2 + 1))
.isEqualTo(BINDER_PREFIX + stream1 + ".default." + i + ".dlq");
}
List<String> cleanedExchanges = cleanedMap.get("exchanges");
assertThat(cleanedExchanges).hasSize(6);
// wild card *should* clean stream2
cleanedMap = doClean(cleaner, stream1 + "*", false);
assertThat(cleanedMap).hasSize(2);
cleanedQueues = cleanedMap.get("queues");
assertThat(cleanedQueues).hasSize(5);
for (int i = 0; i < 5; i++) {
assertThat(cleanedQueues.get(i))
.isEqualTo(BINDER_PREFIX + stream2 + ".default." + i);
}
cleanedExchanges = cleanedMap.get("exchanges");
assertThat(cleanedExchanges).hasSize(6);
}
protected Map<String, Object> getQueue(String string, String queueName) throws URISyntaxException {
URI uri = new URI(RABBITMQ.getHttpUrl())
.resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queueName);
return client.get()
.uri(uri)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
})
.block(Duration.ofSeconds(10));
}
private static Map<String, List<String>> doClean(RabbitBindingCleaner cleaner, String entity, boolean isJob) {
return cleaner.clean(RABBITMQ.getHttpUrl() + "/api", RABBITMQ.getAdminUsername(), RABBITMQ.getAdminPassword(),
"/", BINDER_PREFIX, entity, isJob);
}
}