// CommandObjectsStreamCommandsTest.java

package redis.clients.jedis.commands.commandobjects;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.junit.jupiter.api.Test;
import redis.clients.jedis.RedisProtocol;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XAutoClaimParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.resps.StreamConsumerInfo;
import redis.clients.jedis.resps.StreamConsumersInfo;
import redis.clients.jedis.resps.StreamEntry;
import redis.clients.jedis.resps.StreamFullInfo;
import redis.clients.jedis.resps.StreamGroupInfo;
import redis.clients.jedis.resps.StreamInfo;
import redis.clients.jedis.resps.StreamPendingEntry;
import redis.clients.jedis.resps.StreamPendingSummary;

/**
 * Tests related to <a href="https://redis.io/commands/?group=stream">Stream</a> commands.
 */
public class CommandObjectsStreamCommandsTest extends CommandObjectsStandaloneTestBase {

  /**
   * Creates the test instance and hands the given protocol to the superclass constructor.
   *
   * @param protocol the Redis protocol passed on to the test base
   */
  public CommandObjectsStreamCommandsTest(RedisProtocol protocol) {
    super(protocol);
  }

  /**
   * Adds one entry with {@link StreamEntryID#NEW_ENTRY} and one through {@link XAddParams},
   * then checks that xlen reports two entries.
   */
  @Test
  public void testXaddAndXlen() {
    final String stream = "testStream";

    Map<String, String> fields = new HashMap<>();
    fields.put("field1", "value1");
    fields.put("field2", "value2");

    // First entry: the id argument is NEW_ENTRY.
    StreamEntryID firstId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, fields));
    assertThat(firstId, notNullValue());

    // Second entry: goes through the params-based overload.
    XAddParams addParams = new XAddParams().maxLen(1000);
    StreamEntryID secondId = exec(commandObjects.xadd(stream, addParams, fields));
    assertThat(secondId, notNullValue());

    Long length = exec(commandObjects.xlen(stream));
    assertThat(length, equalTo(2L));
  }

  /**
   * Binary variant: adds a single entry through the byte-array xadd overload and checks
   * that xlen on the byte-array key reports one entry.
   */
  @Test
  public void testXaddAndXlenBinary() {
    final byte[] stream = "streamKey".getBytes();

    Map<byte[], byte[]> fields = new HashMap<>();
    fields.put("field1".getBytes(), "value1".getBytes());
    fields.put("field2".getBytes(), "value2".getBytes());

    XAddParams addParams = new XAddParams().maxLen(1000);
    byte[] rawId = exec(commandObjects.xadd(stream, addParams, fields));
    assertThat(rawId, notNullValue());

    Long length = exec(commandObjects.xlen(stream));
    assertThat(length, equalTo(1L));
  }

  /**
   * Exercises the {@link StreamEntryID}-based xrange overloads: open bounds, explicit bounds,
   * each with and without a count, plus a stream that does not exist.
   */
  @Test
  public void testXrangeWithIdParameters() {
    final String stream = "testStream";

    Map<String, String> firstFields = Collections.singletonMap("field1", "value1");
    Map<String, String> secondFields = Collections.singletonMap("field2", "value2");

    StreamEntryID firstId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, firstFields));
    StreamEntryID secondId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, secondFields));

    // Open range (null bounds): both entries, oldest first.
    List<StreamEntry> everything = exec(commandObjects.xrange(stream, null, (StreamEntryID) null));
    assertThat(everything.size(), equalTo(2));
    assertThat(everything.get(0).getFields(), equalTo(firstFields));
    assertThat(everything.get(1).getFields(), equalTo(secondFields));

    // Open range limited to one entry.
    List<StreamEntry> everythingLimited = exec(commandObjects.xrange(stream, null, (StreamEntryID) null, 1));
    assertThat(everythingLimited.size(), equalTo(1));
    assertThat(everythingLimited.get(0).getFields(), equalTo(firstFields));

    // Explicit bounds taken from the ids returned by xadd.
    List<StreamEntry> bounded = exec(commandObjects.xrange(stream, firstId, secondId));
    assertThat(bounded.size(), equalTo(2));
    assertThat(bounded.get(0).getFields(), equalTo(firstFields));
    assertThat(bounded.get(1).getFields(), equalTo(secondFields));

    // Explicit bounds limited to one entry.
    List<StreamEntry> boundedLimited = exec(commandObjects.xrange(stream, firstId, secondId, 1));
    assertThat(boundedLimited.size(), equalTo(1));
    assertThat(boundedLimited.get(0).getFields(), equalTo(firstFields));

    // A missing stream yields an empty list.
    List<StreamEntry> missing = exec(commandObjects.xrange("nonExistingStream", null, (StreamEntryID) null));
    assertThat(missing, empty());
  }

  /**
   * Exercises the xrange overloads that take the range bounds as strings, with and without
   * a count, plus a stream that does not exist.
   */
  @Test
  public void testXrangeWithStringParameters() {
    final String stream = "testStreamWithString";

    Map<String, String> firstFields = Collections.singletonMap("field1", "value1");
    Map<String, String> secondFields = Collections.singletonMap("field2", "value2");

    StreamEntryID firstId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, firstFields));
    StreamEntryID secondId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, secondFields));

    // The bounds are the string forms of the ids returned by xadd.
    String from = firstId.toString();
    String to = secondId.toString();

    List<StreamEntry> bounded = exec(commandObjects.xrange(stream, from, to));
    assertThat(bounded.size(), equalTo(2));
    assertThat(bounded.get(0).getFields(), equalTo(firstFields));
    assertThat(bounded.get(1).getFields(), equalTo(secondFields));

    List<StreamEntry> boundedLimited = exec(commandObjects.xrange(stream, from, to, 1));
    assertThat(boundedLimited.size(), equalTo(1));
    assertThat(boundedLimited.get(0).getFields(), equalTo(firstFields));

    List<StreamEntry> missing = exec(commandObjects.xrange("nonExistingStream", from, to));
    assertThat(missing, empty());
  }

  /**
   * Exercises the byte-array xrange overloads. The entries are added through the string API;
   * each element of the binary result is expected to be a list whose first item is the entry id bytes.
   */
  @Test
  public void testXrangeWithBinaryParameters() {
    final String streamName = "testStreamWithBytes";
    final byte[] stream = streamName.getBytes();

    Map<String, String> firstFields = Collections.singletonMap("field1", "value1");
    Map<String, String> secondFields = Collections.singletonMap("field2", "value2");

    StreamEntryID firstId = exec(commandObjects.xadd(streamName, StreamEntryID.NEW_ENTRY, firstFields));
    StreamEntryID secondId = exec(commandObjects.xadd(streamName, StreamEntryID.NEW_ENTRY, secondFields));

    byte[] from = firstId.toString().getBytes();
    byte[] to = secondId.toString().getBytes();

    // Open range (null bounds).
    List<Object> everything = exec(commandObjects.xrange(stream, null, null));
    assertThat(everything, hasSize(2));
    assertThat(everything.get(0), instanceOf(List.class));
    assertThat(((List<?>) everything.get(0)).get(0), equalTo(from));
    assertThat(((List<?>) everything.get(1)).get(0), equalTo(to));

    // Explicit bounds.
    List<Object> bounded = exec(commandObjects.xrange(stream, from, to));
    assertThat(bounded, hasSize(2));
    assertThat(bounded.get(0), instanceOf(List.class));
    assertThat(((List<?>) bounded.get(0)).get(0), equalTo(from));
    assertThat(((List<?>) bounded.get(1)).get(0), equalTo(to));

    // Open range limited to one entry.
    List<Object> everythingLimited = exec(commandObjects.xrange(stream, null, null, 1));
    assertThat(everythingLimited, hasSize(1));
    assertThat(everythingLimited.get(0), instanceOf(List.class));
    assertThat(((List<?>) everythingLimited.get(0)).get(0), equalTo(from));

    // Explicit bounds limited to one entry.
    List<Object> boundedLimited = exec(commandObjects.xrange(stream, from, to, 1));
    assertThat(boundedLimited, hasSize(1));
    assertThat(boundedLimited.get(0), instanceOf(List.class));
    assertThat(((List<?>) boundedLimited.get(0)).get(0), equalTo(from));

    // A missing stream yields an empty list.
    List<Object> missing = exec(commandObjects.xrange("nonExistingStream".getBytes(), from, to));
    assertThat(missing, empty());
  }

  /**
   * Exercises the {@link StreamEntryID}-based xrevrange overloads and checks that the newest
   * entry is returned first.
   */
  @Test
  public void testXrevrangeWithIdParameters() {
    final String stream = "testStreamForXrevrange";

    Map<String, String> olderFields = Collections.singletonMap("field1", "value1");
    Map<String, String> newerFields = Collections.singletonMap("field2", "value2");

    StreamEntryID olderId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, olderFields));
    StreamEntryID newerId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, newerFields));

    // Open range: newest entry first.
    List<StreamEntry> everything = exec(commandObjects.xrevrange(stream, null, (StreamEntryID) null));
    assertThat(everything.size(), equalTo(2));
    assertThat(everything.get(0).getFields(), equalTo(newerFields));
    assertThat(everything.get(1).getFields(), equalTo(olderFields));

    // Open range limited to one entry: only the newest one comes back.
    List<StreamEntry> everythingLimited = exec(commandObjects.xrevrange(stream, null, (StreamEntryID) null, 1));
    assertThat(everythingLimited.size(), equalTo(1));
    assertThat(everythingLimited.get(0).getFields(), equalTo(newerFields));

    // Explicit bounds, passed as (end, start).
    List<StreamEntry> bounded = exec(commandObjects.xrevrange(stream, newerId, olderId));
    assertThat(bounded.size(), equalTo(2));
    assertThat(bounded.get(0).getFields(), equalTo(newerFields));
    assertThat(bounded.get(1).getFields(), equalTo(olderFields));

    // Explicit bounds limited to one entry.
    List<StreamEntry> boundedLimited = exec(commandObjects.xrevrange(stream, newerId, olderId, 1));
    assertThat(boundedLimited.size(), equalTo(1));
    assertThat(boundedLimited.get(0).getFields(), equalTo(newerFields));

    // A missing stream yields an empty list.
    List<StreamEntry> missing = exec(commandObjects.xrevrange("nonExistingStream", null, (StreamEntryID) null));
    assertThat(missing, empty());
  }

  /**
   * Exercises the xrevrange overloads that take the bounds as strings. The open-range calls
   * still go through the {@link StreamEntryID} overload with null bounds.
   */
  @Test
  public void testXrevrangeWithStringParameters() {
    final String stream = "testStreamForXrevrangeString";

    Map<String, String> olderFields = Collections.singletonMap("field1", "value1");
    Map<String, String> newerFields = Collections.singletonMap("field2", "value2");

    StreamEntryID olderId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, olderFields));
    StreamEntryID newerId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, newerFields));

    String lower = olderId.toString();
    String upper = newerId.toString();

    // Open range: newest entry first.
    List<StreamEntry> everything = exec(commandObjects.xrevrange(stream, null, (StreamEntryID) null));
    assertThat(everything.size(), equalTo(2));
    assertThat(everything.get(0).getFields(), equalTo(newerFields));
    assertThat(everything.get(1).getFields(), equalTo(olderFields));

    // String bounds, passed as (end, start).
    List<StreamEntry> bounded = exec(commandObjects.xrevrange(stream, upper, lower));
    assertThat(bounded.size(), equalTo(2));
    assertThat(bounded.get(0).getFields(), equalTo(newerFields));
    assertThat(bounded.get(1).getFields(), equalTo(olderFields));

    // Open range limited to one entry.
    List<StreamEntry> everythingLimited = exec(commandObjects.xrevrange(stream, null, (StreamEntryID) null, 1));
    assertThat(everythingLimited.size(), equalTo(1));
    assertThat(everythingLimited.get(0).getFields(), equalTo(newerFields));

    // String bounds limited to one entry.
    List<StreamEntry> boundedLimited = exec(commandObjects.xrevrange(stream, upper, lower, 1));
    assertThat(boundedLimited.size(), equalTo(1));
    assertThat(boundedLimited.get(0).getFields(), equalTo(newerFields));

    // A missing stream yields an empty list.
    List<StreamEntry> missing = exec(commandObjects.xrevrange("nonExistingStream", upper, lower));
    assertThat(missing, empty());
  }

  /**
   * Exercises the byte-array xrevrange overloads. The entries are added through the string API;
   * each element of the binary result is expected to be a list whose first item is the entry id bytes.
   */
  @Test
  public void testXrevrangeWithBinaryParameters() {
    final String streamName = "testStreamForXrevrangeBytes";
    final byte[] stream = streamName.getBytes();

    Map<String, String> olderFields = Collections.singletonMap("field1", "value1");
    Map<String, String> newerFields = Collections.singletonMap("field2", "value2");

    StreamEntryID olderId = exec(commandObjects.xadd(streamName, StreamEntryID.NEW_ENTRY, olderFields));
    StreamEntryID newerId = exec(commandObjects.xadd(streamName, StreamEntryID.NEW_ENTRY, newerFields));

    byte[] lower = olderId.toString().getBytes();
    byte[] upper = newerId.toString().getBytes();

    // Open range: newest id first.
    List<Object> everything = exec(commandObjects.xrevrange(stream, null, null));
    assertThat(everything, hasSize(2));
    assertThat(everything.get(0), instanceOf(List.class));
    assertThat(((List<?>) everything.get(0)).get(0), equalTo(upper));
    assertThat(((List<?>) everything.get(1)).get(0), equalTo(lower));

    // Explicit bounds, passed as (end, start).
    List<Object> bounded = exec(commandObjects.xrevrange(stream, upper, lower));
    assertThat(bounded, hasSize(2));
    assertThat(bounded.get(0), instanceOf(List.class));
    assertThat(((List<?>) bounded.get(0)).get(0), equalTo(upper));
    assertThat(((List<?>) bounded.get(1)).get(0), equalTo(lower));

    // Open range limited to one entry.
    List<Object> everythingLimited = exec(commandObjects.xrevrange(stream, null, null, 1));
    assertThat(everythingLimited, hasSize(1));
    assertThat(everythingLimited.get(0), instanceOf(List.class));
    assertThat(((List<?>) everythingLimited.get(0)).get(0), equalTo(upper));

    // Explicit bounds limited to one entry.
    List<Object> boundedLimited = exec(commandObjects.xrevrange(stream, upper, lower, 1));
    assertThat(boundedLimited, hasSize(1));
    assertThat(boundedLimited.get(0), instanceOf(List.class));
    assertThat(((List<?>) boundedLimited.get(0)).get(0), equalTo(upper));

    // A missing stream yields an empty list.
    List<Object> missing = exec(commandObjects.xrevrange("nonExistingStream".getBytes(), upper, lower));
    assertThat(missing, empty());
  }

  /**
   * Adds two entries with a {@code null} id and verifies that the returned ids are distinct and
   * strictly increasing, and that xrange returns the entries in the order they were added.
   */
  @Test
  public void testXaddWithNullId() {
    String key = "testStreamWithString";

    // Add two entries, don't specify the IDs
    StreamEntryID firstId = exec(commandObjects.xadd(key, (StreamEntryID) null, Collections.singletonMap("field", "value")));
    assertThat(firstId, notNullValue());

    StreamEntryID secondId = exec(commandObjects.xadd(key, (StreamEntryID) null, Collections.singletonMap("field", "value")));
    assertThat(secondId, notNullValue());

    assertThat(secondId, not(equalTo(firstId)));
    // Compare the complete ids rather than only their sequence parts: a sequence-only check would
    // still pass if the second id carried an earlier timestamp, so it cannot prove the ids increase.
    assertThat(secondId.compareTo(firstId), greaterThanOrEqualTo(1));

    List<StreamEntry> xrangeAll = exec(commandObjects.xrange(key, (StreamEntryID) null, null));
    assertThat(xrangeAll.size(), equalTo(2));
    assertThat(xrangeAll.get(0).getID(), equalTo(firstId));
    assertThat(xrangeAll.get(1).getID(), equalTo(secondId));
  }

  /**
   * Reads one entry through a consumer group, checks it shows up in both xpending forms,
   * acknowledges it with xack and checks that nothing is pending afterwards.
   */
  @Test
  public void testXackXpending() {
    final String stream = "testStreamForXackEffect";
    final String groupName = "testGroup";
    final String consumerName = "testConsumer";

    Map<String, String> fields = new HashMap<>();
    fields.put("field1", "value1");

    exec(commandObjects.xgroupCreate(stream, groupName, new StreamEntryID(), true));

    StreamEntryID addedId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, fields));

    // Read the entry through the group.
    XReadGroupParams readParams = new XReadGroupParams();
    Map<String, StreamEntryID> undelivered =
        Collections.singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

    List<Map.Entry<String, List<StreamEntry>>> delivered =
        exec(commandObjects.xreadGroup(groupName, consumerName, readParams, undelivered));

    assertThat(delivered, hasSize(1));
    assertThat(delivered.get(0).getKey(), equalTo(stream));
    assertThat(delivered.get(0).getValue(), hasSize(1));
    assertThat(delivered.get(0).getValue().get(0).getID(), equalTo(addedId));

    // Before the ack: one pending entry, owned by the consumer.
    StreamPendingSummary summary = exec(commandObjects.xpending(stream, groupName));
    assertThat(summary.getTotal(), equalTo(1L));

    XPendingParams fullRange = new XPendingParams()
        .start(StreamEntryID.MINIMUM_ID).end(StreamEntryID.MAXIMUM_ID).count(1000);
    List<StreamPendingEntry> pendingEntries = exec(commandObjects.xpending(stream, groupName, fullRange));

    assertThat(pendingEntries, hasSize(1));
    assertThat(pendingEntries.get(0).getConsumerName(), equalTo(consumerName));
    assertThat(pendingEntries.get(0).getID(), equalTo(addedId));

    Long acknowledged = exec(commandObjects.xack(stream, groupName, addedId));
    assertThat(acknowledged, equalTo(1L));

    // After the ack: nothing is pending.
    summary = exec(commandObjects.xpending(stream, groupName));
    assertThat(summary.getTotal(), equalTo(0L));

    pendingEntries = exec(commandObjects.xpending(stream, groupName, fullRange));
    assertThat(pendingEntries, empty());
  }

  /**
   * Binary variant of the xack / xpending flow: the group is created, read from, inspected and
   * acknowledged through the byte-array overloads, while the entry is added via the string API.
   */
  @Test
  public void testXackXPendingBinary() {
    final String streamName = "testStreamForXackEffect";
    final byte[] stream = streamName.getBytes();
    final byte[] groupName = "testGroup".getBytes();
    final byte[] consumerName = "testConsumer".getBytes();

    Map<String, String> fields = new HashMap<>();
    fields.put("field1", "value1");

    exec(commandObjects.xgroupCreate(stream, groupName, new StreamEntryID().toString().getBytes(), true));

    StreamEntryID addedId = exec(commandObjects.xadd(streamName, StreamEntryID.NEW_ENTRY, fields));

    // Read the entry through the group.
    XReadGroupParams readParams = new XReadGroupParams();
    Map.Entry<byte[], byte[]> undelivered = new AbstractMap.SimpleEntry<>(
        stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY.toString().getBytes());

    List<Object> delivered = exec(commandObjects.xreadGroup(groupName, consumerName, readParams, undelivered));
    assertThat(delivered, hasSize(1));

    // Before the ack: the first element of the raw summary is expected to be 1.
    Object rawSummary = exec(commandObjects.xpending(stream, groupName));

    assertThat(rawSummary, instanceOf(List.class));
    assertThat(((List<?>) rawSummary).get(0), equalTo(1L));

    XPendingParams fullRange = new XPendingParams()
        .start(StreamEntryID.MINIMUM_ID).end(StreamEntryID.MAXIMUM_ID).count(1000);

    List<Object> rawPending = exec(commandObjects.xpending(stream, groupName, fullRange));
    assertThat(rawPending, hasSize(1));

    Long acknowledged = exec(commandObjects.xack(stream, groupName, addedId.toString().getBytes()));
    assertThat(acknowledged, equalTo(1L));

    // After the ack: the first element drops to 0 and the detailed list is empty.
    rawSummary = exec(commandObjects.xpending(stream, groupName));
    assertThat(rawSummary, instanceOf(List.class));
    assertThat(((List<?>) rawSummary).get(0), equalTo(0L));

    rawPending = exec(commandObjects.xpending(stream, groupName, fullRange));
    assertThat(rawPending, empty());
  }

  /**
   * Moves a group's last-delivered id with xgroupSetID, first through the string overload and
   * then through the byte-array overload, checking the result with xinfoGroups after each step.
   */
  @Test
  public void testXGroupSetID() {
    final String stream = "testStream";
    final String group = "testGroup";

    StreamEntryID startId = new StreamEntryID();
    StreamEntryID firstTarget = new StreamEntryID("0-1");
    StreamEntryID secondTarget = new StreamEntryID("0-2");

    exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field", "value")));

    exec(commandObjects.xgroupCreate(stream, group, startId, false));

    // Freshly created group: last-delivered id is the id it was created with.
    List<StreamGroupInfo> infoAtStart = exec(commandObjects.xinfoGroups(stream));

    assertThat(infoAtStart, hasSize(1));
    assertThat(infoAtStart.get(0).getName(), equalTo(group));
    assertThat(infoAtStart.get(0).getLastDeliveredId(), equalTo(startId));

    // String overload.
    String setIdReply = exec(commandObjects.xgroupSetID(stream, group, firstTarget));
    assertThat(setIdReply, equalTo("OK"));

    List<StreamGroupInfo> infoAfterFirst = exec(commandObjects.xinfoGroups(stream));

    assertThat(infoAfterFirst, hasSize(1));
    assertThat(infoAfterFirst.get(0).getName(), equalTo(group));
    assertThat(infoAfterFirst.get(0).getLastDeliveredId(), equalTo(firstTarget));

    // Byte-array overload.
    String setIdReplyBinary = exec(commandObjects.xgroupSetID(
        stream.getBytes(), group.getBytes(), secondTarget.toString().getBytes()));
    assertThat(setIdReplyBinary, equalTo("OK"));

    List<StreamGroupInfo> infoAfterSecond = exec(commandObjects.xinfoGroups(stream));

    assertThat(infoAfterSecond, hasSize(1));
    assertThat(infoAfterSecond.get(0).getName(), equalTo(group));
    assertThat(infoAfterSecond.get(0).getLastDeliveredId(), equalTo(secondTarget));

    // The byte-array xinfoGroups overload is only checked for a non-null reply.
    List<Object> rawInfo = exec(commandObjects.xinfoGroups(stream.getBytes()));
    assertThat(rawInfo, notNullValue());
  }

  /**
   * Creates and destroys a consumer group twice, once with the string xgroupDestroy overload and
   * once with the byte-array overload, checking xinfoGroups before and after each destroy.
   */
  @Test
  public void testXGroupDestroy() {
    final String stream = "testStream";
    final String group = "testGroup";

    StreamEntryID startId = new StreamEntryID();

    exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field", "value")));

    // Round one: string overload.
    exec(commandObjects.xgroupCreate(stream, group, startId, false));

    List<StreamGroupInfo> infoBefore = exec(commandObjects.xinfoGroups(stream));

    assertThat(infoBefore, hasSize(1));
    assertThat(infoBefore.get(0).getName(), equalTo(group));
    assertThat(infoBefore.get(0).getLastDeliveredId(), equalTo(startId));

    Long destroyed = exec(commandObjects.xgroupDestroy(stream, group));
    assertThat(destroyed, equalTo(1L));

    List<StreamGroupInfo> infoAfter = exec(commandObjects.xinfoGroups(stream));
    assertThat(infoAfter, empty());

    // Round two: create the group again and destroy it through the byte-array overload.
    exec(commandObjects.xgroupCreate(stream, group, startId, false));

    List<StreamGroupInfo> infoBeforeBinary = exec(commandObjects.xinfoGroups(stream));

    assertThat(infoBeforeBinary, hasSize(1));
    assertThat(infoBeforeBinary.get(0).getName(), equalTo(group));
    assertThat(infoBeforeBinary.get(0).getLastDeliveredId(), equalTo(startId));

    Long destroyedBinary = exec(commandObjects.xgroupDestroy(stream.getBytes(), group.getBytes()));
    assertThat(destroyedBinary, equalTo(1L));

    List<StreamGroupInfo> infoAfterBinary = exec(commandObjects.xinfoGroups(stream));
    assertThat(infoAfterBinary, empty());
  }

  /**
   * Creates and deletes a consumer inside a group, first with the string overloads and then with
   * the byte-array overloads, tracking the group's consumer count through xinfoGroups.
   */
  @Test
  public void testXGroupConsumer() {
    final String stream = "testStream";
    final String group = "testGroup";
    final String consumer = "testConsumer";

    StreamEntryID startId = new StreamEntryID();

    exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field", "value")));

    exec(commandObjects.xgroupCreate(stream, group, startId, false));

    // A new group starts without consumers.
    List<StreamGroupInfo> infoInitial = exec(commandObjects.xinfoGroups(stream));

    assertThat(infoInitial, hasSize(1));
    assertThat(infoInitial.get(0).getName(), equalTo(group));
    assertThat(infoInitial.get(0).getConsumers(), equalTo(0L));

    // String overloads: create ...
    Boolean created = exec(commandObjects.xgroupCreateConsumer(stream, group, consumer));
    assertThat(created, equalTo(true));

    List<StreamGroupInfo> infoAfterCreate = exec(commandObjects.xinfoGroups(stream));

    assertThat(infoAfterCreate, hasSize(1));
    assertThat(infoAfterCreate.get(0).getName(), equalTo(group));
    assertThat(infoAfterCreate.get(0).getConsumers(), equalTo(1L));

    // ... and delete. The reply is expected to be 0.
    Long deleteReply = exec(commandObjects.xgroupDelConsumer(stream, group, consumer));
    assertThat(deleteReply, equalTo(0L));

    List<StreamGroupInfo> infoAfterDelete = exec(commandObjects.xinfoGroups(stream));

    assertThat(infoAfterDelete, hasSize(1));
    assertThat(infoAfterDelete.get(0).getName(), equalTo(group));
    assertThat(infoAfterDelete.get(0).getConsumers(), equalTo(0L));

    // Byte-array overloads: create ...
    Boolean createdBinary = exec(commandObjects.xgroupCreateConsumer(
        stream.getBytes(), group.getBytes(), consumer.getBytes()));
    assertThat(createdBinary, equalTo(true));

    List<StreamGroupInfo> infoAfterCreateBinary = exec(commandObjects.xinfoGroups(stream));

    assertThat(infoAfterCreateBinary, hasSize(1));
    assertThat(infoAfterCreateBinary.get(0).getName(), equalTo(group));
    assertThat(infoAfterCreateBinary.get(0).getConsumers(), equalTo(1L));

    // ... and delete.
    Long deleteReplyBinary = exec(commandObjects.xgroupDelConsumer(
        stream.getBytes(), group.getBytes(), consumer.getBytes()));
    assertThat(deleteReplyBinary, equalTo(0L));

    List<StreamGroupInfo> infoAfterDeleteBinary = exec(commandObjects.xinfoGroups(stream));

    assertThat(infoAfterDeleteBinary, hasSize(1));
    assertThat(infoAfterDeleteBinary.get(0).getName(), equalTo(group));
    assertThat(infoAfterDeleteBinary.get(0).getConsumers(), equalTo(0L));
  }

  /**
   * Deletes two entries through the {@link StreamEntryID} xdel overload and two more through the
   * byte-array overload, checking the stream length with xlen around each deletion.
   */
  @Test
  public void testXDelWithStreamSize() {
    final String stream = "testStream";

    // Round one: delete by StreamEntryID.
    StreamEntryID firstId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field1", "value1")));
    StreamEntryID secondId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field2", "value2")));

    Long lengthBefore = exec(commandObjects.xlen(stream));
    assertThat(lengthBefore, equalTo(2L));

    Long removed = exec(commandObjects.xdel(stream, firstId, secondId));
    assertThat(removed, equalTo(2L));

    Long lengthAfter = exec(commandObjects.xlen(stream));
    assertThat(lengthAfter, equalTo(0L));

    // Round two: delete by raw id bytes.
    StreamEntryID thirdId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field3", "value3")));
    StreamEntryID fourthId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field4", "value4")));

    Long lengthBeforeBinary = exec(commandObjects.xlen(stream));
    assertThat(lengthBeforeBinary, equalTo(2L));

    Long removedBinary = exec(commandObjects.xdel(
        stream.getBytes(), thirdId.toString().getBytes(), fourthId.toString().getBytes()));
    assertThat(removedBinary, equalTo(2L));

    Long lengthAfterBinary = exec(commandObjects.xlen(stream));
    assertThat(lengthAfterBinary, equalTo(0L));
  }

  /**
   * Trims a ten-entry stream to six entries with an exact xtrim, then trims towards three with
   * the approximate flag set and checks the bounds asserted for that call.
   */
  @Test
  public void testXTrimCommands() {
    final String stream = "testStream";

    // Fill the stream with ten entries, more than will be kept.
    for (int n = 0; n < 10; n++) {
      exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field" + n, "value" + n)));
    }

    Long lengthAtStart = exec(commandObjects.xlen(stream));
    assertThat(lengthAtStart, equalTo(10L));

    // Exact trim: 10 -> 6, so four entries are removed.
    Long removedExact = exec(commandObjects.xtrim(stream, 6, false));
    assertThat(removedExact, equalTo(4L));

    Long lengthAfterExact = exec(commandObjects.xlen(stream));
    assertThat(lengthAfterExact, equalTo(6L));

    // Approximate trim: at most three removed, at least three left.
    Long removedApprox = exec(commandObjects.xtrim(stream, 3, true));
    assertThat(removedApprox, lessThanOrEqualTo(3L));

    Long lengthAfterApprox = exec(commandObjects.xlen(stream));
    assertThat(lengthAfterApprox, greaterThanOrEqualTo(3L));
  }

  /**
   * Binary variant of the xtrim test: entries are added through the string API, while xlen and
   * xtrim are called with the byte-array key.
   */
  @Test
  public void testXTrimCommandsBinary() {
    final String streamName = "testStream";
    final byte[] stream = streamName.getBytes();

    // Fill the stream with ten entries, more than will be kept.
    for (int n = 0; n < 10; n++) {
      exec(commandObjects.xadd(streamName, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field" + n, "value" + n)));
    }

    Long lengthAtStart = exec(commandObjects.xlen(stream));
    assertThat(lengthAtStart, equalTo(10L));

    // Exact trim: 10 -> 6, so four entries are removed.
    Long removedExact = exec(commandObjects.xtrim(stream, 6, false));
    assertThat(removedExact, equalTo(4L));

    Long lengthAfterExact = exec(commandObjects.xlen(stream));
    assertThat(lengthAfterExact, equalTo(6L));

    // Approximate trim: at most three removed, at least three left.
    Long removedApprox = exec(commandObjects.xtrim(stream, 3, true));
    assertThat(removedApprox, lessThanOrEqualTo(3L));

    Long lengthAfterApprox = exec(commandObjects.xlen(stream));
    assertThat(lengthAfterApprox, greaterThanOrEqualTo(3L));
  }

  /**
   * Trims a ten-entry stream to six entries through the {@link XTrimParams} overload of xtrim.
   */
  @Test
  public void testXTrimWithParams() {
    final String stream = "testStream";

    // Fill the stream with ten entries, more than will be kept.
    for (int n = 0; n < 10; n++) {
      exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field" + n, "value" + n)));
    }

    Long lengthAtStart = exec(commandObjects.xlen(stream));
    assertThat(lengthAtStart, equalTo(10L));

    XTrimParams keepSix = new XTrimParams().maxLen(6);

    // 10 -> 6, so four entries are removed.
    Long removed = exec(commandObjects.xtrim(stream, keepSix));
    assertThat(removed, equalTo(4L));

    Long lengthAfter = exec(commandObjects.xlen(stream));
    assertThat(lengthAfter, equalTo(6L));
  }

  /**
   * Binary variant of the {@link XTrimParams} test: entries are added through the string API,
   * while xlen and xtrim are called with the byte-array key.
   */
  @Test
  public void testXTrimWithParamsBinary() {
    final String streamName = "testStream";
    final byte[] stream = streamName.getBytes();

    // Fill the stream with ten entries, more than will be kept.
    for (int n = 0; n < 10; n++) {
      exec(commandObjects.xadd(streamName, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field" + n, "value" + n)));
    }

    Long lengthAtStart = exec(commandObjects.xlen(stream));
    assertThat(lengthAtStart, equalTo(10L));

    XTrimParams keepSix = new XTrimParams().maxLen(6);

    // 10 -> 6, so four entries are removed.
    Long removed = exec(commandObjects.xtrim(stream, keepSix));
    assertThat(removed, equalTo(4L));

    Long lengthAfter = exec(commandObjects.xlen(stream));
    assertThat(lengthAfter, equalTo(6L));
  }

  /**
   * Lets one consumer read an entry so that it becomes pending, then claims that entry for a
   * second consumer with xclaim and checks that the claimed entry carries the same id.
   */
  @Test
  public void testXClaim() throws InterruptedException {
    final String stream = "testStream";
    final String groupName = "testGroup";
    final String owner = "consumer1";
    final String claimer = "consumer2";

    exec(commandObjects.xgroupCreate(stream, groupName, new StreamEntryID(), true));

    StreamEntryID entryId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field", "value2")));

    // The first consumer reads the entry, which makes it pending.
    Map<String, StreamEntryID> undelivered =
        Collections.singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    List<Map.Entry<String, List<StreamEntry>>> delivered = exec(
        commandObjects.xreadGroup(groupName, owner, new XReadGroupParams().count(1), undelivered));

    assertThat(delivered, hasSize(1));
    assertThat(delivered.get(0).getKey(), equalTo(stream));
    assertThat(delivered.get(0).getValue(), hasSize(1));
    assertThat(delivered.get(0).getValue().get(0).getID(), equalTo(entryId));

    // Pause briefly before claiming.
    Thread.sleep(200);

    // The second consumer claims the entry.
    List<StreamEntry> claimed = exec(
        commandObjects.xclaim(stream, groupName, claimer, 1, new XClaimParams(), entryId));

    assertThat(claimed, hasSize(1));
    assertThat(claimed.get(0).getID(), equalTo(entryId));
  }

  /**
   * Binary variant of the xclaim test: the entry is made pending through the string API and then
   * claimed for a second consumer through the byte-array xclaim overload.
   */
  @Test
  public void testXClaimBinary() throws InterruptedException {
    final String stream = "testStream";
    final String groupName = "testGroup";
    final String owner = "consumer1";
    final String claimer = "consumer2";

    exec(commandObjects.xgroupCreate(stream, groupName, new StreamEntryID(), true));

    StreamEntryID entryId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field", "value2")));

    // The first consumer reads the entry, which makes it pending.
    Map<String, StreamEntryID> undelivered =
        Collections.singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    List<Map.Entry<String, List<StreamEntry>>> delivered = exec(
        commandObjects.xreadGroup(groupName, owner, new XReadGroupParams().count(1), undelivered));

    assertThat(delivered, hasSize(1));
    assertThat(delivered.get(0).getKey(), equalTo(stream));
    assertThat(delivered.get(0).getValue(), hasSize(1));
    assertThat(delivered.get(0).getValue().get(0).getID(), equalTo(entryId));

    // Pause briefly before claiming.
    Thread.sleep(200);

    byte[] rawEntryId = entryId.toString().getBytes();

    // The second consumer claims the entry through the byte-array overload.
    List<byte[]> claimedRaw = exec(
        commandObjects.xclaim(stream.getBytes(), groupName.getBytes(), claimer.getBytes(), 1, new XClaimParams(), rawEntryId));
    assertThat(claimedRaw, hasSize(1));
    // Only the size of the raw reply is checked; its content is not asserted here.
  }

  /**
   * Lets one consumer read an entry so that it becomes pending, then claims it for a second
   * consumer with xclaimJustId and checks that the returned id matches the entry's id.
   */
  @Test
  public void testXClaimJustId() throws InterruptedException {
    final String stream = "testStream";
    final String groupName = "testGroup";
    final String owner = "consumer1";
    final String claimer = "consumer2";

    exec(commandObjects.xgroupCreate(stream, groupName, new StreamEntryID(), true));

    StreamEntryID entryId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field", "value2")));

    // The first consumer reads the entry, which makes it pending.
    Map<String, StreamEntryID> undelivered =
        Collections.singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    List<Map.Entry<String, List<StreamEntry>>> delivered = exec(
        commandObjects.xreadGroup(groupName, owner, new XReadGroupParams().count(1), undelivered));

    assertThat(delivered, hasSize(1));
    assertThat(delivered.get(0).getKey(), equalTo(stream));
    assertThat(delivered.get(0).getValue(), hasSize(1));
    assertThat(delivered.get(0).getValue().get(0).getID(), equalTo(entryId));

    // Pause briefly before claiming.
    Thread.sleep(200);

    // The second consumer claims the entry; the reply holds the claimed ids.
    List<StreamEntryID> claimedIds = exec(
        commandObjects.xclaimJustId(stream, groupName, claimer, 1, new XClaimParams(), entryId));
    assertThat(claimedIds, hasSize(1));
    assertThat(claimedIds.get(0), equalTo(entryId));
  }

  /**
   * Claims a pending entry through the {@code byte[]} overload of {@code xclaimJustId} and
   * verifies both the size of the reply and the raw ID it carries.
   */
  @Test
  public void testXClaimJustIdBinary() throws InterruptedException {
    String key = "testStream";
    String group = "testGroup";
    String consumer1 = "consumer1";
    String consumer2 = "consumer2";

    StreamEntryID initialId = new StreamEntryID();

    exec(commandObjects.xgroupCreate(key, group, initialId, true));

    StreamEntryID messageId = exec(commandObjects.xadd(key, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field", "value2")));

    // Consumer1 reads the message to make it pending
    Map<String, StreamEntryID> stream = Collections.singletonMap(key, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    List<Map.Entry<String, List<StreamEntry>>> readEntries = exec(
        commandObjects.xreadGroup(group, consumer1, new XReadGroupParams().count(1), stream));

    assertThat(readEntries, hasSize(1));
    assertThat(readEntries.get(0).getKey(), equalTo(key));
    assertThat(readEntries.get(0).getValue(), hasSize(1));
    assertThat(readEntries.get(0).getValue().get(0).getID(), equalTo(messageId));

    Thread.sleep(200); // Wait a bit

    byte[] bMessageId = messageId.toString().getBytes();

    // Claim the message for consumer2 with byte[] parameters
    List<byte[]> claimedMessagesBytes = exec(
        commandObjects.xclaimJustId(key.getBytes(), group.getBytes(), consumer2.getBytes(), 1, new XClaimParams(), bMessageId));
    assertThat(claimedMessagesBytes, hasSize(1));

    // The JUSTID variant replies with the raw entry IDs, so the claimed ID can be compared
    // byte for byte against the ID that was requested.
    assertThat(claimedMessagesBytes.get(0), equalTo(bMessageId));
  }

  /**
   * Auto-claims a pending entry for a second consumer via {@code xautoclaim} and checks that the
   * claimed entry carries the fields that were originally added.
   */
  @Test
  public void testXAutoClaim() throws InterruptedException {
    final String stream = "testStream";
    final String groupName = "testGroup";
    final String reader = "consumer1";
    final String claimer = "consumer2";

    exec(commandObjects.xgroupCreate(stream, groupName, new StreamEntryID(), true));

    Map<String, String> fields = Collections.singletonMap("field", "value");
    StreamEntryID addedId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, fields));

    // The first consumer reads the entry; the reply itself is not needed here.
    Map<String, StreamEntryID> undelivered =
        Collections.singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    XReadGroupParams readOne = new XReadGroupParams().count(1);
    exec(commandObjects.xreadGroup(groupName, reader, readOne, undelivered));

    Thread.sleep(200); // short pause before auto-claiming

    // Start from an ID whose time component is one less than the added entry's.
    long earlierTime = addedId.getTime() - 1;
    StreamEntryID scanFrom = new StreamEntryID(earlierTime, addedId.getSequence());
    XAutoClaimParams claimOne = new XAutoClaimParams().count(1);

    Map.Entry<StreamEntryID, List<StreamEntry>> outcome =
        exec(commandObjects.xautoclaim(stream, groupName, claimer, 1, scanFrom, claimOne));

    List<StreamEntry> claimedEntries = outcome.getValue();
    assertThat(claimedEntries, hasSize(1));
    assertThat(claimedEntries.get(0).getFields(), equalTo(fields));
  }

  /**
   * Runs the {@code byte[]} overload of {@code xautoclaim} against a pending entry and checks that
   * the reply is not empty.
   */
  @Test
  public void testXAutoClaimBinary() throws InterruptedException {
    final byte[] rawStream = "testStream".getBytes();
    final byte[] rawGroup = "testGroup".getBytes();
    final byte[] rawReader = "consumer1".getBytes();
    final byte[] rawClaimer = "consumer2".getBytes();

    byte[] groupStart = new StreamEntryID().toString().getBytes();
    exec(commandObjects.xgroupCreate(rawStream, rawGroup, groupStart, true));

    Map<byte[], byte[]> fields = Collections.singletonMap("field".getBytes(), "value".getBytes());
    byte[] rawAddedId = exec(commandObjects.xadd(rawStream, new XAddParams(), fields));

    // The first consumer reads the entry through the group.
    byte[] undeliveredMarker = StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY.toString().getBytes();
    Map.Entry<byte[], byte[]> streamAndOffset = new AbstractMap.SimpleEntry<>(rawStream, undeliveredMarker);
    exec(commandObjects.xreadGroup(rawGroup, rawReader, new XReadGroupParams().count(1), streamAndOffset));

    Thread.sleep(200); // short pause before auto-claiming

    // Build a start ID one time unit before the added entry, with sequence 0.
    StreamEntryID addedId = new StreamEntryID(new String(rawAddedId));
    StreamEntryID scanFrom = new StreamEntryID(addedId.getTime() - 1, 0);
    byte[] rawScanFrom = scanFrom.toString().getBytes();
    XAutoClaimParams claimOne = new XAutoClaimParams().count(1);

    // Binary auto-claim on behalf of the second consumer.
    List<Object> rawOutcome =
        exec(commandObjects.xautoclaim(rawStream, rawGroup, rawClaimer, 1, rawScanFrom, claimOne));
    assertThat(rawOutcome, not(empty()));
  }

  /**
   * Auto-claims a pending entry with {@code xautoclaimJustId} and expects the reply to list the ID
   * of the entry that was added.
   */
  @Test
  public void testXAutoClaimJustId() throws InterruptedException {
    String stream = "testStream";
    String groupName = "testGroup";
    String reader = "consumer1";
    String claimer = "consumer2";

    StreamEntryID groupStart = new StreamEntryID();
    exec(commandObjects.xgroupCreate(stream, groupName, groupStart, true));

    StreamEntryID addedId = exec(commandObjects.xadd(
        stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("fieldSingle", "valueSingle")));

    // Let the first consumer read the entry through the group.
    Map<String, StreamEntryID> undelivered =
        Collections.singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    exec(commandObjects.xreadGroup(groupName, reader, new XReadGroupParams().count(1), undelivered));

    Thread.sleep(200); // short pause before auto-claiming

    // Scan from an ID whose time component is one less than the added entry's.
    StreamEntryID scanFrom = new StreamEntryID(addedId.getTime() - 1, addedId.getSequence());
    XAutoClaimParams claimOne = new XAutoClaimParams().count(1);

    Map.Entry<StreamEntryID, List<StreamEntryID>> outcome =
        exec(commandObjects.xautoclaimJustId(stream, groupName, claimer, 1, scanFrom, claimOne));

    List<StreamEntryID> claimedIds = outcome.getValue();
    assertThat(claimedIds, hasSize(1));
    assertThat(claimedIds.get(0), equalTo(addedId));
  }

  /**
   * Runs the {@code byte[]} overload of {@code xautoclaimJustId} against a pending entry and checks
   * that the reply is not empty.
   */
  @Test
  public void testXAutoClaimJustIdBinary() throws InterruptedException {
    byte[] rawStream = "testStream".getBytes();
    byte[] rawGroup = "testGroup".getBytes();
    byte[] rawReader = "consumer1".getBytes();
    byte[] rawClaimer = "consumer2".getBytes();

    exec(commandObjects.xgroupCreate(rawStream, rawGroup, new StreamEntryID().toString().getBytes(), true));

    byte[] fieldName = "fieldBinary".getBytes();
    byte[] fieldValue = "valueBinary".getBytes();
    Map<byte[], byte[]> fields = Collections.singletonMap(fieldName, fieldValue);
    byte[] rawAddedId = exec(commandObjects.xadd(rawStream, new XAddParams(), fields));

    // The first consumer reads the entry through the group.
    Map.Entry<byte[], byte[]> streamAndOffset = new AbstractMap.SimpleEntry<>(
        rawStream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY.toString().getBytes());
    XReadGroupParams readOne = new XReadGroupParams().count(1);
    exec(commandObjects.xreadGroup(rawGroup, rawReader, readOne, streamAndOffset));

    Thread.sleep(200); // short pause before auto-claiming

    // Start ID: one time unit before the added entry, sequence 0.
    long earlierTime = new StreamEntryID(new String(rawAddedId)).getTime() - 1;
    byte[] rawScanFrom = new StreamEntryID(earlierTime, 0).toString().getBytes();
    XAutoClaimParams claimOne = new XAutoClaimParams().count(1);

    List<Object> rawOutcome =
        exec(commandObjects.xautoclaimJustId(rawStream, rawGroup, rawClaimer, 1, rawScanFrom, claimOne));
    assertThat(rawOutcome, not(empty()));
  }

  /**
   * Adds one entry to a stream and checks what {@code xinfoStream} reports for it, using both the
   * String and the {@code byte[]} overloads.
   */
  @Test
  public void testXInfoStream() {
    final String stream = "testStreamInfo";
    final Map<String, String> fields = Collections.singletonMap("fieldInfo", "valueInfo");

    exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, fields));

    // Typed variant: length and first entry must reflect the single added entry.
    StreamInfo info = exec(commandObjects.xinfoStream(stream));
    assertThat(info, notNullValue());
    assertThat(info.getLength(), equalTo(1L));

    StreamEntry first = info.getFirstEntry();
    assertThat(first.getFields(), equalTo(fields));

    // Binary variant: only the presence of a reply is checked.
    byte[] rawStream = stream.getBytes();
    Object rawInfo = exec(commandObjects.xinfoStream(rawStream));
    assertThat(rawInfo, notNullValue());
  }

  /**
   * Exercises the four {@code xinfoStreamFull} overloads (String and {@code byte[]}, each with and
   * without a count argument) on a stream holding a single entry.
   */
  @Test
  public void testXInfoStreamFull() {
    final String stream = "testStreamFullInfo";
    final Map<String, String> fields = Collections.singletonMap("fieldFull", "valueFull");

    exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, fields));

    // String key, no count.
    StreamFullInfo full = exec(commandObjects.xinfoStreamFull(stream));
    assertThat(full, notNullValue());
    List<StreamEntry> entries = full.getEntries();
    assertThat(entries, not(empty()));
    assertThat(entries.get(0).getFields(), equalTo(fields));

    // String key, count of 1.
    StreamFullInfo fullLimited = exec(commandObjects.xinfoStreamFull(stream, 1));
    assertThat(fullLimited, notNullValue());
    assertThat(fullLimited.getEntries(), hasSize(1));

    // Binary key, no count: only the presence of a reply is checked.
    Object rawFull = exec(commandObjects.xinfoStreamFull(stream.getBytes()));
    assertThat(rawFull, notNullValue());

    // Binary key, count of 1: again only the presence of a reply is checked.
    Object rawFullLimited = exec(commandObjects.xinfoStreamFull(stream.getBytes(), 1));
    assertThat(rawFullLimited, notNullValue());
  }

  /**
   * Has two consumers read one entry each and then checks the per-consumer pending counts as
   * reported by {@code xinfoConsumers}, {@code xinfoConsumers2} and the binary
   * {@code xinfoConsumers} overload.
   */
  @Test
  @Deprecated
  public void testXInfoConsumersWithActiveConsumers() {
    final String stream = "testStreamWithConsumers";
    final String groupName = "testConsumerGroup";
    final String firstConsumer = "consumer1";
    final String secondConsumer = "consumer2";

    // Two entries, so that each consumer can be handed one of them.
    exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field1", "value1")));
    exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, Collections.singletonMap("field2", "value2")));

    exec(commandObjects.xgroupCreate(stream, groupName, new StreamEntryID(), true));

    Map<String, StreamEntryID> undelivered =
        Collections.singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
    XReadGroupParams readOne = new XReadGroupParams().count(1);
    exec(commandObjects.xreadGroup(groupName, firstConsumer, readOne, undelivered));
    exec(commandObjects.xreadGroup(groupName, secondConsumer, readOne, undelivered));

    // Variant returning StreamConsumersInfo.
    List<StreamConsumersInfo> legacyInfos = exec(commandObjects.xinfoConsumers(stream, groupName));
    assertThat(legacyInfos, notNullValue());
    assertThat(legacyInfos, hasSize(2));

    Optional<StreamConsumersInfo> legacyFirst = legacyInfos.stream()
        .filter(info -> info.getName().equals(firstConsumer))
        .findFirst();
    Optional<StreamConsumersInfo> legacySecond = legacyInfos.stream()
        .filter(info -> info.getName().equals(secondConsumer))
        .findFirst();

    assertThat(legacyFirst.isPresent(), equalTo(true));
    assertThat(legacyFirst.get().getPending(), equalTo(1L));

    assertThat(legacySecond.isPresent(), equalTo(true));
    assertThat(legacySecond.get().getPending(), equalTo(1L));

    // Variant returning StreamConsumerInfo.
    List<StreamConsumerInfo> infos = exec(commandObjects.xinfoConsumers2(stream, groupName));
    assertThat(infos, notNullValue());
    assertThat(infos, hasSize(2));

    Optional<StreamConsumerInfo> first = infos.stream()
        .filter(info -> info.getName().equals(firstConsumer))
        .findFirst();
    Optional<StreamConsumerInfo> second = infos.stream()
        .filter(info -> info.getName().equals(secondConsumer))
        .findFirst();

    assertThat(first.isPresent(), equalTo(true));
    assertThat(first.get().getPending(), equalTo(1L));

    assertThat(second.isPresent(), equalTo(true));
    assertThat(second.get().getPending(), equalTo(1L));

    // Binary variant: only the presence of a reply is checked.
    List<Object> rawInfos = exec(commandObjects.xinfoConsumers(stream.getBytes(), groupName.getBytes()));
    assertThat(rawInfos, notNullValue());
  }

  /**
   * Reads one entry from each of two streams with {@code xread}, checking keys, IDs and fields of
   * the typed reply, and that the {@code byte[]} overload returns a non-empty reply.
   */
  @Test
  public void testXRead() {
    final String firstStream = "testStream1";
    final String secondStream = "testStream2";

    final Map<String, String> firstFields = Collections.singletonMap("field1", "value1");
    final Map<String, String> secondFields = Collections.singletonMap("field2", "value2");

    StreamEntryID firstId = exec(commandObjects.xadd(firstStream, StreamEntryID.NEW_ENTRY, firstFields));
    StreamEntryID secondId = exec(commandObjects.xadd(secondStream, StreamEntryID.NEW_ENTRY, secondFields));

    XReadParams readParams = XReadParams.xReadParams().count(1).block(1000);

    // NOTE(review): the positional assertions below expect testStream1 before testStream2 in the
    // reply; presumably that follows this HashMap's iteration order - confirm.
    Map<String, StreamEntryID> offsets = new HashMap<>();
    offsets.put(firstStream, new StreamEntryID());
    offsets.put(secondStream, new StreamEntryID());

    List<Map.Entry<String, List<StreamEntry>>> reply = exec(commandObjects.xread(readParams, offsets));

    assertThat(reply, not(empty()));
    assertThat(reply.size(), equalTo(2));

    Map.Entry<String, List<StreamEntry>> firstResult = reply.get(0);
    Map.Entry<String, List<StreamEntry>> secondResult = reply.get(1);
    assertThat(firstResult.getKey(), equalTo(firstStream));
    assertThat(secondResult.getKey(), equalTo(secondStream));

    StreamEntry firstEntry = firstResult.getValue().get(0);
    StreamEntry secondEntry = secondResult.getValue().get(0);
    assertThat(firstEntry.getID(), equalTo(firstId));
    assertThat(secondEntry.getID(), equalTo(secondId));
    assertThat(firstEntry.getFields(), equalTo(firstFields));
    assertThat(secondEntry.getFields(), equalTo(secondFields));

    // Binary variant: one (key, start ID) pair per stream; only non-emptiness is checked.
    Map.Entry<byte[], byte[]> rawFirst =
        new AbstractMap.SimpleEntry<>(firstStream.getBytes(), new StreamEntryID().toString().getBytes());
    Map.Entry<byte[], byte[]> rawSecond =
        new AbstractMap.SimpleEntry<>(secondStream.getBytes(), new StreamEntryID().toString().getBytes());

    List<Object> rawReply = exec(commandObjects.xread(readParams, rawFirst, rawSecond));
    assertThat(rawReply, not(empty()));
  }

  /**
   * Reads from two streams with {@code xreadAsMap} and checks that the resulting map has one key
   * per stream, each mapped to the entry that was added to it.
   */
  @Test
  public void testXReadAsMap() {
    final String firstStream = "testStreamMap1";
    final String secondStream = "testStreamMap2";

    final Map<String, String> firstFields = Collections.singletonMap("fieldMap1", "valueMap1");
    final Map<String, String> secondFields = Collections.singletonMap("fieldMap2", "valueMap2");

    exec(commandObjects.xadd(firstStream, StreamEntryID.NEW_ENTRY, firstFields));
    exec(commandObjects.xadd(secondStream, StreamEntryID.NEW_ENTRY, secondFields));

    Map<String, StreamEntryID> offsets = new HashMap<>();
    offsets.put(firstStream, new StreamEntryID());
    offsets.put(secondStream, new StreamEntryID());

    XReadParams readParams = new XReadParams().count(1).block(1000);
    Map<String, List<StreamEntry>> byStream = exec(commandObjects.xreadAsMap(readParams, offsets));

    assertThat(byStream, notNullValue());
    assertThat(byStream.keySet(), hasSize(2)); // one key per stream

    StreamEntry fromFirst = byStream.get(firstStream).get(0);
    assertThat(fromFirst.getFields(), equalTo(firstFields));

    StreamEntry fromSecond = byStream.get(secondStream).get(0);
    assertThat(fromSecond.getFields(), equalTo(secondFields));
  }

  /**
   * Lets two consumers of the same group read one entry each via {@code xreadGroupAsMap} and
   * checks that the first consumer gets the first added entry and the second consumer the second.
   */
  @Test
  public void testXReadGroupAsMap() {
    final String stream = "testStreamGroupMap";
    final String groupName = "testGroupMap";
    final String firstConsumer = "testConsumerMap1";
    final String secondConsumer = "testConsumerMap2";

    final Map<String, String> fields = Collections.singletonMap("fieldGroupMap", "valueGroupMap");

    exec(commandObjects.xgroupCreate(stream, groupName, new StreamEntryID(), true));

    // Two entries with identical fields; they are told apart by their IDs.
    StreamEntryID firstId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, fields));
    StreamEntryID secondId = exec(commandObjects.xadd(stream, StreamEntryID.NEW_ENTRY, fields));

    XReadGroupParams readOne = new XReadGroupParams().count(1);
    Map<String, StreamEntryID> undelivered =
        Collections.singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);

    // First consumer: expected to receive the first entry.
    Map<String, List<StreamEntry>> firstRead =
        exec(commandObjects.xreadGroupAsMap(groupName, firstConsumer, readOne, undelivered));

    assertThat(firstRead, notNullValue());
    assertThat(firstRead.keySet(), hasSize(1));
    List<StreamEntry> firstBatch = firstRead.get(stream);
    assertThat(firstBatch, not(empty()));
    assertThat(firstBatch.get(0).getID(), equalTo(firstId));
    assertThat(firstBatch.get(0).getFields(), equalTo(fields));

    // Second consumer: expected to receive the second entry.
    Map<String, List<StreamEntry>> secondRead =
        exec(commandObjects.xreadGroupAsMap(groupName, secondConsumer, readOne, undelivered));

    assertThat(secondRead, notNullValue());
    assertThat(secondRead.keySet(), hasSize(1)); // a single key, for the one stream
    List<StreamEntry> secondBatch = secondRead.get(stream);
    assertThat(secondBatch, not(empty())); // at least one message
    assertThat(secondBatch.get(0).getID(), equalTo(secondId));
    assertThat(secondBatch.get(0).getFields(), equalTo(fields));
  }
}