SleuthRxJavaSchedulersHook.java

/*
 * Copyright 2013-2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.sleuth.instrument.rxjava;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import rx.functions.Action0;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSchedulersHook;

import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;

/**
 * {@link RxJavaSchedulersHook} that wraps an {@link Action0} into its tracing
 * representation ({@link TraceAction}).
 *
 * <p>
 * On construction the hook attempts to register itself with {@link RxJavaPlugins}. The
 * schedulers hook that was registered before is kept as a delegate and is consulted
 * first in {@link #onSchedule(Action0)}; the previously registered error handler and
 * observable execution hook are re-registered after the plugins have been reset.
 *
 * @author Shivang Shah
 * @since 1.0.0
 */
public class SleuthRxJavaSchedulersHook extends RxJavaSchedulersHook {

	private static final Log log = LogFactory.getLog(SleuthRxJavaSchedulersHook.class);

	private final Tracer tracer;

	private final List<Pattern> threadsToIgnore;

	// Stays null when reading the current plugin configuration fails in the constructor.
	private RxJavaSchedulersHook delegate;

	/**
	 * Creates the hook and tries to register it as the RxJava schedulers hook.
	 *
	 * <p>
	 * If the currently registered schedulers hook is already a
	 * {@link SleuthRxJavaSchedulersHook}, nothing is registered. Any exception thrown
	 * during registration is logged and not propagated.
	 * @param tracer tracer used to read the current span and to create new spans
	 * @param threadsToIgnore regular expressions matched against the full name of the
	 * executing thread; actions running on a matching thread are not traced. May be
	 * {@code null} or empty.
	 */
	public SleuthRxJavaSchedulersHook(Tracer tracer, List<String> threadsToIgnore) {
		this.tracer = tracer;
		this.threadsToIgnore = toPatternList(threadsToIgnore);
		try {
			RxJavaPlugins plugins = RxJavaPlugins.getInstance();
			this.delegate = plugins.getSchedulersHook();
			if (this.delegate instanceof SleuthRxJavaSchedulersHook) {
				// a Sleuth hook is already in place - do not register a second one
				return;
			}
			// capture the other plugins before reset() so they can be restored afterwards
			RxJavaErrorHandler errorHandler = plugins.getErrorHandler();
			RxJavaObservableExecutionHook observableExecutionHook = plugins.getObservableExecutionHook();
			logCurrentStateOfRxJavaPlugins(errorHandler, observableExecutionHook);
			plugins.reset();
			plugins.registerSchedulersHook(this);
			plugins.registerErrorHandler(errorHandler);
			plugins.registerObservableExecutionHook(observableExecutionHook);
		}
		catch (Exception ex) {
			log.error("Failed to register Sleuth RxJava SchedulersHook", ex);
		}
	}

	/**
	 * Compiles the given regular expressions.
	 * @param threadsToIgnore regular expressions, may be {@code null} or empty
	 * @return an unmodifiable list of compiled patterns, empty when there is no input
	 */
	private static List<Pattern> toPatternList(List<String> threadsToIgnore) {
		if (threadsToIgnore == null || threadsToIgnore.isEmpty()) {
			return Collections.emptyList();
		}
		List<Pattern> patterns = new ArrayList<>(threadsToIgnore.size());
		for (String thread : threadsToIgnore) {
			patterns.add(Pattern.compile(thread));
		}
		return Collections.unmodifiableList(patterns);
	}

	/**
	 * Logs, at debug level, the plugin configuration found before this hook replaces it.
	 * @param errorHandler error handler registered before the reset
	 * @param observableExecutionHook observable execution hook registered before the
	 * reset
	 */
	private void logCurrentStateOfRxJavaPlugins(RxJavaErrorHandler errorHandler,
			RxJavaObservableExecutionHook observableExecutionHook) {
		if (log.isDebugEnabled()) {
			log.debug("Current RxJava plugins configuration is [schedulersHook [" + this.delegate
					+ "],errorHandler [" + errorHandler + "],observableExecutionHook [" + observableExecutionHook
					+ "],]");
			log.debug("Registering Sleuth RxJava Schedulers Hook.");
		}
	}

	/**
	 * Wraps the scheduled action in a {@link TraceAction} unless it is already one.
	 *
	 * <p>
	 * The delegate hook (if any) is applied first. When the delegate itself produces a
	 * {@link TraceAction}, that result is returned as is so that the action is neither
	 * wrapped twice nor stripped of the delegate's wrapping.
	 * @param action action about to be scheduled
	 * @return the action to schedule
	 */
	@Override
	public Action0 onSchedule(Action0 action) {
		if (action instanceof TraceAction) {
			return action;
		}
		Action0 wrappedAction = this.delegate != null ? this.delegate.onSchedule(action) : action;
		if (wrappedAction instanceof TraceAction) {
			// keep the delegate's result; returning the raw action would discard it
			return wrappedAction;
		}
		return super.onSchedule(new TraceAction(this.tracer, wrappedAction, this.threadsToIgnore));
	}

	/**
	 * Wrapped Action element. Remembers the span that is current when the wrapper is
	 * created and puts a span in scope around the call of the wrapped action.
	 *
	 * @author Marcin Grzejszczak
	 */
	static class TraceAction implements Action0 {

		private final Action0 actual;

		private final Tracer tracer;

		// span that was current at construction time; may be null
		private final Span parent;

		private final List<Pattern> threadsToIgnore;

		TraceAction(Tracer tracer, Action0 actual, List<Pattern> threadsToIgnore) {
			this.tracer = tracer;
			this.threadsToIgnore = threadsToIgnore;
			this.parent = this.tracer.currentSpan();
			this.actual = actual;
		}

		/**
		 * Runs the wrapped action. If the name of the current thread matches one of the
		 * ignore patterns the action runs without any span handling. Otherwise the
		 * captured span is put in scope; when no span was captured a new one is started
		 * and ended once the action has finished.
		 */
		@SuppressWarnings("Duplicates")
		@Override
		public void call() {
			// don't create a span if the thread name is on a list of threads to ignore
			String threadName = Thread.currentThread().getName();
			for (Pattern threadToIgnore : this.threadsToIgnore) {
				if (threadToIgnore.matcher(threadName).matches()) {
					if (log.isTraceEnabled()) {
						log.trace(String.format(
								"Thread with name [%s] matches the regex [%s]. A span will not be created for this Thread.",
								threadName, threadToIgnore));
					}
					this.actual.call();
					return;
				}
			}
			Span span = this.parent;
			boolean created = false;
			if (span == null) {
				span = SleuthRxJavaSpan.RX_JAVA_TRACE_ACTION_SPAN.wrap(this.tracer.nextSpan())
						.name(SleuthRxJavaSpan.RX_JAVA_TRACE_ACTION_SPAN.getName())
						.tag(SleuthRxJavaSpan.Tags.THREAD, threadName).start();
				created = true;
			}
			try (Tracer.SpanInScope ws = this.tracer.withSpan(span)) {
				this.actual.call();
			}
			finally {
				// only end spans started here; a captured parent is owned by its creator
				if (created) {
					span.end();
				}
			}
		}

	}

}