RedisClusterClient.java

package redis.clients.jedis;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import redis.clients.jedis.builders.ClusterClientBuilder;
import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

// @formatter:off
/**
 * RedisClusterClient provides a high-level, unified interface for interacting with a Redis Cluster.
 * <p>
 * This class is intended as a modern replacement for the deprecated {@code JedisCluster} class. It
 * supports all cluster operations and is designed to work seamlessly with the {@link UnifiedJedis}
 * API, allowing for consistent usage patterns across standalone, sentinel, and cluster deployments.
 * <p>
 * <b>Usage:</b>
 *
 * <pre>{@code
 *   Set<HostAndPort> clusterNodes = new HashSet<>();
 *   clusterNodes.add(new HostAndPort("127.0.0.1", 7000));
 *   RedisClusterClient client = RedisClusterClient.create(clusterNodes);
 *   client.set("key", "value");
 *   String value = client.get("key");
 * }</pre>
 * <p>
 * <b>Migration:</b> Users of {@code JedisCluster} are encouraged to migrate to this class for
 * improved API consistency, better resource management, and enhanced support for future Redis
 * features.
 * <p>
 * <b>Thread-safety:</b> This client is thread-safe and can be shared across multiple threads.
 * <p>
 * <b>Configuration:</b> Use the {@link #builder()} method for advanced configuration, or the
 * {@link #create(HostAndPort)} and {@link #create(Set)} factory methods for simple use cases.
 */
// @formatter:on
public class RedisClusterClient extends UnifiedJedis {

  public static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError";

  /**
   * Default timeout in milliseconds.
   */
  public static final int DEFAULT_TIMEOUT = 2000;

  /**
   * Default amount of attempts for executing a command
   */
  public static final int DEFAULT_MAX_ATTEMPTS = 5;

  private RedisClusterClient(CommandExecutor commandExecutor, ConnectionProvider connectionProvider,
      CommandObjects commandObjects, RedisProtocol redisProtocol, Cache cache) {
    super(commandExecutor, connectionProvider, commandObjects, redisProtocol, cache);
  }

  /**
   * Creates a RedisClusterClient instance. The provided node is used to make the first contact with
   * the cluster.
   * <p>
   * Here, the default timeout of {@value redis.clients.jedis.RedisClusterClient#DEFAULT_TIMEOUT} ms
   * is being used with {@value redis.clients.jedis.RedisClusterClient#DEFAULT_MAX_ATTEMPTS} maximum
   * attempts.
   * <p>
   * This is a convenience factory method that uses the builder pattern internally.
   * @param node Node to first connect to.
   * @return a new {@link RedisClusterClient} instance
   */
  public static RedisClusterClient create(HostAndPort node) {
    return builder().nodes(Collections.singleton(node))
        .clientConfig(DefaultJedisClientConfig.builder().timeoutMillis(DEFAULT_TIMEOUT).build())
        .maxAttempts(DEFAULT_MAX_ATTEMPTS)
        .maxTotalRetriesDuration(Duration.ofMillis((long) DEFAULT_TIMEOUT * DEFAULT_MAX_ATTEMPTS))
        .build();
  }

  /**
   * Creates a RedisClusterClient with multiple entry points.
   * <p>
   * Here, the default timeout of {@value redis.clients.jedis.RedisClusterClient#DEFAULT_TIMEOUT} ms
   * is being used with {@value redis.clients.jedis.RedisClusterClient#DEFAULT_MAX_ATTEMPTS} maximum
   * attempts.
   * <p>
   * This is a convenience factory method that uses the builder pattern internally.
   * @param nodes Nodes to connect to.
   * @return a new {@link RedisClusterClient} instance
   */
  public static RedisClusterClient create(Set<HostAndPort> nodes) {
    return builder().nodes(nodes)
        .clientConfig(DefaultJedisClientConfig.builder().timeoutMillis(DEFAULT_TIMEOUT).build())
        .maxAttempts(DEFAULT_MAX_ATTEMPTS)
        .maxTotalRetriesDuration(Duration.ofMillis((long) DEFAULT_TIMEOUT * DEFAULT_MAX_ATTEMPTS))
        .build();
  }

