PulsarBinderIntegrationTests.java

/*
 * Copyright 2023-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.pulsar;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.pulsar.core.ConsumerBuilderCustomizer;
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
import org.springframework.pulsar.support.header.PulsarHeaderMapper;
import org.springframework.pulsar.support.header.ToStringPulsarHeaderMapper;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration tests for {@link PulsarBinderIntegrationTests}.
 *
 * @author Soby Chacko
 * @author Chris Bono
 */
@ExtendWith(OutputCaptureExtension.class)
@SuppressWarnings("JUnitMalformedDeclaration")
class PulsarBinderIntegrationTests implements PulsarTestContainerSupport {

	private static final int AWAIT_DURATION = 10;

	@Test
	void binderAndBindingPropsAreAppliedAndRespected(CapturedOutput output) {
		SpringApplication app = new SpringApplication(BinderAndBindingPropsTestConfig.class);
		app.setWebApplicationType(WebApplicationType.NONE);
		try (ConfigurableApplicationContext context = app.run(
				"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
				"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
				"--spring.pulsar.producer.cache.enabled=false",
				"--spring.cloud.function.definition=textSupplier;textLogger",
				"--spring.cloud.stream.bindings.textLogger-in-0.destination=textSupplier-out-0",
				"--spring.pulsar.producer.name=textSupplierProducer-fromBase",
				"--spring.cloud.stream.pulsar.binder.producer.name=textSupplierProducer-fromBinder",
				"--spring.cloud.stream.pulsar.bindings.textSupplier-out-0.producer.name=textSupplierProducer-fromBinding",
				"--spring.cloud.stream.pulsar.binder.producer.max-pending-messages=1100",
				"--spring.cloud.stream.pulsar.binder.producer.block-if-queue-full=true",
				"--spring.cloud.stream.pulsar.binder.consumer.subscription.name=textLoggerSub-fromBinder",
				"--spring.cloud.stream.pulsar.binder.consumer.name=textLogger-fromBinder",
				"--spring.cloud.stream.pulsar.bindings.textLogger-in-0.consumer.name=textLogger-fromBinding")) {

			Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
					.until(() -> output.toString().contains("Hello binder: test-basic-scenario"));

			// now verify the properties were set onto producer and consumer as expected
			TrackingProducerFactory producerFactory = context.getBean(TrackingProducerFactory.class);
			assertThat(producerFactory.producersCreated).isNotEmpty().element(0)
					.hasFieldOrPropertyWithValue("producerName", "textSupplierProducer-fromBinding")
					.hasFieldOrPropertyWithValue("conf.maxPendingMessages", 1100)
					.hasFieldOrPropertyWithValue("conf.blockIfQueueFull", true);

			TrackingConsumerFactory consumerFactory = context.getBean(TrackingConsumerFactory.class);
			assertThat(consumerFactory.consumersCreated).isNotEmpty().element(0)
					.hasFieldOrPropertyWithValue("consumerName", "textLogger-fromBinding")
					.hasFieldOrPropertyWithValue("conf.subscriptionName", "textLoggerSub-fromBinder");
		}
	}

	@Nested
	class DefaultEncoding {

		@Test
		void primitiveTypeString(CapturedOutput output) {
			SpringApplication app = new SpringApplication(PrimitiveTextConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=textSupplier;textLogger",
					"--spring.cloud.stream.bindings.textLogger-in-0.destination=textSupplier-out-0",
					"--spring.cloud.stream.pulsar.bindings.textLogger-in-0.consumer.subscription.name=pbit-text-sub1")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: test-basic-scenario"));
			}
		}

