RabbitMultiBinderObservationTests.java

/*
 * Copyright 2015-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.rabbit.integration;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import brave.handler.SpanHandler;
import brave.test.TestSpanHandler;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.tracing.brave.bridge.BraveFinishedSpan;
import io.micrometer.tracing.test.simple.SpansAssert;
import org.junit.jupiter.api.Test;
import org.springframework.boot.micrometer.tracing.test.autoconfigure.AutoConfigureTracing;
import org.testcontainers.containers.RabbitMQContainer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.rabbit.RabbitTestContainer;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
 * @author Artem Bilan
 * @since 4.1.1
 */
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
		args = "--spring.config.location=classpath:/rabbit-multi-binder-observation.yml")
@DirtiesContext
@AutoConfigureTracing
public class RabbitMultiBinderObservationTests {

	private static final TestSpanHandler SPANS = new TestSpanHandler();

	private static final RabbitMQContainer RABBITMQ = RabbitTestContainer.sharedInstance();

	@Autowired
	StreamBridge streamBridge;

	@Autowired
	ObservationRegistry observationRegistry;

	@Autowired
	TestConfiguration testConfiguration;

	@DynamicPropertySource
	static void rabbitProperties(DynamicPropertyRegistry registry) {
		registry.add("spring.rabbitmq.port", RABBITMQ::getAmqpPort);
	}

	@Test
	void observationIsPropagatedInMultiBinderConfiguration() throws InterruptedException {
		Observation.createNotStarted("test parent observation", this.observationRegistry)
			.observe(() -> this.streamBridge.send("test-out-0", "test data"));

		assertThat(this.testConfiguration.messageReceived.await(10, TimeUnit.SECONDS)).isTrue();

		// There is a race condition when we already have a reply, but the span in the
		// Rabbit listener is not closed yet.
		// parent -> StreamBridge -> AmqpOutboundEndpoint -> RabbitTemplate -> Rabbit Listener -> Consumer
		await().untilAsserted(() -> assertThat(SPANS.spans()).hasSize(6));
		SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).toList())
			.haveSameTraceId();
	}

	@SpringBootConfiguration
	@EnableAutoConfiguration
	public static class TestConfiguration {

		final CountDownLatch messageReceived = new CountDownLatch(1);

		@Bean
		SpanHandler testSpanHandler() {
			return SPANS;
		}

		@Bean
		public Consumer<Message<?>> testListener() {
			return message -> this.messageReceived.countDown();
		}

	}

}