StreamsCommandsTest.java

package redis.clients.jedis.commands.jedis;

import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;

import io.redis.test.annotations.SinceRedisVersion;
import io.redis.test.utils.RedisVersion;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;

import redis.clients.jedis.BuilderFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.RedisProtocol;
import redis.clients.jedis.Response;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.params.*;
import redis.clients.jedis.resps.*;
import redis.clients.jedis.util.RedisVersionUtil;
import redis.clients.jedis.util.SafeEncoder;

@ParameterizedClass
@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions")
public class StreamsCommandsTest extends JedisCommandsTestBase {

  public StreamsCommandsTest(RedisProtocol protocol) {
    super(protocol);
  }

  @Test
  public void xadd() {

    try {
      Map<String, String> map1 = new HashMap<>();
      jedis.xadd("stream1", (StreamEntryID) null, map1);
      fail();
    } catch (JedisDataException expected) {
      assertTrue(expected.getMessage().contains("wrong number of arguments"));
    }

    Map<String, String> map1 = new HashMap<>();
    map1.put("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xadd-stream1", (StreamEntryID) null, map1);
    assertNotNull(id1);

    Map<String, String> map2 = new HashMap<>();
    map2.put("f1", "v1");
    map2.put("f2", "v2");
    StreamEntryID id2 = jedis.xadd("xadd-stream1", (StreamEntryID) null, map2);
    assertTrue(id2.compareTo(id1) > 0);

    Map<String, String> map3 = new HashMap<>();
    map3.put("f2", "v2");
    map3.put("f3", "v3");
    StreamEntryID id3 = jedis.xadd("xadd-stream2", (StreamEntryID) null, map3);

    Map<String, String> map4 = new HashMap<>();
    map4.put("f2", "v2");
    map4.put("f3", "v3");
    StreamEntryID idIn = new StreamEntryID(id3.getTime() + 1, 1L);
    StreamEntryID id4 = jedis.xadd("xadd-stream2", idIn, map4);
    assertEquals(idIn, id4);
    assertTrue(id4.compareTo(id3) > 0);

    Map<String, String> map5 = new HashMap<>();
    map5.put("f4", "v4");
    map5.put("f5", "v5");
    StreamEntryID id5 = jedis.xadd("xadd-stream2", (StreamEntryID) null, map5);
    assertTrue(id5.compareTo(id4) > 0);

    Map<String, String> map6 = new HashMap<>();
    map6.put("f4", "v4");
    map6.put("f5", "v5");
    StreamEntryID id6 = jedis.xadd("xadd-stream2", map6, XAddParams.xAddParams().maxLen(3));
    assertTrue(id6.compareTo(id5) > 0);
    assertEquals(3L, jedis.xlen("xadd-stream2"));
  }

  @Test
  public void xaddWithParams() {

    try {
      jedis.xadd("stream1", new HashMap<>(), XAddParams.xAddParams());
      fail();
    } catch (JedisDataException expected) {
      assertTrue(expected.getMessage().contains("wrong number of arguments"));
    }

    try {
      jedis.xadd("stream1", XAddParams.xAddParams(), new HashMap<>());
      fail();
    } catch (JedisDataException expected) {
      assertTrue(expected.getMessage().contains("wrong number of arguments"));
    }

    StreamEntryID id1 = jedis.xadd("xadd-stream1", (StreamEntryID) null, singletonMap("f1", "v1"));
    assertNotNull(id1);

    Map<String, String> map2 = new HashMap<>();
    map2.put("f1", "v1");
    map2.put("f2", "v2");
    StreamEntryID id2 = jedis.xadd("xadd-stream1", map2, XAddParams.xAddParams());
    assertTrue(id2.compareTo(id1) > 0);

    Map<String, String> map3 = new HashMap<>();
    map3.put("f2", "v2");
    map3.put("f3", "v3");
    StreamEntryID id3 = jedis.xadd("xadd-stream2", XAddParams.xAddParams(), map3);

    Map<String, String> map4 = new HashMap<>();
    map4.put("f2", "v2");
    map4.put("f3", "v3");
    StreamEntryID idIn = new StreamEntryID(id3.getTime() + 1, 1L);
    StreamEntryID id4 = jedis.xadd("xadd-stream2", map4, XAddParams.xAddParams().id(idIn));
    assertEquals(idIn, id4);
    assertTrue(id4.compareTo(id3) > 0);

    Map<String, String> map5 = new HashMap<>();
    map5.put("f4", "v4");
    map5.put("f5", "v5");
    StreamEntryID id5 = jedis.xadd("xadd-stream2", XAddParams.xAddParams(), map5);
    assertTrue(id5.compareTo(id4) > 0);

    Map<String, String> map6 = new HashMap<>();
    map6.put("f4", "v4");
    map6.put("f5", "v5");
    StreamEntryID id6 = jedis.xadd("xadd-stream2", map6, XAddParams.xAddParams().maxLen(3).exactTrimming());
    assertTrue(id6.compareTo(id5) > 0);
    assertEquals(3L, jedis.xlen("xadd-stream2"));

    // nomkstream
    StreamEntryID id7 = jedis.xadd("xadd-stream3", XAddParams.xAddParams().noMkStream().maxLen(3).exactTrimming(), map6);
    assertNull(id7);
    assertFalse(jedis.exists("xadd-stream3"));

    // minid
    jedis.xadd("xadd-stream3", map6, XAddParams.xAddParams().minId("2").id(new StreamEntryID(2)));
    assertEquals(1L, jedis.xlen("xadd-stream3"));
    jedis.xadd("xadd-stream3", XAddParams.xAddParams().minId("4").id(new StreamEntryID(3)), map6);
    assertEquals(0L, jedis.xlen("xadd-stream3"));
  }

