StatusTracker.java

package redis.clients.jedis.mcf;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import redis.clients.jedis.Endpoint;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisValidationException;

/**
 * StatusTracker is responsible for tracking and waiting for health status changes for specific
 * endpoints. It provides an event-driven approach to wait for health status transitions from
 * UNKNOWN to either HEALTHY or UNHEALTHY.
 */
public class StatusTracker {

  private final HealthStatusManager healthStatusManager;

  public StatusTracker(HealthStatusManager healthStatusManager) {
    this.healthStatusManager = healthStatusManager;
  }

  /**
   * Waits for a specific endpoint's health status to be determined (not UNKNOWN). Uses event-driven
   * approach with CountDownLatch to avoid polling.
   * @param endpoint the endpoint to wait for
   * @return the determined health status (HEALTHY or UNHEALTHY)
   * @throws JedisConnectionException if interrupted while waiting
   */
  public HealthStatus waitForHealthStatus(Endpoint endpoint) {
    // First check if status is already determined
    HealthStatus currentStatus = healthStatusManager.getHealthStatus(endpoint);
    if (currentStatus != HealthStatus.UNKNOWN) {
      return currentStatus;
    }

    // Set up event-driven waiting
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<HealthStatus> resultStatus = new AtomicReference<>();

    // Create a temporary listener for this specific endpoint
    HealthStatusListener tempListener = new HealthStatusListener() {
      @Override
      public void onStatusChange(HealthStatusChangeEvent event) {
        if (event.getEndpoint().equals(endpoint) && event.getNewStatus() != HealthStatus.UNKNOWN) {
          resultStatus.set(event.getNewStatus());
          latch.countDown();
        }
      }
    };

    // Register the temporary listener
    healthStatusManager.registerListener(endpoint, tempListener);

    try {
      // Double-check status after registering listener (race condition protection)
      currentStatus = healthStatusManager.getHealthStatus(endpoint);
      if (currentStatus != HealthStatus.UNKNOWN) {
        return currentStatus;
      }

      // Wait for the health status change event
      // just for safety to not block indefinitely
      boolean completed = latch.await(healthStatusManager.getMaxWaitFor(endpoint),
        TimeUnit.MILLISECONDS);
      if (!completed) {
        throw new JedisValidationException("Timeout while waiting for health check result");
      }
      return resultStatus.get();

    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new JedisConnectionException("Interrupted while waiting for health check result", e);
    } finally {
      // Clean up: unregister the temporary listener
      healthStatusManager.unregisterListener(endpoint, tempListener);
    }
  }
}