TracingKafkaConsumer.java
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.instrument.kafka;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.propagation.Propagator;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
/**
 * This decorates a Kafka {@link Consumer}. It creates and completes a
 * {@link Span.Kind#CONSUMER} span for each record received. This span will be a child
 * span of the one extracted from the record headers.
 * <p>
 * Only the two {@code poll} overloads add behaviour; every other method forwards
 * straight to the wrapped consumer. The {@link Propagator} and the
 * {@link Propagator.Getter} are looked up from the {@link BeanFactory} on first use
 * rather than in the constructor. The lazy lookups are not synchronized.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 * @author Anders Clausen
 * @author Flaviu Muresan
 * @since 3.1.0
 */
public class TracingKafkaConsumer<K, V> implements Consumer<K, V> {

    private final BeanFactory beanFactory;

    private final Consumer<K, V> delegate;

    // Both collaborators are resolved lazily from the bean factory and then cached.
    private Propagator propagator;

    private Propagator.Getter<ConsumerRecord<?, ?>> extractor;

    /**
     * Creates a tracing decorator around the given consumer.
     * @param consumer the consumer every call is forwarded to
     * @param beanFactory source of the {@link Propagator} and {@link Propagator.Getter}
     * beans, queried on first use
     */
    public TracingKafkaConsumer(Consumer<K, V> consumer, BeanFactory beanFactory) {
        this.delegate = consumer;
        this.beanFactory = beanFactory;
    }

    /**
     * Returns the cached {@link Propagator}, fetching it from the bean factory the first
     * time it is needed.
     */
    private Propagator propagator() {
        if (this.propagator == null) {
            this.propagator = this.beanFactory.getBean(Propagator.class);
        }
        return this.propagator;
    }

    /**
     * Returns the cached {@code Propagator.Getter<ConsumerRecord<?, ?>>}, fetching it
     * from the bean factory the first time it is needed.
     * @return the getter, or {@code null} when the provider has nothing available (in
     * which case the lookup is attempted again on the next call)
     */
    private Propagator.Getter<ConsumerRecord<?, ?>> extractor() {
        if (this.extractor == null) {
            ResolvableType recordType = ResolvableType
                    .forType(new ParameterizedTypeReference<ConsumerRecord<?, ?>>() {
                    });
            ResolvableType getterType = ResolvableType.forClassWithGenerics(Propagator.Getter.class, recordType);
            // The provider is requested by the exact generic signature being cast to, so
            // the unchecked cast is expected to hold; the suppression is scoped to this
            // single declaration.
            @SuppressWarnings("unchecked")
            Propagator.Getter<ConsumerRecord<?, ?>> resolved = (Propagator.Getter<ConsumerRecord<?, ?>>) this.beanFactory
                    .getBeanProvider(getterType).getIfAvailable();
            this.extractor = resolved;
        }
        return this.extractor;
    }

    /**
     * Builds and finishes one consumer span per record in the batch and hands the same
     * batch back.
     * <p>
     * The propagator and getter are resolved inside the loop on purpose: an empty batch
     * must not trigger any bean lookup. No exception is caught here, so a failure while
     * building a span propagates to the caller of {@code poll}.
     * @param polledRecords the batch returned by the delegate
     * @return {@code polledRecords}, unchanged
     */
    private ConsumerRecords<K, V> traceRecords(ConsumerRecords<K, V> polledRecords) {
        for (ConsumerRecord<K, V> polled : polledRecords) {
            KafkaTracingUtils.buildAndFinishSpan(SleuthKafkaSpan.KAFKA_CONSUMER_SPAN, polled, propagator(),
                    extractor());
        }
        return polledRecords;
    }

    // ---- Subscription and assignment: forwarded unchanged to the delegate ----

    @Override
    public Set<TopicPartition> assignment() {
        return this.delegate.assignment();
    }

    @Override
    public Set<String> subscription() {
        return this.delegate.subscription();
    }

    @Override
    public void subscribe(Collection<String> collection) {
        this.delegate.subscribe(collection);
    }

