// MultiNodePipelineBase.java

package redis.clients.jedis;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.util.IOUtils;

/**
 * Base class for pipelines that fan commands out to multiple cluster nodes.
 * <p>
 * Commands are queued per node as they are appended; {@link #sync()} then reads the replies from
 * each node, in parallel when more than one node is involved.
 * <p>
 * NOTE(review): instances are assumed to be used from a single thread for appending commands and
 * for {@code sync()}; the internal maps are not thread-safe — confirm against callers.
 */
public abstract class MultiNodePipelineBase extends AbstractPipeline {

  private final Logger log = LoggerFactory.getLogger(getClass());

  /**
   * The number of processes for {@code sync()}. If you have enough cores for client (and you have
   * more than 3 cluster nodes), you may increase this number of workers.
   * Suggestion: no more than the number of cluster nodes.
   */
  public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3;

  // Per-node queues of pending responses, in command-append order.
  private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses;
  // One connection per node key, created lazily on first command to that node.
  private final Map<HostAndPort, Connection> connections;
  // Guards against re-entrant sync() (e.g. close() calling sync() after an explicit sync()).
  private volatile boolean syncing = false;
  protected final CommandFlagsRegistry commandFlagsRegistry;

  /**
   * External executor service to use for {@code sync()}. If not set, a new executor service will be
   * created for each {@code sync()} call.
   */
  private final ExecutorService sharedExecutorService;

  public MultiNodePipelineBase(CommandObjects commandObjects) {
    this(commandObjects, StaticCommandFlagsRegistry.registry());
  }

  protected MultiNodePipelineBase(CommandObjects commandObjects, CommandFlagsRegistry commandFlagsRegistry) {
    this(commandObjects, commandFlagsRegistry, null);
  }

  MultiNodePipelineBase(CommandObjects commandObjects, CommandFlagsRegistry commandFlagsRegistry, ExecutorService executorService) {
    super(commandObjects);
    this.commandFlagsRegistry = commandFlagsRegistry;
    pipelinedResponses = new LinkedHashMap<>();
    connections = new LinkedHashMap<>();
    this.sharedExecutorService = executorService;
  }

  /**
   * Resolves the node that the given command should be sent to.
   */
  protected abstract HostAndPort getNodeKey(CommandArguments args);

  /**
   * Obtains a connection to the given node.
   */
  protected abstract Connection getConnection(HostAndPort nodeKey);

  /**
   * Sends the command to its target node and queues a {@link Response} placeholder that is filled
   * in by {@link #sync()}.
   *
   * @throws UnsupportedOperationException if the command cannot be routed to a single node
   */
  @Override
  protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
    // Validate that the command is supported in pipeline mode
    validatePipelineCommand(commandObject.getArguments());

    HostAndPort nodeKey = getNodeKey(commandObject.getArguments());

    Queue<Response<?>> queue;
    Connection connection;
    if (pipelinedResponses.containsKey(nodeKey)) {
      queue = pipelinedResponses.get(nodeKey);
      connection = connections.get(nodeKey);
    } else {
      Connection newOne = getConnection(nodeKey);
      connections.putIfAbsent(nodeKey, newOne);
      connection = connections.get(nodeKey);
      if (connection != newOne) {
        // Another path registered a connection for this node first; discard ours.
        log.debug("Duplicate connection to {}, closing it.", nodeKey);
        IOUtils.closeQuietly(newOne);
      }

      pipelinedResponses.putIfAbsent(nodeKey, new LinkedList<>());
      queue = pipelinedResponses.get(nodeKey);
    }