  @Test
  @SinceRedisVersion(value = "7.0.0")
  public void xaddParamsId() {
    StreamEntryID id;
    String key = "kk";
    Map<String, String> map = singletonMap("ff", "vv");

    id = jedis.xadd(key, XAddParams.xAddParams().id(new StreamEntryID(0, 1)), map);
    assertNotNull(id);
    assertEquals("0-1", id.toString());
    assertEquals(0, id.getTime());
    assertEquals(1, id.getSequence());

    id = jedis.xadd(key, XAddParams.xAddParams().id(2, 3), map);
    assertNotNull(id);
    assertEquals(2, id.getTime());
    assertEquals(3, id.getSequence());

    id = jedis.xadd(key, XAddParams.xAddParams().id(4), map);
    assertNotNull(id);
    assertEquals(4, id.getTime());
    assertEquals(0, id.getSequence());

    id = jedis.xadd(key, XAddParams.xAddParams().id("5-6"), map);
    assertNotNull(id);
    assertEquals(5, id.getTime());
    assertEquals(6, id.getSequence());

    id = jedis.xadd(key, XAddParams.xAddParams().id("7-8".getBytes()), map);
    assertNotNull(id);
    assertEquals(7, id.getTime());
    assertEquals(8, id.getSequence());

    id = jedis.xadd(key, XAddParams.xAddParams(), map);
    assertNotNull(id);
  }

  @Test
  public void xdel() {
    Map<String, String> map1 = new HashMap<>();
    map1.put("f1", "v1");

    StreamEntryID id1 = jedis.xadd("xdel-stream", (StreamEntryID) null, map1);
    assertNotNull(id1);

    StreamEntryID id2 = jedis.xadd("xdel-stream", (StreamEntryID) null, map1);
    assertNotNull(id2);
    assertEquals(2L, jedis.xlen("xdel-stream"));

    assertEquals(1L, jedis.xdel("xdel-stream", id1));
    assertEquals(1L, jedis.xlen("xdel-stream"));
  }

  @Test
  public void xlen() {
    assertEquals(0L, jedis.xlen("xlen-stream"));

    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    jedis.xadd("xlen-stream", (StreamEntryID) null, map);
    assertEquals(1L, jedis.xlen("xlen-stream"));

    jedis.xadd("xlen-stream", (StreamEntryID) null, map);
    assertEquals(2L, jedis.xlen("xlen-stream"));
  }

  @Test
  public void xrange() {
    List<StreamEntry> range = jedis.xrange("xrange-stream", (StreamEntryID) null,
      (StreamEntryID) null, Integer.MAX_VALUE);
    assertEquals(0, range.size());

    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xrange-stream", (StreamEntryID) null, map);
    StreamEntryID id2 = jedis.xadd("xrange-stream", (StreamEntryID) null, map);
    List<StreamEntry> range2 = jedis.xrange("xrange-stream", (StreamEntryID) null,
      (StreamEntryID) null, 3);
    assertEquals(2, range2.size());
    assertEquals(range2.get(0).toString(), id1 + " " + map);

    List<StreamEntry> range3 = jedis.xrange("xrange-stream", id1, null, 2);
    assertEquals(2, range3.size());

    List<StreamEntry> range4 = jedis.xrange("xrange-stream", id1, id2, 2);
    assertEquals(2, range4.size());

    List<StreamEntry> range5 = jedis.xrange("xrange-stream", id1, id2, 1);
    assertEquals(1, range5.size());

    List<StreamEntry> range6 = jedis.xrange("xrange-stream", id2, null, 4);
    assertEquals(1, range6.size());

    StreamEntryID id3 = jedis.xadd("xrange-stream", (StreamEntryID) null, map);
    List<StreamEntry> range7 = jedis.xrange("xrange-stream", id3, id3, 4);
    assertEquals(1, range7.size());

    List<StreamEntry> range8 = jedis.xrange("xrange-stream", (StreamEntryID) null, (StreamEntryID) null);
    assertEquals(3, range8.size());
    range8 = jedis.xrange("xrange-stream", StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID);
    assertEquals(3, range8.size());
  }

  @Test
  public void xrangeExclusive() {
    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    String id1 = jedis.xadd("xrange-stream", (StreamEntryID) null, map).toString();
    jedis.xadd("xrange-stream", (StreamEntryID) null, map);

    List<StreamEntry> range2 = jedis.xrange("xrange-stream", id1, "+", 2);
    assertEquals(2, range2.size());

    List<StreamEntry> range3 = jedis.xrange("xrange-stream", "(" + id1, "+", 2);
    assertEquals(1, range3.size());
  }

  @Test
  public void xreadWithParams() {

    final String key1 = "xread-stream1";
    final String key2 = "xread-stream2";

    Map<String, StreamEntryID> streamQeury1 = singletonMap(key1, new StreamEntryID());

    // Before creating Stream
    assertNull(jedis.xread(XReadParams.xReadParams().block(1), streamQeury1));
    assertNull(jedis.xread(XReadParams.xReadParams(), streamQeury1));

    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    StreamEntryID id1 = jedis.xadd(key1, (StreamEntryID) null, map);
    StreamEntryID id2 = jedis.xadd(key2, (StreamEntryID) null, map);

    // Read only a single Stream
    List<Entry<String, List<StreamEntry>>> streams1 = jedis.xread(XReadParams.xReadParams().count(1).block(1), streamQeury1);
    assertEquals(1, streams1.size());
    assertEquals(key1, streams1.get(0).getKey());
    assertEquals(1, streams1.get(0).getValue().size());
    assertEquals(id1, streams1.get(0).getValue().get(0).getID());
    assertEquals(map, streams1.get(0).getValue().get(0).getFields());

    assertNull(jedis.xread(XReadParams.xReadParams().block(1), singletonMap(key1, id1)));
    assertNull(jedis.xread(XReadParams.xReadParams(), singletonMap(key1, id1)));

    // Read from two Streams
    Map<String, StreamEntryID> streamQuery2 = new LinkedHashMap<>();
    streamQuery2.put(key1, new StreamEntryID());
    streamQuery2.put(key2, new StreamEntryID());
    List<Entry<String, List<StreamEntry>>> streams2 = jedis.xread(XReadParams.xReadParams().count(2).block(1), streamQuery2);
    assertEquals(2, streams2.size());
  }