    @Override
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        this.delegate.subscribe(collection, consumerRebalanceListener);
    }

    @Override
    public void assign(Collection<TopicPartition> collection) {
        this.delegate.assign(collection);
    }

    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        this.delegate.subscribe(pattern, consumerRebalanceListener);
    }

    @Override
    public void subscribe(Pattern pattern) {
        this.delegate.subscribe(pattern);
    }

    @Override
    public void unsubscribe() {
        this.delegate.unsubscribe();
    }

    // ---- Polling: the only operations that add tracing ----

    /**
     * Polls the delegate and records a consumer span for every record returned.
     * @param l timeout passed through to the delegate
     * @return the records returned by the delegate
     * @deprecated kept because {@link Consumer} declares it; prefer
     * {@link #poll(Duration)}
     */
    @Deprecated
    @Override
    public ConsumerRecords<K, V> poll(long l) {
        return traceRecords(this.delegate.poll(l));
    }

    /**
     * Polls the delegate and records a consumer span for every record returned.
     * @param duration timeout passed through to the delegate
     * @return the records returned by the delegate
     */
    @Override
    public ConsumerRecords<K, V> poll(Duration duration) {
        return traceRecords(this.delegate.poll(duration));
    }

    // ---- Offset commits: forwarded unchanged to the delegate ----

    @Override
    public void commitSync() {
        this.delegate.commitSync();
    }

    @Override
    public void commitSync(Duration duration) {
        this.delegate.commitSync(duration);
    }

    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        this.delegate.commitSync(map);
    }

    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
        this.delegate.commitSync(map, duration);
    }

    @Override
    public void commitAsync() {
        this.delegate.commitAsync();
    }

    @Override
    public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
        this.delegate.commitAsync(offsetCommitCallback);
    }

    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        this.delegate.commitAsync(map, offsetCommitCallback);
    }

    // ---- Seeking and positions: forwarded unchanged to the delegate ----

    @Override
    public void seek(TopicPartition topicPartition, long l) {
        this.delegate.seek(topicPartition, l);
    }

    @Override
    public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        this.delegate.seek(topicPartition, offsetAndMetadata);
    }

    @Override
    public void seekToBeginning(Collection<TopicPartition> collection) {
        this.delegate.seekToBeginning(collection);
    }

    @Override
    public void seekToEnd(Collection<TopicPartition> collection) {
        this.delegate.seekToEnd(collection);
    }

    @Override
    public long position(TopicPartition topicPartition) {
        return this.delegate.position(topicPartition);
    }

    @Override
    public long position(TopicPartition topicPartition, Duration duration) {
        return this.delegate.position(topicPartition, duration);
    }

    /**
     * @deprecated kept because {@link Consumer} declares it; prefer
     * {@link #committed(Set)}
     */
    @Override
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition topicPartition) {
        return this.delegate.committed(topicPartition);
    }

    /**
     * @deprecated kept because {@link Consumer} declares it; prefer
     * {@link #committed(Set, Duration)}
     */
    @Override
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
        return this.delegate.committed(topicPartition, duration);
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set) {
        return this.delegate.committed(set);
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set, Duration duration) {
        return this.delegate.committed(set, duration);
    }

    // ---- Metadata, flow control and offset lookups: forwarded unchanged ----

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return this.delegate.metrics();
    }

    @Override
    public List<PartitionInfo> partitionsFor(String s) {
        return this.delegate.partitionsFor(s);
    }

    @Override
    public List<PartitionInfo> partitionsFor(String s, Duration duration) {
        return this.delegate.partitionsFor(s, duration);
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        return this.delegate.listTopics();
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics(Duration duration) {
        return this.delegate.listTopics(duration);
    }

    @Override
    public Set<TopicPartition> paused() {
        return this.delegate.paused();
    }

    @Override
    public void pause(Collection<TopicPartition> collection) {
        this.delegate.pause(collection);
    }

    @Override
    public void resume(Collection<TopicPartition> collection) {
        this.delegate.resume(collection);
    }

    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map) {
        return this.delegate.offsetsForTimes(map);
    }

    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Duration duration) {
        return this.delegate.offsetsForTimes(map, duration);
    }

    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection) {
        return this.delegate.beginningOffsets(collection);
    }

    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration duration) {
        return this.delegate.beginningOffsets(collection, duration);
    }

    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection) {
        return this.delegate.endOffsets(collection);
    }

    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
        return this.delegate.endOffsets(collection, duration);
    }

    @Override
    public ConsumerGroupMetadata groupMetadata() {
        return this.delegate.groupMetadata();
    }

    @Override
    public void enforceRebalance() {
        this.delegate.enforceRebalance();
    }

    // ---- Lifecycle: forwarded unchanged to the delegate ----

    @Override
    public void close() {
        this.delegate.close();
    }

    @Override
    public void close(Duration duration) {
        this.delegate.close(duration);
    }

    @Override
    public void wakeup() {
        this.delegate.wakeup();
    }

    @Override
    public OptionalLong currentLag(TopicPartition topicPartition) {
        return this.delegate.currentLag(topicPartition);
    }

}