ReactorSleuth.java

/*
 * Copyright 2013-2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.sleuth.instrument.reactor;

import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.TraceContext;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.internal.LazyBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;

import static org.springframework.cloud.sleuth.instrument.reactor.ReactorHooksHelper.named;

/**
 * Reactive Span pointcuts factories.
 *
 * @author Stephane Maldini
 * @author Roman Matiushchenko
 * @since 2.0.0
 */
// TODO: this is public as it is used out of package, but unlikely intended to be
// non-internal
public abstract class ReactorSleuth {

	private static final Log log = LogFactory.getLog(ReactorSleuth.class);

	private static final String PENDING_SPAN_KEY = "sleuth.pending-span";

	private ReactorSleuth() {
	}

	/**
	 * Function that does additional wrapping of the Reactor context.
	 */
	public static Function<Context, Context> contextWrappingFunction = Function.identity();

	/**
	 * Return a span operator pointcut given a Tracing. This can be used in reactor via
	 * {@link reactor.core.publisher.Flux#transform(Function)},
	 * {@link reactor.core.publisher.Mono#transform(Function)},
	 * {@link reactor.core.publisher.Hooks#onLastOperator(Function)} or
	 * {@link reactor.core.publisher.Hooks#onLastOperator(Function)}. The Span operator
	 * pointcut will pass the Scope of the Span without ever creating any new spans.
	 * @param springContext the Spring context.
	 * @param <T> an arbitrary type that is left unchanged by the span operator
	 * @return a new lazy span operator pointcut
	 */
	// Much of Boot assumes that the Spring context will be a
	// ConfigurableApplicationContext, rooted in SpringApplication's
	// requirement for it to be so. Previous versions of Reactor
	// instrumentation injected both BeanFactory and also
	// ConfigurableApplicationContext. This chooses the more narrow
	// signature as it is simpler than explaining instanceof checks.
	public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> scopePassingSpanOperator(
			ConfigurableApplicationContext springContext) {
		if (log.isTraceEnabled()) {
			log.trace("Scope passing operator [" + springContext + "]");
		}

		// keep a reference outside the lambda so that any caching will be visible to
		// all publishers
		LazyBean<CurrentTraceContext> lazyCurrentTraceContext = LazyBean.create(springContext,
				CurrentTraceContext.class);

		LazyBean<Tracer> lazyTracer = LazyBean.create(springContext, Tracer.class);

		return Operators.liftPublisher(p -> !(p instanceof Fuseable.ScalarCallable),
				(BiFunction) liftFunction(springContext, lazyCurrentTraceContext, lazyTracer));
	}

	/**
	 * Creates scope passing span operator which applies only to not
	 * {@code Scannable.Attr.RunStyle.SYNC} {@code Publisher}s. Used by
	 * {@code InstrumentationType#DECORATE_ON_EACH}
	 * @param springContext the Spring context.
	 * @param <T> an arbitrary type that is left unchanged by the span operator.
	 * @return operator to apply to {@link Hooks#onEachOperator(Function)}.
	 */
	public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> onEachOperatorForOnEachInstrumentation(
			ConfigurableApplicationContext springContext) {
		if (log.isTraceEnabled()) {
			log.trace("Scope passing operator [" + springContext + "]");
		}

		// keep a reference outside the lambda so that any caching will be visible to
		// all publishers
		LazyBean<CurrentTraceContext> lazyCurrentTraceContext = LazyBean.create(springContext,
				CurrentTraceContext.class);

		LazyBean<Tracer> lazyTracer = LazyBean.create(springContext, Tracer.class);

		@SuppressWarnings("rawtypes")
		Predicate<Publisher> shouldDecorate = ReactorHooksHelper::shouldDecorate;
		@SuppressWarnings("rawtypes")
		BiFunction<Publisher, ? super CoreSubscriber<? super T>, ? extends CoreSubscriber<? super T>> lifter = liftFunction(
				springContext, lazyCurrentTraceContext, lazyTracer);

		return Operators.liftPublisher(shouldDecorate, named(ReactorHooksHelper.LIFTER_NAME, lifter));
	}

