// KafkaBinderMetricsTest.java

/*
 * Copyright 2016-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

/**
 * Tests for the offset lag gauges that {@link KafkaBinderMetrics} registers with a
 * {@link MeterRegistry}.
 * <p>
 * The consumer factory, the consumer, the binder and the binder configuration are all
 * Mockito mocks, so no Kafka broker is contacted. Unless a test overrides it, the mocked
 * consumer reports an end offset of 1000 for partition 0 of {@value #TEST_TOPIC}.
 *
 * @author Henryk Konsek
 * @author Thomas Cheyney
 * @author Soby Chacko
 * @author Lars Bilger
 * @author Tomek Szmytka
 * @author Nico Heller
 * @author Kurt Hong
 * @author Artem Bilan
 */
class KafkaBinderMetricsTest {

	private static final String TEST_TOPIC = "test";

	private KafkaBinderMetrics metrics;

	// Handle returned by MockitoAnnotations.openMocks(); released in tearDown().
	private AutoCloseable mocks;

	@Mock
	private DefaultKafkaConsumerFactory consumerFactory;

	@Mock
	private KafkaConsumer consumer;

	@Mock
	private KafkaMessageChannelBinder binder;

	private final MeterRegistry meterRegistry = new SimpleMeterRegistry();

	// Live map handed out by the mocked binder; tests add entries to it before binding.
	private final Map<String, TopicInformation> topicsInUse = new HashMap<>();

	@Mock(answer = Answers.RETURNS_DEEP_STUBS)
	private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties;

	/**
	 * Initializes the mocks and builds the {@link KafkaBinderMetrics} under test with
	 * default offset lag metrics enabled and a 60 second lag computation interval.
	 */
	@BeforeEach
	void setup() {
		this.mocks = MockitoAnnotations.openMocks(this);
		org.mockito.BDDMockito
			.given(this.consumerFactory.createConsumer(ArgumentMatchers.any(), ArgumentMatchers.any()))
			.willReturn(this.consumer);
		org.mockito.BDDMockito.given(this.binder.getTopicsInUse()).willReturn(this.topicsInUse);
		org.mockito.BDDMockito
			.given(this.kafkaBinderConfigurationProperties.getMetrics().isDefaultOffsetLagMetricsEnabled())
			.willReturn(true);
		org.mockito.BDDMockito
			.given(this.kafkaBinderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval())
			.willReturn(Duration.ofSeconds(60));
		this.metrics = new KafkaBinderMetrics(this.binder, this.kafkaBinderConfigurationProperties,
			this.consumerFactory, this.meterRegistry);
		org.mockito.BDDMockito
			.given(this.consumer.endOffsets(ArgumentMatchers.anyCollection()))
			.willReturn(Collections.singletonMap(new TopicPartition(TEST_TOPIC, 0), 1000L));
	}

	/**
	 * Releases what a test left behind: the scheduler created by {@code bindTo} and the
	 * Mockito resources opened in {@link #setup()}.
	 * @throws Exception if closing the metrics or the mocks fails
	 */
	@AfterEach
	void tearDown() throws Exception {
		try {
			// shouldShutdownSchedulerOnClose() shows that bindTo() creates the scheduler and
			// close() resets it to null, so this guard skips tests that already closed the
			// metrics (or never bound them) and avoids a second close().
			if (this.metrics != null && this.metrics.scheduler != null) {
				this.metrics.close();
			}
		}
		finally {
			// Null when setup() failed before openMocks() returned.
			if (this.mocks != null) {
				this.mocks.close();
			}
		}
	}

	/** End offset 1000 and committed offset 500 on one partition must yield a lag of 500. */
	@Test
	void shouldIndicateLag() {
		givenCommittedOffsets(committedAt(500, 1));
		useTestTopicWithPartitionMetadata("group1-metrics", node());

		this.metrics.bindTo(this.meterRegistry);

		assertThat(this.meterRegistry.getMeters()).hasSize(1);
		assertThat(lag("group1-metrics", TEST_TOPIC)).isEqualTo(500.0);
	}

	/**
	 * With default offset lag metrics disabled the gauge first reads 0 and only reports
	 * the lag (500) after a rebind with a one second interval.
	 */
	@Test
	void shouldFallbackToScheduledOffsetLagComputationWhenRealtimeOffsetLagIsDisabled() {
		givenCommittedOffsets(committedAt(500, 1));
		useTestTopicWithPartitionMetadata("group1-metrics", node());
		org.mockito.BDDMockito
			.given(this.kafkaBinderConfigurationProperties.getMetrics().isDefaultOffsetLagMetricsEnabled())
			.willReturn(false);

		this.metrics.bindTo(this.meterRegistry);

		assertThat(this.meterRegistry.getMeters()).hasSize(1);
		assertThat(lag("group1-metrics", TEST_TOPIC)).isEqualTo(0.0);

		// Rebind with a short interval so the wait below stays within five seconds.
		org.mockito.BDDMockito
			.given(this.kafkaBinderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval())
			.willReturn(Duration.ofSeconds(1));
		this.metrics.bindTo(this.meterRegistry);

		Awaitility.waitAtMost(Duration.ofSeconds(5))
			.untilAsserted(() -> assertThat(lag("group1-metrics", TEST_TOPIC)).isEqualTo(500.0));
	}

