OutputDestination.java

/*
 * Copyright 2017-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.test;

import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.messaging.Message;
import org.springframework.util.StringUtils;

/**
 * Implementation of binder endpoint that represents the target destination (e.g.,
 * destination which receives messages sent to Processor.OUTPUT) <br>
 * You can interact with it by calling {@link #receive()} operation.
 *
 * @author Oleg Zhurakousky
 *
 */
public class OutputDestination extends AbstractDestination {

	private final Log log = LogFactory.getLog(OutputDestination.class);

	private final ConcurrentHashMap<String, BlockingQueue<Message<byte[]>>> messageQueues = new ConcurrentHashMap<>();

	public Message<byte[]> receive(long timeout, String bindingName) {
		try {
			bindingName = bindingName.endsWith(".destination") ? bindingName : bindingName + ".destination";
			return this.outputQueue(bindingName).poll(timeout, TimeUnit.MILLISECONDS);
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return null;
	}

	/**
	 * Will clear all output destinations.
	 *
	 * @since 3.0.6
	 */
	public void clear() {
		this.messageQueues.values().forEach(v -> v.clear());
	}

	/**
	 * Will clear output destination with specified name.
	 *
	 * @param destinationName the name of the output destination to be cleared.
	 * @return true if attempt to clear specific destination is successful otherwise false.
	 * @since 3.0.6
	 */
	public boolean clear(String destinationName) {
		String queueName = destinationName.endsWith(".destination") ? destinationName : destinationName + ".destination";
		if (StringUtils.hasText(destinationName) && this.messageQueues.containsKey(queueName)) {
			this.messageQueues.get(queueName).clear();
			return true;
		}
		return false;
	}

	/**
	 * Allows to access {@link Message}s received by this {@link OutputDestination}.
	 * This is a convenience method for cases when you only have one binding.
	 * For all other cases use {@link #receive(long, String)} method.
	 *
	 * @return received message
	 */
	public Message<byte[]> receive() {
		return this.receive(0, 0);
	}

	/**
	 * Allows to access {@link Message}s received by this {@link OutputDestination}.
	 * This is a convenience method for cases when you only have one binding.
	 * For all other cases use {@link #receive(long, String)} method.
	 *
	 * @param timeout timeout for receiving message
	 * @return received message
	 */
	public Message<byte[]> receive(long timeout) {
		return this.receive(timeout, 0);
	}

	@SuppressWarnings("unchecked")
	@Override
	void afterChannelIsSet(int channelIndex, String bindingName) {
		if (((AbstractSubscribableChannel) this.getChannelByName(bindingName)).getSubscriberCount() < 1) {
			this.getChannelByName(bindingName).subscribe(message -> this.outputQueue(bindingName).offer((Message<byte[]>) message));
		}
	}

	private BlockingQueue<Message<byte[]>> outputQueue(String bindingName) {
		this.messageQueues.putIfAbsent(bindingName, new LinkedTransferQueue<>());
		return this.messageQueues.get(bindingName);
	}

	private Message<byte[]> receive(long timeout, int bindingIndex) {
		log.warn("!!!While 'receive(long timeout, int bindingIndex)' method may still work it is deprecated no longer supported. "
			+ "It will be removed after 3.1.3 release. Please use 'receive(long timeout, String bindingName)'");
		try {
			BlockingQueue<Message<byte[]>> destinationQueue = (new ArrayList<>(this.messageQueues.values())).get(bindingIndex);
			return destinationQueue.poll(timeout, TimeUnit.MILLISECONDS);
		}
		catch (Exception e) {
			Thread.currentThread().interrupt();
		}
		return null;
	}
}