	static <O> BiFunction<Publisher, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super O>> liftFunction(
			ConfigurableApplicationContext springContext, LazyBean<CurrentTraceContext> lazyCurrentTraceContext,
			LazyBean<Tracer> lazyTracer) {
		return (p, sub) -> {
			if (!springContext.isActive() || !springContext.isRunning()) {
				if (log.isTraceEnabled()) {
					String message = "Spring Context [" + springContext
							+ "] is not yet refreshed. This is unexpected. Reactor Context is [" + context(sub)
							+ "] and name is [" + name(sub) + "]";
					log.trace(message);
				}
				return sub;
			}

			Context context = context(sub);

			if (log.isTraceEnabled()) {
				log.trace("Spring context [" + springContext + "], Reactor context [" + context + "], name ["
						+ name(sub) + "]");
			}

			// Try to get the current trace context bean, lenient when there are problems
			CurrentTraceContext currentTraceContext = lazyCurrentTraceContext.get();
			if (currentTraceContext == null) {
				if (log.isTraceEnabled()) {
					String message = "Spring Context [" + springContext
							+ "] did not return a CurrentTraceContext. Reactor Context is [" + context
							+ "] and name is [" + name(sub) + "]";
					log.trace(message);
				}
				return sub;
			}

			TraceContext parent = traceContext(context, currentTraceContext);
			if (parent == null) {
				return sub; // no need to scope a null parent
			}

			// Handle scenarios such as Mono.defer
			if (sub instanceof ScopePassingSpanSubscriber) {
				ScopePassingSpanSubscriber<?> scopePassing = (ScopePassingSpanSubscriber<?>) sub;
				if (scopePassing.parent.equals(parent)) {
					return sub; // don't double-wrap
				}
			}

			context = contextWithBeans(context, lazyTracer, lazyCurrentTraceContext);
			if (log.isTraceEnabled()) {
				log.trace("Spring context [" + springContext + "], Reactor context [" + context + "], name ["
						+ name(sub) + "]");
			}

			if (log.isTraceEnabled()) {
				log.trace("Creating a scope passing span subscriber with Reactor Context " + "[" + context
						+ "] and name [" + name(sub) + "]");
			}

			return new ScopePassingSpanSubscriber<>(sub, context, currentTraceContext, parent);
		};
	}

	private static <T> Context contextWithBeans(Context context, LazyBean<Tracer> tracer,
			LazyBean<CurrentTraceContext> currentTraceContext) {
		if (!context.hasKey(Tracer.class)) {
			context = context.put(Tracer.class, tracer.getOrError());
		}
		if (!context.hasKey(CurrentTraceContext.class)) {
			context = context.put(CurrentTraceContext.class, currentTraceContext.getOrError());
		}
		return context;
	}

	/**
	 * Creates a context with beans in it.
	 * @param springContext spring context
	 * @param <T> an arbitrary type that is left unchanged by the span operator
	 * @return a new operator pointcut that has beans in the context
	 */
	public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> springContextSpanOperator(
			ConfigurableApplicationContext springContext) {
		if (log.isTraceEnabled()) {
			log.trace("Spring Context passing operator [" + springContext + "]");
		}

		LazyBean<Tracer> lazyTracer = LazyBean.create(springContext, Tracer.class);
		LazyBean<CurrentTraceContext> lazyCurrentTraceContext = LazyBean.create(springContext,
				CurrentTraceContext.class);

		return Operators.liftPublisher(p -> {
			// We don't scope scalar results as they happen in an instant. This prevents
			// excessive overhead when using Flux/Mono #just, #empty, #error, etc.
			return !(p instanceof Fuseable.ScalarCallable) && springContext.isActive();
		}, (p, sub) -> {
			Context ctxBefore = context(sub);
			Context context = contextWithBeans(ctxBefore, lazyTracer, lazyCurrentTraceContext);
			if (context == ctxBefore) {
				return sub;
			}
			return new SleuthContextOperator<>(context, sub);
		});
	}