  @Test
  public void xreadAsMap() {

    final String stream1 = "xread-stream1";
    final String stream2 = "xread-stream2";

    Map<String, StreamEntryID> streamQeury1 = singletonMap(stream1, new StreamEntryID());

    // Before creating Stream
    assertNull(jedis.xreadAsMap(XReadParams.xReadParams().block(1), streamQeury1));
    assertNull(jedis.xreadAsMap(XReadParams.xReadParams(), streamQeury1));

    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    StreamEntryID id1 = new StreamEntryID(1);
    StreamEntryID id2 = new StreamEntryID(2);
    StreamEntryID id3 = new StreamEntryID(3);

    assertEquals(id1, jedis.xadd(stream1, id1, map));
    assertEquals(id2, jedis.xadd(stream2, id2, map));
    assertEquals(id3, jedis.xadd(stream1, id3, map));

    // Read only a single Stream
    Map<String, List<StreamEntry>> streams1 = jedis.xreadAsMap(XReadParams.xReadParams().count(2), streamQeury1);
    assertEquals(singleton(stream1), streams1.keySet());
    List<StreamEntry> list1 = streams1.get(stream1);
    assertEquals(2, list1.size());
    assertEquals(id1, list1.get(0).getID());
    assertEquals(map, list1.get(0).getFields());
    assertEquals(id3, list1.get(1).getID());
    assertEquals(map, list1.get(1).getFields());

    // Read from two Streams
    Map<String, StreamEntryID> streamQuery2 = new LinkedHashMap<>();
    streamQuery2.put(stream1, new StreamEntryID());
    streamQuery2.put(stream2, new StreamEntryID());
    Map<String, List<StreamEntry>> streams2 = jedis.xreadAsMap(XReadParams.xReadParams().count(1), streamQuery2);
    assertEquals(2, streams2.size());
    assertEquals(id1, streams2.get(stream1).get(0).getID());
    assertEquals(id2, streams2.get(stream2).get(0).getID());
  }

  @Test
  @SinceRedisVersion(value = "7.4.0", message = "From Redis 7.4, you can use the + sign as a special ID to request last entry")
  public void xreadAsMapLastEntry() {

    final String stream1 = "xread-stream1";
    final String stream2 = "xread-stream2";

    Map<String, StreamEntryID> streamQeury1 = singletonMap(stream1, new StreamEntryID());

    // Before creating Stream
    assertNull(jedis.xreadAsMap(XReadParams.xReadParams().block(1), streamQeury1));
    assertNull(jedis.xreadAsMap(XReadParams.xReadParams(), streamQeury1));

    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    StreamEntryID id1 = new StreamEntryID(1);
    StreamEntryID id2 = new StreamEntryID(2);
    StreamEntryID id3 = new StreamEntryID(3);

    assertEquals(id1, jedis.xadd(stream1, id1, map));
    assertEquals(id2, jedis.xadd(stream2, id2, map));
    assertEquals(id3, jedis.xadd(stream1, id3, map));


    // Read from last entry
    Map<String, StreamEntryID> streamQueryLE = singletonMap(stream1, StreamEntryID.XREAD_LAST_ENTRY);
    Map<String, List<StreamEntry>> streamsLE = jedis.xreadAsMap(XReadParams.xReadParams().count(1), streamQueryLE);
    assertEquals(singleton(stream1), streamsLE.keySet());
    assertEquals(1, streamsLE.get(stream1).size());
    assertEquals(id3, streamsLE.get(stream1).get(0).getID());
    assertEquals(map, streamsLE.get(stream1).get(0).getFields());
  }

