LagAwareStrategy.java

package redis.clients.jedis.mcf;

import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.RedisCredentials;
import redis.clients.jedis.SslOptions;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.Endpoint;

public class LagAwareStrategy implements HealthCheckStrategy {

  private static final Logger log = LoggerFactory.getLogger(LagAwareStrategy.class);

  private final Config config;
  private final RedisRestAPI redisRestAPI;
  private String bdbId;

  public LagAwareStrategy(Config config) {
    this.config = config;
    this.redisRestAPI = new RedisRestAPI(config.getRestEndpoint(), config.getCredentialsSupplier(),
        config.getTimeout(), config.getSslOptions());
  }

  @Override
  public int getInterval() {
    return config.interval;
  }

  @Override
  public int getTimeout() {
    return config.timeout;
  }

  @Override
  public int getNumProbes() {
    return config.getNumProbes();
  }

  @Override
  public ProbingPolicy getPolicy() {
    return config.getPolicy();
  }

  @Override
  public int getDelayInBetweenProbes() {
    return config.getDelayInBetweenProbes();
  }

  @Override
  public HealthStatus doHealthCheck(Endpoint endpoint) {
    try {
      String bdb = bdbId;
      if (bdb == null) {
        // Try to find BDB that matches the database host
        String dbHost = endpoint.getHost();
        List<RedisRestAPI.BdbInfo> bdbs = redisRestAPI.getBdbs();
        RedisRestAPI.BdbInfo matchingBdb = RedisRestAPI.BdbInfo.findMatchingBdb(bdbs, dbHost);

        if (matchingBdb == null) {
          String msg = String.format("No BDB found matching host '%s' for health check", dbHost);
          log.warn(msg);
          throw new JedisException(msg);
        } else {
          bdb = matchingBdb.getUid();
          log.debug("Found matching BDB '{}' for host '{}'", bdb, dbHost);
          bdbId = bdb;
        }
      }
      if (this.config.isExtendedCheckEnabled()) {
        // Use extended check with lag validation
        if (redisRestAPI.checkBdbAvailability(bdb, true,
          this.config.getAvailabilityLagTolerance().toMillis())) {
          return HealthStatus.HEALTHY;
        }
      } else {
        // Use standard datapath validation only
        if (redisRestAPI.checkBdbAvailability(bdb, false)) {
          return HealthStatus.HEALTHY;
        }
      }
    } catch (Exception e) {
      log.error("Error while checking database availability", e);
      bdbId = null;
      throw new JedisException("Error while checking availability", e);
    }
    return HealthStatus.UNHEALTHY;
  }

  public static class Config extends HealthCheckStrategy.Config {

    public static final boolean EXTENDED_CHECK_DEFAULT = true;
    public static final Duration AVAILABILITY_LAG_TOLERANCE_DEFAULT = Duration.ofMillis(5000);

    private final Endpoint restEndpoint;
    private final Supplier<RedisCredentials> credentialsSupplier;

    // SSL configuration for HTTPS connections to Redis Enterprise REST API
    private final SslOptions sslOptions;

    // Maximum acceptable lag in milliseconds (default: 5000);
    private final Duration availability_lag_tolerance;

    // Enable extended lag checking (default: true - performs lag validation in addition to standard
    // datapath
    // validation )
    private final boolean extendedCheckEnabled;

    public Config(Endpoint restEndpoint, Supplier<RedisCredentials> credentialsSupplier) {
      this(builder(restEndpoint, credentialsSupplier)
          .availabilityLagTolerance(AVAILABILITY_LAG_TOLERANCE_DEFAULT)
          .extendedCheckEnabled(EXTENDED_CHECK_DEFAULT));
    }

    private Config(ConfigBuilder builder) {
      super(builder);

      this.restEndpoint = builder.restEndpoint;
      this.credentialsSupplier = builder.credentialsSupplier;
      this.sslOptions = builder.sslOptions;
      this.availability_lag_tolerance = builder.availabilityLagTolerance;
      this.extendedCheckEnabled = builder.extendedCheckEnabled;
    }

    public Endpoint getRestEndpoint() {
      return restEndpoint;
    }

    public Supplier<RedisCredentials> getCredentialsSupplier() {
      return credentialsSupplier;
    }

    public SslOptions getSslOptions() {
      return sslOptions;
    }

    public Duration getAvailabilityLagTolerance() {
      return availability_lag_tolerance;
    }

    public boolean isExtendedCheckEnabled() {
      return extendedCheckEnabled;
    }

    /**
     * Create a new builder for LagAwareStrategy.Config.
     * @param restEndpoint the Redis Enterprise REST API endpoint
     * @param credentialsSupplier the credentials supplier
     * @return a new ConfigBuilder instance
     */
    public static ConfigBuilder builder(Endpoint restEndpoint,
        Supplier<RedisCredentials> credentialsSupplier) {
      return new ConfigBuilder(restEndpoint, credentialsSupplier);
    }

