MessagingSleuthOperators.java
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.instrument.messaging;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.messaging.Message;
/**
* Messaging helpers to manually parse and inject spans. We're treating message headers as
* a context that gets passed through.
*
* IMPORTANT: This API is experimental and might change in the future.
*
* The {@code forInputMessage} factory methods will retrieve the tracer context from the
* message headers and set up a a child span in the header under key {@code Span.class}
* name. If you need to continue it or tag it, it's enough to retrieve it from the
* headers.
*
* The first messaging span (the one that was first found in the input message) is present
* under the {@code traceHandlerParentSpan} header key.
*
* @author Marcin Grzejszczak
* @since 3.0.0
*/
public final class MessagingSleuthOperators {
private static final Log log = LogFactory.getLog(MessagingSleuthOperators.class);
private MessagingSleuthOperators() {
throw new IllegalStateException("You can't instantiate a utility class");
}
/**
* Executes a span wrapped operation for an input message.
* @param beanFactory - bean factory
* @param message - message to wrap
* @param withSpanInScope - an operation that will be wrapped in a span and the span
* will be reported at the end
* @param <T> - type of payload
* @return message with tracer context
*/
public static <T> Message<T> forInputMessage(BeanFactory beanFactory, Message<T> message,
Consumer<Message<T>> withSpanInScope) {
TraceMessageHandler traceMessageHandler = TraceMessageHandler.forNonSpringIntegration(beanFactory);
MessageAndSpans wrappedInputMessage = traceMessageHandler.wrapInputMessage(message, "");
if (log.isDebugEnabled()) {
log.debug("Wrapped input msg " + wrappedInputMessage);
}
Throwable t = null;
try (Tracer.SpanInScope ws = traceMessageHandler.tracer.withSpan(wrappedInputMessage.childSpan.start())) {
withSpanInScope.accept(wrappedInputMessage.msg);
}
catch (Exception e) {
t = e;
throw e;
}
finally {
traceMessageHandler.afterMessageHandled(wrappedInputMessage.childSpan, t);
}
return wrappedInputMessage.msg;
}
/**
* Processes the input message and returns a message with a header containing a span.
* @param beanFactory - bean factory
* @param message - input message to process
* @param <T> - payload type
* @return message with tracer context
*/
public static <T> Message<T> forInputMessage(BeanFactory beanFactory, Message<T> message) {
TraceMessageHandler traceMessageHandler = TraceMessageHandler.forNonSpringIntegration(beanFactory);
MessageAndSpans wrappedInputMessage = traceMessageHandler.wrapInputMessage(message, "");
if (log.isDebugEnabled()) {
log.debug("Wrapped input msg " + wrappedInputMessage);
}
return wrappedInputMessage.msg;
}
/**
* Function converting an input message to a message with tracer headers.
* @param beanFactory - bean factory
* @param inputMessage - input message to process
* @param <T> input message type
* @return function representation of input message with tracer context
*/
public static <T> Function<Message<T>, Message<T>> asFunction(BeanFactory beanFactory, Message<T> inputMessage) {
return stringMessage -> MessagingSleuthOperators.forInputMessage(beanFactory, inputMessage);
}
/**
* Retrieves tracer information from message headers.
* @param beanFactory - bean factory
* @param message - message to process
* @param <T> - payload type
* @return span retrieved from message or {@code null} if there was no span
*/
public static <T> Span spanFromMessage(BeanFactory beanFactory, Message<T> message) {
TraceMessageHandler traceMessageHandler = TraceMessageHandler.forNonSpringIntegration(beanFactory);
return spanFromMessage(traceMessageHandler, message);
}
private static <T> Span spanFromMessage(TraceMessageHandler traceMessageHandler, Message<T> message) {
Span span = traceMessageHandler.spanFromMessage(message);
if (log.isDebugEnabled()) {
log.debug("Found the following span in message " + span);
}
return span;
}
/**
* Retrieves tracer information from message headers and applies the operation.
* @param beanFactory - bean factory
* @param message - message to process
* @param withSpanInScope - an operation that will be wrapped in a span but will not
* be reported
* @param <T> - payload type
*/
public static <T> void withSpanInScope(BeanFactory beanFactory, Message<T> message,
Consumer<Message<T>> withSpanInScope) {
TraceMessageHandler traceMessageHandler = TraceMessageHandler.forNonSpringIntegration(beanFactory);
Span span = spanFromMessage(traceMessageHandler, message);
try (Tracer.SpanInScope ws = traceMessageHandler.tracer.withSpan(span)) {
withSpanInScope.accept(message);
}
}
/**
* Retrieves tracer information from message headers and applies the operation.
* @param beanFactory - bean factory
* @param message - message to process
* @param withSpanInScope - an operation that will be wrapped in a span but will not
* be reported
* @param <T> - payload type
* @return a message with tracer headers.
*/
public static <T> Message<T> withSpanInScope(BeanFactory beanFactory, Message<T> message,
Function<Message<T>, Message<T>> withSpanInScope) {
TraceMessageHandler traceMessageHandler = TraceMessageHandler.forNonSpringIntegration(beanFactory);
Span span = spanFromMessage(traceMessageHandler, message);
try (Tracer.SpanInScope ws = traceMessageHandler.tracer.withSpan(span)) {
return withSpanInScope.apply(message);
}
}
/**
* Creates an output message with tracer headers and reports the corresponding
* producer span. If the message contains a header called {@code destination} it will
* be used to tag the span with destination name.
* @param beanFactory - bean factory
* @param message - message to which tracer headers should be injected
* @param <T> - message payload
* @return instrumented message
*/
public static <T> Message<T> handleOutputMessage(BeanFactory beanFactory, Message<T> message) {
return handleOutputMessage(beanFactory, message, null);
}
/**
* Creates an output message with tracer headers and reports the corresponding
* producer span. If the message contains a header called {@code destination} it will
* be used to tag the span with destination name.
* @param beanFactory - bean factory
* @param message - message to which tracer headers should be injected
* @param throwable - exception that took place while processing the message
* @param <T> - message payload
* @return instrumented message
*/
public static <T> Message<T> handleOutputMessage(BeanFactory beanFactory, Message<T> message, Throwable throwable) {
return handleOutputMessage(beanFactory, message, span -> {
}, throwable);
}
/**
* Creates an output message with tracer headers and reports the corresponding
* producer span. If the message contains a header called {@code destination} it will
* be used to tag the span with destination name.
* @param beanFactory - bean factory
* @param message - message to which tracer headers should be injected
* @param spanCustomizer - customizer of the output span
* @param throwable - exception that took place while processing the message
* @param <T> - message payload
* @return instrumented message
*/
public static <T> Message<T> handleOutputMessage(BeanFactory beanFactory, Message<T> message,
Consumer<Span> spanCustomizer, Throwable throwable) {
TraceMessageHandler traceMessageHandler = TraceMessageHandler.forNonSpringIntegration(beanFactory);
Span span = traceMessageHandler.parentSpan(message);
span = span != null ? span : traceMessageHandler.consumerSpan(message);
if (span == null) {
log.warn(
"Can't find neither parent nor consumer span. Will return the message with no tracer header changes");
return message;
}
MessageAndSpan messageAndSpan = traceMessageHandler.wrapOutputMessage(message, span,
String.valueOf(message.getHeaders().getOrDefault("destination", "")));
spanCustomizer.accept(messageAndSpan.span);
traceMessageHandler.afterMessageHandled(messageAndSpan.span, throwable);
return messageAndSpan.msg;
}
/**
* Reports the span stored in the message.
* @param beanFactory - bean factory
* @param message - message with tracer context
* @param ex - potential exception that took place while processing
* @param <T> - message payload
* @return instrumented message
*/
public static <T> Message<T> afterMessageHandled(BeanFactory beanFactory, Message<T> message, Throwable ex) {
TraceMessageHandler traceMessageHandler = TraceMessageHandler.forNonSpringIntegration(beanFactory);
Span span = traceMessageHandler.spanFromMessage(message);
traceMessageHandler.afterMessageHandled(span, ex);
return message;
}
}