		@Test
		void primitiveTypeFloat(CapturedOutput output) {
			SpringApplication app = new SpringApplication(PrimitiveFloatConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=piSupplier;piLogger",
					"--spring.cloud.stream.bindings.piSupplier-out-0.destination=pi-stream",
					"--spring.cloud.stream.bindings.piLogger-in-0.destination=pi-stream",
					"--spring.cloud.stream.pulsar.bindings.piLogger-in-0.consumer.subscription.name=pbit-float-sub1")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: 3.14"));
			}
		}

	}

	@Nested
	class NativeEncoding {

		@Test
		void primitiveTypeFloat(CapturedOutput output) {
			SpringApplication app = new SpringApplication(PrimitiveFloatConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=piSupplier;piLogger",
					"--spring.cloud.stream.bindings.piLogger-in-0.destination=piSupplier-out-0",
					"--spring.cloud.stream.bindings.piSupplier-out-0.producer.use-native-encoding=true",
					"--spring.cloud.stream.pulsar.bindings.piSupplier-out-0.producer.schema-type=FLOAT",
					"--spring.cloud.stream.bindings.piLogger-in-0.consumer.use-native-decoding=true",
					"--spring.cloud.stream.pulsar.bindings.piLogger-in-0.consumer.schema-type=FLOAT",
					"--spring.cloud.stream.pulsar.bindings.piLogger-in-0.consumer.subscription.name=pbit-float-sub2")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: 3.14"));
			}
		}

		@Test
		void jsonTypeFooWithSchemaType(CapturedOutput output) {
			SpringApplication app = new SpringApplication(JsonFooConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=fooSupplier;fooLogger",
					"--spring.cloud.stream.bindings.fooSupplier-out-0.destination=foo-stream-1",
					"--spring.cloud.stream.bindings.fooLogger-in-0.destination=foo-stream-1",
					"--spring.cloud.stream.bindings.fooSupplier-out-0.producer.use-native-encoding=true",
					"--spring.cloud.stream.pulsar.bindings.fooSupplier-out-0.producer.schema-type=JSON",
					"--spring.cloud.stream.pulsar.bindings.fooSupplier-out-0.producer.message-type="
							+ Foo.class.getName(),
					"--spring.cloud.stream.bindings.fooLogger-in-0.consumer.use-native-decoding=true",
					"--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.schema-type=JSON",
					"--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.message-type=" + Foo.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.subscription.name=pbit-foo-sub1")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: Foo[value=5150]"));
			}
		}

		@Test
		void jsonTypeFooWithoutSchemaTypeDefaultsToJsonSchema(CapturedOutput output) {
			SpringApplication app = new SpringApplication(JsonFooConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=fooSupplier;fooLogger",
					"--spring.cloud.stream.bindings.fooSupplier-out-0.destination=foo-stream-2",
					"--spring.cloud.stream.bindings.fooLogger-in-0.destination=foo-stream-2",
					"--spring.cloud.stream.bindings.fooSupplier-out-0.producer.use-native-encoding=true",
					"--spring.cloud.stream.pulsar.bindings.fooSupplier-out-0.producer.message-type="
							+ Foo.class.getName(),
					"--spring.cloud.stream.bindings.fooLogger-in-0.consumer.use-native-decoding=true",
					"--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.message-type=" + Foo.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.subscription.name=pbit-foo-sub2")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: Foo[value=5150]"));
			}
		}

		@Test
		void avroTypeUserWithSchemaType(CapturedOutput output) {
			SpringApplication app = new SpringApplication(AvroUserConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=userSupplier;userLogger",
					"--spring.cloud.stream.bindings.userSupplier-out-0.destination=user-stream-1",
					"--spring.cloud.stream.bindings.userLogger-in-0.destination=user-stream-1",
					"--spring.cloud.stream.bindings.userSupplier-out-0.producer.use-native-encoding=true",
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.schema-type=AVRO",
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.bindings.userLogger-in-0.consumer.use-native-decoding=true",
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.schema-type=AVRO",
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-user-sub1")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: User{name='user21', age=21}"));
			}
		}

		@Test
		void avroTypeUserWithoutSchemaTypeWithCustomMappingsViaProps(CapturedOutput output) {
			SpringApplication app = new SpringApplication(AvroUserConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=userSupplier;userLogger",
					"--spring.cloud.stream.bindings.userSupplier-out-0.destination=user-stream-2",
					"--spring.cloud.stream.bindings.userLogger-in-0.destination=user-stream-2",
					"--spring.cloud.stream.bindings.userSupplier-out-0.producer.use-native-encoding=true",
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.bindings.userLogger-in-0.consumer.use-native-decoding=true",
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-user-sub2",
					"--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()),
					"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: User{name='user21', age=21}"));
			}
		}

		@Test
		void avroTypeUserWithoutSchemaTypeWithCustomMappingsViaCustomizer(CapturedOutput output) {
			SpringApplication app = new SpringApplication(AvroUserConfigCustomMappings.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=userSupplier;userLogger",
					"--spring.cloud.stream.bindings.userSupplier-out-0.destination=user-stream-3",
					"--spring.cloud.stream.bindings.userLogger-in-0.destination=user-stream-3",
					"--spring.cloud.stream.bindings.userSupplier-out-0.producer.use-native-encoding=true",
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.bindings.userLogger-in-0.consumer.use-native-decoding=true",
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-user-sub3")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: User{name='user21', age=21}"));
			}
		}

		@Test
		void keyValueAvroTypeWithSchemaTypeAndCustomTypeMappingsViaProps(CapturedOutput output) {
			SpringApplication app = new SpringApplication(KeyValueAvroUserConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=userSupplier;userLogger",
					"--spring.cloud.stream.bindings.userSupplier-out-0.destination=kv-stream-1",
					"--spring.cloud.stream.bindings.userLogger-in-0.destination=kv-stream-1",
					"--spring.cloud.stream.bindings.userSupplier-out-0.producer.use-native-encoding=true",
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.schema-type=KEY_VALUE",
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.message-key-type="
							+ String.class.getName(),
					"--spring.cloud.stream.bindings.userLogger-in-0.consumer.use-native-decoding=true",
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.schema-type=KEY_VALUE",
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-key-type="
							+ String.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-kv-sub1",
					"--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()),
					"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: 21->User{name='user21', age=21}"));
			}
		}

		@Test
		void keyValueAvroTypeWithoutSchemaTypeAndCustomTypeMappingsViaProps(CapturedOutput output) {
			SpringApplication app = new SpringApplication(KeyValueAvroUserConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=userSupplier;userLogger",
					"--spring.cloud.stream.bindings.userSupplier-out-0.destination=kv-stream-2",
					"--spring.cloud.stream.bindings.userLogger-in-0.destination=kv-stream-2",
					"--spring.cloud.stream.bindings.userSupplier-out-0.producer.use-native-encoding=true",
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.message-key-type="
							+ String.class.getName(),
					"--spring.cloud.stream.bindings.userLogger-in-0.consumer.use-native-decoding=true",
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-key-type="
							+ String.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-kv-sub2",
					"--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()),
					"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: 21->User{name='user21', age=21}"));
			}
		}

		@Test
		void keyValueAvroTypeWithSchemaTypeAndCustomTypeMappingsViaCustomizer(CapturedOutput output) {
			SpringApplication app = new SpringApplication(KeyValueAvroUserConfigCustomMappings.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=userSupplier;userLogger",
					"--spring.cloud.stream.bindings.userSupplier-out-0.destination=kv-stream-3",
					"--spring.cloud.stream.bindings.userLogger-in-0.destination=kv-stream-3",
					"--spring.cloud.stream.bindings.userSupplier-out-0.producer.use-native-encoding=true",
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.schema-type=KEY_VALUE",
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userSupplier-out-0.producer.message-key-type="
							+ String.class.getName(),
					"--spring.cloud.stream.bindings.userLogger-in-0.consumer.use-native-decoding=true",
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.schema-type=KEY_VALUE",
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-type="
							+ User.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.message-key-type="
							+ String.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription.name=pbit-kv-sub3")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: 21->User{name='user21', age=21}"));
			}
		}

		@Test
		void keyValueJsonTypeWithoutSchemaTypeAndWithoutCustomTypeMappings(CapturedOutput output) {
			SpringApplication app = new SpringApplication(KeyValueJsonFooConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=fooSupplier;fooLogger",
					"--spring.cloud.stream.bindings.fooSupplier-out-0.destination=kv-stream-4",
					"--spring.cloud.stream.bindings.fooLogger-in-0.destination=kv-stream-4",
					"--spring.cloud.stream.bindings.fooSupplier-out-0.producer.use-native-encoding=true",
					"--spring.cloud.stream.pulsar.bindings.fooSupplier-out-0.producer.message-type="
							+ Foo.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.fooSupplier-out-0.producer.message-key-type="
							+ String.class.getName(),
					"--spring.cloud.stream.bindings.fooLogger-in-0.consumer.use-native-decoding=true",
					"--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.message-type=" + Foo.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.message-key-type="
							+ String.class.getName(),
					"--spring.cloud.stream.pulsar.bindings.fooLogger-in-0.consumer.subscription.name=pbit-kv-sub4")) {
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: 5150->Foo[value=5150]"));
			}
		}

	}

	@Nested
	class CustomMessageHeaders {

		@Test
		void headersPropagatedSendAndReceive(CapturedOutput output) {
			SpringApplication app = new SpringApplication(CustomSimpleHeadersConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=springMessageSupplier;springMessageLogger",
					"--spring.cloud.stream.bindings.springMessageSupplier-out-0.destination=cmh-1",
					"--spring.cloud.stream.bindings.springMessageLogger-in-0.destination=cmh-1",
					"--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription.name=pbit-cmh1-sub1")) {
				// Wait for a few of the messages to flow through (check for index = 5)
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)).until(
						() -> output.toString().contains("Hello binder: test-headers-msg-5 w/ custom-id: 5150-5"));
			}
		}

		@Test
		void complexHeadersAreEncodedAndPropagated(CapturedOutput output) {
			SpringApplication app = new SpringApplication(CustomComplexHeadersConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=springMessageSupplier;springMessageLogger",
					"--spring.cloud.stream.bindings.springMessageSupplier-out-0.destination=cmh-2",
					"--spring.cloud.stream.bindings.springMessageLogger-in-0.destination=cmh-2",
					"--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription.name=pbit-cmh2-sub1")) {
				// Wait for a few of the messages to flow through (check for index = 5)
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)).until(() -> output.toString()
						.contains("Hello binder: test-headers-msg-5 w/ custom-id: FooHeader[value=5150-5]"));
			}
		}

		@Test
		void producerHeaderModeNone(CapturedOutput output) {
			SpringApplication app = new SpringApplication(CustomComplexHeadersConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=springMessageSupplier;springMessageLogger",
					"--spring.cloud.stream.bindings.springMessageSupplier-out-0.destination=cmh-3",
					"--spring.cloud.stream.bindings.springMessageSupplier-out-0.producer.header-mode=none",
					"--spring.cloud.stream.bindings.springMessageLogger-in-0.destination=cmh-3",
					"--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription.name=pbit-cmh3-sub1")) {
				// Wait for a few of the messages to flow through (check for index = 5)
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: test-headers-msg-5 w/ custom-id: null"));
			}
		}

		@Test
		void consumerHeaderModeNone(CapturedOutput output) {
			SpringApplication app = new SpringApplication(CustomComplexHeadersConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=springMessageSupplier;springMessageLogger",
					"--spring.cloud.stream.bindings.springMessageSupplier-out-0.destination=cmh-4",
					"--spring.cloud.stream.bindings.springMessageLogger-in-0.destination=cmh-4",
					"--spring.cloud.stream.bindings.springMessageLogger-in-0.consumer.header-mode=none",
					"--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription.name=pbit-cmh4-sub1")) {
				// Wait for a few of the messages to flow through (check for index = 5)
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
						.until(() -> output.toString().contains("Hello binder: test-headers-msg-5 w/ custom-id: null"));
			}
		}

		@Test
		void customHeaderMapperRespected(CapturedOutput output) {
			SpringApplication app = new SpringApplication(CustomHeaderMapperConfig.class);
			app.setWebApplicationType(WebApplicationType.NONE);
			try (ConfigurableApplicationContext ignored = app.run(
					"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
					"--spring.pulsar.admin.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
					"--spring.cloud.function.definition=springMessageSupplier;springMessageLogger",
					"--spring.cloud.stream.bindings.springMessageSupplier-out-0.destination=cmh-5",
					"--spring.cloud.stream.bindings.springMessageLogger-in-0.destination=cmh-5",
					"--spring.cloud.stream.pulsar.bindings.springMessageLogger-in-0.consumer.subscription.name=pbit-cmh5-sub1")) {
				// Wait for a few of the messages to flow through (check for index = 5)
				Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)).until(() -> output.toString()
						.contains("Hello binder: test-headers-msg-5 w/ custom-id: tsh->tph->FooHeader[value=5150-5]"));
			}
		}

		@EnableAutoConfiguration
		@SpringBootConfiguration
		static class CustomSimpleHeadersConfig {

			private final Logger logger = LoggerFactory.getLogger(getClass());

			private int msgCount = 0;

			@Bean
			public Supplier<Message<String>> springMessageSupplier() {
				return () -> {
					msgCount++;
					return MessageBuilder.withPayload("test-headers-msg-" + msgCount)
							.setHeader("custom-id", "5150-" + msgCount).build();
				};
			}

			@Bean
			public Consumer<Message<String>> springMessageLogger() {
				return s -> this.logger.info("Hello binder: {} w/ custom-id: {}", s.getPayload(),
						s.getHeaders().get("custom-id"));
			}

		}

		@EnableAutoConfiguration
		@SpringBootConfiguration
		static class CustomComplexHeadersConfig {

			private final Logger logger = LoggerFactory.getLogger(getClass());

			private int msgCount = 0;

			@Bean
			public Supplier<Message<String>> springMessageSupplier() {
				return () -> {
					msgCount++;
					return MessageBuilder.withPayload("test-headers-msg-" + msgCount)
							.setHeader("custom-id", new FooHeader("5150-" + msgCount)).build();
				};
			}

			@Bean
			public Consumer<Message<String>> springMessageLogger() {
				return s -> {
					var header = s.getHeaders().get("custom-id");
					if (header != null) {
						assertThat(header).isInstanceOf(FooHeader.class);
					}
					this.logger.info("Hello binder: {} w/ custom-id: {}", s.getPayload(), header);
				};
			}

			record FooHeader(String value) {
			}

		}

		@EnableAutoConfiguration
		@SpringBootConfiguration
		static class CustomHeaderMapperConfig {

			private final Logger logger = LoggerFactory.getLogger(getClass());

			private int msgCount = 0;

			@Bean
			public PulsarHeaderMapper extendedToStringHeaderMapper() {
				return new ToStringPulsarHeaderMapper(List.of("custom-id"), List.of("foo", "custom-id")) {
					@Override
					public Map<String, String> toPulsarHeaders(MessageHeaders springHeaders) {
						Map<String, String> pulsarHeaders = super.toPulsarHeaders(springHeaders);
						// foo and custom-id are allowed and expected
						assertThat(pulsarHeaders).containsKeys("foo", "custom-id");
						return pulsarHeaders;
					}

					@Override
					public MessageHeaders toSpringHeaders(org.apache.pulsar.client.api.Message<?> pulsarMessage) {
						MessageHeaders springHeaders = super.toSpringHeaders(pulsarMessage);
						// foo not allowed, custom-id allowed
						assertThat(springHeaders).doesNotContainKey("foo").containsKey("custom-id");
						return springHeaders;
					}

					@Override
					protected String toPulsarHeaderValue(String name, Object value, Object context) {
						return "tph->" + super.toPulsarHeaderValue(name, value, context);
					}

					@Override
					protected Object toSpringHeaderValue(String headerName, String rawHeader, Object context) {
						return "tsh->" + super.toSpringHeaderValue(headerName, rawHeader, context);
					}
				};
			}

			@Bean
			public Supplier<Message<String>> springMessageSupplier() {
				return () -> {
					msgCount++;
					return MessageBuilder.withPayload("test-headers-msg-" + msgCount)
							.setHeader("foo", "bar-" + msgCount)
							.setHeader("custom-id", new FooHeader("5150-" + msgCount)).build();
				};
			}

			@Bean
			public Consumer<Message<String>> springMessageLogger() {
				return s -> {
					var header = s.getHeaders().get("custom-id");
					if (header != null) {
						assertThat(header).isInstanceOf(String.class);
					}
					var fooHeader = s.getHeaders().get("foo");
					assertThat(fooHeader).isNull();
					this.logger.info("Hello binder: {} w/ custom-id: {}", s.getPayload(), header);
				};
			}

			record FooHeader(String value) {
			}

		}

	}

	@EnableAutoConfiguration
	@SpringBootConfiguration
	static class PrimitiveTextConfig {

		private final Logger logger = LoggerFactory.getLogger(getClass());

		@Bean
		public Supplier<String> textSupplier() {
			return () -> "test-basic-scenario";
		}

		@Bean
		public Consumer<String> textLogger() {
			return s -> this.logger.info("Hello binder: " + s);
		}

	}

	@EnableAutoConfiguration
	@SpringBootConfiguration
	@Import(PrimitiveTextConfig.class)
	static class BinderAndBindingPropsTestConfig {

		@Bean
		TrackingProducerFactoryBeanPostProcessor trackingProducerFactory(PulsarClient pulsarClient) {
			return new TrackingProducerFactoryBeanPostProcessor(pulsarClient);
		}

		@Bean
		TrackingConsumerFactoryBeanPostProcessor trackingConsumerFactory() {
			return new TrackingConsumerFactoryBeanPostProcessor();
		}

	}

	static class TrackingProducerFactoryBeanPostProcessor implements BeanPostProcessor {

		private final PulsarClient pulsarClient;

		TrackingProducerFactoryBeanPostProcessor(PulsarClient pulsarClient) {
			this.pulsarClient = pulsarClient;
		}

		@Override
		public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
			if (bean instanceof DefaultPulsarProducerFactory defaultFactory) {
				return new TrackingProducerFactory(defaultFactory, this.pulsarClient);
			}
			return bean;
		}

	}

	static class TrackingProducerFactory implements PulsarProducerFactory<String> {

		private final DefaultPulsarProducerFactory<String> trackedProducerFactory;

		private final PulsarClient pulsarClient;

		List<Producer<String>> producersCreated = new ArrayList<>();

		TrackingProducerFactory(DefaultPulsarProducerFactory<String> trackedProducerFactory, PulsarClient pulsarClient) {
			this.trackedProducerFactory = trackedProducerFactory;
			this.pulsarClient = pulsarClient;
		}

		// This is required in PulsarProducerFactory in Spring Pulsar 1.1.x (i.e. Spring Boot 3.3.x)
		public PulsarClient getPulsarClient() {
			return this.pulsarClient;
		}

		@Override
		public Producer<String> createProducer(Schema<String> schema, String topic) {
			Producer<String> producer = null;
			try {
				producer = this.trackedProducerFactory.createProducer(schema, topic);
			}
			catch (Exception e) {
				// pass through
			}
			this.producersCreated.add(producer);
			return producer;
		}

		@Override
		public Producer<String> createProducer(Schema<String> schema, String topic,
				ProducerBuilderCustomizer<String> customizer) {
			Producer<String> producer = null;
			try {
				producer = this.trackedProducerFactory.createProducer(schema, topic, customizer);
			}
			catch (Exception e) {
				// pass through
			}
			this.producersCreated.add(producer);
			return producer;
		}

		@Override
		public Producer<String> createProducer(Schema<String> schema, String topic, Collection<String> encryptionKeys,
				List<ProducerBuilderCustomizer<String>> producerBuilderCustomizers) {
			Producer<String> producer = null;
			try {
				producer = this.trackedProducerFactory.createProducer(schema, topic, encryptionKeys,
						producerBuilderCustomizers);
			}
			catch (Exception e) {
				// pass through
			}
			this.producersCreated.add(producer);
			return producer;
		}

		@Override
		public String getDefaultTopic() {
			return this.trackedProducerFactory.getDefaultTopic();
		}

	}

	static class TrackingConsumerFactoryBeanPostProcessor implements BeanPostProcessor {

		@Override
		public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
			if (bean instanceof DefaultPulsarConsumerFactory defaultFactory) {
				return new TrackingConsumerFactory(defaultFactory);
			}
			return bean;
		}

	}

	static class TrackingConsumerFactory implements PulsarConsumerFactory<String> {

		private final DefaultPulsarConsumerFactory<String> trackedConsumerFactory;

		List<org.apache.pulsar.client.api.Consumer<String>> consumersCreated = new ArrayList<>();

		TrackingConsumerFactory(DefaultPulsarConsumerFactory<String> trackedConsumerFactory) {
			this.trackedConsumerFactory = trackedConsumerFactory;
		}

		@Override
		public org.apache.pulsar.client.api.Consumer<String> createConsumer(Schema<String> schema,
				Collection<String> topics, String subscriptionName, ConsumerBuilderCustomizer<String> customizer) {
			org.apache.pulsar.client.api.Consumer<String> consumer = null;
			try {
				consumer = this.trackedConsumerFactory.createConsumer(schema, topics, subscriptionName, customizer);
			}
			catch (Exception e) {
				// pass through
			}
			this.consumersCreated.add(consumer);
			return consumer;
		}

		@Override
		public org.apache.pulsar.client.api.Consumer<String> createConsumer(Schema<String> schema,
				Collection<String> topics, String subscriptionName, Map<String, String> metadataProperties,
				List<ConsumerBuilderCustomizer<String>> consumerBuilderCustomizers) {
			org.apache.pulsar.client.api.Consumer<String> consumer = null;
			try {
				consumer = this.trackedConsumerFactory.createConsumer(schema, topics, subscriptionName,
						metadataProperties, consumerBuilderCustomizers);
			}
			catch (Exception e) {
				// pass through
			}
			this.consumersCreated.add(consumer);
			return consumer;
		}

	}

	@EnableAutoConfiguration
	@SpringBootConfiguration
	static class PrimitiveFloatConfig {

		private final Logger logger = LoggerFactory.getLogger(getClass());

		@Bean
		public Supplier<Float> piSupplier() {
			return () -> 3.14f;
		}

		@Bean
		public Consumer<Float> piLogger() {
			return f -> this.logger.info("Hello binder: " + f);
		}

	}

	@EnableAutoConfiguration
	@SpringBootConfiguration
	static class JsonFooConfig {

		private final Logger logger = LoggerFactory.getLogger(getClass());

		@Bean
		public Supplier<Foo> fooSupplier() {
			return () -> new Foo("5150");
		}

		@Bean
		public Consumer<Foo> fooLogger() {
			return f -> this.logger.info("Hello binder: " + f);
		}

	}

	@EnableAutoConfiguration
	@SpringBootConfiguration
	@Import(JsonFooConfig.class)
	static class JsonFooWithCustomMappingConfig {

		@Bean
		public SchemaResolverCustomizer<DefaultSchemaResolver> customMappings() {
			return (resolver) -> resolver.addCustomSchemaMapping(Foo.class, JSONSchema.of(Foo.class));
		}

	}

	@EnableAutoConfiguration
	@SpringBootConfiguration
	static class AvroUserConfig {

		private final Logger logger = LoggerFactory.getLogger(getClass());

		@Bean
		public Supplier<User> userSupplier() {
			return () -> new User("user21", 21);
		}

		@Bean
		public Consumer<User> userLogger() {
			return f -> this.logger.info("Hello binder: " + f);
		}

	}

	@EnableAutoConfiguration
	@SpringBootConfiguration
	@Import(AvroUserConfig.class)
	static class AvroUserConfigCustomMappings {

		@Bean
		public SchemaResolverCustomizer<DefaultSchemaResolver> customMappings() {
			return (resolver) -> resolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		}

	}

	@EnableAutoConfiguration
	@SpringBootConfiguration
	static class KeyValueAvroUserConfig {

		private final Logger logger = LoggerFactory.getLogger(getClass());

		@Bean
		public Supplier<KeyValue<String, User>> userSupplier() {
			return () -> new KeyValue<>("21", new User("user21", 21));
		}

		@Bean
		public Consumer<KeyValue<String, User>> userLogger() {
			return f -> this.logger.info("Hello binder: " + f.getKey() + "->" + f.getValue());
		}

	}

	@EnableAutoConfiguration
	@SpringBootConfiguration
	@Import(KeyValueAvroUserConfig.class)
	static class KeyValueAvroUserConfigCustomMappings {

		@Bean
		public SchemaResolverCustomizer<DefaultSchemaResolver> customMappings() {
			return (resolver) -> resolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		}

	}

	@EnableAutoConfiguration
	@SpringBootConfiguration
	static class KeyValueJsonFooConfig {

		private final Logger logger = LoggerFactory.getLogger(getClass());

		@Bean
		public Supplier<KeyValue<String, Foo>> fooSupplier() {
			return () -> new KeyValue<>("5150", new Foo("5150"));
		}

		@Bean
		public Consumer<KeyValue<String, Foo>> fooLogger() {
			return f -> this.logger.info("Hello binder: " + f.getKey() + "->" + f.getValue());
		}

	}

	record Foo(String value) {
	}

	/**
	 * Do not convert this to a Record as Avro does not seem to work well w/ records.
	 */
	static class User {

		private String name;

		private int age;

		User() {
		}

		User(String name, int age) {
			this.name = name;
			this.age = age;
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public int getAge() {
			return age;
		}

		public void setAge(int age) {
			this.age = age;
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			if (o == null || getClass() != o.getClass()) {
				return false;
			}
			User user = (User) o;
			return age == user.age && Objects.equals(name, user.name);
		}

		@Override
		public int hashCode() {
			return Objects.hash(name, age);
		}

		@Override
		public String toString() {
			return "User{" + "name='" + name + '\'' + ", age=" + age + '}';
		}

	}

}