RabbitBindingCleaner.java

/*
 * Copyright 2015-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.rabbit.admin;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.BindingCleaner;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriUtils;

/**
 * Implementation of {@link org.springframework.cloud.stream.binder.BindingCleaner} for
 * the {@code RabbitBinder}.
 *
 * @author Gary Russell
 * @author David Turanski
 * @since 1.2
 */
public class RabbitBindingCleaner implements BindingCleaner {

	private static final Log logger = LogFactory.getLog(RabbitBindingCleaner.class);

	private static final String PREFIX_DELIMITER = ".";

	/**
	 * Binder prefix.
	 */
	public static final String BINDER_PREFIX = "binder" + PREFIX_DELIMITER;

	@Override
	public Map<String, List<String>> clean(String entity, boolean isJob) {
		return clean("http://localhost:15672/api", "guest", "guest", "/", BINDER_PREFIX,
				entity, isJob);
	}

	public Map<String, List<String>> clean(String adminUri, String user, String pw,
			String vhost, String binderPrefix, String entity, boolean isJob) {

		try {
			WebClient client = WebClient.builder()
					.filter(ExchangeFilterFunctions.basicAuthentication(user, pw))
					.build();
			URI uri = new URI(adminUri);
			return doClean(client, uri,
					vhost == null ? "/" : vhost,
					binderPrefix == null ? BINDER_PREFIX : binderPrefix, entity, isJob);
		}
		catch (URISyntaxException e) {
			throw new RabbitAdminException("Couldn't create a Client", e);
		}
	}

	private Map<String, List<String>> doClean(WebClient client,
			URI uri, String vhost, String binderPrefix, String entity, boolean isJob) {

		LinkedList<String> removedQueues = isJob ? null
				: findStreamQueues(client, uri, vhost, binderPrefix, entity);
		List<String> removedExchanges = findExchanges(client, uri, vhost, binderPrefix, entity);
		// Delete the queues in reverse order to enable re-running after a partial
		// success.
		// The queue search above starts with 0 and terminates on a not found.
		if (removedQueues != null) {
			removedQueues.descendingIterator().forEachRemaining(q -> {
				deleteQueue(client, uri, vhost, q);
				if (logger.isDebugEnabled()) {
					logger.debug("deleted queue: " + q);
				}
			});
		}
		Map<String, List<String>> results = new HashMap<>();
		if (removedQueues.size() > 0) {
			results.put("queues", removedQueues);
		}
		// Fanout exchanges for taps
		removedExchanges.forEach(exchange -> {
			deleteExchange(client, uri, vhost, exchange);
			if (logger.isDebugEnabled()) {
				logger.debug("deleted exchange: " + exchange);
			}
		});
		if (removedExchanges.size() > 0) {
			results.put("exchanges", removedExchanges);
		}
		return results;
	}

	private void deleteQueue(WebClient client, URI uri, String vhost, String q) {
		URI deleteURI = uri
				.resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/" + q);
		client.delete()
				.uri(deleteURI)
				.retrieve()
				.toEntity(Void.class)
				.block(Duration.ofSeconds(10));
	}

	private void deleteExchange(WebClient client, URI uri, String vhost, String ex) {
		URI deleteURI = uri
				.resolve("/api/exchanges/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/" + ex);
		client.delete()
				.uri(deleteURI)
				.retrieve()
				.toEntity(Void.class)
				.block(Duration.ofSeconds(10));
	}

	private LinkedList<String> findStreamQueues(WebClient client, URI uri, String vhost, String binderPrefix,
			String stream) {

		String queueNamePrefix = adjustPrefix(AbstractBinder.applyPrefix(binderPrefix, stream));
		List<Map<String, Object>> queues = getQueues(client, uri, vhost);
		return queues.stream()
			.filter(q -> ((String) q.get("name")).startsWith(queueNamePrefix))
			.map(q -> checkNoConsumers(q))
			.collect(Collectors.toCollection(LinkedList::new));
	}

