PulsarMessageChannelBinder.java
/*
* Copyright 2022-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.pulsar;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarConsumerProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarExtendedBindingProperties;
import org.springframework.cloud.stream.binder.pulsar.properties.PulsarProducerProperties;
import org.springframework.cloud.stream.binder.pulsar.provisioning.PulsarTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TypedMessageBuilderCustomizer;
import org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.listener.PulsarRecordMessageListener;
import org.springframework.pulsar.support.header.PulsarHeaderMapper;
/**
* {@link Binder} implementation for Apache Pulsar.
*
* @author Soby Chacko
* @author Chris Bono
*/
public class PulsarMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<PulsarConsumerProperties>, ExtendedProducerProperties<PulsarProducerProperties>, PulsarTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, PulsarConsumerProperties, PulsarProducerProperties> {
private final PulsarTemplate<Object> pulsarTemplate;
private final PulsarConsumerFactory<?> pulsarConsumerFactory;
private final PulsarBinderConfigurationProperties binderConfigProps;
private final SchemaResolver schemaResolver;
private final PulsarHeaderMapper headerMapper;
private PulsarExtendedBindingProperties extendedBindingProperties = new PulsarExtendedBindingProperties();
public PulsarMessageChannelBinder(PulsarTopicProvisioner provisioningProvider,
PulsarTemplate<Object> pulsarTemplate, PulsarConsumerFactory<?> pulsarConsumerFactory,
PulsarBinderConfigurationProperties binderConfigProps, SchemaResolver schemaResolver,
PulsarHeaderMapper headerMapper) {
super(null, provisioningProvider);
this.pulsarTemplate = pulsarTemplate;
this.pulsarConsumerFactory = pulsarConsumerFactory;
this.binderConfigProps = binderConfigProps;
this.schemaResolver = schemaResolver;
this.headerMapper = headerMapper;
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<PulsarProducerProperties> producerProperties, MessageChannel errorChannel) {
final Schema<Object> schema;
if (producerProperties.isUseNativeEncoding()) {
var schemaType = Optional.ofNullable(producerProperties.getExtension().getSchemaType())
.orElse(SchemaType.NONE);
schema = this.schemaResolver
.resolveSchema(schemaType, producerProperties.getExtension().getMessageType(),
producerProperties.getExtension().getMessageKeyType())
.orElseThrow(() -> "Could not determine producer schema for " + destination.getName());
}
else {
schema = null;
}
var layeredProducerProps = PulsarBinderUtils.mergeModifiedProducerProperties(
this.binderConfigProps.getProducer(), producerProperties.getExtension());
var handler = new PulsarProducerConfigurationMessageHandler(this.pulsarTemplate, schema, destination.getName(),
(builder) -> PulsarBinderUtils.loadConf(builder, layeredProducerProps),
determineOutboundHeaderMapper(producerProperties));
handler.setApplicationContext(getApplicationContext());
handler.setBeanFactory(getBeanFactory());
return handler;
}
@Nullable
private PulsarBinderHeaderMapper determineOutboundHeaderMapper(
ExtendedProducerProperties<PulsarProducerProperties> extProducerProps) {
if (HeaderMode.none.equals(extProducerProps.getHeaderMode())) {
return null;
}
return new PulsarBinderHeaderMapper(this.headerMapper);
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<PulsarConsumerProperties> properties) {
var containerProperties = new PulsarContainerProperties();
containerProperties.setTopics(Set.of(destination.getName()));
var inboundHeaderMapper = determineInboundHeaderMapper(properties);
var messageDrivenChannelAdapter = new PulsarMessageDrivenChannelAdapter();
containerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, pulsarMsg) -> {
var springMessage = (inboundHeaderMapper != null)
? MessageBuilder.createMessage(pulsarMsg.getValue(), inboundHeaderMapper.toSpringHeaders(pulsarMsg))
: MessageBuilder.withPayload(pulsarMsg.getValue()).build();
messageDrivenChannelAdapter.send(springMessage);
});
if (properties.isUseNativeDecoding()) {
var schemaType = Optional.ofNullable(properties.getExtension().getSchemaType()).orElse(SchemaType.NONE);
var schema = this.schemaResolver
.resolveSchema(schemaType, properties.getExtension().getMessageType(),
properties.getExtension().getMessageKeyType())
.orElseThrow(() -> "Could not determine consumer schema for " + destination.getName());
containerProperties.setSchema(schema);
}
else {
containerProperties.setSchema(Schema.BYTES);
}
var subscriptionName = PulsarBinderUtils.subscriptionName(properties.getExtension(), destination);
containerProperties.setSubscriptionName(subscriptionName);
var layeredConsumerProps = PulsarBinderUtils
.mergeModifiedConsumerProperties(this.binderConfigProps.getConsumer(), properties.getExtension());
containerProperties.getPulsarConsumerProperties().putAll(layeredConsumerProps);
containerProperties.updateContainerProperties();
var container = new DefaultPulsarMessageListenerContainer<>(this.pulsarConsumerFactory, containerProperties);
messageDrivenChannelAdapter.setMessageListenerContainer(container);
messageDrivenChannelAdapter.setApplicationContext(getApplicationContext());
messageDrivenChannelAdapter.setBeanFactory(getApplicationContext().getBeanFactory());
return messageDrivenChannelAdapter;
}
@Nullable
private PulsarBinderHeaderMapper determineInboundHeaderMapper(
ExtendedConsumerProperties<PulsarConsumerProperties> extConsumerProps) {
if (HeaderMode.none.equals(extConsumerProps.getHeaderMode())) {
return null;
}
return new PulsarBinderHeaderMapper(this.headerMapper);
}
@Override
public PulsarConsumerProperties getExtendedConsumerProperties(String channelName) {
return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public PulsarProducerProperties getExtendedProducerProperties(String channelName) {
return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
}
@Override
public String getDefaultsPrefix() {
return null;
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return null;
}
public PulsarExtendedBindingProperties getExtendedBindingProperties() {
return this.extendedBindingProperties;
}
public void setExtendedBindingProperties(PulsarExtendedBindingProperties extendedBindingProperties) {
this.extendedBindingProperties = extendedBindingProperties;
}
static class PulsarMessageDrivenChannelAdapter extends MessageProducerSupport {
AbstractPulsarMessageListenerContainer<?> messageListenerContainer;
public void send(Message<?> message) {
sendMessage(message);
}
@Override
protected void doStart() {
this.messageListenerContainer.start();
}
@Override
protected void doStop() {
this.messageListenerContainer.stop();
}
public void setMessageListenerContainer(AbstractPulsarMessageListenerContainer<?> messageListenerContainer) {
this.messageListenerContainer = messageListenerContainer;
}
}
static class PulsarProducerConfigurationMessageHandler extends AbstractMessageProducingHandler
implements ManageableLifecycle {
private final PulsarTemplate<Object> pulsarTemplate;
private final Schema<Object> schema;
private final String destination;
private final ProducerBuilderCustomizer<Object> layeredProducerPropsCustomizer;
private final PulsarHeaderMapper headerMapper;
private boolean running = true;
PulsarProducerConfigurationMessageHandler(PulsarTemplate<Object> pulsarTemplate, Schema<Object> schema,
String destination, ProducerBuilderCustomizer<Object> layeredProducerPropsCustomizer,
PulsarHeaderMapper headerMapper) {
this.pulsarTemplate = pulsarTemplate;
this.schema = schema;
this.destination = destination;
this.layeredProducerPropsCustomizer = layeredProducerPropsCustomizer;
this.headerMapper = headerMapper;
}
@Override
public void start() {
try {
super.onInit();
}
catch (Exception ex) {
this.logger.error(ex, "Initialization errors: ");
throw new RuntimeException(ex);
}
}
@Override
public void stop() {
// TODO - should we close the underlyiung producer?
this.running = false;
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
protected void handleMessageInternal(Message<?> message) {
try {
// @formatter:off
this.pulsarTemplate.newMessage(message.getPayload())
.withTopic(this.destination)
.withSchema(this.schema)
.withProducerCustomizer(this.layeredProducerPropsCustomizer)
.withMessageCustomizer(this.applySpringHeadersAsPulsarProperties(message.getHeaders()))
.sendAsync();
// @formatter:on
}
catch (Exception ex) {
logger.trace(ex, "Failed to send message to destination: " + this.destination);
}
}
private TypedMessageBuilderCustomizer<Object> applySpringHeadersAsPulsarProperties(MessageHeaders headers) {
return (mb) -> {
if (this.headerMapper != null) {
this.headerMapper.toPulsarHeaders(headers).forEach(mb::property);
}
};
}
}
}