	/** A deny filter on the offset metric name prefix must leave the registry empty. */
	@Test
	void shouldNotContainAnyMetricsWhenUsingNoopGauge() {
		// Adding NoopGauge for the offset metric.
		this.meterRegistry.config().meterFilter(
			MeterFilter.denyNameStartsWith("spring.cloud.stream.binder.kafka.offset"));

		// Because we have NoopGauge for the offset metric in the meter registry,
		// none of these expectations matter.
		givenCommittedOffsets(committedAt(500, 1));
		useTestTopicWithPartitionMetadata("group1-metrics", node());

		this.metrics.bindTo(this.meterRegistry);

		// Because of the NoopGauge, the meterRegistry should contain no metric.
		assertThat(this.meterRegistry.getMeters()).isEmpty();
	}

	/** Two partitions that each lag by 500 must be reported as a single lag of 1000. */
	@Test
	void shouldSumUpPartitionsLags() {
		Map<TopicPartition, Long> endOffsets = new HashMap<>();
		endOffsets.put(new TopicPartition(TEST_TOPIC, 0), 1000L);
		endOffsets.put(new TopicPartition(TEST_TOPIC, 1), 1000L);
		org.mockito.BDDMockito
			.given(this.consumer.endOffsets(ArgumentMatchers.anyCollection()))
			.willReturn(endOffsets);
		givenCommittedOffsets(committedAt(500, 2));
		useTestTopicWithPartitionMetadata("group2-metrics", node(), node());

		this.metrics.bindTo(this.meterRegistry);

		assertThat(this.meterRegistry.getMeters()).hasSize(1);
		assertThat(lag("group2-metrics", TEST_TOPIC)).isEqualTo(1000.0);
	}

	/** Without any committed-offset stubbing the gauge must report the end offset (1000). */
	@Test
	void shouldIndicateFullLagForNotCommittedGroups() {
		useTestTopicWithPartitionMetadata("group3-metrics", node());

		this.metrics.bindTo(this.meterRegistry);

		assertThat(this.meterRegistry.getMeters()).hasSize(1);
		assertThat(lag("group3-metrics", TEST_TOPIC)).isEqualTo(1000.0);
	}

	/** A topic registered with a {@code null} consumer group must not get a gauge. */
	@Test
	void shouldNotCalculateLagForProducerTopics() {
		useTopic(TEST_TOPIC, null, node());

		this.metrics.bindTo(this.meterRegistry);

		assertThat(this.meterRegistry.getMeters()).isEmpty();
	}

	/** Reading the same gauge twice must create the metadata consumer only once. */
	@Test
	void createsConsumerOnceWhenInvokedMultipleTimes() {
		useTopic(TEST_TOPIC, "group4-metrics", node());

		this.metrics.bindTo(this.meterRegistry);

		Gauge gauge = offsetLagGauge("group4-metrics", TEST_TOPIC);
		gauge.value();
		assertThat(gauge.value()).isEqualTo(1000.0);

		Mockito.verify(this.consumerFactory)
			.createConsumer(ArgumentMatchers.any(), ArgumentMatchers.any());
	}

	/**
	 * When the first consumer creation throws, the gauge reads 0; the next read creates
	 * the consumer again and reports the lag.
	 */
	@Test
	void consumerCreationFailsFirstTime() {
		org.mockito.BDDMockito
			.given(this.consumerFactory.createConsumer(ArgumentMatchers.any(), ArgumentMatchers.any()))
			.willThrow(KafkaException.class)
			.willReturn(this.consumer);
		useTopic(TEST_TOPIC, "group5-metrics", node());

		this.metrics.bindTo(this.meterRegistry);

		Gauge gauge = offsetLagGauge("group5-metrics", TEST_TOPIC);
		assertThat(gauge.value()).isEqualTo(0.0);
		assertThat(gauge.value()).isEqualTo(1000.0);

		Mockito.verify(this.consumerFactory, Mockito.times(2))
			.createConsumer(ArgumentMatchers.any(), ArgumentMatchers.any());
	}

	/** Two topics in different groups must each get a consumer created for their group. */
	@Test
	void createOneConsumerPerGroup() {
		useTopic(TEST_TOPIC, "group1-metrics", node());
		useTopic("test2", "group2-metrics", node());

		this.metrics.bindTo(this.meterRegistry);

		// Stubbed after bindTo(): the assertions below rely on the factory not being asked
		// for the group2 consumer until a gauge value is read.
		KafkaConsumer consumer2 = mock(KafkaConsumer.class);
		org.mockito.BDDMockito
			.given(this.consumerFactory.createConsumer(
				ArgumentMatchers.eq("group2-metrics"), ArgumentMatchers.any()))
			.willReturn(consumer2);
		org.mockito.BDDMockito
			.given(consumer2.endOffsets(ArgumentMatchers.anyCollection()))
			.willReturn(Collections.singletonMap(new TopicPartition("test2", 0), 50L));

		Gauge gauge1 = offsetLagGauge("group1-metrics", TEST_TOPIC);
		Gauge gauge2 = offsetLagGauge("group2-metrics", "test2");
		gauge1.value();
		gauge2.value();
		assertThat(gauge1.value()).isEqualTo(1000.0);
		assertThat(gauge2.value()).isEqualTo(50.0);

		Mockito.verify(this.consumerFactory).createConsumer(
			ArgumentMatchers.eq("group1-metrics"), ArgumentMatchers.any());
		Mockito.verify(this.consumerFactory).createConsumer(
			ArgumentMatchers.eq("group2-metrics"), ArgumentMatchers.any());
	}

