StreamsCommandsTestBase.java
package redis.clients.jedis.commands.unified;
import io.redis.test.annotations.SinceRedisVersion;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import redis.clients.jedis.RedisProtocol;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.args.StreamDeletionPolicy;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XTrimParams;
import redis.clients.jedis.resps.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.singletonMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public abstract class StreamsCommandsTestBase extends UnifiedJedisCommandsTestBase {
protected static final String STREAM_KEY_1 = "{stream}-1";
protected static final String STREAM_KEY_2 = "{stream}-2";
protected static final String GROUP_NAME = "group-1";
protected static final String CONSUMER_NAME = "consumer-1";
protected static final String FIELD_KEY_1 = "field-1";
protected static final String VALUE_1 = "value-1";
protected static final String FIELD_KEY_2 = "field-2";
protected static final String VALUE_2 = "value-2";
protected static final Map<String, String> HASH_1 = singletonMap(FIELD_KEY_1, VALUE_1);
protected static final Map<String, String> HASH_2 = singletonMap(FIELD_KEY_2, VALUE_2);
public StreamsCommandsTestBase(RedisProtocol protocol) {
super(protocol);
}
/**
* Populates a test stream with values using the i-0 format
* @param streamKey The stream key to populate
* @param count Number of entries to add
* @param map Map of field-value pairs for each entry
*/
protected void populateTestStreamWithValues(String streamKey, int count,
Map<String, String> map) {
for (int i = 1; i <= count; i++) {
jedis.xadd(streamKey, XAddParams.xAddParams().id(new StreamEntryID(i + "-0")), map);
}
assertEquals(count, jedis.xlen(streamKey));
}
@BeforeEach
public void setUp() {
setUpTestStream();
}
private void setUpTestStream() {
jedis.del(STREAM_KEY_1);
jedis.del(STREAM_KEY_2);
try {
jedis.xgroupCreate(STREAM_KEY_1, GROUP_NAME, StreamEntryID.XGROUP_LAST_ENTRY, true);
} catch (JedisDataException e) {
if (!e.getMessage().contains("BUSYGROUP")) {
throw e;
}
}
try {
jedis.xgroupCreate(STREAM_KEY_2, GROUP_NAME, StreamEntryID.XGROUP_LAST_ENTRY, true);
} catch (JedisDataException e) {
if (!e.getMessage().contains("BUSYGROUP")) {
throw e;
}
}
}
// ========== XADD Command Tests ==========
@Test
public void xaddBasic() {
setUpTestStream();
// Test basic XADD with auto-generated ID
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, HASH_1);
assertNotNull(id1);
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
// Test XADD with multiple fields
Map<String, String> multiFieldHash = new HashMap<>();
multiFieldHash.put("field1", "value1");
multiFieldHash.put("field2", "value2");
multiFieldHash.put("field3", "value3");
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, multiFieldHash);
assertNotNull(id2);
assertTrue(id2.compareTo(id1) > 0);
assertEquals(2L, jedis.xlen(STREAM_KEY_1));
}
@Test
public void xaddWithSpecificId() {
setUpTestStream();
// Test XADD with specific ID
StreamEntryID specificId = new StreamEntryID("1000-0");
StreamEntryID resultId = jedis.xadd(STREAM_KEY_1, specificId, HASH_1);
assertEquals(specificId, resultId);
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
// Test XADD with ID that must be greater than previous
StreamEntryID nextId = new StreamEntryID("1001-0");
StreamEntryID resultId2 = jedis.xadd(STREAM_KEY_1, nextId, HASH_2);
assertEquals(nextId, resultId2);
assertEquals(2L, jedis.xlen(STREAM_KEY_1));
}
@Test
public void xaddWithParams() {
setUpTestStream();
// Test XADD with maxLen parameter
populateTestStreamWithValues(STREAM_KEY_1, 5, HASH_1);
// Add with maxLen=3, should trim to 3 entries
StreamEntryID id6 = jedis.xadd(STREAM_KEY_1,
XAddParams.xAddParams().id(new StreamEntryID("6-0")).maxLen(3), HASH_2);
assertNotNull(id6);
assertEquals(3L, jedis.xlen(STREAM_KEY_1));
}
@Test
public void xaddErrorCases() {
setUpTestStream();
// Test XADD with empty hash should fail
try {
Map<String, String> emptyHash = new HashMap<>();
jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, emptyHash);
fail("Should throw JedisDataException for empty hash");
} catch (JedisDataException expected) {
assertTrue(expected.getMessage().contains("wrong number of arguments"));
}
// Test XADD with noMkStream on non-existent stream
StreamEntryID result = jedis.xadd("non-existent-stream", XAddParams.xAddParams().noMkStream(),
HASH_1);
assertNull(result);
}
@ParameterizedTest
@CsvSource({ "KEEP_REFERENCES,3", "DELETE_REFERENCES,0" })
@SinceRedisVersion("8.1.240")
public void xaddWithTrimmingMode(StreamDeletionPolicy trimMode, int expected) {
setUpTestStream();
Map<String, String> map = singletonMap("field", "value");
// Add initial entries to the stream
populateTestStreamWithValues(STREAM_KEY_1, 5, map);
// Create consumer group and read messages to create PEL entries
Map<String, StreamEntryID> streamQuery = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3),
streamQuery);
// Verify PEL has entries
List<StreamPendingEntry> pendingBefore = jedis.xpending(STREAM_KEY_1, GROUP_NAME,
XPendingParams.xPendingParams().count(10));
assertEquals(3, pendingBefore.size());
// Add new entry with maxLen=3 and KEEP_REFERENCES mode
StreamEntryID newId = jedis.xadd(STREAM_KEY_1,
XAddParams.xAddParams().id(new StreamEntryID("6-0")).maxLen(3).trimmingMode(trimMode), map);
assertNotNull(newId);
// Stream should be trimmed to 3 entries
assertEquals(3L, jedis.xlen(STREAM_KEY_1));
List<StreamPendingEntry> pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME,
XPendingParams.xPendingParams().count(10));
assertEquals(expected, pendingAfter.size());
}
@Test
@SinceRedisVersion("8.1.240")
public void xaddWithTrimmingModeAcknowledged() {
setUpTestStream();
Map<String, String> map = singletonMap("field", "value");
// Add initial entries to the stream
populateTestStreamWithValues(STREAM_KEY_1, 5, map);
// Create consumer group and read messages
Map<String, StreamEntryID> streamQuery = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(GROUP_NAME,
CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streamQuery);
// Acknowledge the first 2 messages
StreamEntryID id1 = messages.get(0).getValue().get(0).getID();
StreamEntryID id2 = messages.get(0).getValue().get(1).getID();
jedis.xack(STREAM_KEY_1, GROUP_NAME, id1, id2);
// Verify PEL state
List<StreamPendingEntry> pendingBefore = jedis.xpending(STREAM_KEY_1, GROUP_NAME,
XPendingParams.xPendingParams().count(10));
assertEquals(1, pendingBefore.size()); // Only 1 unacknowledged message
// Add new entry with maxLen=3 and ACKNOWLEDGED mode
StreamEntryID newId = jedis.xadd(STREAM_KEY_1, XAddParams.xAddParams()
.id(new StreamEntryID("6-0")).maxLen(3).trimmingMode(StreamDeletionPolicy.ACKNOWLEDGED),
map);
assertNotNull(newId);
// Stream length should respect acknowledgment status
long streamLen = jedis.xlen(STREAM_KEY_1);
assertEquals(4, streamLen); // Should not trim unacknowledged entries aggressively
// PEL should still contain unacknowledged entries
List<StreamPendingEntry> pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME,
XPendingParams.xPendingParams().count(10));
assertEquals(1, pendingAfter.size()); // Unacknowledged entries should remain
}
// ========== XTRIM Command Tests ==========
@Test
public void xtrimBasic() {
setUpTestStream();
// Add test entries
populateTestStreamWithValues(STREAM_KEY_1, 5, HASH_1);
// Test basic XTRIM with maxLen
long trimmed = jedis.xtrim(STREAM_KEY_1, 3, false);
assertEquals(2L, trimmed); // Should trim 2 entries
assertEquals(3L, jedis.xlen(STREAM_KEY_1));
}
@Test
public void xtrimWithParams() {
setUpTestStream();
// Add test entries with specific IDs
populateTestStreamWithValues(STREAM_KEY_1, 5, HASH_1);
// Test XTRIM with XTrimParams and exact trimming
long trimmed = jedis.xtrim(STREAM_KEY_1, XTrimParams.xTrimParams().maxLen(3).exactTrimming());
assertEquals(2L, trimmed);
assertEquals(3L, jedis.xlen(STREAM_KEY_1));
// Test XTRIM with minId - use "4-0" since we have entries 1-0, 2-0, 3-0, 4-0, 5-0
long trimmed2 = jedis.xtrim(STREAM_KEY_1,
XTrimParams.xTrimParams().minId("4-0").exactTrimming());
assertEquals(1L, trimmed2); // Should trim entries with ID < 4-0 (only 3-0 should be trimmed)
assertEquals(2L, jedis.xlen(STREAM_KEY_1));
}
@Test
public void xtrimApproximate() {
setUpTestStream();
// Add many entries
populateTestStreamWithValues(STREAM_KEY_1, 10, HASH_1);
// Test approximate trimming
long trimmed = jedis.xtrim(STREAM_KEY_1, 5, true);
assertTrue(trimmed >= 0); // Approximate trimming may trim different amounts
assertTrue(jedis.xlen(STREAM_KEY_1) <= 10); // Should not exceed original length
}
@ParameterizedTest
@CsvSource({ "KEEP_REFERENCES,3", "DELETE_REFERENCES,1" })
@SinceRedisVersion("8.1.240")
public void xaddWithMinIdTrimmingMode(StreamDeletionPolicy trimMode, int expected) {
setUpTestStream();
Map<String, String> map = singletonMap("field", "value");
// Add initial entries with specific IDs
populateTestStreamWithValues(STREAM_KEY_1, 5, map);
// Create consumer group and read messages to create PEL entries
Map<String, StreamEntryID> streamQuery = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3),
streamQuery);
// Verify PEL has entries
List<StreamPendingEntry> pendingBefore = jedis.xpending(STREAM_KEY_1, GROUP_NAME,
XPendingParams.xPendingParams().count(10));
assertEquals(3, pendingBefore.size());
// Add new entry with minId="3-0" and specified trimming mode
StreamEntryID newId = jedis.xadd(STREAM_KEY_1,
XAddParams.xAddParams().id(new StreamEntryID("6-0")).minId("3-0").trimmingMode(trimMode),
map);
assertNotNull(newId);
// Stream should have entries >= 3-0 plus the new entry
long streamLen = jedis.xlen(STREAM_KEY_1);
assertTrue(streamLen >= 3);
// Check PEL entries based on trimming mode
List<StreamPendingEntry> pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME,
XPendingParams.xPendingParams().count(10));
assertEquals(expected, pendingAfter.size());
}
@Test
@SinceRedisVersion("8.1.240")
public void xaddWithApproximateTrimmingAndTrimmingMode() {
setUpTestStream();
Map<String, String> map = singletonMap("field", "value");
// Add initial entries
populateTestStreamWithValues(STREAM_KEY_1, 10, map);
// Create consumer group and read messages
Map<String, StreamEntryID> streamQuery = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(5),
streamQuery);
// Add new entry with approximate trimming and KEEP_REFERENCES mode
StreamEntryID newId = jedis.xadd(STREAM_KEY_1,
XAddParams.xAddParams().id(new StreamEntryID("11-0")).maxLen(5).approximateTrimming()
.trimmingMode(StreamDeletionPolicy.KEEP_REFERENCES),
map);
assertNotNull(newId);
// With approximate trimming, the exact length may vary but should be around the target
long streamLen = jedis.xlen(STREAM_KEY_1);
assertTrue(streamLen >= 5); // Should be approximately 5, but may be more due to approximation
// PEL should preserve references
List<StreamPendingEntry> pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME,
XPendingParams.xPendingParams().count(10));
assertEquals(5, pendingAfter.size()); // All read messages should remain in PEL
}
@Test
@SinceRedisVersion("8.1.240")
public void xaddWithExactTrimmingAndTrimmingMode() {
setUpTestStream();
Map<String, String> map = singletonMap("field", "value");
// Add initial entries
populateTestStreamWithValues(STREAM_KEY_1, 5, map);
// Create consumer group and read messages
Map<String, StreamEntryID> streamQuery = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3),
streamQuery);
// Add new entry with exact trimming and DELETE_REFERENCES mode
StreamEntryID newId = jedis.xadd(STREAM_KEY_1,
XAddParams.xAddParams().id(new StreamEntryID("6-0")).maxLen(3).exactTrimming()
.trimmingMode(StreamDeletionPolicy.DELETE_REFERENCES),
map);
assertNotNull(newId);
// With exact trimming, stream should be exactly 3 entries
assertEquals(3L, jedis.xlen(STREAM_KEY_1));
// PEL references should be cleaned up for trimmed entries
List<StreamPendingEntry> pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME,
XPendingParams.xPendingParams().count(10));
// Only entries that still exist in the stream should remain in PEL
assertTrue(pendingAfter.size() <= 3);
}
@Test
@SinceRedisVersion("8.1.240")
public void xaddWithLimitAndTrimmingMode() {
setUpTestStream();
Map<String, String> map = singletonMap("field", "value");
// Add initial entries
populateTestStreamWithValues(STREAM_KEY_1, 10, map);
Map<String, StreamEntryID> streamQuery = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(5),
streamQuery);
// Add new entry with limit and KEEP_REFERENCES mode (limit requires approximate trimming)
StreamEntryID newId = jedis.xadd(STREAM_KEY_1,
XAddParams.xAddParams().id(new StreamEntryID("11-0")).maxLen(5).approximateTrimming() // Required
// for
// limit
// to
// work
.limit(2) // Limit the number of entries to examine for trimming
.trimmingMode(StreamDeletionPolicy.KEEP_REFERENCES),
map);
assertNotNull(newId);
// With limit, trimming may be less aggressive
long streamLen = jedis.xlen(STREAM_KEY_1);
assertTrue(streamLen >= 5); // Should be at least 5, but may be more due to limit
// PEL should preserve references
List<StreamPendingEntry> pendingAfter = jedis.xpending(STREAM_KEY_1, GROUP_NAME,
XPendingParams.xPendingParams().count(10));
assertEquals(5, pendingAfter.size()); // All read messages should remain in PEL
}
// ========== XACK Command Tests ==========
@Test
public void xackBasic() {
setUpTestStream();
// Add a message to the stream
StreamEntryID messageId = jedis.xadd(STREAM_KEY_1, StreamEntryID.NEW_ENTRY, HASH_1);
assertNotNull(messageId);
// Consumer group already created in setUpTestStream(), just read message
Map<String, StreamEntryID> streams = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(GROUP_NAME,
CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(1), streams);
assertEquals(1, messages.size());
assertEquals(1, messages.get(0).getValue().size());
StreamEntryID readMessageId = messages.get(0).getValue().get(0).getID();
// Test XACK
long acked = jedis.xack(STREAM_KEY_1, GROUP_NAME, readMessageId);
assertEquals(1L, acked);
}
@Test
public void xackMultipleMessages() {
setUpTestStream();
// Add multiple messages
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2);
// Consumer group already created in setUpTestStream(), just read messages
Map<String, StreamEntryID> streams = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(GROUP_NAME,
CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams);
assertEquals(1, messages.size());
assertEquals(2, messages.get(0).getValue().size());
// Test XACK with multiple IDs
StreamEntryID readId1 = messages.get(0).getValue().get(0).getID();
StreamEntryID readId2 = messages.get(0).getValue().get(1).getID();
long acked = jedis.xack(STREAM_KEY_1, GROUP_NAME, readId1, readId2);
assertEquals(2L, acked);
}
@Test
public void xackNonExistentMessage() {
setUpTestStream();
// Consumer group already created in setUpTestStream()
// Test XACK with non-existent message ID
StreamEntryID nonExistentId = new StreamEntryID("999-0");
long acked = jedis.xack(STREAM_KEY_1, GROUP_NAME, nonExistentId);
assertEquals(0L, acked); // Should return 0 for non-existent message
}
// ========== XDEL Command Tests ==========
@Test
public void xdelBasic() {
setUpTestStream();
// Add test entries
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2);
assertEquals(2L, jedis.xlen(STREAM_KEY_1));
// Test XDEL with single ID
long deleted = jedis.xdel(STREAM_KEY_1, id1);
assertEquals(1L, deleted);
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
}
@Test
public void xdelMultipleEntries() {
setUpTestStream();
// Add test entries
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2);
StreamEntryID id3 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("3-0"), HASH_1);
assertEquals(3L, jedis.xlen(STREAM_KEY_1));
// Test XDEL with multiple IDs
long deleted = jedis.xdel(STREAM_KEY_1, id1, id3);
assertEquals(2L, deleted);
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
}
@Test
public void xdelNonExistentEntries() {
setUpTestStream();
// Add one entry
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
// Test XDEL with mix of existing and non-existent IDs
StreamEntryID nonExistentId = new StreamEntryID("999-0");
long deleted = jedis.xdel(STREAM_KEY_1, id1, nonExistentId);
assertEquals(1L, deleted); // Should only delete the existing entry
assertEquals(0L, jedis.xlen(STREAM_KEY_1));
}
@Test
public void xdelEmptyStream() {
setUpTestStream();
// Test XDEL on empty stream
StreamEntryID nonExistentId = new StreamEntryID("1-0");
long deleted = jedis.xdel(STREAM_KEY_1, nonExistentId);
assertEquals(0L, deleted);
}
// ========== XACKDEL Command Tests ==========
@Test
@SinceRedisVersion("8.1.240")
public void xackdelBasic() {
setUpTestStream();
// Add a message to the stream
StreamEntryID messageId = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
assertNotNull(messageId);
// Consumer group already created in setUpTestStream(), read message
Map<String, StreamEntryID> streams = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(GROUP_NAME,
CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(1), streams);
assertEquals(1, messages.size());
assertEquals(1, messages.get(0).getValue().size());
StreamEntryID readMessageId = messages.get(0).getValue().get(0).getID();
// Test XACKDEL - should acknowledge and delete the message
List<StreamEntryDeletionResult> results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME,
readMessageId);
assertThat(results, hasSize(1));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0));
// Verify message is deleted from stream
assertEquals(0L, jedis.xlen(STREAM_KEY_1));
}
@Test
@SinceRedisVersion("8.1.240")
public void xackdelWithTrimMode() {
setUpTestStream();
// Add multiple messages
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2);
// Consumer group already created, read messages
Map<String, StreamEntryID> streams = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(GROUP_NAME,
CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams);
assertEquals(1, messages.size());
assertEquals(2, messages.get(0).getValue().size());
// Test XACKDEL with KEEP_REFERENCES mode
StreamEntryID readId1 = messages.get(0).getValue().get(0).getID();
List<StreamEntryDeletionResult> results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME,
StreamDeletionPolicy.KEEP_REFERENCES, readId1);
assertThat(results, hasSize(1));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0));
// Verify one message is deleted
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
}
@Test
@SinceRedisVersion("8.1.240")
public void xackdelUnreadMessages() {
setUpTestStream();
// Add test entries but don't read them
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
// Test XACKDEL on unread messages - should return NOT_FOUND for PEL
List<StreamEntryDeletionResult> results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, id1);
assertThat(results, hasSize(1));
// Should return NOT_FOUND because message was never read by the consumer group
assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(0));
// Stream should still contain the message
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
}
@Test
@SinceRedisVersion("8.1.240")
public void xackdelMultipleMessages() {
setUpTestStream();
// Add multiple messages
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2);
StreamEntryID id3 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("3-0"), HASH_1);
// Read all messages
Map<String, StreamEntryID> streams = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(GROUP_NAME,
CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(3), streams);
assertEquals(1, messages.size());
assertEquals(3, messages.get(0).getValue().size());
// Test XACKDEL with multiple IDs
StreamEntryID readId1 = messages.get(0).getValue().get(0).getID();
StreamEntryID readId2 = messages.get(0).getValue().get(1).getID();
List<StreamEntryDeletionResult> results = jedis.xackdel(STREAM_KEY_1, GROUP_NAME, readId1,
readId2);
assertThat(results, hasSize(2));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(1));
// Verify two messages are deleted
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
}
// ========== XDELEX Command Tests ==========
@Test
@SinceRedisVersion("8.1.240")
public void xdelexBasic() {
setUpTestStream();
// Add test entries
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2);
assertEquals(2L, jedis.xlen(STREAM_KEY_1));
// Test basic XDELEX without parameters (should behave like XDEL with KEEP_REFERENCES)
List<StreamEntryDeletionResult> results = jedis.xdelex(STREAM_KEY_1, id1);
assertThat(results, hasSize(1));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0));
// Verify entry is deleted from stream
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
}
@Test
@SinceRedisVersion("8.1.240")
public void xdelexWithTrimMode() {
setUpTestStream();
// Add test entries
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2);
// Test XDELEX with DELETE_REFERENCES mode
List<StreamEntryDeletionResult> results = jedis.xdelex(STREAM_KEY_1,
StreamDeletionPolicy.DELETE_REFERENCES, id1);
assertThat(results, hasSize(1));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0));
// Verify entry is deleted from stream
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
}
@Test
@SinceRedisVersion("8.1.240")
public void xdelexMultipleEntries() {
setUpTestStream();
// Add test entries
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2);
StreamEntryID id3 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("3-0"), HASH_1);
assertEquals(3L, jedis.xlen(STREAM_KEY_1));
// Test XDELEX with multiple IDs
List<StreamEntryDeletionResult> results = jedis.xdelex(STREAM_KEY_1, id1, id3);
assertThat(results, hasSize(2));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(1));
// Verify two entries are deleted
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
}
@Test
@SinceRedisVersion("8.1.240")
public void xdelexNonExistentEntries() {
setUpTestStream();
// Add one entry
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
// Test XDELEX with mix of existing and non-existent IDs
StreamEntryID nonExistentId = new StreamEntryID("999-0");
List<StreamEntryDeletionResult> results = jedis.xdelex(STREAM_KEY_1, id1, nonExistentId);
assertThat(results, hasSize(2));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // Existing entry
assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(1)); // Non-existent entry
// Verify existing entry is deleted
assertEquals(0L, jedis.xlen(STREAM_KEY_1));
}
@Test
@SinceRedisVersion("8.1.240")
public void xdelexWithConsumerGroups() {
setUpTestStream();
// Add test entries
StreamEntryID id1 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), HASH_1);
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), HASH_2);
// Read messages to add them to PEL
Map<String, StreamEntryID> streams = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(GROUP_NAME,
CONSUMER_NAME, XReadGroupParams.xReadGroupParams().count(2), streams);
assertEquals(1, messages.size());
assertEquals(2, messages.get(0).getValue().size());
// Acknowledge only the first message
StreamEntryID readId1 = messages.get(0).getValue().get(0).getID();
StreamEntryID readId2 = messages.get(0).getValue().get(1).getID();
jedis.xack(STREAM_KEY_1, GROUP_NAME, readId1);
// Test XDELEX with ACKNOWLEDGED mode - should only delete acknowledged entries
List<StreamEntryDeletionResult> results = jedis.xdelex(STREAM_KEY_1,
StreamDeletionPolicy.ACKNOWLEDGED, readId1, readId2);
assertThat(results, hasSize(2));
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // id1 was acknowledged
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED,
results.get(1)); // id2 not
// acknowledged
// Verify only acknowledged entry was deleted
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
}
@Test
@SinceRedisVersion("8.1.240")
public void xdelexEmptyStream() {
setUpTestStream();
// Test XDELEX on empty stream
StreamEntryID nonExistentId = new StreamEntryID("1-0");
List<StreamEntryDeletionResult> results = jedis.xdelex(STREAM_KEY_1, nonExistentId);
assertThat(results, hasSize(1));
assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(0));
}
@Test
@SinceRedisVersion("8.1.240")
public void xdelexNotAcknowledged() {
setUpTestStream();
String groupName = "test_group";
// Add initial entries and create consumer group
Map<String, String> entry1 = singletonMap("field1", "value1");
jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), entry1);
jedis.xgroupCreate(STREAM_KEY_1, groupName, new StreamEntryID("0-0"), true);
// Read one message to create PEL entry
String consumerName = "consumer1";
Map<String, StreamEntryID> streamQuery = singletonMap(STREAM_KEY_1,
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
jedis.xreadGroup(groupName, consumerName, XReadGroupParams.xReadGroupParams().count(1),
streamQuery);
// Add a new entry that was never delivered to any consumer
Map<String, String> entry2 = singletonMap("field4", "value4");
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), entry2);
// Verify initial state
StreamPendingSummary pending = jedis.xpending(STREAM_KEY_1, groupName);
assertEquals(1L, pending.getTotal()); // Only id1 is in PEL
StreamInfo info = jedis.xinfoStream(STREAM_KEY_1);
assertEquals(2L, info.getLength()); // Stream has 2 entries
// Test XDELEX with ACKNOWLEDGED policy on entry that was never delivered
// This should return NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED since id2 was never
// delivered to any consumer
List<StreamEntryDeletionResult> result = jedis.xdelex(STREAM_KEY_1,
StreamDeletionPolicy.ACKNOWLEDGED, id2);
assertThat(result, hasSize(1));
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED,
result.get(0));
}
}