	/**
	 * Creates tracing context capturing reactor operator. Used by
	 * {@code InstrumentationType#DECORATE_ON_EACH}.
	 * @param springContext the Spring context.
	 * @param <T> an arbitrary type that is left unchanged by the span operator.
	 * @return operator to apply to {@link Hooks#onLastOperator(Function)} for
	 * {@code InstrumentationType#DECORATE_ON_EACH}
	 */
	public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> onLastOperatorForOnEachInstrumentation(
			ConfigurableApplicationContext springContext) {
		LazyBean<CurrentTraceContext> lazyCurrentTraceContext = LazyBean.create(springContext,
				CurrentTraceContext.class);
		LazyBean<Tracer> lazyTracer = LazyBean.create(springContext, Tracer.class);

		BiFunction<Publisher, ? super CoreSubscriber<? super T>, ? extends CoreSubscriber<? super T>> scopePassingSpanSubscriber = liftFunction(
				springContext, lazyCurrentTraceContext, lazyTracer);

		BiFunction<Publisher, ? super CoreSubscriber<? super T>, ? extends CoreSubscriber<? super T>> skipIfNoTraceCtx = (
				pub, sub) -> {
			// lazyCurrentTraceContext.get() is not null here. see predicate bellow
			TraceContext traceContext = lazyCurrentTraceContext.get().context();
			if (context(sub).getOrDefault(TraceContext.class, null) == traceContext) {
				return sub;
			}
			return scopePassingSpanSubscriber.apply(pub, sub);
		};

		return Operators.liftPublisher(p -> {
			/*
			 * this prevent double decoration when last operator in the chain is not SYNC
			 * like {@code Mono.fromSuppler(() -> ...).subscribeOn(Schedulers.parallel())}
			 */
			if (ReactorHooksHelper.isTraceContextPropagator(p)) {
				return false;
			}
			boolean addContext = !(p instanceof Fuseable.ScalarCallable) && springContext.isActive();
			if (addContext) {
				CurrentTraceContext currentTraceContext = lazyCurrentTraceContext.get();
				if (currentTraceContext != null) {
					addContext = currentTraceContext.context() != null;
				}
			}
			return addContext;
		}, named(ReactorHooksHelper.LIFTER_NAME, skipIfNoTraceCtx));
	}

	private static <T> Context context(CoreSubscriber<? super T> sub) {
		try {
			return sub.currentContext();
		}
		catch (Exception ex) {
			if (log.isDebugEnabled()) {
				log.debug("Exception occurred while trying to retrieve the context", ex);
			}
		}
		return Context.empty();
	}

	static String name(CoreSubscriber<?> sub) {
		return Scannable.from(sub).name();
	}

	/**
	 * Like {@link CurrentTraceContext#context()}, except it first checks the reactor
	 * context.
	 */
	static TraceContext traceContext(Context context, CurrentTraceContext fallback) {
		if (context.hasKey(TraceContext.class)) {
			return context.get(TraceContext.class);
		}
		return fallback.context();
	}

	public static Function<Runnable, Runnable> scopePassingOnScheduleHook(
			ConfigurableApplicationContext springContext) {
		LazyBean<CurrentTraceContext> lazyCurrentTraceContext = LazyBean.create(springContext,
				CurrentTraceContext.class);
		return delegate -> {
			if (springContext.isActive()) {
				final CurrentTraceContext currentTraceContext = lazyCurrentTraceContext.get();
				if (currentTraceContext == null) {
					return delegate;
				}
				final TraceContext traceContext = currentTraceContext.context();
				return () -> {
					try (CurrentTraceContext.Scope scope = currentTraceContext.maybeScope(traceContext)) {
						delegate.run();
					}
					// extra step to ensure context is cleared when publishOn or similar
					// operators leaks different context and leaves it uncleared
					currentTraceContext.maybeScope(null);
				};
			}
			return delegate;
		};
	}

