ReactorHooksHelper.java
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.instrument.reactor;
import java.util.Objects;
import java.util.function.BiFunction;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import org.springframework.util.Assert;
/**
* Helper class for reactor ON_EACH instrumentation. Reduces number of sleuth operators
* time in the reactive chain during Assembly by using {@code Scannable.Attr.RUN_STYLE}.
* Below is the example of what it does <pre>{@code
* Mono.just(0) // (-)
* .map(it -> 1) // (-)
* .flatMap(it ->
* Mono.just(it) // (-)
* ) // (-)
* .flatMapMany(it ->
* asyncPublisher(it) // (+)(*)
* ) // (-)
* .filter(it -> true) // (-)
* .publishOn(Schedulers.single()) //(+)
* .map(it -> it) // (-)
* .map(it -> it * 10) // (-)
* .filter(it -> true) // (-)
* .scan((l, r) -> l + r) // (-)
* .doOnNext(it -> { // (-)
* //log
* })
* .doFirst(() -> { // (-)
* //log
* })
* .doFinally(signalType -> { // (-)
* //log
* })
* .subscribeOn(Schedulers.parallel()) //(+)
* .subscribe();//(*)
* (*) - captures tracing context if it differs from what was captured before at subscription and propagates it.
* As far as check is performed on Subscriber Context it allocates Publisher to access it.
* (+) - is ASYNC need to decorate Publisher
* (-) - is SYNC no need to decorate Publisher
*}</pre> So it creates 13 (out of 16) less Publisher (+ Subscriber and all their stuff)
* on assembly time comparing to the original logic (decorate every Publisher and checking
* only at subscription time).
* <p/>
* It also handles the case when onEachHook was not applied for some operator in the chain
* (It could be possible if custom operators or Processor are used) by checkin source
* source Publisher. <pre>{@code
* processor/customOperator // (?) is async and hook is not applied
* .map(it -> ...) // (+) is SYNC but should add hook as previous Processor/operator does not use hooks
* .doOnNext(it -> { // (-) is SYNC no need to wrap
* //log
* })
* .subscribe();
*}</pre>
*
* @author Roman Matiushchenko
*/
final class ReactorHooksHelper {
static final String LIFTER_NAME = "org.springframework.cloud.sleuth.instrument.reactor.ReactorHooksHelper.ScopePassingLifter";
// need a way to determine SYNC sources to not add redundant scope passing decorator
// most of reactor-core SYNC sources are marked with SourceProducer interface
static final Class<?> sourceProducerClass;
static {
Class<?> c;
try {
c = Class.forName("reactor.core.publisher.SourceProducer");
}
catch (ClassNotFoundException e) {
c = Void.class;
}
sourceProducerClass = c;
}
private ReactorHooksHelper() {
}
/**
* Determines whether to decorate input publisher with
* {@code ScopePassingSpanOperator}.
* @param p publisher to check
* @return returns true if input publisher or one of its source publishers is not
* {@code RunStyle.SYNC} and there is no {@link TraceContextPropagator} between input
* publisher and not SYNC publisher.
*/
public static boolean shouldDecorate(Publisher<?> p) {
Assert.notNull(p, "source Publisher is null");
Publisher<?> current = p;
while (true) {
if (current == null) {
// is start of the chain, Publisher without source or foreign Publisher
return true;
}
if (current instanceof Fuseable.ScalarCallable) {
return false;
}
if (isTraceContextPropagator(current)) {
return false;
}
if (!isSync(current)) {
boolean isLifter = getLifterName(current) != null;
if (isLifter) {
return shouldDecorateLifter(current);
}
return true;
}
if (isSourceProducer(current)) {
return false;
}
current = getParent(current);
}
}
/**
* xxxLift Publishers get their RunStyle from source Publisher. So need to check
* whether current chain was decorated with scope passing operator.
* @param p first not sync lifter Publisher in the chain.
* @return {@code true} if Publisher chain does not contain lifter with
* {@link #LIFTER_NAME} name.
*/
private static boolean shouldDecorateLifter(Publisher<?> p) {
Publisher<?> current = getParent(p);
while (true) {
if (current == null) {
// is start of the chain, Publisher without source or foreign Publisher
return true;
}
String lifterName = getLifterName(current);
if (isScopePassingLifter(lifterName)) {
return false;
}
if (lifterName == null) {
return true;
}
current = getParent(current);
}
}
private static boolean isScopePassingLifter(String lifterName) {
return lifterName == LIFTER_NAME;
}
private static String getLifterName(Publisher<?> current) {
return Scannable.from(current).scan(Scannable.Attr.LIFTER);
}
public static boolean isTraceContextPropagator(Publisher<?> current) {
return current instanceof TraceContextPropagator || isScopePassingLifter(getLifterName(current));
}
private static boolean isSourceProducer(Publisher<?> p) {
return sourceProducerClass.isInstance(p);
}
private static boolean isSync(Publisher<?> p) {
return !(p instanceof Processor)
&& Scannable.Attr.RunStyle.SYNC == Scannable.from(p).scan(Scannable.Attr.RUN_STYLE);
}
@Nullable
private static Publisher<?> getParent(Publisher<?> publisher) {
Object parent = Scannable.from(publisher).scanUnsafe(Scannable.Attr.PARENT);
if (parent instanceof Publisher) {
return (Publisher<?>) parent;
}
return null;
}
/**
* @param name function name.
* @param delegate delegate function.
* @param <T> the type of the first argument to the function
* @param <U> the type of the second argument to the function
* @param <R> the type of the result of the function
* @return function that {@link Object#toString()} returns provided name which is used
* as a value of {@link Scannable.Attr#LIFTER} attribute.
*/
static <T, U, R> BiFunction<T, U, R> named(String name, BiFunction<T, U, R> delegate) {
return new NamedLifter<>(name, delegate);
}
static class NamedLifter<T, U, R> implements BiFunction<T, U, R> {
private final BiFunction<T, U, R> delegate;
private final String name;
NamedLifter(String name, BiFunction<T, U, R> delegate) {
this.name = Objects.requireNonNull(name, "name");
this.delegate = Objects.requireNonNull(delegate, "delegate");
}
@Override
public R apply(T t, U u) {
return delegate.apply(t, u);
}
@Override
public String toString() {
return name;
}
}
}