	/**
	 * With no committed offsets, a beginning offset of 500 and an end offset of 1000 must
	 * yield a lag of 500.
	 */
	@Test
	void usesBeginningOffsetIfNoCommittedOffsetFound() {
		givenCommittedOffsets(Collections.emptyMap());
		Map<TopicPartition, Long> beginnings = new HashMap<>();
		beginnings.put(new TopicPartition(TEST_TOPIC, 0), 500L);
		org.mockito.BDDMockito
			.given(this.consumer.beginningOffsets(ArgumentMatchers.anySet()))
			.willReturn(beginnings);
		useTestTopicWithPartitionMetadata("group1-metrics", node());

		this.metrics.bindTo(this.meterRegistry);

		assertThat(this.meterRegistry.getMeters()).hasSize(1);
		assertThat(lag("group1-metrics", TEST_TOPIC)).isEqualTo(500.0);
	}

	/**
	 * {@code bindTo} must create a scheduler and {@code close} must discard it.
	 * @throws Exception if closing the metrics fails
	 */
	@Test
	void shouldShutdownSchedulerOnClose() throws Exception {
		this.metrics.bindTo(this.meterRegistry);
		assertThat(this.metrics.scheduler).isNotNull();

		this.metrics.close();

		assertThat(this.metrics.scheduler).isNull();
	}

	/**
	 * {@code close} must remove the offset lag meters from the registry.
	 * @throws Exception if closing the metrics fails
	 */
	@Test
	void shouldUnregisterMetersOnClose() throws Exception {
		useTopic(TEST_TOPIC, "group4-metrics", node());

		this.metrics.bindTo(this.meterRegistry);
		assertThat(this.meterRegistry.find(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME).meters()).hasSize(1);

		this.metrics.close();

		assertThat(this.meterRegistry.find(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME).meters()).isEmpty();
	}

	/** Creates a fresh broker node with id 0, no host and port 0. */
	private static Node node() {
		return new Node(0, null, 0);
	}

	/**
	 * Builds a mutable map that commits {@code offset} for partitions
	 * {@code 0..partitionCount-1} of {@link #TEST_TOPIC}.
	 */
	private static Map<TopicPartition, OffsetAndMetadata> committedAt(long offset, int partitionCount) {
		Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
		for (int partition = 0; partition < partitionCount; partition++) {
			committed.put(new TopicPartition(TEST_TOPIC, partition), new OffsetAndMetadata(offset));
		}
		return committed;
	}

	/** Makes the mocked consumer return {@code committed} for any set of partitions. */
	private void givenCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> committed) {
		org.mockito.BDDMockito
			.given(this.consumer.committed(ArgumentMatchers.anySet()))
			.willReturn(committed);
	}

	/**
	 * Registers {@code topic} as in use by {@code group} with one partition per node and
	 * returns the partition list that was registered. Does not stub the consumer.
	 */
	private List<PartitionInfo> useTopic(String topic, String group, Node... nodes) {
		List<PartitionInfo> partitions = partitions(nodes);
		this.topicsInUse.put(topic, new TopicInformation(group, partitions, false));
		return partitions;
	}

	/**
	 * Registers {@link #TEST_TOPIC} for {@code group} and additionally makes the mocked
	 * consumer return the same partitions from {@code partitionsFor}.
	 */
	private void useTestTopicWithPartitionMetadata(String group, Node... nodes) {
		List<PartitionInfo> partitions = useTopic(TEST_TOPIC, group, nodes);
		org.mockito.BDDMockito.given(this.consumer.partitionsFor(TEST_TOPIC))
			.willReturn(partitions);
	}

	/** Looks up the offset lag gauge tagged with the given group and topic. */
	private Gauge offsetLagGauge(String group, String topic) {
		return this.meterRegistry.get(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME)
			.tag("group", group).tag("topic", topic).gauge();
	}

	/** Looks up the gauge for the given group and topic and reads its current value. */
	private double lag(String group, String topic) {
		return offsetLagGauge(group, topic).value();
	}

	/**
	 * Creates one {@link PartitionInfo} per node, numbered from 0. The partitions are
	 * always attributed to {@link #TEST_TOPIC}.
	 */
	private List<PartitionInfo> partitions(Node... nodes) {
		List<PartitionInfo> partitions = new ArrayList<>();
		for (int i = 0; i < nodes.length; i++) {
			partitions.add(new PartitionInfo(TEST_TOPIC, i, nodes[i], null, null));
		}
		return partitions;
	}

}