	/**
	 * Wraps the given Mono in a trace representation. Retrieves the span from context,
	 * creates a child span with the given name.
	 * @param tracer - Tracer bean
	 * @param currentTraceContext - CurrentTraceContext bean
	 * @param childSpanName - name of the created child span
	 * @param supplier - supplier of a {@link Mono} to be wrapped in tracing
	 * @param <T> - type returned by the Mono
	 * @param spanCustomizer - customizer for the child span
	 * @return traced Mono
	 */
	public static <T> Mono<T> tracedMono(@NonNull Tracer tracer, @NonNull CurrentTraceContext currentTraceContext,
			@NonNull String childSpanName, @NonNull Supplier<Mono<T>> supplier,
			@NonNull BiConsumer<T, Span> spanCustomizer) {
		return runMonoSupplierInScope(supplier, spanCustomizer).contextWrite(
				context -> ReactorSleuth.enhanceContext(tracer, currentTraceContext, context, childSpanName));
	}

	/**
	 * Wraps the given Mono in a trace representation. Retrieves the span from context,
	 * creates a child span with the given name.
	 * @param tracer - Tracer bean
	 * @param currentTraceContext - CurrentTraceContext bean
	 * @param childSpanName - name of the created child span
	 * @param supplier - supplier of a {@link Mono} to be wrapped in tracing
	 * @param <T> - type returned by the Mono
	 * @param spanCustomizer - customizer for the child span
	 * @param spanFunction - function that creates a new or child span
	 * @return traced Mono
	 */
	public static <T> Mono<T> tracedMono(@NonNull Tracer tracer, @NonNull CurrentTraceContext currentTraceContext,
			@NonNull String childSpanName, @NonNull Supplier<Mono<T>> supplier,
			@NonNull BiConsumer<T, Span> spanCustomizer, @NonNull Function<Span, Span> spanFunction) {
		return runMonoSupplierInScope(supplier, spanCustomizer).contextWrite(context -> ReactorSleuth
				.enhanceContext(tracer, currentTraceContext, context, childSpanName, spanFunction));
	}

	private static <T> Mono<T> runMonoSupplierInScope(Supplier<Mono<T>> supplier, BiConsumer<T, Span> spanCustomizer) {
		return Mono.deferContextual(contextView -> {
			Span span = contextView.get(Span.class);
			Tracer.SpanInScope scope = contextView.get(Tracer.SpanInScope.class);
			// @formatter:off
			return supplier.get()
					.map(t -> {
						spanCustomizer.accept(t, span);
						return t;
					})
					// TODO: Fix me when this is resolved in Reactor
//					.doOnSubscribe(__ -> scope.close())
					.doOnError(span::error)
					.doFinally(signalType -> {
						span.end();
						scope.close();
					});
			// @formatter:on
		});
	}

	/**
	 * Wraps the given Mono in a trace representation. Retrieves the span from context,
	 * creates a child span with the given name.
	 * @param tracer - Tracer bean
	 * @param currentTraceContext - CurrentTraceContext bean
	 * @param childSpanName - name of the created child span
	 * @param supplier - supplier of a {@link Mono} to be wrapped in tracing
	 * @param <T> - type returned by the Mono
	 * @return traced Mono
	 */
	public static <T> Mono<T> tracedMono(@NonNull Tracer tracer, @NonNull CurrentTraceContext currentTraceContext,
			@NonNull String childSpanName, @NonNull Supplier<Mono<T>> supplier) {
		return tracedMono(tracer, currentTraceContext, childSpanName, supplier, (o, span) -> {
		});
	}