    /**
     * Use {@link LagAwareStrategy.Config#builder(Endpoint, Supplier)} instead.
     * @return a new Builder instance
     */
    public static ConfigBuilder builder() {
      throw new UnsupportedOperationException(
          "Endpoint and credentials are required to build LagAwareStrategy.Config.");
    }

    /**
     * Create a new Config instance with default values.
     * <p>
     * Extended checks like lag validation is enabled by default. With a default lag tolerance of
     * 100ms. To perform only standard datapath validation, use
     * {@link #databaseAvailability(Endpoint, Supplier)}. To configure a custom lag tolerance, use
     * {@link #lagAwareWithTolerance(Endpoint, Supplier, Duration)}
     * </p>
     */
    public static Config create(Endpoint restEndpoint,
        Supplier<RedisCredentials> credentialsSupplier) {
      return new ConfigBuilder(restEndpoint, credentialsSupplier).build();
    }

    /**
     * Perform standard datapath validation only.
     * <p>
     * Extended checks like lag validation is disabled by default. To enable extended checks, use
     * {@link #lagAware(Endpoint, Supplier)} or
     * {@link #lagAwareWithTolerance(Endpoint, Supplier, Duration)}
     * </p>
     */
    public static Config databaseAvailability(Endpoint restEndpoint,
        Supplier<RedisCredentials> credentialsSupplier) {
      return new ConfigBuilder(restEndpoint, credentialsSupplier).extendedCheckEnabled(false)
          .build();
    }

    /**
     * Perform standard datapath validation and lag validation using the default lag tolerance.
     * <p>
     * To configure a custom lag tolerance, use
     * {@link #lagAwareWithTolerance(Endpoint, Supplier, Duration)}
     * </p>
     */
    public static Config lagAware(Endpoint restEndpoint,
        Supplier<RedisCredentials> credentialsSupplier) {
      return new ConfigBuilder(restEndpoint, credentialsSupplier).extendedCheckEnabled(true)
          .build();
    }

    /**
     * Perform standard datapath validation and lag validation using the specified lag tolerance.
     */
    public static Config lagAwareWithTolerance(Endpoint restEndpoint,
        Supplier<RedisCredentials> credentialsSupplier, Duration availabilityLagTolerance) {
      return new ConfigBuilder(restEndpoint, credentialsSupplier).extendedCheckEnabled(true)
          .availabilityLagTolerance(availabilityLagTolerance).build();
    }

    /**
     * Builder for LagAwareStrategy.Config.
     */
    public static class ConfigBuilder
        extends HealthCheckStrategy.Config.Builder<ConfigBuilder, Config> {
      private final Endpoint restEndpoint;
      private final Supplier<RedisCredentials> credentialsSupplier;

      // SSL configuration for HTTPS connections
      private SslOptions sslOptions;

      // Maximum acceptable lag in milliseconds (default: 100);
      private Duration availabilityLagTolerance = AVAILABILITY_LAG_TOLERANCE_DEFAULT;

      // Enable extended lag checking
      private boolean extendedCheckEnabled = EXTENDED_CHECK_DEFAULT;

      private ConfigBuilder(Endpoint restEndpoint, Supplier<RedisCredentials> credentialsSupplier) {
        this.restEndpoint = restEndpoint;
        this.credentialsSupplier = credentialsSupplier;
      }

      /**
       * Set SSL options for HTTPS connections to Redis Enterprise REST API. This allows
       * configuration of custom truststore, keystore, and SSL parameters for secure connections.
       * @param sslOptions the SSL configuration options
       * @return this builder
       */
      public ConfigBuilder sslOptions(SslOptions sslOptions) {
        this.sslOptions = sslOptions;
        return this;
      }

      /**
       * Set the maximum acceptable lag in milliseconds.
       * @param availabilityLagTolerance the lag tolerance in milliseconds (default: 100)
       * @return this builder
       */
      public ConfigBuilder availabilityLagTolerance(Duration availabilityLagTolerance) {
        this.availabilityLagTolerance = availabilityLagTolerance;
        return this;
      }

      /**
       * Enable extended lag checking. When enabled, performs lag validation in addition to standard
       * datapath validation. When disabled performs only standard datapath validation - all slots
       * are available.
       * @param extendedCheckEnabled true to enable extended lag checking (default: false)
       * @return this builder
       */
      public ConfigBuilder extendedCheckEnabled(boolean extendedCheckEnabled) {
        this.extendedCheckEnabled = extendedCheckEnabled;
        return this;
      }

      /**
       * Build the Config instance.
       * @return a new Config instance
       */
      @Override
      public Config build() {
        return new Config(this);
      }
    }
  }
}