  @Test
  public void xreadBlockZero() throws InterruptedException {
    final AtomicReference<StreamEntryID> readId = new AtomicReference<>();
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() {
        try (Jedis blockJedis = createJedis()) {
          long startTime = System.currentTimeMillis();
          List<Entry<String, List<StreamEntry>>> read = blockJedis.xread(XReadParams.xReadParams().block(0),
              singletonMap("block0-stream", new StreamEntryID()));
          long endTime = System.currentTimeMillis();
          assertTrue(endTime - startTime > 500);
          assertNotNull(read);
          readId.set(read.get(0).getValue().get(0).getID());
        }
      }
    }, "xread-block-0-thread");
    t.start();
    Thread.sleep(1000);
    StreamEntryID addedId = jedis.xadd("block0-stream", (StreamEntryID) null, singletonMap("foo", "bar"));
    t.join();
    assertEquals(addedId, readId.get());
  }

  @Test
  public void xtrim() {
    Map<String, String> map1 = new HashMap<String, String>();
    map1.put("f1", "v1");

    for (int i = 1; i <= 5; i++) {
      jedis.xadd("xtrim-stream", (StreamEntryID) null, map1);
    }
    assertEquals(5L, jedis.xlen("xtrim-stream"));

    jedis.xtrim("xtrim-stream", 3, false);
    assertEquals(3L, jedis.xlen("xtrim-stream"));
  }

  @Test
  public void xtrimWithParams() {
    Map<String, String> map1 = new HashMap<>();
    map1.put("f1", "v1");
    for (int i = 1; i <= 5; i++) {
      jedis.xadd("xtrim-stream", new StreamEntryID("0-" + i), map1);
    }
    assertEquals(5L, jedis.xlen("xtrim-stream"));

    jedis.xtrim("xtrim-stream", XTrimParams.xTrimParams().maxLen(3).exactTrimming());
    assertEquals(3L, jedis.xlen("xtrim-stream"));

    // minId
    jedis.xtrim("xtrim-stream", XTrimParams.xTrimParams().minId("0-4").exactTrimming());
    assertEquals(2L, jedis.xlen("xtrim-stream"));
  }

  @Test
  public void xrevrange() {
    List<StreamEntry> range = jedis.xrevrange("xrevrange-stream", (StreamEntryID) null,
      (StreamEntryID) null, Integer.MAX_VALUE);
    assertEquals(0, range.size());

    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xrevrange-stream", (StreamEntryID) null, map);
    StreamEntryID id2 = jedis.xadd("xrevrange-stream", (StreamEntryID) null, map);
    List<StreamEntry> range2 = jedis.xrange("xrevrange-stream", (StreamEntryID) null,
      (StreamEntryID) null, 3);
    assertEquals(2, range2.size());

    List<StreamEntry> range3 = jedis.xrevrange("xrevrange-stream", null, id1, 2);
    assertEquals(2, range3.size());

    List<StreamEntry> range4 = jedis.xrevrange("xrevrange-stream", id2, id1, 2);
    assertEquals(2, range4.size());

    List<StreamEntry> range5 = jedis.xrevrange("xrevrange-stream", id2, id1, 1);
    assertEquals(1, range5.size());

    List<StreamEntry> range6 = jedis.xrevrange("xrevrange-stream", null, id2, 4);
    assertEquals(1, range6.size());

    StreamEntryID id3 = jedis.xadd("xrevrange-stream", (StreamEntryID) null, map);
    List<StreamEntry> range7 = jedis.xrevrange("xrevrange-stream", id3, id3, 4);
    assertEquals(1, range7.size());

    List<StreamEntry> range8 = jedis.xrevrange("xrevrange-stream", (StreamEntryID) null, (StreamEntryID) null);
    assertEquals(3, range8.size());
    range8 = jedis.xrevrange("xrevrange-stream", StreamEntryID.MAXIMUM_ID, StreamEntryID.MINIMUM_ID);
    assertEquals(3, range8.size());
  }

  @Test
  public void xrevrangeExclusive() {
    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    String id1 = jedis.xadd("xrange-stream", (StreamEntryID) null, map).toString();
    jedis.xadd("xrange-stream", (StreamEntryID) null, map);

    List<StreamEntry> range2 = jedis.xrevrange("xrange-stream", "+", id1, 2);
    assertEquals(2, range2.size());

    List<StreamEntry> range3 = jedis.xrevrange("xrange-stream", "+", "(" + id1, 2);
    assertEquals(1, range3.size());
  }

  @Test
  public void xgroup() {

    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xgroup-stream", (StreamEntryID) null, map);

    assertEquals("OK", jedis.xgroupCreate("xgroup-stream", "consumer-group-name", null, false));
    assertEquals("OK", jedis.xgroupSetID("xgroup-stream", "consumer-group-name", id1));
    assertEquals("OK", jedis.xgroupCreate("xgroup-stream", "consumer-group-name1", StreamEntryID.XGROUP_LAST_ENTRY, false));

    jedis.xgroupDestroy("xgroup-stream", "consumer-group-name");
    assertEquals(0L, jedis.xgroupDelConsumer("xgroup-stream", "consumer-group-name1","myconsumer1"));
    assertTrue(jedis.xgroupCreateConsumer("xgroup-stream", "consumer-group-name1","myconsumer2"));
    assertEquals(0L, jedis.xgroupDelConsumer("xgroup-stream", "consumer-group-name1","myconsumer2"));
  }

  @Test
  public void xreadGroupWithParams() {

    // Simple xreadGroup with NOACK
    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map);
    jedis.xgroupCreate("xreadGroup-stream1", "xreadGroup-group", null, false);
    Map<String, StreamEntryID> streamQeury1 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
        XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQeury1);
    assertEquals(1, range.size());
    assertEquals(1, range.get(0).getValue().size());

    jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map);
    jedis.xadd("xreadGroup-stream2", (StreamEntryID) null, map);
    jedis.xgroupCreate("xreadGroup-stream2", "xreadGroup-group", null, false);

    // Read only a single Stream
    List<Entry<String, List<StreamEntry>>> streams1 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
        XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQeury1);
    assertEquals(1, streams1.size());
    assertEquals(1, streams1.get(0).getValue().size());

    // Read from two Streams
    Map<String, StreamEntryID> streamQuery2 = new LinkedHashMap<>();
    streamQuery2.put("xreadGroup-stream1", new StreamEntryID());
    streamQuery2.put("xreadGroup-stream2", new StreamEntryID());
    List<Entry<String, List<StreamEntry>>> streams2 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
        XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQuery2);
    assertEquals(2, streams2.size());

    // Read only fresh messages
    StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map);
    Map<String, StreamEntryID> streamQeuryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    List<Entry<String, List<StreamEntry>>> streamsFresh = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
        XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQeuryFresh);
    assertEquals(1, streamsFresh.size());
    assertEquals(id4, streamsFresh.get(0).getValue().get(0).getID());
  }

  @Test
  public void xreadGroupAsMap() {

    final String stream1 = "xreadGroup-stream1";
    Map<String, String> map = singletonMap("f1", "v1");

    StreamEntryID id1 = jedis.xadd(stream1, StreamEntryID.NEW_ENTRY, map);
    jedis.xgroupCreate(stream1, "xreadGroup-group", null, false);
    Map<String, StreamEntryID> streamQeury1 = singletonMap(stream1, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    Map<String, List<StreamEntry>> range = jedis.xreadGroupAsMap("xreadGroup-group", "xreadGroup-consumer",
        XReadGroupParams.xReadGroupParams().noAck(), streamQeury1);
    assertEquals(singleton(stream1), range.keySet());
    List<StreamEntry> list = range.get(stream1);
    assertEquals(1, list.size());
    assertEquals(id1, list.get(0).getID());
    assertEquals(map, list.get(0).getFields());
  }

  @Test
  public void xreadGroupWithParamsWhenPendingMessageIsDiscarded() {
    // Add two message to stream
    Map<String, String> map1 = new HashMap<>();
    map1.put("f1", "v1");

    Map<String, String> map2 = new HashMap<>();
    map2.put("f2", "v2");

    XAddParams xAddParams = XAddParams.xAddParams().id(StreamEntryID.NEW_ENTRY).maxLen(2);
    StreamEntryID firstMessageEntryId = jedis.xadd("xreadGroup-discard-stream1", xAddParams, map1);
    jedis.xadd("xreadGroup-discard-stream1", xAddParams, map2);

    jedis.xgroupCreate("xreadGroup-discard-stream1", "xreadGroup-group", null, false);
    Map<String, StreamEntryID> streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
            XReadGroupParams.xReadGroupParams().count(1), streamQuery1);
    assertEquals(1, range.size());
    assertEquals(1, range.get(0).getValue().size());

    assertEquals(firstMessageEntryId, range.get(0).getValue().get(0).getID());
    assertEquals(map1, range.get(0).getValue().get(0).getFields());

    // Add third message, the fields of pending message1 will be discarded by redis-server
    Map<String, String> map3 = new HashMap<>();
    map3.put("f3", "v3");
    jedis.xadd("xreadGroup-discard-stream1", xAddParams, map3);

    Map<String, StreamEntryID> streamQueryPending = singletonMap("xreadGroup-discard-stream1", new StreamEntryID());
    List<Entry<String, List<StreamEntry>>> pendingMessages = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
            XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQueryPending);

    assertEquals(1, pendingMessages.size());
    assertEquals(1, pendingMessages.get(0).getValue().size());

    assertEquals(firstMessageEntryId, pendingMessages.get(0).getValue().get(0).getID());
    assertNull(pendingMessages.get(0).getValue().get(0).getFields());
  }

  @Test
  public void xack() {
    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    jedis.xadd("xack-stream", (StreamEntryID) null, map);

    jedis.xgroupCreate("xack-stream", "xack-group", null, false);

    Map<String, StreamEntryID> streamQeury1 = singletonMap("xack-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

    // Empty Stream
    List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xack-group", "xack-consumer",
        XReadGroupParams.xReadGroupParams().count(1).block(1), streamQeury1);
    assertEquals(1, range.size());

    assertEquals(1L,
      jedis.xack("xack-stream", "xack-group", range.get(0).getValue().get(0).getID()));
  }

  @Test
  public void xpendingWithParams() {
    final String stream = "xpendeing-stream";

    assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, true));

    // Get the summary from empty stream
    StreamPendingSummary emptySummary = jedis.xpending(stream, "xpendeing-group");
    assertEquals(0, emptySummary.getTotal());
    assertNull(emptySummary.getMinId());
    assertNull(emptySummary.getMaxId());
    assertNull(emptySummary.getConsumerMessageCount());

    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    StreamEntryID id1 = jedis.xadd(stream, (StreamEntryID) null, map);

    Map<String, StreamEntryID> streamQeury1 = singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

    // Read the event from Stream put it on pending
    List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xpendeing-group",
            "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), streamQeury1);
    assertEquals(1, range.size());
    assertEquals(1, range.get(0).getValue().size());
    assertEquals(map, range.get(0).getValue().get(0).getFields());

    // Get the summary about the pending messages
    StreamPendingSummary pendingSummary = jedis.xpending(stream, "xpendeing-group");
    assertEquals(1, pendingSummary.getTotal());
    assertEquals(id1, pendingSummary.getMinId());
    assertEquals(1l, pendingSummary.getConsumerMessageCount().get("xpendeing-consumer").longValue());

    // Get the pending event
    List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
            new XPendingParams().count(3).consumer("xpendeing-consumer"));
    assertEquals(1, pendingRange.size());
    assertEquals(id1, pendingRange.get(0).getID());
    assertEquals(1, pendingRange.get(0).getDeliveredTimes());
    assertEquals("xpendeing-consumer", pendingRange.get(0).getConsumerName());
    assertTrue(pendingRange.get(0).toString().contains("xpendeing-consumer"));

    // Without consumer
    pendingRange = jedis.xpending(stream, "xpendeing-group", new XPendingParams().count(3));
    assertEquals(1, pendingRange.size());
    assertEquals(id1, pendingRange.get(0).getID());
    assertEquals(1, pendingRange.get(0).getDeliveredTimes());
    assertEquals("xpendeing-consumer", pendingRange.get(0).getConsumerName());

    // with idle
    pendingRange = jedis.xpending(stream, "xpendeing-group",
      new XPendingParams().idle(Duration.ofMinutes(1).toMillis()).count(3));
    assertEquals(0, pendingRange.size());
  }

  @Test
  public void xpendingRange() {
    final String stream = "xpendeing-stream";
    Map<String, String> map = new HashMap<>();
    map.put("foo", "bar");
    StreamEntryID m1 = jedis.xadd(stream, (StreamEntryID) null, map);
    StreamEntryID m2 = jedis.xadd(stream, (StreamEntryID) null, map);
    jedis.xgroupCreate(stream, "xpendeing-group", null, false);

    // read 1 message from the group with each consumer
    Map<String, StreamEntryID> streamQeury = singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    jedis.xreadGroup("xpendeing-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury);
    jedis.xreadGroup("xpendeing-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury);

    List<StreamPendingEntry> response = jedis.xpending(stream, "xpendeing-group",
        XPendingParams.xPendingParams("(0", "+", 5));
    assertEquals(2, response.size());
    assertEquals(m1, response.get(0).getID());
    assertEquals("consumer1", response.get(0).getConsumerName());
    assertEquals(m2, response.get(1).getID());
    assertEquals("consumer2", response.get(1).getConsumerName());

    response = jedis.xpending(stream, "xpendeing-group",
        XPendingParams.xPendingParams(StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID, 5));
    assertEquals(2, response.size());
    assertEquals(m1, response.get(0).getID());
    assertEquals("consumer1", response.get(0).getConsumerName());
    assertEquals(m2, response.get(1).getID());
    assertEquals("consumer2", response.get(1).getConsumerName());
  }

  @Test
  public void xclaimWithParams() {
    final String stream = "xpendeing-stream";
    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    jedis.xadd(stream, (StreamEntryID) null, map);

    assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false));

    // Read the event from Stream put it on pending
    jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
            singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
        XPendingParams.xPendingParams().count(3).consumer("xpendeing-consumer"));

    // Sleep for 100ms so we can claim events pending for more than 50ms
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    List<StreamEntry> streamEntrys = jedis.xclaim(stream, "xpendeing-group",
            "xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0),
            pendingRange.get(0).getID());
    assertEquals(1, streamEntrys.size());
    assertEquals(pendingRange.get(0).getID(), streamEntrys.get(0).getID());
    assertEquals("v1", streamEntrys.get(0).getFields().get("f1"));
  }

  @Test
  public void xclaimJustId() {
    final String stream = "xpendeing-stream";
    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    jedis.xadd(stream, (StreamEntryID) null, map);

    assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false));

    // Read the event from Stream put it on pending
    jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
        XPendingParams.xPendingParams().count(3).consumer("xpendeing-consumer"));
    // Sleep for 100ms so we can claim events pending for more than 50ms
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    List<StreamEntryID> streamEntryIDS = jedis.xclaimJustId(stream, "xpendeing-group",
      "xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0),
      pendingRange.get(0).getID());
    assertEquals(1, streamEntryIDS.size());
    assertEquals(pendingRange.get(0).getID(), streamEntryIDS.get(0));
  }

  @Test
  public void xautoclaim() {
    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    jedis.xadd("xpending-stream", (StreamEntryID) null, map);

    assertEquals("OK", jedis.xgroupCreate("xpending-stream", "xpending-group", null, false));

    // Read the event from Stream put it on pending
    jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    List<StreamPendingEntry> pendingRange = jedis.xpending("xpending-stream", "xpending-group",
        XPendingParams.xPendingParams().count(3).consumer("xpending-consumer"));
    // Sleep for 100ms so we can auto claim events pending for more than 50ms
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    // Auto claim pending events to different consumer
    Map.Entry<StreamEntryID, List<StreamEntry>> streamEntrys = jedis.xautoclaim("xpending-stream", "xpending-group",
            "xpending-consumer2", 50, new StreamEntryID(), new XAutoClaimParams().count(1));
    assertEquals(1, streamEntrys.getValue().size());
    assertEquals(pendingRange.get(0).getID(), streamEntrys.getValue().get(0).getID());
    assertEquals("v1", streamEntrys.getValue().get(0).getFields().get("f1"));
  }

  @Test
  public void xautoclaimBinary() {
    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    jedis.xadd("xpending-stream", XAddParams.xAddParams(), map);

    assertEquals("OK", jedis.xgroupCreate("xpending-stream", "xpending-group", null, false));

    // Read the event from Stream put it on pending
    jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    List<StreamPendingEntry> pendingRange = jedis.xpending("xpending-stream", "xpending-group",
        XPendingParams.xPendingParams().count(3).consumer("xpending-consumer"));
    // Sleep for 100ms so we can auto claim events pending for more than 50ms
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    // Auto claim pending events to different consumer
    List<Object> streamEntrys = jedis.xautoclaim(SafeEncoder.encode("xpending-stream"),
            SafeEncoder.encode("xpending-group"), SafeEncoder.encode("xpending-consumer2"),
            50, SafeEncoder.encode(new StreamEntryID().toString()), new XAutoClaimParams().count(1));
    Map.Entry<StreamEntryID, List<StreamEntry>> res = BuilderFactory.STREAM_AUTO_CLAIM_RESPONSE.build(streamEntrys);
    assertEquals(1, res.getValue().size());
    assertEquals(pendingRange.get(0).getID(), res.getValue().get(0).getID());
    assertEquals("v1", res.getValue().get(0).getFields().get("f1"));
  }

  @Test
  public void xautoclaimJustId() {
    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    jedis.xadd("xpending-stream", (StreamEntryID) null, map);

    assertEquals("OK", jedis.xgroupCreate("xpending-stream", "xpending-group", null, false));

    // Read the event from Stream put it on pending
    jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    List<StreamPendingEntry> pendingRange = jedis.xpending("xpending-stream", "xpending-group",
        XPendingParams.xPendingParams().count(3).consumer("xpending-consumer"));
    // Sleep for 100ms so we can auto claim events pending for more than 50ms
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    // Auto claim pending events to different consumer
    Map.Entry<StreamEntryID, List<StreamEntryID>> streamEntrys = jedis.xautoclaimJustId("xpending-stream", "xpending-group",
            "xpending-consumer2", 50, new StreamEntryID(), new XAutoClaimParams().count(1));
    assertEquals(1, streamEntrys.getValue().size());
    assertEquals(pendingRange.get(0).getID().getTime(), streamEntrys.getValue().get(0).getTime());
    assertEquals(pendingRange.get(0).getID().getSequence(), streamEntrys.getValue().get(0).getSequence());
  }

  @Test
  public void xautoclaimJustIdBinary() {
    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    jedis.xadd("xpending-stream", XAddParams.xAddParams(), map);

    assertEquals("OK", jedis.xgroupCreate("xpending-stream", "xpending-group", null, false));

    // Read the event from Stream put it on pending
    jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    List<StreamPendingEntry> pendingRange = jedis.xpending("xpending-stream", "xpending-group",
        XPendingParams.xPendingParams().count(3).consumer("xpending-consumer"));
    // Sleep for 100ms so we can auto claim events pending for more than 50ms
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    // Auto claim pending events to different consumer
    List<Object> streamEntrys = jedis.xautoclaimJustId(SafeEncoder.encode("xpending-stream"),
            SafeEncoder.encode("xpending-group"), SafeEncoder.encode("xpending-consumer2"),
            50, SafeEncoder.encode(new StreamEntryID().toString()), new XAutoClaimParams().count(1));
    Map.Entry<StreamEntryID, List<StreamEntryID>> res = BuilderFactory.STREAM_AUTO_CLAIM_JUSTID_RESPONSE.build(streamEntrys);
    assertEquals(1, res.getValue().size());
    assertEquals(pendingRange.get(0).getID().getTime(), res.getValue().get(0).getTime());
    assertEquals(pendingRange.get(0).getID().getSequence(), res.getValue().get(0).getSequence());
  }

  @Test
  public void xinfo() throws InterruptedException {

    final String STREAM_NAME = "xadd-stream1";
    final String F1 = "f1";
    final String V1 = "v1";
    final String V2 = "v2";
    final String G1 = "G1";
    final String G2 = "G2";
    final String MY_CONSUMER = "myConsumer";
    final String MY_CONSUMER2 = "myConsumer2";

    final RedisVersion redisVersion = RedisVersionUtil.getRedisVersion(jedis);

    Map<String, String> map1 = new HashMap<>();
    map1.put(F1, V1);
    StreamEntryID id1 = jedis.xadd(STREAM_NAME, (StreamEntryID) null, map1);
    map1.put(F1, V2);
    StreamEntryID id2 = jedis.xadd(STREAM_NAME, (StreamEntryID) null, map1);
    assertNotNull(id1);
    StreamInfo streamInfo = jedis.xinfoStream(STREAM_NAME);
    assertNotNull(id2);

    jedis.xgroupCreate(STREAM_NAME, G1, StreamEntryID.XGROUP_LAST_ENTRY, false);
    Map<String, StreamEntryID> streamQeury11 = singletonMap(
        STREAM_NAME, new StreamEntryID("0-0"));
    jedis.xreadGroup(G1, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQeury11);

    Thread.sleep(1);

    List<StreamGroupInfo> groupInfo = jedis.xinfoGroups(STREAM_NAME);
    List<StreamConsumersInfo> consumersInfo = jedis.xinfoConsumers(STREAM_NAME, G1);
    List<StreamConsumerInfo> consumerInfo = jedis.xinfoConsumers2(STREAM_NAME, G1);

    // Stream info test
    assertEquals(2L, streamInfo.getStreamInfo().get(StreamInfo.LENGTH));
    assertEquals(1L, streamInfo.getStreamInfo().get(StreamInfo.RADIX_TREE_KEYS));
    assertEquals(2L, streamInfo.getStreamInfo().get(StreamInfo.RADIX_TREE_NODES));
    assertEquals(0L, streamInfo.getStreamInfo().get(StreamInfo.GROUPS));
    assertEquals(V1, ((StreamEntry) streamInfo.getStreamInfo().get(StreamInfo.FIRST_ENTRY)).getFields().get(F1));
    assertEquals(V2, ((StreamEntry) streamInfo.getStreamInfo().get(StreamInfo.LAST_ENTRY)).getFields().get(F1));
    assertEquals(id2, streamInfo.getStreamInfo().get(StreamInfo.LAST_GENERATED_ID));

    // Using getters
    assertEquals(2, streamInfo.getLength());
    assertEquals(1, streamInfo.getRadixTreeKeys());
    assertEquals(2, streamInfo.getRadixTreeNodes());
    assertEquals(0, streamInfo.getGroups());
    assertEquals(V1, streamInfo.getFirstEntry().getFields().get(F1));
    assertEquals(V2, streamInfo.getLastEntry().getFields().get(F1));
    assertEquals(id2, streamInfo.getLastGeneratedId());

    // Group info test
    assertEquals(1, groupInfo.size());
    assertEquals(G1, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.NAME));
    assertEquals(1L, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.CONSUMERS));
    assertEquals(0L, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.PENDING));
    assertEquals(id2, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.LAST_DELIVERED));

    // Using getters
    assertEquals(1, groupInfo.size());
    assertEquals(G1, groupInfo.get(0).getName());
    assertEquals(1, groupInfo.get(0).getConsumers());
    assertEquals(0, groupInfo.get(0).getPending());
    assertEquals(id2, groupInfo.get(0).getLastDeliveredId());

    // Consumers info test
    assertEquals(MY_CONSUMER,
      consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.NAME));
    assertEquals(0L, consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.PENDING));
    assertTrue((Long) consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.IDLE) > 0);

    // Using getters
    assertEquals(MY_CONSUMER, consumersInfo.get(0).getName());
    assertEquals(0L, consumersInfo.get(0).getPending());
    assertThat(consumersInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L));

    if ( redisVersion.isGreaterThanOrEqualTo(RedisVersion.V7_2_0)) {
      assertThat(consumersInfo.get(0).getInactive(), Matchers.any(Long.class));
    }

    // Consumer info test
    assertEquals(MY_CONSUMER,
      consumerInfo.get(0).getConsumerInfo().get(StreamConsumerInfo.NAME));
    assertEquals(0L, consumerInfo.get(0).getConsumerInfo().get(StreamConsumerInfo.PENDING));
    assertTrue((Long) consumerInfo.get(0).getConsumerInfo().get(StreamConsumerInfo.IDLE) > 0);

    // Using getters
    assertEquals(MY_CONSUMER, consumerInfo.get(0).getName());
    assertEquals(0L, consumerInfo.get(0).getPending());
    assertThat(consumerInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L));
    if (redisVersion.isGreaterThanOrEqualTo(RedisVersion.V7_2_0)) {
      assertThat(consumerInfo.get(0).getInactive(), Matchers.any(Long.class));
    }

    // test with more groups and consumers
    jedis.xgroupCreate(STREAM_NAME, G2, StreamEntryID.XGROUP_LAST_ENTRY, false);
    jedis.xreadGroup(G1, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQeury11);
    jedis.xreadGroup(G2, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQeury11);
    jedis.xreadGroup(G2, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQeury11);

    List<StreamGroupInfo> manyGroupsInfo = jedis.xinfoGroups(STREAM_NAME);
    List<StreamConsumersInfo> manyConsumersInfo = jedis.xinfoConsumers(STREAM_NAME, G2);
    List<StreamConsumerInfo> manyConsumerInfo = jedis.xinfoConsumers2(STREAM_NAME, G2);

    assertEquals(2, manyGroupsInfo.size());
    assertEquals(2, manyConsumersInfo.size());
    assertEquals(2, manyConsumerInfo.size());

    StreamFullInfo streamInfoFull = jedis.xinfoStreamFull(STREAM_NAME);

    assertEquals(2, streamInfoFull.getEntries().size());
    assertEquals(2, streamInfoFull.getGroups().size());
    assertEquals(2, streamInfoFull.getLength());
    assertEquals(1, streamInfoFull.getRadixTreeKeys());
    assertEquals(2, streamInfoFull.getRadixTreeNodes());
    assertEquals(0, streamInfo.getGroups());
    assertEquals(G1, streamInfoFull.getGroups().get(0).getName());
    assertEquals(G2, streamInfoFull.getGroups().get(1).getName());
    assertEquals(V1, streamInfoFull.getEntries().get(0).getFields().get(F1));
    assertEquals(V2, streamInfoFull.getEntries().get(1).getFields().get(F1));
    assertEquals(id2, streamInfoFull.getLastGeneratedId());

    streamInfoFull = jedis.xinfoStreamFull(STREAM_NAME, 10);
    assertEquals(G1, streamInfoFull.getGroups().get(0).getName());
    assertEquals(G2, streamInfoFull.getGroups().get(1).getName());
    assertEquals(V1, streamInfoFull.getEntries().get(0).getFields().get(F1));
    assertEquals(V2, streamInfoFull.getEntries().get(1).getFields().get(F1));
    assertEquals(id2, streamInfoFull.getLastGeneratedId());

    // Not existing key - redis cli return error so we expect exception
    try {
      jedis.xinfoStream("random");
      fail("Command should fail");
    } catch (JedisException e) {
      assertEquals("ERR no such key", e.getMessage());
    }
  }

  @Test
  public void xinfoStreamFullWithPending() {

    Map<String, String> map = singletonMap("f1", "v1");
    StreamEntryID id1 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
    StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
    jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false);

    Map<String, StreamEntryID> streamQeury1 = singletonMap("streamfull2", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
        XReadGroupParams.xReadGroupParams().count(1), streamQeury1);
    assertEquals(1, range.size());
    assertEquals(1, range.get(0).getValue().size());

    StreamFullInfo full = jedis.xinfoStreamFull("streamfull2");
    assertEquals(1, full.getGroups().size());
    StreamGroupFullInfo group = full.getGroups().get(0);
    assertEquals("xreadGroup-group", group.getName());

    assertEquals(1, group.getPending().size());
    List<Object> groupPendingEntry = group.getPending().get(0);
    assertEquals(id1, groupPendingEntry.get(0));
    assertEquals("xreadGroup-consumer", groupPendingEntry.get(1));

    assertEquals(1, group.getConsumers().size());
    StreamConsumerFullInfo consumer = group.getConsumers().get(0);
    assertEquals("xreadGroup-consumer", consumer.getName());
    assertThat(consumer.getSeenTime(), Matchers.greaterThanOrEqualTo(0L));
    if (RedisVersionUtil.getRedisVersion(jedis).isGreaterThanOrEqualTo(RedisVersion.V7_2_0)) {
      assertThat(consumer.getActiveTime(), Matchers.greaterThanOrEqualTo(0L));
    }
    assertEquals(1, consumer.getPending().size());
    List<Object> consumerPendingEntry = consumer.getPending().get(0);
    assertEquals(id1, consumerPendingEntry.get(0));
  }

  @Test
  public void pipeline() {
    Map<String, String> map = new HashMap<>();
    map.put("a", "b");
    Pipeline p = jedis.pipelined();
    Response<StreamEntryID> id1 = p.xadd("stream1", StreamEntryID.NEW_ENTRY, map);
    Response<StreamEntryID> id2 = p.xadd("stream1", StreamEntryID.NEW_ENTRY, map);
    Response<List<StreamEntry>> results = p.xrange("stream1", (StreamEntryID) null, (StreamEntryID) null, 2);
    p.sync();

    List<StreamEntry> entries = results.get();
    assertEquals(2, entries.size());
    assertEquals(id1.get(), entries.get(0).getID());
    assertEquals(map, entries.get(0).getFields());
    assertEquals(id2.get(), entries.get(1).getID());
    assertEquals(map, entries.get(1).getFields());

    p = jedis.pipelined();
    Response<List<StreamEntry>> results2 = p.xrevrange("stream1", null, id1.get(), 2);
    p.sync();
    assertEquals(2, results2.get().size());
  }

  @Test
  public void transaction() {
    Map<String, String> map = new HashMap<>();
    map.put("a", "b");
    Transaction t = jedis.multi();
    Response<StreamEntryID> id1 = t.xadd("stream1", StreamEntryID.NEW_ENTRY, map);
    Response<StreamEntryID> id2 = t.xadd("stream1", StreamEntryID.NEW_ENTRY, map);
    Response<List<StreamEntry>> results = t.xrange("stream1", (StreamEntryID) null, (StreamEntryID) null, 2);
    t.exec();

    List<StreamEntry> entries = results.get();
    assertEquals(2, entries.size());
    assertEquals(id1.get(), entries.get(0).getID());
    assertEquals(map, entries.get(0).getFields());
    assertEquals(id2.get(), entries.get(1).getID());
    assertEquals(map, entries.get(1).getFields());
  }
}