	/**
	 * Wraps the given Mono in a trace representation. Puts the provided span to context.
	 * @param tracer - Tracer bean
	 * @param span - span to put in context
	 * @param supplier - supplier of a {@link Mono} to be wrapped in tracing
	 * @param <T> - type returned by the Mono
	 * @return traced Mono
	 */
	public static <T> Mono<T> tracedMono(@NonNull Tracer tracer, @NonNull Span span,
			@NonNull Supplier<Mono<T>> supplier) {
		return runMonoSupplierInScope(supplier, (o, span1) -> {
		}).contextWrite(context -> ReactorSleuth.putSpanInScope(tracer, context, span));
	}

	/**
	 * Wraps the given Flux in a trace representation. Retrieves the span from context,
	 * creates a child span with the given name.
	 * @param tracer - Tracer bean
	 * @param currentTraceContext - CurrentTraceContext bean
	 * @param childSpanName - name of the created child span
	 * @param supplier - supplier of a {@link Flux} to be wrapped in tracing
	 * @param <T> - type returned by the Flux
	 * @param spanCustomizer - customizer for the child span
	 * @return traced Flux
	 */
	public static <T> Flux<T> tracedFlux(@NonNull Tracer tracer, @NonNull CurrentTraceContext currentTraceContext,
			@NonNull String childSpanName, @NonNull Supplier<Flux<T>> supplier,
			@NonNull BiConsumer<T, Span> spanCustomizer) {
		return runFluxSupplierInScope(supplier, spanCustomizer).contextWrite(
				context -> ReactorSleuth.enhanceContext(tracer, currentTraceContext, context, childSpanName));
	}

	/**
	 * Wraps the given Flux in a trace representation. Retrieves the span from context,
	 * creates a child span with the given name.
	 * @param tracer - Tracer bean
	 * @param currentTraceContext - CurrentTraceContext bean
	 * @param childSpanName - name of the created child span
	 * @param supplier - supplier of a {@link Flux} to be wrapped in tracing
	 * @param <T> - type returned by the Flux
	 * @param spanCustomizer - customizer for the child span
	 * @param spanFunction - function that creates a new or child span
	 * @return traced Flux
	 */
	public static <T> Flux<T> tracedFlux(@NonNull Tracer tracer, @NonNull CurrentTraceContext currentTraceContext,
			@NonNull String childSpanName, @NonNull Supplier<Flux<T>> supplier,
			@NonNull BiConsumer<T, Span> spanCustomizer, @NonNull Function<Span, Span> spanFunction) {
		return runFluxSupplierInScope(supplier, spanCustomizer).contextWrite(context -> ReactorSleuth
				.enhanceContext(tracer, currentTraceContext, context, childSpanName, spanFunction));
	}

	/**
	 * Wraps the given Flux in a trace representation. Retrieves the span from context,
	 * creates a child span with the given name.
	 * @param tracer - Tracer bean
	 * @param span - span to put in context
	 * @param supplier - supplier of a {@link Flux} to be wrapped in tracing
	 * @param <T> - type returned by the Flux
	 * @return traced Flux
	 */
	public static <T> Flux<T> tracedFlux(@NonNull Tracer tracer, @NonNull Span span,
			@NonNull Supplier<Flux<T>> supplier) {
		return runFluxSupplierInScope(supplier, (o, span1) -> {
		}).contextWrite(context -> ReactorSleuth.putSpanInScope(tracer, context, span));
	}

	private static <T> Flux<T> runFluxSupplierInScope(Supplier<Flux<T>> supplier, BiConsumer<T, Span> spanCustomizer) {
		return Flux.deferContextual(contextView -> {
			Span span = contextView.get(Span.class);
			Tracer.SpanInScope scope = contextView.get(Tracer.SpanInScope.class);
			// @formatter:off
			return supplier.get()
					.map(t -> {
						spanCustomizer.accept(t, span);
						return t;
					})
					// TODO: Fix me when this is resolved in Reactor
//					.doOnSubscribe(__ -> scope.close())
					.doOnError(span::error)
					.doFinally(signalType -> {
						span.end();
						scope.close();
					});
			// @formatter:on
		});
	}

