RedisClusterClient.java
package redis.clients.jedis;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import redis.clients.jedis.builders.ClusterClientBuilder;
import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.params.MSetExParams;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.providers.ConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;
// @formatter:off
/**
* RedisClusterClient provides a high-level, unified interface for interacting with a Redis Cluster.
* <p>
* This class is intended as a modern replacement for the deprecated {@code JedisCluster} class. It
* supports all cluster operations and is designed to work seamlessly with the {@link UnifiedJedis}
* API, allowing for consistent usage patterns across standalone, sentinel, and cluster deployments.
* <p>
* <b>Usage:</b>
*
* <pre>{@code
* Set<HostAndPort> clusterNodes = new HashSet<>();
* clusterNodes.add(new HostAndPort("127.0.0.1", 7000));
* RedisClusterClient client = RedisClusterClient.create(clusterNodes);
* client.set("key", "value");
* String value = client.get("key");
* }</pre>
* <p>
* <b>Migration:</b> Users of {@code JedisCluster} are encouraged to migrate to this class for
* improved API consistency, better resource management, and enhanced support for future Redis
* features.
* <p>
* <b>Thread-safety:</b> This client is thread-safe and can be shared across multiple threads.
* <p>
* <b>Configuration:</b> Use the {@link #builder()} method for advanced configuration, or the
* {@link #create(HostAndPort)} and {@link #create(Set)} factory methods for simple use cases.
*/
// @formatter:on
public class RedisClusterClient extends UnifiedJedis {
public static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError";
/**
* Default timeout in milliseconds.
*/
public static final int DEFAULT_TIMEOUT = 2000;
/**
* Default amount of attempts for executing a command
*/
public static final int DEFAULT_MAX_ATTEMPTS = 5;
private final CommandFlagsRegistry commandFlagsRegistry;
private RedisClusterClient(CommandExecutor commandExecutor, ConnectionProvider connectionProvider,
CommandObjects commandObjects, RedisProtocol redisProtocol, Cache cache,
CommandFlagsRegistry commandFlagsRegistry) {
super(commandExecutor, connectionProvider, commandObjects, redisProtocol, cache);
this.commandFlagsRegistry = commandFlagsRegistry;
}
/**
* Creates a RedisClusterClient instance. The provided node is used to make the first contact with
* the cluster.
* <p>
* Here, the default timeout of {@value redis.clients.jedis.RedisClusterClient#DEFAULT_TIMEOUT} ms
* is being used with {@value redis.clients.jedis.RedisClusterClient#DEFAULT_MAX_ATTEMPTS} maximum
* attempts.
* <p>
* This is a convenience factory method that uses the builder pattern internally.
* @param node Node to first connect to.
* @return a new {@link RedisClusterClient} instance
*/
public static RedisClusterClient create(HostAndPort node) {
return builder().nodes(Collections.singleton(node))
.clientConfig(DefaultJedisClientConfig.builder().timeoutMillis(DEFAULT_TIMEOUT).build())
.maxAttempts(DEFAULT_MAX_ATTEMPTS)
.maxTotalRetriesDuration(Duration.ofMillis((long) DEFAULT_TIMEOUT * DEFAULT_MAX_ATTEMPTS))
.build();
}
/**
* Creates a RedisClusterClient with multiple entry points.
* <p>
* Here, the default timeout of {@value redis.clients.jedis.RedisClusterClient#DEFAULT_TIMEOUT} ms
* is being used with {@value redis.clients.jedis.RedisClusterClient#DEFAULT_MAX_ATTEMPTS} maximum
* attempts.
* <p>
* This is a convenience factory method that uses the builder pattern internally.
* @param nodes Nodes to connect to.
* @return a new {@link RedisClusterClient} instance
*/
public static RedisClusterClient create(Set<HostAndPort> nodes) {
return builder().nodes(nodes)
.clientConfig(DefaultJedisClientConfig.builder().timeoutMillis(DEFAULT_TIMEOUT).build())
.maxAttempts(DEFAULT_MAX_ATTEMPTS)
.maxTotalRetriesDuration(Duration.ofMillis((long) DEFAULT_TIMEOUT * DEFAULT_MAX_ATTEMPTS))
.build();
}
/**
* Creates a RedisClusterClient with multiple entry points and authentication.
* <p>
* Here, the default timeout of {@value redis.clients.jedis.Protocol#DEFAULT_TIMEOUT} ms is being
* used with {@value redis.clients.jedis.RedisClusterClient#DEFAULT_MAX_ATTEMPTS} maximum
* attempts.
* <p>
* This is a convenience factory method that uses the builder pattern internally.
* @param nodes Nodes to connect to.
* @param user Username for authentication.
* @param password Password for authentication.
* @return a new {@link RedisClusterClient} instance
*/
public static RedisClusterClient create(Set<HostAndPort> nodes, String user, String password) {
return builder().nodes(nodes)
.clientConfig(DefaultJedisClientConfig.builder().user(user).password(password).build())
.maxAttempts(DEFAULT_MAX_ATTEMPTS).maxTotalRetriesDuration(
Duration.ofMillis((long) Protocol.DEFAULT_TIMEOUT * DEFAULT_MAX_ATTEMPTS))
.build();
}
/**
* Fluent builder for {@link RedisClusterClient} (Redis Cluster).
* <p>
* Obtain an instance via {@link #builder()}.
* </p>
*/
public static class Builder extends ClusterClientBuilder<RedisClusterClient> {
@Override
protected RedisClusterClient createClient() {
return new RedisClusterClient(commandExecutor, connectionProvider, commandObjects,
clientConfig.getRedisProtocol(), cache, getCommandFlags());
}
}
/**
* Create a new builder for configuring RedisClusterClient instances.
* @return a new {@link RedisClusterClient.Builder} instance
*/
public static Builder builder() {
return new Builder();
}
/**
* Returns all nodes that were configured to connect to in key-value pairs ({@link Map}).<br>
* Key is the HOST:PORT and the value is the connection pool.
* @return the map of all connections.
*/
public Map<String, ConnectionPool> getClusterNodes() {
return ((ClusterConnectionProvider) provider).getNodes();
}
/**
* Returns the connection for one of the 16,384 slots.
* @param slot the slot to retrieve the connection for.
* @return connection of the provided slot. {@code close()} of this connection must be called
* after use.
*/
public Connection getConnectionFromSlot(int slot) {
return ((ClusterConnectionProvider) provider).getConnectionFromSlot(slot);
}
// commands
public long spublish(String channel, String message) {
return executeCommand(commandObjects.spublish(channel, message));
}
public long spublish(byte[] channel, byte[] message) {
return executeCommand(commandObjects.spublish(channel, message));
}
public void ssubscribe(final JedisShardedPubSub jedisPubSub, final String... channels) {
try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) {
jedisPubSub.proceed(connection, channels);
}
}
public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... channels) {
try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) {
jedisPubSub.proceed(connection, channels);
}
}
// commands
/**
* Creates a new pipeline for executing commands in a Redis Cluster.
*
* <p>Pipelining allows batching multiple commands for more efficient execution
* by reducing network round-trips. In a cluster environment, commands are routed
* to the appropriate nodes based on key hash slots.</p>
*
* <p>If the pipeline spans multiple nodes, a dedicated {@link ExecutorService} is
* created internally to execute requests in parallel and shutdown when the pipeline
* is synced.</p>
*
* @return a new {@link ClusterPipeline} instance
* @see #pipelined(ExecutorService)
*/
@Override
public ClusterPipeline pipelined() {
return pipelined(null);
}
/**
* Creates a new pipeline for executing commands in a Redis Cluster using the provided executor.
*
* <p>Pipelining allows batching multiple commands for more efficient execution
* by reducing network round-trips. In a cluster environment, commands are routed
* to the appropriate nodes based on key hash slots.</p>
*
* <p>If the pipeline spans multiple nodes, the provided {@link ExecutorService} is
* used to execute requests in parallel. The caller is responsible for managing
* the lifecycle of this executor (creation, shutdown, etc.).</p>
*
* <p>If {@code null} is provided, a dedicated executor is created and managed
* internally, similar to {@link #pipelined()}.</p>
*
* @param executorService the executor to use for multi-node execution, or {@code null}
* @return a new {@link ClusterPipeline} instance
*/
public ClusterPipeline pipelined(ExecutorService executorService) {
return new ClusterPipeline(
(ClusterConnectionProvider) provider,
(ClusterCommandObjects) commandObjects,
commandFlagsRegistry,
executorService
);
}
/**
* @param doMulti param
* @return nothing
* @throws UnsupportedOperationException
*/
@Override
public AbstractTransaction transaction(boolean doMulti) {
throw new UnsupportedOperationException();
}
public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
if (!(executor instanceof ClusterCommandExecutor)) {
throw new UnsupportedOperationException(
"Support only execute to replica in ClusterCommandExecutor");
}
return ((ClusterCommandExecutor) executor).executeCommandToReplica(commandObject);
}
/**
* Broadcast a command to all primary nodes in the cluster.
* <p>
* This method is useful for administrative commands that need to be executed on all primary nodes,
* such as {@code PING}, {@code CONFIG SET}, {@code FLUSHALL}, etc.
* </p>
* @param commandObject the command to broadcast
* @param <T> the return type of the command
* @return the aggregated reply from all primary nodes
* @throws UnsupportedOperationException if the executor is not a ClusterCommandExecutor
*/
public final <T> T broadcastCommand(CommandObject<T> commandObject) {
if (!(executor instanceof ClusterCommandExecutor)) {
throw new UnsupportedOperationException(
"Broadcast command is only supported in ClusterCommandExecutor");
}
return ((ClusterCommandExecutor) executor).broadcastCommand(commandObject, true);
}
// ==================== Multi-Shard Command Methods ====================
// These methods execute commands across multiple Redis cluster shards when keys
// hash to different slots, aggregating the results appropriately.
private <T> T executeMultiShardCommand(List<CommandObject<T>> commandObjects) {
if (!(executor instanceof ClusterCommandExecutor)) {
throw new UnsupportedOperationException(
"Multi-shard command is only supported in ClusterCommandExecutor");
}
return ((ClusterCommandExecutor) executor).executeMultiShardCommand(commandObjects);
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the keys by hash slot and executes DEL on each shard,
* aggregating the results (sum of deleted keys).
* </p>
*/
@Override
public long del(String... keys) {
return executeMultiShardCommand(getClusterCommandObjects().delMultiShard(keys));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the keys by hash slot and executes DEL on each shard,
* aggregating the results (sum of deleted keys).
* </p>
*/
@Override
public long del(byte[]... keys) {
return executeMultiShardCommand(getClusterCommandObjects().delMultiShard(keys));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the keys by hash slot and executes EXISTS on each shard,
* aggregating the results (sum of existing keys).
* </p>
*/
@Override
public long exists(String... keys) {
return executeMultiShardCommand(getClusterCommandObjects().existsMultiShard(keys));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the keys by hash slot and executes EXISTS on each shard,
* aggregating the results (sum of existing keys).
* </p>
*/
@Override
public long exists(byte[]... keys) {
return executeMultiShardCommand(getClusterCommandObjects().existsMultiShard(keys));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the keys by hash slot and executes MGET on each shard,
* concatenating the results.
* </p>
*
* <p><b>Order Guarantee:</b> The returned values are in the same order as the input keys.
* Each {@code values.get(i)} corresponds to {@code keys[i]}.</p>
*
* <p><b>Performance Tip:</b> For better performance, pre-sort keys by hash slot before calling
* this method. This minimizes the number of separate Redis commands by allowing consecutive
* keys with the same slot to be batched together. Example:</p>
* <pre>{@code
* // Sort keys by hash slot for optimal batching
* String[] sortedKeys = Arrays.stream(keys)
* .sorted(Comparator.comparingInt(JedisClusterCRC16::getSlot))
* .toArray(String[]::new);
* List<String> values = client.mget(sortedKeys);
* }</pre>
*/
@Override
public List<String> mget(String... keys) {
return executeMultiShardCommand(getClusterCommandObjects().mgetMultiShard(keys));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the keys by hash slot and executes MGET on each shard,
* concatenating the results.
* </p>
*
* <p><b>Order Guarantee:</b> The returned values are in the same order as the input keys.
* Each {@code values.get(i)} corresponds to {@code keys[i]}.</p>
*
* <p><b>Performance Tip:</b> For better performance, pre-sort keys by hash slot before calling
* this method. This minimizes the number of separate Redis commands by allowing consecutive
* keys with the same slot to be batched together. Example:</p>
* <pre>{@code
* // Sort keys by hash slot for optimal batching
* byte[][] sortedKeys = Arrays.stream(keys)
* .sorted(Comparator.comparingInt(JedisClusterCRC16::getSlot))
* .toArray(byte[][]::new);
* List<byte[]> values = client.mget(sortedKeys);
* }</pre>
*/
@Override
public List<byte[]> mget(byte[]... keys) {
return executeMultiShardCommand(getClusterCommandObjects().mgetMultiShard(keys));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the key-value pairs by hash slot and executes MSET
* on each shard.
* </p>
*/
@Override
public String mset(String... keysvalues) {
return executeMultiShardCommand(getClusterCommandObjects().msetMultiShard(keysvalues));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the key-value pairs by hash slot and executes MSET
* on each shard.
* </p>
*/
@Override
public String mset(byte[]... keysvalues) {
return executeMultiShardCommand(getClusterCommandObjects().msetMultiShard(keysvalues));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the keys by hash slot and executes TOUCH on each shard,
* aggregating the results (sum of touched keys).
* </p>
*/
@Override
public long touch(String... keys) {
return executeMultiShardCommand(getClusterCommandObjects().touchMultiShard(keys));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the keys by hash slot and executes TOUCH on each shard,
* aggregating the results (sum of touched keys).
* </p>
*/
@Override
public long touch(byte[]... keys) {
return executeMultiShardCommand(getClusterCommandObjects().touchMultiShard(keys));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the keys by hash slot and executes UNLINK on each shard,
* aggregating the results (sum of unlinked keys).
* </p>
*/
@Override
public long unlink(String... keys) {
return executeMultiShardCommand(getClusterCommandObjects().unlinkMultiShard(keys));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the keys by hash slot and executes UNLINK on each shard,
* aggregating the results (sum of unlinked keys).
* </p>
*/
@Override
public long unlink(byte[]... keys) {
return executeMultiShardCommand(getClusterCommandObjects().unlinkMultiShard(keys));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the key-value pairs by hash slot and executes MSETEX
* on each shard with the provided parameters.
* </p>
*/
@Override
public boolean msetex(MSetExParams params, String... keysvalues) {
return executeMultiShardCommand(getClusterCommandObjects().msetexMultiShard(params, keysvalues));
}
/**
* {@inheritDoc}
* <p>
* This override automatically splits the key-value pairs by hash slot and executes MSETEX
* on each shard with the provided parameters.
* </p>
*/
@Override
public boolean msetex(MSetExParams params, byte[]... keysvalues) {
return executeMultiShardCommand(getClusterCommandObjects().msetexMultiShard(params, keysvalues));
}
private ClusterCommandObjects getClusterCommandObjects() {
return (ClusterCommandObjects) commandObjects;
}
}