StreamsPipelineCommandsTest.java

package redis.clients.jedis.commands.unified.pipeline;

import static java.util.Collections.singletonMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import io.redis.test.annotations.SinceRedisVersion;
import io.redis.test.utils.RedisVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;
import redis.clients.jedis.util.RedisVersionUtil;
import org.hamcrest.Matchers;
import redis.clients.jedis.BuilderFactory;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.RedisProtocol;
import redis.clients.jedis.Response;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XAutoClaimParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.resps.StreamConsumerFullInfo;
import redis.clients.jedis.resps.StreamConsumerInfo;
import redis.clients.jedis.resps.StreamConsumersInfo;
import redis.clients.jedis.resps.StreamEntry;
import redis.clients.jedis.resps.StreamFullInfo;
import redis.clients.jedis.resps.StreamGroupFullInfo;
import redis.clients.jedis.resps.StreamGroupInfo;
import redis.clients.jedis.resps.StreamInfo;
import redis.clients.jedis.resps.StreamPendingEntry;
import redis.clients.jedis.util.SafeEncoder;

@ParameterizedClass
@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions")
public class StreamsPipelineCommandsTest extends PipelineCommandsTestBase {

  public StreamsPipelineCommandsTest(RedisProtocol protocol) {
    super(protocol);
  }

  @Test
  public void xaddWrongNumberOfArguments() {
    Map<String, String> map1 = new HashMap<>();
    pipe.xadd("stream1", (StreamEntryID) null, map1);

    assertThat(pipe.syncAndReturnAll(),
        contains(
            both(instanceOf(JedisDataException.class)).and(hasToString(containsString("wrong number of arguments")))
        ));
  }

  @Test
  public void xadd() {
    Map<String, String> map1 = new HashMap<>();
    map1.put("f1", "v1");
    pipe.xadd("xadd-stream1", (StreamEntryID) null, map1);

    Map<String, String> map2 = new HashMap<>();
    map2.put("f1", "v1");
    map2.put("f2", "v2");
    pipe.xadd("xadd-stream1", (StreamEntryID) null, map2);

    List<?> results = pipe.syncAndReturnAll();

    assertThat(results, contains(
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class)
    ));

