// RoundRobinConnectionResolver.java

package redis.clients.jedis.executors;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import redis.clients.jedis.CommandFlagsRegistry;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.Connection;
import redis.clients.jedis.ConnectionPool;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.providers.ClusterConnectionProvider;

/**
 * Connection resolver for keyless commands that acquires connections in round-robin fashion.
 * <p>
 * Every call to {@link #resolve(CommandObject)} takes a snapshot of the candidate node pools,
 * advances a counter shared by all callers of this instance, and borrows a connection from the
 * pool the counter lands on. Commands whose intent is {@code READ} are spread over every node in
 * the provider's connection map; every other intent is restricted to the primary-node map.
 */
final class RoundRobinConnectionResolver implements ConnectionResolver {

  private final ClusterConnectionProvider provider;
  private final CommandFlagsRegistry flags;

  /**
   * Rotation ticket handed out to each {@code resolve} call. It is never reset, so it eventually
   * wraps around to negative values; index computation must tolerate that.
   */
  private final AtomicInteger roundRobinCounter = new AtomicInteger(0);

  /**
   * @param provider source of the per-node connection pools
   * @param flags registry passed to {@code getIntent} to classify each command
   */
  RoundRobinConnectionResolver(ClusterConnectionProvider provider, CommandFlagsRegistry flags) {
    this.provider = provider;
    this.flags = flags;
  }

  /**
   * Picks the next node in rotation for the given command and borrows a connection from its pool.
   *
   * @param cmd the command about to be executed; only used to derive its connection intent
   * @return a connection obtained from the selected node's pool
   * @throws JedisClusterOperationException if there is no candidate node for the command's intent
   */
  @Override
  public Connection resolve(CommandObject<?> cmd) {
    ConnectionResolver.ConnectionIntent intent = getIntent(cmd, flags);
    List<Map.Entry<String, ConnectionPool>> nodeList = selectConnectionPool(intent);

    // The ticket is reduced modulo the size of THIS call's snapshot, so a ticket issued while
    // another thread was looking at a larger node list can never index past the end of ours.
    // floorMod (rather than abs(x % n)) keeps the result in [0, size) for negative tickets too
    // and keeps the rotation walking forward after the counter overflows.
    // selectConnectionPool guarantees a non-empty list, so the divisor is never zero.
    int roundRobinIndex = Math.floorMod(roundRobinCounter.getAndIncrement(), nodeList.size());

    return nodeList.get(roundRobinIndex).getValue().getResource();
  }

  /**
   * Builds the list of candidate node pools for the given intent.
   *
   * @param intent the connection intent derived from the command
   * @return a non-empty, call-private copy of the matching map's entries
   * @throws JedisClusterOperationException if the matching map has no entries
   */
  private List<Map.Entry<String, ConnectionPool>> selectConnectionPool(
      ConnectionResolver.ConnectionIntent intent) {
    Map<String, ConnectionPool> connectionMap;

    if (intent == ConnectionResolver.ConnectionIntent.READ) {
      // For keyless READ commands, use all nodes for load distribution
      connectionMap = provider.getConnectionMap();
    } else {
      // Write operations always go to primary nodes
      connectionMap = provider.getPrimaryNodesConnectionMap();
    }

    // Copy first, validate second. NOTE(review): the provider may hand back a live view that a
    // topology refresh mutates concurrently - confirm. Checking the snapshot instead of the source
    // map ensures the caller never receives an empty list (which would divide by zero).
    List<Map.Entry<String, ConnectionPool>> snapshot = new ArrayList<>(connectionMap.entrySet());
    if (snapshot.isEmpty()) {
      throw new JedisClusterOperationException("No cluster nodes available.");
    }

    return snapshot;
  }
}