	/**
	 * Wraps the given Flux in a trace representation. Retrieves the span from context,
	 * creates a child span with the given name.
	 * @param tracer - Tracer bean
	 * @param currentTraceContext - CurrentTraceContext bean
	 * @param childSpanName - name of the created child span
	 * @param supplier - supplier of a {@link Flux} to be wrapped in tracing
	 * @param <T> - type returned by the Flux
	 * @return traced Flux
	 */
	public static <T> Flux<T> tracedFlux(@NonNull Tracer tracer, @NonNull CurrentTraceContext currentTraceContext,
			@NonNull String childSpanName, @NonNull Supplier<Flux<T>> supplier) {
		return tracedFlux(tracer, currentTraceContext, childSpanName, supplier, (o, span) -> {
		});
	}

	private static Span childSpanFromContext(Tracer tracer, CurrentTraceContext currentTraceContext,
			reactor.util.context.Context context, String childSpanName) {
		return childSpanFromContext(currentTraceContext, context, childSpanName,
				span -> span == null ? tracer.nextSpan() : tracer.nextSpan(span));
	}

	private static Span childSpanFromContext(CurrentTraceContext currentTraceContext,
			reactor.util.context.Context context, String childSpanName, Function<Span, Span> spanSupplier) {
		TraceContext traceContext = context.getOrDefault(TraceContext.class, null);
		Span span = context.getOrDefault(Span.class, null);
		if (traceContext == null && span == null) {
			span = spanSupplier.apply(null);
			if (log.isDebugEnabled()) {
				log.debug("There was no previous span in reactor context, created a new one [" + span + "]");
			}
		}
		else if (traceContext != null && span == null) {
			// there was a previous span - we create a child one
			try (CurrentTraceContext.Scope scope = currentTraceContext.maybeScope(traceContext)) {
				if (log.isDebugEnabled()) {
					log.debug("Found a trace context in reactor context [" + traceContext + "]");
				}
				span = spanSupplier.apply(null);
				if (log.isDebugEnabled()) {
					log.debug("Created a child span [" + span + "]");
				}
			}
		}
		else {
			if (log.isDebugEnabled()) {
				log.debug("Found a span in reactor context [" + span + "]");
			}
			span = spanSupplier.apply(span);
			if (log.isDebugEnabled()) {
				log.debug("Created a child span [" + span + "]");
			}
		}
		return span.name(childSpanName).start();
	}

	/**
	 * Updates the Reactor context with tracing information. Creates a new span if there
	 * is no current span. Creates a child span if there was an entry in the context
	 * already.
	 * @param tracer tracer
	 * @param currentTraceContext current trace context
	 * @param context Reactor context
	 * @param childSpanName child span name when there is no span in context
	 * @param spanSupplier function that creates a new or child span
	 * @return updated Reactor context
	 */
	public static Context enhanceContext(Tracer tracer, CurrentTraceContext currentTraceContext,
			reactor.util.context.Context context, String childSpanName, Function<Span, Span> spanSupplier) {
		Span span = childSpanFromContext(currentTraceContext, context, childSpanName, spanSupplier);
		return putSpanInScope(tracer, context, span);
	}

	/**
	 * Updates the Reactor context with tracing information. Creates a new span if there
	 * is no current span. Creates a child span if there was an entry in the context
	 * already.
	 * @param tracer tracer
	 * @param currentTraceContext current trace context
	 * @param context Reactor context
	 * @param childSpanName child span name when there is no span in context
	 * @return updated Reactor context
	 */
	public static Context enhanceContext(Tracer tracer, CurrentTraceContext currentTraceContext,
			reactor.util.context.Context context, String childSpanName) {
		Span span = childSpanFromContext(tracer, currentTraceContext, context, childSpanName);
		return putSpanInScope(tracer, context, span);
	}

