ClusterPipeline.java
package redis.clients.jedis;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.IOUtils;
/**
* Pipeline implementation for Redis Cluster mode.
* <p>
* ClusterPipeline allows batching multiple commands for efficient execution in a Redis Cluster
* environment. Commands are automatically routed to the appropriate cluster nodes based on
* key hash slots.
* </p>
* <p>
* <strong>Important Limitations:</strong>
* </p>
* <ul>
* <li><strong>Single-node commands only:</strong> Only commands that can be routed to a single
* node are supported. Commands requiring execution on multiple nodes (ALL_SHARDS, MULTI_SHARD,
* ALL_NODES, or SPECIAL request policies) will throw {@link UnsupportedOperationException}.</li>
* <li><strong>Examples of unsupported commands:</strong>
* <ul>
* <li>{@code KEYS} - requires execution on all master shards</li>
* <li>{@code MGET} with keys in different slots - requires execution on multiple shards</li>
* <li>{@code SCRIPT LOAD} - requires execution on all nodes</li>
* </ul>
* </li>
* <li>For multi-node commands, use the non-pipelined mode
* of {@link RedisClusterClient} instead.</li>
* </ul>
* <p>
* <strong>Usage Pattern:</strong>
* </p>
* <pre>{@code
* try (RedisClusterClient cluster = RedisClusterClient.builder().nodes(nodes).clientConfig(config).build()) {
* // For single-node commands, use pipelined mode
* try (ClusterPipeline pipeline = cluster.pipelined()) {
* Response<String> r1 = pipeline.set("key1", "value1");
* Response<String> r2 = pipeline.get("key1");
* pipeline.sync();
*
* System.out.println(r1.get()); // "OK"
* System.out.println(r2.get()); // "value1"
* }
*
* // For multi-node commands, use non-pipelined mode
* Set<String> allKeys = cluster.keys("*"); // Executes on all master shards
* List<String> values = cluster.mget("key1", "key2", "key3"); // Cross-slot keys
* }
* }</pre>
*
* @see MultiNodePipelineBase
* @see redis.clients.jedis.RedisClusterClient
*/
public class ClusterPipeline extends MultiNodePipelineBase {

  /** Supplies slot-to-node lookups and node connections for this pipeline. */
  private final ClusterConnectionProvider provider;

  /**
   * Resource handed to {@link IOUtils#closeQuietly} by {@link #close()}. Only the constructors
   * that build their own provider assign it; it stays {@code null} when the caller supplies the
   * provider.
   */
  private AutoCloseable closeable;

