// ClusterReplyAggregator.java

package redis.clients.jedis.executors.aggregators;

import redis.clients.jedis.CommandFlagsRegistry;
import redis.clients.jedis.exceptions.UnsupportedAggregationException;

import java.util.Objects;

/**
 * Policy-driven reply aggregator that folds replies of a single type into one result.
 * <p>
 * The concrete aggregator is selected from the configured ResponsePolicy and is only created when
 * the first non-null reply arrives; every later non-null reply is handed to that same aggregator.
 * While no non-null reply has been added, getResult() returns null. This class performs no
 * synchronization of its own.
 */
class ClusterReplyAggregator<T> implements Aggregator<T, T> {

  private final CommandFlagsRegistry.ResponsePolicy policy;
  private Aggregator<T, T> delegate;

  /**
   * Creates an aggregator bound to the given policy. No underlying aggregator exists yet.
   *
   * @param policy response policy used to select the underlying aggregator; must not be null
   * @throws NullPointerException if {@code policy} is null
   */
  public ClusterReplyAggregator(CommandFlagsRegistry.ResponsePolicy policy) {
    this.policy = Objects.requireNonNull(policy, "policy cannot be null");
  }

  /**
   * Feeds one reply into the aggregation. A null reply is skipped and changes nothing.
   *
   * @param newReply reply to aggregate; may be null
   * @throws UnsupportedAggregationException if the policy is AGG_SUM and the first non-null reply
   *           is not a {@link Number}
   */
  @Override
  public void add(T newReply) {
    if (newReply != null) {
      if (delegate == null) {
        // Only assigned when creation succeeds, so a rejected first reply leaves this empty.
        delegate = createDelegate(newReply);
      }
      delegate.add(newReply);
    }
  }

  /**
   * Returns what has been aggregated so far.
   *
   * @return the underlying aggregator's result, or null when no non-null reply has been added
   */
  @Override
  public T getResult() {
    if (delegate == null) {
      return null;
    }
    return delegate.getResult();
  }

  /**
   * Builds the aggregator matching the configured policy. Invoked with the first non-null reply,
   * which is the only reply inspected for the AGG_SUM numeric check.
   *
   * @param firstReply the first non-null reply, used for the AGG_SUM type check and error message
   * @return a new aggregator for the configured policy
   * @throws UnsupportedAggregationException if the policy is AGG_SUM and {@code firstReply} is not
   *           a {@link Number}
   */
  @SuppressWarnings("unchecked")
  private Aggregator<T, T> createDelegate(T firstReply) {
    switch (policy) {
      case AGG_SUM:
        if (firstReply instanceof Number) {
          // Unchecked cast; guarded by the instanceof check on the first reply.
          return (Aggregator<T, T>) new SumAggregator<>();
        }
        throw new UnsupportedAggregationException(
            "AGG_SUM policy requires numeric type, but got: " + firstReply.getClass().getName());
      case AGG_MIN:
        return new MinAggregator<>();
      case AGG_MAX:
        return new MaxAggregator<>();
      case AGG_LOGICAL_AND:
        return new LogicalAndAggregator<>();
      case AGG_LOGICAL_OR:
        return new LogicalOrAggregator<>();
      case DEFAULT:
        return new DefaultPolicyAggregator<>();
      case ALL_SUCCEEDED:
      case ONE_SUCCEEDED:
      default:
        // These policies, and any policy not listed above, keep the first non-null reply.
        return new FirstNonNullAggregator<>();
    }
  }

}