	/**
	 * Puts the provided span in scope and in Reactor context.
	 * @param tracer tracer
	 * @param context Reactor context
	 * @param span span to put in Reactor context
	 * @return mutated context
	 */
	public static Context putSpanInScope(Tracer tracer, Context context, Span span) {
		Context newContext = context.put(Span.class, span).put(TraceContext.class, span.context())
				.put(Tracer.SpanInScope.class, tracer.withSpan(span));
		return wrapContext(newContext);
	}

	/**
	 * Mutates the Reactor context depending on the classpath contents.
	 * @param context Reactor context
	 * @return mutated context
	 */
	public static Context wrapContext(Context context) {
		return contextWrappingFunction.apply(context);
	}

	/**
	 * Retreives the {@link TraceContext} from the current context.
	 * @param context Reactor context
	 * @return {@link TraceContext} or {@code null} if none present
	 */
	@SuppressWarnings("unchecked")
	public static TraceContext getParentTraceContext(Context context, TraceContext fallback) {
		AtomicReference<Span> pendingSpanRef = getPendingSpan(context);
		if (pendingSpanRef == null || pendingSpanRef.get() == null) {
			return fallback;
		}
		return pendingSpanRef.get().context();
	}

	/**
	 * Retreives the pending span from the current context.
	 * @param context Reactor context
	 * @return {@code AtomicReference} to span or {@code null} if none present
	 * @see ReactorSleuth#putPendingSpan(Context, AtomicReference)
	 */
	@SuppressWarnings("unchecked")
	public static AtomicReference<Span> getPendingSpan(ContextView context) {
		Object objectSpan = context.getOrDefault(ReactorSleuth.PENDING_SPAN_KEY, null);
		if ((objectSpan instanceof AtomicReference)) {
			return ((AtomicReference<Span>) objectSpan);
		}
		return null;
	}

	/**
	 * Mutates the {@link Context} to include a mutable reference to a span. Can be used
	 * when you need to mutate the parent operator context with a span created by a child
	 * operator.
	 * @param context Reactor context
	 * @param span atomic reference of a span
	 * @return mutated context
	 */
	public static Context putPendingSpan(Context context, AtomicReference<Span> span) {
		return context.put(PENDING_SPAN_KEY, span);
	}

	/**
	 * Retrieves span from Reactor context.
	 * @param tracer tracer
	 * @param currentTraceContext current trace context
	 * @param context context view
	 * @return span from Reactor context or creates a new one if missing
	 */
	public static Span spanFromContext(Tracer tracer, CurrentTraceContext currentTraceContext, ContextView context) {
		Span span = context.getOrDefault(Span.class, null);
		if (span != null) {
			if (log.isDebugEnabled()) {
				log.debug("Found a span in reactor context [" + span + "]");
			}
			return span;
		}
		TraceContext traceContext = context.getOrDefault(TraceContext.class, null);
		if (traceContext != null) {
			try (CurrentTraceContext.Scope scope = currentTraceContext.maybeScope(traceContext)) {
				if (log.isDebugEnabled()) {
					log.debug("Found a trace context in reactor context [" + traceContext + "]");
				}
				return tracer.currentSpan();
			}
		}
		Span newSpan = tracer.nextSpan().start();
		if (log.isDebugEnabled()) {
			log.debug("No span was found - will create a new one [" + newSpan + "]");
		}
		return newSpan;
	}

