StreamUtils.java
/*
* Copyright 2021-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rabbit;
import java.util.Map;
import java.util.function.Function;
import com.rabbitmq.stream.Environment;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.ObservableListenerContainer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitProducerProperties.ProducerType;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.outbound.RabbitStreamMessageHandler;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
/**
 * Utilities for stream components. Used to prevent a hard runtime dependency on
 * spring-rabbit-stream.
 *
 * @author Gary Russell
 * @author Artem Bilan
 * @since 3.2
 *
 */
public final class StreamUtils {

	/**
	 * Header under which the stream context of an inbound message is exposed.
	 */
	private static final String STREAM_CONTEXT_HEADER = "rabbitmq_streamContext";

	/**
	 * Routing key used for super streams when a message carries no partition header.
	 */
	private static final String DEFAULT_PARTITION = "0";

	private StreamUtils() {
	}

	/**
	 * Create a {@link StreamListenerContainer}.
	 * <p>
	 * The container is built from the {@link Environment} bean found in the application
	 * context. Any {@link ConsumerCustomizer} later set on the container is wrapped so
	 * that, unless the binding is a super stream, the consumer is first named
	 * {@code <destination name>.<group>}. For super streams the container is configured
	 * through {@code superStream(...)} with that same name and the configured
	 * concurrency.
	 *
	 * @param consumerDestination the provisioned consumer destination; its name is the
	 * stream name.
	 * @param group the consumer group; used as the suffix of the consumer and bean names.
	 * @param properties the consumer properties.
	 * @param destination the destination name (not used by this method).
	 * @param applicationContext the application context used to look up the
	 * {@link Environment} and the optional {@link StreamMessageConverter} bean.
	 * @return the container.
	 */
	public static ObservableListenerContainer createContainer(ConsumerDestination consumerDestination, String group,
			ExtendedConsumerProperties<RabbitConsumerProperties> properties, String destination,
			ApplicationContext applicationContext) {

		RabbitConsumerProperties extension = properties.getExtension();
		String streamName = consumerDestination.getName();
		String consumerName = streamName + "." + group;

		StreamListenerContainer container = new StreamListenerContainer(applicationContext.getBean(Environment.class)) {

			@Override
			public synchronized void setConsumerCustomizer(ConsumerCustomizer consumerCustomizer) {
				super.setConsumerCustomizer((id, builder) -> {
					// The name is applied before delegating, so the supplied customizer
					// still has the last word on the builder.
					if (!extension.isSuperStream()) {
						builder.name(consumerName);
					}
					consumerCustomizer.accept(id, builder);
				});
			}

		};
		container.setBeanName(consumerName + ".container");

		String streamConverterBeanName = extension.getStreamStreamMessageConverterBeanName();
		if (streamConverterBeanName != null) {
			container.setStreamConverter(
					applicationContext.getBean(streamConverterBeanName, StreamMessageConverter.class));
		}
		if (extension.isSuperStream()) {
			container.superStream(streamName, consumerName, properties.getConcurrency());
		}
		return container;
	}

	/**
	 * Configure the channel adapter for streams support.
	 * <p>
	 * Installs a header mapper that delegates inbound request mapping to
	 * {@link DefaultAmqpHeaderMapper#inboundMapper()} and additionally publishes the
	 * stream context under the {@code rabbitmq_streamContext} header when the source
	 * properties are {@link StreamMessageProperties}. Reply mapping and outbound mapping
	 * are not implemented by this mapper.
	 *
	 * @param adapter the adapter.
	 */
	public static void configureAdapter(AmqpInboundChannelAdapter adapter) {
		adapter.setHeaderMapper(new AmqpHeaderMapper() {

			private final AmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();

			@Override
			public Map<String, Object> toHeadersFromRequest(MessageProperties source) {
				Map<String, Object> headers = this.mapper.toHeadersFromRequest(source);
				// Only stream properties carry a context; guard the cast so that other
				// MessageProperties do not fail with a ClassCastException.
				if (source instanceof StreamMessageProperties) {
					headers.put(STREAM_CONTEXT_HEADER, ((StreamMessageProperties) source).getContext());
				}
				return headers;
			}

			@Override
			public Map<String, Object> toHeadersFromReply(MessageProperties source) {
				// Reply mapping is intentionally not implemented.
				return null;
			}

			@Override
			public void fromHeadersToRequest(MessageHeaders headers, MessageProperties target) {
				// Outbound mapping is intentionally a no-op.
			}

			@Override
			public void fromHeadersToReply(MessageHeaders headers, MessageProperties target) {
				// Outbound mapping is intentionally a no-op.
			}

		});
	}