    assertThat((StreamEntryID) results.get(1),
        greaterThan((StreamEntryID) results.get(0)));
  }

  @Test
  public void xaddMaxLen() {
    Map<String, String> map4 = new HashMap<>();
    map4.put("f2", "v2");
    map4.put("f3", "v3");
    StreamEntryID idIn = new StreamEntryID(1000, 1L);
    pipe.xadd("xadd-stream2", idIn, map4);

    Map<String, String> map5 = new HashMap<>();
    map5.put("f4", "v4");
    map5.put("f5", "v5");
    pipe.xadd("xadd-stream2", (StreamEntryID) null, map5);

    pipe.xlen("xadd-stream2");

    Map<String, String> map6 = new HashMap<>();
    map6.put("f4", "v4");
    map6.put("f5", "v5");
    pipe.xadd("xadd-stream2", map6, XAddParams.xAddParams().maxLen(2));

    pipe.xlen("xadd-stream2");

    List<?> results = pipe.syncAndReturnAll();

    assertThat(results, contains(
        equalTo(idIn),
        instanceOf(StreamEntryID.class),
        equalTo(2L),
        instanceOf(StreamEntryID.class),
        equalTo(2L)
    ));

    assertThat((StreamEntryID) results.get(1),
        greaterThan((StreamEntryID) results.get(0)));

    assertThat((StreamEntryID) results.get(3),
        greaterThan((StreamEntryID) results.get(1)));
  }

  @Test
  public void xaddWithParamsWrongNumberOfArguments() {
    pipe.xadd("stream1", new HashMap<>(), XAddParams.xAddParams());
    pipe.xadd("stream1", XAddParams.xAddParams(), new HashMap<>());

    assertThat(pipe.syncAndReturnAll(),
        contains(
            both(instanceOf(JedisDataException.class)).and(hasToString(containsString("wrong number of arguments"))),
            both(instanceOf(JedisDataException.class)).and(hasToString(containsString("wrong number of arguments")))
        ));
  }

  @Test
  public void xaddWithParams() {
    pipe.xadd("xadd-stream1", (StreamEntryID) null, singletonMap("f1", "v1"));

    Map<String, String> map2 = new HashMap<>();
    map2.put("f1", "v1");
    map2.put("f2", "v2");
    pipe.xadd("xadd-stream1", map2, XAddParams.xAddParams());

    List<?> results = pipe.syncAndReturnAll();

    assertThat(results, contains(
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class)
    ));

    assertThat((StreamEntryID) results.get(1),
        greaterThan((StreamEntryID) results.get(0)));
  }

  @Test
  public void xaddWithParamsTrim() {
    Map<String, String> map3 = new HashMap<>();
    map3.put("f2", "v2");
    map3.put("f3", "v3");
    StreamEntryID idIn = new StreamEntryID(1000, 1L);
    pipe.xadd("xadd-stream2", XAddParams.xAddParams().id(idIn), map3);

    Map<String, String> map4 = new HashMap<>();
    map4.put("f2", "v2");
    map4.put("f3", "v3");
    StreamEntryID idIn2 = new StreamEntryID(2000, 1L);
    pipe.xadd("xadd-stream2", map4, XAddParams.xAddParams().id(idIn2));

    Map<String, String> map5 = new HashMap<>();
    map5.put("f4", "v4");
    map5.put("f5", "v5");
    pipe.xadd("xadd-stream2", XAddParams.xAddParams(), map5);

    pipe.xlen("xadd-stream2");

    Map<String, String> map6 = new HashMap<>();
    map6.put("f4", "v4");
    map6.put("f5", "v5");
    pipe.xadd("xadd-stream2", map6, XAddParams.xAddParams().maxLen(3).exactTrimming());

    pipe.xlen("xadd-stream2");

    List<?> results = pipe.syncAndReturnAll();

    assertThat(results, contains(
        equalTo(idIn),
        equalTo(idIn2),
        instanceOf(StreamEntryID.class),
        equalTo(3L),
        instanceOf(StreamEntryID.class),
        equalTo(3L)
    ));

    assertThat((StreamEntryID) results.get(2),
        greaterThan((StreamEntryID) results.get(1)));

    assertThat((StreamEntryID) results.get(4),
        greaterThan((StreamEntryID) results.get(2)));
  }

  @Test
  public void xaddWithParamsNoMkStream() {
    pipe.xadd("xadd-stream3", XAddParams.xAddParams().noMkStream().maxLen(3).exactTrimming(), singletonMap("f1", "v1"));

    assertThat(pipe.syncAndReturnAll(),
        contains(
            nullValue()
        ));

    assertFalse(jedis.exists("xadd-stream3"));
  }

  @Test
  public void xaddWithParamsMinId() {
    Map<String, String> map6 = new HashMap<>();
    map6.put("f4", "v4");
    map6.put("f5", "v5");

    StreamEntryID id = new StreamEntryID(2);
    pipe.xadd("xadd-stream3", map6, XAddParams.xAddParams().minId("2").id(id));

    pipe.xlen("xadd-stream3");

    StreamEntryID id1 = new StreamEntryID(3);
    pipe.xadd("xadd-stream3", XAddParams.xAddParams().minId("4").id(id1), map6);

    pipe.xlen("xadd-stream3");

    List<?> results = pipe.syncAndReturnAll();

    assertThat(results, contains(
        equalTo(id),
        equalTo(1L),
        equalTo(id1),
        equalTo(0L)
    ));
  }

  @Test
  @SinceRedisVersion(value = "7.0.0", message = "Added support for XADD ID auto sequence is introduced in 7.0.0")
  public void xaddParamsId() {
    String key = "kk";
    Map<String, String> map = singletonMap("ff", "vv");

    pipe.xadd(key, XAddParams.xAddParams().id(new StreamEntryID(0, 1)), map);
    pipe.xadd(key, XAddParams.xAddParams().id(2, 3), map);
    pipe.xadd(key, XAddParams.xAddParams().id(4), map);
    pipe.xadd(key, XAddParams.xAddParams().id("5-6"), map);
    pipe.xadd(key, XAddParams.xAddParams().id("7-8".getBytes()), map);
    pipe.xadd(key, XAddParams.xAddParams(), map);

    List<Object> results = pipe.syncAndReturnAll();

    assertThat(results, contains(
        equalTo(new StreamEntryID(0, 1)),
        equalTo(new StreamEntryID(2, 3)),
        equalTo(new StreamEntryID(4, 0)),
        equalTo(new StreamEntryID(5, 6)),
        equalTo(new StreamEntryID(7, 8)),
        instanceOf(StreamEntryID.class)
    ));

    assertThat((StreamEntryID) results.get(5),
        greaterThan((StreamEntryID) results.get(4)));
  }

  @Test
  public void xdel() {
    Map<String, String> map1 = new HashMap<>();
    map1.put("f1", "v1");

    pipe.xadd("xdel-stream", (StreamEntryID) null, map1);
    pipe.xadd("xdel-stream", (StreamEntryID) null, map1);

    List<Object> results = pipe.syncAndReturnAll();

    assertThat(results, contains(
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class)
    ));

    StreamEntryID id1 = (StreamEntryID) results.get(1);

    pipe.xlen("xdel-stream");
    pipe.xdel("xdel-stream", id1);
    pipe.xlen("xdel-stream");

    assertThat(pipe.syncAndReturnAll(), contains(
        2L,
        1L,
        1L
    ));
  }

  @Test
  public void xlen() {
    pipe.xlen("xlen-stream");

    Map<String, String> map = new HashMap<>();
    map.put("f1", "v1");
    pipe.xadd("xlen-stream", (StreamEntryID) null, map);

    pipe.xlen("xlen-stream");

    pipe.xadd("xlen-stream", (StreamEntryID) null, map);

    pipe.xlen("xlen-stream");

    assertThat(pipe.syncAndReturnAll(), contains(
        equalTo(0L),
        instanceOf(StreamEntryID.class),
        equalTo(1L),
        instanceOf(StreamEntryID.class),
        equalTo(2L)
    ));
  }

  @Test
  public void xrange() {
    Response<List<StreamEntry>> range = pipe.xrange("xrange-stream", null, (StreamEntryID) null, Integer.MAX_VALUE);

    Map<String, String> map1 = singletonMap("f1", "v1");
    Map<String, String> map2 = singletonMap("f2", "v2");
    Map<String, String> map3 = singletonMap("f3", "v3");

    Response<StreamEntryID> id1Response = pipe.xadd("xrange-stream", (StreamEntryID) null, map1);

    Response<StreamEntryID> id2Response = pipe.xadd("xrange-stream", (StreamEntryID) null, map2);

    pipe.sync();

    assertThat(range.get(), empty());
    assertThat(id1Response.get(), notNullValue());
    assertThat(id2Response.get(), notNullValue());

    StreamEntryID id1 = id1Response.get();
    StreamEntryID id2 = id2Response.get();

    Response<List<StreamEntry>> range2 = pipe.xrange("xrange-stream", null, (StreamEntryID) null, 3);
    Response<List<StreamEntry>> range3 = pipe.xrange("xrange-stream", id1, null, 2);
    Response<List<StreamEntry>> range4 = pipe.xrange("xrange-stream", id1, id2, 2);
    Response<List<StreamEntry>> range5 = pipe.xrange("xrange-stream", id1, id2, 1);
    Response<List<StreamEntry>> range6 = pipe.xrange("xrange-stream", id2, null, 4);

    Response<StreamEntryID> id3Response = pipe.xadd("xrange-stream", (StreamEntryID) null, map3);

    pipe.sync();

    assertThat(range2.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2));
    assertThat(range3.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2));
    assertThat(range4.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2));
    assertThat(range5.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1));
    assertThat(range6.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2));

    assertThat(id3Response.get(), notNullValue());

    StreamEntryID id3 = id3Response.get();

    Response<List<StreamEntry>> range7 = pipe.xrange("xrange-stream", id3, id3, 4);
    Response<List<StreamEntry>> range8 = pipe.xrange("xrange-stream", null, (StreamEntryID) null);
    Response<List<StreamEntry>> range9 = pipe.xrange("xrange-stream", StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID);

    pipe.sync();

    assertThat(range7.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id3));
    assertThat(range8.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2, id3));
    assertThat(range9.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2, id3));
  }

  @Test
  public void xrangeExclusive() {
    StreamEntryID id1 = jedis.xadd("xrange-stream", (StreamEntryID) null, singletonMap("f1", "v1"));
    StreamEntryID id2 = jedis.xadd("xrange-stream", (StreamEntryID) null, singletonMap("f2", "v2"));

    Response<List<StreamEntry>> range1 = pipe.xrange("xrange-stream", id1.toString(), "+", 2);
    Response<List<StreamEntry>> range2 = pipe.xrange("xrange-stream", "(" + id1, "+", 2);

    pipe.sync();

    assertThat(range1.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2));
    assertThat(range2.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2));
  }

  @Test
  public void xreadWithParams() {

    final String stream1 = "xread-stream1";
    final String stream2 = "xread-stream2";

    Map<String, StreamEntryID> streamQuery1 = singletonMap(stream1, new StreamEntryID());

    // Before creating Stream
    pipe.xread(XReadParams.xReadParams().block(1), streamQuery1);
    pipe.xread(XReadParams.xReadParams(), streamQuery1);

    assertThat(pipe.syncAndReturnAll(), contains(
        nullValue(),
        nullValue()
    ));

    Map<String, String> map1 = singletonMap("f1", "v1");
    StreamEntryID id1 = jedis.xadd(stream1, (StreamEntryID) null, map1);

    Map<String, String> map2 = singletonMap("f2", "v2");
    StreamEntryID id2 = jedis.xadd(stream2, (StreamEntryID) null, map2);

    // Read only a single Stream
    Response<List<Entry<String, List<StreamEntry>>>> streams1 =
        pipe.xread(XReadParams.xReadParams().count(1).block(1), streamQuery1);

    Response<List<Entry<String, List<StreamEntry>>>> streams2 =
        pipe.xread(XReadParams.xReadParams().block(1), singletonMap(stream1, id1));

    Response<List<Entry<String, List<StreamEntry>>>> streams3 =
        pipe.xread(XReadParams.xReadParams(), singletonMap(stream1, id1));

    pipe.sync();

    assertThat(streams1.get().stream().map(Entry::getKey).collect(Collectors.toList()),
        contains(stream1));

    assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getID).collect(Collectors.toList()), contains(id1));

    assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map1));

    assertThat(streams2.get(), nullValue());

    assertThat(streams3.get(), nullValue());

    // Read from two Streams
    Map<String, StreamEntryID> streamQuery2 = new LinkedHashMap<>();
    streamQuery2.put(stream1, new StreamEntryID());
    streamQuery2.put(stream2, new StreamEntryID());

    Response<List<Entry<String, List<StreamEntry>>>> streams4 =
        pipe.xread(XReadParams.xReadParams().count(2).block(1), streamQuery2);

    pipe.sync();

    assertThat(streams4.get().stream().map(Entry::getKey).collect(Collectors.toList()),
        contains(stream1, stream2));

    assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2));

    assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map1, map2));
  }

  @Test
  public void xreadBlockZero() throws InterruptedException {
    final AtomicReference<List<Entry<String, List<StreamEntry>>>> readRef = new AtomicReference<>();
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() {
        long startTime = System.currentTimeMillis();
        Pipeline blockPipe = jedis.pipelined();
        Map<String, StreamEntryID> streamQuery = singletonMap("block0-stream", new StreamEntryID());
        Response<List<Entry<String, List<StreamEntry>>>> read =
            blockPipe.xread(XReadParams.xReadParams().block(0), streamQuery);
        blockPipe.sync();
        long endTime = System.currentTimeMillis();
        assertTrue(endTime - startTime > 500);
        assertNotNull(read);
        readRef.set(read.get());
      }
    }, "xread-block-0-thread");
    t.start();
    Thread.sleep(1000);
    StreamEntryID addedId = jedis.xadd("block0-stream", (StreamEntryID) null, singletonMap("foo", "bar"));
    t.join();

    assertThat(readRef.get().stream().map(Entry::getKey).collect(Collectors.toList()),
        contains("block0-stream"));

    assertThat(readRef.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getID).collect(Collectors.toList()), contains(addedId));
  }

  @Test
  public void xtrim() {
    Map<String, String> map1 = new HashMap<String, String>();
    map1.put("f1", "v1");

    for (int i = 1; i <= 5; i++) {
      pipe.xadd("xtrim-stream", (StreamEntryID) null, map1);
    }

    pipe.xlen("xtrim-stream");

    pipe.xtrim("xtrim-stream", 3, false);

    pipe.xlen("xtrim-stream");

    assertThat(pipe.syncAndReturnAll(), contains(
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class),
        equalTo(5L),
        equalTo(2L),
        equalTo(3L)
    ));
  }

  @Test
  public void xtrimWithParams() {
    Map<String, String> map1 = new HashMap<>();
    map1.put("f1", "v1");
    for (int i = 1; i <= 5; i++) {
      pipe.xadd("xtrim-stream", new StreamEntryID("0-" + i), map1);
    }

    pipe.xlen("xtrim-stream");

    pipe.xtrim("xtrim-stream", XTrimParams.xTrimParams().maxLen(3).exactTrimming());

    pipe.xlen("xtrim-stream");

    // minId
    pipe.xtrim("xtrim-stream", XTrimParams.xTrimParams().minId("0-4").exactTrimming());

    pipe.xlen("xtrim-stream");

    assertThat(pipe.syncAndReturnAll(), contains(
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class),
        instanceOf(StreamEntryID.class),
        equalTo(5L),
        equalTo(2L),
        equalTo(3L),
        equalTo(1L),
        equalTo(2L)
    ));
  }

  @Test
  public void xrevrange() {
    Response<List<StreamEntry>> range = pipe.xrevrange("xrevrange-stream", null, (StreamEntryID) null, Integer.MAX_VALUE);

    Map<String, String> map1 = singletonMap("f1", "v1");
    Response<StreamEntryID> id1Response = pipe.xadd("xrevrange-stream", (StreamEntryID) null, map1);
    Map<String, String> map2 = singletonMap("f2", "v2");
    Response<StreamEntryID> id2Response = pipe.xadd("xrevrange-stream", (StreamEntryID) null, map2);

    pipe.sync();

    assertThat(range.get(), empty());
    assertThat(id1Response.get(), notNullValue());
    assertThat(id2Response.get(), notNullValue());

    StreamEntryID id1 = id1Response.get();
    StreamEntryID id2 = id2Response.get();

    Response<List<StreamEntry>> range2 = pipe.xrevrange("xrevrange-stream", null, (StreamEntryID) null, 3);
    Response<List<StreamEntry>> range3 = pipe.xrevrange("xrevrange-stream", null, id1, 2);
    Response<List<StreamEntry>> range4 = pipe.xrevrange("xrevrange-stream", id2, id1, 2);
    Response<List<StreamEntry>> range5 = pipe.xrevrange("xrevrange-stream", id2, id1, 1);
    Response<List<StreamEntry>> range6 = pipe.xrevrange("xrevrange-stream", null, id2, 4);

    Map<String, String> map3 = singletonMap("f3", "v3");
    Response<StreamEntryID> id3Response = pipe.xadd("xrevrange-stream", (StreamEntryID) null, map3);

    pipe.sync();

    assertThat(range2.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2, id1));
    assertThat(range3.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2, id1));
    assertThat(range4.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2, id1));
    assertThat(range5.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2));
    assertThat(range6.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2));

    assertThat(id3Response.get(), notNullValue());

    StreamEntryID id3 = id3Response.get();

    Response<List<StreamEntry>> range7 = pipe.xrevrange("xrevrange-stream", id3, id3, 4);
    Response<List<StreamEntry>> range8 = pipe.xrevrange("xrevrange-stream", null, (StreamEntryID) null);
    Response<List<StreamEntry>> range9 = pipe.xrevrange("xrevrange-stream", StreamEntryID.MAXIMUM_ID, StreamEntryID.MINIMUM_ID);

    pipe.sync();

    assertThat(range7.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id3));
    assertThat(range8.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id3, id2, id1));
    assertThat(range9.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id3, id2, id1));
  }

  @Test
  public void xrevrangeExclusive() {
    StreamEntryID id1 = jedis.xadd("xrange-stream", (StreamEntryID) null, singletonMap("f1", "v1"));
    StreamEntryID id2 = jedis.xadd("xrange-stream", (StreamEntryID) null, singletonMap("f2", "v2"));

    Response<List<StreamEntry>> range1 = pipe.xrevrange("xrange-stream", "+", id1.toString(), 2);
    Response<List<StreamEntry>> range2 = pipe.xrevrange("xrange-stream", "+", "(" + id1, 2);

    pipe.sync();

    assertThat(range1.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2, id1));
    assertThat(range2.get().stream().map(StreamEntry::getID).collect(Collectors.toList()), contains(id2));
  }

  @Test
  public void xgroup() {
    StreamEntryID id1 = jedis.xadd("xgroup-stream", (StreamEntryID) null, singletonMap("f1", "v1"));

    pipe.xgroupCreate("xgroup-stream", "consumer-group-name", null, false);
    pipe.xgroupSetID("xgroup-stream", "consumer-group-name", id1);
    pipe.xgroupCreate("xgroup-stream", "consumer-group-name1", StreamEntryID.XGROUP_LAST_ENTRY, false);

    pipe.xgroupDestroy("xgroup-stream", "consumer-group-name");
    pipe.xgroupDelConsumer("xgroup-stream", "consumer-group-name1", "myconsumer1");
    pipe.xgroupCreateConsumer("xgroup-stream", "consumer-group-name1", "myconsumer2");
    pipe.xgroupDelConsumer("xgroup-stream", "consumer-group-name1", "myconsumer2");

    assertThat(pipe.syncAndReturnAll(), contains(
        "OK",
        "OK",
        "OK",
        1L,
        0L,
        true,
        0L
    ));
  }

  @Test
  public void xreadGroupWithParams() {
    // Simple xreadGroup with NOACK
    Map<String, String> map1 = singletonMap("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map1);

    jedis.xgroupCreate("xreadGroup-stream1", "xreadGroup-group", null, false);

    Map<String, StreamEntryID> streamQuery1 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

    Response<List<Entry<String, List<StreamEntry>>>> streams1 =
        pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
            XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQuery1);

    pipe.sync();

    assertThat(streams1.get().stream().map(Entry::getKey).collect(Collectors.toList()),
        contains("xreadGroup-stream1"));

    assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getID).collect(Collectors.toList()), contains(id1));

    assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map1));

    Map<String, String> map2 = singletonMap("f2", "v2");
    StreamEntryID id2 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map2);

    Map<String, String> map3 = singletonMap("f3", "v3");
    StreamEntryID id3 = jedis.xadd("xreadGroup-stream2", (StreamEntryID) null, map3);

    jedis.xgroupCreate("xreadGroup-stream2", "xreadGroup-group", null, false);

    // Read only a single Stream
    Response<List<Entry<String, List<StreamEntry>>>> streams2 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
        XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQuery1);

    // Read from two Streams
    Map<String, StreamEntryID> streamQuery2 = new LinkedHashMap<>();
    streamQuery2.put("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    streamQuery2.put("xreadGroup-stream2", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

    Response<List<Entry<String, List<StreamEntry>>>> streams3 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
        XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQuery2);

    pipe.sync();

    assertThat(streams2.get().stream().map(Entry::getKey).collect(Collectors.toList()),
        contains("xreadGroup-stream1"));

    assertThat(streams2.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getID).collect(Collectors.toList()), contains(id2));

    assertThat(streams2.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map2));

    assertThat(streams3.get().stream().map(Entry::getKey).collect(Collectors.toList()),
        contains("xreadGroup-stream2"));

    assertThat(streams3.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getID).collect(Collectors.toList()), contains(id3));

    assertThat(streams3.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map3));

    // Read only fresh messages
    Map<String, String> map4 = singletonMap("f4", "v4");
    StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map4);

    Map<String, StreamEntryID> streamQueryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    Response<List<Entry<String, List<StreamEntry>>>> streams4 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
        XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQueryFresh);

    pipe.sync();

    assertThat(streams4.get().stream().map(Entry::getKey).collect(Collectors.toList()),
        contains("xreadGroup-stream1"));

    assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getID).collect(Collectors.toList()), contains(id4));

    assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map4));
  }

  @Test
  public void xreadGroupWithParamsWhenPendingMessageIsDiscarded() {
    // Add two message to stream
    Map<String, String> map1 = singletonMap("f1", "v1");

    XAddParams xAddParams = XAddParams.xAddParams().id(StreamEntryID.NEW_ENTRY).maxLen(2);
    StreamEntryID firstMessageEntryId = jedis.xadd("xreadGroup-discard-stream1", xAddParams, map1);

    jedis.xadd("xreadGroup-discard-stream1", xAddParams, singletonMap("f2", "v2"));

    pipe.xgroupCreate("xreadGroup-discard-stream1", "xreadGroup-group", null, false);

    Map<String, StreamEntryID> streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

    Response<List<Entry<String, List<StreamEntry>>>> streams1 =
        pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
            XReadGroupParams.xReadGroupParams().count(1), streamQuery1);

    pipe.sync();

    assertThat(streams1.get().stream().map(Entry::getKey).collect(Collectors.toList()),
        contains("xreadGroup-discard-stream1"));

    assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getID).collect(Collectors.toList()), contains(firstMessageEntryId));

    assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getFields).collect(Collectors.toList()), contains(map1));

    // Add third message, the fields of pending message1 will be discarded by redis-server
    jedis.xadd("xreadGroup-discard-stream1", xAddParams, singletonMap("f3", "v3"));

    Map<String, StreamEntryID> streamQueryPending = singletonMap("xreadGroup-discard-stream1", new StreamEntryID());

    Response<List<Entry<String, List<StreamEntry>>>> pendingMessages =
        pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
            XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQueryPending);

    pipe.sync();

    assertThat(pendingMessages.get().stream().map(Entry::getKey).collect(Collectors.toList()),
        contains("xreadGroup-discard-stream1"));

    assertThat(pendingMessages.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getID).collect(Collectors.toList()), contains(firstMessageEntryId));

    assertThat(pendingMessages.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getFields).collect(Collectors.toList()), contains(nullValue()));
  }

  @Test
  public void xack() {
    pipe.xadd("xack-stream", (StreamEntryID) null, singletonMap("f1", "v1"));

    pipe.xgroupCreate("xack-stream", "xack-group", null, false);

    Map<String, StreamEntryID> streamQuery1 = singletonMap("xack-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

    // Empty Stream
    Response<List<Entry<String, List<StreamEntry>>>> streams1 =
        pipe.xreadGroup("xack-group", "xack-consumer",
            XReadGroupParams.xReadGroupParams().count(1).block(1), streamQuery1);

    pipe.sync();

    List<StreamEntryID> ids = streams1.get().stream().map(Entry::getValue).flatMap(List::stream)
        .map(StreamEntry::getID).collect(Collectors.toList());
    assertThat(ids, hasSize(1));

    Response<Long> xackResponse = pipe.xack("xack-stream", "xack-group", ids.get(0));

    pipe.sync();

    assertThat(xackResponse.get(), equalTo(1L));
  }

  @Test
  public void xpendingWithParams() {
    Map<String, String> map = singletonMap("f1", "v1");

    StreamEntryID id1 = jedis.xadd("xpending-stream", (StreamEntryID) null, map);

    assertEquals("OK", jedis.xgroupCreate("xpending-stream", "xpending-group", null, false));

    Map<String, StreamEntryID> streamQeury1 = singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

    // Read the event from Stream put it on pending
    Response<List<Entry<String, List<StreamEntry>>>> range = pipe.xreadGroup("xpending-group",
        "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), streamQeury1);

    // Get the pending event
    Response<List<StreamPendingEntry>> pending1 =
        pipe.xpending("xpending-stream", "xpending-group",
            new XPendingParams().count(3).consumer("xpending-consumer"));

    // Without consumer
    Response<List<StreamPendingEntry>> pending2 =
        pipe.xpending("xpending-stream", "xpending-group",
            new XPendingParams().count(3));

    // with idle
    Response<List<StreamPendingEntry>> pending3 =
        pipe.xpending("xpending-stream", "xpending-group",
            new XPendingParams().idle(Duration.ofMinutes(1).toMillis()).count(3));

    pipe.sync();

    assertThat(pending1.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()),
        contains("xpending-consumer"));

    assertThat(pending1.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()),
        contains(id1));

    assertThat(pending1.get().stream().map(StreamPendingEntry::getDeliveredTimes).collect(Collectors.toList()),
        contains(1L));

    assertThat(pending2.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()),
        contains("xpending-consumer"));

    assertThat(pending2.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()),
        contains(id1));

    assertThat(pending2.get().stream().map(StreamPendingEntry::getDeliveredTimes).collect(Collectors.toList()),
        contains(1L));

    assertThat(pending3.get(), empty());
  }

  @Test
  public void xpendingRange() {
    StreamEntryID id1 = jedis.xadd("xpending-stream", (StreamEntryID) null, singletonMap("f1", "v1"));
    StreamEntryID id2 = jedis.xadd("xpending-stream", (StreamEntryID) null, singletonMap("f2", "v2"));

    pipe.xgroupCreate("xpending-stream", "xpending-group", null, false);

    // read 1 message from the group with each consumer
    Map<String, StreamEntryID> streamQeury = singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    pipe.xreadGroup("xpending-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury);
    pipe.xreadGroup("xpending-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury);

    Response<List<StreamPendingEntry>> pending1 = pipe.xpending("xpending-stream", "xpending-group",
        XPendingParams.xPendingParams("(0", "+", 5));

    Response<List<StreamPendingEntry>> pending2 = pipe.xpending("xpending-stream", "xpending-group",
        XPendingParams.xPendingParams(StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID, 5));

    pipe.sync();

    assertThat(pending1.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()),
        contains("consumer1", "consumer2"));

    assertThat(pending1.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()),
        contains(id1, id2));

    assertThat(pending2.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()),
        contains("consumer1", "consumer2"));

    assertThat(pending2.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()),
        contains(id1, id2));
  }

  @Test
  public void xclaimWithParams() throws InterruptedException {
    Map<String, String> map1 = singletonMap("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xpending-stream", (StreamEntryID) null, map1);

    pipe.xgroupCreate("xpending-stream", "xpending-group", null, false);

    // Read the event from Stream put it on pending
    pipe.xreadGroup("xpending-group", "xpending-consumer",
        XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    Response<List<StreamPendingEntry>> pending =
        pipe.xpending("xpending-stream", "xpending-group",
            XPendingParams.xPendingParams().count(3).consumer("xpending-consumer"));

    // must sync before the sleep
    pipe.sync();

    // Sleep a bit so we can claim events pending for more than 50ms
    Thread.sleep(100);

    Response<List<StreamEntry>> claimed =
        pipe.xclaim("xpending-stream", "xpending-group", "xpending-consumer2", 50,
            XClaimParams.xClaimParams().idle(0).retryCount(0), id1);

    pipe.sync();

    assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()),
        contains("xpending-consumer"));

    assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()),
        contains(id1));

    assertThat(claimed.get().stream().map(StreamEntry::getID).collect(Collectors.toList()),
        contains(id1));

    assertThat(claimed.get().stream().map(StreamEntry::getFields).collect(Collectors.toList()),
        contains(map1));
  }

  @Test
  public void xclaimJustId() throws InterruptedException {
    Map<String, String> map1 = singletonMap("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xpending-stream", (StreamEntryID) null, map1);

    pipe.xgroupCreate("xpending-stream", "xpending-group", null, false);

    // Read the event from Stream put it on pending
    pipe.xreadGroup("xpending-group", "xpending-consumer",
        XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    Response<List<StreamPendingEntry>> pending =
        pipe.xpending("xpending-stream", "xpending-group",
            XPendingParams.xPendingParams().count(3).consumer("xpending-consumer"));

    // must sync before the sleep
    pipe.sync();

    // Sleep for 100ms so we can claim events pending for more than 50ms
    Thread.sleep(100);

    Response<List<StreamEntryID>> claimedIds =
        pipe.xclaimJustId("xpending-stream", "xpending-group", "xpending-consumer2", 50,
            XClaimParams.xClaimParams().idle(0).retryCount(0), id1);

    pipe.sync();

    assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()),
        contains("xpending-consumer"));

    assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()),
        contains(id1));

    assertThat(claimedIds.get(), contains(id1));
  }

  @Test
  public void xautoclaim() throws InterruptedException {
    Map<String, String> map1 = singletonMap("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xpending-stream", (StreamEntryID) null, map1);

    pipe.xgroupCreate("xpending-stream", "xpending-group", null, false);

    // Read the event from Stream put it on pending
    pipe.xreadGroup("xpending-group", "xpending-consumer",
        XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    Response<List<StreamPendingEntry>> pending = pipe.xpending("xpending-stream", "xpending-group",
        XPendingParams.xPendingParams().count(3).consumer("xpending-consumer"));

    pipe.sync();

    // Sleep for 100ms so we can auto claim events pending for more than 50ms
    Thread.sleep(100);

    // Auto claim pending events to different consumer
    Response<Entry<StreamEntryID, List<StreamEntry>>> autoclaimed = pipe.xautoclaim("xpending-stream", "xpending-group",
        "xpending-consumer2", 50, new StreamEntryID(), new XAutoClaimParams().count(1));

    pipe.sync();

    assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()),
        contains("xpending-consumer"));

    assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()),
        contains(id1));

    assertThat(autoclaimed.get().getValue().stream().map(StreamEntry::getID).collect(Collectors.toList()),
        contains(id1));

    assertThat(autoclaimed.get().getValue().stream().map(StreamEntry::getFields).collect(Collectors.toList()),
        contains(map1));
  }

  @Test
  public void xautoclaimBinary() throws InterruptedException {
    Map<String, String> map1 = singletonMap("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xpending-stream", XAddParams.xAddParams(), map1);

    pipe.xgroupCreate("xpending-stream", "xpending-group", null, false);

    // Read the event from Stream put it on pending
    pipe.xreadGroup("xpending-group", "xpending-consumer",
        XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    Response<List<StreamPendingEntry>> pending = pipe.xpending("xpending-stream", "xpending-group",
        XPendingParams.xPendingParams().count(3).consumer("xpending-consumer"));

    pipe.sync();

    // Sleep for 100ms so we can auto claim events pending for more than 50ms
    Thread.sleep(100);

    // Auto claim pending events to different consumer
    Response<List<Object>> autoclaimed = pipe.xautoclaim(SafeEncoder.encode("xpending-stream"),
        SafeEncoder.encode("xpending-group"), SafeEncoder.encode("xpending-consumer2"),
        50, SafeEncoder.encode(new StreamEntryID().toString()), new XAutoClaimParams().count(1));

    pipe.sync();

    assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()),
        contains("xpending-consumer"));

    assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()),
        contains(id1));

    Map.Entry<StreamEntryID, List<StreamEntry>> autoclaimedParsed =
        BuilderFactory.STREAM_AUTO_CLAIM_RESPONSE.build(autoclaimed.get());

    assertThat(autoclaimedParsed.getValue().stream().map(StreamEntry::getID).collect(Collectors.toList()),
        contains(id1));

    assertThat(autoclaimedParsed.getValue().stream().map(StreamEntry::getFields).collect(Collectors.toList()),
        contains(map1));
  }

  @Test
  public void xautoclaimJustId() throws InterruptedException {
    Map<String, String> map1 = singletonMap("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xpending-stream", XAddParams.xAddParams(), map1);

    pipe.xgroupCreate("xpending-stream", "xpending-group", null, false);

    // Read the event from Stream put it on pending
    pipe.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    Response<List<StreamPendingEntry>> pending = pipe.xpending("xpending-stream", "xpending-group",
        XPendingParams.xPendingParams().count(3).consumer("xpending-consumer"));

    pipe.sync();

    // Sleep for 100ms so we can auto claim events pending for more than 50ms
    Thread.sleep(100);

    // Auto claim pending events to different consumer
    Response<Entry<StreamEntryID, List<StreamEntryID>>> claimedIds = pipe.xautoclaimJustId("xpending-stream", "xpending-group",
        "xpending-consumer2", 50, new StreamEntryID(), new XAutoClaimParams().count(1));

    pipe.sync();

    assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()),
        contains("xpending-consumer"));

    assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()),
        contains(id1));

    assertThat(claimedIds.get().getValue(), contains(id1));
  }

  @Test
  public void xautoclaimJustIdBinary() throws InterruptedException {
    Map<String, String> map1 = singletonMap("f1", "v1");
    StreamEntryID id1 = jedis.xadd("xpending-stream", XAddParams.xAddParams(), map1);

    pipe.xgroupCreate("xpending-stream", "xpending-group", null, false);

    // Read the event from Stream put it on pending
    pipe.xreadGroup("xpending-group", "xpending-consumer",
        XReadGroupParams.xReadGroupParams().count(1).block(1),
        singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY));

    // Get the pending event
    Response<List<StreamPendingEntry>> pending = pipe.xpending("xpending-stream", "xpending-group",
        XPendingParams.xPendingParams().count(3).consumer("xpending-consumer"));

    pipe.sync();

    // Sleep for 100ms so we can auto claim events pending for more than 50ms
    Thread.sleep(100);

    // Auto claim pending events to different consumer
    Response<List<Object>> autoclaimed = pipe.xautoclaimJustId(SafeEncoder.encode("xpending-stream"),
        SafeEncoder.encode("xpending-group"), SafeEncoder.encode("xpending-consumer2"),
        50, SafeEncoder.encode(new StreamEntryID().toString()), new XAutoClaimParams().count(1));

    pipe.sync();

    assertThat(pending.get().stream().map(StreamPendingEntry::getConsumerName).collect(Collectors.toList()),
        contains("xpending-consumer"));

    assertThat(pending.get().stream().map(StreamPendingEntry::getID).collect(Collectors.toList()),
        contains(id1));

    Entry<StreamEntryID, List<StreamEntryID>> autoclaimedParsed =
        BuilderFactory.STREAM_AUTO_CLAIM_JUSTID_RESPONSE.build(autoclaimed.get());

    assertThat(autoclaimedParsed.getValue(), contains(id1));
  }

  @Test
  public void xinfo() throws InterruptedException {
    final String STREAM_NAME = "xadd-stream1";
    final String F1 = "f1";
    final String V1 = "v1";
    final String V2 = "v2";
    final String G1 = "G1";
    final String G2 = "G2";
    final String MY_CONSUMER = "myConsumer";
    final String MY_CONSUMER2 = "myConsumer2";

    Map<String, String> map1 = new HashMap<>();
    map1.put(F1, V1);
    StreamEntryID id1 = jedis.xadd(STREAM_NAME, (StreamEntryID) null, map1);
    map1.put(F1, V2);
    StreamEntryID id2 = jedis.xadd(STREAM_NAME, (StreamEntryID) null, map1);

    Response<StreamInfo> streamInfoResponse = pipe.xinfoStream(STREAM_NAME);

    pipe.xgroupCreate(STREAM_NAME, G1, StreamEntryID.XGROUP_LAST_ENTRY, false);

    Map<String, StreamEntryID> streamQuery1 = singletonMap(STREAM_NAME, new StreamEntryID("0-0"));

    pipe.xreadGroup(G1, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQuery1);

    pipe.sync();

    Thread.sleep(1);

    Response<List<StreamGroupInfo>> groupInfoResponse = pipe.xinfoGroups(STREAM_NAME);
    Response<List<StreamConsumersInfo>> consumersInfoResponse = pipe.xinfoConsumers(STREAM_NAME, G1);
    Response<List<StreamConsumerInfo>> consumerInfoResponse = pipe.xinfoConsumers2(STREAM_NAME, G1);

    pipe.sync();

    // Stream info test
    StreamInfo streamInfo = streamInfoResponse.get();

    assertEquals(2L, streamInfo.getStreamInfo().get(StreamInfo.LENGTH));
    assertEquals(1L, streamInfo.getStreamInfo().get(StreamInfo.RADIX_TREE_KEYS));
    assertEquals(2L, streamInfo.getStreamInfo().get(StreamInfo.RADIX_TREE_NODES));
    assertEquals(0L, streamInfo.getStreamInfo().get(StreamInfo.GROUPS));
    assertEquals(V1, ((StreamEntry) streamInfo.getStreamInfo().get(StreamInfo.FIRST_ENTRY)).getFields().get(F1));
    assertEquals(V2, ((StreamEntry) streamInfo.getStreamInfo().get(StreamInfo.LAST_ENTRY)).getFields().get(F1));
    assertEquals(id2, streamInfo.getStreamInfo().get(StreamInfo.LAST_GENERATED_ID));

    // Using getters
    assertEquals(2, streamInfo.getLength());
    assertEquals(1, streamInfo.getRadixTreeKeys());
    assertEquals(2, streamInfo.getRadixTreeNodes());
    assertEquals(0, streamInfo.getGroups());
    assertEquals(V1, streamInfo.getFirstEntry().getFields().get(F1));
    assertEquals(V2, streamInfo.getLastEntry().getFields().get(F1));
    assertEquals(id2, streamInfo.getLastGeneratedId());

    // Group info test
    List<StreamGroupInfo> groupInfo = groupInfoResponse.get();

    assertEquals(1, groupInfo.size());
    assertEquals(G1, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.NAME));
    assertEquals(1L, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.CONSUMERS));
    assertEquals(0L, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.PENDING));
    assertEquals(id2, groupInfo.get(0).getGroupInfo().get(StreamGroupInfo.LAST_DELIVERED));

    // Using getters
    assertEquals(1, groupInfo.size());
    assertEquals(G1, groupInfo.get(0).getName());
    assertEquals(1, groupInfo.get(0).getConsumers());
    assertEquals(0, groupInfo.get(0).getPending());
    assertEquals(id2, groupInfo.get(0).getLastDeliveredId());

    // Consumers info test
    List<StreamConsumersInfo> consumersInfo = consumersInfoResponse.get();

    assertEquals(MY_CONSUMER,
        consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.NAME));
    assertEquals(0L, consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.PENDING));
    assertTrue((Long) consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.IDLE) > 0);

    // Using getters
    assertEquals(MY_CONSUMER, consumersInfo.get(0).getName());
    assertEquals(0L, consumersInfo.get(0).getPending());
    assertThat(consumersInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L));
    if (RedisVersionUtil.getRedisVersion(jedis).isGreaterThanOrEqualTo(RedisVersion.V7_0_0)) {
      assertThat(consumersInfo.get(0).getInactive(), Matchers.any(Long.class));
    }

    // Consumer info test
    List<StreamConsumerInfo> consumerInfo = consumerInfoResponse.get();

    assertEquals(MY_CONSUMER,
        consumerInfo.get(0).getConsumerInfo().get(StreamConsumerInfo.NAME));
    assertEquals(0L, consumerInfo.get(0).getConsumerInfo().get(StreamConsumerInfo.PENDING));
    assertTrue((Long) consumerInfo.get(0).getConsumerInfo().get(StreamConsumerInfo.IDLE) > 0);

    // Using getters
    assertEquals(MY_CONSUMER, consumerInfo.get(0).getName());
    assertEquals(0L, consumerInfo.get(0).getPending());
    assertThat(consumerInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L));
    if (RedisVersionUtil.getRedisVersion(jedis).isGreaterThanOrEqualTo(RedisVersion.V7_0_0)) {
      assertThat(consumerInfo.get(0).getInactive(), Matchers.any(Long.class));
    }

    // test with more groups and consumers
    pipe.xgroupCreate(STREAM_NAME, G2, StreamEntryID.XGROUP_LAST_ENTRY, false);
    pipe.xreadGroup(G1, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQuery1);
    pipe.xreadGroup(G2, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQuery1);
    pipe.xreadGroup(G2, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQuery1);

    Response<List<StreamGroupInfo>> manyGroupsInfoResponse = pipe.xinfoGroups(STREAM_NAME);
    Response<List<StreamConsumersInfo>> manyConsumersInfoResponse = pipe.xinfoConsumers(STREAM_NAME, G2);
    Response<List<StreamConsumerInfo>> manyConsumerInfoResponse = pipe.xinfoConsumers2(STREAM_NAME, G2);
    Response<StreamFullInfo> streamInfoFullResponse = pipe.xinfoStreamFull(STREAM_NAME);
    Response<StreamFullInfo> streamInfoFull10Response = pipe.xinfoStreamFull(STREAM_NAME, 10);

    pipe.sync();

    List<StreamGroupInfo> manyGroupsInfo = manyGroupsInfoResponse.get();
    List<StreamConsumersInfo> manyConsumersInfo = manyConsumersInfoResponse.get();
    List<StreamConsumerInfo> manyConsumerInfo = manyConsumerInfoResponse.get();
    StreamFullInfo streamInfoFull = streamInfoFullResponse.get();
    StreamFullInfo streamInfoFull10 = streamInfoFull10Response.get();

    assertEquals(2, manyGroupsInfo.size());
    assertEquals(2, manyConsumersInfo.size());
    assertEquals(2, manyConsumerInfo.size());

    assertEquals(2, streamInfoFull.getEntries().size());
    assertEquals(2, streamInfoFull.getGroups().size());
    assertEquals(2, streamInfoFull.getLength());
    assertEquals(1, streamInfoFull.getRadixTreeKeys());
    assertEquals(2, streamInfoFull.getRadixTreeNodes());
    assertEquals(0, streamInfo.getGroups());
    assertEquals(G1, streamInfoFull.getGroups().get(0).getName());
    assertEquals(G2, streamInfoFull.getGroups().get(1).getName());
    assertEquals(V1, streamInfoFull.getEntries().get(0).getFields().get(F1));
    assertEquals(V2, streamInfoFull.getEntries().get(1).getFields().get(F1));
    assertEquals(id2, streamInfoFull.getLastGeneratedId());

    assertEquals(G1, streamInfoFull10.getGroups().get(0).getName());
    assertEquals(G2, streamInfoFull10.getGroups().get(1).getName());
    assertEquals(V1, streamInfoFull10.getEntries().get(0).getFields().get(F1));
    assertEquals(V2, streamInfoFull10.getEntries().get(1).getFields().get(F1));
    assertEquals(id2, streamInfoFull10.getLastGeneratedId());

    // Not existing key - redis cli return error so we expect exception
    pipe.xinfoStream("random");

    assertThat(pipe.syncAndReturnAll(), contains(
        both(instanceOf(JedisDataException.class)).and(hasToString(containsString("ERR no such key")))
    ));
  }

  @Test
  public void xinfoStreamFullWithPending() {
    Map<String, String> map = singletonMap("f1", "v1");
    StreamEntryID id1 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
    StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
    jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false);

    Map<String, StreamEntryID> streamQeury1 = singletonMap("streamfull2", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    Response<List<Entry<String, List<StreamEntry>>>> pending = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
        XReadGroupParams.xReadGroupParams().count(1), streamQeury1);

    Response<StreamFullInfo> fullResult = pipe.xinfoStreamFull("streamfull2");

    pipe.sync();

    assertThat(pending.get(), hasSize(1));

    StreamFullInfo full = fullResult.get();
    assertEquals(1, full.getGroups().size());
    StreamGroupFullInfo group = full.getGroups().get(0);
    assertEquals("xreadGroup-group", group.getName());

    assertEquals(1, group.getPending().size());
    List<Object> groupPendingEntry = group.getPending().get(0);
    assertEquals(id1, groupPendingEntry.get(0));
    assertEquals("xreadGroup-consumer", groupPendingEntry.get(1));

    assertEquals(1, group.getConsumers().size());
    StreamConsumerFullInfo consumer = group.getConsumers().get(0);
    assertEquals("xreadGroup-consumer", consumer.getName());
    assertThat(consumer.getSeenTime(), Matchers.greaterThanOrEqualTo(0L));
    if (RedisVersionUtil.getRedisVersion(jedis).isGreaterThanOrEqualTo(RedisVersion.V7_0_0)) {
      assertThat(consumer.getActiveTime(), Matchers.greaterThanOrEqualTo(0L));
    }
    assertEquals(1, consumer.getPending().size());
    List<Object> consumerPendingEntry = consumer.getPending().get(0);
    assertEquals(id1, consumerPendingEntry.get(0));
  }
}