PushConsumerChainImpl.java

package redis.clients.jedis;

import redis.clients.jedis.annots.Experimental;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * A chain of PushConsumers that processes events in order.
 * <p>
 * Uses a {@link PushConsumerContext} object for tracking the processed state.
 * </p>
 */
@Experimental
public final class PushConsumerChainImpl implements PushConsumerChain {
  /**
   * PushConsumer that marks all push events to be propagated to the caller.
   */
  static final PushConsumer PROPAGATE_ALL_CONSUMER = new PushConsumer() {
    @Override
    public PushConsumerContext handle(PushConsumerContext context) {
      context.propagate();

      return context;
    }
  };

  static final PushConsumerChain PROPAGATE_ALL_CONSUMER_CHAIN = of(PROPAGATE_ALL_CONSUMER);

  /**
   * PushConsumer that marks pub/sub related events to be propagated to the caller.
   * <p>
   * NOTE: If a new pub/sub push type is added to {@link PushMessageTypes}, the {@code switch} in
   * {@link #isPubSubType(byte[])} must be updated. {@code PushConsumerChainImplTest} discovers
   * pub/sub constants reflectively and will fail until the new type is handled here.
   */
  public static final PushConsumer PUBSUB_CONSUMER = context -> {
    if (isPubSubType(context.getMessage().getType())) {
      context.propagate();
    }
    return context;
  };

  /**
   * Returns {@code true} iff {@code t} is one of the pub/sub push message types declared in
   * {@link PushMessageTypes}.
   * <p>
   * Dispatch on length first ��� tableswitch over the dense range 7..12. {@code (length, firstByte)}
   * uniquely identifies each of the 9 pub/sub types, so each bucket needs at most two intrinsified
   * {@link Arrays#equals(byte[], byte[])} calls. Zero allocation.
   * @param t the message type byte array, may be null
   * @return true if t is a pub/sub type, false if t is null or not a pub/sub type
   */
  static boolean isPubSubType(byte[] t) {
    if (t == null) {
      return false;
    }
    switch (t.length) {
      case 7:
        return Arrays.equals(t, PushMessageTypes.MESSAGE_BYTES);
      case 8:
        return Arrays.equals(t, PushMessageTypes.PMESSAGE_BYTES)
            || Arrays.equals(t, PushMessageTypes.SMESSAGE_BYTES);
      case 9:
        return Arrays.equals(t, PushMessageTypes.SUBSCRIBE_BYTES);
      case 10:
        return Arrays.equals(t, PushMessageTypes.PSUBSCRIBE_BYTES)
            || Arrays.equals(t, PushMessageTypes.SSUBSCRIBE_BYTES);
      case 11:
        return Arrays.equals(t, PushMessageTypes.UNSUBSCRIBE_BYTES);
      case 12:
        return Arrays.equals(t, PushMessageTypes.PUNSUBSCRIBE_BYTES)
            || Arrays.equals(t, PushMessageTypes.SUNSUBSCRIBE_BYTES);
      default:
        return false;
    }
  }

  private final List<PushConsumer> consumers;

  /**
   * Create a chain with the specified consumers.
   * @param consumers The consumers to add to the chain
   */
  PushConsumerChainImpl(PushConsumer... consumers) {
    this.consumers = new ArrayList<>(Arrays.asList(consumers));
  }

  /**
   * Create a chain with the specified consumers.
   * @param consumers The consumers to add to the chain
   * @return A new consumer chain with the specified consumers
   */
  public static PushConsumerChainImpl of(PushConsumer... consumers) {
    return new PushConsumerChainImpl(consumers);
  }

  /**
   * Add a consumer to the end of the chain.
   * @param consumer The consumer to add
   * @return this chain for method chaining
   */
  public PushConsumerChain add(PushConsumer consumer) {
    if (consumer != null) {
      consumers.add(consumer);
    }
    return this;
  }

  /**
   * Return an unmodifiable list of consumers in the chain.
   */
  public List<PushConsumer> getConsumers() {

    return Collections.unmodifiableList(consumers);
  }

  public PushMessage process(PushMessage message) {

    PushConsumerContext context = new PushConsumerContext(message);

    for (PushConsumer consumer : consumers) {
      context = consumer.handle(context);

      // propagate ��� return to caller and skip the rest of consumers
      if (context.shouldPropagate()) {
        return context.getMessage();
      }

      // drop ��� consume and skip rest of consumers
      if (context.shouldDrop()) {
        return null;
      }
    }

    // end of chain ��� default: consume
    return null;
  }

}