  /**
   * Creates a pipeline backed by a new {@link ClusterConnectionProvider} built from the given
   * nodes and client configuration. That provider is registered to be closed by {@link #close()}.
   *
   * @param clusterNodes nodes passed to the new provider
   * @param clientConfig configuration passed to the new provider; its protocol is also used to
   *          set up the command objects
   */
  public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig) {
    this(
        new ClusterConnectionProvider(clusterNodes, clientConfig),
        createClusterCommandObjects(clientConfig.getRedisProtocol()));
    this.closeable = provider;
  }

  /**
   * Same as {@link #ClusterPipeline(Set, JedisClientConfig)}, additionally passing a pool
   * configuration to the new provider.
   *
   * @param clusterNodes nodes passed to the new provider
   * @param clientConfig configuration passed to the new provider; its protocol is also used to
   *          set up the command objects
   * @param poolConfig pool configuration passed to the new provider
   */
  public ClusterPipeline(
      Set<HostAndPort> clusterNodes,
      JedisClientConfig clientConfig,
      GenericObjectPoolConfig<Connection> poolConfig) {
    this(
        new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig),
        createClusterCommandObjects(clientConfig.getRedisProtocol()));
    this.closeable = provider;
  }

  /**
   * Same as {@link #ClusterPipeline(Set, JedisClientConfig, GenericObjectPoolConfig)},
   * additionally passing a topology refresh period to the new provider.
   *
   * @param clusterNodes nodes passed to the new provider
   * @param clientConfig configuration passed to the new provider; its protocol is also used to
   *          set up the command objects
   * @param poolConfig pool configuration passed to the new provider
   * @param topologyRefreshPeriod refresh period passed to the new provider
   */
  public ClusterPipeline(
      Set<HostAndPort> clusterNodes,
      JedisClientConfig clientConfig,
      GenericObjectPoolConfig<Connection> poolConfig,
      Duration topologyRefreshPeriod) {
    this(
        new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
        createClusterCommandObjects(clientConfig.getRedisProtocol()));
    this.closeable = provider;
  }

  /**
   * Creates a pipeline on top of a caller-supplied provider, using freshly constructed
   * {@link ClusterCommandObjects}. The provider is not registered for closing by this pipeline.
   *
   * @param provider connection provider used to resolve nodes and obtain connections
   */
  public ClusterPipeline(ClusterConnectionProvider provider) {
    this(provider, new ClusterCommandObjects());
  }

  /**
   * Creates a pipeline on top of a caller-supplied provider and command objects. The provider is
   * not registered for closing by this pipeline.
   *
   * @param provider connection provider used to resolve nodes and obtain connections
   * @param commandObjects command objects forwarded to the base pipeline
   */
  public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects) {
    super(commandObjects);
    this.provider = provider;
  }

  /**
   * Package-private variant that also forwards a command flags registry and an executor service
   * to the base pipeline. The provider is not registered for closing by this pipeline.
   *
   * @param provider connection provider used to resolve nodes and obtain connections
   * @param commandObjects command objects forwarded to the base pipeline
   * @param commandFlagsRegistry registry forwarded to the base pipeline
   * @param executorService executor forwarded to the base pipeline
   */
  ClusterPipeline(
      ClusterConnectionProvider provider,
      ClusterCommandObjects commandObjects,
      CommandFlagsRegistry commandFlagsRegistry,
      ExecutorService executorService) {
    super(commandObjects, commandFlagsRegistry, executorService);
    this.provider = provider;
  }

  /**
   * Closes the base pipeline and then, even if that throws, quietly closes the provider when it
   * was created by one of this class's own constructors.
   */
  @Override
  public void close() {
    try {
      super.close();
    } finally {
      // closeable is null unless this pipeline built the provider itself.
      IOUtils.closeQuietly(closeable);
    }
  }

  /**
   * Resolves the node responsible for a command from the hash slots of its keys.
   *
   * @param args arguments of the command being queued
   * @return the node owning the command's single hash slot, or {@code null} when the command has
   *         no key hash slots
   * @throws JedisClusterOperationException if the command's keys span more than one hash slot
   */
  @Override
  protected HostAndPort getNodeKey(CommandArguments args) {
    final Set<Integer> hashSlots = args.getKeyHashSlots();

    if (hashSlots.isEmpty()) {
      // Let getConnection(null) handle it by using a random node
      return null;
    }

    if (hashSlots.size() > 1) {
      throw new JedisClusterOperationException("Cannot get NodeKey for command with multiple hash slots");
    }

    // Exactly one slot remains at this point.
    return provider.getNode(hashSlots.iterator().next());
  }

  /**
   * Obtains a connection for the given node from the provider.
   *
   * @param nodeKey node to connect to; may be {@code null}, as returned by
   *          {@link #getNodeKey(CommandArguments)} for commands without key hash slots
   * @return the connection returned by the provider
   */
  @Override
  protected Connection getConnection(HostAndPort nodeKey) {
    return provider.getConnection(nodeKey);
  }

  /**
   * Queues the SPUBLISH command built by the command objects for the given channel and message.
   *
   * @param channel channel argument of the command
   * @param message message argument of the command
   * @return the pending response for the queued command
   */
  public Response<Long> spublish(String channel, String message) {
    return appendCommand(commandObjects.spublish(channel, message));
  }

  /**
   * Binary variant of {@link #spublish(String, String)}.
   *
   * @param channel channel argument of the command
   * @param message message argument of the command
   * @return the pending response for the queued command
   */
  public Response<Long> spublish(byte[] channel, byte[] message) {
    return appendCommand(commandObjects.spublish(channel, message));
  }

  /**
   * Builds the command objects for a pipeline. The protocol is applied only when it is
   * {@link RedisProtocol#RESP3}; for any other value (including {@code null}) the freshly
   * constructed command objects are returned unchanged.
   *
   * @param protocol protocol taken from the client configuration
   * @return new command objects, with the protocol set when it is RESP3
   */
  private static ClusterCommandObjects createClusterCommandObjects(RedisProtocol protocol) {
    final ClusterCommandObjects commands = new ClusterCommandObjects();
    if (RedisProtocol.RESP3 == protocol) {
      commands.setProtocol(protocol);
    }
    return commands;
  }
}