// MultiClusterPipeline.java
package redis.clients.jedis.mcf;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import redis.clients.jedis.*;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.util.KeyValue;
/**
 * Pipeline that buffers commands locally and executes them in one batch over a connection obtained
 * from a {@link CircuitBreakerFailoverConnectionProvider}.
 * <p>
 * This is a memory-intensive solution, as all appended commands are held in memory until
 * {@link MultiClusterPipeline#sync() SYNC} (or {@link MultiClusterPipeline#close() CLOSE}) gets called.
 */
@Experimental
public class MultiClusterPipeline extends PipelineBase implements Closeable {

  private final CircuitBreakerFailoverConnectionProvider failoverProvider;

  /** Commands awaiting execution, each paired with the response it will populate (FIFO order). */
  private final Queue<KeyValue<CommandArguments, Response<?>>> commands = new ArrayDeque<>();

  /**
   * Creates a pipeline with a fresh {@link CommandObjects}. A connection is borrowed once during
   * construction to read its {@link RedisProtocol}; when that protocol is non-null it is applied to
   * the command objects.
   *
   * @param pooledProvider provider wrapped by the circuit-breaker failover provider
   * @deprecated prefer
   *             {@link #MultiClusterPipeline(MultiClusterPooledConnectionProvider, CommandObjects)},
   *             which does not open a connection at construction time.
   */
  @Deprecated
  public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider) {
    super(new CommandObjects());

    this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider);

    // Borrow a connection only to discover the protocol in use; it is released right away.
    try (Connection connection = failoverProvider.getConnection()) {
      RedisProtocol proto = connection.getRedisProtocol();
      if (proto != null) {
        this.commandObjects.setProtocol(proto);
      }
    }
  }

  /**
   * Creates a pipeline that uses the supplied {@link CommandObjects} as-is. No connection is opened
   * until {@link #sync()} is called.
   *
   * @param pooledProvider provider wrapped by the circuit-breaker failover provider
   * @param commandObjects command factory passed to the base pipeline
   */
  public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider, CommandObjects commandObjects) {
    super(commandObjects);
    this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider);
  }

  /**
   * Queues a command for later execution; nothing is sent until {@link #sync()}.
   *
   * @param commandObject the command to queue
   * @param <T> type produced by the command's builder
   * @return a response that is populated once {@link #sync()} has read the matching reply
   */
  @Override
  protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
    CommandArguments args = commandObject.getArguments();
    Response<T> response = new Response<>(commandObject.getBuilder());
    commands.add(KeyValue.of(args, response));
    return response;
  }

  /**
   * Executes any commands that are still queued by delegating to {@link #sync()}. The connection is
   * acquired and released (try-with-resources) inside {@link #sync()}, so there is nothing else to
   * release here.
   */
  @Override
  public void close() {
    sync();
  }

  /**
   * Synchronizes the pipeline: sends every queued command over a single connection borrowed from
   * the failover provider, reads all replies, and hands each raw reply to the response of the
   * command it belongs to, in the order the commands were appended. Does nothing when no commands
   * are queued. In order to get return values from pipelined commands, capture the
   * {@code Response<?>} of each command you execute.
   * <p>
   * Commands are removed from the queue only as their replies are assigned, so if sending or reading
   * throws, the queued commands stay in place and a later {@code sync()} or {@link #close()} will
   * send them again.
   */
  @Override
  public void sync() {
    if (commands.isEmpty()) {
      return;
    }

    try (Connection connection = failoverProvider.getConnection()) {
      for (KeyValue<CommandArguments, Response<?>> command : commands) {
        connection.sendCommand(command.getKey());
      }
      // following connection.getMany(int) flushes anyway, so no flush here.

      List<Object> rawReplies = connection.getMany(commands.size());
      for (Object rawReply : rawReplies) {
        // Replies arrive in send order, so the head of the queue is always the matching command.
        commands.poll().getValue().set(rawReply);
      }
    }
  }

  /**
   * Queues the command built by {@code commandObjects.waitReplicas(replicas, timeout)}.
   *
   * @param replicas forwarded unchanged to the command factory
   * @param timeout forwarded unchanged to the command factory
   * @return response populated after {@link #sync()}
   */
  public Response<Long> waitReplicas(int replicas, long timeout) {
    return appendCommand(commandObjects.waitReplicas(replicas, timeout));
  }

  /**
   * Queues the command built by {@code commandObjects.waitAOF(numLocal, numReplicas, timeout)}.
   *
   * @param numLocal forwarded unchanged to the command factory
   * @param numReplicas forwarded unchanged to the command factory
   * @param timeout forwarded unchanged to the command factory
   * @return response populated after {@link #sync()}
   */
  public Response<KeyValue<Long, Long>> waitAOF(long numLocal, long numReplicas, long timeout) {
    return appendCommand(commandObjects.waitAOF(numLocal, numReplicas, timeout));
  }
}