ReliableTransaction.java
package redis.clients.jedis;
import static redis.clients.jedis.Protocol.Command.DISCARD;
import static redis.clients.jedis.Protocol.Command.EXEC;
import static redis.clients.jedis.Protocol.Command.MULTI;
import static redis.clients.jedis.Protocol.Command.UNWATCH;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
/**
* A transaction where commands are immediately sent to Redis server and the {@code QUEUED} reply checked.
*/
public class ReliableTransaction extends TransactionBase {
private static final String QUEUED_STR = "QUEUED";
private final Queue<Response<?>> pipelinedResponses = new LinkedList<>();
protected final Connection connection;
private final boolean closeConnection;
private boolean broken = false;
private boolean inWatch = false;
private boolean inMulti = false;
/**
* Creates a new transaction.
*
* A MULTI command will be executed. WATCH/UNWATCH/MULTI commands must not be called with this object.
* @param connection connection
*/
public ReliableTransaction(Connection connection) {
this(connection, true);
}
/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
public ReliableTransaction(Connection connection, boolean doMulti) {
this(connection, doMulti, false);
}
/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public ReliableTransaction(Connection connection, boolean doMulti, boolean closeConnection) {
this(connection, doMulti, closeConnection, createCommandObjects(connection));
}
/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param commandObjects command objects
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
ReliableTransaction(Connection connection, boolean doMulti, boolean closeConnection, CommandObjects commandObjects) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
if (doMulti) multi();
}
private static CommandObjects createCommandObjects(Connection connection) {
CommandObjects commandObjects = new CommandObjects();
RedisProtocol proto = connection.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
return commandObjects;
}
@Override
public final void multi() {
connection.sendCommand(MULTI);
String status = connection.getStatusCodeReply();
if (!"OK".equals(status)) {
throw new JedisException("MULTI command failed. Received response: " + status);
}
inMulti = true;
}
@Override
public String watch(final String... keys) {
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}
@Override
public String watch(final byte[]... keys) {
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}
@Override
public String unwatch() {
connection.sendCommand(UNWATCH);
String status = connection.getStatusCodeReply();
inWatch = false;
return status;
}
@Override
protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
connection.sendCommand(commandObject.getArguments());
String status = connection.getStatusCodeReply();
if (!QUEUED_STR.equals(status)) {
throw new JedisException(status);
}
Response<T> response = new Response<>(commandObject.getBuilder());
pipelinedResponses.add(response);
return response;
}
@Override
public final void close() {
try {
clear();
} finally {
if (closeConnection) {
connection.close();
}
}
}
@Deprecated // TODO: private
public final void clear() {
if (broken) {
return;
}
if (inMulti) {
discard();
} else if (inWatch) {
unwatch();
}
}
@Override
public List<Object> exec() {
if (!inMulti) {
throw new IllegalStateException("EXEC without MULTI");
}
try {
// processPipelinedResponses(pipelinedResponses.size());
// do nothing
connection.sendCommand(EXEC);
List<Object> unformatted = connection.getObjectMultiBulkReply();
if (unformatted == null) {
pipelinedResponses.clear();
return null;
}
List<Object> formatted = new ArrayList<>(unformatted.size());
for (Object o : unformatted) {
try {
Response<?> response = pipelinedResponses.poll();
response.set(o);
formatted.add(response.get());
} catch (JedisDataException e) {
formatted.add(e);
}
}
return formatted;
} catch (JedisConnectionException jce) {
broken = true;
throw jce;
} finally {
inMulti = false;
inWatch = false;
pipelinedResponses.clear();
}
}
@Override
public String discard() {
if (!inMulti) {
throw new IllegalStateException("DISCARD without MULTI");
}
try {
// processPipelinedResponses(pipelinedResponses.size());
// do nothing
connection.sendCommand(DISCARD);
String status = connection.getStatusCodeReply();
if (!"OK".equals(status)) {
throw new JedisException("DISCARD command failed. Received response: " + status);
}
return status;
} catch (JedisConnectionException jce) {
broken = true;
throw jce;
} finally {
inMulti = false;
inWatch = false;
pipelinedResponses.clear();
}
}
}