KafkaBinderHealthIndicatorTest.java

/*
 * Copyright 2017-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import org.springframework.boot.health.contributor.Health;
import org.springframework.boot.health.contributor.Status;
import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

import static java.util.Collections.singleton;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;

/**
 * @author Barry Commins
 * @author Gary Russell
 * @author Laur Aliste
 * @author Soby Chacko
 * @author Chukwubuikem Ume-Ugwa
 * @author Taras Danylchuk
 */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaBinderHealthIndicatorTest {

	private static final String TEST_TOPIC = "test";

	private static final String REGEX_TOPIC = "regex*";

	private KafkaBinderHealthIndicator indicator;

	@Mock
	private DefaultKafkaConsumerFactory consumerFactory;

	@Mock
	private KafkaConsumer consumer;

	@Mock
	AbstractMessageListenerContainer<?, ?> listenerContainerA;

	@Mock
	AbstractMessageListenerContainer<?, ?> listenerContainerB;

	@Mock
	private KafkaMessageChannelBinder binder;

	private final Map<String, TopicInformation> topicsInUse = new HashMap<>();

	@BeforeEach
	public void setup() {
		MockitoAnnotations.initMocks(this);
		org.mockito.BDDMockito.given(consumerFactory.createConsumer())
				.willReturn(consumer);
		org.mockito.BDDMockito.given(binder.getTopicsInUse()).willReturn(topicsInUse);
		this.indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
		this.indicator.setTimeout(10);
	}

	@Test
	void kafkaBinderIsUpWithNoConsumers() {
		final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
		topicsInUse.put(TEST_TOPIC, new TopicInformation(
				"group1-healthIndicator", partitions, false));
		org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
				.willReturn(partitions);
		org.mockito.BDDMockito.given(binder.getKafkaMessageListenerContainers())
				.willReturn(Collections.emptyList());

		Health health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.UP);
		assertThat(health.getDetails()).containsEntry("topicsInUse", singleton(TEST_TOPIC));
	}

	@Test
	void kafkaBinderIsUp() {
		final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
		topicsInUse.put(TEST_TOPIC, new TopicInformation(
				"group1-healthIndicator", partitions, false));
		org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
				.willReturn(partitions);
		org.mockito.BDDMockito.given(binder.getKafkaMessageListenerContainers())
				.willReturn(Arrays.asList(listenerContainerA, listenerContainerB));
		mockContainer(listenerContainerA, true, true);
		mockContainer(listenerContainerB, true, true);

		Health health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.UP);
		assertThat(health.getDetails()).containsEntry("topicsInUse", singleton(TEST_TOPIC));
		assertThat(health.getDetails()).hasEntrySatisfying("listenerContainers", value ->
				assertThat((ArrayList<?>) value).hasSize(2));
	}

	@Test
	void kafkaBinderIsDownWhenOneOfConsumersIsNotRunning() {
		final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
		topicsInUse.put(TEST_TOPIC, new TopicInformation(
				"group1-healthIndicator", partitions, false));
		org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
				.willReturn(partitions);
		org.mockito.BDDMockito.given(binder.getKafkaMessageListenerContainers())
				.willReturn(Arrays.asList(listenerContainerA, listenerContainerB));
		mockContainer(listenerContainerA, false, true);
		mockContainer(listenerContainerB, true, true);

		Health health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.UP);
		assertThat(health.getDetails()).containsEntry("topicsInUse", singleton(TEST_TOPIC));
		assertThat(health.getDetails()).hasEntrySatisfying("listenerContainers", value ->
				assertThat((ArrayList<?>) value).hasSize(2));
	}

	@Test
	void kafkaBinderIsDownWhenOneOfContainersWasStoppedAbnormally() {
		final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
		topicsInUse.put(TEST_TOPIC, new TopicInformation(
				"group1-healthIndicator", partitions, false));
		org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
				.willReturn(partitions);
		org.mockito.BDDMockito.given(binder.getKafkaMessageListenerContainers())
				.willReturn(Arrays.asList(listenerContainerA, listenerContainerB));
		mockContainer(listenerContainerA, false, false);
		mockContainer(listenerContainerB, true, true);

		Health health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.DOWN);
		assertThat(health.getDetails()).containsEntry("topicsInUse", singleton(TEST_TOPIC));
		assertThat(health.getDetails()).hasEntrySatisfying("listenerContainers", value ->
				assertThat((ArrayList<?>) value).hasSize(2));
	}

	private void mockContainer(AbstractMessageListenerContainer<?, ?> container, boolean isRunning,
			boolean normalState) {

		org.mockito.BDDMockito.given(container.isRunning()).willReturn(isRunning);
		org.mockito.BDDMockito.given(container.isContainerPaused()).willReturn(true);
		org.mockito.BDDMockito.given(container.getListenerId()).willReturn("someListenerId");
		org.mockito.BDDMockito.given(container.getGroupId()).willReturn("someGroupId");
		org.mockito.BDDMockito.given(container.isInExpectedState()).willReturn(normalState);
	}

	@Test
	void kafkaBinderIsUpWithRegexTopic() {
		topicsInUse.put(REGEX_TOPIC, new TopicInformation(
				"regex-healthIndicator", null, true));
		Health health = indicator.health();
		// verify no consumer interaction for retrieving partitions
		org.mockito.BDDMockito.verify(consumer, Mockito.never())
				.partitionsFor(REGEX_TOPIC);
		// Ensuring the normal health check returns with status "up"
		assertThat(health.getStatus()).isEqualTo(Status.UP);
	}

	@Test
	void downWhenListTopicsThrowExceptionWithRegexTopic() {
		topicsInUse.put(REGEX_TOPIC, new TopicInformation(
			"regex-healthIndicator", null, true));
		org.mockito.BDDMockito.given(consumer.listTopics(any(Duration.class)))
			.willThrow(new IllegalStateException());

		Health health = indicator.health();

		// Ensuring the normal health check returns with status "up"
		assertThat(health.getStatus()).isEqualTo(Status.DOWN);
		Map<String, Object> details = health.getDetails();
		assertThat(details.containsKey("Cluster not connected")).isTrue();
		assertThat(details
			.containsValue("Destination provided is a pattern, but cannot connect to the cluster for any verification")).isTrue();
	}


	@Test
	void kafkaBinderIsDown() {
		final List<PartitionInfo> partitions = partitions(null);
		topicsInUse.put(TEST_TOPIC, new TopicInformation(
				"group2-healthIndicator", partitions, false));
		org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
				.willReturn(partitions);
		Health health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.DOWN);
	}

	@Test
	void kafkaBinderIsDownWhenConsiderDownWhenAnyPartitionHasNoLeaderIsTrue() {
		final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
		partitions.add(new PartitionInfo(TEST_TOPIC, 0, null, null, null));
		indicator.setConsiderDownWhenAnyPartitionHasNoLeader(true);
		topicsInUse.put(TEST_TOPIC, new TopicInformation(
				"group2-healthIndicator", partitions, false));
		org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
				.willReturn(partitions);
		Health health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.DOWN);
	}

	@Test
	void kafkaBinderIsUpWhenConsiderDownWhenAnyPartitionHasNoLeaderIsFalse() {
		Node node = new Node(0, null, 0);
		final List<PartitionInfo> partitions = partitions(node);
		partitions.add(new PartitionInfo(TEST_TOPIC, 0, null, null, null));
		indicator.setConsiderDownWhenAnyPartitionHasNoLeader(false);
		topicsInUse.put(TEST_TOPIC, new TopicInformation(
				"group2-healthIndicator", partitions(node), false));
		org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
				.willReturn(partitions);
		Health health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.UP);
	}

	@Test
	@Timeout(5)
	void kafkaBinderDoesNotAnswer() {
		final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
		topicsInUse.put(TEST_TOPIC, new TopicInformation(
				"group3-healthIndicator", partitions, false));
		org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
				.willAnswer(invocation -> {
					final int fiveMinutes = 1000 * 60 * 5;
					Thread.sleep(fiveMinutes);
					return partitions;
				});
		this.indicator.setTimeout(1);
		Health health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.DOWN);
	}

	@Test
	void createsConsumerOnceWhenInvokedMultipleTimes() {
		final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
		topicsInUse.put(TEST_TOPIC, new TopicInformation(
				"group4-healthIndicator", partitions, false));
		org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC))
				.willReturn(partitions);

		indicator.health();
		Health health = indicator.health();

		assertThat(health.getStatus()).isEqualTo(Status.UP);
		org.mockito.Mockito.verify(this.consumerFactory).createConsumer();
	}

	@Test
	void consumerCreationFailsFirstTime() {
		final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
		topicsInUse.put(TEST_TOPIC, new TopicInformation(
				"foo-healthIndicator", partitions, false));

		org.mockito.BDDMockito.given(consumerFactory.createConsumer())
				.willThrow(KafkaException.class).willReturn(consumer);

		Health health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.DOWN);

		health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.UP);

		org.mockito.Mockito.verify(this.consumerFactory, Mockito.times(2))
				.createConsumer();
	}

	@Test
	void testIfNoTopicsRegisteredByTheBinderProvidesDownStatus() {
		Health health = indicator.health();
		assertThat(health.getStatus()).isEqualTo(Status.UNKNOWN);
	}

	private List<PartitionInfo> partitions(Node leader) {
		List<PartitionInfo> partitions = new ArrayList<>();
		partitions.add(new PartitionInfo(TEST_TOPIC, 0, leader, null, null));
		return partitions;
	}
}