    connection.sendCommand(commandObject.getArguments());
    Response<T> response = new Response<>(commandObject.getBuilder());
    queue.add(response);
    return response;
  }

  /**
   * Synchronizes any pending responses and then closes all node connections.
   */
  @Override
  public void close() {
    try {
      sync();
    } finally {
      connections.values().forEach(IOUtils::closeQuietly);
    }
  }

  /**
   * Reads all pending replies from every node and completes the queued {@link Response}s.
   * <p>
   * With more than one node, replies are read in parallel using either the shared executor or a
   * dedicated one (see {@link #getPipelineExecutor()}). Connections that fail with a
   * {@link JedisConnectionException} are closed; their bookkeeping entries are removed only after
   * all workers have finished, so the maps are never mutated concurrently with iteration.
   */
  @Override
  public final void sync() {
    if (syncing) {
      return;
    }
    syncing = true;

    boolean multiNode = pipelinedResponses.size() > 1;
    Executor executor;
    ExecutorService executorService = null;
    if (multiNode) {
      executorService = getPipelineExecutor();
      executor = executorService;
    } else {
      // Single node: run inline on the calling thread, no executor needed.
      executor = Runnable::run;
    }
    CountDownLatch countDownLatch = multiNode
        ? new CountDownLatch(pipelinedResponses.size())
        : null;

    // Nodes whose connection failed while reading replies. Cleanup of the (non-thread-safe)
    // maps is deferred until after all workers complete, fixing the former race where worker
    // threads removed entries via the iterator while the main thread was still iterating.
    Queue<HostAndPort> failedNodes = new ConcurrentLinkedQueue<>();

    for (Map.Entry<HostAndPort, Queue<Response<?>>> entry : pipelinedResponses.entrySet()) {
      HostAndPort nodeKey = entry.getKey();
      Queue<Response<?>> queue = entry.getValue();
      Connection connection = connections.get(nodeKey);
      executor.execute(() -> {
        try {
          List<Object> unformatted = connection.getMany(queue.size());
          for (Object o : unformatted) {
            queue.poll().set(o);
          }
        } catch (JedisConnectionException jce) {
          log.error("Error with connection to {}", nodeKey, jce);
          // Record for deferred cleanup; closing the connection itself is safe here.
          failedNodes.add(nodeKey);
          IOUtils.closeQuietly(connection);
        } finally {
          if (multiNode) {
            countDownLatch.countDown();
          }
        }
      });
    }

    if (multiNode) {
      try {
        countDownLatch.await();
      } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
        log.error("Thread is interrupted during sync.", e);
      }

      releasePipelineExecutor(executorService);
    }

    // All workers are done (or ran inline); safe to mutate the maps now.
    for (HostAndPort failed : failedNodes) {
      pipelinedResponses.remove(failed);
      connections.remove(failed);
    }

    syncing = false;
  }

  /**
   * Acquires the executor service to run multi-node pipeline commands.
   * <p>
   * If a shared executor is provided by the user, it is returned.
   * Otherwise, a new dedicated executor is created for this pipeline.
   * </p>
   */
  private ExecutorService getPipelineExecutor() {
    return isUsingSharedExecutor()
            ? this.sharedExecutorService
            : createDedicatedPipelineExecutor();
  }

  /**
   * Releases the executor service used by the pipeline.
   * <p>
   * Dedicated executors are shut down after use.
   * Shared executors are managed externally and not shut down.
   * </p>
   */
  private void releasePipelineExecutor(ExecutorService executorService) {
    if (!isUsingSharedExecutor()) {
      executorService.shutdownNow();
    }
  }

  /**
   * Returns true if this pipeline is using a shared executor service
   * provided externally.
   */
  private boolean isUsingSharedExecutor() {
    return this.sharedExecutorService != null;
  }

  /**
   * Creates a new dedicated executor for multi-node pipeline execution.
   */
  private ExecutorService createDedicatedPipelineExecutor() {
    return Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
  }

  /**
   * Validates that a command can be executed in a multi-node pipeline.
   * <p>
   * Commands with multi-node request policies (ALL_SHARDS, MULTI_SHARD, ALL_NODES, SPECIAL)
   * are rejected UNLESS they have keys that route to a single slot, in which case they can
   * be executed on that single node.
   * </p>
   *
   * @param args the command arguments
   * @throws UnsupportedOperationException if the command requires multi-node execution
   */
  private void validatePipelineCommand(CommandArguments args) {
    CommandFlagsRegistry.RequestPolicy policy =
        commandFlagsRegistry.getRequestPolicy(args);

    // For multi-node policies, check if the command can be routed to a single slot
    switch (policy) {
      case ALL_SHARDS:
      case MULTI_SHARD:
      case ALL_NODES:
      case SPECIAL:
        // If the command has keys that route to a single slot, allow it
        Set<Integer> slots = args.getKeyHashSlots();
        if (slots.size() == 1) {
          // Command can be routed to a single slot - allow it
          return;
        }

        // Command cannot be routed to a single slot - reject it
        String policyName = policy.name();
        throw new UnsupportedOperationException(
            "Command '" + args.getCommand() + "' with " + policyName + " request policy "
                + "cannot be executed in pipeline mode because it cannot be routed to a single slot. "
                + (slots.isEmpty()
                    ? "This command has no keys to determine routing. "
                    : "This command's keys map to multiple slots (" + slots.size() + " slots). ")
                + "Use non-pipeline cluster client for this command.");

      case DEFAULT:
      default:
        // DEFAULT policy and unknown policies - allow standard command execution
        // Routes to single node based on key hash
        break;
    }
  }

  @Deprecated
  public Response<Long> waitReplicas(int replicas, long timeout) {
    return appendCommand(commandObjects.waitReplicas(replicas, timeout));
  }
}