  /**
   * Creates a RedisClusterClient with multiple entry points and authentication.
   * <p>
   * Here, the default timeout of {@value redis.clients.jedis.Protocol#DEFAULT_TIMEOUT} ms is being
   * used with {@value redis.clients.jedis.RedisClusterClient#DEFAULT_MAX_ATTEMPTS} maximum
   * attempts.
   * <p>
   * This is a convenience factory method that uses the builder pattern internally.
   * @param nodes Nodes to connect to.
   * @param user Username for authentication.
   * @param password Password for authentication.
   * @return a new {@link RedisClusterClient} instance
   */
  public static RedisClusterClient create(Set<HostAndPort> nodes, String user, String password) {
    return builder().nodes(nodes)
        .clientConfig(DefaultJedisClientConfig.builder().user(user).password(password).build())
        .maxAttempts(DEFAULT_MAX_ATTEMPTS).maxTotalRetriesDuration(
          Duration.ofMillis((long) Protocol.DEFAULT_TIMEOUT * DEFAULT_MAX_ATTEMPTS))
        .build();
  }

  /**
   * Fluent builder for {@link RedisClusterClient} (Redis Cluster).
   * <p>
   * Obtain an instance via {@link #builder()}.
   * </p>
   */
  public static class Builder extends ClusterClientBuilder<RedisClusterClient> {

    @Override
    protected RedisClusterClient createClient() {
      return new RedisClusterClient(commandExecutor, connectionProvider, commandObjects,
          clientConfig.getRedisProtocol(), cache);
    }
  }

  /**
   * Create a new builder for configuring RedisClusterClient instances.
   * @return a new {@link RedisClusterClient.Builder} instance
   */
  public static Builder builder() {
    return new Builder();
  }

  /**
   * Returns all nodes that were configured to connect to in key-value pairs ({@link Map}).<br>
   * Key is the HOST:PORT and the value is the connection pool.
   * @return the map of all connections.
   */
  public Map<String, ConnectionPool> getClusterNodes() {
    return ((ClusterConnectionProvider) provider).getNodes();
  }

  /**
   * Returns the connection for one of the 16,384 slots.
   * @param slot the slot to retrieve the connection for.
   * @return connection of the provided slot. {@code close()} of this connection must be called
   *         after use.
   */
  public Connection getConnectionFromSlot(int slot) {
    return ((ClusterConnectionProvider) provider).getConnectionFromSlot(slot);
  }

  // commands
  public long spublish(String channel, String message) {
    return executeCommand(commandObjects.spublish(channel, message));
  }

  public long spublish(byte[] channel, byte[] message) {
    return executeCommand(commandObjects.spublish(channel, message));
  }

  public void ssubscribe(final JedisShardedPubSub jedisPubSub, final String... channels) {
    try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) {
      jedisPubSub.proceed(connection, channels);
    }
  }

  public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... channels) {
    try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) {
      jedisPubSub.proceed(connection, channels);
    }
  }
  // commands

  @Override
  public ClusterPipeline pipelined() {
    return new ClusterPipeline((ClusterConnectionProvider) provider,
        (ClusterCommandObjects) commandObjects);
  }

  /**
   * @param doMulti param
   * @return nothing
   * @throws UnsupportedOperationException
   */
  @Override
  public AbstractTransaction transaction(boolean doMulti) {
    throw new UnsupportedOperationException();
  }

  public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
    if (!(executor instanceof ClusterCommandExecutor)) {
      throw new UnsupportedOperationException(
          "Support only execute to replica in ClusterCommandExecutor");
    }
    return ((ClusterCommandExecutor) executor).executeCommandToReplica(commandObject);
  }
}