	/**
	 * Create a {@link RabbitStreamMessageHandler}.
	 * <p>
	 * The handler wraps a {@link RabbitStreamTemplate} bound to the producer destination
	 * name. For super streams the routing key is taken from the
	 * {@link BinderHeaders#PARTITION_HEADER} application property (which is removed from
	 * the message), falling back to {@code "0"} when it is absent.
	 *
	 * @param producerDestination the provisioned producer destination; its name is the
	 * stream name.
	 * @param producerProperties the properties; used to determine whether sends are
	 * synchronous.
	 * @param errorChannel the error channel; may be {@code null}, in which case no send
	 * failure channel is set.
	 * @param destination the destination name (not used by this method).
	 * @param extendedProperties the extended properties.
	 * @param applicationContext the application context.
	 * @param headerMapperFunction the header mapper function.
	 * @return the handler.
	 */
	public static MessageHandler createStreamMessageHandler(ProducerDestination producerDestination,
			ExtendedProducerProperties<RabbitProducerProperties> producerProperties, MessageChannel errorChannel,
			String destination, RabbitProducerProperties extendedProperties,
			AbstractApplicationContext applicationContext,
			Function<RabbitProducerProperties, AmqpHeaderMapper> headerMapperFunction) {

		RabbitStreamTemplate template = new RabbitStreamTemplate(applicationContext.getBean(Environment.class),
				producerDestination.getName());
		if (extendedProperties.isSuperStream()) {
			template.setSuperStreamRouting(message -> {
				Map<String, Object> applicationProperties = message.getApplicationProperties();
				// Guard against messages that carry no application properties at all.
				if (applicationProperties == null) {
					return DEFAULT_PARTITION;
				}
				Object partition = applicationProperties.getOrDefault(BinderHeaders.PARTITION_HEADER,
						DEFAULT_PARTITION);
				applicationProperties.remove(BinderHeaders.PARTITION_HEADER);
				return String.valueOf(partition);
			});
		}

		String messageConverterBeanName = extendedProperties.getStreamMessageConverterBeanName();
		if (messageConverterBeanName != null) {
			template.setMessageConverter(applicationContext.getBean(messageConverterBeanName, MessageConverter.class));
		}
		String streamConverterBeanName = extendedProperties.getStreamStreamMessageConverterBeanName();
		if (streamConverterBeanName != null) {
			template.setStreamConverter(
					applicationContext.getBean(streamConverterBeanName, StreamMessageConverter.class));
		}

		RabbitStreamMessageHandler handler = new RabbitStreamMessageHandler(template);
		handler.setApplicationContext(applicationContext);
		handler.setBeanFactory(applicationContext.getBeanFactory());
		if (errorChannel != null) {
			handler.setSendFailureChannel(errorChannel);
		}
		String confirmAckChannelName = extendedProperties.getConfirmAckChannel();
		if (confirmAckChannelName != null) {
			handler.setSendSuccessChannelName(confirmAckChannelName);
		}
		handler.setHeaderMapper(headerMapperFunction.apply(extendedProperties));
		handler.setSync(ProducerType.STREAM_SYNC.equals(producerProperties.getExtension().getProducerType()));
		return handler;
	}

}