	private List<Map<String, Object>> getQueues(WebClient client, URI uri, String vhost) {
		URI getUri = uri
				.resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/");
		return client.get()
				.uri(getUri)
				.retrieve()
				.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
				})
				.block(Duration.ofSeconds(10));
	}

	private String adjustPrefix(String prefix) {
		if (prefix.endsWith("*")) {
			return prefix.substring(0, prefix.length() - 1);
		}
		else {
			return prefix + PREFIX_DELIMITER;
		}
	}

	private String checkNoConsumers(Map<String, Object> queue) {
		if ((Integer) queue.get("consumers") != 0) {
			throw new RabbitAdminException("Queue " + queue.get("name") + " is in use");
		}
		return (String) queue.get("name");
	}

	private List<String> findExchanges(WebClient client, URI uri, String vhost, String binderPrefix, String entity) {
		List<Map<String, Object>> exchanges = getExchanges(client, uri, vhost);
		String exchangeNamePrefix = adjustPrefix(AbstractBinder.applyPrefix(binderPrefix, entity));
		List<String> exchangesToRemove = exchanges.stream()
				.filter(e -> ((String) e.get("name")).startsWith(exchangeNamePrefix))
				.map(e -> {
					List<Map<String, Object>> bindingsBySource =
							getBindingsBySource(client, uri, vhost, (String) e.get("name"));
					return Collections.singletonMap((String) e.get("name"), bindingsBySource);
				})
				.map(bindingsMap -> hasNoForeignBindings(bindingsMap, exchangeNamePrefix))
				.collect(Collectors.toList());
		exchangesToRemove.stream()
				.map(exchange -> getExchangeBindingsByDestination(client, uri, vhost, exchange))
				.forEach(bindings -> {
					if (bindings.size() > 0) {
						throw new RabbitAdminException("Cannot delete exchange "
								+ bindings.get(0).get("destination") + "; it is a destination: " + bindings);
					}
				});
		return exchangesToRemove;
	}

	private List<Map<String, Object>> getExchangeBindingsByDestination(WebClient client, URI uri, String vhost,
			String name) {

		String exchange = "".equals(name) ? "amq.default" : name;
		URI getUri = uri
				.resolve("/api/exchanges/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/"
						+ UriUtils.encodePathSegment(exchange, StandardCharsets.UTF_8) + "/bindings/destination");
		return client.get()
				.uri(getUri)
				.retrieve()
				.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
				})
				.block(Duration.ofSeconds(10));
	}

	private List<Map<String, Object>> getBindingsBySource(WebClient client, URI uri, String vhost, String name) {
		String exchange = "".equals(name) ? "amq.default" : name;
		URI getUri = uri
				.resolve("/api/exchanges/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/"
						+ UriUtils.encodePathSegment(exchange, StandardCharsets.UTF_8) + "/bindings/source");
		return client.get()
				.uri(getUri)
				.retrieve()
				.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
				})
				.block(Duration.ofSeconds(10));
	}

	private List<Map<String, Object>> getExchanges(WebClient client, URI uri, String vhost) {
		URI getUri = uri
				.resolve("/api/exchanges/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/");
		return client.get()
				.uri(getUri)
				.retrieve()
				.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
				})
				.block(Duration.ofSeconds(10));
	}

	private String hasNoForeignBindings(Map<String, List<Map<String, Object>>> bindings, String exchangeNamePrefix) {
		Entry<String, List<Map<String, Object>>> next = bindings.entrySet().iterator().next();
		for (Map<String, Object> binding : next.getValue()) {
			if (!"queue".equals(binding.get("destination_type"))
					|| !((String) binding.get("destination")).startsWith(exchangeNamePrefix)) {
				throw new RabbitAdminException("Cannot delete exchange "
						+ next.getKey() + "; it has bindings: " + bindings);
			}
		}
		return next.getKey();
	}

}