	public static Queue<?> traceQueue(ConfigurableApplicationContext springContext, Queue<?> queue) {
		if (!springContext.isActive()) {
			return queue;
		}
		CurrentTraceContext currentTraceContext = springContext.getBean(CurrentTraceContext.class);
		@SuppressWarnings("unchecked")
		Queue envelopeQueue = queue;
		return new AbstractQueue<Object>() {

			boolean cleanOnNull;

			boolean hasPrevious = false;

			Thread lastReader;

			@Override
			public int size() {
				return envelopeQueue.size();
			}

			@Override
			public boolean offer(Object o) {
				TraceContext traceContext = currentTraceContext.context();
				return envelopeQueue.offer(new Envelope(o, traceContext));
			}

			@Override
			public Object poll() {
				Object object = envelopeQueue.poll();
				if (object == null) {
					if (cleanOnNull) {
						// to clear thread-local if was just restored
						currentTraceContext.maybeScope(null);
					}
					cleanOnNull = true;
					lastReader = Thread.currentThread();
					hasPrevious = false;
					return null;
				}
				else if (object instanceof Envelope) {
					Envelope envelope = (Envelope) object;
					restoreTheContext(envelope);
					hasPrevious = true;
					return envelope.body;
				}
				hasPrevious = true;
				return object;
			}

			private void restoreTheContext(Envelope envelope) {
				TraceContext traceContext = envelope.traceContext;
				if (traceContext != null) {
					if (!traceContext.equals(currentTraceContext.context())) {
						if (!hasPrevious || !Thread.currentThread().equals(this.lastReader)) {
							// means context was restored form the envelope, thus it has
							// to be cleared
							cleanOnNull = true;
							lastReader = Thread.currentThread();
						}
						currentTraceContext.maybeScope(traceContext);
					}
					else if (!hasPrevious || !Thread.currentThread().equals(this.lastReader)) {
						// means same context was already available, no need to clean
						// anything
						cleanOnNull = false;
						lastReader = Thread.currentThread();
					}
				}
			}

			@Override
			public Object peek() {
				Object peek = queue.peek();
				if (peek instanceof Envelope) {
					Envelope envelope = (Envelope) peek;
					restoreTheContext(envelope);
					return (envelope).body;
				}
				return peek;
			}

			@Override
			@SuppressWarnings("unchecked")
			public Iterator<Object> iterator() {
				Iterator<?> iterator = queue.iterator();
				return new Iterator<Object>() {
					@Override
					public boolean hasNext() {
						return iterator.hasNext();
					}

					@Override
					public Object next() {
						Object next = iterator.next();
						if (next instanceof Envelope) {
							Envelope envelope = (Envelope) next;
							restoreTheContext(envelope);
							return (envelope).body;
						}
						return next;
					}
				};
			}
		};
	}

	static class Envelope {

		final Object body;

		final TraceContext traceContext;

		Envelope(Object body, TraceContext traceContext) {
			this.body = body;
			this.traceContext = traceContext;
		}

	}

}

class SleuthContextOperator<T> implements Subscription, CoreSubscriber<T>, Scannable {

	private final Context context;

	private final Subscriber<? super T> subscriber;

	private Subscription s;

	SleuthContextOperator(Context context, Subscriber<? super T> subscriber) {
		this.context = context;
		this.subscriber = subscriber;
	}

	@Override
	public void onSubscribe(Subscription subscription) {
		this.s = subscription;
		this.subscriber.onSubscribe(this);
	}

	@Override
	public void request(long n) {
		this.s.request(n);
	}

	@Override
	public void cancel() {
		this.s.cancel();
	}

	@Override
	public void onNext(T o) {
		this.subscriber.onNext(o);
	}

	@Override
	public void onError(Throwable throwable) {
		this.subscriber.onError(throwable);
	}

	@Override
	public void onComplete() {
		this.subscriber.onComplete();
	}

	@Override
	public Context currentContext() {
		return this.context;
	}

	@Nullable
	@Override
	public Object scanUnsafe(Attr key) {
		if (key == Attr.RUN_STYLE) {
			return Attr.RunStyle.SYNC;
		}
		return null;
	}

}