MultiClusterTransaction.java
package redis.clients.jedis.mcf;
import static redis.clients.jedis.Protocol.Command.DISCARD;
import static redis.clients.jedis.Protocol.Command.EXEC;
import static redis.clients.jedis.Protocol.Command.MULTI;
import static redis.clients.jedis.Protocol.Command.UNWATCH;
import static redis.clients.jedis.Protocol.Command.WATCH;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import redis.clients.jedis.*;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.graph.ResultSet;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.util.KeyValue;
/**
* This is high memory dependent solution as all the appending commands will be hold in memory.
*/
@Experimental
public class MultiClusterTransaction extends TransactionBase {
private static final Builder<?> NO_OP_BUILDER = BuilderFactory.RAW_OBJECT;
private static final String GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE = "Graph commands are not supported.";
private final CircuitBreakerFailoverConnectionProvider failoverProvider;
private final AtomicInteger extraCommandCount = new AtomicInteger();
private final Queue<KeyValue<CommandArguments, Response<?>>> commands = new LinkedList<>();
private boolean inWatch = false;
private boolean inMulti = false;
/**
* A MULTI command will be added to be sent to server. WATCH/UNWATCH/MULTI commands must not be
* called with this object.
* @param provider
*/
@Deprecated
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) {
this(provider, true);
}
/**
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
@Deprecated
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti) {
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider);
try (Connection connection = failoverProvider.getConnection()) {
RedisProtocol proto = connection.getRedisProtocol();
if (proto != null) this.commandObjects.setProtocol(proto);
}
if (doMulti) multi();
}
/**
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param commandObjects command objects
*/
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti, CommandObjects commandObjects) {
super(commandObjects);
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider);
if (doMulti) multi();
}
@Override
public final void multi() {
appendCommand(new CommandObject<>(new CommandArguments(MULTI), NO_OP_BUILDER));
extraCommandCount.incrementAndGet();
inMulti = true;
}
/**
* @param keys
* @return {@code null}
*/
@Override
public final String watch(String... keys) {
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
}
/**
* @param keys
* @return {@code null}
*/
@Override
public final String watch(byte[]... keys) {
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
}
/**
* @return {@code null}
*/
@Override
public final String unwatch() {
appendCommand(new CommandObject<>(new CommandArguments(UNWATCH), NO_OP_BUILDER));
extraCommandCount.incrementAndGet();
inWatch = false;
return null;
}
@Override
protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
CommandArguments args = commandObject.getArguments();
Response<T> response = new Response<>(commandObject.getBuilder());
commands.add(KeyValue.of(args, response));
return response;
}
@Override
public void close() {
clear();
}
private void clear() {
if (inMulti) {
discard();
} else if (inWatch) {
unwatch();
}
}
@Override
public final List<Object> exec() {
if (!inMulti) {
throw new IllegalStateException("EXEC without MULTI");
}
try (Connection connection = failoverProvider.getConnection()) {
commands.forEach((command) -> connection.sendCommand(command.getKey()));
// following connection.getMany(int) flushes anyway, so no flush here.
// ignore QUEUED (or ERROR)
connection.getMany(commands.size());
// remove extra response builders
for (int idx = 0; idx < extraCommandCount.get(); ++idx) {
commands.poll();
}
connection.sendCommand(EXEC);
List<Object> unformatted = connection.getObjectMultiBulkReply();
if (unformatted == null) {
commands.clear();
return null;
}
List<Object> formatted = new ArrayList<>(unformatted.size() - extraCommandCount.get());
for (Object rawReply: unformatted) {
try {
Response<?> response = commands.poll().getValue();
response.set(rawReply);
formatted.add(response.get());
} catch (JedisDataException e) {
formatted.add(e);
}
}
return formatted;
} finally {
inMulti = false;
inWatch = false;
}
}
@Override
public final String discard() {
if (!inMulti) {
throw new IllegalStateException("DISCARD without MULTI");
}
try (Connection connection = failoverProvider.getConnection()) {
commands.forEach((command) -> connection.sendCommand(command.getKey()));
// following connection.getMany(int) flushes anyway, so no flush here.
// ignore QUEUED (or ERROR)
connection.getMany(commands.size());
connection.sendCommand(DISCARD);
return connection.getStatusCodeReply();
} finally {
inMulti = false;
inWatch = false;
}
}
// RedisGraph commands
@Override
public Response<ResultSet> graphQuery(String name, String query) {
throw new UnsupportedOperationException(GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE);
}
@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query) {
throw new UnsupportedOperationException(GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE);
}
@Override
public Response<ResultSet> graphQuery(String name, String query, long timeout) {
throw new UnsupportedOperationException(GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE);
}
@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query, long timeout) {
throw new UnsupportedOperationException(GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE);
}
@Override
public Response<ResultSet> graphQuery(String name, String query, Map<String, Object> params) {
throw new UnsupportedOperationException(GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE);
}
@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query, Map<String, Object> params) {
throw new UnsupportedOperationException(GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE);
}
@Override
public Response<ResultSet> graphQuery(String name, String query, Map<String, Object> params, long timeout) {
throw new UnsupportedOperationException(GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE);
}
@Override
public Response<ResultSet> graphReadonlyQuery(String name, String query, Map<String, Object> params, long timeout) {
throw new UnsupportedOperationException(GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE);
}
@Override
public Response<String> graphDelete(String name) {
throw new UnsupportedOperationException(GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE);
}
@Override
public Response<List<String>> graphProfile(String graphName, String query) {
throw new UnsupportedOperationException(GRAPH_COMMANDS_NOT_SUPPORTED_MESSAGE);
}
// RedisGraph commands
}