StatusTrackerTest.java

package redis.clients.jedis.mcf;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.awaitility.Durations;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import redis.clients.jedis.HostAndPort;

public class StatusTrackerTest {

  @Mock
  private HealthStatusManager mockHealthStatusManager;

  private StatusTracker statusTracker;
  private HostAndPort testEndpoint;

  @BeforeEach
  void setUp() {
    MockitoAnnotations.openMocks(this);
    statusTracker = new StatusTracker(mockHealthStatusManager);
    testEndpoint = new HostAndPort("localhost", 6379);
  }

  @Test
  void testWaitForHealthStatus_AlreadyDetermined() {
    // Given: Health status is already HEALTHY
    when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.HEALTHY);

    // When: Waiting for health status
    HealthStatus result = statusTracker.waitForHealthStatus(testEndpoint);

    // Then: Should return immediately without waiting
    assertEquals(HealthStatus.HEALTHY, result);
    verify(mockHealthStatusManager, never()).registerListener(eq(testEndpoint),
      any(HealthStatusListener.class));
  }

  @Test
  void testWaitForHealthStatus_EventDriven() throws InterruptedException {
    // Given: Health status is initially UNKNOWN
    when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.UNKNOWN) // First
                                                                                                 // call
        .thenReturn(HealthStatus.UNKNOWN); // Second call after registering listener
    when(mockHealthStatusManager.getMaxWaitFor(testEndpoint)).thenReturn(3000L);

    // Capture the registered listener
    final HealthStatusListener[] capturedListener = new HealthStatusListener[1];
    doAnswer(invocation -> {
      capturedListener[0] = invocation.getArgument(1);
      return null;
    }).when(mockHealthStatusManager).registerListener(eq(testEndpoint),
      any(HealthStatusListener.class));

    // When: Start waiting in a separate thread
    CountDownLatch testLatch = new CountDownLatch(1);
    final HealthStatus[] result = new HealthStatus[1];

    Thread waitingThread = new Thread(() -> {
      result[0] = statusTracker.waitForHealthStatus(testEndpoint);
      testLatch.countDown();
    });
    waitingThread.start();

    // Give some time for the listener to be registered
    Thread.sleep(50);

    // Simulate health status change event
    assertNotNull(capturedListener[0], "Listener should have been registered");
    HealthStatusChangeEvent event = new HealthStatusChangeEvent(testEndpoint, HealthStatus.UNKNOWN,
        HealthStatus.HEALTHY);
    capturedListener[0].onStatusChange(event);

    // Then: Should complete and return the new status
    assertTrue(testLatch.await(1, TimeUnit.SECONDS), "Should complete within timeout");
    assertEquals(HealthStatus.HEALTHY, result[0]);

    // Verify cleanup
    verify(mockHealthStatusManager).unregisterListener(eq(testEndpoint), eq(capturedListener[0]));
  }

  @Test
  void testWaitForHealthStatus_IgnoresUnknownStatus() throws InterruptedException {
    // Given: Health status is initially UNKNOWN
    when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.UNKNOWN);
    when(mockHealthStatusManager.getMaxWaitFor(testEndpoint)).thenReturn(3000L);

    // Capture the registered listener
    final HealthStatusListener[] capturedListener = new HealthStatusListener[1];
    doAnswer(invocation -> {
      capturedListener[0] = invocation.getArgument(1);
      return null;
    }).when(mockHealthStatusManager).registerListener(eq(testEndpoint),
      any(HealthStatusListener.class));

    // When: Start waiting in a separate thread
    CountDownLatch testLatch = new CountDownLatch(1);
    final HealthStatus[] result = new HealthStatus[1];

    Thread waitingThread = new Thread(() -> {
      result[0] = statusTracker.waitForHealthStatus(testEndpoint);
      testLatch.countDown();
    });
    waitingThread.start();

    // Give some time for the listener to be registered
    Thread.sleep(50);

    // Simulate UNKNOWN status change (should be ignored)
    assertNotNull(capturedListener[0], "Listener should have been registered");
    HealthStatusChangeEvent unknownEvent = new HealthStatusChangeEvent(testEndpoint,
        HealthStatus.UNKNOWN, HealthStatus.UNKNOWN);
    capturedListener[0].onStatusChange(unknownEvent);

    // Should not complete yet
    assertFalse(testLatch.await(100, TimeUnit.MILLISECONDS),
      "Should not complete with UNKNOWN status");

    // Now send a real status change
    HealthStatusChangeEvent realEvent = new HealthStatusChangeEvent(testEndpoint,
        HealthStatus.UNKNOWN, HealthStatus.UNHEALTHY);
    capturedListener[0].onStatusChange(realEvent);

    // Then: Should complete now
    assertTrue(testLatch.await(1, TimeUnit.SECONDS), "Should complete with real status");
    assertEquals(HealthStatus.UNHEALTHY, result[0]);
  }

  @Test
  void testWaitForHealthStatus_IgnoresOtherEndpoints() throws InterruptedException {
    // Given: Health status is initially UNKNOWN
    when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.UNKNOWN);
    when(mockHealthStatusManager.getMaxWaitFor(testEndpoint)).thenReturn(3000L);
    HostAndPort otherEndpoint = new HostAndPort("other", 6379);

    // Capture the registered listener
    final HealthStatusListener[] capturedListener = new HealthStatusListener[1];
    doAnswer(invocation -> {
      capturedListener[0] = invocation.getArgument(1);
      return null;
    }).when(mockHealthStatusManager).registerListener(eq(testEndpoint),
      any(HealthStatusListener.class));

    // When: Start waiting in a separate thread
    CountDownLatch testLatch = new CountDownLatch(1);
    final HealthStatus[] result = new HealthStatus[1];

    Thread waitingThread = new Thread(() -> {
      result[0] = statusTracker.waitForHealthStatus(testEndpoint);
      testLatch.countDown();
    });
    waitingThread.start();

    // Give some time for the listener to be registered
    await().atMost(Durations.FIVE_HUNDRED_MILLISECONDS)
        .pollInterval(Durations.ONE_HUNDRED_MILLISECONDS).untilAsserted(() -> {
          assertNotNull(capturedListener[0], "Listener should have been registered");
        });

    // Simulate status change for different endpoint (should be ignored)
    HealthStatusChangeEvent otherEvent = new HealthStatusChangeEvent(otherEndpoint,
        HealthStatus.UNKNOWN, HealthStatus.HEALTHY);
    capturedListener[0].onStatusChange(otherEvent);

    // Should not complete yet
    assertFalse(testLatch.await(100, TimeUnit.MILLISECONDS),
      "Should not complete with other endpoint");

    // Now send event for correct endpoint
    HealthStatusChangeEvent correctEvent = new HealthStatusChangeEvent(testEndpoint,
        HealthStatus.UNKNOWN, HealthStatus.HEALTHY);
    capturedListener[0].onStatusChange(correctEvent);

    // Then: Should complete now
    assertTrue(testLatch.await(1, TimeUnit.SECONDS), "Should complete with correct endpoint");
    assertEquals(HealthStatus.HEALTHY, result[0]);
  }

  @Test
  void testWaitForHealthStatus_InterruptHandling() {
    // Given: Health status is initially UNKNOWN and will stay that way
    when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.UNKNOWN);
    when(mockHealthStatusManager.getMaxWaitFor(any())).thenReturn(3000L);

    AtomicReference<String> interruptedThreadName = new AtomicReference<>();
    AtomicReference<Throwable> thrownException = new AtomicReference<>();
    AtomicReference<Boolean> isInterrupted = new AtomicReference<>();
    // When: Interrupt thse waiting thread
    Thread testThread = new Thread(() -> {
      try {
        statusTracker.waitForHealthStatus(testEndpoint);
        fail("Should have thrown JedisConnectionException due to interrupt");
      } catch (Exception e) {
        interruptedThreadName.set(Thread.currentThread().getName());
        thrownException.set(e);
        isInterrupted.set(Thread.currentThread().isInterrupted());
      }
    });

    testThread.start();

    // Give thread time to start waiting
    try {
      Thread.sleep(50);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    // Interrupt the waiting thread
    testThread.interrupt();

    try {
      testThread.join(1000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    assertFalse(testThread.isAlive(), "Test thread should have completed");
    assertTrue(thrownException.get().getMessage().contains("Interrupted while waiting"));
    assertTrue(isInterrupted.get(), "Thread should be interrupted");
  }

  @Test
  void testWaitForHealthStatus_RaceConditionProtection() {
    // Given: Health status changes between first check and listener registration
    when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.UNKNOWN) // First
                                                                                                 // call
        .thenReturn(HealthStatus.HEALTHY); // Second call after registering listener

    // When: Waiting for health status
    HealthStatus result = statusTracker.waitForHealthStatus(testEndpoint);

    // Then: Should return the status from the second check without waiting
    assertEquals(HealthStatus.HEALTHY, result);

    // Verify listener was registered and unregistered
    verify(mockHealthStatusManager).registerListener(eq(testEndpoint),
      any(HealthStatusListener.class));
    verify(mockHealthStatusManager).unregisterListener(eq(testEndpoint),
      any(HealthStatusListener.class));
  }

  @Test
  void testWaitForHealthStatus_ListenerCleanupOnException() {
    // Given: Health status is initially UNKNOWN
    when(mockHealthStatusManager.getHealthStatus(testEndpoint)).thenReturn(HealthStatus.UNKNOWN);

    // Mock registerListener to throw an exception
    doThrow(new RuntimeException("Registration failed")).when(mockHealthStatusManager)
        .registerListener(eq(testEndpoint), any(HealthStatusListener.class));

    // When: Waiting for health status
    assertThrows(RuntimeException.class, () -> {
      statusTracker.waitForHealthStatus(testEndpoint);
    });

    // Then: Should still attempt to unregister (cleanup in finally block)
    verify(mockHealthStatusManager).registerListener(eq(testEndpoint),
      any(HealthStatusListener.class));
    // Note: unregisterListener might not be called if registerListener fails,
    // but the finally block should handle this gracefully
  }
}